Skip to content

Commit

Permalink
Add support for WAN replication of IMap/ICache evictions [HZ-2619] (#…
Browse files Browse the repository at this point in the history
…24941)

In the current implementation of Hazelcast, evictions are not replicated
over WAN to target clusters - this is done intentionally as evictions
are purely local operations that usually occur as a last resort to free
resources on demand. However, there are circumstances where some users
may want to have evictions replicated over WAN to try and maintain more
synchronization between 2 clusters (even though this does still not
guarantee data consistency using WAN).

This commit introduces 2 new `ClusterProperty` entries that allow WAN
replication of `IMap` and `ICache` eviction events respectively. This
property is disabled by default and must be explicitly enabled as WAN
replication of eviction events was purposefully omitted prior to this.

Code changes are simple, introducing `publishWanRemove()` calls within
`DefaultRecordStore` (for `IMap`) and `AbstractCacheRecordStore` (for
`ICache`) - I had originally trialed a solution that introduced these
calls within operations (`EvictOperation` etc) to match existing WAN
replication mechanics, but due to the mostly-local nature of eviction
that is often triggered as a result of other operations (not eviction
operations directly), it made more sense to implement at the root of
eviction calls, which resides in these record stores.

This solution, when enabled, fires WAN replication events for all
evictions (expiration, resource constraints, user-invoked, etc) - the
reason for this is the use-case for these config options is to attempt
to keep data more consistent between 2 clusters over WAN, and so in this
scenario (although still not bulletproof by any means), it makes sense
to try and replicate all operations that result in mutation (as is the
case with all evictions).

This commit also includes a regression test that confirms consistency
between 2 clusters under ideal circumstances with evictions on the
source cluster.

Fixes https://hazelcast.atlassian.net/browse/HZ-2619
EE PR: hazelcast/hazelcast-enterprise#6201
  • Loading branch information
JamesHazelcast committed Jul 17, 2023
1 parent 1ee1e1a commit c5f4a6f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.hazelcast.spi.impl.tenantcontrol.TenantContextual;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.CacheMergeTypes;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.tenantcontrol.TenantControl;
import com.hazelcast.wan.impl.CallerProvenance;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -148,6 +149,7 @@ public abstract class AbstractCacheRecordStore<R extends CacheRecord, CRM extend
protected Iterator<Map.Entry<Data, R>> expirationIterator;
protected InvalidationQueue<ExpiredKey> expiredKeys = new InvalidationQueue<ExpiredKey>();
protected boolean hasEntryWithExpiration;
protected boolean wanReplicateEvictions;

@SuppressWarnings({"checkstyle:npathcomplexity", "checkstyle:executablestatementcount", "checkstyle:methodlength"})
public AbstractCacheRecordStore(String cacheNameWithPrefix, int partitionId, NodeEngine nodeEngine,
Expand Down Expand Up @@ -188,6 +190,9 @@ public AbstractCacheRecordStore(String cacheNameWithPrefix, int partitionId, Nod
statistics = cacheService.createCacheStatIfAbsent(cacheNameWithPrefix);
}

this.wanReplicateEvictions = isWanReplicationEnabled()
&& cacheService.getNodeEngine().getProperties().getBoolean(ClusterProperty.WAN_REPLICATE_ICACHE_EVICTIONS);

TenantControl tenantControl = nodeEngine
.getTenantControlService()
.getTenantControl(ICacheService.SERVICE_NAME, cacheNameWithPrefix);
Expand Down Expand Up @@ -566,6 +571,10 @@ public void onEvict(Data key, R record, boolean wasExpired) {
compositeCacheRSMutationObserver.onEvict(key, record.getValue());
}
invalidateEntry(key);

if (wanReplicateEvictions) {
cacheService.getCacheWanEventPublisher().publishWanRemove(name, toHeapData(key));
}
}

