
MapPartitionLostListener not working #8505

Closed
lukasblu opened this issue Jul 11, 2016 · 2 comments

@lukasblu (Contributor) commented Jul 11, 2016

Hi,

It looks as if the MapPartitionLostListener is not working: we simply cannot get it to fire at all. We tested on 3.6.5-dev (a build from 2016-06-29).

Here is a test.

Thanks for checking, and best regards,
Lukas

package com.nm.test.hazelcast.map;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapPartitionLostListenerConfig;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.PartitionService;
import com.hazelcast.map.MapPartitionLostEvent;
import com.hazelcast.map.listener.MapPartitionLostListener;
import com.nm.test.hazelcast.TestHazelcast;
import com.nm.test.hazelcast.utils.Sleep;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;

/**
 * Test to ensure MapPartitionLostListener works as expected.
 */
@RunWith(JUnit4.class)
public class TestMap11 extends TestCase {

    private static final Logger logger = Logger.getLogger(TestMap11.class);

    private static final String mapName = "testMap" + TestMap11.class.getSimpleName();

    private static final int MAP_SIZE = 100000;

    private HazelcastInstance hcInstance1;
    private HazelcastInstance hcInstance2;

    private final AtomicInteger mapSize = new AtomicInteger();
    private final AtomicBoolean isLossDetected = new AtomicBoolean();

    @Before
    public void setUp() throws Exception {

        // configure logging
        if (!TestHazelcast.loggingInitialized) {
            TestHazelcast.loggingInitialized = true;
            BasicConfigurator.configure();
        }

        mapSize.set(0);
        isLossDetected.set(false);
    }

    @After
    public void tearDown() {
        try {
            hcInstance1 = null;
            hcInstance2 = null;
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testMapPartitionLostEventStatic() throws Exception {
        testInternal(false);
    }

    @Test
    public void testMapPartitionLostEventDynamic() throws Exception {
        testInternal(true);
    }

    private void testInternal(final boolean registerDynamically) throws Exception {

        // thread 1: start a first node
        Thread thread1 = new Thread(new Runnable() {

            @Override
            public void run() {
                final String threadName = Thread.currentThread().getName();

                // create listener
                CountingMapPartitionLostListener listener = new CountingMapPartitionLostListener(threadName, isLossDetected);

                // create config
                Config config = createInitialConfig();
                if (!registerDynamically) {
                    MapPartitionLostListenerConfig listenerConfig = new MapPartitionLostListenerConfig();
                    listenerConfig.setImplementation(listener);
                    config.getMapConfig(mapName).addMapPartitionLostListenerConfig(listenerConfig);
                    logger.info(threadName + " map partition lost listener configured before starting.");
                }

                // start cluster
                hcInstance1 = Hazelcast.newHazelcastInstance(config);

                // try-finally to stop hazelcast instance
                try {

                    // log started
                    logger.info(threadName + " started.");

                    // create map
                    final IMap<String, String> map = hcInstance1.getMap(mapName);
                    logger.info(threadName + " map created.");

                    // add listener dynamically
                    if (registerDynamically) {
                        map.addPartitionLostListener(listener);
                        logger.info(threadName + " map partition lost listener registered dynamically.");
                    }

                    // populate map
                    for (int i = 0; i < MAP_SIZE; i++) {
                        map.put(String.valueOf(i), "value" + i);
                    }
                    logger.info(threadName + " map populated.");

                    // print size
                    int size = map.size();
                    mapSize.set(size);
                    logger.info(threadName + " Map size 1 = " + size);

                    // wait 30s (during this time a second node joins and dies)
                    Sleep.sleep(30000, true);

                    // print size
                    size = map.size();
                    mapSize.set(size);
                    logger.info(threadName + " Map size 2 = " + size);

                } finally {
                    hcInstance1.getLifecycleService().shutdown();
                }
                logger.info(threadName + " done.");
            }
        }, "Thread 1");
        thread1.start();

        // wait 20s after starting first thread
        Sleep.sleep(20000, true);

        // thread 2: start a second node
        // - this joins the cluster and kills itself while partition migrations are happening
        Thread thread2 = new Thread(new Runnable() {

            @Override
            public void run() {
                final String threadName = Thread.currentThread().getName();

                // create listener
                CountingMapPartitionLostListener listener = new CountingMapPartitionLostListener(threadName, isLossDetected);

                // create config
                Config config = createInitialConfig();
                if (!registerDynamically) {
                    MapPartitionLostListenerConfig listenerConfig = new MapPartitionLostListenerConfig();
                    listenerConfig.setImplementation(listener);
                    config.getMapConfig(mapName).addMapPartitionLostListenerConfig(listenerConfig);
                    logger.info(threadName + " map partition lost listener configured before starting.");
                }

                // join
                hcInstance2 = Hazelcast.newHazelcastInstance(config);
                logger.info(threadName + " hazelcast instance joined.");

                // try-finally to kill hazelcast instance
                try {

                    // get map
                    IMap<String, String> map = hcInstance2.getMap(mapName);

                    // add listener dynamically
                    if (registerDynamically) {
                        map.addPartitionLostListener(listener);
                        logger.info(threadName + " map partition lost listener registered dynamically.");
                    }

                    // print size
                    int size = map.size();
                    mapSize.set(size);
                    logger.info(threadName + " Map size = " + size);

                    // wait before kill
                    PartitionService ps = hcInstance2.getPartitionService();
                    Sleep.sleep(3000, true);
                    logger.info(threadName + " Is local member safe? " + ps.isLocalMemberSafe());

                } finally {

                    // use terminate here to stop before partition migrations are done
                    hcInstance2.getLifecycleService().terminate();
                }
                logger.info(threadName + " done.");
            }
        }, "Thread 2");
        thread2.start();

        // join threads
        thread1.join();
        thread2.join();

        // ensure valid execution
        // - 1st condition: the test execution is only valid if the final map size is *not* equal to the initial map size
        // - 2nd condition: we detected the data loss
        final int mapSizeFinal = mapSize.get();
        final boolean isLossDetectedFinal = isLossDetected.get();
        logger.info("Final map size = " + mapSizeFinal);
        logger.info("Was data loss detected? " + isLossDetectedFinal);
        assertTrue("Invalid test execution: No data loss produced.", mapSizeFinal != MAP_SIZE);
        assertTrue("Data loss produced, but not detected.", isLossDetectedFinal);
    }

    private static class CountingMapPartitionLostListener implements MapPartitionLostListener {

        private final String threadName;

        private final AtomicBoolean isLossDetected;

        private final AtomicInteger numPartitionsLost;

        public CountingMapPartitionLostListener(String threadName, AtomicBoolean isLossDetected) {
            this.threadName = threadName;
            this.isLossDetected = isLossDetected;
            this.numPartitionsLost = new AtomicInteger(0);
        }

        @Override
        public void partitionLost(MapPartitionLostEvent event) {
            numPartitionsLost.incrementAndGet();

            // log and remember
            logger.info(threadName + " has lost a partition. Current call count = " + numPartitionsLost);
            isLossDetected.set(true);
        }

    }

    private Config createInitialConfig() {
        Config config = new XmlConfigBuilder().build();
        config.setProperty("hazelcast.logging.type", "log4j");
        config.setProperty("hazelcast.jmx", "false");
        config.setProperty("hazelcast.version.check.enabled", "false");
        config.setProperty("hazelcast.phone.home.enabled", "false");
        return config;
    }

}


package com.nm.test.hazelcast.utils;

import org.apache.log4j.Logger;

public class Sleep {

    private static final Logger logger = Logger.getLogger(Sleep.class);

    /**
     * Tries to sleep for the desired number of milliseconds, throwing a RuntimeException in case
     * the thread is interrupted.
     * 
     * @param ms Number of milliseconds to sleep
     * @param log true to log, false otherwise.
     */
    public static void sleep(long ms, boolean log) {
        if (ms <= 0) {
            return;
        }
        try {
            if (log) {
                logger.info("Starting to sleep for " + (ms / 1000) + "s...");
            }
            Thread.sleep(ms);
            if (log) {
                logger.info("Slept " + (ms / 1000) + "s.");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}
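The reproducer orders events with fixed sleeps (20 s before the second node starts and 30 s in the first node). A minimal sketch of a polling helper that waits for a condition with a timeout, for example for `isLossDetected` to become true, is one possible alternative. `Await` and `until` are hypothetical names, not part of Hazelcast or of the original test utilities, and the sketch assumes Java 8 for `BooleanSupplier`. The short 3 s sleep before `terminate()` in thread 2 is deliberate (it must stop the node before migrations finish) and is not a candidate for replacement.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: polls a condition instead of sleeping for a fixed time. */
public final class Await {

    private Await() {
    }

    /**
     * Polls {@code condition} every {@code pollMs} milliseconds until it returns true
     * or {@code timeoutMs} elapses. Returns true if the condition was met, false on timeout.
     */
    public static boolean until(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // restore the interrupt flag, then fail like Sleep.sleep does
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}
```

For instance, thread 1 could call `Await.until(isLossDetected::get, 30000, 100)` in place of `Sleep.sleep(30000, true)`, so a successful run finishes as soon as the listener fires while a failing run still waits the full 30 s.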

@metanet self-assigned this Jul 11, 2016

@tombujok added the Team: Core label Jul 11, 2016

metanet added a commit to metanet/hazelcast that referenced this issue Jul 15, 2016

Do not overwrite SYNC_WAITING_REPLICA flag on migration commit
If a source node commits a migration, it clears its replica indices. However, it should keep SYNC_WAITING_REPLICA flags if it moves to a backup index in the committed partition.

This behavior doesn't exist in the master branch.

Fixes hazelcast#8505
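The commit message describes the fix only in prose. The toy model below is one reading of it, assuming each node keeps a per-partition array of replica versions indexed by replica index and marks not-yet-synced slots with a sentinel. `ReplicaFlagModel`, its methods, and the `-1` sentinel are hypothetical and do not reproduce Hazelcast's internal classes. The model shows how wiping the flag on migration commit hides the fact that a backup copy was never synced, which is consistent with the listener never firing in the report.

```java
import java.util.Arrays;

/** Toy model (hypothetical, not Hazelcast code) of the SYNC_WAITING_REPLICA fix in #8532. */
public class ReplicaFlagModel {

    // Hypothetical sentinel: this replica slot is waiting for data to be synced.
    static final long SYNC_WAITING_REPLICA = -1L;

    /** Buggy commit: the source node wipes every replica index, flags included. */
    static void commitMigrationBuggy(long[] versions, int newReplicaIndex) {
        Arrays.fill(versions, 0L); // newReplicaIndex is ignored, which is the bug
    }

    /**
     * Fixed commit: the source node still clears its replica indices, but keeps
     * SYNC_WAITING_REPLICA flags when it moves to a backup index (> 0).
     * A negative index means the node no longer holds any replica.
     */
    static void commitMigrationFixed(long[] versions, int newReplicaIndex) {
        for (int i = 0; i < versions.length; i++) {
            boolean keepFlag = newReplicaIndex > 0
                    && i >= newReplicaIndex
                    && versions[i] == SYNC_WAITING_REPLICA;
            if (!keepFlag) {
                versions[i] = 0L;
            }
        }
    }

    /** If this node must take over, an unsynced slot means the data is lost. */
    static boolean detectsLossOnPromotion(long[] versions, int replicaIndex) {
        return versions[replicaIndex] == SYNC_WAITING_REPLICA;
    }

    public static void main(String[] args) {
        // Source node moves from owner (index 0) to first backup (index 1),
        // while its backup slot is still waiting for a sync.
        long[] buggy = {5L, SYNC_WAITING_REPLICA, 0L};
        long[] fixed = buggy.clone();

        commitMigrationBuggy(buggy, 1);
        commitMigrationFixed(fixed, 1);

        System.out.println("buggy detects loss: " + detectsLossOnPromotion(buggy, 1)); // false
        System.out.println("fixed detects loss: " + detectsLossOnPromotion(fixed, 1)); // true
    }
}
```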

@metanet added this to the 3.6.5 milestone Jul 15, 2016

@metanet (Contributor) commented Jul 15, 2016

Hi @lukasblu

Thank you for reporting the issue and providing a very useful reproducer.

The fix is ready at #8532 and will most likely be merged next week.

Regards,

@metanet (Contributor) commented Jul 18, 2016

Closed by #8532

@metanet closed this Jul 18, 2016
