Skip to content

Commit

Permalink
IGNITE-4242 ExchangeManager should wait for cache rebalancing in asyn…
Browse files Browse the repository at this point in the history
…c way
  • Loading branch information
anton-vinogradov committed Dec 6, 2016
1 parent acf20b3 commit 6ba1711
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 202 deletions.
Expand Up @@ -21,18 +21,18 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -87,7 +87,6 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
Expand All @@ -97,13 +96,11 @@
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
Expand Down Expand Up @@ -156,9 +153,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private GridFutureAdapter<?> reconnectExchangeFut;

/** */
private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>();

/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
Expand Down Expand Up @@ -1596,12 +1590,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
long timeout = cctx.gridConfig().getNetworkTimeout();

boolean startEvtFired = false;

int cnt = 0;

IgniteInternalFuture asyncStartFut = null;

while (!isCancelled()) {
GridDhtPartitionsExchangeFuture exchFut = null;

Expand Down Expand Up @@ -1703,20 +1693,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
continue;

changed |= cacheCtx.topology().afterExchange(exchFut);

// Preload event notification.
if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
if (!cacheCtx.isReplicated() || !startEvtFired) {
DiscoveryEvent discoEvt = exchFut.discoveryEvent();

cacheCtx.events().addPreloadEvent(-1, EVT_CACHE_REBALANCE_STARTED,
discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
}
}

startEvtFired = true;

if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
refreshPartitions();
}
Expand Down Expand Up @@ -1755,8 +1733,6 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
if (assignsMap != null) {
int size = assignsMap.size();

rebalanceQ.clear();

NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();

for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
Expand All @@ -1772,101 +1748,65 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
orderMap.get(order).add(cacheId);
}

Callable<Boolean> marshR = null;
List<Callable<Boolean>> orderedRs = new ArrayList<>(size);
Runnable r = null;

List<String> rebList = new LinkedList<>();

boolean assignsCancelled = false;

//Ordered rebalance scheduling.
for (Integer order : orderMap.keySet()) {
for (Integer order : orderMap.descendingKeySet()) {
for (Integer cacheId : orderMap.get(order)) {
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);

List<String> waitList = new ArrayList<>(size - 1);
GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);

for (List<Integer> cIds : orderMap.headMap(order).values()) {
for (Integer cId : cIds)
waitList.add(cctx.cacheContext(cId).name());
}
if (assigns != null)
assignsCancelled |= assigns.cancelled();

Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
// Cancels previous rebalance future (in case it's not done yet).
// Sends previous rebalance stopped event (if necessary).
// Creates new rebalance future.
// Sends current rebalance started event (if necessary).
// Finishes cache sync future (on empty assignments).
Runnable cur = cacheCtx.preloader().addAssignments(assigns,
forcePreload,
waitList,
cnt);
cnt,
r);

if (r != null) {
U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
", waitList=" + waitList.toString() + "]");
if (cur != null) {
rebList.add(U.maskName(cacheCtx.name()));

if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
marshR = r;
else
orderedRs.add(r);
r = cur;
}
}
}

if (asyncStartFut != null)
asyncStartFut.get(); // Wait for thread stop.
if (assignsCancelled) { // Pending exchange.
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
}
else if (r != null) {
Collections.reverse(rebList);

rebalanceQ.addAll(orderedRs);
U.log(log, "Rebalancing scheduled [order=" + rebList + "]");

if (marshR != null || !rebalanceQ.isEmpty()) {
if (futQ.isEmpty()) {
U.log(log, "Rebalancing required " +
U.log(log, "Rebalancing started " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');

if (marshR != null) {
try {
marshR.call(); //Marshaller cache rebalancing launches in sync way.
}
catch (Exception ex) {
if (log.isDebugEnabled())
log.debug("Failed to send initial demand request to node");

continue;
}
}

final GridFutureAdapter fut = new GridFutureAdapter();

asyncStartFut = fut;

cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
@Override public Boolean call() {
try {
while (true) {
Callable<Boolean> r = rebalanceQ.poll();

if (r == null)
return false;

if (!r.call())
return false;
}
}
catch (Exception ex) {
if (log.isDebugEnabled())
log.debug("Failed to send initial demand request to node");

return false;
}
finally {
fut.onDone();
}
}
}, /*system pool*/true);
r.run(); // Starts rebalancing routine.
}
else {
else
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
}
}
else {
else
U.log(log, "Skipping rebalancing (nothing scheduled) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
}
}
}
catch (IgniteInterruptedCheckedException e) {
Expand Down
Expand Up @@ -19,7 +19,6 @@

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -84,14 +83,14 @@ public interface GridCachePreloader {
*
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
* @param caches Rebalancing of these caches will be finished before this started.
* @param cnt Counter.
* @return Rebalancing closure.
* @param next Runnable responsible for cache rebalancing start.
* @return Rebalancing runnable.
*/
public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload,
Collection<String> caches,
int cnt);
int cnt,
Runnable next);

/**
* @param p Preload predicate.
Expand Down
Expand Up @@ -19,7 +19,6 @@

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
Expand Down Expand Up @@ -166,8 +165,8 @@ public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
}

/** {@inheritDoc} */
@Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
Collection<String> caches, int cnt) {
@Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
int cnt, Runnable next) {
return null;
}

Expand Down

0 comments on commit 6ba1711

Please sign in to comment.