Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increment cross regional duplicate tokens to replicate the policy we have been applying manually. #1048

Merged
merged 2 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,30 +155,53 @@ public PriamInstance retriableCall() throws Exception {
set(100, 100);
logger.info("Trying to generate a new token");
sleeper.sleep(new Random().nextInt(15000));
String myRegion = myInstanceInfo.getRegion();
// this offset ensures the nodes are spread far away from the other regions.
int regionOffset = tokenManager.regionOffset(myRegion);
String myRac = myInstanceInfo.getRac();
List<String> racs = config.getRacs();
int mySlot =
factory.getAllIds(config.getAppName())
.stream()
.filter(i -> i.getRac().equals(myRac))
.map(PriamInstance::getId)
.max(Integer::compareTo)
.map(id -> racs.size() + Math.max(id, regionOffset) - regionOffset)
.orElseGet(
() -> {
Preconditions.checkState(racs.contains(myRac));
return racs.indexOf(myRac);
});
int instanceCount = membership.getRacCount() * membership.getRacMembershipSize();
String newToken = tokenManager.createToken(mySlot, instanceCount, myRegion);
return createToken(mySlot + regionOffset, newToken);
return generateNewToken();
}
}.call();
}

@VisibleForTesting
PriamInstance generateNewToken() {
String myRegion = myInstanceInfo.getRegion();
// this offset ensures the nodes are spread far away from the other regions.
int regionOffset = tokenManager.regionOffset(myRegion);
String myRac = myInstanceInfo.getRac();
List<String> racs = config.getRacs();
ImmutableSet<PriamInstance> allIds = factory.getAllIds(config.getAppName());
int mySlot =
allIds.stream()
.filter(i -> i.getRac().equals(myRac))
.map(PriamInstance::getId)
.max(Integer::compareTo)
.map(id -> racs.size() + Math.max(id, regionOffset) - regionOffset)
.orElseGet(
() -> {
Preconditions.checkState(racs.contains(myRac));
return racs.indexOf(myRac);
});
int instanceCount = membership.getRacCount() * membership.getRacMembershipSize();
String newToken = tokenManager.createToken(mySlot, instanceCount, myRegion);
while (newTokenIsADuplicate(newToken, allIds)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly related to your PR but: do we need to worry about atomicity between the check here and the create below? E.g. if this is backed by Cassandra, do we need to use LWTs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally we don't use LWTs, but Contention is accounted for by "acquiring a lock" in a separate table before writing. Note that contention is managed based on the token's relative position within the region rather than the token value itself. That prevents this change introducing the possibility of multiple tokens that are one apart in the same region.

newToken = new BigInteger(newToken).add(BigInteger.ONE).toString();
}
return createToken(mySlot + regionOffset, newToken);
}

private boolean newTokenIsADuplicate(String newToken, ImmutableSet<PriamInstance> instances) {
for (PriamInstance priamInstance : instances) {
if (newToken.equals(priamInstance.getToken())) {
if (myInstanceInfo.getRegion().equals(priamInstance.getDC())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little unclear as to why we throw an IllegalStateException in this case vs the case where the DCs are different - IIUC, duplicates across DCs never happen because of the region hash offset? So it feels that that's the case we should throw vs return false

Also, I'm having trouble tracing the entire code path given the multiple impls, but is it possible that InstanceInfo#getRegion() will return us-east-1 for us-east-1 while PriamInstance#getDC will return us-east?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, duplicates across DCs never happen because of the region hash offset?

Duplicates across DCs can happen if the region hash is off by one and the tokens between regions are off by one in the opposite direction. That scenario can result if the cluster has been doubled using Priam's doubling facilities. The purpose of this PR is to adapt to this scenario. That said, there should never be in-region duplicates and if there are we need to fail immediately. In that case we should not increment until there are no duplicates because that will lead to a situation where tokens are one apart within the same region. That would cause a data imbalance.

is it possible that InstanceInfo#getRegion() will return us-east-1 for us-east-1 while PriamInstance#getDC will return us-east?

This is an interesting concern, but I don't believe it is a threat. The only place we ever convert away from AWS region names is in an internal tuning use case. I can go into greater depth about that this morning if you're interested.

throw new IllegalStateException(
String.format(
"Trying to add token %s to %s but it already exists in %s",
newToken, myInstanceInfo.getRegion(), priamInstance.getDC()));
}
return true;
}
}
return false;
}

