Write-behind MapStore is not re-trying to store an entry indefinitely #8918

Closed
rruxandra opened this issue Sep 20, 2016 · 4 comments

@rruxandra

commented Sep 20, 2016

Hello,

We discovered that, when an entry cannot be persisted, the retry mechanism stops working after a while (in one case it stopped after 3 days, and we reproduced the issue in a test after 12 hours).
This happens when the distributed map is configured to use the write-behind mechanism.
We were able to reproduce the issue in Hazelcast 3.5.5 (a snapshot from 2016-01-18) and 3.6.5 (a snapshot from 2016-07-11).
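
For context, write-behind is enabled simply by giving the map store a non-zero write delay. A minimal configuration sketch (the map name below is illustrative; the full reproducer that follows uses the same calls):

// A write delay > 0 switches the configured MapStore from write-through to write-behind,
// so entries are persisted asynchronously after the configured delay.
MapStoreConfig mapStoreConfig = new MapStoreConfig()
        .setEnabled(true)
        .setWriteDelaySeconds(2)
        .setImplementation(new CountingMapStore<FailableTestValue>());

Config config = new Config();
config.getMapConfig("someWriteBehindMap").setMapStoreConfig(mapStoreConfig);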

Here is a test case:

package com.nm.test.hazelcast.mapstore;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.nm.test.hazelcast.utils.CountingMapStore;
import com.nm.test.hazelcast.utils.values.FailableTestValue;
import junit.framework.TestCase;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * A test to analyze the long-term behavior when an exception is thrown during deserialization
 * while writing an entry to persistent storage.
 */
public class TestMapStore30 extends TestCase {

    private static final Logger logger = Logger.getLogger(TestMapStore30.class);

    private static final String MAP_NAME = "testMap" + TestMapStore30.class.getSimpleName();

    private static final int WRITE_DELAY = 2;

    private static final long MAX_TIME_NO_STORE = 10 * 60 * 1000; // 10 minutes

    private static final long MAX_TIME_TEST_RUNNING = 5 * 24 * 60 * 60 * 1000; // 5 days

    @Override
    protected void setUp() throws Exception {

        // configure logging
        BasicConfigurator.configure();
    }

    public void testPersistentMap() throws Exception {

        final long initialTime = System.currentTimeMillis();

        // create hazelcast config
        Config config = new XmlConfigBuilder().build();
        config.setProperty("hazelcast.logging.type", "log4j");
        config.setProperty("hazelcast.version.check.enabled", "false");

        // disable multicast for faster startup
        config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);

        // create map store
        CountingMapStore<FailableTestValue> store = new CountingMapStore<FailableTestValue>();

        // create map store config
        MapStoreConfig mapStoreConfig = new MapStoreConfig();
        mapStoreConfig.setEnabled(true);
        mapStoreConfig.setInitialLoadMode(MapStoreConfig.InitialLoadMode.EAGER);
        mapStoreConfig.setWriteDelaySeconds(WRITE_DELAY);
        mapStoreConfig.setClassName(null);
        mapStoreConfig.setImplementation(store);

        // configure map store
        MapConfig mapConfig = config.getMapConfig(MAP_NAME);
        mapConfig.setMapStoreConfig(mapStoreConfig);

        // start hazelcast instance
        HazelcastInstance hcInstance = Hazelcast.newHazelcastInstance(config);

        // try-finally to ensure hc shutdown
        try {
            int index = 0;

            // init map
            IMap<String, FailableTestValue> map = hcInstance.getMap(MAP_NAME);
            logger.info("Size = " + map.size());

            // create test data
            FailableTestValue failableTestValue = new FailableTestValue("should fail");
            failableTestValue.setFailInDeserialize(true);

            // put the first value which will throw de-serialization exceptions
            map.put("key" + index, failableTestValue);

            // variables
            int lastCountNumberStore = store.countNumberStore.get();
            long now = System.currentTimeMillis();
            long lastTimeCountNumberStoreChanged = now;

            // stop the test once no store operation was executed within 10 minutes, or after 5 days at most
            while (now - lastTimeCountNumberStoreChanged < MAX_TIME_NO_STORE && now - initialTime < MAX_TIME_TEST_RUNNING) {
                index++;
                now = System.currentTimeMillis();

                // put other values which should be stored correctly
                FailableTestValue nonFailableTestValue = new FailableTestValue("should not fail");
                map.put("key" + index, nonFailableTestValue);

                store.printCounts((now - initialTime) + "ms after init");

                long timeDiff = now - lastTimeCountNumberStoreChanged;
                int currentCountNumberStore = store.countNumberStore.get();

                // check if the number of stored entries increased
                if (currentCountNumberStore > lastCountNumberStore) {
                    logger.info(String.format("Count number store has increased in the last %d ms", timeDiff));
                    lastTimeCountNumberStoreChanged = now;
                    lastCountNumberStore = currentCountNumberStore;
                } else {
                    logger.warn(String.format("Count number store didn't increase for %d ms", timeDiff));
                }

                Thread.sleep(60 * 1000);
            }

            // fail if no store operations executed anymore
            if (now - lastTimeCountNumberStoreChanged > MAX_TIME_NO_STORE) {
                fail(String.format("No store operation executed in the last %d ms.", MAX_TIME_NO_STORE));
            }

        } finally {
            // shutdown hazelcast instance
            hcInstance.getLifecycleService().terminate();
        }
    }

}

package com.nm.test.hazelcast.utils;

import com.hazelcast.core.MapStore;
import org.apache.log4j.Logger;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A map store which counts the different operations.
 * <p>
 * Furthermore, it can be configured to throw exceptions in store/storeAll/delete/deleteAll.
 */
public class CountingMapStore<V> implements MapStore<String, V> {

    private static final Logger logger = Logger.getLogger(CountingMapStore.class);

    // ---------------------------------------------------------------- counters

    public AtomicInteger countLoadAllKeys = new AtomicInteger();

    public AtomicInteger countLoad = new AtomicInteger();

    public AtomicInteger countLoadAll = new AtomicInteger();

    public AtomicInteger countStore = new AtomicInteger();

    public AtomicInteger countStoreAll = new AtomicInteger();

    public AtomicInteger countDelete = new AtomicInteger();

    public AtomicInteger countDeleteAll = new AtomicInteger();

    public AtomicInteger countNumberStore = new AtomicInteger();

    public AtomicInteger countNumberDelete = new AtomicInteger();

    // ---------------------------------------------------------------- members

    private ConcurrentHashMap<String, V> store = new ConcurrentHashMap<String, V>();

    private int numExceptionsInStore;

    private int numExceptionsInDelete;

    private boolean exceptionInStoreAll;

    private boolean exceptionInDeleteAll;

    // ----------------------------------------------------------- construction

    public CountingMapStore() {
        this.exceptionInStoreAll = false;
        this.exceptionInDeleteAll = false;
    }

    public CountingMapStore(boolean exceptionInStoreAll, boolean exceptionInDeleteAll) {
        this.exceptionInStoreAll = exceptionInStoreAll;
        this.exceptionInDeleteAll = exceptionInDeleteAll;
    }

    public CountingMapStore(int numExceptionsInStore, int numExceptionsInDelete) {
        this.numExceptionsInStore = numExceptionsInStore;
        this.numExceptionsInDelete = numExceptionsInDelete;
        this.exceptionInStoreAll = true;
        this.exceptionInDeleteAll = true;
    }

    // ----------------------------------------------------- MapStore interface

    @Override
    public Set<String> loadAllKeys() {
        if (logger.isDebugEnabled()) {
            logger.debug("loadAllKeys() called.");
        }
        countLoadAllKeys.incrementAndGet();
        return new HashSet<String>(store.keySet());
    }

    @Override
    public V load(String key) {
        if (logger.isDebugEnabled()) {
            logger.debug("load('" + key + "') called.");
        }
        countLoad.incrementAndGet();
        return store.get(key);
    }

    @Override
    public Map<String, V> loadAll(Collection<String> keys) {
        if (logger.isDebugEnabled()) {
            logger.debug("loadAll('" + keys + "') called.");
        }
        countLoadAll.incrementAndGet();
        Map<String, V> result = new HashMap<String, V>();
        for (String key : keys) {
            V value = store.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    @Override
    public void store(String key, V value) {
        if (logger.isDebugEnabled()) {
            logger.debug("store('" + key + "', '" + value + "') called.");
        }
        countStore.incrementAndGet();
        countNumberStore.incrementAndGet();
        if (numExceptionsInStore > 0) {
            numExceptionsInStore--;
            throw new RuntimeException("Exception in store().");
        }
        store.put(key, value);
    }

    @Override
    public void storeAll(Map<String, V> map) {
        if (logger.isDebugEnabled()) {
            logger.debug("storeAll('" + map + "') called.");
        }
        countStoreAll.incrementAndGet();
        countNumberStore.addAndGet(map.size());
        store.putAll(map);
        if (exceptionInStoreAll) {
            throw new RuntimeException("Exception in storeAll().");
        }
    }

    @Override
    public void delete(String key) {
        if (logger.isDebugEnabled()) {
            logger.debug("delete('" + key + "') called.");
        }
        countDelete.incrementAndGet();
        countNumberDelete.incrementAndGet();
        if (numExceptionsInDelete > 0) {
            numExceptionsInDelete--;
            throw new RuntimeException("Exception in delete().");
        }
        store.remove(key);
    }

    @Override
    public void deleteAll(Collection<String> keys) {
        if (logger.isDebugEnabled()) {
            logger.debug("deleteAll('" + keys + "') called.");
        }
        countDeleteAll.incrementAndGet();
        countNumberDelete.addAndGet(keys.size());
        for (String key : keys) {
            store.remove(key);
        }
        if (exceptionInDeleteAll) {
            throw new RuntimeException("Exception in deleteAll().");
        }
    }

    /**
     * Get number of entries in store.
     */
    public int size() {
        return store.size();
    }

    // ---------------------------------------------------------------- helpers

    public void printCounts(String title) {
        StringBuilder buf = new StringBuilder();
        buf.append(title + ":\n");
        buf.append("- num load all keys = " + countLoadAllKeys.get() + "\n");
        buf.append("- num load          = " + countLoad.get() + "\n");
        buf.append("- num load all      = " + countLoadAll.get() + "\n");
        buf.append("- num store         = " + countStore.get() + "\n");
        buf.append("- num store all     = " + countStoreAll.get() + "\n");
        buf.append("- num delete        = " + countDelete.get() + "\n");
        buf.append("- num delete all    = " + countDeleteAll.get() + "\n");
        buf.append("- count store       = " + countNumberStore.get() + "\n");
        buf.append("- count delete      = " + countNumberDelete.get() + "\n");
        System.out.println(buf.toString());
    }

}


package com.nm.test.hazelcast.utils.values;

import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import java.io.IOException;

/**
 * A test value which can be configured to fail during serialize or deserialize.
 */
public class FailableTestValue implements DataSerializable {

    private String value;

    private boolean failInSerialize;

    private boolean failInDeserialize;

    /*
     * package-private constructor for deserialization
     */
    FailableTestValue() {
    }

    public FailableTestValue(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    public void setFailInSerialize(boolean failInSerialize) {
        this.failInSerialize = failInSerialize;
    }

    public void setFailInDeserialize(boolean failInDeserialize) {
        this.failInDeserialize = failInDeserialize;
    }

    // ---------------------------------------------------------- serialization

    @Override
    public void writeData(ObjectDataOutput out) throws IOException {
        if (failInSerialize) {
            throw new IOException("Intended failure during serialize for '" + value + "'.");
        }
        out.writeUTF(value);
        out.writeBoolean(failInSerialize);
        out.writeBoolean(failInDeserialize);
    }

    @Override
    public void readData(ObjectDataInput in) throws IOException {
        value = in.readUTF();
        failInSerialize = in.readBoolean();
        failInDeserialize = in.readBoolean();
        if (failInDeserialize) {
            throw new IOException("Intended failure during deserialize for '" + value + "'.");
        }
    }

    // ------------------------------------------------------- Object overrides

    @Override
    public String toString() {
        return "[" + (failInSerialize ? "-" : "+") + "," + (failInDeserialize ? "-" : "+") + "] " + value;
    }

}

Could you have a look?

Thanks,
Ruxandra and Lukas

@rruxandra


commented Sep 20, 2016

Our biggest issue is that the MapStore then stops persisting any other entry that is added to the distributed map.

@ahmetmircik


commented Sep 20, 2016

Hi @rruxandra, thanks for the report, let me check it.

@jerrinot jerrinot added the Team: Core label Sep 22, 2016

@jerrinot jerrinot added this to the 3.8 milestone Sep 22, 2016

@ahmetmircik


commented Oct 6, 2016

Hi @rruxandra,

According to my tests, setting this config fixes the issue:
config.getExecutorConfig("hz:scheduled:mapstore:" + MAP_NAME).setQueueCapacity(Integer.MAX_VALUE).setPoolSize(1);

While we are working on a real fix, that config change can be seen as a workaround.

Is it possible to rerun your test with this workaround?
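
For reference, a minimal sketch of how the suggested workaround would slot into the test's configuration step (only the getExecutorConfig line comes from the suggestion above; the surrounding lines mirror the test and are assumptions):

// Apply the workaround before the Hazelcast instance is started.
Config config = new XmlConfigBuilder().build();
config.getMapConfig(MAP_NAME).setMapStoreConfig(mapStoreConfig);
config.getExecutorConfig("hz:scheduled:mapstore:" + MAP_NAME)
        .setQueueCapacity(Integer.MAX_VALUE)
        .setPoolSize(1);
HazelcastInstance hcInstance = Hazelcast.newHazelcastInstance(config);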

@ahmetmircik


commented Oct 11, 2016

should be fixed by #9076

@ahmetmircik ahmetmircik modified the milestones: 3.6.6, 3.8 Oct 11, 2016
