Data lost when using Loader, remove does not work with map.lock #187

Closed
igornovak opened this issue Jun 13, 2012 · 5 comments

@igornovak

commented Jun 13, 2012

When a custom loader (MapStore) is used with write-behind, whenever store is called in the loader and remove is then called on a locked map, the value for the key is read from the store and not from the map. Consequently the put/set/update is overridden and data is lost.

This is the same as issue 816 from the old Google Code issue tracker, except that a lock on the map is used.

Here is an example of test code that produces the error within 20 seconds:

"
public class HzIssue816plusGlobalLock {
public volatile static CounterHolder counterHolder = new CounterHolder(-1);

static HazelcastInstance hzInstance;

public static void main(String[] args) {
    String key = "1";

    HzIssue816plusGlobalLock hzIssue816plusLock = new HzIssue816plusGlobalLock();
    HashMapStore<String, CounterHolder> store = new HashMapStore<String, CounterHolder>();

    Config c = new Config();
    c.getMapConfig("test").setMapStoreConfig(new MapStoreConfig().setEnabled(true).setWriteDelaySeconds(2).setImplementation(store));

    hzInstance = Hazelcast.init(c);

    IMap<String, CounterHolder> mapSerKey = Hazelcast.getMap("test");

    CounterHolder newCounterHolder = new CounterHolder(26);
    mapSerKey.set(key, newCounterHolder, 0, null);
    HzIssue816plusGlobalLock.counterHolder = newCounterHolder;

    try { Thread.sleep(3000); } catch (InterruptedException e) { }

    ExecutorService exec = Executors.newCachedThreadPool();

    List<TestMapThread> threads = new ArrayList<TestMapThread>();

    int numberOfThreads = 1;
    for (int th = 1; th < numberOfThreads+1; th++) {
        TestMapThread testMapThread = hzIssue816plusLock.new TestMapThread(mapSerKey, "t"+th, th*100);
        exec.submit(testMapThread);

        threads.add(testMapThread);
        try { Thread.sleep(100); } catch (InterruptedException e) { }
    }

    int secondsToWait = 20;
    for (int i = 0; i < secondsToWait; i++) {
        try { Thread.sleep(TimeUnit.SECONDS.toMillis(1)); } catch (InterruptedException e) { }
    }

    for (TestMapThread testMapThread : threads) {
        testMapThread.cancel();
    }

    exec.shutdown();
    try {
        exec.awaitTermination(300, TimeUnit.SECONDS);
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }

    hzInstance.getLifecycleService().shutdown();
}

public class TestMapThread implements Callable {
    final String key = "1";

    private IMap<String, CounterHolder> mapSerKey;
    private int counter;
    private String name;

    private volatile boolean cancelled;

    public TestMapThread(IMap<String, CounterHolder> mapSerKey,  String name, int counter) {
        this.mapSerKey = mapSerKey;
        this.counter = counter;
        this.name = name;
    }

    @Override
    public Object call() throws Exception {
        int errorCounter = 0;

        while (!cancelled) {
            try {

                mapSerKey.lock(key);
                randomThreadSleep(5, 50);
                //GET
                CounterHolder counterHolder = mapSerKey.get(key);

                if (counterHolder != null) {
                    if (counterHolder.getCounter() != HzIssue816plusGlobalLock.counterHolder.getCounter()) {
                        errorCounter++;

                        System.err.println("Thread: "+name+" OOOOOPPPPPPAAAASSSSSSSSSAAAAAAAA WRONG: value is: " + counterHolder.getCounter()+" should be: "+HzIssue816plusGlobalLock.counterHolder.getCounter()+" number of errors: "+errorCounter);                           
                    } else {
                        System.out.println("Thread: "+name+" Values OK!");
                    }
                } else {
                    System.out.println("Thread: "+name+" Not Found counter, is OK!");
                }
                CounterHolder newCounterHolder = new CounterHolder(HzIssue816plusGlobalLock.counterHolder.getCounter()+counter);
                mapSerKey.put(key, newCounterHolder);
                HzIssue816plusGlobalLock.counterHolder = newCounterHolder;

            } finally {
                mapSerKey.unlock(key);
            }

            randomThreadSleep(5, 20);

            try {
                mapSerKey.lock(key);
                randomThreadSleep(5, 23);
                if (getRandomBetween(1, 100) > 95) {
                    //REMOVE
                    mapSerKey.remove(key);
                    System.out.println("Thread: "+name+" Counter REMOVED!");
                }
            } finally {
                mapSerKey.unlock(key);
            }

            counter++;
        }
        if (errorCounter == 0) {
            System.out.println("Thread: "+name+" No errors detected!");
        } else {
            System.err.println("----------------------------------------------- ");
            System.err.println("Thread: "+name+" --------------------- Number of wrong reads: " + errorCounter);
            System.err.println("----------------------------------------------- ");
        }
        return null;
    }

    public void cancel() {
        cancelled = true;
    }

    public void randomThreadSleep(int min, int max) {
        if (!cancelled) {
            try { Thread.sleep(getRandomBetween(min, max)); } catch (InterruptedException e) { }
        }
    }

    public int getRandomBetween(int min, int max) {
        Random randomNumbers = new Random();
        return min + randomNumbers.nextInt(max-min);
    }

}

static class HashMapStore<K,V> implements MapStore<K,V> {
    private final Map<K,V> map = new HashMap<K, V>();

    // private Random randomNumbers = new Random();
    // private int min = 25; //min milliseconds
    // private int max = 1500; //max milliseconds

    public HashMapStore() {
    }

    public void store(final K key, final V value) {
        System.out.println("--------- STORE: " + key + " : " + value);

        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        map.put(key, value);
    }

    public void storeAll(final Map<K, V> kvMap) {
        System.out.println("STORE-ALL: " + kvMap);

        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        map.putAll(kvMap);
    }

    public void delete(final K key) {
        System.out.println("DELETE: " + key);

        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        map.remove(key);
    }

    public void deleteAll(final Collection<K> keys) {
        System.out.println("DELETE-ALL: " + keys);

        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        for (K key : keys) {
            delete(key);
        }
    }

    public V load(final K key) {
        // System.out.println("LOAD: " + key );
        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        return map.get(key);
    }

    public Map<K, V> loadAll(final Collection<K> keys) {
        System.out.println("LOAD-ALL: " + keys);
        Map<K,V> m = new HashMap<K, V>();

        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        for (K key : keys) {
            m.put(key, load(key));
        }
        return m;
    }

    public Set<K> loadAllKeys() {
        System.out.println("LOAD-ALL-KEYS");

        // try { Thread.sleep(min + randomNumbers.nextInt(max-min)); } catch (InterruptedException e) { }
        return null;
    }
}
}

public class CounterHolder implements Serializable {

/**
 * 
 */
private static final long serialVersionUID = 8215867173788097781L;

private int counter;

public CounterHolder(int counter) {
    this.counter = counter;
}

public int getCounter() {
    return counter;
}

public String toString() {
    return ""+counter;
}

}
"

I'm deeply disappointed by how many bugs Hazelcast has in such core functionality.

Regards, Igor Novak

@igornovak

Author

commented Jun 13, 2012

Affected versions are 2.0.2, 2.0.3, and 2.1.2, and I presume older versions as well.

@mdogan

Member

commented Jun 14, 2012

Simplified test case:

@Test
public void testMapLockAndGetAfterRemoveWithWriteBehindMapStore() throws InterruptedException {
    final String key = "key";
    Config config = new Config();
    config.getMapConfig("test").setMapStoreConfig(
            new MapStoreConfig().setEnabled(true)
                    .setWriteDelaySeconds(1).setImplementation(new SimpleMapStore()));

    IMap<String, String> map = Hazelcast.newHazelcastInstance(config).getMap("test");
    String value = "value";
    map.put(key, value);

    for (int i = 0; i < 100; i++) {
        try {
            map.lock(key);
            String v = map.get(key);
            if (v != null) {
                assertEquals("Old value is loaded from MapStore!", value, v);
            }
            v = "value" + i;
            map.put(key, v);
            value = v;
        } finally {
            map.unlock(key);
        }
        Thread.sleep((long) (50 * Math.random()));
        try {
            map.lock(key);
            if (Math.random() > 0.90f) {
                map.remove(key);
            }
        } finally {
            map.unlock(key);
        }
        Thread.sleep((long) (100 * Math.random()));
    }
}
@mdogan

Member

commented Jun 14, 2012

Also note that this is a rare case. To encounter the issue, one has to:

  • configure a write behind map-store
  • remove entry -> map.remove(key)
  • lock entry -> map.lock(key)
  • and get entry -> map.get(key) before map-store delete (deleteAll) is executed
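The timing window in the steps above can be pictured with a small single-threaded toy model. This is only a sketch: `ToyWriteBehindMap`, its fields, and `flush()` are hypothetical names, not Hazelcast internals, and the lock is left out because the model only shows what a read-through `get` sees while the store delete is still queued.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy, single-threaded model of a write-behind map. NOT Hazelcast code:
// every name here is hypothetical and only illustrates the timing window.
class ToyWriteBehindMap {
    private final Map<String, String> memory = new HashMap<>();          // in-memory entries
    private final Map<String, String> store = new HashMap<>();           // backing store
    private final Queue<Runnable> pendingStoreOps = new ArrayDeque<>();  // delayed store/delete calls

    void put(String key, String value) {
        memory.put(key, value);
        pendingStoreOps.add(() -> store.put(key, value));
    }

    void remove(String key) {
        memory.remove(key);
        pendingStoreOps.add(() -> store.remove(key));
    }

    // Read-through get: a miss in memory falls back to the backing store.
    String get(String key) {
        String value = memory.get(key);
        if (value == null) {
            value = store.get(key);
            if (value != null) {
                memory.put(key, value);
            }
        }
        return value;
    }

    // Stands in for the write-delay timer firing.
    void flush() {
        Runnable op;
        while ((op = pendingStoreOps.poll()) != null) {
            op.run();
        }
    }
}

class StaleReadDemo {
    public static void main(String[] args) {
        ToyWriteBehindMap map = new ToyWriteBehindMap();
        map.put("key", "value0");
        map.flush();                          // "value0" reaches the store
        map.remove("key");                    // the store delete is queued, not yet applied
        System.out.println(map.get("key"));   // prints "value0": the removed value is loaded back
    }
}
```

In this model the read inside the window also puts the old value back into memory, which is the kind of resurrected entry the report describes.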
@igornovak

Author

commented Jun 14, 2012

For my application this is not rare; under heavy load it always happens.

I had to refactor a great deal of my code to use a distributed lock instead of map.lock because of a project deadline.

BTW, the sequence:

lock entry -> map.lock(key)
remove entry -> map.remove(key)
and get entry -> map.get(key) before map-store delete (deleteAll) is executed

also does not work.
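For readers who want to picture the workaround mentioned above (a separate lock object instead of map.lock), here is a minimal local sketch. It is an assumption-laden stand-in: `KeyLocks` and `GuardedCounter` are hypothetical names, and `ReentrantLock` only works inside one JVM, so in a cluster the `Lock` would have to be a distributed one.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry handing out one lock object per key.
// ReentrantLock is a single-JVM stand-in for a distributed lock.
class KeyLocks {
    private final ConcurrentMap<String, Lock> locks = new ConcurrentHashMap<>();

    Lock forKey(String key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }
}

// Hypothetical wrapper whose read-modify-write is guarded by the external
// lock rather than by a lock owned by the map itself.
class GuardedCounter {
    private final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
    private final KeyLocks keyLocks = new KeyLocks();

    int addAndGet(String key, int delta) {
        Lock lock = keyLocks.forKey(key);
        lock.lock();   // acquire before try, so finally never unlocks an unheld lock
        try {
            Integer current = map.get(key);
            int next = (current == null ? 0 : current) + delta;
            map.put(key, next);
            return next;
        } finally {
            lock.unlock();
        }
    }

    Integer get(String key) {
        return map.get(key);
    }
}
```

The point of the pattern is only that the get and the put happen under the same external lock; whether that avoids the stale load depends on the map implementation, which this sketch does not model.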

@mdogan

Member

commented Jun 14, 2012

I see it is not rare in your case; what I meant applies to common use.

That sequence is how to reproduce (encounter) the issue, and the test case posted above follows it.
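One way such a window can be closed, shown on a single-threaded toy model, is to remember keys whose store delete is still pending and skip the store load for them. This is a sketch under assumptions, not a description of the actual fix: `TombstoneWriteBehindMap`, `pendingDeletes`, and `flush()` are hypothetical names and not Hazelcast internals.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy write-behind map that keeps a "tombstone" for every key whose
// delete has not reached the backing store yet. Hypothetical code.
class TombstoneWriteBehindMap {
    private final Map<String, String> memory = new HashMap<>();          // in-memory entries
    private final Map<String, String> store = new HashMap<>();           // backing store
    private final Set<String> pendingDeletes = new HashSet<>();          // tombstones
    private final Queue<Runnable> pendingStoreOps = new ArrayDeque<>();  // delayed store/delete calls

    void put(String key, String value) {
        memory.put(key, value);
        pendingDeletes.remove(key);   // a newer value supersedes the tombstone
        pendingStoreOps.add(() -> store.put(key, value));
    }

    void remove(String key) {
        memory.remove(key);
        pendingDeletes.add(key);
        pendingStoreOps.add(() -> store.remove(key));
    }

    // Read-through get that does not consult the store for tombstoned keys.
    String get(String key) {
        String value = memory.get(key);
        if (value == null && !pendingDeletes.contains(key)) {
            value = store.get(key);
            if (value != null) {
                memory.put(key, value);
            }
        }
        return value;
    }

    // Stands in for the write-delay timer: applies queued operations in
    // order, after which the store is up to date and tombstones can go.
    void flush() {
        Runnable op;
        while ((op = pendingStoreOps.poll()) != null) {
            op.run();
        }
        pendingDeletes.clear();
    }
}
```

With the tombstone in place, a `get` issued after `remove` but before `flush` returns null instead of the old stored value, and a later `put` is not overridden.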

@enesakar enesakar closed this in 60ee6dd Mar 20, 2013
