Skip to content

Commit

Permalink
feat: support RPC priority (#676)
Browse files Browse the repository at this point in the history
* feat: add support for RPC priority

* cleanup: cleanup after merge + add additional tests

* cleanup: fix formatting + tests

* chore: merge and cleanup

* fix: remove merge conflict markers and cleanup

Co-authored-by: Thiago Nunes <thiagotnunes@google.com>
  • Loading branch information
olavloite and thiagotnunes committed Mar 31, 2021
1 parent a2e5803 commit 0bc9972
Show file tree
Hide file tree
Showing 12 changed files with 760 additions and 27 deletions.
Expand Up @@ -47,6 +47,7 @@
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
Expand Down Expand Up @@ -557,6 +558,14 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
return builder.build();
}

RequestOptions buildRequestOptions(Options options) {
RequestOptions.Builder builder = RequestOptions.newBuilder();
if (options.hasPriority()) {
builder.setPriority(options.priority());
}
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
ExecuteSqlRequest.Builder builder =
Expand All @@ -580,6 +589,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
return builder;
}

Expand Down Expand Up @@ -610,6 +620,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
builder.setTransaction(selector);
}
builder.setSeqno(getSeqNo());
builder.setRequestOptions(buildRequestOptions(options));
return builder;
}

Expand Down Expand Up @@ -760,6 +771,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
if (selector != null) {
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
call.request(prefetchChunks);
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;

Expand Down Expand Up @@ -75,9 +76,21 @@ public interface DatabaseClient {
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeWithOptions(Collections.singletonList(mutation));
* dbClient.writeWithOptions(
* Collections.singletonList(mutation),
* Options.priority(RpcPriority.HIGH));
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
Expand Down Expand Up @@ -138,9 +151,21 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
* dbClient.writeAtLeastOnceWithOptions(
* Collections.singletonList(mutation),
* Options.priority(RpcPriority.LOW));
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeAtLeastOnceWithOptions(
Expand Down Expand Up @@ -308,6 +333,16 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* });
* </code></pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
TransactionRunner readWriteTransaction(TransactionOption... options);

Expand Down Expand Up @@ -338,6 +373,16 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
TransactionManager transactionManager(TransactionOption... options);

Expand Down Expand Up @@ -371,6 +416,16 @@ CommitResponse writeAtLeastOnceWithOptions(
* },
* executor);
* </code></pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
AsyncRunner runAsync(TransactionOption... options);

Expand Down Expand Up @@ -459,6 +514,18 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }
* }</pre>
*
* Options for a transaction can include:
*
* <p>Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

Expand Down
Expand Up @@ -17,13 +17,30 @@
package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;

/** Specifies options for various spanner operations */
public final class Options implements Serializable {
private static final long serialVersionUID = 8067099123096783941L;

/**
* Priority for an RPC invocation. The default priority is {@link #HIGH}. This enum can be used to
* set a lower priority for a specific RPC invocation.
*/
public enum RpcPriority {
LOW(Priority.PRIORITY_LOW),
MEDIUM(Priority.PRIORITY_MEDIUM),
HIGH(Priority.PRIORITY_HIGH);

private final Priority proto;

private RpcPriority(Priority proto) {
this.proto = Preconditions.checkNotNull(proto);
}
}

/** Marker interface to mark options applicable to both Read and Query operations */
public interface ReadAndQueryOption extends ReadOption, QueryOption {}

Expand Down Expand Up @@ -79,6 +96,11 @@ public static ReadAndQueryOption bufferRows(int bufferRows) {
return new BufferRowsOption(bufferRows);
}

/** Specifies the priority to use for the RPC. */
public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
Expand Down Expand Up @@ -158,13 +180,28 @@ void appendToOptions(Options options) {
}
}

static final class PriorityOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
private final RpcPriority priority;

PriorityOption(RpcPriority priority) {
this.priority = priority;
}

@Override
void appendToOptions(Options options) {
options.priority = priority;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
private Integer pageSize;
private String pageToken;
private String filter;
private RpcPriority priority;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -221,6 +258,14 @@ String filter() {
return filter;
}

boolean hasPriority() {
return priority != null;
}

Priority priority() {
return priority == null ? null : priority.proto;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand All @@ -242,6 +287,9 @@ public String toString() {
if (filter != null) {
b.append("filter: ").append(filter).append(' ');
}
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -271,7 +319,8 @@ public boolean equals(Object o) {
&& (!hasPageSize() && !that.hasPageSize()
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter());
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority());
}

@Override
Expand All @@ -298,6 +347,9 @@ public int hashCode() {
if (filter != null) {
result = 31 * result + filter.hashCode();
}
if (priority != null) {
result = 31 * result + priority.hashCode();
}
return result;
}

Expand Down
Expand Up @@ -26,12 +26,14 @@
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
Expand Down Expand Up @@ -162,8 +164,8 @@ private ExecuteSqlRequest resumeOrRestartRequest(
}
}

private ExecuteSqlRequest newTransactionRequestFrom(
final Statement statement, final Options options) {
@VisibleForTesting
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
ByteString transactionId = initTransaction();

final TransactionSelector transactionSelector =
Expand All @@ -179,6 +181,11 @@ private ExecuteSqlRequest newTransactionRequestFrom(

builder.setResumeToken(ByteString.EMPTY);

if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
}

return builder.build();
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import io.opencensus.common.Scope;
Expand Down Expand Up @@ -160,22 +161,26 @@ public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
Options commitRequestOptions = Options.fromTransactionOptions(transactionOptions);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
final CommitRequest request =
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(
Options.fromTransactionOptions(transactionOptions).withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (commitRequestOptions.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build());
}
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
com.google.spanner.v1.CommitResponse response =
spanner.getRpc().commit(request, this.options);
spanner.getRpc().commit(requestBuilder.build(), this.options);
return new CommitResponse(response);
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
Expand Down
Expand Up @@ -43,6 +43,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Transaction;
Expand Down Expand Up @@ -298,6 +299,10 @@ ApiFuture<CommitResponse> commitAsync() {
CommitRequest.newBuilder()
.setSession(session.getName())
.setReturnCommitStats(options.withCommitStats());
if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
Expand Down Expand Up @@ -344,6 +349,10 @@ public void run() {
requestBuilder.setTransactionId(
transactionId == null ? transactionIdFuture.get() : transactionId);
}
if (options.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final Span opSpan =
Expand Down

0 comments on commit 0bc9972

Please sign in to comment.