IGNITE-8116 Fixed historical rebalance from WAL
Jokser authored and agoncharuk committed Apr 18, 2018
1 parent b058bf6 commit 5c71854
Showing 13 changed files with 467 additions and 110 deletions.
@@ -861,7 +861,8 @@ private long allocateForTree() throws IgniteCheckedException {
}

@Override protected void onClose() throws IgniteCheckedException {
assert loc != null && loc.state() == OWNING && loc.reservations() > 0;
assert loc != null && loc.state() == OWNING && loc.reservations() > 0
: "Partition should be in OWNING state and have at least 1 reservation: " + loc;

loc.release();
}
@@ -874,36 +875,37 @@ private long allocateForTree() throws IgniteCheckedException {
throws IgniteCheckedException {

final TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators = new TreeMap<>();
Set<Integer> missing = null;

Set<Integer> missing = new HashSet<>();

for (Integer p : parts.fullSet()) {
GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer);

if (partIter == null) {
if (missing == null)
missing = new HashSet<>();

missing.add(p);

continue;
}
else
iterators.put(p, partIter);

iterators.put(p, partIter);
}

IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator(parts.historicalMap()));
IgniteHistoricalIterator historicalIterator = historicalIterator(parts.historicalMap(), missing);

if (missing != null) {
for (Integer p : missing)
iter.setPartitionMissing(p);
}
IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator);

for (Integer p : missing)
iter.setPartitionMissing(p);

return iter;
}

/**
* @param partCntrs Partition counters map.
* @param missing Set to be populated with partitions that are missing or could not be reserved.
* @return Historical iterator.
*/
@Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs)
@Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs, Set<Integer> missing)
throws IgniteCheckedException {
return null;
}
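The core of the fix in the hunk above is that partitions which could not be reserved for full rebalance are no longer just flagged on the iterator: they are collected into a missing set and passed to historicalIterator(partCntrs, missing), so an implementation that overrides that method (not shown in this excerpt) can try to cover them from history (WAL). Below is a minimal, self-contained sketch of that flow; it uses plain JDK types instead of Ignite's GridCloseableIterator/IgniteHistoricalIterator, and all names in it are illustrative, not Ignite API.

import java.util.*;
import java.util.function.BiFunction;

public class RebalanceIteratorSketch {
    /** Stand-in for reservedIterator(p, topVer): null means the partition could not be reserved. */
    static Iterator<String> reservedIterator(int part, Map<Integer, List<String>> owned) {
        List<String> rows = owned.get(part);

        return rows == null ? null : rows.iterator();
    }

    static void buildRebalanceIterator(Set<Integer> fullSet,
        Map<Integer, List<String>> owned,
        BiFunction<Set<Integer>, Set<Integer>, Iterator<String>> historicalFactory) {
        Map<Integer, Iterator<String>> iterators = new TreeMap<>();
        Set<Integer> missing = new HashSet<>();

        for (Integer p : fullSet) {
            Iterator<String> partIter = reservedIterator(p, owned);

            if (partIter == null)
                missing.add(p); // Previously only marked as missing; now also reported to the historical iterator.
            else
                iterators.put(p, partIter);
        }

        // Mirrors historicalIterator(parts.historicalMap(), missing) in the patch:
        // the factory sees which partitions failed to reserve and may cover them from history.
        Iterator<String> historical = historicalFactory.apply(fullSet, missing);

        System.out.println("full=" + iterators.keySet() + ", missing=" + missing +
            ", historical=" + (historical != null));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> owned = Map.of(0, List.of("row-0"), 2, List.of("row-2"));

        buildRebalanceIterator(new TreeSet<>(List.of(0, 1, 2)), owned,
            (full, missing) -> Collections.<String>emptyIterator());
    }
}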
@@ -459,15 +459,17 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
+ ", topology=" + fut.topologyVersion() + ", rebalanceId=" + fut.rebalanceId + "]");
}

int stripes = ctx.gridConfig().getRebalanceThreadPoolSize();
int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize();

int stripes = totalStripes;

final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes);
for (int i = 0; i < stripes; i++)
stripePartitions.add(new IgniteDhtDemandedPartitionsMap());

// Reserve one stripe for historical partitions.
if (parts.hasHistorical()) {
stripePartitions.add(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));
stripePartitions.set(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));

if (stripes > 1)
stripes--;
@@ -478,7 +480,7 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
for (int i = 0; it.hasNext(); i++)
stripePartitions.get(i % stripes).addFull(it.next());

for (int stripe = 0; stripe < stripes; stripe++) {
for (int stripe = 0; stripe < totalStripes; stripe++) {
if (!stripePartitions.get(stripe).isEmpty()) {
// Create copy of demand message with new striped partitions map.
final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe));
@@ -489,23 +491,27 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign

final int topicId = stripe;

Runnable initDemandRequestTask = () -> {
IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(fut, demandMsg.partitions().fullSet());

// Start rebalancing after clearing full partitions is finished.
clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> {
if (fut.isDone())
return;

try {
if (!fut.isDone()) {
ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());

// Cleanup required in case partitions demanded in parallel with cancellation.
synchronized (fut) {
if (fut.isDone())
fut.cleanupRemoteContexts(node.id());
}
ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());

if (log.isDebugEnabled())
log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
topicId + ", partitions count=" + stripePartitions.get(topicId).size() +
" (" + stripePartitions.get(topicId).partitionsList() + ")]");
// Cleanup required in case partitions demanded in parallel with cancellation.
synchronized (fut) {
if (fut.isDone())
fut.cleanupRemoteContexts(node.id());
}

if (log.isDebugEnabled())
log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
topicId + " " + demandMsg.rebalanceId() + ", partitions count=" + stripePartitions.get(topicId).size() +
" (" + stripePartitions.get(topicId).partitionsList() + ")]");
}
catch (IgniteCheckedException e1) {
ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
@@ -522,31 +528,26 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign

fut.cancel();
}
};

awaitClearingAndStartRebalance(fut, demandMsg, initDemandRequestTask);
}, true));
}
}
}
}
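A note on the striping change above: the original code used stripePartitions.add(stripes - 1, ...), which inserts a new element and shifts the pre-created empty maps, while the fix uses set(stripes - 1, ...) so the historical map replaces the last stripe in place. Full partitions are then dealt round-robin over the remaining stripes, and the send loop now iterates up to totalStripes so the reserved historical stripe is not skipped. A minimal standalone sketch of that distribution (plain JDK lists, illustrative names only):

import java.util.ArrayList;
import java.util.List;

public class StripeDistributionSketch {
    public static void main(String[] args) {
        int totalStripes = 4;          // stands in for getRebalanceThreadPoolSize()
        int stripes = totalStripes;

        List<Integer> fullParts = List.of(0, 1, 2, 3, 4, 5, 6);
        List<Integer> historicalParts = List.of(7, 8);

        List<List<Integer>> stripePartitions = new ArrayList<>(totalStripes);

        for (int i = 0; i < totalStripes; i++)
            stripePartitions.add(new ArrayList<>());

        // Reserve one stripe for historical partitions (set(), not add(), so the list keeps its size).
        if (!historicalParts.isEmpty()) {
            stripePartitions.set(stripes - 1, new ArrayList<>(historicalParts));

            if (stripes > 1)
                stripes--;             // keep full partitions off the historical stripe
        }

        // Round-robin full partitions over the remaining stripes.
        for (int i = 0; i < fullParts.size(); i++)
            stripePartitions.get(i % stripes).add(fullParts.get(i));

        // Demand messages are then sent for every non-empty stripe, 0..totalStripes-1.
        System.out.println(stripePartitions); // [[0, 3, 6], [1, 4], [2, 5], [7, 8]]
    }
}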

/**
* Awaits partitions clearing for full partitions and sends initial demand request
* after all partitions are cleared and safe to consume data.
* Creates future which will be completed when all {@code fullPartitions} are cleared.
*
* @param fut Rebalance future.
* @param demandMessage Initial demand message which contains set of full partitions to await.
* @param initDemandRequestTask Task which sends initial demand request.
* @param fullPartitions Set of full partitions that need to be cleared.
* @return Future which will be completed when given partitions are cleared.
*/
private void awaitClearingAndStartRebalance(RebalanceFuture fut,
GridDhtPartitionDemandMessage demandMessage,
Runnable initDemandRequestTask) {
Set<Integer> fullPartitions = demandMessage.partitions().fullSet();
private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) {
final GridFutureAdapter clearAllFuture = new GridFutureAdapter();

if (fullPartitions.isEmpty()) {
ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
clearAllFuture.onDone();

return;
return clearAllFuture;
}

for (GridCacheContext cctx : grp.caches()) {
@@ -560,16 +561,19 @@ private void awaitClearingAndStartRebalance(RebalanceFuture fut,
final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());

for (int partId : fullPartitions) {
if (fut.isDone())
return;
if (fut.isDone()) {
clearAllFuture.onDone();

return clearAllFuture;
}

GridDhtLocalPartition part = grp.topology().localPartition(partId);

if (part != null && part.state() == MOVING) {
part.onClearFinished(f -> {
// Cancel rebalance if partition clearing has failed.
if (f.error() != null) {
if (!fut.isDone()) {
if (!fut.isDone()) {
// Cancel rebalance if partition clearing has failed.
if (f.error() != null) {
for (GridCacheContext cctx : grp.caches()) {
if (cctx.statisticsEnabled()) {
final CacheMetricsImpl metrics = cctx.cache().metrics0();
@@ -581,30 +585,54 @@ private void awaitClearingAndStartRebalance(RebalanceFuture fut,
log.error("Unable to await partition clearing " + part, f.error());

fut.cancel();

clearAllFuture.onDone(f.error());
}
}
else {
if (!fut.isDone()) {
int existed = clearingPartitions.decrementAndGet();
else {
int remaining = clearingPartitions.decrementAndGet();

for (GridCacheContext cctx : grp.caches()) {
if (cctx.statisticsEnabled()) {
final CacheMetricsImpl metrics = cctx.cache().metrics0();

metrics.rebalanceClearingPartitions(existed);
metrics.rebalanceClearingPartitions(remaining);
}
}

// If all partitions are cleared send initial demand message.
if (existed == 0)
ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
if (log.isDebugEnabled())
log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName()
+ ", remaining=" + remaining + "]");

if (remaining == 0)
clearAllFuture.onDone();
}
}
else {
clearAllFuture.onDone();
}
});
}
else
clearingPartitions.decrementAndGet();
else {
int remaining = clearingPartitions.decrementAndGet();

for (GridCacheContext cctx : grp.caches()) {
if (cctx.statisticsEnabled()) {
final CacheMetricsImpl metrics = cctx.cache().metrics0();

metrics.rebalanceClearingPartitions(remaining);
}
}

if (log.isDebugEnabled())
log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName()
+ ", remaining=" + remaining + "]");

if (remaining == 0)
clearAllFuture.onDone();
}
}

return clearAllFuture;
}

/**
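The new clearFullPartitions() returns a future that completes only after every full partition has finished clearing (or immediately when there is nothing to clear), and requestPartitions() now sends the initial demand message from a listener on that future; the error and cancellation paths either complete the future early or cancel the rebalance. The following is a minimal sketch of that completion-counting pattern using plain JDK futures in place of GridFutureAdapter; the fut.isDone() checks, error handling and metrics updates from the patch are omitted, and all names are illustrative.

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ClearThenDemandSketch {
    /** Completes when all given partitions report that clearing has finished. */
    static CompletableFuture<Void> clearFullPartitions(Set<Integer> fullPartitions) {
        CompletableFuture<Void> clearAll = new CompletableFuture<>();

        if (fullPartitions.isEmpty()) {
            clearAll.complete(null);   // nothing to clear, rebalancing may start immediately

            return clearAll;
        }

        AtomicInteger remaining = new AtomicInteger(fullPartitions.size());

        for (Integer partId : fullPartitions) {
            // Stand-in for part.onClearFinished(...): clearing happens asynchronously per partition.
            CompletableFuture.runAsync(() -> {
                int left = remaining.decrementAndGet();

                System.out.println("Cleared partition " + partId + ", remaining=" + left);

                if (left == 0)
                    clearAll.complete(null);   // last partition cleared, unblock rebalancing
            });
        }

        return clearAll;
    }

    public static void main(String[] args) throws Exception {
        clearFullPartitions(Set.of(1, 2, 3))
            .thenRun(() -> System.out.println("Sending initial demand message"))
            .get();   // wait so the JVM does not exit before the async tasks run
    }
}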
@@ -173,7 +173,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

if (curTop.compareTo(demTop) > 0) {
if (log.isDebugEnabled())
log.debug("Demand request outdated [currentTopVer=" + curTop
log.debug("Demand request outdated [grp=" + grp.cacheOrGroupName()
+ ", currentTopVer=" + curTop
+ ", demandTopVer=" + demTop
+ ", from=" + nodeId
+ ", topicId=" + topicId + "]");
@@ -189,24 +190,36 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) {
clearContext(scMap.remove(contextId), log);

if (log.isDebugEnabled())
log.debug("Supply context cleaned [grp=" + grp.cacheOrGroupName()
+ ", from=" + nodeId
+ ", demandMsg=" + d
+ ", supplyContext=" + sctx + "]");
}
else {
if (log.isDebugEnabled())
log.debug("Stale context cleanup message " + d + ", supplyContext=" + sctx);
log.debug("Stale supply context cleanup message [grp=" + grp.cacheOrGroupName()
+ ", from=" + nodeId
+ ", demandMsg=" + d
+ ", supplyContext=" + sctx + "]");
}

return;
}
}

if (log.isDebugEnabled())
log.debug("Demand request accepted [current=" + curTop + ", demanded=" + demTop +
", from=" + nodeId + ", topicId=" + topicId + "]");
log.debug("Demand request accepted [grp=" + grp.cacheOrGroupName()
+ ", from=" + nodeId
+ ", currentVer=" + curTop
+ ", demandedVer=" + demTop
+ ", topicId=" + topicId + "]");

ClusterNode node = grp.shared().discovery().node(nodeId);

if (node == null)
return; // Context will be cleaned at topology change.
return;

try {
SupplyContext sctx;
Expand All @@ -217,13 +230,27 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
if (sctx != null && d.rebalanceId() < sctx.rebalanceId) {
// Stale message, return context back and return.
scMap.put(contextId, sctx);

if (log.isDebugEnabled())
log.debug("Stale demand message [grp=" + grp.cacheOrGroupName()
+ ", actualContext=" + sctx
+ ", from=" + nodeId
+ ", demandMsg=" + d + "]");

return;
}
}

// Demand request should not contain empty partitions if no supply context is associated with it.
if (sctx == null && (d.partitions() == null || d.partitions().isEmpty()))
if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) {
if (log.isDebugEnabled())
log.debug("Empty demand message [grp=" + grp.cacheOrGroupName()
+ ", from=" + nodeId
+ ", topicId=" + topicId
+ ", demandMsg=" + d + "]");

return;
}

assert !(sctx != null && !d.partitions().isEmpty());

@@ -271,7 +298,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);

assert loc != null && loc.state() == GridDhtPartitionState.OWNING;
assert loc != null && loc.state() == GridDhtPartitionState.OWNING
: "Partition should be in OWNING state: " + loc;

s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
}
@@ -323,7 +351,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);

assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part) : loc;
assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part)
: "Partition should be in OWNING state and have at least 1 reservation: " + loc;

if (iter.isPartitionMissing(part) && remainingParts.contains(part)) {
s.missed(part);
@@ -361,9 +390,6 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand

remainingParts.remove(part);
}

// Need to manually prepare cache message.
// TODO GG-11141.
}

Iterator<Integer> remainingIter = remainingParts.iterator();
@@ -374,7 +400,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
if (iter.isPartitionDone(p)) {
GridDhtLocalPartition loc = top.localPartition(p, d.topologyVersion(), false);

assert loc != null;
assert loc != null
: "Supply partition is gone: grp=" + grp.cacheOrGroupName() + ", p=" + p;

s.last(p, loc.updateCounter());

@@ -387,7 +414,8 @@ else if (iter.isPartitionMissing(p)) {
}
}

assert remainingParts.isEmpty();
assert remainingParts.isEmpty()
: "Partitions after rebalance should be either done or missing: " + remainingParts;

if (sctx != null)
clearContext(sctx, log);
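On the supplier side, after the iteration loop every partition in remainingParts must end up either done (its final update counter is recorded) or missing (reported back so the demander can handle it), which is what the strengthened assertion above spells out. A small standalone sketch of that bookkeeping, with plain JDK types and illustrative names only:

import java.util.*;

public class SupplierBookkeepingSketch {
    public static void main(String[] args) {
        Set<Integer> remainingParts = new HashSet<>(Set.of(1, 2, 3));

        Set<Integer> doneParts = Set.of(1, 3);      // stand-in for iter.isPartitionDone(p)
        Set<Integer> missingParts = Set.of(2);      // stand-in for iter.isPartitionMissing(p)

        Map<Integer, Long> lastCounters = new HashMap<>();
        Set<Integer> missed = new HashSet<>();

        for (Iterator<Integer> it = remainingParts.iterator(); it.hasNext(); ) {
            int p = it.next();

            if (doneParts.contains(p)) {
                lastCounters.put(p, 100L + p);      // mirrors s.last(p, loc.updateCounter())
                it.remove();
            }
            else if (missingParts.contains(p)) {
                missed.add(p);                      // mirrors s.missed(p)
                it.remove();
            }
        }

        // Mirrors the strengthened assertion: nothing may be left unclassified.
        assert remainingParts.isEmpty() : "Partitions after rebalance should be either done or missing: " + remainingParts;

        System.out.println("last=" + lastCounters + ", missed=" + missed);
    }
}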
