IMap.size() reports stale result when blocked by MapStore init #7905

Closed
lukasblu opened this issue Apr 4, 2016 · 7 comments

@lukasblu
Contributor

commented Apr 4, 2016

Hi,

Assume you have an IMap with an attached MapStore.

When you do this:

  1. start initializing the map and loading its data (this will take a while)
  2. in the meantime, add another node to the cluster
  3. call IMap.size()

The call to IMap.size() in (3) will block, which is fine. However, when data loading finishes, the call returns and reports a stale result: the reported size does not include the entries in the partitions that were migrated to the node added in (2).

Calling IMap.size() again after map init has finished works fine.

I tested this on a SNAPSHOT of 3.6 from 2016-03-16.

And here is a reproducer test.

Best,
Lukas

package com.nm.test.hazelcast.mapstore;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.nm.test.hazelcast.TestHazelcast;
import com.nm.test.hazelcast.utils.InMemoryMapStore;
import com.nm.test.hazelcast.utils.Sleep;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.TestCase;

/**
 * A test to ensure IMap.size() is correct even when blocked by data loading.
 */
public class TestMapStore29 extends TestCase {

    private static final Logger logger = Logger.getLogger(TestMapStore29.class);

    private static final String mapName = "testMap" + TestMapStore29.class.getSimpleName();

    private static final int mapSizePreload = 30;

    @Override
    protected void setUp() throws Exception {

        // configure logging
        if (!TestHazelcast.loggingInitialized) {
            TestHazelcast.loggingInitialized = true;
            BasicConfigurator.configure();
        }
    }

    public void testJoinInLoadAllKeys() throws Exception {

        // create hazelcast config
        Config config = new XmlConfigBuilder().build();
        config.setProperty("hazelcast.logging.type", "log4j");
        config.setProperty("hazelcast.version.check.enabled", "false");
        config.setProperty("hazelcast.phone.home.enabled", "false");

        // create map store
        // note: this map store blocks for 12s during loadAllKeys()
        InMemoryMapStore store = new InMemoryMapStore(false, 20, true, 12000);
        store.preload(mapSizePreload);

        // configure map store
        MapStoreConfig mapStoreConfig = new MapStoreConfig();
        mapStoreConfig.setEnabled(true);
        mapStoreConfig.setWriteDelaySeconds(10);
        mapStoreConfig.setClassName(null);
        mapStoreConfig.setImplementation(store);
        MapConfig mapConfig = config.getMapConfig(mapName);
        mapConfig.setMapStoreConfig(mapStoreConfig);

        // start 2 hazelcast instances
        List<HazelcastInstance> hcInstances = new ArrayList<HazelcastInstance>();
        hcInstances.add(Hazelcast.newHazelcastInstance(config));
        hcInstances.add(Hazelcast.newHazelcastInstance(config));
        final HazelcastInstance hcInstance1 = hcInstances.get(0);
        final HazelcastInstance hcInstance2 = hcInstances.get(1);
        logger.info("Hazelcast cluster created.");
        logger.info("Cluster size (via instance 1) = " + hcInstance1.getCluster().getMembers().size());
        logger.info("Cluster size (via instance 2) = " + hcInstance2.getCluster().getMembers().size());

        // thread 1:
        // start data loading by calling getMap()
        Thread thread1 = new Thread(new Runnable() {

            @Override
            public void run() {

                // calls loadAllKeys()... -> which will take 12s
                logger.info("Initializing map...");
                int size = hcInstance1.getMap(mapName).size();
                logger.info("Initializing map done (size=" + size + ").");
            }

        }, "Thread 1");
        thread1.start();

        // start 1 more hazelcast instance after 2s (this is during the loadAllKeys() call above)
        Sleep.sleep(2000, true);
        HazelcastInstance hcInstance3 = Hazelcast.newHazelcastInstance(config);
        hcInstances.add(hcInstance3);
        logger.info("Another hazelcast instance added.");
        logger.info("Cluster size (via instance 1) = " + hcInstance1.getCluster().getMembers().size());
        logger.info("Cluster size (via instance 2) = " + hcInstance2.getCluster().getMembers().size());
        logger.info("Cluster size (via instance 3) = " + hcInstance3.getCluster().getMembers().size());

        // print map sizes using each hazelcast instance
        final AtomicBoolean incorrectMapSize = new AtomicBoolean(false);
        List<Thread> threads = new ArrayList<Thread>();
        int i = 0;
        for (final HazelcastInstance hcInstanceCur : hcInstances) {
            i++;
            final int iFinal = i;
            Thread threadCur = new Thread(new Runnable() {

                @Override
                public void run() {
                    logger.info("Getting map via instance " + iFinal + "...");
                    IMap<String, String> map = hcInstanceCur.getMap(mapName);
                    int mapSize = map.size();
                    int mapLocalKeySetSize = map.localKeySet().size();
                    logger.info("Map size on instance " + iFinal + ":             " + mapSize);
                    logger.info("Number of owned keys on instance " + iFinal + ": " + mapLocalKeySetSize);

                    // remember issue
                    if (mapSize != mapSizePreload) {
                        incorrectMapSize.set(true);
                    }
                }
            }, "Thread getting size via instance " + i);
            threadCur.start();
            threads.add(threadCur);
        }

        // join the threads that ask for the size
        for (Thread thread : threads) {
            thread.join();
        }
        Sleep.sleep(1000, true);

        // print map sizes using each hazelcast instance after init is done
        i = 0;
        for (HazelcastInstance hcInstanceCur : hcInstances) {
            i++;
            logger.info("Map size on instance " + i + " after init:             " + hcInstanceCur.getMap(mapName).size());
            logger.info("Number of owned keys on instance " + i + " after init: " + hcInstanceCur.getMap(mapName).localKeySet().size());
        }

        // shut down all hazelcast instances
        for (HazelcastInstance hcInstanceCur : hcInstances) {
            hcInstanceCur.getLifecycleService().terminate();
        }

        // fail if wrong size reported
        if (incorrectMapSize.get()) {
            fail("Incorrect map size reported.");
        }
    }

}

package com.nm.test.hazelcast.utils;

import com.hazelcast.core.MapStore;
import org.apache.log4j.Logger;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class InMemoryMapStore implements MapStore<String, String> {

    private static final Logger logger = Logger.getLogger(InMemoryMapStore.class);

    // ----------------------------------------------------------------- config

    private final boolean infoOnLoad;

    private final int msPerLoad;

    private final boolean sleepBeforeLoadAllKeys;

    private final int msPerLoadAllKeys;

    // ------------------------------------------------------------------ state

    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<String, String>();

    private final AtomicInteger countLoadAllKeys = new AtomicInteger(0);

    private final ConcurrentHashMap<String, Boolean> contextClassLoaders = new ConcurrentHashMap<String, Boolean>();

    // ----------------------------------------------------------- construction

    public InMemoryMapStore() {
        this.infoOnLoad = true;
        this.msPerLoad = -1;
        this.sleepBeforeLoadAllKeys = false;
        this.msPerLoadAllKeys = 5000;
    }

    public InMemoryMapStore(boolean infoOnLoad, int msPerLoad, boolean sleepBeforeLoadAllKeys) {
        this.infoOnLoad = infoOnLoad;
        this.msPerLoad = msPerLoad;
        this.sleepBeforeLoadAllKeys = sleepBeforeLoadAllKeys;
        this.msPerLoadAllKeys = 5000;
    }

    public InMemoryMapStore(boolean infoOnLoad, int msPerLoad, boolean sleepBeforeLoadAllKeys, int msPerLoadAllKeys) {
        this.infoOnLoad = infoOnLoad;
        this.msPerLoad = msPerLoad;
        this.sleepBeforeLoadAllKeys = sleepBeforeLoadAllKeys;
        this.msPerLoadAllKeys = msPerLoadAllKeys;
    }

    public void preload(int size) {
        for (int i = 0; i < size; i++) {
            store.put("k" + i, "v" + i);
        }
    }

    // ---------------------------------------------------------------- getters

    public int getCountLoadAllKeys() {
        return countLoadAllKeys.get();
    }

    public TreeMap<String, Boolean> getContextClassLoaders() {
        return new TreeMap<String, Boolean>(contextClassLoaders);
    }

    // ----------------------------------------------------- MapStore interface

    @Override
    public String load(String key) {

        // log
        if (infoOnLoad) {
            logger.info("load(" + key + ") called.");
        }

        // sleep
        if (msPerLoad > 0) {
            Sleep.sleep(msPerLoad, false);
        }

        // remember if context class loader was present
        Thread thread = Thread.currentThread();
        ClassLoader contextClassLoader = thread.getContextClassLoader();
        contextClassLoaders.putIfAbsent(thread.getName(), Boolean.valueOf(contextClassLoader != null));

        return store.get(key);
    }

    @Override
    public Map<String, String> loadAll(Collection<String> keys) {

        // log
        List<String> keysList = new ArrayList<String>(keys);
        Collections.sort(keysList);
        if (infoOnLoad) {
            logger.info("loadAll(" + keysList + ") called.");
        }

        // remember if context class loader was present
        Thread thread = Thread.currentThread();
        ClassLoader contextClassLoader = thread.getContextClassLoader();
        contextClassLoaders.putIfAbsent(thread.getName(), Boolean.valueOf(contextClassLoader != null));

        Map<String, String> result = new HashMap<String, String>();
        for (String key : keys) {

            // sleep
            if (msPerLoad > 0) {
                Sleep.sleep(msPerLoad, false);
            }

            String value = store.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    @Override
    public Set<String> loadAllKeys() {

        // sleep for msPerLoadAllKeys ms (5s by default) to highlight asynchronous behavior
        if (sleepBeforeLoadAllKeys) {
            Sleep.sleep(msPerLoadAllKeys, true);
        }

        countLoadAllKeys.incrementAndGet();
        logger.info("loadAllKeys() called (count now = " + countLoadAllKeys.get() + ").");
        Set<String> result = new HashSet<String>(store.keySet());
        List<String> resultList = new ArrayList<String>(result);
        Collections.sort(resultList);
        logger.info("loadAllKeys result: size = " + result.size() + ", keys = " + resultList + ".");
        return result;
    }

    @Override
    public void store(String key, String value) {
        logger.info("store(" + key + ") called.");
        store.put(key, value);
    }

    @Override
    public void storeAll(Map<String, String> map) {
        TreeSet<String> setSorted = new TreeSet<String>(map.keySet());
        logger.info("storeAll(" + setSorted + ") called.");
        store.putAll(map);
    }

    @Override
    public void delete(String key) {
        logger.info("delete(" + key + ") called.");
        store.remove(key);
    }

    @Override
    public void deleteAll(Collection<String> keys) {
        List<String> keysList = new ArrayList<String>(keys);
        Collections.sort(keysList);
        logger.info("deleteAll(" + keysList + ") called.");
        for (String key : keys) {
            store.remove(key);
        }
    }

}

@tombujok

Contributor

commented Apr 4, 2016

Lukas, thx for reporting this issue and the excellent reproducer!
I'll try to have a deeper look this week.

@tombujok tombujok self-assigned this Apr 4, 2016

@lukasblu

Contributor Author

commented Apr 4, 2016

@tombujok ,

cool, thank you already.

To me it looks like changes in the partition table are not considered when the call to IMap.size() returns. So maybe you could check whether the partition table changed during the execution of the Size operation...

Actually, looking at the code for just a moment, maybe this could/should be considered inside the operationService.invokeOnAllPartitions(String serviceName, OperationFactory operationFactory) method.

Let's wait for your opinion :-)

Cheers,
Lukas
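
The idea in the comment above (check whether the partition table changed while the size query was running, and re-run the query if it did) can be sketched in plain Java. This is only an illustration of the suggestion, not Hazelcast code and not how the behavior was eventually addressed. `StableQuery`, `runWhenStable`, `ownersSnapshot` and `maxAttempts` are hypothetical names introduced here, and the partition table is modelled as a simple array of owner names indexed by partition id.

```java
import java.util.Arrays;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Hazelcast): runs an aggregate query and
 * retries it when partition ownership changed while the query was running.
 */
final class StableQuery {

    private StableQuery() {
    }

    /**
     * @param ownersSnapshot returns the current owner of each partition (index = partition id)
     * @param query          the aggregate to compute, e.g. the sum of per-partition sizes
     * @param maxAttempts    upper bound on attempts, so a constantly rebalancing cluster cannot loop forever
     */
    static <T> T runWhenStable(Supplier<String[]> ownersSnapshot, Supplier<T> query, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String[] before = ownersSnapshot.get();
            T result = query.get();
            String[] after = ownersSnapshot.get();

            // Same owners before and after: accept the result.
            // Note: this cannot detect a migration that was reverted in between.
            if (Arrays.equals(before, after)) {
                return result;
            }
        }
        throw new IllegalStateException("Partition table kept changing during " + maxAttempts + " attempts");
    }
}
```

On the caller side, `query` could be something like `map::size`, and `ownersSnapshot` could presumably be built from the cluster's partition service; both of these are assumptions and were not tested against 3.6. Comparing snapshots only narrows the window for a stale result; it does not close it.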

@lukasblu

Contributor Author

commented Apr 13, 2016

Hi @tombujok

Did you already have a chance to look into this? For our internal planning, it would be good to know how difficult the fix is and in which version we could expect it.

Thanks,
Lukas

@tombujok

Contributor

commented Apr 27, 2016

Hi, @lukasblu sorry for the delay in the reply. I'll have it analysed in the next couple of days. Thx!

@tombujok tombujok added the Team: Core label Jul 4, 2016

@tombujok tombujok added this to the 3.7 milestone Jul 4, 2016

@tombujok

Contributor

commented Jul 5, 2016

Hi @lukasblu. I've just tested it against the 3.7-master branch and it works fine.
I've run your test with @Repeat(1000), which is probably good enough proof that it works.
As I understand, it failed for you frequently, right?

We've added some changes to the migration algorithm in 3.7. I assume that these changes solved your issue.
Could you plz verify with 3.7-EA and let me know if it works for you? Thanks!

@lukasblu

Contributor Author

commented Jul 11, 2016

Hi @tombujok
thanks for checking. We are currently still working and testing using 3.6.
I'll let you know should I experience the issue again.
Feel free to close this issue for now; I would either re-open it or create another issue linked to this one if it does not work.
Thanks and best,
Lukas

@tombujok

Contributor

commented Jul 11, 2016

OK, thx. Feel free to reopen should you experience it again on 3.7.

@tombujok tombujok closed this Jul 11, 2016
