-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: add support for CommitStats #544
Changes from 24 commits
3872199
def8768
5372dae
34fbda6
8f61c2a
041b34d
8299054
9b710a7
8c15641
919cf02
97ec917
afeb0fd
8524526
06b3e22
1c17d16
664b87d
6573a0f
0b448c3
83039fb
57ea714
66c5f88
1d17973
48ef21b
1f33c42
7d45ace
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,23 +16,31 @@ | |
|
||
package com.google.cloud.spanner; | ||
|
||
import static com.google.common.base.Preconditions.checkState; | ||
|
||
import com.google.api.core.ApiFunction; | ||
import com.google.api.core.ApiFuture; | ||
import com.google.api.core.ApiFutures; | ||
import com.google.api.core.SettableApiFuture; | ||
import com.google.cloud.Timestamp; | ||
import com.google.cloud.spanner.TransactionRunner.TransactionCallable; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.Executor; | ||
|
||
class AsyncRunnerImpl implements AsyncRunner { | ||
private final TransactionRunnerImpl delegate; | ||
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create(); | ||
private SettableApiFuture<CommitResponse> commitResponse; | ||
|
||
AsyncRunnerImpl(TransactionRunnerImpl delegate) { | ||
this.delegate = delegate; | ||
this.delegate = Preconditions.checkNotNull(delegate); | ||
} | ||
|
||
@Override | ||
public <R> ApiFuture<R> runAsync(final AsyncWork<R> work, Executor executor) { | ||
Preconditions.checkState(commitResponse == null, "runAsync() can only be called once"); | ||
commitResponse = SettableApiFuture.create(); | ||
final SettableApiFuture<R> res = SettableApiFuture.create(); | ||
executor.execute( | ||
new Runnable() { | ||
|
@@ -43,7 +51,7 @@ public void run() { | |
} catch (Throwable t) { | ||
res.setException(t); | ||
} finally { | ||
setCommitTimestamp(); | ||
setCommitResponse(); | ||
} | ||
} | ||
}); | ||
|
@@ -66,16 +74,30 @@ public R run(TransactionContext transaction) throws Exception { | |
}); | ||
} | ||
|
||
private void setCommitTimestamp() { | ||
private void setCommitResponse() { | ||
try { | ||
commitTimestamp.set(delegate.getCommitTimestamp()); | ||
commitResponse.set(delegate.getCommitResponse()); | ||
} catch (Throwable t) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. letting it slide because it isn't changed in this PR, but catching Throwable is only rarely what you want. This is probably worth filing a bug on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added issue: #875 |
||
commitTimestamp.setException(t); | ||
commitResponse.setException(t); | ||
} | ||
} | ||
|
||
@Override | ||
public ApiFuture<Timestamp> getCommitTimestamp() { | ||
return commitTimestamp; | ||
checkState(commitResponse != null, "runAsync() has not yet been called"); | ||
return ApiFutures.transform( | ||
commitResponse, | ||
new ApiFunction<CommitResponse, Timestamp>() { | ||
@Override | ||
public Timestamp apply(CommitResponse input) { | ||
return input.getCommitTimestamp(); | ||
} | ||
}, | ||
MoreExecutors.directExecutor()); | ||
} | ||
|
||
public ApiFuture<CommitResponse> getCommitResponse() { | ||
checkState(commitResponse != null, "runAsync() has not yet been called"); | ||
return commitResponse; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
package com.google.cloud.spanner; | ||
|
||
import com.google.api.core.ApiAsyncFunction; | ||
import com.google.api.core.ApiFunction; | ||
import com.google.api.core.ApiFuture; | ||
import com.google.api.core.ApiFutureCallback; | ||
import com.google.api.core.ApiFutures; | ||
|
@@ -45,7 +46,7 @@ final class AsyncTransactionManagerImpl | |
|
||
private TransactionRunnerImpl.TransactionContextImpl txn; | ||
private TransactionState txnState; | ||
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create(); | ||
private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create(); | ||
|
||
AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) { | ||
this.session = session; | ||
|
@@ -132,29 +133,37 @@ public ApiFuture<Timestamp> commitAsync() { | |
SpannerExceptionFactory.newSpannerException( | ||
ErrorCode.ABORTED, "Transaction already aborted")); | ||
} | ||
ApiFuture<Timestamp> res = txn.commitAsync(); | ||
ApiFuture<CommitResponse> res = txn.commitAsync(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no abbreviated variable names per google style. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to |
||
txnState = TransactionState.COMMITTED; | ||
|
||
ApiFutures.addCallback( | ||
res, | ||
new ApiFutureCallback<Timestamp>() { | ||
new ApiFutureCallback<CommitResponse>() { | ||
@Override | ||
public void onFailure(Throwable t) { | ||
if (t instanceof AbortedException) { | ||
txnState = TransactionState.ABORTED; | ||
} else { | ||
txnState = TransactionState.COMMIT_FAILED; | ||
commitTimestamp.setException(t); | ||
commitResponse.setException(t); | ||
} | ||
} | ||
|
||
@Override | ||
public void onSuccess(Timestamp result) { | ||
commitTimestamp.set(result); | ||
public void onSuccess(CommitResponse result) { | ||
commitResponse.set(result); | ||
} | ||
}, | ||
MoreExecutors.directExecutor()); | ||
return ApiFutures.transform( | ||
res, | ||
new ApiFunction<CommitResponse, Timestamp>() { | ||
@Override | ||
public Timestamp apply(CommitResponse input) { | ||
return input.getCommitTimestamp(); | ||
} | ||
}, | ||
MoreExecutors.directExecutor()); | ||
return res; | ||
} | ||
|
||
@Override | ||
|
@@ -187,6 +196,11 @@ public TransactionState getState() { | |
return txnState; | ||
} | ||
|
||
@Override | ||
public ApiFuture<CommitResponse> getCommitResponse() { | ||
return commitResponse; | ||
} | ||
|
||
@Override | ||
public void invalidate() { | ||
if (txnState == TransactionState.STARTED || txnState == null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Copyright 2021 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner; | ||
|
||
import com.google.common.base.Preconditions; | ||
|
||
/** | ||
* Commit statistics are returned by a read/write transaction if specifically requested by passing | ||
* in {@link Options#commitStats()} to the transaction. | ||
*/ | ||
public class CommitStats { | ||
private final long mutationCount; | ||
|
||
private CommitStats(long mutationCount) { | ||
this.mutationCount = mutationCount; | ||
} | ||
|
||
static CommitStats fromProto(com.google.spanner.v1.CommitResponse.CommitStats proto) { | ||
Preconditions.checkNotNull(proto); | ||
return new CommitStats(proto.getMutationCount()); | ||
} | ||
|
||
/** | ||
* The number of mutations that were executed by the transaction. Insert and update operations | ||
* count with the multiplicity of the number of columns they affect. For example, inserting a new | ||
* record may count as five mutations, if values are inserted into five columns. Delete and delete | ||
* range operations count as one mutation regardless of the number of columns affected. Deleting a | ||
* row from a parent table that has the ON DELETE CASCADE annotation is also counted as one | ||
* mutation regardless of the number of interleaved child rows present. The exception to this is | ||
* if there are secondary indexes defined on rows being deleted, then the changes to the secondary | ||
* indexes will be counted individually. For example, if a table has 2 secondary indexes, deleting | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will be --> are There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
* a range of rows in the table will count as 1 mutation for the table, plus 2 mutations for each | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will count --> counts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
* row that is deleted because the rows in the secondary index might be scattered over the | ||
* key-space, making it impossible for Cloud Spanner to call a single delete range operation on | ||
* the secondary indexes. Secondary indexes include the foreign keys backing indexes. | ||
*/ | ||
public long getMutationCount() { | ||
return mutationCount; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will throw --> throws
per Google style
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done