diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 7a24aa14ded91..f04a6ce2fa025 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,18 +21,18 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Queue; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingDeque; @@ -87,7 +87,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; @@ -97,13 +96,11 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getLong; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -156,9 +153,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** */ private GridFutureAdapter reconnectExchangeFut; - /** */ - private final Queue> rebalanceQ = new ConcurrentLinkedDeque8<>(); - /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -1596,12 +1590,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) { @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { long timeout = cctx.gridConfig().getNetworkTimeout(); - boolean startEvtFired = false; - int cnt = 0; - IgniteInternalFuture asyncStartFut = null; - while (!isCancelled()) { GridDhtPartitionsExchangeFuture exchFut = null; @@ -1703,20 +1693,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) { continue; changed |= cacheCtx.topology().afterExchange(exchFut); - - // Preload event notification. - if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) { - if (!cacheCtx.isReplicated() || !startEvtFired) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - cacheCtx.events().addPreloadEvent(-1, EVT_CACHE_REBALANCE_STARTED, - discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); - } - } } - startEvtFired = true; - if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) refreshPartitions(); } @@ -1755,8 +1733,6 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) { if (assignsMap != null) { int size = assignsMap.size(); - rebalanceQ.clear(); - NavigableMap> orderMap = new TreeMap<>(); for (Map.Entry e : assignsMap.entrySet()) { @@ -1772,101 +1748,65 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) { orderMap.get(order).add(cacheId); } - Callable marshR = null; - List> orderedRs = new ArrayList<>(size); + Runnable r = null; + + List rebList = new LinkedList<>(); + + boolean assignsCancelled = false; - //Ordered rebalance scheduling. - for (Integer order : orderMap.keySet()) { + for (Integer order : orderMap.descendingKeySet()) { for (Integer cacheId : orderMap.get(order)) { GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - List waitList = new ArrayList<>(size - 1); + GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); - for (List cIds : orderMap.headMap(order).values()) { - for (Integer cId : cIds) - waitList.add(cctx.cacheContext(cId).name()); - } + if (assigns != null) + assignsCancelled |= assigns.cancelled(); - Callable r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId), + // Cancels previous rebalance future (in case it's not done yet). + // Sends previous rebalance stopped event (if necessary). + // Creates new rebalance future. + // Sends current rebalance started event (if necessary). + // Finishes cache sync future (on empty assignments). + Runnable cur = cacheCtx.preloader().addAssignments(assigns, forcePreload, - waitList, - cnt); + cnt, + r); - if (r != null) { - U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + - ", waitList=" + waitList.toString() + "]"); + if (cur != null) { + rebList.add(U.maskName(cacheCtx.name())); - if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME)) - marshR = r; - else - orderedRs.add(r); + r = cur; } } } - if (asyncStartFut != null) - asyncStartFut.get(); // Wait for thread stop. + if (assignsCancelled) { // Pending exchange. + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + else if (r != null) { + Collections.reverse(rebList); - rebalanceQ.addAll(orderedRs); + U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); - if (marshR != null || !rebalanceQ.isEmpty()) { if (futQ.isEmpty()) { - U.log(log, "Rebalancing required " + + U.log(log, "Rebalancing started " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - if (marshR != null) { - try { - marshR.call(); //Marshaller cache rebalancing launches in sync way. - } - catch (Exception ex) { - if (log.isDebugEnabled()) - log.debug("Failed to send initial demand request to node"); - - continue; - } - } - - final GridFutureAdapter fut = new GridFutureAdapter(); - - asyncStartFut = fut; - - cctx.kernalContext().closure().callLocalSafe(new GPC() { - @Override public Boolean call() { - try { - while (true) { - Callable r = rebalanceQ.poll(); - - if (r == null) - return false; - - if (!r.call()) - return false; - } - } - catch (Exception ex) { - if (log.isDebugEnabled()) - log.debug("Failed to send initial demand request to node"); - - return false; - } - finally { - fut.onDone(); - } - } - }, /*system pool*/true); + r.run(); // Starts rebalancing routine. } - else { + else U.log(log, "Skipping rebalancing (obsolete exchange ID) " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } } - else { + else U.log(log, "Skipping rebalancing (nothing scheduled) " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } } } catch (IgniteInterruptedCheckedException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 1d1cfab04ce63..3c4456d5984d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.UUID; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -84,14 +83,14 @@ public interface GridCachePreloader { * * @param assignments Assignments to add. * @param forcePreload Force preload flag. - * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. - * @return Rebalancing closure. + * @param next Runnable responsible for cache rebalancing start. + * @return Rebalancing runnable. */ - public Callable addAssignments(GridDhtPreloaderAssignments assignments, + public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection caches, - int cnt); + int cnt, + Runnable next); /** * @param p Preload predicate. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index b15ebc5029b12..656a960b56b9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.UUID; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -166,8 +165,8 @@ public GridCachePreloaderAdapter(GridCacheContext cctx) { } /** {@inheritDoc} */ - @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection caches, int cnt) { + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + int cnt, Runnable next) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 57d522922e801..a6808c73577e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -28,8 +28,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; @@ -72,6 +72,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; @@ -121,6 +122,18 @@ public class GridDhtPartitionDemander { /** Cached rebalance topics. */ private final Map rebalanceTopics; + /** + * Started event sent. + * Make sense for replicated cache only. + */ + private final AtomicBoolean startedEvtSent = new AtomicBoolean(); + + /** + * Stopped event sent. + * Make sense for replicated cache only. + */ + private final AtomicBoolean stoppedEvtSent = new AtomicBoolean(); + /** * @param cctx Cctx. * @param demandLock Demand lock. @@ -249,45 +262,25 @@ private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { } /** - * @param name Cache name. - * @param fut Future. - * @throws IgniteCheckedException If failed. + * Sets last exchange future. + * + * @param lastFut Last future to set. */ - private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() + - ", waitCache=" + name + ']'); - - RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name) - .preloader().rebalanceFuture(); - - if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) { - if (!wFut.get()) { - U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + - "] (cache rebalanced with missed partitions)"); - - return false; - } - - return true; - } - else { - U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + - "] (topology already changed)"); - - return false; - } + void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { + lastExchangeFut = lastFut; } /** * @param assigns Assignments. * @param force {@code True} if dummy reassign. - * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. - * @return Rebalancing closure. + * @param next Runnable responsible for cache rebalancing start. + * @return Rebalancing runnable. */ - Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, - final Collection caches, int cnt) { + Runnable addAssignments(final GridDhtPreloaderAssignments assigns, + boolean force, + int cnt, + final Runnable next) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -296,7 +289,7 @@ Callable addAssignments(final GridDhtPreloaderAssignments assigns, bool if (delay == 0 || force) { final RebalanceFuture oldFut = rebalanceFut; - final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt); + final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt); if (!oldFut.isInitial()) oldFut.cancel(); @@ -310,20 +303,69 @@ Callable addAssignments(final GridDhtPreloaderAssignments assigns, bool rebalanceFut = fut; - if (assigns.isEmpty()) { - fut.doneIfEmpty(assigns.cancelled()); + fut.sendRebalanceStartedEvent(); + + if (assigns.cancelled()) { // Pending exchange. + if (log.isDebugEnabled()) + log.debug("Rebalancing skipped due to cancelled assignments."); + + fut.onDone(false); + + fut.sendRebalanceFinishedEvent(); + + return null; + } + + if (assigns.isEmpty()) { // Nothing to rebalance. + if (log.isDebugEnabled()) + log.debug("Rebalancing skipped due to empty assignments."); + + fut.onDone(true); + + ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + + fut.sendRebalanceFinishedEvent(); return null; } - return new Callable() { - @Override public Boolean call() throws Exception { - for (String c : caches) { - if (!waitForCacheRebalancing(c, fut)) - return false; + return new Runnable() { + @Override public void run() { + try { + if (next != null) + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture f) { + try { + if (f.get()) // Not cancelled. + next.run(); // Starts next cache rebalancing (according to the order). + } + catch (IgniteCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug(ignored.getMessage()); + } + } + }); + + requestPartitions(fut, assigns); } + catch (IgniteCheckedException e) { + ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class); - return requestPartitions(fut, assigns); + if (cause != null) + log.warning("Failed to send initial demand request to node. " + e.getMessage()); + else + log.error("Failed to send initial demand request to node.", e); + + fut.cancel(); + } + catch (Throwable th) { + log.error("Runtime error caught during initial demand request sending.", th); + + fut.cancel(); + + if (th instanceof Error) + throw th; + } } }; } @@ -361,14 +403,17 @@ else if (delay > 0) { * @throws IgniteCheckedException If failed. * @return Partitions were requested. */ - private boolean requestPartitions( + private void requestPartitions( RebalanceFuture fut, GridDhtPreloaderAssignments assigns ) throws IgniteCheckedException { - for (Map.Entry e : assigns.entrySet()) { - if (topologyChanged(fut)) - return false; + if (topologyChanged(fut)) { + fut.cancel(); + + return; + } + for (Map.Entry e : assigns.entrySet()) { final ClusterNode node = e.getKey(); GridDhtPartitionDemandMessage d = e.getValue(); @@ -387,7 +432,7 @@ private boolean requestPartitions( //Check remote node rebalancing API version. if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) { - U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); @@ -446,8 +491,6 @@ private boolean requestPartitions( worker.run(node, d); } } - - return true; } /** @@ -738,15 +781,6 @@ else if (log.isDebugEnabled()) return S.toString(GridDhtPartitionDemander.class, this); } - /** - * Sets last exchange future. - * - * @param lastFut Last future to set. - */ - void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { - lastExchangeFut = lastFut; - } - /** * */ @@ -754,8 +788,11 @@ public static class RebalanceFuture extends GridFutureAdapter { /** */ private static final long serialVersionUID = 1L; - /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */ - private final boolean sndStoppedEvnt; + /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */ + private final AtomicBoolean startedEvtSent; + + /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */ + private final AtomicBoolean stoppedEvtSent; /** */ private final GridCacheContext cctx; @@ -783,13 +820,15 @@ public static class RebalanceFuture extends GridFutureAdapter { * @param assigns Assigns. * @param cctx Context. * @param log Logger. - * @param sentStopEvnt Stop event flag. + * @param startedEvtSent Start event sent flag. + * @param stoppedEvtSent Stop event sent flag. * @param updateSeq Update sequence. */ RebalanceFuture(GridDhtPreloaderAssignments assigns, GridCacheContext cctx, IgniteLogger log, - boolean sentStopEvnt, + AtomicBoolean startedEvtSent, + AtomicBoolean stoppedEvtSent, long updateSeq) { assert assigns != null; @@ -797,7 +836,8 @@ public static class RebalanceFuture extends GridFutureAdapter { this.topVer = assigns.topologyVersion(); this.cctx = cctx; this.log = log; - this.sndStoppedEvnt = sentStopEvnt; + this.startedEvtSent = startedEvtSent; + this.stoppedEvtSent = stoppedEvtSent; this.updateSeq = updateSeq; } @@ -809,7 +849,8 @@ public RebalanceFuture() { this.topVer = null; this.cctx = null; this.log = null; - this.sndStoppedEvnt = false; + this.startedEvtSent = null; + this.stoppedEvtSent = null; this.updateSeq = -1; } @@ -847,24 +888,6 @@ private void appendPartitions(UUID nodeId, Collection parts) { } } - /** - * @param cancelled Is cancelled. - */ - private void doneIfEmpty(boolean cancelled) { - synchronized (this) { - if (isDone()) - return; - - assert remaining.isEmpty(); - - if (log.isDebugEnabled()) - log.debug("Rebalancing is not required [cache=" + cctx.name() + - ", topology=" + topVer + "]"); - - checkIsDone(cancelled, true); - } - } - /** * Cancels this future. * @@ -875,8 +898,7 @@ private void doneIfEmpty(boolean cancelled) { if (isDone()) return true; - U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name() - + ", topology=" + topologyVersion() + ']'); + U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']'); if (!cctx.kernalContext().isStopping()) { for (UUID nodeId : remaining.keySet()) @@ -885,7 +907,7 @@ private void doneIfEmpty(boolean cancelled) { remaining.clear(); - checkIsDone(true /* cancelled */, false); + checkIsDone(true /* cancelled */); } return true; @@ -907,7 +929,7 @@ private void cancel(UUID nodeId) { remaining.remove(nodeId); - onDone(false); // Finishing rebalance future a non completed. + onDone(false); // Finishing rebalance future as non completed. checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled. } @@ -988,8 +1010,7 @@ private void partitionDone(UUID nodeId, int p) { if (parts.isEmpty()) { U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") + - "rebalancing [cache=" + cctx.name() + - ", fromNode=" + nodeId + ", topology=" + topologyVersion() + + "rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]")); remaining.remove(nodeId); @@ -1022,23 +1043,20 @@ private void preloadEvent(int type, DiscoveryEvent discoEvt) { * */ private void checkIsDone() { - checkIsDone(false, false); + checkIsDone(false); } /** * @param cancelled Is cancelled. - * @param wasEmpty {@code True} if future was created without assignments. */ - private void checkIsDone(boolean cancelled, boolean wasEmpty) { + private void checkIsDone(boolean cancelled) { if (remaining.isEmpty()) { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt)) - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); + sendRebalanceFinishedEvent(); if (log.isDebugEnabled()) log.debug("Completed rebalance future: " + this); - if (!wasEmpty) - cctx.shared().exchange().scheduleResendPartitions(); + cctx.shared().exchange().scheduleResendPartitions(); Collection m = new HashSet<>(); @@ -1064,6 +1082,30 @@ private void checkIsDone(boolean cancelled, boolean wasEmpty) { } } + /** + * + */ + private void sendRebalanceStartedEvent() { + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) && + (!cctx.isReplicated() || !startedEvtSent.get())) { + preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); + + startedEvtSent.set(true); + } + } + + /** + * + */ + private void sendRebalanceFinishedEvent() { + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && + (!cctx.isReplicated() || !stoppedEvtSent.get())) { + preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); + + stoppedEvtSent.set(true); + } + } + /** {@inheritDoc} */ public String toString() { return S.toString(RebalanceFuture.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0865d9f709d16..692e7c0de45a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; @@ -255,7 +254,7 @@ private IgniteCheckedException stopError() { @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { supplier.onTopologyChanged(lastFut.topologyVersion()); - demander.updateLastExchangeFuture(lastFut); + demander.onTopologyChanged(lastFut); } /** {@inheritDoc} */ @@ -413,9 +412,9 @@ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage } /** {@inheritDoc} */ - @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, Collection caches, int cnt) { - return demander.addAssignments(assignments, forcePreload, caches, cnt); + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, int cnt, Runnable next) { + return demander.addAssignments(assignments, forcePreload, cnt, next); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index de38952da7fd9..3dfcd85ea46b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -501,6 +501,8 @@ protected void checkPartitionMapMessagesAbsent() throws Exception { record = true; + log.info("Checking GridDhtPartitions*Message absent (it will take 30 SECONDS) ... "); + U.sleep(30_000); record = false;