Skip to content

Commit

Permalink
Merge pull request #10786 from mmedenjak/wan-cache-putAll
Browse files Browse the repository at this point in the history
Add WAN support for ICache.putAll
  • Loading branch information
jerrinot committed Aug 16, 2017
2 parents 24c3d11 + 19318e4 commit 80b48ea
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package com.hazelcast.cache.impl.operation;

import com.hazelcast.cache.CacheEntryView;
import com.hazelcast.cache.CacheNotExistsException;
import com.hazelcast.cache.impl.CacheDataSerializerHook;
import com.hazelcast.cache.impl.CacheEntryViews;
import com.hazelcast.cache.impl.ICacheRecordStore;
import com.hazelcast.cache.impl.ICacheService;
import com.hazelcast.cache.impl.event.CacheWanEventPublisher;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -75,10 +78,26 @@ public void run() throws Exception {
for (Map.Entry<Data, CacheRecord> entry : cacheRecords.entrySet()) {
CacheRecord record = entry.getValue();
cache.putRecord(entry.getKey(), record);

publishWanEvent(entry.getKey(), record);
}
}
}

private void publishWanEvent(Data key, CacheRecord record) {
if (cache.isWanReplicationEnabled()) {
ICacheService service = getService();
final CacheWanEventPublisher publisher = service.getCacheWanEventPublisher();
final CacheEntryView<Data, Data> view = CacheEntryViews.createDefaultEntryView(
key, toData(record.getValue()), record);
publisher.publishWanReplicationUpdate(name, view);
}
}

private Data toData(Object o) {
return getNodeEngine().getSerializationService().toData(o);
}

@Override
public ObjectNamespace getServiceNamespace() {
ICacheRecordStore recordStore = cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.hazelcast.cache.impl.operation;

import com.hazelcast.cache.impl.CacheDataSerializerHook;
import com.hazelcast.cache.impl.CacheEntryViews;
import com.hazelcast.cache.impl.ICacheRecordStore;
import com.hazelcast.cache.impl.ICacheService;
import com.hazelcast.cache.impl.event.CacheWanEventPublisher;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -86,6 +88,16 @@ public void run()
Data value = entry.getValue();
CacheRecord backupRecord = cache.put(key, value, expiryPolicy, callerUuid, completionId);
backupRecords.put(key, backupRecord);

publishWanEvent(key, value, backupRecord);
}
}

private void publishWanEvent(Data key, Data value, CacheRecord backupRecord) {
if (cache.isWanReplicationEnabled()) {
ICacheService service = getService();
CacheWanEventPublisher publisher = service.getCacheWanEventPublisher();
publisher.publishWanReplicationUpdate(name, CacheEntryViews.createDefaultEntryView(key, value, backupRecord));
}
}

Expand Down

0 comments on commit 80b48ea

Please sign in to comment.