private String getReplacedIpForAssignedToken(
ImmutableSet<PriamInstance> aliveInstances, PriamInstance instance)
throws TokenRetrieverUtils.GossipParseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class FakeConfiguration implements IConfiguration {
private boolean skipIngressUnlessIPIsPublic;
private long compressionTransitionEpochMillis;
private boolean autoSnapshot;
private String partitioner;

public Map<String, String> fakeProperties = new HashMap<>();

Expand Down Expand Up @@ -313,4 +314,13 @@ public FakeConfiguration setAutoSnapshot(boolean autoSnapshot) {
public boolean getAutoSnapshot() {
return autoSnapshot;
}

public void setPartitioner(String partitioner) {
this.partitioner = partitioner;
}

@Override
public String getPartitioner() {
return partitioner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,12 @@ public String getAutoScalingGroup() {
public InstanceEnvironment getInstanceEnvironment() {
return InstanceEnvironment.VPC;
}

public void setRac(String rac) {
this.availabilityZone = rac;
}

public void setRegion(String region) {
this.region = region;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package com.netflix.priam.identity.token;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.*;
import com.google.common.truth.Truth;
import com.google.inject.Guice;
import com.google.inject.Injector;
Expand All @@ -29,6 +27,7 @@
import com.netflix.priam.identity.IMembership;
import com.netflix.priam.identity.IPriamInstanceFactory;
import com.netflix.priam.identity.PriamInstance;
import com.netflix.priam.identity.config.FakeInstanceInfo;
import com.netflix.priam.identity.config.InstanceInfo;
import com.netflix.priam.utils.FakeSleeper;
import com.netflix.priam.utils.SystemUtils;
Expand All @@ -42,6 +41,7 @@
import mockit.Mocked;
import org.apache.commons.lang3.math.Fraction;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

Expand Down Expand Up @@ -425,6 +425,43 @@ public void testRingPositionLast(@Mocked SystemUtils systemUtils) throws Excepti
Truth.assertThat(tokenRetriever.getRingPosition()).isEqualTo(Fraction.getFraction(6, 7));
}

@Test
public void testThrowOnDuplicateTokenInSameRegion() {
prepareTokenGenerationTest();
create(1, instanceInfo.getInstanceId(), "host_0", "1.2.3.4", "us-east-1d", 1808575600 + "");
Assert.assertThrows(
IllegalStateException.class, () -> getTokenRetriever().generateNewToken());
}

@Test
public void testIncrementDuplicateTokenInDifferentRegion() {
((FakeInstanceInfo) instanceInfo).setRegion("us-west-2");
create(1, instanceInfo.getInstanceId(), "host_0", "1.2.3.4", "us-west-2a", 1808575600 + "");
prepareTokenGenerationTest();
Truth.assertThat(getTokenRetriever().generateNewToken().getToken()).isEqualTo("1808575601");
}

private void prepareTokenGenerationTest() {
((FakeConfiguration) configuration).setCreateNewToken(true);
((FakeConfiguration) configuration)
.setPartitioner("org.apache.cassandra.dht.RandomPartitioner");
((FakeConfiguration) configuration).setRacs("us-east-1c", "us-east-1d", "us-east-1e");
((FakeInstanceInfo) instanceInfo).setRegion("us-east-1");
((FakeInstanceInfo) instanceInfo).setRac("us-east-1c");
new Expectations() {
{
membership.getRacMembershipSize();
result = 2;
}
};
new Expectations() {
{
membership.getRacCount();
result = 3;
}
};
}

private String getStatus(List<String> liveInstances, Map<String, String> tokenToEndpointMap) {
JSONObject jsonObject = new JSONObject();
try {
Expand Down