Skip to content
Permalink
Browse files
GEODE-10290: GII requester should remove departed members (#7670)
  • Loading branch information
gesterzhou committed May 18, 2022
1 parent cbd8795 commit 3d6354cb6b182d54531a8103a357f03754cf5165
Showing 5 changed files with 332 additions and 13 deletions.
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.Serializable;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRule;

/**
 * Distributed test for GEODE-10290: repeatedly restarting datastores of a partitioned region and
 * rebalancing must not let the set of departed members tracked by each bucket's region version
 * vector keep growing.
 *
 * <p>
 * The test starts a cache and a partitioned region in several VMs, loads data (including
 * destroyed entries whose tombstones are force-expired), then cycles through the VMs closing and
 * recreating the cache, rebalancing and verifying after every step.
 */
public class PartitionedRegionRestartRebalanceDUnitTest implements Serializable {
  private static final int REDUNDANT_COPIES = 2;
  private static final int TOTAL_NUM_BUCKETS = 12;
  /** Number of VMs hosting the partitioned region. */
  private static final int DATASTORE_COUNT = 4;
  /** How many times each datastore is restarted during the test. */
  private static final int RESTARTS_PER_DATASTORE = 10;
  private static final Logger logger = LogManager.getLogger();

  /** Region name derived from the test class so it is identical in every VM. */
  private final String regionName = getClass().getSimpleName();
  private VM[] datastores;

  @Rule
  public DistributedRule distributedRule = new DistributedRule();

  @Rule
  public CacheRule cacheRule = new CacheRule();

  /**
   * Creates a cache and the partitioned region in every datastore VM, then loads the initial data
   * from the first VM.
   */
  @Before
  public void setUp() throws Exception {
    datastores = new VM[DATASTORE_COUNT];
    for (int i = 0; i < datastores.length; i++) {
      datastores[i] = getVM(i);
      datastores[i].invoke(() -> cacheRule.createCache());
      datastores[i].invoke(() -> createRegion());
    }
    datastores[0].invoke(() -> feedData());
  }

  /**
   * Creates the partitioned region in the current VM using {@link #REDUNDANT_COPIES} redundant
   * copies and {@link #TOTAL_NUM_BUCKETS} buckets.
   */
  private void createRegion() {
    // Keys are Integers and values are Strings, matching what feedData() stores.
    PartitionAttributesFactory<Integer, String> paf = new PartitionAttributesFactory<>();
    paf.setRedundantCopies(REDUNDANT_COPIES);
    paf.setTotalNumBuckets(TOTAL_NUM_BUCKETS);

    RegionFactory<Integer, String> rf = cacheRule.getCache().createRegionFactory();
    rf.setDataPolicy(DataPolicy.PARTITION);
    rf.setPartitionAttributes(paf.create());
    rf.create(regionName);
  }

  /**
   * Puts {@code 2 * TOTAL_NUM_BUCKETS} entries, destroying each of the first
   * {@code TOTAL_NUM_BUCKETS} right after it is put, then forces a tombstone batch expiration for
   * {@code TOTAL_NUM_BUCKETS} tombstones.
   */
  private void feedData() throws InterruptedException {
    PartitionedRegion pr = (PartitionedRegion) cacheRule.getCache().getRegion(regionName);
    for (int i = 0; i < TOTAL_NUM_BUCKETS * 2; i++) {
      pr.put(i, "VALUE-" + i);
      if (i < TOTAL_NUM_BUCKETS) {
        pr.destroy(i);
      }
    }
    cacheRule.getCache().getTombstoneService().forceBatchExpirationForTests(TOTAL_NUM_BUCKETS);
  }

  /**
   * Starts a rebalance in the current VM, waits for its results and logs the total time.
   */
  private void rebalance() throws InterruptedException {
    RebalanceOperation op =
        cacheRule.getCache().getResourceManager().createRebalanceFactory().start();
    RebalanceResults results = op.getResults();
    logger.info("Rebalance total time is {}", results.getTotalTime());
  }

  /**
   * For every bucket hosted locally, takes the departed members reported by the bucket's version
   * vector, discards those that are still referenced by the version stamp of some entry, logs the
   * member-to-version map, and asserts that the number of remaining departed members does not
   * exceed the number of datastores.
   */
  private void verify() {
    PartitionedRegion pr = (PartitionedRegion) cacheRule.getCache().getRegion(regionName);
    for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
      Set<VersionSource> departedMemberSet = br.getVersionVector().getDepartedMembersSet();
      for (Object key : br.getRegionKeysForIteration()) {
        RegionEntry entry = br.getRegionEntry(key);
        departedMemberSet.remove(entry.getVersionStamp().getMemberID());
        if (departedMemberSet.isEmpty()) {
          // Every departed member is still referenced by an entry; no need to scan further.
          break;
        }
      }

      // Fetch the map once and log each member with its version holder for diagnosis.
      Map<?, ?> memberToVersion = br.getVersionVector().getMemberToVersion();
      for (Map.Entry<?, ?> holder : memberToVersion.entrySet()) {
        logger.info("{}:{}:{}", br.getFullPath(), holder.getKey(), holder.getValue());
      }

      // The departed member set must stay bounded no matter how many restarts have happened.
      assertThat(departedMemberSet.size()).isLessThanOrEqualTo(datastores.length);
    }
  }

  /**
   * Round-robins over the datastores: closes the cache in one VM, rebalances and verifies in the
   * next VM, then recreates the cache and region in the first VM and rebalances and verifies
   * there as well.
   */
  @Test
  public void restartAndRebalanceShouldNotIncreaseMemberToVersionMap() {
    for (int i = 0; i < datastores.length * RESTARTS_PER_DATASTORE; i++) {
      VM restarted = datastores[i % datastores.length];
      VM survivor = datastores[(i + 1) % datastores.length];

      restarted.invoke(() -> {
        cacheRule.getCache().close();
      });
      survivor.invoke(() -> {
        rebalance();
        verify();
      });
      restarted.invoke(() -> {
        cacheRule.createCache();
        createRegion();
        rebalance();
        verify();
      });
    }
  }
}
@@ -358,7 +358,7 @@ GIIStatus getFromOne(Set<InternalDistributedMember> recipientSet, boolean target
// remote_rvv will be filled with the versions of unfinished keys
// then if recoveredRVV is still newer than the filled remote_rvv, do fullGII
remote_rvv = received_rvv.getCloneForTransmission();
keysOfUnfinishedOps = processReceivedRVV(remote_rvv, recoveredRVV);
keysOfUnfinishedOps = processReceivedRVV(remote_rvv, recoveredRVV, received_rvv);
if (internalAfterCalculatedUnfinishedOps != null
&& internalAfterCalculatedUnfinishedOps.getRegionName().equals(region.getName())) {
internalAfterCalculatedUnfinishedOps.run();
@@ -1052,21 +1052,31 @@ protected RegionVersionVector getRVVFromProvider(final ClusterDistributionManage
/**
* Compare the received RVV with local RVV and return a set of keys for unfinished operations.
*
* @param remoteRVV RVV from provider
* @param remoteRVV RVV from provider to be filled with unfinished operations
* @param localRVV RVV recovered from disk
* @param receivedRVV original RVV from provider to remove departed members
* @return set for keys of unfinished operations.
*/
protected Set<Object> processReceivedRVV(RegionVersionVector remoteRVV,
RegionVersionVector localRVV) {
RegionVersionVector localRVV, RegionVersionVector receivedRVV) {
if (remoteRVV == null) {
return null;
}
// calculate keys for unfinished ops
HashSet<Object> keys = new HashSet<>();
if (region.getDataPolicy().withPersistence()
&& localRVV.isNewerThanOrCanFillExceptionsFor(remoteRVV)) {
// only search for unfinished keys when localRVV has something newer
// and the region is persistent region
Set<VersionSource> departedMemberSet = receivedRVV.getDepartedMembersSet();
boolean isPersistentRegion = region.getDataPolicy().withPersistence();
Set<VersionSource> foundIds;
if (!isPersistentRegion) {
foundIds = new HashSet<>();
} else {
foundIds = Collections.emptySet();
}
if ((isPersistentRegion && localRVV.isNewerThanOrCanFillExceptionsFor(remoteRVV))
|| !departedMemberSet.isEmpty()) {
// Only search for unfinished keys when localRVV has something newer
// and the region is persistent region.
// Search for departed members if region is not persistent region
Iterator<RegionEntry> it = region.getBestIterator(false);
int count = 0;
VersionSource<?> myId = region.getVersionMember();
@@ -1077,7 +1087,9 @@ protected Set<Object> processReceivedRVV(RegionVersionVector remoteRVV,
if (id == null) {
id = myId;
}
if (!remoteRVV.contains(id, stamp.getRegionVersion())) {
if (!isPersistentRegion) {
foundIds.add(id);
} else if (!remoteRVV.contains(id, stamp.getRegionVersion())) {
// found an unfinished operation
keys.add(mapEntry.getKey());
remoteRVV.recordVersion(id, stamp.getRegionVersion());
@@ -1100,6 +1112,13 @@ protected Set<Object> processReceivedRVV(RegionVersionVector remoteRVV,
}
}
}
if (!departedMemberSet.isEmpty()) {
if (localRVV != null) {
localRVV.removeOldMembers(foundIds);
}
receivedRVV.removeOldMembers(foundIds);
remoteRVV.removeOldMembers(foundIds);
}
return keys;
}

@@ -2042,11 +2061,9 @@ protected boolean chunkEntries(DistributedRegion rgn, int chunkSizeInBytes,
// if this region is destroyed while we are sending data, then abort.
} while (keepGoing && it.hasNext());

if (foundIds.size() > 0) {
RegionVersionVector vv = rgn.getVersionVector();
if (vv != null) {
vv.removeOldMembers(foundIds);
}
RegionVersionVector vv = rgn.getVersionVector();
if (vv != null) {
vv.removeOldMembers(foundIds);
}
// return false if we were told to abort
return sentLastChunk;
@@ -207,6 +207,8 @@ public synchronized String toString() {
sb.append(exceptions);
}
sb.append("}");
sb.append("id=" + id);
sb.append(",departed?" + isDepartedMember);
return sb.toString();
}

@@ -196,6 +196,7 @@ public RegionVersionVector(T ownerId, LocalRegion owner) {
isLiveVector = true;
region = owner;
localExceptions = new RegionVersionHolder<>(0);
localExceptions.id = myId;
memberToVersion =
new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
memberToGCVersion =
@@ -609,6 +610,9 @@ public void initializeVersionHolder(T mbr, RegionVersionHolder<T> otherHolder) {
if (!mbr.equals(myId)) {
h = otherHolder.clone();
h.makeReadyForRecording();
if (h.id == null) {
h.id = mbr;
}
memberToVersion.put(mbr, h);
} else {
RegionVersionHolder<T> vh = otherHolder;

0 comments on commit 3d6354c

Please sign in to comment.