MapStore: Map init blocked when new nodes join during data loading #11407

Closed
rruxandra opened this Issue Sep 20, 2017 · 10 comments

rruxandra commented Sep 20, 2017

Hello,

We discovered an issue in Hazelcast 3.8.5 when using MapStores.
It seems that hcInstance.getMap(mapName) blocks indefinitely when a new node joins while data is being loaded.
For us this is a serious problem since it might prevent the system from starting.

Here is a test that reproduces the issue:

package test;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.fail;

/**
 * Test to ensure there is no deadlock when a new node joins during data loading.
 */
public class TestMapStore33 {

    private static final Logger logger = Logger.getLogger(TestMapStore33.class);

    private static final int preloadSize = 1000;

    private static final int writeDelaySeconds = 5;

    private static final AtomicInteger mapSize = new AtomicInteger(-1);

    private final String mapName = "map" + getClass().getSimpleName();

    private final InMemoryMapStoreSleepAndCount store = new InMemoryMapStoreSleepAndCount(300);

    private final ExecutorService executorService = Executors.newFixedThreadPool(3);

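    // written from executor threads and read in tearDown(), so use a thread-safe list
    private final List<HazelcastInstance> hazelcastInstances = new java.util.concurrent.CopyOnWriteArrayList<>();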

    @Before
    public void setUp() throws Exception {
        BasicConfigurator.configure();
    }

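    @After
    public void tearDown() {
        // stop the worker threads first; a blocked getMap() call would otherwise keep them alive
        executorService.shutdownNow();
        for (HazelcastInstance hazelcastInstance : hazelcastInstances) {
            hazelcastInstance.getLifecycleService().terminate();
        }
    }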

    @Test
    public void testMapStoreLoad() throws InterruptedException {

        // load data in the store
        store.preload(preloadSize);

        // runnable that creates a new Hazelcast instance and checks the map size
        Runnable task = new Runnable() {
            @Override
            public void run() {
                HazelcastInstance hcInstance = Hazelcast.newHazelcastInstance(getConfig());
                hazelcastInstances.add(hcInstance);
                IMap<String, String> map = hcInstance.getMap(mapName);
                int size = map.size();
                mapSize.set(size);
            }
        };

        // start first Hazelcast instance
        executorService.submit(task);

        // wait 6s so that the first instance triggers data loading on the map
        Thread.sleep(6000);

        // start second Hazelcast instance
        executorService.submit(task);

        // wait at most 150s
        final long t0 = System.currentTimeMillis();
        while ((System.currentTimeMillis() - t0) < 150000) {
            if (mapSize.get() == preloadSize) {
                break;
            }
            Thread.sleep(1000);
            logger.info(String.format("Loaded keys: %s", store.getCountLoadedKeys()));
        }

        // check for errors
        if (mapSize.get() != preloadSize) {
            fail(String.format("Not all data loaded (%s != %s).", mapSize.get(), preloadSize));
        }
    }

    private Config getConfig() {

        // create shared hazelcast config
        final Config config = new XmlConfigBuilder().build();
        config.setProperty("hazelcast.logging.type", "log4j");

        // disable JMX to make sure lazy loading works asynchronously
        config.setProperty("hazelcast.jmx", "false");

        // get map config
        MapConfig mapConfig = config.getMapConfig(mapName);

        // configure map store
        MapStoreConfig mapStoreConfig = new MapStoreConfig();
        mapStoreConfig.setEnabled(true);
        mapStoreConfig.setInitialLoadMode(MapStoreConfig.InitialLoadMode.EAGER);
        mapStoreConfig.setWriteDelaySeconds(writeDelaySeconds);
        mapStoreConfig.setClassName(null);
        mapStoreConfig.setImplementation(store);
        mapConfig.setMapStoreConfig(mapStoreConfig);

        return config;
    }

}

package test;

import com.hazelcast.core.MapStore;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class InMemoryMapStoreSleepAndCount implements MapStore<String, String> {

    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    private final int msPerLoad;

    private final AtomicInteger countLoadedKeys = new AtomicInteger(0);

    // ----------------------------------------------------------- construction

    public InMemoryMapStoreSleepAndCount(int msPerLoad) {
        this.msPerLoad = msPerLoad;
    }

    public void preload(int size) {
        for (int i = 0; i < size; i++) {
            store.put("k" + i, "v" + i);
        }
    }

    // ---------------------------------------------------------------- getters

    public int getCountLoadedKeys() {
        return countLoadedKeys.get();
    }

    // ----------------------------------------------------- MapStore interface

    @Override
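    public String load(String key) {
        if (msPerLoad > 0) {
            try {
                // simulate a slow backing store
                Thread.sleep(msPerLoad);
            } catch (InterruptedException e) {
                // restore the interrupt flag so the calling thread can still observe it
                Thread.currentThread().interrupt();
            }
        }
        countLoadedKeys.incrementAndGet();
        return store.get(key);
    }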

    @Override
    public Map<String, String> loadAll(Collection<String> keys) {
        // load the keys in sorted order
        List<String> keysList = new ArrayList<>(keys);
        Collections.sort(keysList);

        Map<String, String> result = new HashMap<>();
        for (String key : keysList) {

            if (msPerLoad > 0) {
                try {
                    // simulate a slow backing store
                    Thread.sleep(msPerLoad);
                } catch (InterruptedException e) {
                    // restore the interrupt flag so the calling thread can still observe it
                    Thread.currentThread().interrupt();
                }
            }

            String value = store.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }

        countLoadedKeys.addAndGet(keys.size());
        return result;
    }

    @Override
    public Set<String> loadAllKeys() {
        // return a sorted snapshot of the keys currently in the store
        return new TreeSet<String>(store.keySet());
    }

    @Override
    public void store(String key, String value) {
        store.put(key, value);
    }

    @Override
    public void storeAll(Map<String, String> map) {
        store.putAll(map);
    }

    @Override
    public void delete(String key) {
        store.remove(key);
    }

    @Override
    public void deleteAll(Collection<String> keys) {
        // delete the keys in sorted order
        List<String> keysList = new ArrayList<>(keys);
        Collections.sort(keysList);
        for (String key : keysList) {
            store.remove(key);
        }
    }

}

Could you please check this?
Is there a workaround to avoid this issue in 3.8.5?

Thanks,
Ruxandra

Contributor

vbekiaris commented Sep 20, 2017

@rruxandra this is the expected behavior when a Map is configured with an EAGER map store: it will immediately load entries from the backing map store so that the Map is completely loaded before it is returned to the user.

If you configure the initial load mode as LAZY, the IMap proxy will be returned without blocking, but it becomes your application's responsibility to perform the "is loading completed" checks before attempting to use the map.

rruxandra commented Sep 20, 2017

@vbekiaris,

Thanks for the quick response.
The problem is that the Map will never be returned if this race condition occurs.
If you add the following lines before checking the sizes, you will see that the main thread also gets blocked.

IMap<Object, Object> map = hazelcastInstances.get(0).getMap(mapName);
logger.info("Map size: " + map.size());

We've also seen the same race condition when using LAZY mode.
The problem seems to have been introduced in 3.7, because we don't see it in 3.6.5.

Could you please check again?

Thanks,
Ruxandra

Contributor

vbekiaris commented Sep 21, 2017

I see, will take a closer look into this one. Cheers!

Contributor

vbekiaris commented Sep 21, 2017

@rruxandra this is not a solution, and I'm not sure it is useful for your use case, but you may want to try setting the property hazelcast.initial.min.cluster.size to your actual cluster size (e.g. add config.setProperty("hazelcast.initial.min.cluster.size", "2"); to your test case above). This way Hazelcast will defer initialization until the cluster has that many members (2 in this case) and the data will be properly loaded. I'm still looking for the root cause and will update you as soon as I have more concrete info.

Contributor

lukasblu commented Sep 21, 2017

Hi @vbekiaris ,
thanks for the proposed workaround. We usually recommend the "hazelcast.initial.min.cluster.size" property to our clients. However, often they are not able to use it in real-life installations, because they would no longer be able to start the system if for some reason they don't have all the servers available.

Ruxandra and I have continued to debug this issue as well, but unfortunately we have not found the root cause yet either. The stack trace we see when the call to getMap() blocks and we stop the thread is the following:

169343 [pool-8-thread-1] WARN com.hazelcast.spi.ProxyService  - [172.20.0.1]:5701 [dev] [3.8.4] Error while initializing proxy: IMap{name='map4TestMapStore33'}
com.hazelcast.core.HazelcastException: java.lang.InterruptedException: sleep interrupted
	at com.hazelcast.util.ExceptionUtil.peel(ExceptionUtil.java:94)
	at com.hazelcast.util.ExceptionUtil.peel(ExceptionUtil.java:56)
	at com.hazelcast.util.ExceptionUtil.peel(ExceptionUtil.java:52)
	at com.hazelcast.util.ExceptionUtil.rethrow(ExceptionUtil.java:105)
	at com.hazelcast.map.impl.proxy.MapProxySupport.waitUntilLoaded(MapProxySupport.java:591)
	at com.hazelcast.map.impl.proxy.MapProxyImpl.waitUntilLoaded(MapProxyImpl.java:102)
	at com.hazelcast.map.impl.proxy.MapProxySupport.initializeMapStoreLoad(MapProxySupport.java:222)
	at com.hazelcast.map.impl.proxy.MapProxySupport.initialize(MapProxySupport.java:214)
	at com.hazelcast.map.impl.proxy.MapProxyImpl.initialize(MapProxyImpl.java:102)
	at com.hazelcast.spi.impl.proxyservice.impl.ProxyRegistry.doCreateProxy(ProxyRegistry.java:194)
	at com.hazelcast.spi.impl.proxyservice.impl.ProxyRegistry.createProxy(ProxyRegistry.java:184)
	at com.hazelcast.spi.impl.proxyservice.impl.ProxyRegistry.getOrCreateProxyFuture(ProxyRegistry.java:154)
	at com.hazelcast.spi.impl.proxyservice.impl.ProxyRegistry.getOrCreateProxy(ProxyRegistry.java:135)
	at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.getDistributedObject(ProxyServiceImpl.java:147)
	at com.hazelcast.instance.HazelcastInstanceImpl.getDistributedObject(HazelcastInstanceImpl.java:376)
	at com.hazelcast.instance.HazelcastInstanceImpl.getMap(HazelcastInstanceImpl.java:182)
	at com.hazelcast.instance.HazelcastInstanceProxy.getMap(HazelcastInstanceProxy.java:96)
	at com.nm.test.hazelcast.mapstore.TestMapStore33$1.run(TestMapStore33.java:133)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
	at com.hazelcast.map.impl.proxy.MapProxySupport.waitAllTrue(MapProxySupport.java:611)
	at com.hazelcast.map.impl.proxy.MapProxySupport.waitUntilLoaded(MapProxySupport.java:589)
	... 18 more

maybe this helps...

thanks for looking into this and best,
Lukas
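
A note on the trace above: the blocked thread sits in MapProxySupport.waitUntilLoaded until something interrupts it. While the root cause is open, a caller can at least bound the wait so that a test fails instead of hanging. The following is a minimal sketch using only java.util.concurrent. BoundedCall and callWithTimeout are hypothetical names, not part of Hazelcast, and the sketch only detects the hang; it does not fix it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Hazelcast API): runs a possibly blocking call with a deadline.
final class BoundedCall {

    private BoundedCall() {
    }

    /**
     * Runs the task on a separate thread and waits at most timeoutMs for its result.
     * On timeout the worker thread is interrupted and an IllegalStateException is thrown.
     */
    public static <T> T callWithTimeout(Callable<T> task, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // cancel(true) interrupts the worker thread that is still blocked
            future.cancel(true);
            throw new IllegalStateException("call did not return within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            // the waiting thread itself was interrupted: restore the flag and give up
            Thread.currentThread().interrupt();
            future.cancel(true);
            throw new IllegalStateException("interrupted while waiting for the call", e);
        } catch (ExecutionException e) {
            // the task threw; surface its cause
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the reproducer, the body of the Callable would be the hcInstance.getMap(mapName) call. On timeout the worker is interrupted, which is presumably how a trace like the one above ("sleep interrupted" inside waitAllTrue) comes about.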

Contributor

lukasblu commented Sep 21, 2017

... and one more observation:
Sometimes the second node (the one joining shortly after the first one has started to load data) also decides to load a small part of the data. This can be seen when the loaded keys count goes above 1000, for example:

198357 [main] INFO com.nm.test.hazelcast.mapstore.TestMapStore33  - Loaded keys: 1296

Whenever that happened, the test failed for me. So it may be worth taking a closer look at how the second node decides to load data.

Cheers,
Lukas
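
To see which keys are loaded twice when the count goes above 1000, the store could count loads per key rather than only in total. A minimal sketch, assuming it would be called from loadAll(keys) in InMemoryMapStoreSleepAndCount. LoadTracker and its methods are hypothetical helpers and are not part of the original reproducer or of Hazelcast:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical debugging helper: counts how often each key was requested from the store.
final class LoadTracker {

    private final ConcurrentMap<String, AtomicInteger> loadsPerKey =
            new ConcurrentHashMap<String, AtomicInteger>();

    /** Records one load for each of the given keys; safe to call from several threads. */
    public void recordLoads(Collection<String> keys) {
        for (String key : keys) {
            AtomicInteger counter = loadsPerKey.get(key);
            if (counter == null) {
                AtomicInteger fresh = new AtomicInteger();
                // putIfAbsent returns the existing counter if another thread won the race
                AtomicInteger existing = loadsPerKey.putIfAbsent(key, fresh);
                counter = (existing != null) ? existing : fresh;
            }
            counter.incrementAndGet();
        }
    }

    /** Returns the keys that were loaded more than once, sorted for stable log output. */
    public List<String> keysLoadedMoreThanOnce() {
        List<String> duplicates = new ArrayList<String>();
        for (Map.Entry<String, AtomicInteger> entry : loadsPerKey.entrySet()) {
            if (entry.getValue().get() > 1) {
                duplicates.add(entry.getKey());
            }
        }
        Collections.sort(duplicates);
        return duplicates;
    }

    /** Returns the total number of recorded loads across all keys. */
    public int totalLoads() {
        int total = 0;
        for (AtomicInteger counter : loadsPerKey.values()) {
            total += counter.get();
        }
        return total;
    }
}
```

Logging keysLoadedMoreThanOnce() next to the total count would show whether the extra loads belong to a contiguous group of keys, which might hint at which partitions were involved when the second node joined.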

Contributor

lukasblu commented Sep 21, 2017

Hi @eminn ,
do you remember the following issue(s) from 3 years ago? :-)

Those tests are now sometimes failing for us again in 3.8...
Could you maybe also have a look at this problem here?
Thanks a lot and best,
Lukas

vbekiaris added a commit to vbekiaris/hazelcast that referenced this issue Sep 24, 2017

[revert] hazelcast#11407
test demonstrating lack of progress as map is loaded and new member joins

Contributor

vbekiaris commented Sep 24, 2017

@rruxandra @lukasblu I think now we have nailed the root cause of this issue and it's related to value loading tracking not handling migrations properly. I have a work-in-progress branch here -- it's not a proper, mergeable fix but it does pass the test. I'll post updates as we shape this into a proper fix.

Contributor

lukasblu commented Sep 25, 2017

Hi @vbekiaris ,
cool, thanks a lot for working on this. Your branch looks interesting, but also quite "dangerous" :-) (e.g. the BasicRecordStoreLoader not having a loaded flag anymore)... and it is therefore hard for me to guess at which point we could/should start testing this on our side too.
Should we already start looking into it now?
Thanks and best,
Lukas

Contributor

vbekiaris commented Sep 25, 2017

Hi @lukasblu , there are several subtleties involved in the migration and map loading process, so I would suggest you wait for a proper fix PR before starting tests on your side.
