Skip to content

Commit

Permalink
IGNITE-4336 Manual rebalance can't be requested twice
Browse files Browse the repository at this point in the history
  • Loading branch information
ilantukh authored and anton-vinogradov committed Dec 8, 2016
1 parent 59e6fec commit 9a691c4
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 32 deletions.
Expand Up @@ -84,8 +84,8 @@
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
Expand Down Expand Up @@ -4651,9 +4651,7 @@ protected Object readResolve() throws ObjectStreamException {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> rebalance() { @Override public IgniteInternalFuture<?> rebalance() {
ctx.preloader().forcePreload(); return ctx.preloader().forceRebalance();

return ctx.preloader().syncFuture();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -711,9 +711,13 @@ public void forceDummyExchange(boolean reassign,
* *
* @param exchFut Exchange future. * @param exchFut Exchange future.
*/ */
public void forcePreloadExchange(GridDhtPartitionsExchangeFuture exchFut) { public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

exchWorker.addFuture( exchWorker.addFuture(
new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId())); new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));

return fut;
} }


/** /**
Expand Down Expand Up @@ -1771,7 +1775,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
Runnable cur = cacheCtx.preloader().addAssignments(assigns, Runnable cur = cacheCtx.preloader().addAssignments(assigns,
forcePreload, forcePreload,
cnt, cnt,
r); r,
exchFut.forcedRebalanceFuture());


if (cur != null) { if (cur != null) {
rebList.add(U.maskName(cacheCtx.name())); rebList.add(U.maskName(cacheCtx.name()));
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


Expand Down Expand Up @@ -90,7 +91,8 @@ public interface GridCachePreloader {
public Runnable addAssignments(GridDhtPreloaderAssignments assignments, public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload, boolean forcePreload,
int cnt, int cnt,
Runnable next); Runnable next,
@Nullable GridFutureAdapter<Boolean> forcedRebFut);


/** /**
* @param p Preload predicate. * @param p Preload predicate.
Expand Down Expand Up @@ -150,9 +152,9 @@ public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest
AffinityTopologyVersion topVer); AffinityTopologyVersion topVer);


/** /**
* Force preload process. * Force Rebalance process.
*/ */
public void forcePreload(); public IgniteInternalFuture<Boolean> forceRebalance();


/** /**
* Unwinds undeploys. * Unwinds undeploys.
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


Expand Down Expand Up @@ -88,8 +89,8 @@ public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void forcePreload() { @Override public IgniteInternalFuture<Boolean> forceRebalance() {
// No-op. return new GridFinishedFuture<>(true);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -165,8 +166,11 @@ public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
int cnt, Runnable next) { boolean forcePreload,
int cnt,
Runnable next,
@Nullable GridFutureAdapter<Boolean> forcedRebFut) {
return null; return null;
} }


Expand Down
Expand Up @@ -2326,9 +2326,7 @@ private void onLeave(GridCacheGateway<K, V> gate) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteFuture<?> rebalance() { @Override public IgniteFuture<?> rebalance() {
ctx.preloader().forcePreload(); return new IgniteFutureImpl<>(ctx.preloader().forceRebalance());

return new IgniteFutureImpl<>(ctx.preloader().syncFuture());
} }


/** /**
Expand Down
Expand Up @@ -55,6 +55,7 @@
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude;
Expand All @@ -64,6 +65,7 @@
import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiException;
Expand Down Expand Up @@ -216,9 +218,9 @@ void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
} }


/** /**
* Force preload. * Force Rebalance.
*/ */
void forcePreload() { IgniteInternalFuture<Boolean> forceRebalance() {
GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);


if (obj != null) if (obj != null)
Expand All @@ -230,14 +232,31 @@ void forcePreload() {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Forcing rebalance event for future: " + exchFut); log.debug("Forcing rebalance event for future: " + exchFut);


final GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.shared().exchange().forcePreloadExchange(exchFut); IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut);

fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> future) {
try {
fut.onDone(future.get());
}
catch (Exception e) {
fut.onDone(e);
}
}
});
} }
}); });

return fut;
} }
else if (log.isDebugEnabled()) else if (log.isDebugEnabled())
log.debug("Ignoring force rebalance request (no topology event happened yet)."); log.debug("Ignoring force rebalance request (no topology event happened yet).");

return new GridFinishedFuture<>(true);
} }


