Skip to content

Commit

Permalink
Merge pull request #4402 from gurbuzali/ticket679/3.3.4
Browse files Browse the repository at this point in the history
Fixes ticket 679: retrieve the processed value from the record when a post-processing map-store is configured
  • Loading branch information
ahmetmircik committed Jan 12, 2015
2 parents 664a602 + 5d79e62 commit eec3586
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ protected List<Object> convertToObjectKeys(Collection keys) {
return objectKeys;
}

@Override
public boolean isPostProcessingMapStore() {
    // Delegates to the wrapped store; true when the user-configured map-store
    // implements PostProcessingMapStore, i.e. store()/storeAll() may modify
    // the value and the modified value must be the one replicated to backups.
    return store.isPostProcessingMapStore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ public Object flush(Object key, Object value, long now, boolean backup) {
public int notFinishedOperationsCount() {
return 0;
}

@Override
public boolean isPostProcessingMapStore() {
    // Always false: this MapDataStore variant performs no post-processing,
    // so callers never need to re-read the processed value from the record.
    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface MapDataStore<K, V> {

int notFinishedOperationsCount();

boolean isPostProcessingMapStore();

Collection flush();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.map.EntryViews;
import com.hazelcast.map.MapEventPublisher;
import com.hazelcast.map.MapServiceContext;
import com.hazelcast.map.mapstore.MapDataStore;
import com.hazelcast.map.record.Record;
import com.hazelcast.map.record.RecordInfo;
import com.hazelcast.nio.serialization.Data;
Expand Down Expand Up @@ -87,7 +88,14 @@ public boolean shouldBackup() {
public Operation getBackupOperation() {
    final Record record = recordStore.getRecord(dataKey);
    final RecordInfo replicationInfo = buildRecordInfo(record);
    final MapDataStore<Data, Object> store = recordStore.getMapDataStore();
    // A post-processing map-store may have altered the value inside store(),
    // so the backup has to carry the processed value held by the record,
    // not the value originally supplied by the caller.
    final Data backupValue = store.isPostProcessingMapStore()
            ? mapService.getMapServiceContext().toData(record.getValue())
            : dataValue;
    return new PutBackupOperation(name, dataKey, backupValue, replicationInfo);
}

public final int getAsyncBackupCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.map.MapServiceContext;
import com.hazelcast.map.NearCacheProvider;
import com.hazelcast.map.RecordStore;
import com.hazelcast.map.mapstore.MapDataStore;
import com.hazelcast.map.record.Record;
import com.hazelcast.map.record.RecordInfo;
import com.hazelcast.map.record.Records;
Expand All @@ -37,6 +38,7 @@
import com.hazelcast.util.Clock;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -101,6 +103,7 @@ public void run() {
final EntryView entryView = EntryViews.createSimpleEntryView(dataKey, dataValueAsData, record);
mapEventPublisher.publishWanReplicationUpdate(name, entryView);
}
entry = getProcessedValue(entry, record);
backupEntrySet.add(entry);
RecordInfo replicationInfo = Records.buildRecordInfo(recordStore.getRecord(dataKey));
backupRecordInfos.add(replicationInfo);
Expand All @@ -110,6 +113,16 @@ public void run() {
invalidateNearCaches(keysToInvalidate);
}

public Map.Entry<Data, Data> getProcessedValue(Map.Entry<Data, Data> entry, Record record) {
    final MapDataStore<Data, Object> store = recordStore.getMapDataStore();
    if (store.isPostProcessingMapStore()) {
        // The map-store may have modified the value during store(); replace
        // the entry's value with the processed one taken from the record so
        // backups receive the same data as the owner partition.
        final MapServiceContext serviceContext = mapService.getMapServiceContext();
        final Data processedValue = serviceContext.toData(record.getValue());
        return new AbstractMap.SimpleImmutableEntry<Data, Data>(entry.getKey(), processedValue);
    }
    return entry;
}

protected final void invalidateNearCaches(Set<Data> keys) {
final NearCacheProvider nearCacheProvider = mapService.getMapServiceContext().getNearCacheProvider();
if (nearCacheProvider.isNearCacheAndInvalidationEnabled(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.core.EntryEventType;
import com.hazelcast.map.MapService;
import com.hazelcast.map.MapServiceContext;
import com.hazelcast.map.mapstore.MapDataStore;
import com.hazelcast.map.operation.BasePutOperation;
import com.hazelcast.map.operation.PutBackupOperation;
import com.hazelcast.map.record.Record;
Expand Down Expand Up @@ -112,7 +113,14 @@ public boolean shouldNotify() {
public Operation getBackupOperation() {
    final Record record = recordStore.getRecord(dataKey);
    final RecordInfo replicationInfo = Records.buildRecordInfo(record);
    final MapDataStore<Data, Object> store = recordStore.getMapDataStore();
    // With a post-processing map-store the stored value may differ from the
    // one the caller passed in; ship the processed value from the record so
    // owner and backup replicas stay identical.
    final Data backupValue = store.isPostProcessingMapStore()
            ? mapService.getMapServiceContext().toData(record.getValue())
            : dataValue;
    return new PutBackupOperation(name, dataKey, backupValue, replicationInfo, true);
}

public void onWaitExpire() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package com.hazelcast.map.mapstore;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapStore;
import com.hazelcast.core.PostProcessingMapStore;
import com.hazelcast.core.TransactionalMap;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestHazelcastInstanceFactory;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.transaction.TransactionException;
import com.hazelcast.transaction.TransactionalTask;
import com.hazelcast.transaction.TransactionalTaskContext;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;

@RunWith(HazelcastParallelClassRunner.class)
@Category(QuickTest.class)
public class PostProcessingMapStoreTest extends HazelcastTestSupport {

@Test
public void testProcessedValueCarriedToTheBackup_whenPut() {
String name = randomString();
HazelcastInstance[] instances = getInstances(name);
IMap<Integer, SampleObject> map1 = instances[0].getMap(name);
IMap<Integer, SampleObject> map2 = instances[1].getMap(name);
int size = 1000;

for (int i = 0; i < size; i++) {
map1.put(i, new SampleObject(i));
}

for (int i = 0; i < size; i++) {
SampleObject o = map1.get(i);
assertEquals(i + 1, o.version);
}

for (int i = 0; i < size; i++) {
SampleObject o = map2.get(i);
assertEquals(i + 1, o.version);
}
}

@Test
public void testProcessedValueCarriedToTheBackup_whenTxnPut() {
final String name = randomString();
HazelcastInstance[] instances = getInstances(name);
IMap<Integer, SampleObject> map1 = instances[0].getMap(name);
IMap<Integer, SampleObject> map2 = instances[1].getMap(name);
int size = 1000;

for (int i = 0; i < size; i++) {
final int key = i;
instances[0].executeTransaction(new TransactionalTask<Object>() {
@Override
public Object execute(TransactionalTaskContext context) throws TransactionException {
TransactionalMap<Integer, SampleObject> map = context.getMap(name);
map.put(key, new SampleObject(key));
return null;
}
});
}

for (int i = 0; i < size; i++) {
SampleObject o = map1.get(i);
assertEquals(i + 1, o.version);
}

for (int i = 0; i < size; i++) {
SampleObject o = map2.get(i);
assertEquals(i + 1, o.version);
}
}

@Test
public void testProcessedValueCarriedToTheBackup_whenPutAll() {
String name = randomString();
HazelcastInstance[] instances = getInstances(name);
IMap<Integer, SampleObject> map1 = instances[0].getMap(name);
IMap<Integer, SampleObject> map2 = instances[1].getMap(name);
int size = 1000;

Map<Integer, SampleObject> temp = new HashMap<Integer, SampleObject>();
for (int i = 0; i < size; i++) {
temp.put(i, new SampleObject(i));
}
map1.putAll(temp);


for (int i = 0; i < size; i++) {
SampleObject o = map1.get(i);
assertEquals(i + 1, o.version);
}

for (int i = 0; i < size; i++) {
SampleObject o = map2.get(i);
assertEquals(i + 1, o.version);
}
}

HazelcastInstance[] getInstances(String name) {
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2);

Config config = new Config();
MapConfig mapConfig = config.getMapConfig(name);
mapConfig.setReadBackupData(true);
MapStoreConfig mapStoreConfig = new MapStoreConfig();
mapStoreConfig.setEnabled(true).setClassName(IncrementerPostProcessingMapStore.class.getName());
mapConfig.setMapStoreConfig(mapStoreConfig);

HazelcastInstance instance1 = factory.newHazelcastInstance(config);
HazelcastInstance instance2 = factory.newHazelcastInstance(config);

return new HazelcastInstance[]{instance1, instance2};
}


public static class IncrementerPostProcessingMapStore implements MapStore<Integer, SampleObject>, PostProcessingMapStore {

Map<Integer, SampleObject> map = new HashMap<Integer, SampleObject>();

@Override
public void store(Integer key, SampleObject value) {
value.version++;
map.put(key, value);
}

@Override
public void storeAll(Map<Integer, SampleObject> map) {
for (Map.Entry<Integer, SampleObject> entry : map.entrySet()) {
store(entry.getKey(), entry.getValue());
}
}

@Override
public void delete(Integer key) {
map.remove(key);
}

@Override
public void deleteAll(Collection<Integer> keys) {
for (Integer key : keys) {
map.remove(key);
}
}

@Override
public SampleObject load(Integer key) {
return map.get(key);
}

@Override
public Map<Integer, SampleObject> loadAll(Collection<Integer> keys) {
HashMap<Integer, SampleObject> temp = new HashMap<Integer, SampleObject>();
for (Integer key : keys) {
temp.put(key, map.get(key));
}
return temp;
}

@Override
public Set<Integer> loadAllKeys() {
return map.keySet();
}
}

public static class SampleObject implements Serializable {

public int version;

public SampleObject(int version) {
this.version = version;
}
}
}

0 comments on commit eec3586

Please sign in to comment.