/
AsyncRunnerExample.java
134 lines (124 loc) · 5.63 KB
/
AsyncRunnerExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
* Copyright 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.spanner;
//[START spanner_async_read_write_transaction]
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.AsyncRunner;
import com.google.cloud.spanner.AsyncRunner.AsyncWork;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class AsyncRunnerExample {
static void asyncRunner() throws InterruptedException, ExecutionException, TimeoutException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "my-project";
String instanceId = "my-instance";
String databaseId = "my-database";
try (Spanner spanner =
SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
asyncRunner(client);
}
}
// Execute a read/write transaction asynchronously.
static void asyncRunner(DatabaseClient client)
throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService executor = Executors.newSingleThreadExecutor();
// Create an async transaction runner.
AsyncRunner runner = client.runAsync();
// The transaction returns the total number of rows that were updated as a future array of
// longs.
ApiFuture<long[]> rowCounts =
runner.runAsync(
txn -> {
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic.
ApiFuture<Struct> album1BudgetFut =
txn.readRowAsync("Albums", Key.of(1, 1), ImmutableList.of("MarketingBudget"));
ApiFuture<Struct> album2BudgetFut =
txn.readRowAsync("Albums", Key.of(2, 2), ImmutableList.of("MarketingBudget"));
try {
// Transaction will only be committed if this condition still holds at the
// time of commit. Otherwise it will be aborted and the AsyncWork will be
// rerun by the client library.
long transfer = 200_000;
if (album2BudgetFut.get().getLong(0) >= transfer) {
long album1Budget = album1BudgetFut.get().getLong(0);
long album2Budget = album2BudgetFut.get().getLong(0);
album1Budget += transfer;
album2Budget -= transfer;
Statement updateStatement1 =
Statement.newBuilder(
"UPDATE Albums "
+ "SET MarketingBudget = @AlbumBudget "
+ "WHERE SingerId = 1 and AlbumId = 1")
.bind("AlbumBudget")
.to(album1Budget)
.build();
Statement updateStatement2 =
Statement.newBuilder(
"UPDATE Albums "
+ "SET MarketingBudget = @AlbumBudget "
+ "WHERE SingerId = 2 and AlbumId = 2")
.bind("AlbumBudget")
.to(album2Budget)
.build();
return txn.batchUpdateAsync(
ImmutableList.of(updateStatement1, updateStatement2));
} else {
return ApiFutures.immediateFuture(new long[] {0L, 0L});
}
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
},
executor);
ApiFuture<Long> totalUpdateCount =
ApiFutures.transform(
rowCounts,
new ApiFunction<long[], Long>() {
@SuppressFBWarnings("UVA_USE_VAR_ARGS")
@Override
public Long apply(long[] input) {
return Arrays.stream(input).sum();
}
},
MoreExecutors.directExecutor());
System.out.printf("%d records updated.%n", totalUpdateCount.get(30L, TimeUnit.SECONDS));
executor.shutdown();
}
}
//[END spanner_async_read_write_transaction]