Skip to content

Commit

Permalink
GG-20702 Fix invalid partition clearing.
Browse files Browse the repository at this point in the history
  • Loading branch information
ascherbakoff committed Jul 3, 2019
1 parent afdc96e commit cafab0d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,8 @@ private boolean casState(long state, GridDhtPartitionState toState) {
*/
public boolean own() {
while (true) {
assert !clear : "Could not own clearing partition " + this;

long state = this.state.get();

GridDhtPartitionState partState = getPartState(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ExchangeType.ALL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
Expand Down Expand Up @@ -763,7 +764,8 @@ private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {

long updateSeq = this.updateSeq.incrementAndGet();

if (!ctx.localNode().isClient()) {
// Skip partition updates in case of not real exchange.
if (!ctx.localNode().isClient() && exchFut.exchangeType() == ALL) {
for (int p = 0; p < partitions; p++) {
GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ private void showProgress() {
", grpId=" + grp.groupId() +
", remainingPartsToEvict=" + (totalTasks.get() - taskInProgress) +
", partsEvictInProgress=" + taskInProgress +
", totalParts= " + grp.topology().localPartitions().size() + "]");
", totalParts=" + grp.topology().localPartitions().size() + "]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,43 @@ public void testRebalancedPartitionsOwningWithConcurrentAffinityChange() throws
verifyCache(ig1.cache(CACHE3_NAME), GENERATING_FUNC);
}

/**
* Scenario: when rebalanced MOVING partitions are owning by checkpointer,
* concurrent no-op exchange should not trigger partition clearing.
*
* @throws Exception If failed.
*/
@Test
public void testRebalancedPartitionsOwningWithAffinitySwitch() throws Exception {
Ignite ig0 = startGridsMultiThreaded(4);
fillCache(ig0.dataStreamer(CACHE3_NAME), CACHE_SIZE, GENERATING_FUNC);

// Stop idx=2 to prepare for baseline topology change later.
stopGrid(2);

// Stop idx=1 and cleanup LFS to trigger full rebalancing after it restart.
String ig1Name = "node01-" + grid(1).localNode().consistentId();
stopGrid(1);
cleanPersistenceFiles(ig1Name);

// Blocking fileIO and blockMessagePredicate to block checkpointer and rebalancing for node idx=1.
useBlockingFileIO = true;

// Enable blocking checkpointer on node idx=1 (see BlockingCheckpointFileIOFactory).
fileIoBlockingSemaphore.drainPermits();

// Wait for rebalance (all partitions will be in MOVING state until cp is finished).
startGrid(1).cachex(CACHE3_NAME).context().group().preloader().rebalanceFuture().get();

startGrid("client");

fileIoBlockingSemaphore.release(Integer.MAX_VALUE);

awaitPartitionMapExchange();

assertPartitionsSame(idleVerify(grid(0), CACHE3_NAME));
}

/** FileIOFactory implementation that enables blocking of writes to disk so checkpoint can be blocked. */
private static class BlockingCheckpointFileIOFactory implements FileIOFactory {
/** Serial version uid. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ protected void printPartitionState(String cacheName, int firstParts) {

sb.append("nodeId=")
.append(k.context().localNodeId())
.append(" consistentId=")
.append(k.localNode().consistentId())
.append(" isDone=")
.append(syncFut.isDone())
.append("\n");
Expand Down Expand Up @@ -952,7 +954,8 @@ protected void printPartitionState(String cacheName, int firstParts) {
.append(part == null ? "NA" : part.dataStore().partUpdateCounter())
.append(" fullSize=")
.append(part == null ? "NA" : part.fullSize())
.append(" state=").append(part.state());
.append(" state=").append(part.state())
.append(" reservations=").append(part.reservations());
}
else
sb.append(p).append(" is null");
Expand Down

0 comments on commit cafab0d

Please sign in to comment.