protected void invalidateEntry(Data key, UUID source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.MapMergeTypes;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.wan.impl.CallerProvenance;

import javax.annotation.Nonnull;
Expand All @@ -83,6 +84,7 @@
import static com.hazelcast.internal.util.ConcurrencyUtil.CALLER_RUNS;
import static com.hazelcast.internal.util.MapUtil.createHashMap;
import static com.hazelcast.internal.util.MapUtil.isNullOrEmpty;
import static com.hazelcast.internal.util.ToHeapDataConverter.toHeapData;
import static com.hazelcast.internal.util.counters.SwCounter.newSwCounter;
import static com.hazelcast.map.impl.mapstore.MapDataStores.EMPTY_MAP_DATA_STORE;
import static com.hazelcast.map.impl.record.Record.UNSET;
Expand Down Expand Up @@ -128,6 +130,11 @@ public class DefaultRecordStore extends AbstractEvictableRecordStore {
* key loading.
*/
private boolean loadedOnPreMigration;
/**
* Defined by {@link com.hazelcast.spi.properties.ClusterProperty#WAN_REPLICATE_IMAP_EVICTIONS},
* if set to true then eviction operations by this RecordStore will be WAN replicated
*/
private boolean wanReplicateEvictions;

private final IPartitionService partitionService;
private final InterceptorRegistry interceptorRegistry;
Expand All @@ -145,6 +152,8 @@ public DefaultRecordStore(MapContainer mapContainer, int partitionId,
this.recordStoreLoader = createRecordStoreLoader(mapStoreContext);
this.partitionService = mapServiceContext.getNodeEngine().getPartitionService();
this.interceptorRegistry = mapContainer.getInterceptorRegistry();
this.wanReplicateEvictions = mapContainer.isWanReplicationEnabled()
&& mapServiceContext.getNodeEngine().getProperties().getBoolean(ClusterProperty.WAN_REPLICATE_IMAP_EVICTIONS);
initJsonMetadataStore();
}

Expand Down Expand Up @@ -555,6 +564,10 @@ private void removeOrEvictEntry(Data dataKey, Record record, boolean eviction, b
}
removeKeyFromExpirySystem(dataKey);
storage.removeRecord(dataKey, record);

if (wanReplicateEvictions && eviction) {
mapEventPublisher.publishWanRemove(name, toHeapData(dataKey));
}
}

@Override
Expand All @@ -570,6 +583,9 @@ public Object evict(Data key, boolean backup) {
if (!backup) {
mapServiceContext.interceptRemove(interceptorRegistry, value);
}
if (wanReplicateEvictions) {
mapEventPublisher.publishWanRemove(name, toHeapData(key));
}
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,44 @@ private int getWhenNoSSLDetected() {
public static final HazelcastProperty JAR_UPLOAD_DIR_PATH
= new HazelcastProperty("hazelcast.cluster.jarupload.dirpath");

/**
* Defines whether WAN replication events should be fired when values are evicted
* from {@link IMap} objects.
* <p>
* The default value is {@code false}.
* <p>
* NOTE: The expected use-case for this property to be enabled is very specific, namely where
* an exact copy of a source is wanted on a target with no evictions enabled; however in this
* scenario, the target cluster would need to have evictions enabled if it were to become the
* active cluster - failing to do so could lead to Out Of Memory or data inconsistency issues.
* The reverse would also be necessary if returning back to the original cluster. Ensure you
* have a plan for handling these scenarios (such as using Management Centre to configure
* evictions manually) before enabling this property and changing between active clusters.
*
* @since 5.4
*/
public static final HazelcastProperty WAN_REPLICATE_IMAP_EVICTIONS
= new HazelcastProperty("hazelcast.wan.replicate.imap.evictions", false);

/**
* Defines whether WAN replication events should be fired when values are evicted
* from {@link com.hazelcast.cache.ICache} objects.
* <p>
* The default value is {@code false}.
* <p>
* NOTE: The expected use-case for this property to be enabled is very specific, namely where
* an exact copy of a source is wanted on a target with no evictions enabled; however in this
* scenario, the target cluster would need to have evictions enabled if it were to become the
* active cluster - failing to do so could lead to Out Of Memory or data inconsistency issues.
* The reverse would also be necessary if returning back to the original cluster. Ensure you
* have a plan for handling these scenarios (such as using Management Centre to configure
* evictions manually) before enabling this property and changing between active clusters.
*
* @since 5.4
*/
public static final HazelcastProperty WAN_REPLICATE_ICACHE_EVICTIONS
= new HazelcastProperty("hazelcast.wan.replicate.icache.evictions", false);

private ClusterProperty() {
}
}

0 comments on commit c5f4a6f

Please sign in to comment.