diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheRecordStore.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheRecordStore.java index f3d764417de5..d36c7d2a8994 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheRecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheRecordStore.java @@ -1650,6 +1650,13 @@ public void destroy() { onDestroy(); } + @Override + public void destroyInternals() { + reset(); + closeListeners(); + onDestroy(); + } + protected void onDestroy() { } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java index 393d4c7f7f26..f5675f8e0179 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java @@ -135,12 +135,13 @@ public Set createNew(String name) { } }; + protected ILogger logger; protected NodeEngine nodeEngine; protected CachePartitionSegment[] segments; protected CacheEventHandler cacheEventHandler; - protected CacheSplitBrainHandlerService splitBrainHandlerService; protected RingbufferCacheEventJournalImpl eventJournal; - protected ILogger logger; + protected CacheMergePolicyProvider mergePolicyProvider; + protected CacheSplitBrainHandlerService splitBrainHandlerService; @Override public final void init(NodeEngine nodeEngine, Properties properties) { @@ -151,15 +152,20 @@ public final void init(NodeEngine nodeEngine, Properties properties) { segments[i] = newPartitionSegment(i); } this.cacheEventHandler = new CacheEventHandler(nodeEngine); - this.splitBrainHandlerService = newSplitBrainHandlerService(nodeEngine); + this.splitBrainHandlerService = new CacheSplitBrainHandlerService(nodeEngine, segments); this.logger = nodeEngine.getLogger(getClass()); this.eventJournal = new RingbufferCacheEventJournalImpl(nodeEngine); + this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine); + postInit(nodeEngine, properties); } - // this method is overridden on ee - protected CacheSplitBrainHandlerService newSplitBrainHandlerService(NodeEngine nodeEngine) { - return new CacheSplitBrainHandlerService(nodeEngine, configs, segments); + public CacheMergePolicyProvider getMergePolicyProvider() { + return mergePolicyProvider; + } + + public ConcurrentMap getConfigs() { + return configs; } protected void postInit(NodeEngine nodeEngine, Properties properties) { @@ -237,7 +243,6 @@ public DistributedObject createDistributedObject(String cacheNameWithPrefix) { cacheConfig.setManagerPrefix(HazelcastCacheManager.CACHE_MANAGER_PREFIX); } - CacheMergePolicyProvider mergePolicyProvider = splitBrainHandlerService.getMergePolicyProvider(); checkCacheConfig(cacheConfig, mergePolicyProvider); Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy()); @@ -744,10 +749,6 @@ public Runnable prepareMergeRunnable() { return splitBrainHandlerService.prepareMergeRunnable(); } - public CacheMergePolicyProvider getCacheMergePolicyProvider() { - return splitBrainHandlerService.getMergePolicyProvider(); - } - public CacheEventHandler getCacheEventHandler() { return cacheEventHandler; } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheMergeRunnable.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheMergeRunnable.java index 1bf01dff692a..c268b861b418 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheMergeRunnable.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheMergeRunnable.java @@ -19,6 +19,7 @@ import com.hazelcast.cache.CacheEntryView; import com.hazelcast.cache.CacheMergePolicy; import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView; +import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider; import com.hazelcast.cache.impl.operation.CacheLegacyMergeOperation; import com.hazelcast.cache.impl.record.CacheRecord; import com.hazelcast.config.CacheConfig; @@ -28,6 +29,7 @@ import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationFactory; import com.hazelcast.spi.impl.merge.AbstractMergeRunnable; +import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService; import com.hazelcast.spi.merge.SplitBrainMergePolicy; import com.hazelcast.spi.merge.SplitBrainMergeTypes.CacheMergeTypes; import com.hazelcast.util.function.BiConsumer; @@ -35,7 +37,9 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE; import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME; import static com.hazelcast.config.MergePolicyConfig.DEFAULT_BATCH_SIZE; import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry; @@ -43,34 +47,37 @@ class CacheMergeRunnable extends AbstractMergeRunnable { private final CacheService cacheService; - private final CacheSplitBrainHandlerService cacheSplitBrainHandlerService; + private final CacheMergePolicyProvider mergePolicyProvider; - CacheMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, - CacheSplitBrainHandlerService cacheSplitBrainHandlerService, + CacheMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService, NodeEngine nodeEngine) { - super(CacheService.SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies, backupStores, nodeEngine); + super(CacheService.SERVICE_NAME, mergingStores, splitBrainHandlerService, nodeEngine); this.cacheService = nodeEngine.getService(SERVICE_NAME); - this.cacheSplitBrainHandlerService = cacheSplitBrainHandlerService; + this.mergePolicyProvider = cacheService.mergePolicyProvider; } @Override - protected void consumeStore(ICacheRecordStore store, BiConsumer consumer) { + protected void onMerge(String cacheName) { + cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE); + } + + @Override + protected void mergeStore(ICacheRecordStore store, BiConsumer consumer) { int partitionId = store.getPartitionId(); for (Map.Entry entry : store.getReadOnlyRecords().entrySet()) { - Data key = entry.getKey(); + Data key = toHeapData(entry.getKey()); CacheRecord record = entry.getValue(); - Data dataValue = toData(record.getValue()); + Data dataValue = toHeapData(record.getValue()); consumer.accept(partitionId, createMergingEntry(getSerializationService(), key, dataValue, record)); } } @Override - protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer consumer) { + protected void mergeStoreLegacy(ICacheRecordStore recordStore, BiConsumer consumer) { int partitionId = recordStore.getPartitionId(); String name = recordStore.getName(); CacheMergePolicy mergePolicy = ((CacheMergePolicy) getMergePolicy(name)); @@ -92,7 +99,7 @@ protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer configs = cacheService.getConfigs(); + CacheConfig cacheConfig = configs.get(dataStructureName); + String mergePolicyName = cacheConfig.getMergePolicy(); + return mergePolicyProvider.getMergePolicy(mergePolicyName); + } + + @Override + protected String getDataStructureName(ICacheRecordStore iCacheRecordStore) { + return iCacheRecordStore.getName(); } @Override - protected void destroyStores(Collection stores) { - cacheSplitBrainHandlerService.destroyStores(stores); + protected int getPartitionId(ICacheRecordStore store) { + return store.getPartitionId(); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheService.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheService.java index 0e5f6076e47c..eaa707192210 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheService.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheService.java @@ -100,6 +100,7 @@ public Operation prepareReplicationOperation(PartitionReplicationEvent event, CachePartitionSegment segment = segments[event.getPartitionId()]; CacheReplicationOperation op = newCacheReplicationOperation(); + op.setPartitionId(event.getPartitionId()); op.prepare(segment, namespaces, event.getReplicaIndex()); return op.isEmpty() ? null : op; } @@ -111,7 +112,6 @@ private boolean assertAllKnownNamespaces(Collection namespaces return true; } - protected CacheReplicationOperation newCacheReplicationOperation() { return new CacheReplicationOperation(); } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheSplitBrainHandlerService.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheSplitBrainHandlerService.java index fe4a2a47a9f8..c4ca0d6eb42f 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheSplitBrainHandlerService.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheSplitBrainHandlerService.java @@ -16,85 +16,52 @@ package com.hazelcast.cache.impl; -import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider; -import com.hazelcast.config.CacheConfig; import com.hazelcast.spi.NodeEngine; -import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService; +import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService; import java.util.Collection; import java.util.Iterator; -import java.util.Map; -import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE; import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME; -import static com.hazelcast.config.InMemoryFormat.NATIVE; -import static java.util.Collections.singletonList; +import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread; /** * Handles split-brain functionality for cache. */ -class CacheSplitBrainHandlerService extends AbstractSplitBrainHandlerService { +class CacheSplitBrainHandlerService extends BaseSplitBrainHandlerService { private final CacheService cacheService; private final CachePartitionSegment[] segments; - private final Map configs; - private final CacheMergePolicyProvider mergePolicyProvider; CacheSplitBrainHandlerService(NodeEngine nodeEngine, - Map configs, CachePartitionSegment[] segments) { super(nodeEngine); - this.configs = configs; this.segments = segments; - this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine); this.cacheService = nodeEngine.getService(SERVICE_NAME); } - public CacheMergePolicyProvider getMergePolicyProvider() { - return mergePolicyProvider; - } - - @Override - protected Runnable newMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, - NodeEngine nodeEngine) { - return new CacheMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies, - backupStores, this, nodeEngine); - } - @Override - public String getDataStructureName(ICacheRecordStore recordStore) { - return recordStore.getName(); + protected Runnable newMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService) { + return new CacheMergeRunnable(mergingStores, splitBrainHandlerService, cacheService.nodeEngine); } @Override - protected Object getMergePolicy(String dataStructureName) { - CacheConfig cacheConfig = configs.get(dataStructureName); - String mergePolicyName = cacheConfig.getMergePolicy(); - return mergePolicyProvider.getMergePolicy(mergePolicyName); + protected Iterator storeIterator(int partitionId) { + return segments[partitionId].recordStoreIterator(); } @Override - protected void onPrepareMergeRunnableEnd(Collection dataStructureNames) { - for (String cacheName : dataStructureNames) { - cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE); - } - } + protected void destroyStore(ICacheRecordStore store) { + assertRunningOnPartitionThread(); - @Override - protected Collection> iteratorsOf(int partitionId) { - return singletonList(segments[partitionId].recordStoreIterator()); + store.destroyInternals(); } @Override - protected void destroyStore(ICacheRecordStore store) { - assert store.getConfig().getInMemoryFormat() != NATIVE; - - store.destroy(); - } + protected boolean hasEntry(ICacheRecordStore store) { + assertRunningOnPartitionThread(); - public Map getConfigs() { - return configs; + return store.size() > 0; } } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/HazelcastServerCacheManager.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/HazelcastServerCacheManager.java index e17c6b9ebbf7..ff3f2681958b 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/HazelcastServerCacheManager.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/HazelcastServerCacheManager.java @@ -178,7 +178,7 @@ protected void removeCacheConfigFromLocal(String cacheNameWithPrefix) { @Override protected void validateCacheConfig(CacheConfig cacheConfig) { - CacheMergePolicyProvider mergePolicyProvider = cacheService.getCacheMergePolicyProvider(); + CacheMergePolicyProvider mergePolicyProvider = cacheService.getMergePolicyProvider(); checkCacheConfig(cacheConfig, mergePolicyProvider); Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy()); diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/ICacheRecordStore.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/ICacheRecordStore.java index 24ae0f784e23..cba022cd5a8e 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/ICacheRecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/ICacheRecordStore.java @@ -357,6 +357,12 @@ public interface ICacheRecordStore { */ void destroy(); + /** + * Like {@link #destroy()} but does not touch state on other services + * like event journal service. + */ + void destroyInternals(); + /** * Gets the configuration of the cache that this store belongs to. * diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheCreateConfigMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheCreateConfigMessageTask.java index bdcaf8722729..73c33ae65173 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheCreateConfigMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheCreateConfigMessageTask.java @@ -54,7 +54,7 @@ protected Object call() throws Exception { CacheService cacheService = getService(CacheService.SERVICE_NAME); if (cacheConfig != null) { - CacheMergePolicyProvider mergePolicyProvider = cacheService.getCacheMergePolicyProvider(); + CacheMergePolicyProvider mergePolicyProvider = cacheService.getMergePolicyProvider(); checkCacheConfig(cacheConfig, mergePolicyProvider); Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy()); diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterMergeTask.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterMergeTask.java index 56a847901c82..f40a133ec9cd 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterMergeTask.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterMergeTask.java @@ -20,7 +20,6 @@ import com.hazelcast.instance.LifecycleServiceImpl; import com.hazelcast.instance.Node; import com.hazelcast.internal.cluster.ClusterService; -import com.hazelcast.logging.ILogger; import com.hazelcast.nio.Disposable; import com.hazelcast.spi.CoreService; import com.hazelcast.spi.ManagedService; @@ -46,16 +45,12 @@ class ClusterMergeTask implements Runnable { private static final String MERGE_TASKS_EXECUTOR = "hz:cluster-merge"; - private final boolean wasLiteMember; private final Node node; - private final ILogger logger; private final LifecycleServiceImpl lifecycleService; ClusterMergeTask(Node node) { this.node = node; - this.logger = node.getLogger(getClass()); this.lifecycleService = node.hazelcastInstance.getLifecycleService(); - this.wasLiteMember = node.clusterService.getLocalMember().isLiteMember(); } public void run() { @@ -83,13 +78,7 @@ public void run() { } } } finally { - try { - if (joined) { - tryToPromoteLocalLiteMember(); - } - } finally { - lifecycleService.fireLifecycleEvent(joined ? MERGED : MERGE_FAILED); - } + lifecycleService.fireLifecycleEvent(joined ? MERGED : MERGE_FAILED); } } @@ -106,19 +95,6 @@ private void disposeTasks(Collection... tasks) { } } - private void tryToPromoteLocalLiteMember() { - if (wasLiteMember) { - // this node was a lite-member so no promotion needed after merging - return; - } - - logger.info("Local lite-member was previously a data-member, now trying to promote it back..."); - - node.clusterService.promoteLocalLiteMember(); - - logger.info("Promoted local lite-member upon finish of split brain healing"); - } - private boolean isJoined() { return node.isRunning() && node.getClusterService().isJoined(); } diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterServiceImpl.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterServiceImpl.java index aedbe0120d76..928cc519016a 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterServiceImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterServiceImpl.java @@ -312,14 +312,10 @@ public void merge(Address newTargetAddress) { @Override public void reset() { - reset(false); - } - - public void reset(boolean isForceStart) { lock.lock(); try { resetJoinState(); - resetLocalMember(isForceStart); + resetLocalMemberUuid(); resetClusterId(); clearInternalState(); } finally { @@ -327,15 +323,7 @@ public void reset(boolean isForceStart) { } } - /** - * Reset means: - * - Give a new uuid to local member - * - If {@code isForceStart} is {@code false}, set member type of local member to lite - * - * @param isForceStart set {@code true} if this method is called to start local node forcibly for hot-restart, - * otherwise set it to {@code false} when resetting this service - */ - private void resetLocalMember(boolean isForceStart) { + private void resetLocalMemberUuid() { assert lock.isHeldByCurrentThread() : "Called without holding cluster service lock!"; assert !isJoined() : "Cannot reset local member UUID when joined."; @@ -344,19 +332,13 @@ private void resetLocalMember(boolean isForceStart) { logger.warning("Resetting local member UUID. Previous: " + localMember.getUuid() + ", new: " + newUuid); - boolean liteMember = !isForceStart || localMember.isLiteMember(); - - MemberImpl resetLocalMember = new MemberImpl(address, localMember.getVersion(), - true, newUuid, localMember.getAttributes(), liteMember, + MemberImpl memberWithNewUuid = new MemberImpl(address, localMember.getVersion(), + true, newUuid, localMember.getAttributes(), localMember.isLiteMember(), localMember.getMemberListJoinVersion(), node.hazelcastInstance); - localMember = resetLocalMember; + localMember = memberWithNewUuid; - if (!isForceStart) { - logger.info("Converted local member to lite-member before start of split brain healing"); - } - - node.loggingService.setThisMember(this.localMember); + node.loggingService.setThisMember(localMember); } public void resetJoinState() { diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapMergeRunnable.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapMergeRunnable.java index f971435a34ef..9c87898e9f56 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapMergeRunnable.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapMergeRunnable.java @@ -24,10 +24,12 @@ import com.hazelcast.map.impl.record.Record; import com.hazelcast.map.impl.recordstore.RecordStore; import com.hazelcast.map.merge.MapMergePolicy; +import com.hazelcast.map.merge.MergePolicyProvider; import com.hazelcast.nio.serialization.Data; import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationFactory; import com.hazelcast.spi.impl.merge.AbstractMergeRunnable; +import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService; import com.hazelcast.spi.merge.SplitBrainMergePolicy; import com.hazelcast.spi.merge.SplitBrainMergeTypes.MapMergeTypes; import com.hazelcast.util.Clock; @@ -36,7 +38,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import static com.hazelcast.map.impl.EntryViews.createSimpleEntryView; import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry; @@ -44,21 +45,19 @@ class MapMergeRunnable extends AbstractMergeRunnable { private final MapServiceContext mapServiceContext; - private final MapSplitBrainHandlerService mapSplitBrainHandlerService; + private final MergePolicyProvider mergePolicyProvider; - MapMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, MapServiceContext mapServiceContext, - MapSplitBrainHandlerService mapSplitBrainHandlerService) { - super(MapService.SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies, - backupStores, mapServiceContext.getNodeEngine()); + MapMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService, + MapServiceContext mapServiceContext) { + super(MapService.SERVICE_NAME, mergingStores, splitBrainHandlerService, mapServiceContext.getNodeEngine()); this.mapServiceContext = mapServiceContext; - this.mapSplitBrainHandlerService = mapSplitBrainHandlerService; + this.mergePolicyProvider = mapServiceContext.getMergePolicyProvider(); } @Override - protected void consumeStore(RecordStore store, BiConsumer consumer) { + protected void mergeStore(RecordStore store, BiConsumer consumer) { long now = Clock.currentTimeMillis(); int partitionId = store.getPartitionId(); @@ -66,14 +65,16 @@ protected void consumeStore(RecordStore store, BiConsumer consumer) { + protected void mergeStoreLegacy(RecordStore store, BiConsumer consumer) { long now = Clock.currentTimeMillis(); int partitionId = store.getPartitionId(); String name = store.getName(); @@ -95,25 +96,37 @@ protected void consumeStoreLegacy(RecordStore store, BiConsumer stores) { - mapSplitBrainHandlerService.destroyStores(stores); + protected int getPartitionId(RecordStore store) { + return store.getPartitionId(); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java index 08c07365d684..2f28ee283e76 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java @@ -16,76 +16,54 @@ package com.hazelcast.map.impl; -import com.hazelcast.config.MapConfig; -import com.hazelcast.config.MergePolicyConfig; import com.hazelcast.map.impl.recordstore.RecordStore; import com.hazelcast.map.merge.IgnoreMergingEntryMapMergePolicy; -import com.hazelcast.map.merge.MergePolicyProvider; -import com.hazelcast.spi.NodeEngine; -import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService; +import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService; import java.util.Collection; import java.util.Iterator; -import java.util.Map; -import static com.hazelcast.config.InMemoryFormat.NATIVE; -import static java.util.Collections.singletonList; +import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread; -class MapSplitBrainHandlerService extends AbstractSplitBrainHandlerService { +class MapSplitBrainHandlerService extends BaseSplitBrainHandlerService { private final MapServiceContext mapServiceContext; - private final MergePolicyProvider mergePolicyProvider; MapSplitBrainHandlerService(MapServiceContext mapServiceContext) { super(mapServiceContext.getNodeEngine()); this.mapServiceContext = mapServiceContext; - this.mergePolicyProvider = mapServiceContext.getMergePolicyProvider(); } @Override - protected Runnable newMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, - NodeEngine nodeEngine) { - return new MapMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies, - backupStores, mapServiceContext, this); + protected Runnable newMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService) { + return new MapMergeRunnable(mergingStores, splitBrainHandlerService, mapServiceContext); } @Override - protected String getDataStructureName(RecordStore recordStore) { - return recordStore.getName(); + protected Iterator storeIterator(int partitionId) { + PartitionContainer partitionContainer = mapServiceContext.getPartitionContainer(partitionId); + Collection recordStores = partitionContainer.getAllRecordStores(); + return recordStores.iterator(); } @Override - protected Object getMergePolicy(String dataStructureName) { - MapConfig mapConfig = getMapConfig(dataStructureName); - MergePolicyConfig mergePolicyConfig = mapConfig.getMergePolicyConfig(); - return mergePolicyProvider.getMergePolicy(mergePolicyConfig.getPolicy()); - } + protected void destroyStore(RecordStore store) { + assertRunningOnPartitionThread(); - @Override - protected boolean isDiscardPolicy(Object mergePolicy) { - return mergePolicy instanceof IgnoreMergingEntryMapMergePolicy - || super.isDiscardPolicy(mergePolicy); + store.destroyInternals(); } @Override - protected Collection> iteratorsOf(int partitionId) { - PartitionContainer partitionContainer = mapServiceContext.getPartitionContainer(partitionId); - Collection recordStores = partitionContainer.getAllRecordStores(); - return singletonList(recordStores.iterator()); + protected boolean hasDiscardPolicy(Object mergePolicy) { + return mergePolicy instanceof IgnoreMergingEntryMapMergePolicy + || super.hasDiscardPolicy(mergePolicy); } @Override - protected void destroyStore(RecordStore store) { - assert store.getMapContainer().getMapConfig().getInMemoryFormat() != NATIVE; - - store.getMapContainer().getIndexes(store.getPartitionId()).clearIndexes(); - store.destroy(); - } + protected boolean hasEntry(RecordStore store) { + assertRunningOnPartitionThread(); - public MapConfig getMapConfig(String dataStructureName) { - MapContainer mapContainer = mapServiceContext.getMapContainer(dataStructureName); - return mapContainer.getMapConfig(); + return store.size() > 0; } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java index 5d4e93aea7d5..e814929a6455 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java @@ -129,6 +129,14 @@ public void destroy() { eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId); } + @Override + public void destroyInternals() { + clearIndexes(); + clearMapStore(); + clearStorage(false); + storage.destroy(false); + } + @Override public long softFlush() { updateStoreStats(); @@ -216,13 +224,40 @@ public Iterator loadAwareIterator(long now, boolean backup) { @Override public void clearPartition(boolean onShutdown) { + clearLockStore(); + clearIndexes(); + clearMapStore(); + clearStorage(onShutdown); + } + + protected void clearLockStore() { NodeEngine nodeEngine = mapServiceContext.getNodeEngine(); LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME); if (lockService != null) { ObjectNamespace namespace = MapService.getObjectNamespace(name); lockService.clearLockStore(partitionId, namespace); } + } + + protected void clearStorage(boolean onShutdown) { + if (onShutdown) { + NodeEngine nodeEngine = mapServiceContext.getNodeEngine(); + NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig(); + boolean shouldClear = (nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() != POOLED); + if (shouldClear) { + storage.clear(true); + } + storage.destroy(true); + } else { + storage.clear(false); + } + } + protected void clearMapStore() { + mapDataStore.reset(); + } + + protected void clearIndexes() { Indexes indexes = mapContainer.getIndexes(partitionId); if (indexes.isGlobal()) { if (indexes.hasIndex()) { @@ -235,19 +270,6 @@ public void clearPartition(boolean onShutdown) { } else { indexes.clearIndexes(); } - - mapDataStore.reset(); - - if (onShutdown) { - NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig(); - boolean shouldClear = (nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() != POOLED); - if (shouldClear) { - storage.clear(true); - } - storage.destroy(true); - } else { - storage.clear(false); - } } /** @@ -384,7 +406,7 @@ public int clear() { // This conversion is required by mapDataStore#removeAll call. List keys = getKeysFromRecords(clearableRecords); mapDataStore.removeAll(keys); - mapDataStore.reset(); + clearMapStore(); removeIndex(clearableRecords); return removeRecords(clearableRecords); } @@ -439,7 +461,7 @@ protected Collection getNotLockedRecords() { */ @Override public void reset() { - mapDataStore.reset(); + clearMapStore(); storage.clear(false); stats.reset(); } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java index 1eb38734b815..b94660d06e7f 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java @@ -381,6 +381,12 @@ public interface RecordStore { void destroy(); + /** + * Like {@link #destroy()} but does not touch state on other services + * like lock service or event journal service. + */ + void destroyInternals(); + /** * Initialize the recordStore after creation */ diff --git a/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapMergeRunnable.java b/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapMergeRunnable.java index 70aeb3a5d9b0..370b1db1366b 100644 --- a/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapMergeRunnable.java +++ b/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapMergeRunnable.java @@ -24,11 +24,13 @@ import com.hazelcast.replicatedmap.impl.record.ReplicatedMapEntryView; import com.hazelcast.replicatedmap.impl.record.ReplicatedRecord; import com.hazelcast.replicatedmap.impl.record.ReplicatedRecordStore; +import com.hazelcast.replicatedmap.merge.MergePolicyProvider; import com.hazelcast.replicatedmap.merge.ReplicatedMapMergePolicy; import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationFactory; import com.hazelcast.spi.impl.merge.AbstractMergeRunnable; +import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService; import com.hazelcast.spi.merge.SplitBrainMergePolicy; import com.hazelcast.spi.merge.SplitBrainMergeTypes.ReplicatedMapMergeTypes; import com.hazelcast.util.function.BiConsumer; @@ -36,26 +38,27 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import static com.hazelcast.replicatedmap.impl.ReplicatedMapService.SERVICE_NAME; import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry; -class ReplicatedMapMergeRunnable extends AbstractMergeRunnable { +class ReplicatedMapMergeRunnable + extends AbstractMergeRunnable { - private final ReplicatedMapSplitBrainHandlerService replicatedMapSplitBrainHandlerService; + private final ReplicatedMapService service; + private final MergePolicyProvider mergePolicyProvider; - ReplicatedMapMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, - NodeEngine nodeEngine, - ReplicatedMapSplitBrainHandlerService replicatedMapSplitBrainHandlerService) { - super(SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies, backupStores, nodeEngine); - this.replicatedMapSplitBrainHandlerService = replicatedMapSplitBrainHandlerService; + ReplicatedMapMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService, + NodeEngine nodeEngine) { + super(SERVICE_NAME, mergingStores, splitBrainHandlerService, nodeEngine); + + this.service = nodeEngine.getService(SERVICE_NAME); + this.mergePolicyProvider = service.getMergePolicyProvider(); } @Override - protected void consumeStore(ReplicatedRecordStore store, BiConsumer consumer) { + protected void mergeStore(ReplicatedRecordStore store, BiConsumer consumer) { int partitionId = store.getPartitionId(); Iterator iterator = store.recordIterator(); @@ -68,7 +71,7 @@ protected void consumeStore(ReplicatedRecordStore store, BiConsumer consumer) { + protected void mergeStoreLegacy(ReplicatedRecordStore store, BiConsumer consumer) { int partitionId = store.getPartitionId(); String name = store.getName(); ReplicatedMapMergePolicy mergePolicy = ((ReplicatedMapMergePolicy) getMergePolicy(name)); @@ -97,25 +100,35 @@ private ReplicatedMapEntryView createEntryView(ReplicatedRecord record) { @Override protected int getBatchSize(String dataStructureName) { - ReplicatedMapConfig replicatedMapConfig = replicatedMapSplitBrainHandlerService.getReplicatedMapConfig(dataStructureName); + ReplicatedMapConfig replicatedMapConfig = getReplicatedMapConfig(dataStructureName); MergePolicyConfig mergePolicyConfig = replicatedMapConfig.getMergePolicyConfig(); return mergePolicyConfig.getBatchSize(); } @Override protected InMemoryFormat getInMemoryFormat(String dataStructureName) { - ReplicatedMapConfig replicatedMapConfig = replicatedMapSplitBrainHandlerService.getReplicatedMapConfig(dataStructureName); + ReplicatedMapConfig replicatedMapConfig = getReplicatedMapConfig(dataStructureName); return replicatedMapConfig.getInMemoryFormat(); } @Override protected Object getMergePolicy(String dataStructureName) { - return replicatedMapSplitBrainHandlerService.getMergePolicy(dataStructureName); + MergePolicyConfig mergePolicyConfig = getReplicatedMapConfig(dataStructureName).getMergePolicyConfig(); + return mergePolicyProvider.getMergePolicy(mergePolicyConfig.getPolicy()); + } + + private ReplicatedMapConfig getReplicatedMapConfig(String name) { + return service.getReplicatedMapConfig(name); + } + + @Override + protected String getDataStructureName(ReplicatedRecordStore replicatedRecordStore) { + return replicatedRecordStore.getName(); } @Override - protected void destroyStores(Collection stores) { - replicatedMapSplitBrainHandlerService.destroyStores(stores); + protected int getPartitionId(ReplicatedRecordStore replicatedRecordStore) { + return replicatedRecordStore.getPartitionId(); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapService.java b/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapService.java index ac70251ca400..a67558a3e464 100644 --- a/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapService.java +++ b/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapService.java @@ -126,6 +126,7 @@ public LocalReplicatedMapStatsImpl createNew(String arg) { private final ReplicatedMapSplitBrainHandlerService splitBrainHandlerService; private ScheduledFuture antiEntropyFuture; + private MergePolicyProvider mergePolicyProvider; public ReplicatedMapService(NodeEngine nodeEngine) { this.nodeEngine = nodeEngine; @@ -135,8 +136,9 @@ public ReplicatedMapService(NodeEngine nodeEngine) { this.operationService = nodeEngine.getOperationService(); this.partitionContainers = new PartitionContainer[nodeEngine.getPartitionService().getPartitionCount()]; this.eventPublishingService = new ReplicatedMapEventPublishingService(this); - this.splitBrainHandlerService = new ReplicatedMapSplitBrainHandlerService(this, new MergePolicyProvider(nodeEngine)); + this.splitBrainHandlerService = new ReplicatedMapSplitBrainHandlerService(this); this.quorumService = nodeEngine.getQuorumService(); + this.mergePolicyProvider = new MergePolicyProvider(nodeEngine); } @Override @@ -206,7 +208,7 @@ public LocalReplicatedMapStatsImpl createReplicatedMapStats(String name) { @Override public DistributedObject createDistributedObject(String objectName) { ReplicatedMapConfig replicatedMapConfig = getReplicatedMapConfig(objectName); - checkReplicatedMapConfig(replicatedMapConfig, splitBrainHandlerService.getMergePolicyProvider()); + checkReplicatedMapConfig(replicatedMapConfig, mergePolicyProvider); if (nodeEngine.getLocalMember().isLiteMember()) { throw new ReplicatedMapCantBeCreatedOnLiteMemberException(nodeEngine.getThisAddress()); } @@ -386,6 +388,10 @@ public void triggerAntiEntropy() { antiEntropyTask.triggerAntiEntropy(); } + public MergePolicyProvider getMergePolicyProvider() { + return mergePolicyProvider; + } + private class AntiEntropyTask implements Runnable { @Override diff --git a/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapSplitBrainHandlerService.java b/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapSplitBrainHandlerService.java index 9b1dde0d169f..63e103a1d43f 100644 --- a/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapSplitBrainHandlerService.java +++ b/hazelcast/src/main/java/com/hazelcast/replicatedmap/impl/ReplicatedMapSplitBrainHandlerService.java @@ -16,75 +16,60 @@ package com.hazelcast.replicatedmap.impl; -import com.hazelcast.config.MergePolicyConfig; import com.hazelcast.config.ReplicatedMapConfig; import com.hazelcast.replicatedmap.impl.record.ReplicatedRecordStore; -import com.hazelcast.replicatedmap.merge.MergePolicyProvider; -import com.hazelcast.spi.NodeEngine; -import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService; +import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.ConcurrentMap; -import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; +import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread; /** * Contains split-brain handling logic for {@link com.hazelcast.core.ReplicatedMap}. */ -class ReplicatedMapSplitBrainHandlerService extends AbstractSplitBrainHandlerService { +class ReplicatedMapSplitBrainHandlerService extends BaseSplitBrainHandlerService { private final ReplicatedMapService service; - private final MergePolicyProvider mergePolicyProvider; - ReplicatedMapSplitBrainHandlerService(ReplicatedMapService service, MergePolicyProvider mergePolicyProvider) { + ReplicatedMapSplitBrainHandlerService(ReplicatedMapService service) { super(service.getNodeEngine()); this.service = service; - this.mergePolicyProvider = mergePolicyProvider; } public ReplicatedMapConfig getReplicatedMapConfig(String name) { return service.getReplicatedMapConfig(name); } - public MergePolicyProvider getMergePolicyProvider() { - return mergePolicyProvider; - } - @Override - protected Runnable newMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, - NodeEngine nodeEngine) { - return new ReplicatedMapMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies, backupStores, - service.getNodeEngine(), this); + protected Runnable newMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService) { + return new ReplicatedMapMergeRunnable(mergingStores, splitBrainHandlerService, service.getNodeEngine()); } @Override - protected String getDataStructureName(ReplicatedRecordStore recordStore) { - return recordStore.getName(); - } - - @Override - protected Object getMergePolicy(String dataStructureName) { - MergePolicyConfig mergePolicyConfig = getReplicatedMapConfig(dataStructureName).getMergePolicyConfig(); - return mergePolicyProvider.getMergePolicy(mergePolicyConfig.getPolicy()); - } - - @Override - protected Collection> iteratorsOf(int partitionId) { + protected Iterator storeIterator(int partitionId) { PartitionContainer partitionContainer = service.getPartitionContainer(partitionId); if (partitionContainer == null) { - return emptyList(); + return Collections.emptyList().iterator(); } ConcurrentMap stores = partitionContainer.getStores(); - return singletonList(stores.values().iterator()); + return stores.values().iterator(); + } + + @Override + protected void destroyStore(ReplicatedRecordStore replicatedRecordStore) { + assertRunningOnPartitionThread(); + + replicatedRecordStore.destroy(); } @Override - protected void destroyStore(ReplicatedRecordStore store) { - store.destroy(); + protected boolean hasEntry(ReplicatedRecordStore store) { + assertRunningOnPartitionThread(); + + return store.size() > 0; } } diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractMergeRunnable.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractMergeRunnable.java index 80dfc7e4f9bd..d201cc255516 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractMergeRunnable.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractMergeRunnable.java @@ -22,8 +22,8 @@ import com.hazelcast.internal.serialization.InternalSerializationService; import com.hazelcast.logging.ILogger; import com.hazelcast.nio.Address; -import com.hazelcast.nio.Disposable; import com.hazelcast.nio.serialization.Data; +import com.hazelcast.nio.serialization.DataType; import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationFactory; @@ -38,6 +38,8 @@ import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -48,6 +50,7 @@ import static com.hazelcast.internal.config.MergePolicyValidator.checkMergePolicySupportsInMemoryFormat; import static com.hazelcast.util.ExceptionUtil.rethrow; import static java.lang.String.format; +import static java.util.Collections.singleton; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** @@ -60,8 +63,7 @@ * @param type of the store in a partition * @param type of the merging item */ -public abstract class AbstractMergeRunnable> - implements Runnable, Disposable { +public abstract class AbstractMergeRunnable> implements Runnable { private static final long TIMEOUT_FACTOR = 500; private static final long MINIMAL_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5); @@ -71,25 +73,40 @@ public abstract class AbstractMergeRunnable backupStores; - private final Map> collectedStores; - private final Map> collectedStoresWithLegacyPolicies; + private final BaseSplitBrainHandlerService splitBrainHandlerService; + private final InternalSerializationService serializationService; + + private Map> mergingStoresByName; - protected AbstractMergeRunnable(String serviceName, Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, NodeEngine nodeEngine) { + protected AbstractMergeRunnable(String serviceName, + Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService, + NodeEngine nodeEngine) { + this.mergingStoresByName = groupStoresByName(mergingStores); this.serviceName = serviceName; this.logger = nodeEngine.getLogger(getClass()); this.partitionService = nodeEngine.getPartitionService(); this.clusterService = nodeEngine.getClusterService(); this.operationService = nodeEngine.getOperationService(); this.serializationService = (InternalSerializationService) nodeEngine.getSerializationService(); - this.backupStores = backupStores; - this.collectedStores = collectedStores; - this.collectedStoresWithLegacyPolicies = collectedStoresWithLegacyPolicies; + this.splitBrainHandlerService = splitBrainHandlerService; + } + + private Map> groupStoresByName(Collection stores) { + Map> storesByName = new HashMap>(); + for (Store store : stores) { + String dataStructureName = getDataStructureName(store); + + Collection storeList = storesByName.get(dataStructureName); + if (storeList == null) { + storeList = new LinkedList(); + storesByName.put(dataStructureName, storeList); + } + storeList.add(store); + } + return storesByName; } @Override @@ -104,59 +121,101 @@ public final void run() { private int mergeWithSplitBrainMergePolicy() { int mergedCount = 0; - for (Map.Entry> entry : collectedStores.entrySet()) { - String dataStructureName = entry.getKey(); - - //noinspection unchecked - SplitBrainMergePolicy mergePolicy - = ((SplitBrainMergePolicy) getMergePolicy(dataStructureName)); - if (!isClusterVersion310OrLater(dataStructureName, mergePolicy)) { - continue; - } + Iterator>> iterator = mergingStoresByName.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); - int batchSize = getBatchSize(dataStructureName); - MergingItemBiConsumer consumer = new MergingItemBiConsumer(dataStructureName, mergePolicy, batchSize); - for (Store recordStore : entry.getValue()) { - consumeStore(recordStore, consumer); + String dataStructureName = entry.getKey(); + Collection stores = entry.getValue(); + + if (getMergePolicy(dataStructureName) instanceof SplitBrainMergePolicy) { + if (canMerge(dataStructureName)) { + MergingItemBiConsumer consumer = newConsumer(dataStructureName); + for (Store store : stores) { + try { + mergeStore(store, consumer); + consumer.consumeRemaining(); + } finally { + asyncDestroyStores(singleton(store)); + } + } + mergedCount += consumer.mergedCount; + onMerge(dataStructureName); + } else { + asyncDestroyStores(stores); + } + iterator.remove(); } - consumer.consumeRemaining(); - - mergedCount += consumer.mergedCount; } return mergedCount; } - // RU_COMPAT_3_9 - private boolean isClusterVersion310OrLater(String dataStructureName, SplitBrainMergePolicy policy) { + /** + * Check if data structure can use {@link SplitBrainMergePolicy} + */ + private boolean canMerge(String dataStructureName) { + Version v310 = V3_10; Version currentVersion = clusterService.getClusterVersion(); if (currentVersion.isGreaterOrEqual(V3_10)) { return true; } - - String msg = "Cannot merge '%s' with merge policy '%s'. Cluster version should be %s or later, but is %s"; - logger.info(format(msg, dataStructureName, policy, V3_10, currentVersion)); + // RU_COMPAT_3_9 + String msg = "Cannot merge '%s' with merge policy '%s'." + + " Cluster version should be %s or later but found %s"; + logger.info(format(msg, dataStructureName, getMergePolicy(dataStructureName), v310, currentVersion)); return false; } + private MergingItemBiConsumer newConsumer(String dataStructureName) { + int batchSize = getBatchSize(dataStructureName); + SplitBrainMergePolicy policy = ((SplitBrainMergePolicy) getMergePolicy(dataStructureName)); + return new MergingItemBiConsumer(dataStructureName, policy, batchSize); + } + private int mergeWithLegacyMergePolicy() { LegacyOperationBiConsumer consumer = new LegacyOperationBiConsumer(); - for (Map.Entry> entry : collectedStoresWithLegacyPolicies.entrySet()) { - String dataStructureName = entry.getKey(); - if (!checkMergePolicySupportsInMemoryFormat(dataStructureName, getMergePolicy(dataStructureName), - getInMemoryFormat(dataStructureName), clusterService.getClusterVersion(), false, logger)) { - continue; - } - - Collection recordStores = entry.getValue(); - for (Store recordStore : recordStores) { - consumeStoreLegacy(recordStore, consumer); + Iterator>> iterator = mergingStoresByName.entrySet().iterator(); + while (iterator.hasNext()) { + try { + Map.Entry> entry = iterator.next(); + + String dataStructureName = entry.getKey(); + Collection stores = entry.getValue(); + + if (canMergeLegacy(dataStructureName)) { + for (Store store : stores) { + try { + mergeStoreLegacy(store, consumer); + } finally { + asyncDestroyStores(singleton(store)); + } + } + onMerge(dataStructureName); + } else { + asyncDestroyStores(stores); + } + } finally { + iterator.remove(); } } return consumer.mergedCount; } + /** + * Check if data structures in-memory-format appropriate to merge + * with legacy policies + */ + private boolean canMergeLegacy(String dataStructureName) { + Object mergePolicy = getMergePolicy(dataStructureName); + InMemoryFormat inMemoryFormat = getInMemoryFormat(dataStructureName); + Version clusterVersion = clusterService.getClusterVersion(); + + return checkMergePolicySupportsInMemoryFormat(dataStructureName, + mergePolicy, inMemoryFormat, clusterVersion, false, logger); + } + private void waitMergeEnd(int mergedCount) { try { long timeoutMillis = Math.max(mergedCount * TIMEOUT_FACTOR, MINIMAL_TIMEOUT_MILLIS); @@ -322,19 +381,6 @@ public void accept(Integer partitionId, Operation operation) { } } - @Override - public final void dispose() { - for (Collection stores : collectedStores.values()) { - destroyStores(stores); - } - - for (Collection stores : collectedStoresWithLegacyPolicies.values()) { - destroyStores(stores); - } - - destroyStores(backupStores); - } - protected InternalSerializationService getSerializationService() { return serializationService; } @@ -343,20 +389,29 @@ protected Data toData(Object object) { return serializationService.toData(object); } - /** - * Destroys a collection of stores. - */ - protected abstract void destroyStores(Collection stores); + protected Data toHeapData(Object object) { + return serializationService.toData(object, DataType.HEAP); + } + + private void asyncDestroyStores(Collection stores) { + for (Store store : stores) { + splitBrainHandlerService.asyncDestroyStores(singleton(store), getPartitionId(store)); + } + } + + protected void onMerge(String dataStructureName) { + // override to take action on merge + } /** * Used to merge with {@link SplitBrainMergePolicy}. */ - protected abstract void consumeStore(Store recordStore, BiConsumer consumer); + protected abstract void mergeStore(Store recordStore, BiConsumer consumer); /** * Used to merge with legacy merge policies. */ - protected abstract void consumeStoreLegacy(Store recordStore, BiConsumer consumer); + protected abstract void mergeStoreLegacy(Store recordStore, BiConsumer consumer); /** * This batch size can only be used with {@link SplitBrainMergePolicy}, @@ -371,6 +426,10 @@ protected Data toData(Object object) { */ protected abstract Object getMergePolicy(String dataStructureName); + protected abstract String getDataStructureName(Store store); + + protected abstract int getPartitionId(Store store); + /** * @return in memory format of data structure */ diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractSplitBrainHandlerService.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractSplitBrainHandlerService.java deleted file mode 100644 index 3e87b35500fb..000000000000 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/AbstractSplitBrainHandlerService.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.spi.impl.merge; - -import com.hazelcast.spi.NodeEngine; -import com.hazelcast.spi.SplitBrainHandlerService; -import com.hazelcast.spi.merge.DiscardMergePolicy; -import com.hazelcast.spi.merge.SplitBrainMergePolicy; -import com.hazelcast.spi.partition.IPartitionService; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * Contains common functionality to implement a {@link SplitBrainHandlerService}. - * - * @param store of a partition - */ -@SuppressWarnings("WeakerAccess") -public abstract class AbstractSplitBrainHandlerService implements SplitBrainHandlerService { - - private final int partitionCount; - private final NodeEngine nodeEngine; - private final IPartitionService partitionService; - - protected AbstractSplitBrainHandlerService(NodeEngine nodeEngine) { - this.nodeEngine = nodeEngine; - this.partitionService = nodeEngine.getPartitionService(); - this.partitionCount = partitionService.getPartitionCount(); - } - - @Override - public final Runnable prepareMergeRunnable() { - onPrepareMergeRunnableStart(); - - Map> collectedStores = new HashMap>(); - Map> collectedStoresWithLegacyPolicies = new HashMap>(); - List backupStores = new LinkedList(); - - LinkedList mergingDataStructureNames = new LinkedList(); - - for (int partitionId = 0; partitionId < partitionCount; partitionId++) { - boolean partitionOwner = partitionService.isPartitionOwner(partitionId); - Collection> iterators = iteratorsOf(partitionId); - for (Iterator iterator : iterators) { - while (iterator.hasNext()) { - Store store = iterator.next(); - - if (partitionOwner) { - String dataStructureName = getDataStructureName(store); - Object mergePolicy = getMergePolicy(dataStructureName); - - if (!isDiscardPolicy(mergePolicy)) { - if (mergePolicy instanceof SplitBrainMergePolicy) { - copyToCollectedStores(store, collectedStores); - } else { - copyToCollectedStores(store, collectedStoresWithLegacyPolicies); - } - - mergingDataStructureNames.add(dataStructureName); - } - } else { - backupStores.add(store); - } - - iterator.remove(); - } - } - } - - onPrepareMergeRunnableEnd(mergingDataStructureNames); - - return newMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies, backupStores, nodeEngine); - } - - protected void onPrepareMergeRunnableStart() { - // NOP intentionally, implementations can override this method - } - - protected void onPrepareMergeRunnableEnd(Collection dataStructureNames) { - // NOP intentionally, implementations can override this method - } - - @SuppressWarnings("BooleanMethodIsAlwaysInverted") - protected boolean isDiscardPolicy(Object mergePolicy) { - return mergePolicy instanceof DiscardMergePolicy; - } - - // overridden on EE - public void destroyStores(Collection recordStores) { - Iterator iterator = recordStores.iterator(); - while (iterator.hasNext()) { - try { - destroyStore(iterator.next()); - } finally { - iterator.remove(); - } - } - } - - private void copyToCollectedStores(Store store, Map> collectedStores) { - String dataStructureName = getDataStructureName(store); - - Collection stores = collectedStores.get(dataStructureName); - if (stores == null) { - stores = new LinkedList(); - collectedStores.put(dataStructureName, stores); - } - - stores.add(store); - } - - /** - * Destroys the provided store. - */ - protected abstract void destroyStore(Store store); - - /** - * @return name of the data structure - */ - protected abstract String getDataStructureName(Store store); - - /** - * @return merge policy for the data structure - */ - protected abstract Object getMergePolicy(String dataStructureName); - - /** - * @return collection of iterators to scan data in the partition - */ - protected abstract Collection> iteratorsOf(int partitionId); - - /** - * @return new merge runnable - */ - protected abstract Runnable newMergeRunnable(Map> collectedStores, - Map> collectedStoresWithLegacyPolicies, - Collection backupStores, - NodeEngine nodeEngine); -} diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/BaseSplitBrainHandlerService.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/BaseSplitBrainHandlerService.java new file mode 100644 index 000000000000..4614c3761707 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/BaseSplitBrainHandlerService.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.spi.impl.merge; + +import com.hazelcast.spi.NodeEngine; +import com.hazelcast.spi.SplitBrainHandlerService; +import com.hazelcast.spi.impl.PartitionSpecificRunnable; +import com.hazelcast.spi.impl.operationexecutor.OperationExecutor; +import com.hazelcast.spi.impl.operationservice.impl.OperationServiceImpl; +import com.hazelcast.spi.merge.DiscardMergePolicy; +import com.hazelcast.spi.partition.IPartition; +import com.hazelcast.spi.partition.IPartitionService; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; + +import static java.lang.Thread.currentThread; + +/** + * Collects mergeable stores and passes them to merge-runnable. + * + * @param store of a partition + */ +@SuppressWarnings("WeakerAccess") +public abstract class BaseSplitBrainHandlerService implements SplitBrainHandlerService { + + private final IPartitionService partitionService; + private final OperationExecutor operationExecutor; + + protected BaseSplitBrainHandlerService(NodeEngine nodeEngine) { + this.partitionService = nodeEngine.getPartitionService(); + this.operationExecutor = ((OperationServiceImpl) nodeEngine.getOperationService()).getOperationExecutor(); + } + + @Override + public final Runnable prepareMergeRunnable() { + ConcurrentLinkedQueue mergingStores = new ConcurrentLinkedQueue(); + + collectStores(mergingStores); + + return newMergeRunnable(mergingStores, this); + } + + private void collectStores(final ConcurrentLinkedQueue mergingStores) { + int partitionCount = partitionService.getPartitionCount(); + final CountDownLatch latch = new CountDownLatch(partitionCount); + + for (int i = 0; i < partitionCount; i++) { + operationExecutor.execute(new StoreCollector(mergingStores, i, latch)); + } + + try { + latch.await(); + } catch (InterruptedException e) { + currentThread().interrupt(); + } + } + + private class StoreCollector implements PartitionSpecificRunnable { + private final int partitionId; + private final CountDownLatch latch; + private final ConcurrentLinkedQueue mergingStores; + + public StoreCollector(ConcurrentLinkedQueue mergingStores, + int partitionId, + CountDownLatch latch) { + this.mergingStores = mergingStores; + this.partitionId = partitionId; + this.latch = latch; + } + + @Override + public int getPartitionId() { + return partitionId; + } + + @Override + public void run() { + LinkedList storesToDestroy = new LinkedList(); + try { + Iterator iterator = storeIterator(partitionId); + while (iterator.hasNext()) { + Store store = iterator.next(); + + if (isLocalPartition(partitionId) + && hasEntry(store) + && !hasDiscardPolicy(store)) { + mergingStores.add(store); + } else { + storesToDestroy.add(store); + } + + iterator.remove(); + } + + asyncDestroyStores(storesToDestroy, partitionId); + + } finally { + latch.countDown(); + } + } + } + + void asyncDestroyStores(final Collection stores, final int partitionID) { + operationExecutor.execute(new PartitionSpecificRunnable() { + @Override + public void run() { + for (Store store : stores) { + destroyStore(store); + } + } + + @Override + public int getPartitionId() { + return partitionID; + } + }); + } + + private boolean isLocalPartition(int partitionId) { + IPartition partition = partitionService.getPartition(partitionId, false); + return partition.isLocal(); + } + + // overridden in implementations + protected boolean hasDiscardPolicy(Object mergePolicy) { + return mergePolicy instanceof DiscardMergePolicy; + } + + protected abstract void destroyStore(Store store); + + protected abstract boolean hasEntry(Store store); + + protected abstract Runnable newMergeRunnable(Collection mergingStores, + BaseSplitBrainHandlerService splitBrainHandlerService); + + /** + * @return collection of iterators to scan data in the partition + */ + protected abstract Iterator storeIterator(int partitionId); +} diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/MergingValueFactory.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/MergingValueFactory.java index 752c9e8a8e84..bca07925d5a8 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/MergingValueFactory.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/merge/MergingValueFactory.java @@ -105,9 +105,10 @@ public static MapMergeTypes createMergingEntry(SerializationService serializatio .setCost(record.getCost()); } - public static MapMergeTypes createMergingEntry(SerializationService serializationService, Record record, Data dataValue) { + public static MapMergeTypes createMergingEntry(SerializationService serializationService, + Data dataKey, Data dataValue, Record record) { return new MapMergingEntryImpl(serializationService) - .setKey(record.getKey()) + .setKey(dataKey) .setValue(dataValue) .setCreationTime(record.getCreationTime()) .setExpirationTime(record.getExpirationTime()) diff --git a/hazelcast/src/test/java/com/hazelcast/cache/merge/CacheMergePolicyProviderTest.java b/hazelcast/src/test/java/com/hazelcast/cache/merge/CacheMergePolicyProviderTest.java index 190676fe1e0f..5e1f570b57f8 100644 --- a/hazelcast/src/test/java/com/hazelcast/cache/merge/CacheMergePolicyProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cache/merge/CacheMergePolicyProviderTest.java @@ -18,6 +18,7 @@ import com.hazelcast.cache.BuiltInCacheMergePolicies; import com.hazelcast.cache.CacheMergePolicy; +import com.hazelcast.cache.impl.CacheService; import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider; import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.test.HazelcastParallelClassRunner; @@ -46,7 +47,8 @@ public class CacheMergePolicyProviderTest extends HazelcastTestSupport { @Before public void setup() { - mergePolicyProvider = new CacheMergePolicyProvider(getNode(createHazelcastInstance()).getNodeEngine()); + CacheService service = getNode(createHazelcastInstance()).getNodeEngine().getService(CacheService.SERVICE_NAME); + mergePolicyProvider = service.getMergePolicyProvider(); } @Test diff --git a/hazelcast/src/test/java/com/hazelcast/internal/cluster/impl/MemberListJoinVersionTest.java b/hazelcast/src/test/java/com/hazelcast/internal/cluster/impl/MemberListJoinVersionTest.java index e36cd299c6bb..2c2e361718cb 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/cluster/impl/MemberListJoinVersionTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/cluster/impl/MemberListJoinVersionTest.java @@ -134,10 +134,7 @@ public void stateChanged(LifecycleEvent event) { assertNotEquals(afterJoinVersionOnMember1, beforeJoinVersionOnMember3); int versionOnLocalMember3 = getNode(member3).getLocalMember().getMemberListJoinVersion(); - // during join, we are forcibly changing member-type of member3 to lite member, upon end of merge operations - // promoting that member3 to data member, in this scenario, local members version is not incremented and - // in this test it should be off by 1. - assertEquals(afterJoinVersionOnMember1, versionOnLocalMember3 + 1); + assertEquals(afterJoinVersionOnMember1, versionOnLocalMember3); assertMemberViewsAreSame(getMemberMap(member1), getMemberMap(member3)); assertTrueEventually(new AssertTask() {