
Parallel execution of MapStore#store method for the same key triggered by IMap#flush #3338

Closed
atschofen opened this issue Aug 20, 2014 · 6 comments

@atschofen

@atschofen atschofen commented Aug 20, 2014

Hi,

This issue exists in Hazelcast 3.3-EA and also in a build of the 3.3 development branch from August 7, 2014. It is related to issue #2128.

If a map has write-behind and a map store configured (eviction is not needed) and you call flush on the IMap, the map store's store method can be called concurrently for the same key. This affects the keys that are in the write-behind queue and are then forcibly stored by the flush. The cause seems to be that the flush operation, which stores all entries in the write-behind queue, is executed on the operation thread, while the periodic processing of the write-behind queue is done by an executor service defined in the WriteBehindQueueManager. Nothing coordinates the two threads, so both can be inside store for the same key at once.
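To make the race described above concrete without Hazelcast, here is a minimal plain-Java model. It is an illustration under stated assumptions, not Hazelcast's actual code: a single-thread executor stands in for the write-behind processor in WriteBehindQueueManager, and the calling thread stands in for the operation thread that runs flush. The class and method names are hypothetical. Latches replace the sleeps so the overlap is deterministic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model: background executor = write-behind processor, caller thread = flush. */
public class WriteBehindRaceSketch {

    // Keys whose store() is currently running, used to detect overlapping calls.
    private final Set<String> activeKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger overlaps = new AtomicInteger();

    // Stand-in for MapStore#store: records an overlap, then runs the (possibly slow) write.
    void store(String key, Runnable write) {
        boolean added = activeKeys.add(key);
        if (!added) {
            overlaps.incrementAndGet(); // another thread is already storing this key
        }
        try {
            write.run();
        } finally {
            if (added) {
                activeKeys.remove(key);
            }
        }
    }

    /** Returns how many store() calls overlapped with another store() for the same key. */
    public int simulate() {
        ExecutorService writeBehindExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch writeBehindStarted = new CountDownLatch(1);
        CountDownLatch releaseWriteBehind = new CountDownLatch(1);
        try {
            // Write-behind store: signals that it has started, then blocks like a slow DB write.
            writeBehindExecutor.submit(() -> store("key", () -> {
                writeBehindStarted.countDown();
                awaitQuietly(releaseWriteBehind);
            }));
            awaitQuietly(writeBehindStarted);
            // "flush" on the caller thread stores the same key while the
            // write-behind store is still in progress; nothing serializes the two.
            store("key", () -> { });
        } finally {
            releaseWriteBehind.countDown();
            writeBehindExecutor.shutdown();
        }
        try {
            writeBehindExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overlaps.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // prints "overlapping stores: 1"
        System.out.println("overlapping stores: " + new WriteBehindRaceSketch().simulate());
    }
}
```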

The following piece of code is a unit test to reproduce the issue:

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import junit.framework.TestCase;
import org.apache.log4j.BasicConfigurator;

public class TestMapStore16 extends TestCase {

    private static final String mapName = "testMap" + TestMapStore16.class.getSimpleName();

    private static final int writeDelaySeconds = 1;

    // Guards one-time log4j setup (stands in for the TestHazelcast helper, which is not shown)
    private static boolean loggingInitialized = false;

    @Override
    protected void setUp() throws Exception {

        // configure logging
        if (!loggingInitialized) {
            loggingInitialized = true;
            BasicConfigurator.configure();
        }
    }

    @Override
    protected void tearDown() throws Exception {
        // shut down the instance started by the test so it does not leak into other tests
        Hazelcast.shutdownAll();
    }

    public void testNoStoreConcurrency() throws Exception {

        // create shared hazelcast instance config
        final Config config = new XmlConfigBuilder().build();
        config.setProperty("hazelcast.logging.type", "log4j");

        // create shared map store implementation
        SlowConcurrencyCheckingMapStore store = new SlowConcurrencyCheckingMapStore();

        // configure map store
        MapStoreConfig mapStoreConfig = new MapStoreConfig();
        mapStoreConfig.setEnabled(true);
        mapStoreConfig.setWriteDelaySeconds(writeDelaySeconds);
        mapStoreConfig.setClassName(null);
        mapStoreConfig.setImplementation(store);
        MapConfig mapConfig = config.getMapConfig(mapName);
        mapConfig.setMapStoreConfig(mapStoreConfig);

        // start hazelcast instance
        HazelcastInstance hcInstance = Hazelcast.newHazelcastInstance(config);

        IMap<String, String> testMap = hcInstance.getMap(mapName);

        // This triggers a write-behind store after roughly writeDelaySeconds; the store itself is artificially delayed to take 10 seconds
        testMap.put("key", "value");
        // Wait until the write-behind store operation has started
        Thread.sleep((writeDelaySeconds + 2) * 1000);
        // Flush the map, causing the not-yet-stored entries to be stored
        testMap.flush();

        // Make sure that the store triggered by the flush did not overlap with a write-behind call to store for the same key
        assertEquals("There were concurrent executions of store for the same key", 0, store.getConcurrentStoreCount());

    }

}

It relies on the following dummy store:

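import com.hazelcast.core.MapStore;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Map store that sleeps for 10 seconds in the store implementation and counts the number of
 * concurrently executed stores for the same key.
 */
public class SlowConcurrencyCheckingMapStore implements MapStore<String, String> {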

    private static final long SLEEP_TIME = 10000;

    private ConcurrentHashMap<String, String> store = new ConcurrentHashMap<String, String>();

    private Set<String> activeKeys = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    private AtomicInteger concurrentStoreCount = new AtomicInteger(0);

    public int getConcurrentStoreCount() {
        return concurrentStoreCount.get();
    }

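    @Override
    public void store(String key, String value) {
        // add() returns false if another thread is already inside store() for this key
        boolean added = activeKeys.add(key);
        if (!added) concurrentStoreCount.incrementAndGet();
        try {
            try {
                // simulate a slow backing store
                Thread.sleep(SLEEP_TIME);
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }
            store.put(key, value);
        } finally {
            // only the thread that registered the key removes it
            if (added) activeKeys.remove(key);
        }
    }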

    @Override
    public void storeAll(Map<String, String> map) {
        for (Entry<String, String> entry : map.entrySet()) {
            store(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public String load(String key) {
        return store.get(key);
    }

    @Override
    public Map<String, String> loadAll(Collection<String> keys) {
        Map<String, String> result = new HashMap<String, String>();
        for (String key : keys) {
            result.put(key, store.get(key));
        }
        return result;
    }

    @Override
    public Set<String> loadAllKeys() {
        return store.keySet();
    }

    @Override
    public void delete(String key) {
        store.remove(key);
    }

    @Override
    public void deleteAll(Collection<String> keys) {
        for (String key : keys) {
            store.remove(key);
        }
    }

}

A workaround is to ensure mutual exclusion in the MapStore implementation. However, I would expect Hazelcast itself to guarantee mutual exclusion.
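The workaround of ensuring mutual exclusion in the MapStore implementation could look roughly like the sketch below, which uses lock striping: each key hashes to one of a fixed number of ReentrantLocks, so two stores for the same key always contend on the same lock. To keep it self-contained and runnable without Hazelcast on the classpath, it wraps a hypothetical minimal `Store` interface that mirrors only the `store` method of `com.hazelcast.core.MapStore`; `StripedLockingStore` and `demoOverlaps` are illustrative names, not Hazelcast API.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Serializes store() calls per key using a fixed pool of lock stripes (hypothetical helper). */
public class StripedLockingStore<K, V> {

    /** Hypothetical minimal stand-in for the write side of com.hazelcast.core.MapStore. */
    public interface Store<K, V> {
        void store(K key, V value);
    }

    private final Store<K, V> delegate;
    private final ReentrantLock[] stripes;

    public StripedLockingStore(Store<K, V> delegate, int stripeCount) {
        this.delegate = delegate;
        this.stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    private ReentrantLock lockFor(K key) {
        // floorMod keeps the index non-negative for negative hash codes
        return stripes[Math.floorMod(key.hashCode(), stripes.length)];
    }

    public void store(K key, V value) {
        ReentrantLock lock = lockFor(key); // same key -> same stripe
        lock.lock();
        try {
            delegate.store(key, value);
        } finally {
            lock.unlock();
        }
    }

    // Locks one key at a time, so storeAll never holds two stripes and cannot deadlock.
    public void storeAll(Map<K, V> entries) {
        for (Map.Entry<K, V> e : entries.entrySet()) {
            store(e.getKey(), e.getValue());
        }
    }

    /** Demo: two threads store the same key concurrently; returns how many calls overlapped. */
    public static int demoOverlaps() {
        Set<String> active = ConcurrentHashMap.newKeySet();
        AtomicInteger overlaps = new AtomicInteger();
        StripedLockingStore<String, String> safe = new StripedLockingStore<String, String>((key, value) -> {
            boolean added = active.add(key);
            if (!added) {
                overlaps.incrementAndGet(); // would only happen without the lock
            }
            try {
                Thread.sleep(50); // simulate a slow database write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                if (added) {
                    active.remove(key);
                }
            }
        }, 16);
        Thread writeBehind = new Thread(() -> safe.store("key", "v1"));
        Thread flush = new Thread(() -> safe.store("key", "v2"));
        writeBehind.start();
        flush.start();
        try {
            writeBehind.join();
            flush.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overlaps.get();
    }

    public static void main(String[] args) {
        System.out.println("overlapping stores: " + demoOverlaps());
    }
}
```

Striping bounds memory (no per-key lock objects to clean up) at the cost of occasional contention between unrelated keys that hash to the same stripe. In a real MapStore you would likely guard delete/deleteAll the same way. Note that mutual exclusion alone does not decide which of two overlapping writes for a key lands last, and it only coordinates threads within one JVM.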

Cheers,
Andreas

@pveentjer pveentjer added this to the 3.3 milestone Aug 20, 2014
@pveentjer
Member

@pveentjer pveentjer commented Aug 20, 2014

Thanks for your excellent bug report.

@ahmetmircik ahmetmircik self-assigned this Aug 20, 2014
@ahmetmircik
Member

@ahmetmircik ahmetmircik commented Aug 20, 2014

Hi @atschofen, this is a known issue in 3.3, but the fix is waiting on some internal Hazelcast changes. For now you should use your own workaround, and maybe we can add this pitfall to the documentation.

@ahmetmircik
Copy link
Member

@ahmetmircik ahmetmircik commented Aug 21, 2014

For now, I added a Javadoc note about this potential pitfall in 1f45261.

Closing this issue since it is a known issue and a work in progress.

Thanks @atschofen.

@lukasblu
Contributor

@lukasblu lukasblu commented Jul 31, 2015

Hi @ahmetmircik

the test above still fails in 3.5.1. Are there any plans to fix this eventually?

thanks,
Lukas

@ahmetmircik
Member

@ahmetmircik ahmetmircik commented Jul 31, 2015

Hi @lukasblu, you are right: it was only fixed for flushes caused by eviction. I'm reopening this issue and will hopefully send a fix in the near future.

@ahmetmircik ahmetmircik reopened this Jul 31, 2015
@ahmetmircik ahmetmircik modified the milestones: 3.6, 3.3 Jul 31, 2015
@mdogan
Contributor

@mdogan mdogan commented Nov 27, 2015

@ahmetmircik: is this still planned for 3.6 or are you going to postpone it?

6 participants