From faf50f1dad74028f6cd665ec23ee3dabc40ea923 Mon Sep 17 00:00:00 2001 From: Ilya Lantukh Date: Wed, 7 Feb 2018 13:33:28 +0300 Subject: [PATCH] IGNITE-7514 Affinity assignment should be recalculated when primary node is not OWNER --- .../internal/events/DiscoveryCustomEvent.java | 34 ++++ .../cache/CacheAffinitySharedManager.java | 86 ++++++--- .../processors/cache/ClusterCachesInfo.java | 1 + .../GridCachePartitionExchangeManager.java | 2 +- .../dht/GridClientPartitionTopology.java | 9 +- .../dht/GridDhtLocalPartition.java | 3 - .../dht/GridDhtPartitionTopology.java | 10 +- .../dht/GridDhtPartitionTopologyImpl.java | 47 ++++- .../GridDhtPartitionsExchangeFuture.java | 144 +++++++++------ .../GridCacheDatabaseSharedManager.java | 5 + .../CacheBaselineTopologyTest.java | 169 +++++++++++++++++- 11 files changed, 424 insertions(+), 86 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java index b3c6a2d865b2f..3b12b384f54be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java @@ -21,7 +21,10 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * Custom event. 
@@ -85,4 +88,35 @@ public void affinityTopologyVersion(AffinityTopologyVersion affTopVer) { @Override public String toString() { return S.toString(DiscoveryCustomEvent.class, this, super.toString()); } + + /** + * @param evt Discovery event. + * @return {@code True} if event is DiscoveryCustomEvent that requires centralized affinity assignment. + */ + public static boolean requiresCentralizedAffinityAssignment(DiscoveryEvent evt) { + if (!(evt instanceof DiscoveryCustomEvent)) + return false; + + return requiresCentralizedAffinityAssignment(((DiscoveryCustomEvent)evt).customMessage()); + } + + /** + * @param msg Discovery custom message. + * @return {@code True} if message belongs to event that requires centralized affinity assignment. + */ + public static boolean requiresCentralizedAffinityAssignment(@Nullable DiscoveryCustomMessage msg) { + if (msg == null) + return false; + + if (msg instanceof ChangeGlobalStateMessage && ((ChangeGlobalStateMessage)msg).activate()) + return true; + + if (msg instanceof SnapshotDiscoveryMessage) { + SnapshotDiscoveryMessage snapMsg = (SnapshotDiscoveryMessage) msg; + + return snapMsg.needExchange() && snapMsg.needAssignPartitions(); + } + + return false; + } } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 4119f2357c896..7bf793c2f0b05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -40,6 +40,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; import 
org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; -import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridPartitionStateMap; @@ -161,13 +161,15 @@ void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer, DiscoveryDataClusterState state) { - if (state.transition() || !state.active()) + if ((state.transition() || !state.active()) && + !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) return; if (type == EVT_NODE_JOINED && node.isLocal()) lastAffVer = null; - if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { + if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) || + DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) { synchronized (mux) { assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0; @@ -1260,10 +1262,12 @@ public GridAffinityAssignmentCache affinity(Integer grpId) { } /** + * Applies affinity diff from the received full message. + * * @param fut Current exchange future. * @param msg Finish exchange message. 
*/ - public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, + public void applyAffinityFromFullMessage(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) { final Map nodesByOrder = new HashMap<>(); @@ -1396,7 +1400,7 @@ public void onServerJoinWithExchangeMergeProtocol(GridDhtPartitionsExchangeFutur * @return Computed difference with ideal affinity. * @throws IgniteCheckedException If failed. */ - public Map onServerLeftWithExchangeMergeProtocol( + public Map onServerLeftWithExchangeMergeProtocol( final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { final ExchangeDiscoveryEvents evts = fut.context().events(); @@ -1404,6 +1408,36 @@ public Map onServerLeftWithExchangeMergePro assert fut.context().mergeExchanges(); assert evts.hasServerLeft(); + return onReassignmentEnforced(fut); + } + + /** + * Calculates affinity on coordinator for custom event types that require centralized assignment. + * + * @param fut Current exchange future. + * @return Computed difference with ideal affinity. + * @throws IgniteCheckedException If failed. + */ + public Map onCustomEventWithEnforcedAffinityReassignment( + final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException + { + assert DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()); + + return onReassignmentEnforced(fut); + } + + /** + * Calculates new affinity assignment on coordinator and creates affinity diff messages for other nodes. + * + * @param fut Current exchange future. + * @return Computed difference with ideal affinity. + * @throws IgniteCheckedException If failed. 
+ */ + private Map onReassignmentEnforced( + final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException + { + final ExchangeDiscoveryEvents evts = fut.context().events(); + forAllRegisteredCacheGroups(new IgniteInClosureX() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { AffinityTopologyVersion topVer = evts.topologyVersion(); @@ -1418,7 +1452,7 @@ public Map onServerLeftWithExchangeMergePro } }); - Map>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(), + Map>> diff = initAffinityBasedOnPartitionsAvailability(evts.topologyVersion(), fut, NODE_TO_ORDER, true); @@ -1642,17 +1676,16 @@ private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion } /** - * Called on exchange initiated by server node leave. + * Called on exchange initiated by server node leave or custom event with centralized affinity assignment. * * @param fut Exchange future. * @param crd Coordinator flag. * @throws IgniteCheckedException If failed. * @return {@code True} if affinity should be assigned by coordinator. */ - public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { - ClusterNode leftNode = fut.firstEvent().eventNode(); - - assert !leftNode.isClient() : leftNode; + public boolean onCentralizedAffinityChange(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { + assert (fut.events().hasServerLeft() && !fut.firstEvent().eventNode().isClient()) || + DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()) : fut.firstEvent(); if (crd) { // Need initialize CacheGroupHolders if this node become coordinator on this exchange. 
@@ -2066,7 +2099,7 @@ public IgniteInternalFuture>>> initAffinity initFut.listen(new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture initFut) { try { - resFut.onDone(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, false)); + resFut.onDone(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), fut, NODE_TO_ID, false)); } catch (IgniteCheckedException e) { resFut.onDone(e); @@ -2077,10 +2110,13 @@ public IgniteInternalFuture>>> initAffinity return resFut; } else - return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, false)); + return new GridFinishedFuture<>(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), fut, NODE_TO_ID, false)); } /** + * Initializes current affinity assignment based on partitions availability. + * Nodes that have most recent data will be considered affinity nodes. + * * @param topVer Topology version. * @param fut Exchange future. * @param c Closure converting affinity diff. @@ -2088,12 +2124,17 @@ public IgniteInternalFuture>>> initAffinity * @return Affinity assignment. * @throws IgniteCheckedException If failed. */ - private Map>> initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer, + private Map>> initAffinityBasedOnPartitionsAvailability(final AffinityTopologyVersion topVer, final GridDhtPartitionsExchangeFuture fut, final IgniteClosure c, final boolean initAff) throws IgniteCheckedException { - final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(fut.context().events().lastServerEventVersion()); + final boolean enforcedCentralizedAssignment = + DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()); + + final WaitRebalanceInfo waitRebalanceInfo = enforcedCentralizedAssignment ? 
+ new WaitRebalanceInfo(fut.exchangeId().topologyVersion()) : + new WaitRebalanceInfo(fut.context().events().lastServerEventVersion()); final Collection aliveNodes = fut.context().events().discoveryCache().serverNodes(); @@ -2103,13 +2144,14 @@ private Map>> initAffinityOnNodeLeft0(final Af @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { CacheGroupHolder grpHolder = groupHolder(topVer, desc); - if (!grpHolder.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) + if (!grpHolder.rebalanceEnabled || + (fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()) && !enforcedCentralizedAssignment)) return; AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); - assert affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer) : "Invalid affinity version " + - "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']'; + assert (affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer)) || enforcedCentralizedAssignment : + "Invalid affinity version [last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']'; List> curAssignment = grpHolder.affinity().assignments(affTopVer); List> newAssignment = grpHolder.affinity().idealAssignment(); @@ -2141,6 +2183,12 @@ private Map>> initAffinityOnNodeLeft0(final Af ", node=" + newPrimary + ", topVer=" + topVer + ']'; + List owners = top.owners(p); + + // It is essential that curPrimary node has partition in OWNING state. 
+ if (!owners.isEmpty() && !owners.contains(curPrimary)) + curPrimary = owners.get(0); + if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) { if (aliveNodes.contains(curPrimary)) { GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); @@ -2173,8 +2221,6 @@ private Map>> initAffinityOnNodeLeft0(final Af } if (newNodes0 == null) { - List owners = top.owners(p); - for (ClusterNode owner : owners) { if (aliveNodes.contains(owner)) { newNodes0 = latePrimaryAssignment(grpHolder.affinity(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 08a910b81603d..2b2fb559c182e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1186,6 +1186,7 @@ public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { /** * @param msg Message. * @param topVer Current topology version. + * @param curState Current cluster state. * @return Exchange action. * @throws IgniteCheckedException If configuration validation failed. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 9b9284f04b10a..8aa6db933a5d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -965,7 +965,7 @@ public void scheduleResendPartitions() { * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send, * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send */ - private void refreshPartitions() { + public void refreshPartitions() { // TODO https://issues.apache.org/jira/browse/IGNITE-6857 if (cctx.snapshot().snapshotOperationInProgress()) { scheduleResendPartitions(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index e994113087ec8..def00f3f9ae89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -256,9 +256,9 @@ private String mapString(GridDhtPartitionMap map) { } /** {@inheritDoc} */ - @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, + @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut) { - // No-op. 
+ return false; } /** {@inheritDoc} */ @@ -382,6 +382,11 @@ else if (!node2part.nodeId().equals(loc.id())) { fullMapString() + ']'); } + /** {@inheritDoc} */ + @Override public void afterStateRestored(AffinityTopologyVersion topVer) { + // no-op + } + /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { AffinityTopologyVersion topVer = exchFut.topologyVersion(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index e1f1d6f43ce1e..e63aab6c0dc78 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -213,9 +213,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements // TODO ignite-db throw new IgniteException(e); } - - // Todo log moving state - casState(state.get(), MOVING); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 4ae68ef739f8c..13564c2af2666 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -117,11 +117,19 @@ public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, /** * @param affVer Affinity version. * @param exchFut Exchange future. + * @return {@code True} if partitions must be refreshed. 
* @throws IgniteInterruptedCheckedException If interrupted. */ - public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut) + public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException; + /** + * Initializes local data structures after partitions are restored from persistence. + * + * @param topVer Topology version. + */ + public void afterStateRestored(AffinityTopologyVersion topVer); + /** * Post-initializes this topology. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0a2c1541bd70d..020c3e7ad34bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -299,10 +299,12 @@ private String mapString(GridDhtPartitionMap map) { } /** {@inheritDoc} */ - @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, + @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { + boolean needRefresh; + ctx.database().checkpointReadLock(); try { @@ -310,11 +312,11 @@ private String mapString(GridDhtPartitionMap map) { try { if (stopping) - return; + return false; long updateSeq = this.updateSeq.incrementAndGet(); - initPartitions0(affVer, exchFut, updateSeq); + needRefresh = initPartitions0(affVer, exchFut, updateSeq); consistencyCheck(); } @@ -325,16 +327,21 @@ private String mapString(GridDhtPartitionMap map) { finally { ctx.database().checkpointReadUnlock(); } + 
+ return needRefresh; } /** * @param affVer Affinity version to use. * @param exchFut Exchange future. * @param updateSeq Update sequence. + * @return {@code True} if partitions must be refreshed. */ - private void initPartitions0(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { + private boolean initPartitions0(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { List> aff = grp.affinity().readyAssignments(affVer); + boolean needRefresh = false; + if (grp.affinityNode()) { ClusterNode loc = ctx.localNode(); @@ -378,6 +385,8 @@ private void initPartitions0(AffinityTopologyVersion affVer, GridDhtPartitionsEx ", part=" + locPart + ']'); } + needRefresh = true; + updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); } } @@ -423,6 +432,8 @@ else if (belongs) { } updateRebalanceVersion(aff); + + return needRefresh; } /** @@ -616,6 +627,30 @@ private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); } + /** {@inheritDoc} */ + @Override public void afterStateRestored(AffinityTopologyVersion topVer) { + lock.writeLock().lock(); + + try { + if (node2part == null) + return; + + long updateSeq = this.updateSeq.incrementAndGet(); + + for (int p = 0; p < grp.affinity().partitions(); p++) { + GridDhtLocalPartition locPart = locParts.get(p); + + if (locPart == null) + updateLocal(p, EVICTED, updateSeq, topVer); + else + updateLocal(p, locPart.state(), updateSeq, topVer); + } + } + finally { + lock.writeLock().unlock(); + } + } + /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) { boolean changed = false; @@ -996,9 +1031,11 @@ else if (loc != null && state == RENTING && !showRenting) { map.put(i, part.state()); } + GridDhtPartitionMap locPartMap = node2part != null ? 
node2part.get(ctx.localNodeId()) : null; + return new GridDhtPartitionMap(ctx.localNodeId(), updateSeq.get(), - readyTopVer, + locPartMap != null ? locPartMap.topologyVersion() : readyTopVer, map, true); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a45c9b99cd85d..695c840de302a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -130,6 +130,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private static final int RELEASE_FUTURE_DUMP_THRESHOLD = IgniteSystemProperties.getInteger(IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD, 0); + /** */ + private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = IgniteProductVersion.fromString("2.4.3"); + /** */ @GridToStringExclude private final Object mux = new Object(); @@ -231,6 +234,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private boolean centralizedAff; + /** + * Enforce affinity reassignment based on actual partition distribution. This mode should be used when partitions + * might be distributed not according to affinity assignment. + */ + private boolean forceAffReassignment; + /** Change global state exception. 
*/ private Exception changeGlobalStateE; @@ -615,6 +624,9 @@ public void init(boolean newCrd) throws IgniteInterruptedCheckedException { DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(); + forceAffReassignment = DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(msg) + && firstEventCache().minimumNodeVersion().compareToIgnoreTimestamp(FORCE_AFF_REASSIGNMENT_SINCE) >= 0; + if (msg instanceof ChangeGlobalStateMessage) { assert exchActions != null && !exchActions.empty(); @@ -636,6 +648,9 @@ else if (msg instanceof WalStateAbstractMessage) exchange = onAffinityChangeRequest(crdNode); } + if (forceAffReassignment) + cctx.affinity().onCentralizedAffinityChange(this, crdNode); + initCoordinatorCaches(newCrd); } else { @@ -781,7 +796,7 @@ private void initTopologies() throws IgniteCheckedException { if (grp.isLocal()) continue; - grp.topology().beforeExchange(this, !centralizedAff, false); + grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); } } } @@ -924,8 +939,6 @@ private ExchangeType onClusterStateChangeRequest(boolean crd) { else if (req.activate()) { // TODO: BLT changes on inactive cluster can't be handled easily because persistent storage hasn't been initialized yet. try { - cctx.affinity().onBaselineTopologyChanged(this, crd); - if (CU.isPersistenceEnabled(cctx.kernalContext().config()) && !cctx.kernalContext().clientNode()) cctx.kernalContext().state().onBaselineTopologyChanged(req.baselineTopology(), req.prevBaselineTopologyHistoryItem()); @@ -962,7 +975,8 @@ private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedExcep * @return Exchange type. */ private ExchangeType onCustomMessageNoAffinityChange(boolean crd) { - cctx.affinity().onCustomMessageNoAffinityChange(this, crd, exchActions); + if (!forceAffReassignment) + cctx.affinity().onCustomMessageNoAffinityChange(this, crd, exchActions); return cctx.kernalContext().clientNode() ? 
ExchangeType.CLIENT : ExchangeType.ALL; } @@ -1017,7 +1031,7 @@ private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedExceptio exchCtx.events().warnNoAffinityNodes(cctx); - centralizedAff = cctx.affinity().onServerLeft(this, crd); + centralizedAff = cctx.affinity().onCentralizedAffinityChange(this, crd); } else cctx.affinity().onServerJoin(this, crd); @@ -1088,8 +1102,6 @@ private void distributedExchange() throws IgniteCheckedException { } } - cctx.database().beforeExchange(this); - if (!exchCtx.mergeExchanges()) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal() || cacheGroupStopping(grp.groupId())) @@ -1097,10 +1109,14 @@ private void distributedExchange() throws IgniteCheckedException { // It is possible affinity is not initialized yet if node joins to cluster. if (grp.affinity().lastVersion().topologyVersion() > 0) - grp.topology().beforeExchange(this, !centralizedAff, false); + grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); } } + // It is necessary to run database callback after all topology callbacks, so partition states could be + // correctly restored from the persistent store. 
+ cctx.database().beforeExchange(this); + changeWalModeIfNeeded(); if (crd.isLocal()) { @@ -1534,19 +1550,24 @@ public void finishMerged() { } if (err == null) { - if (centralizedAff) { + if (centralizedAff || forceAffReassignment) { assert !exchCtx.mergeExchanges(); for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; + boolean needRefresh = false; + try { - grp.topology().initPartitionsWhenAffinityReady(res, this); + needRefresh = grp.topology().initPartitionsWhenAffinityReady(res, this); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to initialize partitions.", e); } + + if (needRefresh) + cctx.exchange().refreshPartitions(); } } @@ -2342,7 +2363,7 @@ private void onAllReceived(@Nullable Collection sndResNodes) { if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) - grp.topology().beforeExchange(this, !centralizedAff, false); + grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); } } @@ -2474,6 +2495,9 @@ else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage detectLostPartitions(resTopVer); } + if (!exchCtx.mergeExchanges() && forceAffReassignment) + idealAffDiff = cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this); + for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) { if (!grpCtx.isLocal()) grpCtx.topology().applyUpdateCounters(); @@ -2496,6 +2520,8 @@ else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage if (exchCtx.events().hasServerLeft()) msg.idealAffinityDiff(idealAffDiff); } + else if (forceAffReassignment) + msg.idealAffinityDiff(idealAffDiff); msg.prepareMarshal(cctx); @@ -2549,65 +2575,69 @@ else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage nodes.addAll(sndResNodes); } - IgniteCheckedException err = null; + if (!nodes.isEmpty()) + sendAllPartitions(msg, nodes, 
mergedJoinExchMsgs0, joinedNodeAff); - if (stateChangeExchange()) { - StateChangeRequest req = exchActions.stateChangeRequest(); + if (!stateChangeExchange()) + onDone(exchCtx.events().topologyVersion(), null); - assert req != null : exchActions; + for (Map.Entry e : pendingSingleMsgs.entrySet()) { + if (log.isInfoEnabled()) { + log.info("Process pending message on coordinator [node=" + e.getKey() + + ", ver=" + initialVersion() + + ", resVer=" + resTopVer + ']'); + } - boolean stateChangeErr = false; + processSingleMessage(e.getKey(), e.getValue()); + } + } - if (!F.isEmpty(changeGlobalStateExceptions)) { - stateChangeErr = true; + if (stateChangeExchange()) { + IgniteCheckedException err = null; - err = new IgniteCheckedException("Cluster state change failed."); + StateChangeRequest req = exchActions.stateChangeRequest(); - cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req); - } - else { - boolean hasMoving = !partsToReload.isEmpty(); + assert req != null : exchActions; - Set waitGrps = cctx.affinity().waitGroups(); + boolean stateChangeErr = false; - if (!hasMoving) { - for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) { - if (waitGrps.contains(grpCtx.groupId()) && grpCtx.topology().hasMovingPartitions()) { - hasMoving = true; + if (!F.isEmpty(changeGlobalStateExceptions)) { + stateChangeErr = true; - break; - } + err = new IgniteCheckedException("Cluster state change failed."); - } - } + cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req); + } + else { + boolean hasMoving = !partsToReload.isEmpty(); - cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving); - } + Set waitGrps = cctx.affinity().waitGroups(); - boolean active = !stateChangeErr && req.activate(); + if (!hasMoving) { + for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) { + if (waitGrps.contains(grpCtx.groupId()) && grpCtx.topology().hasMovingPartitions()) { + hasMoving = true; - 
ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage( - req.requestId(), - active, - !stateChangeErr); + break; + } - cctx.discovery().sendCustomEvent(stateFinishMsg); + } + } + + cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving); } - if (!nodes.isEmpty()) - sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff); + boolean active = !stateChangeErr && req.activate(); - onDone(exchCtx.events().topologyVersion(), err); + ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage( + req.requestId(), + active, + !stateChangeErr); - for (Map.Entry e : pendingSingleMsgs.entrySet()) { - if (log.isInfoEnabled()) { - log.info("Process pending message on coordinator [node=" + e.getKey() + - ", ver=" + initialVersion() + - ", resVer=" + resTopVer + ']'); - } + cctx.discovery().sendCustomEvent(stateFinishMsg); - processSingleMessage(e.getKey(), e.getValue()); - } + if (!centralizedAff) + onDone(exchCtx.events().topologyVersion(), err); } } catch (IgniteCheckedException e) { @@ -2950,7 +2980,7 @@ private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtParti cctx.affinity().onLocalJoin(this, msg, resTopVer); else { if (exchCtx.events().hasServerLeft()) - cctx.affinity().mergeExchangesOnServerLeft(this, msg); + cctx.affinity().applyAffinityFromFullMessage(this, msg); else cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false); @@ -2964,6 +2994,8 @@ private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtParti } else if (localJoinExchange() && !exchCtx.fetchAffinityOnJoin()) cctx.affinity().onLocalJoin(this, msg, resTopVer); + else if (forceAffReassignment) + cctx.affinity().applyAffinityFromFullMessage(this, msg); updatePartitionFullMap(resTopVer, msg); @@ -3073,6 +3105,9 @@ public void onAffinityChangeMessage(final ClusterNode node, final CacheAffinityC crd.isLocal(), msg); + IgniteCheckedException err = 
!F.isEmpty(msg.partitionsMessage().getErrorsMap()) ? + new IgniteCheckedException("Cluster state change failed.") : null; + if (!crd.isLocal()) { GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage(); @@ -3080,9 +3115,12 @@ public void onAffinityChangeMessage(final ClusterNode node, final CacheAffinityC assert partsMsg.lastVersion() != null : partsMsg; updatePartitionFullMap(resTopVer, partsMsg); + + if (exchActions != null && exchActions.stateChangeRequest() != null && err != null) + cctx.kernalContext().state().onStateChangeError(msg.partitionsMessage().getErrorsMap(), exchActions.stateChangeRequest()); } - onDone(resTopVer); + onDone(resTopVer, err); } else { if (log.isDebugEnabled()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index bd80ec8c1be3f..1de48f67f0418 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -113,6 +113,8 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; @@ -2364,6 
+2366,9 @@ else if (restore != null) { updateState(part, restore.get1()); } } + + // After partition states are restored, it is necessary to update internal data structures in topology. + grp.topology().afterStateRestored(grp.topology().lastTopologyChangeVersion()); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java index 7b40b03cae393..6ccb450319180 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java @@ -25,11 +25,16 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -40,8 +45,15 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestDelayingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -62,6 +74,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { /** */ private static final int NODE_COUNT = 4; + /** */ + private static boolean delayRebalance = false; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -106,6 +121,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { if (client) cfg.setClientMode(true); + if (delayRebalance) + cfg.setCommunicationSpi(new DelayRebalanceCommunicationSpi()); + return cfg; } @@ -594,7 +612,10 @@ else if (grid(i).localNode().equals(affNodes.get(1))) { assertEquals(val2, primary.cache(CACHE_NAME).get(key)); assertEquals(val2, backup.cache(CACHE_NAME).get(key)); - primary.cache(CACHE_NAME).rebalance().get(); + for (int i = 0; i < NODE_COUNT; i++) + grid(i).cache(CACHE_NAME).rebalance().get(); + + awaitPartitionMapExchange(); affNodes = (List) ig.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key); @@ -697,6 +718,83 @@ public void testNonPersistentCachesIgnoreBaselineTopology() throws Exception { assertTrue(ig.affinity(inMemoryCache.getName()).allPartitions(newNode.cluster().localNode()).length > 0); } + /** + * @throws Exception if failed. 
+     */
+    public void testAffinityAssignmentChangedAfterRestart() throws Exception {
+        delayRebalance = false;
+
+        int parts = 32;
+
+        final List<Integer> partMapping = new ArrayList<>();
+        // Start from the identity mapping, so the first run uses the delegate's assignment unchanged.
+        for (int p = 0; p < parts; p++)
+            partMapping.add(p);
+
+        final AffinityFunction affFunc = new TestAffinityFunction(new RendezvousAffinityFunction(false, parts));
+
+        TestAffinityFunction.partsAffMapping = partMapping;
+
+        String cacheName = CACHE_NAME + 2;
+
+        startGrids(NODE_COUNT);
+
+        IgniteEx ig = grid(0);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ig.createCache(
+            new CacheConfiguration<Integer, Integer>()
+                .setName(cacheName)
+                .setCacheMode(PARTITIONED)
+                .setBackups(1)
+                .setPartitionLossPolicy(READ_ONLY_SAFE)
+                .setReadFromBackup(true)
+                .setWriteSynchronizationMode(FULL_SYNC)
+                .setRebalanceDelay(-1)
+                .setAffinity(affFunc));
+
+        Map<Integer, String> keyToConsId = new HashMap<>();
+
+        for (int k = 0; k < 1000; k++) {
+            cache.put(k, k);
+
+            keyToConsId.put(k, ig.affinity(cacheName).mapKeyToNode(k).consistentId().toString());
+        }
+
+        stopAllGrids();
+        // Permute the partition mapping (fixed seed) before the cluster is started again.
+        Collections.shuffle(TestAffinityFunction.partsAffMapping, new Random(1));
+        // Nodes configured from now on get the SPI that delays rebalance demand/supply messages.
+        delayRebalance = true;
+        // NOTE(review): the flag is static and stays true afterwards - confirm other tests reset it.
+        startGrids(NODE_COUNT);
+
+        ig = grid(0);
+
+        ig.cluster().active(true);
+
+        cache = ig.cache(cacheName);
+
+        GridDhtPartitionFullMap partMap = ig.cachex(cacheName).context().topology().partitionMap(false);
+        // Every node must report the same primary/backup list as node 0 for each partition.
+        for (int i = 1; i < NODE_COUNT; i++) {
+            IgniteEx ig0 = grid(i);
+
+            for (int p = 0; p < parts; p++)
+                assertEqualsCollections(ig.affinity(cacheName).mapPartitionToPrimaryAndBackups(p), ig0.affinity(cacheName).mapPartitionToPrimaryAndBackups(p));
+        }
+        // The node that affinity names for a key must hold that key's partition in OWNING state.
+        for (Map.Entry<Integer, String> e : keyToConsId.entrySet()) {
+            int p = ig.affinity(cacheName).partition(e.getKey());
+
+            assertEquals("p=" + p, GridDhtPartitionState.OWNING, partMap.get(ig.affinity(cacheName).mapKeyToNode(e.getKey()).id()).get(p));
+        }
+
+        for (int k = 0; k < 1000; k++)
+            assertEquals("k=" + k, Integer.valueOf(k), cache.get(k));
+    }
+
     /** */
     private Collection<BaselineNode> baselineNodes(Collection<ClusterNode> clNodes) {
         Collection<BaselineNode> res = new
ArrayList<>(clNodes.size());
@@ -758,4 +856,73 @@ private TestValue(int a) {
             return result;
         }
     }
+
+    /**
+     * Affinity function that re-orders the delegate's partition assignment through {@link #partsAffMapping}.
+     */
+    private static class TestAffinityFunction implements AffinityFunction {
+        /** Function that produces the base assignment. */
+        private final AffinityFunction delegate;
+
+        /** For delegate partition {@code p}, index of the partition that receives its node list. */
+        private static List<Integer> partsAffMapping;
+
+        /**
+         * @param delegate Function that produces the base assignment.
+         */
+        public TestAffinityFunction(AffinityFunction delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            delegate.reset();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return delegate.partitions();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return delegate.partition(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> res0 = delegate.assignPartitions(affCtx);
+
+            List<List<ClusterNode>> res = new ArrayList<>(res0.size());
+            // Pre-fill with nulls so that set() below can target any index.
+            for (int p = 0; p < res0.size(); p++)
+                res.add(p, null);
+            // Slot partsAffMapping.get(p) receives the node list the delegate computed for partition p.
+            for (int p = 0; p < res0.size(); p++)
+                res.set(partsAffMapping.get(p), res0.get(p));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            delegate.removeNode(nodeId);
+        }
+    }
+
+    /**
+     * Communication SPI that marks rebalance demand and supply messages as delayed.
+     */
+    private static class DelayRebalanceCommunicationSpi extends TestDelayingCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
+            // instanceof is false for null, so no separate null check is required.
+            return msg instanceof GridDhtPartitionDemandMessage || msg instanceof GridDhtPartitionSupplyMessage;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int delayMillis() {
+            return 1_000_000;
+        }
+    }
 }