Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes IMap#replace not loading from MapLoader #14797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/core/IMap.java
Expand Up @@ -1522,6 +1522,10 @@ public interface IMap<K, V> extends ConcurrentMap<K, V>, LegacyAsyncMap<K, V> {
*
* <p><b>Interactions with the map store</b>
* <p>
* If value with {@code key} is not found in memory,
* {@link MapLoader#load(Object)} is invoked to load the value from
* the map store backing the map.
* <p>
* If write-through persistence mode is configured, before the value
* is stored in memory, {@link MapStore#store(Object, Object)} is
* called to write the value into the map store. Exceptions thrown
Expand Down Expand Up @@ -1553,6 +1557,10 @@ public interface IMap<K, V> extends ConcurrentMap<K, V>, LegacyAsyncMap<K, V> {
*
* <p><b>Interactions with the map store</b>
* <p>
* If value with {@code key} is not found in memory,
* {@link MapLoader#load(Object)} is invoked to load the value from
* the map store backing the map.
* <p>
* If write-through persistence mode is configured, before the value
* is stored in memory, {@link MapStore#store(Object, Object)} is
* called to write the value into the map store. Exceptions thrown
Expand Down
Expand Up @@ -830,21 +830,32 @@ record = createRecord(key, newValue, DEFAULT_TTL, DEFAULT_MAX_IDLE, now);
return newValue != null;
}

// TODO why does not replace method load data from map store if currently not available in memory.
@Override
public Object replace(Data key, Object update) {
checkIfLoaded();
long now = getNow();
mustafaiman marked this conversation as resolved.
Show resolved Hide resolved

Record record = getRecordOrNull(key, now, false);
if (record == null || record.getValue() == null) {
Object oldValue;
if (record == null) {
oldValue = mapDataStore.load(key);
} else {
oldValue = record.getValue();
}
if (oldValue == null) {
return null;
}
Object oldValue = record.getValue();
update = mapServiceContext.interceptPut(name, oldValue, update);
update = mapDataStore.add(key, update, now);
if (record == null) {
record = createRecord(key, update, DEFAULT_TTL, DEFAULT_MAX_IDLE, now);
storage.put(key, record);
stats.setLastUpdateTime(now);
mutationObserver.onPutRecord(key, record);
} else {
updateRecord(key, record, update, now, true);
}
onStore(record);
updateRecord(key, record, update, now, true);
setExpirationTimes(record.getTtl(), record.getMaxIdle(), record, mapContainer.getMapConfig(), false);
saveIndex(record, oldValue);
return oldValue;
Expand All @@ -856,17 +867,29 @@ public boolean replace(Data key, Object expect, Object update) {
long now = getNow();

Record record = getRecordOrNull(key, now, false);
Object current;
if (record == null) {
current = mapDataStore.load(key);
} else {
current = record.getValue();
}
if (current == null) {
return false;
}
Object current = record.getValue();
if (!valueComparator.isEqual(expect, current, serializationService)) {
return false;
}
update = mapServiceContext.interceptPut(name, current, update);
update = mapDataStore.add(key, update, now);
if (record == null) {
record = createRecord(key, update, DEFAULT_TTL, DEFAULT_MAX_IDLE, now);
storage.put(key, record);
stats.setLastUpdateTime(now);
mutationObserver.onPutRecord(key, record);
} else {
updateRecord(key, record, update, now, true);
}
onStore(record);
updateRecord(key, record, update, now, true);
setExpirationTimes(record.getTtl(), record.getMaxIdle(), record, mapContainer.getMapConfig(), false);
saveIndex(record, current);
return true;
Expand Down
Expand Up @@ -390,6 +390,46 @@ public void testIssue583MapReplaceShouldTriggerMapStore() {
assertEquals(1L, store.get("one").longValue());
}

@Test
public void test_givenKeyNotExists_mapLoaderShouldServeOldValueForMutatingOperations() {
ConcurrentMap<String, Long> store = new ConcurrentHashMap<String, Long>();
MapStore<String, Long> myMapStore = new SimpleMapStore<String, Long>(store);
Config config = getConfig();
config.getMapConfig("myMap")
.setMapStoreConfig(new MapStoreConfig()
.setImplementation(myMapStore));
TestHazelcastInstanceFactory nodeFactory = createHazelcastInstanceFactory(3);
HazelcastInstance hc = nodeFactory.newHazelcastInstance(config);
IMap<String, Long> myMap = hc.getMap("myMap");

// partitions may be created lazily when we first do an operation, causing
// map loader to be triggered for all entries during initialization. So
// we make sure to initialize all required partitions before hand.
myMap.get("replace");
myMap.get("replaceIfSame");
myMap.get("remove");
myMap.get("put");
myMap.get("putIfAbsent");

store.put("replace", -1L);
store.put("replaceIfSame", -2L);
store.put("remove", -3L);
store.put("put", -4L);
store.put("putIfAbsent", -5L);

assertEquals(-1, (long) myMap.replace("replace", 1L));
assertTrue(myMap.replace("replaceIfSame", -2L, 2L));
assertEquals(-3, (long) myMap.remove("remove"));
assertEquals(-4, (long) myMap.put("put", 4L));
assertEquals(-5, (long) myMap.putIfAbsent("putIfAbsent", 5L));

assertEquals(1L, (long) store.get("replace"));
assertEquals(2L, (long) store.get("replaceIfSame"));
assertNull(store.get("remove"));
assertEquals(4L, (long) store.get("put"));
assertEquals(-5L, (long) store.get("putIfAbsent"));
}

@Test(timeout = 120000)
public void issue587CallMapLoaderDuringRemoval() {
final AtomicInteger loadCount = new AtomicInteger(0);
Expand Down