Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Leader Aware Routing #2214

Merged
merged 13 commits into from Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
99 changes: 99 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -222,4 +222,103 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet analyzeUpdateStatement(com.google.cloud.spanner.Statement, com.google.cloud.spanner.ReadContext$QueryAnalyzeMode, com.google.cloud.spanner.Options$UpdateOption[])</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
yifanzyifanz marked this conversation as resolved.
Show resolved Hide resolved
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
<to>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.spanner.v1.ResultSet</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.spanner.v1.ResultSet</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall</to>
</difference>

</differences>
Expand Up @@ -161,6 +161,7 @@ static Builder newBuilder() {
private SingleReadContext(Builder builder) {
super(builder);
this.bound = builder.bound;
this.routeToLeader = false;
}

@GuardedBy("lock")
Expand Down Expand Up @@ -291,6 +292,7 @@ static Builder newBuilder() {
this.timestamp = builder.timestamp;
this.transactionId = builder.transactionId;
}
this.routeToLeader = false;
}

@Override
Expand Down Expand Up @@ -347,7 +349,8 @@ void initTransaction() {
.setSession(session.getName())
.setOptions(options)
.build();
Transaction transaction = rpc.beginTransaction(request, session.getOptions());
Transaction transaction =
rpc.beginTransaction(request, session.getOptions(), routeToLeader);
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -380,6 +383,7 @@ void initTransaction() {
Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;
protected boolean routeToLeader = false;
yifanzyifanz marked this conversation as resolved.
Show resolved Hide resolved

@GuardedBy("lock")
private boolean isValid = true;
Expand Down Expand Up @@ -664,7 +668,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), routeToLeader);
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -792,7 +797,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
rpc.read(builder.build(), stream.consumer(), session.getOptions(), routeToLeader);
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Expand Up @@ -202,7 +202,7 @@ private ByteString initTransaction() {
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions());
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Expand Up @@ -275,7 +275,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
@Override
public void prepareReadWriteTransaction() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relevant for this PR, but note to self/other readers: This seems like a method that can be cleaned up. I don't think it is in use anymore. @rajatbhatta Would you mind taking a look at that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll take a look. @yifanzyifanz: Please keep this comment unresolved for the time being.

setActive(null);
readyTransactionId = beginTransaction();
readyTransactionId = beginTransaction(true);
}

@Override
Expand All @@ -296,21 +296,21 @@ public void close() {
}
}

ByteString beginTransaction() {
ByteString beginTransaction(boolean routeToLeader) {
try {
return beginTransactionAsync().get();
return beginTransactionAsync(routeToLeader).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}

ApiFuture<ByteString> beginTransactionAsync() {
return beginTransactionAsync(Options.fromTransactionOptions());
ApiFuture<ByteString> beginTransactionAsync(boolean routeToLeader) {
return beginTransactionAsync(Options.fromTransactionOptions(), routeToLeader);
}

ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions) {
ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean routeToLeader) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
final BeginTransactionRequest request =
Expand All @@ -319,7 +319,7 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions) {
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.build();
final ApiFuture<Transaction> requestFuture =
spanner.getRpc().beginTransactionAsync(request, options);
spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
requestFuture.addListener(
tracer.withSpan(
span,
Expand Down
Expand Up @@ -132,6 +132,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
private final boolean leaderAwareRoutingEnabled;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -600,6 +601,7 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
}

/**
Expand Down Expand Up @@ -700,6 +702,7 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private boolean leaderAwareRoutingEnabled = true;

private Builder() {
// Manually set retry and polling settings that work.
Expand Down Expand Up @@ -1155,6 +1158,15 @@ public Builder setEmulatorHost(String emulatorHost) {
return this;
}

/**
* Enable or disable leader aware routing. Leader aware routing would route all requests in
* RW/PDML transactions to the leader region.
*/
public Builder setLeaderAwareRouting(boolean leaderAwareRoutingEnabled) {
yifanzyifanz marked this conversation as resolved.
Show resolved Hide resolved
this.leaderAwareRoutingEnabled = leaderAwareRoutingEnabled;
return this;
}

@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
Expand Down Expand Up @@ -1291,6 +1303,10 @@ public String getCompressorName() {
return compressorName;
}

public boolean isLeaderAwareRoutingEnabled() {
return leaderAwareRoutingEnabled;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Expand Up @@ -196,6 +196,7 @@ private TransactionContextImpl(Builder builder) {
this.trackTransactionStarter = builder.trackTransactionStarter;
this.options = builder.options;
this.finishedAsyncOperations.set(null);
this.routeToLeader = true;
}

private void increaseAsyncOperations() {
Expand Down Expand Up @@ -255,7 +256,7 @@ ApiFuture<Void> ensureTxnAsync() {

private void createTxnAsync(final SettableApiFuture<Void> res) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut = session.beginTransactionAsync(options);
final ApiFuture<ByteString> fut = session.beginTransactionAsync(options, routeToLeader);
fut.addListener(
() -> {
try {
Expand Down Expand Up @@ -717,7 +718,7 @@ private ResultSet internalExecuteUpdate(
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
rpc.executeQuery(builder.build(), session.getOptions(), routeToLeader);
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -747,7 +748,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), routeToLeader);
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down