diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index f8cc86f8cecdb..5c78eb5b4b4a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -861,7 +861,8 @@ private long allocateForTree() throws IgniteCheckedException {
     }

     @Override protected void onClose() throws IgniteCheckedException {
-        assert loc != null && loc.state() == OWNING && loc.reservations() > 0;
+        assert loc != null && loc.state() == OWNING && loc.reservations() > 0
+            : "Partition should be in OWNING state and have at least 1 reservation: " + loc;

         loc.release();
     }
@@ -874,36 +875,37 @@ private long allocateForTree() throws IgniteCheckedException {
         throws IgniteCheckedException {
         final TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators = new TreeMap<>();

-        Set<Integer> missing = null;
+
+        Set<Integer> missing = new HashSet<>();

         for (Integer p : parts.fullSet()) {
             GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer);

             if (partIter == null) {
-                if (missing == null)
-                    missing = new HashSet<>();
-
                 missing.add(p);
+
+                continue;
             }
-            else
-                iterators.put(p, partIter);
+
+            iterators.put(p, partIter);
         }

-        IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator(parts.historicalMap()));
+        IgniteHistoricalIterator historicalIterator = historicalIterator(parts.historicalMap(), missing);

-        if (missing != null) {
-            for (Integer p : missing)
-                iter.setPartitionMissing(p);
-        }
+        IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator);
+
+        for (Integer p : missing)
+            iter.setPartitionMissing(p);

         return iter;
     }

     /**
      * @param partCntrs Partition counters map.
+     * @param missing Set to populate with partitions that are missing or could not be reserved.
      * @return Historical iterator.
      */
-    @Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs)
+    @Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs, Set<Integer> missing)
         throws IgniteCheckedException {
         return null;
     }
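The change above threads a `missing` set through `rebalanceIterator` as an out-parameter: partitions whose iterator cannot be reserved are collected instead of aborting, and the historical iterator (see the GridCacheOffheapManager hunks below) adds its own unreservable partitions to the same set before each one is marked missing on the resulting rebalance iterator. A minimal, self-contained sketch of that pattern, where `reserveIterator` and the row type are hypothetical stand-ins rather than the actual Ignite API:

    import java.util.*;

    class RebalanceIteratorSketch {
        /** Builds per-partition iterators; any partition that cannot be reserved goes into 'missing'. */
        static Map<Integer, Iterator<String>> buildIterators(Set<Integer> requested, Set<Integer> missing) {
            Map<Integer, Iterator<String>> iterators = new TreeMap<>();

            for (Integer p : requested) {
                Iterator<String> it = reserveIterator(p); // May fail and return null.

                if (it == null) {
                    missing.add(p); // Collect the failure instead of failing the whole rebalance.

                    continue;
                }

                iterators.put(p, it);
            }

            return iterators;
        }

        /** Stand-in for partition reservation; odd partitions "fail" here. */
        static Iterator<String> reserveIterator(int p) {
            return p % 2 == 0 ? Collections.singletonList("row-" + p).iterator() : null;
        }

        public static void main(String[] args) {
            Set<Integer> missing = new HashSet<>();
            Map<Integer, Iterator<String>> res = buildIterators(new TreeSet<>(Arrays.asList(1, 2, 3, 4)), missing);

            System.out.println("served=" + res.keySet() + ", missing=" + missing); // served=[2, 4], missing=[1, 3]
        }
    }

Collecting failures into one set lets the caller handle them explicitly (here, via `setPartitionMissing`) rather than discovering them one at a time.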
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index dc4bfe9bc1720..c94f511d91887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -459,7 +459,9 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
                 + ", topology=" + fut.topologyVersion() + ", rebalanceId=" + fut.rebalanceId + "]");
         }

-        int stripes = ctx.gridConfig().getRebalanceThreadPoolSize();
+        int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize();
+
+        int stripes = totalStripes;

         final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes);
         for (int i = 0; i < stripes; i++)
@@ -467,7 +469,7 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign

         // Reserve one stripe for historical partitions.
         if (parts.hasHistorical()) {
-            stripePartitions.add(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));
+            stripePartitions.set(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));

             if (stripes > 1)
                 stripes--;
@@ -478,7 +480,7 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
         for (int i = 0; it.hasNext(); i++)
             stripePartitions.get(i % stripes).addFull(it.next());

-        for (int stripe = 0; stripe < stripes; stripe++) {
+        for (int stripe = 0; stripe < totalStripes; stripe++) {
             if (!stripePartitions.get(stripe).isEmpty()) {
                 // Create copy of demand message with new striped partitions map.
                 final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe));
@@ -489,23 +491,27 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign

                 final int topicId = stripe;

-                Runnable initDemandRequestTask = () -> {
+                IgniteInternalFuture clearAllFuture = clearFullPartitions(fut, demandMsg.partitions().fullSet());
+
+                // Start rebalancing after clearing full partitions is finished.
+                clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> {
+                    if (fut.isDone())
+                        return;
+
                     try {
-                        if (!fut.isDone()) {
-                            ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
-                                demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());
-
-                            // Cleanup required in case partitions demanded in parallel with cancellation.
-                            synchronized (fut) {
-                                if (fut.isDone())
-                                    fut.cleanupRemoteContexts(node.id());
-                            }
+                        ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
+                            demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());

-                            if (log.isDebugEnabled())
-                                log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
-                                    topicId + ", partitions count=" + stripePartitions.get(topicId).size() +
-                                    " (" + stripePartitions.get(topicId).partitionsList() + ")]");
+                        // Cleanup required in case partitions demanded in parallel with cancellation.
+                        synchronized (fut) {
+                            if (fut.isDone())
+                                fut.cleanupRemoteContexts(node.id());
                         }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                                topicId + " " + demandMsg.rebalanceId() + ", partitions count=" + stripePartitions.get(topicId).size() +
+                                " (" + stripePartitions.get(topicId).partitionsList() + ")]");
                     }
                     catch (IgniteCheckedException e1) {
                         ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
@@ -522,31 +528,26 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign

                         fut.cancel();
                     }
-                };
-
-                awaitClearingAndStartRebalance(fut, demandMsg, initDemandRequestTask);
+                }, true));
             }
         }
     }

     /**
-     * Awaits partitions clearing for full partitions and sends initial demand request
-     * after all partitions are cleared and safe to consume data.
+     * Creates future which will be completed when all {@code fullPartitions} are cleared.
      *
      * @param fut Rebalance future.
-     * @param demandMessage Initial demand message which contains set of full partitions to await.
-     * @param initDemandRequestTask Task which sends initial demand request.
+     * @param fullPartitions Set of full partitions that need to be cleared.
+     * @return Future which will be completed when given partitions are cleared.
      */
-    private void awaitClearingAndStartRebalance(RebalanceFuture fut,
-        GridDhtPartitionDemandMessage demandMessage,
-        Runnable initDemandRequestTask) {
-        Set<Integer> fullPartitions = demandMessage.partitions().fullSet();
+    private IgniteInternalFuture clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) {
+        final GridFutureAdapter clearAllFuture = new GridFutureAdapter();

         if (fullPartitions.isEmpty()) {
-            ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
+            clearAllFuture.onDone();

-            return;
+            return clearAllFuture;
         }

         for (GridCacheContext cctx : grp.caches()) {
@@ -560,16 +561,19 @@ private void awaitClearingAndStartRebalance(RebalanceFuture fut,
         final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());

         for (int partId : fullPartitions) {
-            if (fut.isDone())
-                return;
+            if (fut.isDone()) {
+                clearAllFuture.onDone();
+
+                return clearAllFuture;
+            }

             GridDhtLocalPartition part = grp.topology().localPartition(partId);

             if (part != null && part.state() == MOVING) {
                 part.onClearFinished(f -> {
-                    // Cancel rebalance if partition clearing was failed.
-                    if (f.error() != null) {
-                        if (!fut.isDone()) {
+                    if (!fut.isDone()) {
+                        // Cancel rebalance if partition clearing failed.
+                        if (f.error() != null) {
                             for (GridCacheContext cctx : grp.caches()) {
                                 if (cctx.statisticsEnabled()) {
                                     final CacheMetricsImpl metrics = cctx.cache().metrics0();
@@ -581,30 +585,54 @@ private void awaitClearingAndStartRebalance(RebalanceFuture fut,
                             log.error("Unable to await partition clearing " + part, f.error());

                             fut.cancel();
+
+                            clearAllFuture.onDone(f.error());
                         }
-                    }
-                    else {
-                        if (!fut.isDone()) {
-                            int existed = clearingPartitions.decrementAndGet();
+                        else {
+                            int remaining = clearingPartitions.decrementAndGet();

                             for (GridCacheContext cctx : grp.caches()) {
                                 if (cctx.statisticsEnabled()) {
                                     final CacheMetricsImpl metrics = cctx.cache().metrics0();

-                                    metrics.rebalanceClearingPartitions(existed);
+                                    metrics.rebalanceClearingPartitions(remaining);
                                 }
                             }

-                            // If all partitions are cleared send initial demand message.
-                            if (existed == 0)
-                                ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
+                            if (log.isDebugEnabled())
+                                log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName() +
+                                    ", remaining=" + remaining + "]");
+
+                            if (remaining == 0)
+                                clearAllFuture.onDone();
                         }
                     }
+                    else {
+                        clearAllFuture.onDone();
+                    }
                 });
             }
-            else
-                clearingPartitions.decrementAndGet();
+            else {
+                int remaining = clearingPartitions.decrementAndGet();
+
+                for (GridCacheContext cctx : grp.caches()) {
+                    if (cctx.statisticsEnabled()) {
+                        final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                        metrics.rebalanceClearingPartitions(remaining);
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName() +
+                        ", remaining=" + remaining + "]");
+
+                if (remaining == 0)
+                    clearAllFuture.onDone();
+            }
         }
+
+        return clearAllFuture;
     }

     /**
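The demander refactoring above replaces the `initDemandRequestTask` Runnable with a future returned by `clearFullPartitions`, so "clear all MOVING partitions" and "send the demand message" become two chained stages instead of a callback threaded through several methods. A simplified model of the count-down-to-a-future pattern, using the JDK's CompletableFuture in place of Ignite's GridFutureAdapter and simulating the per-partition clear callbacks inline:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    class ClearThenDemandSketch {
        /** Returns a future completed once all 'partitions' are cleared (clearing is simulated inline). */
        static CompletableFuture<Void> clearFullPartitions(int partitions) {
            CompletableFuture<Void> clearAllFuture = new CompletableFuture<>();

            if (partitions == 0) {
                clearAllFuture.complete(null); // Nothing to clear: complete immediately.

                return clearAllFuture;
            }

            AtomicInteger remaining = new AtomicInteger(partitions);

            for (int p = 0; p < partitions; p++) {
                // The real code registers an onClearFinished callback per partition;
                // each callback decrements the shared counter and the last one completes the future.
                if (remaining.decrementAndGet() == 0)
                    clearAllFuture.complete(null);
            }

            return clearAllFuture;
        }

        public static void main(String[] args) {
            // The demand request is sent only after every full partition is cleared.
            clearFullPartitions(4).thenRun(() -> System.out.println("send demand message"));
        }
    }

Returning a future also gives the error path a single place to land: a failed clear completes the future exceptionally instead of silently skipping the callback.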
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 6d2f526d73780..a3ee305406207 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -173,7 +173,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

         if (curTop.compareTo(demTop) > 0) {
             if (log.isDebugEnabled())
-                log.debug("Demand request outdated [currentTopVer=" + curTop
+                log.debug("Demand request outdated [grp=" + grp.cacheOrGroupName() +
+                    ", currentTopVer=" + curTop
                     + ", demandTopVer=" + demTop
                     + ", from=" + nodeId
                     + ", topicId=" + topicId + "]");
@@ -189,10 +190,19 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

             if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) {
                 clearContext(scMap.remove(contextId), log);
+
+                if (log.isDebugEnabled())
+                    log.debug("Supply context cleaned [grp=" + grp.cacheOrGroupName() +
+                        ", from=" + nodeId +
+                        ", demandMsg=" + d +
+                        ", supplyContext=" + sctx + "]");
             }
             else {
                 if (log.isDebugEnabled())
-                    log.debug("Stale context cleanup message " + d + ", supplyContext=" + sctx);
+                    log.debug("Stale supply context cleanup message [grp=" + grp.cacheOrGroupName() +
+                        ", from=" + nodeId +
+                        ", demandMsg=" + d +
+                        ", supplyContext=" + sctx + "]");
             }

             return;
@@ -200,13 +210,16 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
         }

         if (log.isDebugEnabled())
-            log.debug("Demand request accepted [current=" + curTop + ", demanded=" + demTop +
-                ", from=" + nodeId + ", topicId=" + topicId + "]");
+            log.debug("Demand request accepted [grp=" + grp.cacheOrGroupName() +
+                ", from=" + nodeId +
+                ", currentVer=" + curTop +
+                ", demandedVer=" + demTop +
+                ", topicId=" + topicId + "]");

         ClusterNode node = grp.shared().discovery().node(nodeId);

         if (node == null)
-            return; // Context will be cleaned at topology change.
+            return;

         try {
             SupplyContext sctx;
@@ -217,13 +230,27 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
                 if (sctx != null && d.rebalanceId() < sctx.rebalanceId) {
                     // Stale message, return context back and return.
                     scMap.put(contextId, sctx);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Stale demand message [grp=" + grp.cacheOrGroupName() +
+                            ", actualContext=" + sctx +
+                            ", from=" + nodeId +
+                            ", demandMsg=" + d + "]");
+
                     return;
                 }
             }

             // Demand request should not contain empty partitions if no supply context is associated with it.
-            if (sctx == null && (d.partitions() == null || d.partitions().isEmpty()))
+            if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) {
+                if (log.isDebugEnabled())
+                    log.debug("Empty demand message [grp=" + grp.cacheOrGroupName() +
+                        ", from=" + nodeId +
+                        ", topicId=" + topicId +
+                        ", demandMsg=" + d + "]");
+
                 return;
+            }

             assert !(sctx != null && !d.partitions().isEmpty());
@@ -271,7 +298,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

                         GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);

-                        assert loc != null && loc.state() == GridDhtPartitionState.OWNING;
+                        assert loc != null && loc.state() == GridDhtPartitionState.OWNING
+                            : "Partition should be in OWNING state: " + loc;

                         s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
                     }
@@ -323,7 +351,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

                     GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);

-                    assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part) : loc;
+                    assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part)
+                        : "Partition should be in OWNING state and have at least 1 reservation: " + loc;

                     if (iter.isPartitionMissing(part) && remainingParts.contains(part)) {
                         s.missed(part);
@@ -361,9 +390,6 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

                         remainingParts.remove(part);
                     }
-
-                    // Need to manually prepare cache message.
-                    // TODO GG-11141.
                 }

                 Iterator<Integer> remainingIter = remainingParts.iterator();
@@ -374,7 +400,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

                     if (iter.isPartitionDone(p)) {
                         GridDhtLocalPartition loc = top.localPartition(p, d.topologyVersion(), false);

-                        assert loc != null;
+                        assert loc != null
+                            : "Supply partition is gone: grp=" + grp.cacheOrGroupName() + ", p=" + p;

                         s.last(p, loc.updateCounter());

@@ -387,7 +414,8 @@ else if (iter.isPartitionMissing(p)) {
                     }
                 }

-                assert remainingParts.isEmpty();
+                assert remainingParts.isEmpty()
+                    : "Partitions after rebalance should be either done or missing: " + remainingParts;

                 if (sctx != null)
                     clearContext(sctx, log);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index ddcb81e238965..700f0cf98b7a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -187,7 +187,7 @@ private IgniteCheckedException stopError() {

         AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);

-        CachePartitionFullCountersMap cntrMap = top.fullUpdateCounters();
+        CachePartitionFullCountersMap countersMap = grp.topology().fullUpdateCounters();

         for (int p = 0; p < partCnt; p++) {
             if (ctx.exchange().hasPendingExchange()) {
@@ -251,7 +251,7 @@ private IgniteCheckedException stopError() {
                     );
                 }

-                msg.partitions().addHistorical(p, cntrMap.initialUpdateCounter(p), cntrMap.updateCounter(p), partCnt);
+                msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partCnt);
             }
             else {
                 Collection<ClusterNode> picked = pickOwners(p, topVer);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
index 7066e0d72ac3d..8515004c3fdf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
@@ -90,7 +90,7 @@ public synchronized void put(UUID nodeId, int cacheId, int partId) {

     /**
      * @return {@code True} if empty.
      */
-    public boolean isEmpty() {
+    public synchronized boolean isEmpty() {
        return map == null || map.isEmpty();
     }
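A note on the one-line `IgniteDhtPartitionsToReloadMap` change: `put` was already synchronized while `isEmpty` was not, so a concurrent reader had no happens-before edge for the lazily created `map` field and could observe a partially constructed map. A toy illustration of why both sides must use the same monitor (the class and fields below are invented for the example):

    import java.util.*;

    class LazyMapSketch {
        /** Lazily created on first put; guarded by 'this'. */
        private Map<UUID, Set<Integer>> map;

        public synchronized void put(UUID nodeId, int partId) {
            if (map == null)
                map = new HashMap<>();

            map.computeIfAbsent(nodeId, id -> new HashSet<>()).add(partId);
        }

        /** Must hold the same monitor as put(); an unsynchronized read of 'map' is a data race. */
        public synchronized boolean isEmpty() {
            return map == null || map.isEmpty();
        }

        public static void main(String[] args) {
            LazyMapSketch m = new LazyMapSketch();

            System.out.println(m.isEmpty()); // true

            m.put(UUID.randomUUID(), 1);

            System.out.println(m.isEmpty()); // false
        }
    }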
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 68ec83db1b4b6..5feaa252dd3fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -755,8 +755,8 @@ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
     }

     /** {@inheritDoc} */
-    @Override @Nullable protected IgniteHistoricalIterator historicalIterator(
-        CachePartitionPartialCountersMap partCntrs) throws IgniteCheckedException {
+    @Override @Nullable protected WALHistoricalIterator historicalIterator(
+        CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException {
         if (partCntrs == null || partCntrs.isEmpty())
             return null;

@@ -773,13 +773,18 @@ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
             if (startPtr == null)
                 throw new IgniteCheckedException("Could not find start pointer for partition [part=" + p + ", partCntrSince=" + initCntr + "]");

-            if (minPtr == null || startPtr.compareTo(minPtr) == -1)
+            if (minPtr == null || startPtr.compareTo(minPtr) < 0)
                 minPtr = startPtr;
         }

         WALIterator it = grp.shared().wal().replay(minPtr);

-        return new WALIteratorAdapter(grp, partCntrs, it);
+        WALHistoricalIterator iterator = new WALHistoricalIterator(grp, partCntrs, it);
+
+        // Add historical partitions that could not be reserved to the missing set.
+        missing.addAll(iterator.missingParts);
+
+        return iterator;
     }

     /**
@@ -807,7 +812,7 @@ long freeSpace() {
     /**
      *
      */
-    private static class WALIteratorAdapter implements IgniteHistoricalIterator {
+    private static class WALHistoricalIterator implements IgniteHistoricalIterator {
         /** */
         private static final long serialVersionUID = 0L;

@@ -817,6 +822,9 @@ private static class WALIteratorAdapter implements IgniteHistoricalIterator {
         /** Partition counters map. */
         private final CachePartitionPartialCountersMap partMap;

+        /** Partitions marked as missing (unable to reserve or partition is not in OWNING state). */
+        private final Set<Integer> missingParts = new HashSet<>();
+
         /** Partitions marked as done. */
         private final Set<Integer> doneParts = new HashSet<>();

@@ -830,19 +838,24 @@ private static class WALIteratorAdapter implements IgniteHistoricalIterator {
         private Iterator<DataEntry> entryIt;

         /** */
-        private CacheDataRow next;
+        private DataEntry next;
+
+        /** Indicates that the partition owning the current {@link #next} entry is finished and no longer needs to be rebalanced. */
+        private boolean reachedPartitionEnd;

         /**
          * @param grp Cache context.
          * @param walIt WAL iterator.
          */
-        private WALIteratorAdapter(CacheGroupContext grp, CachePartitionPartialCountersMap partMap, WALIterator walIt) {
+        private WALHistoricalIterator(CacheGroupContext grp, CachePartitionPartialCountersMap partMap, WALIterator walIt) {
             this.grp = grp;
             this.partMap = partMap;
             this.walIt = walIt;

             cacheIds = grp.cacheIds();

+            reservePartitions();
+
             advance();
         }

@@ -859,6 +872,7 @@ private WALIteratorAdapter(CacheGroupContext grp, CachePartitionPartialCountersM
         /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
             walIt.close();
+            releasePartitions();
         }

         /** {@inheritDoc} */
@@ -896,7 +910,13 @@ private WALIteratorAdapter(CacheGroupContext grp, CachePartitionPartialCountersM
             if (next == null)
                 throw new NoSuchElementException();

-            CacheDataRow val = next;
+            CacheDataRow val = new DataEntryRow(next);
+
+            if (reachedPartitionEnd) {
+                doneParts.add(next.partitionId());
+
+                reachedPartitionEnd = false;
+            }

             advance();

@@ -908,6 +928,46 @@ private WALIteratorAdapter(CacheGroupContext grp, CachePartitionPartialCountersM
             throw new UnsupportedOperationException();
         }

+        /**
+         * Reserves historical partitions.
+         * If a partition cannot be reserved, its id is placed into the {@link #missingParts} set.
+         */
+        private void reservePartitions() {
+            for (int i = 0; i < partMap.size(); i++) {
+                int p = partMap.partitionAt(i);
+                GridDhtLocalPartition part = grp.topology().localPartition(p);
+
+                if (part == null || !part.reserve()) {
+                    missingParts.add(p);
+
+                    continue;
+                }
+
+                if (part.state() != OWNING) {
+                    part.release();
+
+                    missingParts.add(p);
+                }
+            }
+        }
+
+        /**
+         * Releases historical partitions.
+         */
+        private void releasePartitions() {
+            for (int i = 0; i < partMap.size(); i++) {
+                int p = partMap.partitionAt(i);
+
+                if (missingParts.contains(p))
+                    continue;
+
+                GridDhtLocalPartition part = grp.topology().localPartition(p);
+
+                assert part != null && part.state() == OWNING && part.reservations() > 0
+                    : "Partition should be in OWNING state and have at least 1 reservation";
+
+                part.release();
+            }
+        }
+
         /**
          *
          */
@@ -922,7 +982,7 @@ private void advance() {
                     if (cacheIds.contains(entry.cacheId())) {
                         int idx = partMap.partitionIndex(entry.partitionId());

-                        if (idx < 0)
+                        if (idx < 0 || missingParts.contains(idx))
                             continue;

                         long from = partMap.initialUpdateCounterAt(idx);
@@ -930,9 +990,9 @@ private void advance() {

                         if (entry.partitionCounter() >= from && entry.partitionCounter() <= to) {
                             if (entry.partitionCounter() == to)
-                                doneParts.add(entry.partitionId());
+                                reachedPartitionEnd = true;

-                            next = new DataEntryRow(entry);
+                            next = entry;

                             return;
                         }
diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml
index 9138c02e00248..b0b08e7d1a394 100755
--- a/modules/core/src/test/config/log4j-test.xml
+++ b/modules/core/src/test/config/log4j-test.xml
@@ -78,12 +78,6 @@
-
-
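Finally, the `WALHistoricalIterator` rename comes with a reserve/release lifecycle: every historical partition is reserved up front in the constructor, anything that cannot be reserved (or is not in OWNING state after the reservation) goes into `missingParts`, and `close()` releases exactly the partitions that were reserved. A compact sketch of that discipline with toy partition objects; the `Partition` class and its counters are illustrative only, not Ignite's GridDhtLocalPartition:

    import java.util.*;

    class ReservePartitionsSketch {
        enum State { OWNING, MOVING }

        /** Toy partition with a reservation counter. */
        static class Partition {
            final State state;
            int reservations;

            Partition(State state) { this.state = state; }

            boolean reserve() { reservations++; return true; }

            void release() { reservations--; }
        }

        final Map<Integer, Partition> topology = new HashMap<>();
        final Set<Integer> missingParts = new HashSet<>();

        /** Mirrors the reservePartitions() idea: reserve each partition or record it as missing. */
        void reservePartitions(int... parts) {
            for (int p : parts) {
                Partition part = topology.get(p);

                if (part == null || !part.reserve()) {
                    missingParts.add(p);

                    continue;
                }

                // A reservation only pins the partition; it must also still be OWNING to be usable.
                if (part.state() != State.OWNING) {
                    part.release();

                    missingParts.add(p);
                }
            }
        }

        public static void main(String[] args) {
            ReservePartitionsSketch s = new ReservePartitionsSketch();

            s.topology.put(0, new Partition(State.OWNING));
            s.topology.put(1, new Partition(State.MOVING));

            s.reservePartitions(0, 1, 2);

            System.out.println("missing=" + s.missingParts); // [1, 2]
        }
    }

Pairing the reserve in the constructor with the release in `close()` keeps the pinned set and `missingParts` in sync for the iterator's whole lifetime, which is what lets `releasePartitions()` assert that every non-missing partition is still OWNING and reserved.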