Skip to content

Commit

Permalink
IGNITE-8610 Fixed checkpoint record search for WAL delta rebalancing -
Browse files Browse the repository at this point in the history
…Fixes #4090.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
  • Loading branch information
Jokser authored and agoncharuk committed Jun 15, 2018
1 parent 8d43ee8 commit 10aa02a
Show file tree
Hide file tree
Showing 22 changed files with 1,696 additions and 1,272 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
Expand Down Expand Up @@ -812,11 +813,23 @@ public void destroy() {
* Awaits completion of partition destroy process in case of {@code EVICTED} partition state.
*/
public void awaitDestroy() {
try {
if (state() == EVICTED)
rent.get();
} catch (IgniteCheckedException e) {
log.error("Unable to await partition destroy " + this, e);
if (state() != EVICTED)
return;

final long timeout = 10_000;

for (;;) {
try {
rent.get(timeout);

break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
U.warn(log, "Failed to await partition destroy within timeout " + this);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to await partition destroy " + this, e);
}
}
}

Expand Down
Expand Up @@ -1309,7 +1309,8 @@ void onResult(GridDhtLockResponse res) {
info.version(),
info.ttl(),
info.expireTime(),
true, topVer,
true,
topVer,
replicate ? DR_PRELOAD : DR_NONE,
false)) {
if (rec && !entry.isInternal())
Expand Down
Expand Up @@ -136,7 +136,7 @@ public GridDhtPartitionDemandMessage withNewPartitionsMap(@NotNull IgniteDhtDema
/**
* @return Partition.
*/
IgniteDhtDemandedPartitionsMap partitions() {
public IgniteDhtDemandedPartitionsMap partitions() {
return parts;
}

Expand Down
Expand Up @@ -431,12 +431,15 @@ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
}

/**
* Retreives the node which has WAL history since {@code cntrSince}.
*
* @param grpId Cache group ID.
* @param partId Partition ID.
* @param cntrSince Partition update counter since history supplying is requested.
* @return ID of history supplier node or null if it doesn't exist.
*/
@Nullable public UUID partitionHistorySupplier(int grpId, int partId) {
return partHistSuppliers.getSupplier(grpId, partId);
@Nullable public UUID partitionHistorySupplier(int grpId, int partId, long cntrSince) {
return partHistSuppliers.getSupplier(grpId, partId, cntrSince);
}

/**
Expand Down Expand Up @@ -1191,8 +1194,9 @@ private void distributedExchange() throws IgniteCheckedException {
In case of persistent store is enabled we first restore partitions presented on disk.
We need to guarantee that there are no partition state changes logged to WAL before this callback
to make sure that we correctly restored last actual states. */
cctx.database().beforeExchange(this);
boolean restored = cctx.database().beforeExchange(this);

// Pre-create missing partitions using current affinity.
if (!exchCtx.mergeExchanges()) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
Expand All @@ -1204,6 +1208,10 @@ private void distributedExchange() throws IgniteCheckedException {
}
}

// After all partitions have been restored and pre-created it's safe to make first checkpoint.
if (restored)
cctx.database().onStateRestored();

changeWalModeIfNeeded();

if (crd.isLocal()) {
Expand Down Expand Up @@ -1769,6 +1777,7 @@ public void finishMerged() {
}

cctx.database().releaseHistoryForExchange();

cctx.database().rebuildIndexesIfNeeded(this);

if (err == null) {
Expand Down Expand Up @@ -2434,8 +2443,6 @@ else if (cntr == maxCntr.cnt)
maxCntr.nodes.add(cctx.localNodeId());
}

int entryLeft = maxCntrs.size();

Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;

Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
Expand All @@ -2458,7 +2465,7 @@ else if (cntr == maxCntr.cnt)
Long localCntr = localReserved.get(p);

if (localCntr != null && localCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) {
partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, minCntr);
partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, localCntr);

haveHistory.add(p);

Expand All @@ -2470,7 +2477,7 @@ else if (cntr == maxCntr.cnt)
Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);

if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) {
partHistSuppliers.put(e0.getKey(), top.groupId(), p, minCntr);
partHistSuppliers.put(e0.getKey(), top.groupId(), p, histCntr);

haveHistory.add(p);

Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
Expand Down Expand Up @@ -223,6 +224,8 @@ private IgniteCheckedException stopError() {

// If partition was destroyed recreate it.
if (part.state() == EVICTED) {
part.awaitDestroy();

part = top.localPartition(p, topVer, true);
}

Expand All @@ -231,7 +234,7 @@ private IgniteCheckedException stopError() {
ClusterNode histSupplier = null;

if (grp.persistenceEnabled() && exchFut != null) {
UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p);
UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());

if (nodeId != null)
histSupplier = ctx.discovery().node(nodeId);
Expand Down Expand Up @@ -288,6 +291,9 @@ private IgniteCheckedException stopError() {
}
}

if (!assignments.isEmpty())
ctx.database().lastCheckpointInapplicableForWalRebalance(grp.groupId());

return assignments;
}

Expand Down
Expand Up @@ -47,17 +47,22 @@ public static IgniteDhtPartitionHistorySuppliersMap empty() {
}

/**
 * Looks up a node able to supply WAL history for the given partition starting
 * from the requested update counter.
 *
 * @param grpId Cache group ID.
 * @param partId Partition ID.
 * @param cntrSince Partition update counter since which history supplying is requested.
 * @return Supplier node ID, or {@code null} if no node holds sufficiently old history.
 */
@Nullable public synchronized UUID getSupplier(int grpId, int partId, long cntrSince) {
    if (map == null)
        return null;

    for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : map.entrySet()) {
        UUID supplierNode = e.getKey();

        Long historyCounter = e.getValue().get(new T2<>(grpId, partId));

        // A node can supply history only if its reserved counter is not newer
        // than the counter history is requested from.
        if (historyCounter != null && historyCounter <= cntrSince)
            return supplierNode;
    }

    return null;
Expand Down

0 comments on commit 10aa02a

Please sign in to comment.