[map] [map-store] MapStore: write delay not precisely respected #9745

Closed
rickymemphis opened this issue Jan 24, 2017 · 4 comments

@rickymemphis commented Jan 24, 2017

Hi,

We have discovered that the write-behind queue behaviour differs between the latest version of Hazelcast (3.7.4) and one of the previous ones (3.5.5); it looks like the problem was introduced in 3.6.

Using a map store with a write delay and a batch size, if I set an entry in a map and shortly afterwards a second one, the second entry is not persisted by the map store write-delay seconds after its own set operation; instead it is persisted together with the first entry, meaning it is stored too early.

Attached are two tests. In the first one the two set operations are executed with a delay between them greater than the map's write delay, and it always passes.
In the second one the two set operations are executed with a delay between them smaller than the map's write delay, and it always fails.

Can you please have a look at it?

While debugging we discovered that the line

        long storeTime = now + writeDelayTime;

was removed from the WriteBehindStore.add(Data key, Object value, long now) method.
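
For clarity, this is roughly the per-entry scheduling we expected (a minimal sketch with hypothetical names, not Hazelcast's actual internals): every queued entry carries its own store time, and the flush task only persists entries whose store time has passed.

```java
// Hypothetical sketch of per-entry write-behind scheduling (illustration only;
// names and structure are assumptions, not the real WriteBehindStore code).
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class DelayedEntrySketch<K, V> {
    final K key;
    final V value;
    final long storeTimeMs; // when this entry becomes eligible for store()

    DelayedEntrySketch(K key, V value, long nowMs, long writeDelayMs) {
        this.key = key;
        this.value = value;
        this.storeTimeMs = nowMs + writeDelayMs; // each entry gets its own deadline
    }
}

class WriteBehindQueueSketch<K, V> {
    private final Queue<DelayedEntrySketch<K, V>> queue = new ArrayDeque<>();

    void add(K key, V value, long nowMs, long writeDelayMs) {
        queue.add(new DelayedEntrySketch<>(key, value, nowMs, writeDelayMs));
    }

    // Called periodically by a flush task: only entries whose individual
    // deadline has passed are handed to the MapStore.
    List<DelayedEntrySketch<K, V>> drainDue(long nowMs) {
        List<DelayedEntrySketch<K, V>> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().storeTimeMs <= nowMs) {
            due.add(queue.poll());
        }
        return due;
    }
}
```

Under this model, key2 set 5 seconds after key1 with a 10-second write delay would only be drained at around t=15s, which is what the second attached test asserts.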

Thanks,
Riccardo

package com.nm.test.hazelcast.mapstore;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.nm.test.hazelcast.TestHazelcast;
import com.nm.test.hazelcast.utils.mapstore.WriteDelayMapStore;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * A test to analyze the behaviour of the Hazelcast map store using write delay and batch size.
 */
public class TestMapStore31 {

	private static final Logger logger = Logger.getLogger(TestMapStore31.class);

	private static final String MAP_NAME = "testMap" + TestMapStore31.class.getSimpleName();

	private int mapStoreWriteDelaySec = 10;

	private int mapStoreBatchSize = 50;

	private Config config;

	private WriteDelayMapStore<String> mapStore;

	private IMap<String, String> iMap;

	private HazelcastInstance hcInstance;

	@Before
	public void setUp() throws Exception {

		if (!TestHazelcast.loggingInitialized) {
			TestHazelcast.loggingInitialized = true;
			BasicConfigurator.configure();
		}

		// set hazelcast config
		config = new XmlConfigBuilder().build();
		config.setProperty("hazelcast.logging.type", "log4j");
		config.setProperty("hazelcast.version.check.enabled", "false");

		// disable multicast for faster startup
		config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);

		// create map store
		mapStore = new WriteDelayMapStore<String>();

		// set map store config
		MapStoreConfig mapStoreConfig = new MapStoreConfig();
		mapStoreConfig.setEnabled(true);
		mapStoreConfig.setInitialLoadMode(MapStoreConfig.InitialLoadMode.EAGER);
		mapStoreConfig.setWriteCoalescing(true);
		mapStoreConfig.setWriteDelaySeconds(mapStoreWriteDelaySec);
		mapStoreConfig.setWriteBatchSize(mapStoreBatchSize);
		mapStoreConfig.setClassName(null);
		mapStoreConfig.setImplementation(mapStore);

		// configure map config
		MapConfig mapConfig = config.getMapConfig(MAP_NAME);
		mapConfig.setMapStoreConfig(mapStoreConfig);

		// start hazelcast instance
		hcInstance = Hazelcast.newHazelcastInstance(config);

		// init map
		iMap = hcInstance.getMap(MAP_NAME);
		iMap.clear();
	}

	@After
	public void tearDown() throws Exception {

		// clear map
		iMap.clear();

		// shutdown hazelcast
		hcInstance.shutdown();

		// wait if hazelcast is still running
		while (hcInstance.getLifecycleService().isRunning()) {
			Thread.sleep(5000);
		}
	}

	/**
	 * This test checks whether two set operations on an IMap, executed with a delay between them
	 * greater than the map store write delay, generate two separate write operations in the map store.
	 */
	@Test
	public void testTwoDelayedSets() throws Exception {

		logger.info("-- Test two delayed sets with mapWriteDelay=" + mapStoreWriteDelaySec + " mapBatchSize=" + mapStoreBatchSize);

		try {
			// init executor services
			ExecutorService checker1Executor = Executors.newSingleThreadExecutor();
			ExecutorService checker2Executor = Executors.newSingleThreadExecutor();

			// log map stats
			logger.info("map stats = " + iMap.getLocalMapStats());

			// set first object in the map and collect the time
			logger.info("setting key = key1 ...");
			long t1 = System.currentTimeMillis();
			iMap.set("key1", "value1");

			// start checker thread passing the time of the first set operation
			Future<int[]> checker1Future = checker1Executor.submit(new MapStoreWritesChecker(t1));

			// wait (2 times the write-behind interval)
			Thread.sleep(2 * mapStoreWriteDelaySec * 1000);

			// set second object in the map and collect the time
			logger.info("setting key = key2 ...");
			long t2 = System.currentTimeMillis();
			iMap.set("key2", "value2");

			// start checker thread passing the time of the second set operation
			Future<int[]> checker2Future = checker2Executor.submit(new MapStoreWritesChecker(t2));

			// wait for checkers to return and log collected writes counts
			logger.info("Writes count checker 1 getting result...");
			int[] writesCount1 = checker1Future.get();
			logger.info("Writes count checker 1: " + Arrays.toString(writesCount1));
			logger.info("Writes count checker 2 getting result...");
			int[] writesCount2 = checker2Future.get();
			logger.info("Writes count checker 2: " + Arrays.toString(writesCount2));

			// check the number of writes of the map store after write delay seconds of the first set
			assertEquals("Checker 1: Expected writes before=0 but are " + writesCount1[0], 0, writesCount1[0]);
			assertEquals("Checker 1: Expected writes after=1  but are " + writesCount1[1], 1, writesCount1[1]);

			// check the number of writes of the map store after write delay seconds of the second set
			assertEquals("Checker 2: Expected writes before=1 but are " + writesCount2[0], 1, writesCount2[0]);
			assertEquals("Checker 2: Expected writes after=2  but are " + writesCount2[1], 2, writesCount2[1]);

		} catch (Exception e) {
			fail(e.getMessage());
		}
	}

	/**
	 * This test checks whether two set operations on an IMap, executed with a delay between them
	 * shorter than the map store write delay, generate two separate write operations in the map store.
	 */
	@Test
	public void testTwoCloseSets() throws Exception {

		logger.info("-- Test two close sets with mapWriteDelay=" + mapStoreWriteDelaySec + " mapBatchSize=" + mapStoreBatchSize);

		try {
			// init executor services
			ExecutorService checker1Executor = Executors.newSingleThreadExecutor();
			ExecutorService checker2Executor = Executors.newSingleThreadExecutor();

			logger.info("map stats = " + iMap.getLocalMapStats());

			// set first object in the map and collect the time (t=0)
			logger.info("setting key = key1 ...");
			long t1 = System.currentTimeMillis();
			iMap.set("key1", "value1");

			// start checker thread (will check at t=10s)
			Future<int[]> checker1Future = checker1Executor.submit(new MapStoreWritesChecker(t1));

			// wait (half the write-behind interval)
			Thread.sleep(mapStoreWriteDelaySec * 500);

			// set second object in the map and collect the time (t=5s)
			logger.info("setting key = key2 ...");
			long t2 = System.currentTimeMillis();
			iMap.set("key2", "value2");

			// start checker thread (will check at t=15s)
			Future<int[]> checker2Future = checker2Executor.submit(new MapStoreWritesChecker(t2));

			// wait for checkers to return
			logger.info("Writes count checker 1 getting result...");
			int[] writesCount1 = checker1Future.get();
			logger.info("Writes count checker 1: " + Arrays.toString(writesCount1));
			logger.info("Writes count checker 2 getting result...");
			int[] writesCount2 = checker2Future.get();
			logger.info("Writes count checker 2: " + Arrays.toString(writesCount2));

			// check writes of the store after write delay seconds of the first set
			//
			// PROBLEM: we expect
			// - 0 store calls before t=10s (e.g. at t= 9s) and
			// - 1 store calls after  t=10s (e.g. at t=11s)
			// the store call for key 2 should be later at around t=15s -> but it is not
			//
			assertEquals("Checker 1: Expected writes before=0 but are " + writesCount1[0], 0, writesCount1[0]);
			assertEquals("Checker 1: Expected writes after=1  but are " + writesCount1[1], 1, writesCount1[1]);

			// check writes of the store after write delay seconds of the second set
			assertEquals("Checker 2: Expected writes before=1 but are " + writesCount2[0], 1, writesCount2[0]);
			assertEquals("Checker 2: Expected writes after=2  but are " + writesCount2[1], 2, writesCount2[1]);

		} catch (Exception e) {
			fail(e.getMessage());
		}
	}

	/**
	 * This class checks how many write operations have been executed at two moments and reports
	 * those numbers.
	 * <p>
	 * The checks happen shortly before and shortly after the instant tSet + mapStoreWriteDelaySec,
	 * i.e. relative to the set operation plus the configured map store write delay.
	 */
	private class MapStoreWritesChecker implements Callable<int[]> {

		// timestamp when the set operation happened
		private long tSet;

		public MapStoreWritesChecker(long tSet) {
			this.tSet = tSet;
		}

		@Override
		public int[] call() throws Exception {

			// sleep time in milliseconds
			final int sleepMs = 500;

			// array containing the write operations before and after tSet + mapStoreWriteDelay
			int[] writeCountArray = new int[] {
					-1,
					-1 };

			while (true) {

				long tCheck = System.currentTimeMillis();

				// switch on time relative to write-behind interval
				final long duration = tCheck - tSet;
				if (duration >= (mapStoreWriteDelaySec - 2) * 1000 && duration <= (mapStoreWriteDelaySec - 1) * 1000) {

					// check _before_ mapWriteDelaySec (between 2 and 1 seconds before)
					// note: it is okay if the checker thread goes here twice

					int actualStoreWrites = mapStore.getCountStore().get();
					writeCountArray[0] = actualStoreWrites;

				} else if (duration >= (mapStoreWriteDelaySec + 1) * 1000) {

					// check _after_ mapWriteDelaySec (at least 1 second after)

					int actualStoreWrites = mapStore.getCountStore().get();
					writeCountArray[1] = actualStoreWrites;

					// done
					break;

				} else {

					// sleep, it is too early to check
					try {
						Thread.sleep(sleepMs);
					} catch (InterruptedException e) {
						throw new RuntimeException(e);
					}
				}
			}

			// return the collected write counts
			return writeCountArray;
		}
	}

}

package com.nm.test.hazelcast.utils.mapstore;

import com.hazelcast.core.MapStore;
import org.apache.log4j.Logger;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Map store implementation used to test the map store write delay feature.
 * <p>
 * This map store uses a ConcurrentHashMap to store the objects and a counter to collect the number
 * of write operations executed.
 */
public class WriteDelayMapStore<V> implements MapStore<String, V> {

	private static final Logger logger = Logger.getLogger(WriteDelayMapStore.class);

	// ---------------------------------------------------------------- counters

	/**
	 * Counter for store calls.
	 */
	private AtomicInteger countStore = new AtomicInteger();

	// ---------------------------------------------------------------- members

	private ConcurrentHashMap<String, V> inMemoryStore = new ConcurrentHashMap<String, V>();

	// ---------------------------------------------------------- final methods

	public final AtomicInteger getCountStore() {
		return countStore;
	}

	// ----------------------------------------------------- MapStore interface

	@Override
	public Set<String> loadAllKeys() {

		if (logger.isDebugEnabled()) {
			logger.debug("loadAllKeys() called.");
		}

		return new HashSet<String>(inMemoryStore.keySet());
	}

	@Override
	public V load(String key) {

		if (logger.isDebugEnabled()) {
			logger.debug("load('" + key + "') called.");
		}

		return inMemoryStore.get(key);
	}

	@Override
	public Map<String, V> loadAll(Collection<String> keys) {

		if (logger.isDebugEnabled()) {
			logger.debug("loadAll('" + keys + "') called.");
		}

		Map<String, V> result = new HashMap<String, V>();
		for (String key : keys) {
			V value = inMemoryStore.get(key);
			if (value != null) {
				result.put(key, value);
			}
		}
		return result;
	}

	@Override
	public void store(String key, V value) {

		if (logger.isDebugEnabled()) {
			logger.debug("store('" + key + "', '" + value + "') called.");
		}

		logger.info("store called. with key=" + key);

		// count store calls
		countStore.incrementAndGet();
		inMemoryStore.put(key, value);
	}

	@Override
	public void storeAll(Map<String, V> map) {

		if (logger.isDebugEnabled()) {
			logger.debug("storeAll('" + map + "') called.");
		}

		logger.info("storeAll called. with map size=" + map.size());

		for (Entry<String, V> entry : map.entrySet()) {

			// count store calls
			countStore.incrementAndGet();
			inMemoryStore.put(entry.getKey(), entry.getValue());
		}
	}

	@Override
	public void delete(String key) {

		if (logger.isDebugEnabled()) {
			logger.debug("delete('" + key + "') called.");
		}

		inMemoryStore.remove(key);
	}

	@Override
	public void deleteAll(Collection<String> keys) {

		if (logger.isDebugEnabled()) {
			logger.debug("deleteAll('" + keys + "') called.");
		}

		for (String key : keys) {
			inMemoryStore.remove(key);
		}
	}

}
@ahmetmircik (Member) commented Jan 24, 2017

Hi @rickymemphis,

It was changed while fixing this issue: #7464. With that change we make better use of batched store operations. The current behaviour is as follows: say write-delay-seconds is 5 seconds; all operations within a 5-second window are put into a queue and processed together at the end of that window. The previous approach was based on each entry's individual write delay time.
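
As a rough illustration of the current window-based behaviour (a sketch under assumed names, not the actual write-behind processor code), a single flush at the end of each write-delay window drains everything queued so far, regardless of when each entry was added within the window:

```java
// Illustrative sketch of window-based write-behind flushing (not actual Hazelcast code).
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class WindowedWriteBehindSketch<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>(); // coalesces writes by key
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    WindowedWriteBehindSketch(long writeDelaySeconds, Consumer<Map<K, V>> storeAll) {
        // One flush per window: everything queued in the current window is
        // persisted together, even entries that were added late in the window.
        scheduler.scheduleAtFixedRate(() -> {
            Map<K, V> batch;
            synchronized (pending) {
                if (pending.isEmpty()) {
                    return;
                }
                batch = new LinkedHashMap<>(pending);
                pending.clear();
            }
            storeAll.accept(batch); // corresponds to MapStore.storeAll(...)
        }, writeDelaySeconds, writeDelaySeconds, TimeUnit.SECONDS);
    }

    void add(K key, V value) {
        synchronized (pending) {
            pending.put(key, value);
        }
    }
}
```

With a 10-second window, key1 set at t=0 and key2 set at t=5s fall into the same window and are stored together at roughly t=10s, which matches what the failing test observes.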

What kind of issues is this new approach causing for you?

@mdogan added the Team: Core label May 31, 2017

@mmedenjak (Contributor) commented Jul 11, 2017

@rickymemphis as Ahmet mentioned, the behaviour was changed as a result of another issue in 3.7 and the stores are now grouped into "batches". Is the new behaviour unacceptable for you?

@mmedenjak changed the title from "MapStore: write delay not precisely respected" to "[map] MapStore: write delay not precisely respected" Jul 13, 2017
@mmedenjak added this to the 3.9 milestone Jul 13, 2017
@mmedenjak changed the title from "[map] MapStore: write delay not precisely respected" to "[map] [map-store] MapStore: write delay not precisely respected" Jul 15, 2017
@rickymemphis (Author) commented Jul 18, 2017

Hi everyone,
Sorry for the late reply, but we wanted to test the new implementation a bit more thoroughly before getting back to you. I would say we can close the ticket for now, and if we find any serious issues we will let you know.
So far what concerns us is that it is not 100% predictable when an entry in the map will be persisted, but at the same time I don't think that matters much in our use case.
Regards,
Riccardo

@ahmetmircik (Member) commented Jul 18, 2017

thanks @rickymemphis