/** /**
Expand Down Expand Up @@ -275,15 +294,19 @@ void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
* @param force {@code True} if dummy reassign. * @param force {@code True} if dummy reassign.
* @param cnt Counter. * @param cnt Counter.
* @param next Runnable responsible for cache rebalancing start. * @param next Runnable responsible for cache rebalancing start.
* @param forcedRebFut External future for forced rebalance.
* @return Rebalancing runnable. * @return Rebalancing runnable.
*/ */
Runnable addAssignments(final GridDhtPreloaderAssignments assigns, Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
boolean force, boolean force,
int cnt, int cnt,
final Runnable next) { final Runnable next,
@Nullable final GridFutureAdapter<Boolean> forcedRebFut) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns); log.debug("Adding partition assignments: " + assigns);


assert force == (forcedRebFut != null);

long delay = cctx.config().getRebalanceDelay(); long delay = cctx.config().getRebalanceDelay();


if (delay == 0 || force) { if (delay == 0 || force) {
Expand All @@ -301,6 +324,19 @@ Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
}); });
} }


if (forcedRebFut != null) {
fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> future) {
try {
forcedRebFut.onDone(future.get());
}
catch (Exception e) {
forcedRebFut.onDone(e);
}
}
});
}

rebalanceFut = fut; rebalanceFut = fut;


fut.sendRebalanceStartedEvent(); fut.sendRebalanceStartedEvent();
Expand Down Expand Up @@ -383,7 +419,7 @@ else if (delay > 0) {
@Override public void onTimeout() { @Override public void onTimeout() {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
cctx.shared().exchange().forcePreloadExchange(exchFut); cctx.shared().exchange().forceRebalance(exchFut);
} }
}); });
} }
Expand Down
Expand Up @@ -54,7 +54,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
Expand All @@ -65,7 +64,6 @@
import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.S;
Expand Down Expand Up @@ -194,6 +192,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */ /** */
private boolean centralizedAff; private boolean centralizedAff;


/** Forced Rebalance future. */
private GridFutureAdapter<Boolean> forcedRebFut;

/** /**
* Dummy future created to trigger reassignments if partition * Dummy future created to trigger reassignments if partition
* topology changed while preloading. * topology changed while preloading.
Expand Down Expand Up @@ -227,15 +228,17 @@ public GridDhtPartitionsExchangeFuture(
* @param cctx Cache context. * @param cctx Cache context.
* @param discoEvt Discovery event. * @param discoEvt Discovery event.
* @param exchId Exchange id. * @param exchId Exchange id.
* @param forcedRebFut Forced Rebalance future.
*/ */
public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt, public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt,
GridDhtPartitionExchangeId exchId) { GridDhtPartitionExchangeId exchId, GridFutureAdapter<Boolean> forcedRebFut) {
dummy = false; dummy = false;
forcePreload = true; forcePreload = true;


this.exchId = exchId; this.exchId = exchId;
this.discoEvt = discoEvt; this.discoEvt = discoEvt;
this.cctx = cctx; this.cctx = cctx;
this.forcedRebFut = forcedRebFut;


reassign = true; reassign = true;


Expand Down Expand Up @@ -397,6 +400,13 @@ public GridDhtPartitionExchangeId exchangeId() {
return exchId; return exchId;
} }


/**
* @return Forced Rebalance future.
*/
@Nullable public GridFutureAdapter<Boolean> forcedRebalanceFuture() {
return forcedRebFut;
}

/** /**
* @return {@code true} if entered to busy state. * @return {@code true} if entered to busy state.
*/ */
Expand Down
Expand Up @@ -413,8 +413,11 @@ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload, int cnt, Runnable next) { boolean forceRebalance,
return demander.addAssignments(assignments, forcePreload, cnt, next); int cnt,
Runnable next,
@Nullable GridFutureAdapter<Boolean> forcedRebFut) {
return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut);
} }


/** /**
Expand Down Expand Up @@ -728,8 +731,8 @@ private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, Affinity
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void forcePreload() { @Override public IgniteInternalFuture<Boolean> forceRebalance() {
demander.forcePreload(); return demander.forceRebalance();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/**
* Test for rebalancing.
*/
public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);

cfg.setCacheConfiguration(new CacheConfiguration());

return cfg;
}

/**
* @throws Exception If failed.
*/
public void testRebalanceFuture() throws Exception {
IgniteEx ignite0 = startGrid(0);
startGrid(1);

IgniteCache<Object, Object> cache = ignite0.cache(null);

IgniteFuture fut1 = cache.rebalance();

fut1.get();

startGrid(2);

IgniteFuture fut2 = cache.rebalance();

assert internalFuture(fut2) != internalFuture(fut1);

fut2.get();
}

/**
* @param future Future.
* @return Internal future.
*/
private static IgniteInternalFuture internalFuture(IgniteFuture future) {
assert future instanceof IgniteFutureImpl;

return ((IgniteFutureImpl)future).internalFuture();
}

}

0 comments on commit 9a691c4

Please sign in to comment.