Skip to content

Commit

Permalink
feat: add PartitionedUpdate support to executor (#2228)
Browse files Browse the repository at this point in the history
This PR adds support for PartitionedUpdate to Cloud Client Executor Framework.
  • Loading branch information
rajatbhatta committed Mar 15, 2023
1 parent ff17244 commit 2c8ecf6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Mutation.WriteBuilder;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
Expand Down Expand Up @@ -128,6 +129,8 @@
import com.google.spanner.executor.v1.MutationAction.Mod;
import com.google.spanner.executor.v1.MutationAction.UpdateArgs;
import com.google.spanner.executor.v1.OperationResponse;
import com.google.spanner.executor.v1.PartitionedUpdateAction;
import com.google.spanner.executor.v1.PartitionedUpdateAction.ExecutePartitionedUpdateOptions;
import com.google.spanner.executor.v1.QueryAction;
import com.google.spanner.executor.v1.ReadAction;
import com.google.spanner.executor.v1.RestoreCloudDatabaseAction;
Expand Down Expand Up @@ -886,6 +889,13 @@ private Status executeAction(
} else if (action.hasExecutePartition()) {
return executeExecutePartition(
action.getExecutePartition(), outcomeSender, executionContext);
} else if (action.hasPartitionedUpdate()) {
if (dbPath == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Database path must be set for this action");
}
DatabaseClient dbClient = getClient().getDatabaseClient(DatabaseId.of(dbPath));
return executePartitionedUpdate(action.getPartitionedUpdate(), dbClient, outcomeSender);
} else if (action.hasCloseBatchTxn()) {
return executeCloseBatchTxn(action.getCloseBatchTxn(), outcomeSender, executionContext);
} else if (action.hasExecuteChangeStreamQuery()) {
Expand Down Expand Up @@ -1974,6 +1984,33 @@ private Status executeExecutePartition(
}
}

/** Execute a partitioned update which runs different partitions in parallel. */
private Status executePartitionedUpdate(
PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) {
try {
ExecutePartitionedUpdateOptions options = action.getOptions();
Long count =
dbClient.executePartitionedUpdate(
Statement.of(action.getUpdate().getSql()),
Options.tag(options.getTag()),
Options.priority(RpcPriority.fromProto(options.getRpcPriority())));
SpannerActionOutcome outcome =
SpannerActionOutcome.newBuilder()
.setStatus(toProto(Status.OK))
.addDmlRowsModified(count)
.build();
sender.sendOutcome(outcome);
return sender.finishWithOK();
} catch (SpannerException e) {
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

/** Build a child partition record proto out of childPartitionRecord returned by client. */
private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ public final class Options implements Serializable {
public enum RpcPriority {
LOW(Priority.PRIORITY_LOW),
MEDIUM(Priority.PRIORITY_MEDIUM),
HIGH(Priority.PRIORITY_HIGH);
HIGH(Priority.PRIORITY_HIGH),
UNSPECIFIED(Priority.PRIORITY_UNSPECIFIED);

private final Priority proto;

RpcPriority(Priority proto) {
this.proto = Preconditions.checkNotNull(proto);
}

public static RpcPriority fromProto(Priority proto) {
for (RpcPriority e : RpcPriority.values()) {
if (e.proto.equals(proto)) return e;
}
return RpcPriority.UNSPECIFIED;
}
}

/** Marker interface to mark options applicable to both Read and Query operations */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@ public void testUpdateOptionsPriority() {
assertEquals("priority: " + priority + " ", options.toString());
}

@Test
public void testRpcPriorityEnumFromProto() {
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_LOW), RpcPriority.LOW);
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_MEDIUM), RpcPriority.MEDIUM);
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_HIGH), RpcPriority.HIGH);
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_UNSPECIFIED), RpcPriority.UNSPECIFIED);
assertEquals(RpcPriority.fromProto(null), RpcPriority.UNSPECIFIED);
}

@Test
public void testTransactionOptionsHashCode() {
Options option1 = Options.fromTransactionOptions();
Expand Down

0 comments on commit 2c8ecf6

Please sign in to comment.