Skip to content

Commit

Permalink
Fixes IMap#replace not loading from MapLoader
Browse files Browse the repository at this point in the history
replace operation just overlooked MapLoader when given key is not present in-memory. Instead, it should try to load from MapLoader first.

fixes #11300
  • Loading branch information
mustafaiman committed Apr 5, 2019
1 parent 95e1c34 commit 57e0a43
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 6 deletions.
8 changes: 8 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/core/IMap.java
Expand Up @@ -1522,6 +1522,10 @@ public interface IMap<K, V> extends ConcurrentMap<K, V>, LegacyAsyncMap<K, V> {
*
* <p><b>Interactions with the map store</b>
* <p>
* If value with {@code key} is not found in memory,
* {@link MapLoader#load(Object)} is invoked to load the value from
* the map store backing the map.
* <p>
* If write-through persistence mode is configured, before the value
* is stored in memory, {@link MapStore#store(Object, Object)} is
* called to write the value into the map store. Exceptions thrown
Expand Down Expand Up @@ -1553,6 +1557,10 @@ public interface IMap<K, V> extends ConcurrentMap<K, V>, LegacyAsyncMap<K, V> {
*
* <p><b>Interactions with the map store</b>
* <p>
* If value with {@code key} is not found in memory,
* {@link MapLoader#load(Object)} is invoked to load the value from
* the map store backing the map.
* <p>
* If write-through persistence mode is configured, before the value
* is stored in memory, {@link MapStore#store(Object, Object)} is
* called to write the value into the map store. Exceptions thrown
Expand Down
Expand Up @@ -830,21 +830,32 @@ record = createRecord(key, newValue, DEFAULT_TTL, DEFAULT_MAX_IDLE, now);
return newValue != null;
}

// TODO why does not replace method load data from map store if currently not available in memory.
@Override
public Object replace(Data key, Object update) {
checkIfLoaded();
long now = getNow();

Record record = getRecordOrNull(key, now, false);
if (record == null || record.getValue() == null) {
Object oldValue;
if (record == null) {
oldValue = mapDataStore.load(key);
} else {
oldValue = record.getValue();
}
if (oldValue == null) {
return null;
}
Object oldValue = record.getValue();
update = mapServiceContext.interceptPut(name, oldValue, update);
update = mapDataStore.add(key, update, now);
if (record == null) {
record = createRecord(key, update, DEFAULT_TTL, DEFAULT_MAX_IDLE, now);
storage.put(key, record);
stats.setLastUpdateTime(now);
mutationObserver.onPutRecord(key, record);
} else {
updateRecord(key, record, update, now, true);
}
onStore(record);
updateRecord(key, record, update, now, true);
setExpirationTimes(record.getTtl(), record.getMaxIdle(), record, mapContainer.getMapConfig(), false);
saveIndex(record, oldValue);
return oldValue;
Expand All @@ -856,17 +867,29 @@ public boolean replace(Data key, Object expect, Object update) {
long now = getNow();

Record record = getRecordOrNull(key, now, false);
Object current;
if (record == null) {
current = mapDataStore.load(key);
} else {
current = record.getValue();
}
if (current == null) {
return false;
}
Object current = record.getValue();
if (!valueComparator.isEqual(expect, current, serializationService)) {
return false;
}
update = mapServiceContext.interceptPut(name, current, update);
update = mapDataStore.add(key, update, now);
if (record == null) {
record = createRecord(key, update, DEFAULT_TTL, DEFAULT_MAX_IDLE, now);
storage.put(key, record);
stats.setLastUpdateTime(now);
mutationObserver.onPutRecord(key, record);
} else {
updateRecord(key, record, update, now, true);
}
onStore(record);
updateRecord(key, record, update, now, true);
setExpirationTimes(record.getTtl(), record.getMaxIdle(), record, mapContainer.getMapConfig(), false);
saveIndex(record, current);
return true;
Expand Down
Expand Up @@ -390,6 +390,46 @@ public void testIssue583MapReplaceShouldTriggerMapStore() {
assertEquals(1L, store.get("one").longValue());
}

@Test
public void test_givenKeyNotExists_mapLoaderShouldServeOldValueForMutatingOperations() {
ConcurrentMap<String, Long> store = new ConcurrentHashMap<String, Long>();
MapStore<String, Long> myMapStore = new SimpleMapStore<String, Long>(store);
Config config = getConfig();
config.getMapConfig("myMap")
.setMapStoreConfig(new MapStoreConfig()
.setImplementation(myMapStore));
TestHazelcastInstanceFactory nodeFactory = createHazelcastInstanceFactory(3);
HazelcastInstance hc = nodeFactory.newHazelcastInstance(config);
IMap<String, Long> myMap = hc.getMap("myMap");

// partitions may be created lazily when we first do an operation, causing
// map loader to be triggered for all entries during initialization. So
// we make sure to initialize all required partitions before hand.
myMap.get("replace");
myMap.get("replaceIfSame");
myMap.get("remove");
myMap.get("put");
myMap.get("putIfAbsent");

store.put("replace", -1L);
store.put("replaceIfSame", -2L);
store.put("remove", -3L);
store.put("put", -4L);
store.put("putIfAbsent", -5L);

assertEquals(-1, (long) myMap.replace("replace", 1L));
assertTrue(myMap.replace("replaceIfSame", -2L, 2L));
assertEquals(-3, (long) myMap.remove("remove"));
assertEquals(-4, (long) myMap.put("put", 4L));
assertEquals(-5, (long) myMap.putIfAbsent("putIfAbsent", 5L));

assertEquals(1L, (long) store.get("replace"));
assertEquals(2L, (long) store.get("replaceIfSame"));
assertNull(store.get("remove"));
assertEquals(4L, (long) store.get("put"));
assertEquals(-5L, (long) store.get("putIfAbsent"));
}

@Test(timeout = 120000)
public void issue587CallMapLoaderDuringRemoval() {
final AtomicInteger loadCount = new AtomicInteger(0);
Expand Down

0 comments on commit 57e0a43

Please sign in to comment.