Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for WAN replication of IMap/ICache evictions [HZ-2619] #24941

Merged
merged 4 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.hazelcast.spi.impl.tenantcontrol.TenantContextual;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.CacheMergeTypes;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.tenantcontrol.TenantControl;
import com.hazelcast.wan.impl.CallerProvenance;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -148,6 +149,7 @@ public abstract class AbstractCacheRecordStore<R extends CacheRecord, CRM extend
protected Iterator<Map.Entry<Data, R>> expirationIterator;
protected InvalidationQueue<ExpiredKey> expiredKeys = new InvalidationQueue<ExpiredKey>();
protected boolean hasEntryWithExpiration;
protected boolean wanReplicateEvictions;

@SuppressWarnings({"checkstyle:npathcomplexity", "checkstyle:executablestatementcount", "checkstyle:methodlength"})
public AbstractCacheRecordStore(String cacheNameWithPrefix, int partitionId, NodeEngine nodeEngine,
Expand Down Expand Up @@ -188,6 +190,9 @@ public AbstractCacheRecordStore(String cacheNameWithPrefix, int partitionId, Nod
statistics = cacheService.createCacheStatIfAbsent(cacheNameWithPrefix);
}

this.wanReplicateEvictions = isWanReplicationEnabled()
&& cacheService.getNodeEngine().getProperties().getBoolean(ClusterProperty.WAN_REPLICATE_ICACHE_EVICTIONS);

TenantControl tenantControl = nodeEngine
.getTenantControlService()
.getTenantControl(ICacheService.SERVICE_NAME, cacheNameWithPrefix);
Expand Down Expand Up @@ -566,6 +571,10 @@ public void onEvict(Data key, R record, boolean wasExpired) {
compositeCacheRSMutationObserver.onEvict(key, record.getValue());
}
invalidateEntry(key);

if (wanReplicateEvictions) {
cacheService.getCacheWanEventPublisher().publishWanRemove(name, toHeapData(key));
}
}

protected void invalidateEntry(Data key, UUID source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.MapMergeTypes;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.wan.impl.CallerProvenance;

import javax.annotation.Nonnull;
Expand All @@ -83,6 +84,7 @@
import static com.hazelcast.internal.util.ConcurrencyUtil.CALLER_RUNS;
import static com.hazelcast.internal.util.MapUtil.createHashMap;
import static com.hazelcast.internal.util.MapUtil.isNullOrEmpty;
import static com.hazelcast.internal.util.ToHeapDataConverter.toHeapData;
import static com.hazelcast.internal.util.counters.SwCounter.newSwCounter;
import static com.hazelcast.map.impl.mapstore.MapDataStores.EMPTY_MAP_DATA_STORE;
import static com.hazelcast.map.impl.record.Record.UNSET;
Expand Down Expand Up @@ -128,6 +130,11 @@ public class DefaultRecordStore extends AbstractEvictableRecordStore {
* key loading.
*/
private boolean loadedOnPreMigration;
/**
* Defined by {@link com.hazelcast.spi.properties.ClusterProperty#WAN_REPLICATE_IMAP_EVICTIONS},
* if set to true then eviction operations by this RecordStore will be WAN replicated
*/
private boolean wanReplicateEvictions;

private final IPartitionService partitionService;
private final InterceptorRegistry interceptorRegistry;
Expand All @@ -145,6 +152,8 @@ public DefaultRecordStore(MapContainer mapContainer, int partitionId,
this.recordStoreLoader = createRecordStoreLoader(mapStoreContext);
this.partitionService = mapServiceContext.getNodeEngine().getPartitionService();
this.interceptorRegistry = mapContainer.getInterceptorRegistry();
this.wanReplicateEvictions = mapContainer.isWanReplicationEnabled()
&& mapServiceContext.getNodeEngine().getProperties().getBoolean(ClusterProperty.WAN_REPLICATE_IMAP_EVICTIONS);
initJsonMetadataStore();
}

Expand Down Expand Up @@ -555,6 +564,10 @@ private void removeOrEvictEntry(Data dataKey, Record record, boolean eviction, b
}
removeKeyFromExpirySystem(dataKey);
storage.removeRecord(dataKey, record);

if (wanReplicateEvictions && eviction) {
mapEventPublisher.publishWanRemove(name, toHeapData(dataKey));
}
}

@Override
Expand All @@ -570,6 +583,9 @@ public Object evict(Data key, boolean backup) {
if (!backup) {
mapServiceContext.interceptRemove(interceptorRegistry, value);
}
if (wanReplicateEvictions) {
mapEventPublisher.publishWanRemove(name, toHeapData(key));
}
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,44 @@ private int getWhenNoSSLDetected() {
public static final HazelcastProperty JAR_UPLOAD_DIR_PATH
= new HazelcastProperty("hazelcast.cluster.jarupload.dirpath");

/**
* Defines whether WAN replication events should be fired when values are evicted
* from {@link IMap} objects.
* <p>
* The default value is {@code false}.
* <p>
* NOTE: The expected use-case for this property to be enabled is very specific, namely where
* an exact copy of a source is wanted on a target with no evictions enabled; however in this
* scenario, the target cluster would need to have evictions enabled if it were to become the
* active cluster - failing to do so could lead to Out Of Memory or data inconsistency issues.
* The reverse would also be necessary if returning back to the original cluster. Ensure you
* have a plan for handling these scenarios (such as using Management Centre to configure
* evictions manually) before enabling this property and changing between active clusters.
*
* @since 5.4
*/
public static final HazelcastProperty WAN_REPLICATE_IMAP_EVICTIONS
JamesHazelcast marked this conversation as resolved.
Show resolved Hide resolved
= new HazelcastProperty("hazelcast.wan.replicate.imap.evictions", false);

/**
* Defines whether WAN replication events should be fired when values are evicted
* from {@link com.hazelcast.cache.ICache} objects.
* <p>
* The default value is {@code false}.
* <p>
* NOTE: The expected use-case for this property to be enabled is very specific, namely where
* an exact copy of a source is wanted on a target with no evictions enabled; however in this
* scenario, the target cluster would need to have evictions enabled if it were to become the
* active cluster - failing to do so could lead to Out Of Memory or data inconsistency issues.
* The reverse would also be necessary if returning back to the original cluster. Ensure you
* have a plan for handling these scenarios (such as using Management Centre to configure
* evictions manually) before enabling this property and changing between active clusters.
*
* @since 5.4
*/
public static final HazelcastProperty WAN_REPLICATE_ICACHE_EVICTIONS
= new HazelcastProperty("hazelcast.wan.replicate.icache.evictions", false);

private ClusterProperty() {
}
}