diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 2e24e67cae195..0f39c69654fb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -84,8 +84,8 @@ import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; @@ -4651,9 +4651,7 @@ protected Object readResolve() throws ObjectStreamException { /** {@inheritDoc} */ @Override public IgniteInternalFuture rebalance() { - ctx.preloader().forcePreload(); - - return ctx.preloader().syncFuture(); + return ctx.preloader().forceRebalance(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f04a6ce2fa025..8ea21694bbd58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -711,9 +711,13 @@ public void forceDummyExchange(boolean reassign, * * @param exchFut Exchange future. */ - public void forcePreloadExchange(GridDhtPartitionsExchangeFuture exchFut) { + public IgniteInternalFuture forceRebalance(GridDhtPartitionsExchangeFuture exchFut) { + GridFutureAdapter fut = new GridFutureAdapter<>(); + exchWorker.addFuture( - new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId())); + new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut)); + + return fut; } /** @@ -1771,7 +1775,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) { Runnable cur = cacheCtx.preloader().addAssignments(assigns, forcePreload, cnt, - r); + r, + exchFut.forcedRebalanceFuture()); if (cur != null) { rebList.add(U.maskName(cacheCtx.name())); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 3c4456d5984d9..0c2869101aa79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -90,7 +91,8 @@ public interface GridCachePreloader { public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, int cnt, - Runnable next); + Runnable next, + @Nullable GridFutureAdapter forcedRebFut); /** * @param p Preload predicate. @@ -150,9 +152,9 @@ public IgniteInternalFuture request(GridNearAtomicAbstractUpdateRequest AffinityTopologyVersion topVer); /** - * Force preload process. + * Force Rebalance process. */ - public void forcePreload(); + public IgniteInternalFuture forceRebalance(); /** * Unwinds undeploys. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 656a960b56b9f..d7ec2888ca5e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -88,8 +89,8 @@ public GridCachePreloaderAdapter(GridCacheContext cctx) { } /** {@inheritDoc} */ - @Override public void forcePreload() { - // No-op. + @Override public IgniteInternalFuture forceRebalance() { + return new GridFinishedFuture<>(true); } /** {@inheritDoc} */ @@ -165,8 +166,11 @@ public GridCachePreloaderAdapter(GridCacheContext cctx) { } /** {@inheritDoc} */ - @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - int cnt, Runnable next) { + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, + int cnt, + Runnable next, + @Nullable GridFutureAdapter forcedRebFut) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f87fa1d5d815f..f03a3b5b86e00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -2326,9 +2326,7 @@ private void onLeave(GridCacheGateway gate) { /** {@inheritDoc} */ @Override public IgniteFuture rebalance() { - ctx.preloader().forcePreload(); - - return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); + return new IgniteFutureImpl<>(ctx.preloader().forceRebalance()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a6808c73577e1..02c31da03bef3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.GridLeanSet; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiException; @@ -216,9 +218,9 @@ void preloadPredicate(IgnitePredicate preloadPred) { } /** - * Force preload. + * Force Rebalance. */ - void forcePreload() { + IgniteInternalFuture forceRebalance() { GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); if (obj != null) @@ -230,14 +232,31 @@ void forcePreload() { if (log.isDebugEnabled()) log.debug("Forcing rebalance event for future: " + exchFut); + final GridFutureAdapter fut = new GridFutureAdapter<>(); + exchFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture t) { - cctx.shared().exchange().forcePreloadExchange(exchFut); + IgniteInternalFuture fut0 = cctx.shared().exchange().forceRebalance(exchFut); + + fut0.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture future) { + try { + fut.onDone(future.get()); + } + catch (Exception e) { + fut.onDone(e); + } + } + }); } }); + + return fut; } else if (log.isDebugEnabled()) log.debug("Ignoring force rebalance request (no topology event happened yet)."); + + return new GridFinishedFuture<>(true); } /** @@ -275,15 +294,19 @@ void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { * @param force {@code True} if dummy reassign. * @param cnt Counter. * @param next Runnable responsible for cache rebalancing start. + * @param forcedRebFut External future for forced rebalance. * @return Rebalancing runnable. */ Runnable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, int cnt, - final Runnable next) { + final Runnable next, + @Nullable final GridFutureAdapter forcedRebFut) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); + assert force == (forcedRebFut != null); + long delay = cctx.config().getRebalanceDelay(); if (delay == 0 || force) { @@ -301,6 +324,19 @@ Runnable addAssignments(final GridDhtPreloaderAssignments assigns, }); } + if (forcedRebFut != null) { + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture future) { + try { + forcedRebFut.onDone(future.get()); + } + catch (Exception e) { + forcedRebFut.onDone(e); + } + } + }); + } + rebalanceFut = fut; fut.sendRebalanceStartedEvent(); @@ -383,7 +419,7 @@ else if (delay > 0) { @Override public void onTimeout() { exchFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture f) { - cctx.shared().exchange().forcePreloadExchange(exchFut); + cctx.shared().exchange().forceRebalance(exchFut); } }); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e945de958a339..4f3440152adcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -54,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -65,7 +64,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -194,6 +192,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter forcedRebFut; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -227,15 +228,17 @@ public GridDhtPartitionsExchangeFuture( * @param cctx Cache context. * @param discoEvt Discovery event. * @param exchId Exchange id. + * @param forcedRebFut Forced Rebalance future. */ public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt, - GridDhtPartitionExchangeId exchId) { + GridDhtPartitionExchangeId exchId, GridFutureAdapter forcedRebFut) { dummy = false; forcePreload = true; this.exchId = exchId; this.discoEvt = discoEvt; this.cctx = cctx; + this.forcedRebFut = forcedRebFut; reassign = true; @@ -397,6 +400,13 @@ public GridDhtPartitionExchangeId exchangeId() { return exchId; } + /** + * @return Forced Rebalance future. + */ + @Nullable public GridFutureAdapter forcedRebalanceFuture() { + return forcedRebFut; + } + /** * @return {@code true} if entered to busy state. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 692e7c0de45a6..41bc2fc0096e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -413,8 +413,11 @@ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage /** {@inheritDoc} */ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, int cnt, Runnable next) { - return demander.addAssignments(assignments, forcePreload, cnt, next); + boolean forceRebalance, + int cnt, + Runnable next, + @Nullable GridFutureAdapter forcedRebFut) { + return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut); } /** @@ -728,8 +731,8 @@ private GridDhtFuture request0(Collection keys, Affinity } /** {@inheritDoc} */ - @Override public void forcePreload() { - demander.forcePreload(); + @Override public IgniteInternalFuture forceRebalance() { + return demander.forceRebalance(); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java new file mode 100644 index 0000000000000..8d1f67af2c33f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for rebalancing. + */ +public class CacheRebalancingSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testRebalanceFuture() throws Exception { + IgniteEx ignite0 = startGrid(0); + startGrid(1); + + IgniteCache cache = ignite0.cache(null); + + IgniteFuture fut1 = cache.rebalance(); + + fut1.get(); + + startGrid(2); + + IgniteFuture fut2 = cache.rebalance(); + + assert internalFuture(fut2) != internalFuture(fut1); + + fut2.get(); + } + + /** + * @param future Future. + * @return Internal future. + */ + private static IgniteInternalFuture internalFuture(IgniteFuture future) { + assert future instanceof IgniteFutureImpl; + + return ((IgniteFutureImpl)future).internalFuture(); + } + +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 7f0e23c29aa19..144aac631b4d7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -19,18 +19,19 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest; +import org.apache.ignite.internal.processors.cache.CacheRebalancingSelfTest; import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest; -import org.apache.ignite.internal.processors.cache.GridCacheSwapSpaceSpiConsistencySelfTest; import org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadThroughTest; +import org.apache.ignite.internal.processors.cache.GridCacheOffHeapCleanupTest; +import org.apache.ignite.internal.processors.cache.GridCacheSwapSpaceSpiConsistencySelfTest; import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughEvictionsVariationsSuite; import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest; -import org.apache.ignite.internal.processors.cache.GridCacheOffHeapCleanupTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentFairAffinityTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest; -import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest; import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest; /** @@ -62,6 +63,8 @@ public static TestSuite suite() throws Exception { suite.addTestSuite(GridCacheOffHeapCleanupTest.class); + suite.addTestSuite(CacheRebalancingSelfTest.class); + return suite; } }