Skip to content

Commit 4f20858

Browse files
feat: add shutdown() and shutdownNow() (#673)
1 parent 9915e40 commit 4f20858

File tree

8 files changed

+320
-53
lines changed

8 files changed

+320
-53
lines changed

google-cloud-firestore/clirr-ignored-differences.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,27 @@
1717

1818
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
1919
<differences>
20+
<!-- Shutdown/Shutdown Now -->
21+
<difference>
22+
<differenceType>7012</differenceType>
23+
<className>com/google/cloud/firestore/Firestore</className>
24+
<method>void shutdown()</method>
25+
</difference>
26+
<difference>
27+
<differenceType>7012</differenceType>
28+
<className>com/google/cloud/firestore/Firestore</className>
29+
<method>void shutdownNow()</method>
30+
</difference>
31+
<difference>
32+
<differenceType>7012</differenceType>
33+
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
34+
<method>void shutdown()</method>
35+
</difference>
36+
<difference>
37+
<differenceType>7012</differenceType>
38+
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
39+
<method>void shutdownNow()</method>
40+
</difference>
2041

2142
<!-- v2.1.1 -->
2243
<difference>

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,4 +296,13 @@ void getAll(
296296
*/
297297
@Override
298298
void close() throws Exception;
299+
300+
/**
301+
* Initiates an orderly shutdown in which previously submitted work is finished, but no new work
302+
* will be accepted.
303+
*/
304+
void shutdown();
305+
306+
/** Attempts to stop all actively executing work and halts the processing of waiting work. */
307+
void shutdownNow();
299308
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,18 @@ public void close() throws Exception {
451451
closed = true;
452452
}
453453

454+
@Override
455+
public void shutdown() {
456+
firestoreClient.shutdown();
457+
closed = true;
458+
}
459+
460+
@Override
461+
public void shutdownNow() {
462+
firestoreClient.shutdownNow();
463+
closed = true;
464+
}
465+
454466
private static class TransactionAsyncAdapter<T> implements Transaction.AsyncFunction<T> {
455467
private final Transaction.Function<T> syncFunction;
456468

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) {
337337
new Runnable() {
338338
@Override
339339
public void run() {
340-
listener.onEvent(
341-
null,
342-
throwable instanceof FirestoreException
343-
? (FirestoreException) throwable
344-
: FirestoreException.forApiException(
345-
new ApiException(
346-
throwable,
347-
GrpcStatusCode.of(getStatus(throwable).getCode()),
348-
false)));
340+
if (throwable instanceof FirestoreException) {
341+
listener.onEvent(null, (FirestoreException) throwable);
342+
} else {
343+
Status status = getStatus(throwable);
344+
FirestoreException firestoreException =
345+
FirestoreException.forApiException(
346+
new ApiException(
347+
throwable,
348+
GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN),
349+
false));
350+
listener.onEvent(null, firestoreException);
351+
}
349352
}
350353
});
351354
}
@@ -383,31 +386,36 @@ private void initStream() {
383386
new Runnable() {
384387
@Override
385388
public void run() {
386-
if (!isActive.get()) {
387-
return;
388-
}
389-
390-
synchronized (Watch.this) {
389+
try {
391390
if (!isActive.get()) {
392391
return;
393392
}
394393

395-
Preconditions.checkState(stream == null);
394+
synchronized (Watch.this) {
395+
if (!isActive.get()) {
396+
return;
397+
}
396398

397-
current = false;
398-
nextAttempt = backoff.createNextAttempt(nextAttempt);
399+
Preconditions.checkState(stream == null);
399400

400-
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
401-
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
401+
current = false;
402+
nextAttempt = backoff.createNextAttempt(nextAttempt);
402403

403-
ListenRequest.Builder request = ListenRequest.newBuilder();
404-
request.setDatabase(firestore.getDatabaseName());
405-
request.setAddTarget(target);
406-
if (resumeToken != null) {
407-
request.getAddTargetBuilder().setResumeToken(resumeToken);
408-
}
404+
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
405+
stream =
406+
firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
407+
408+
ListenRequest.Builder request = ListenRequest.newBuilder();
409+
request.setDatabase(firestore.getDatabaseName());
410+
request.setAddTarget(target);
411+
if (resumeToken != null) {
412+
request.getAddTargetBuilder().setResumeToken(resumeToken);
413+
}
409414

410-
stream.onNext(request.build());
415+
stream.onNext(request.build());
416+
}
417+
} catch (Throwable throwable) {
418+
onError(throwable);
411419
}
412420
}
413421
},
@@ -549,6 +557,10 @@ private List<DocumentChange> computeSnapshot(Timestamp readTime) {
549557
private static boolean isPermanentError(Throwable throwable) {
550558
Status status = getStatus(throwable);
551559

560+
if (status == null) {
561+
return true;
562+
}
563+
552564
switch (status.getCode()) {
553565
case CANCELLED:
554566
case UNKNOWN:
@@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) {
563575
}
564576
}
565577

566-
/** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */
578+
/** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */
579+
@Nullable
567580
private static Status getStatus(Throwable throwable) {
568-
Status status = Status.UNKNOWN;
569-
570581
if (throwable instanceof StatusRuntimeException) {
571-
status = ((StatusRuntimeException) throwable).getStatus();
582+
return ((StatusRuntimeException) throwable).getStatus();
572583
} else if (throwable instanceof StatusException) {
573-
status = ((StatusException) throwable).getStatus();
584+
return ((StatusException) throwable).getStatus();
574585
}
575-
return status;
586+
return null;
576587
}
577588

578589
/** Determines whether we need to initiate a longer backoff due to system overload. */
579590
private static boolean isResourceExhaustedError(Throwable throwable) {
580-
return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED);
591+
Status status = getStatus(throwable);
592+
return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED);
581593
};
582594
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,8 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {
7878

7979
/** Returns a bi-directional watch stream. */
8080
BidiStreamingCallable<ListenRequest, ListenResponse> listenCallable();
81+
82+
void shutdownNow();
83+
84+
void shutdown();
8185
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.api.gax.rpc.ServerStreamingCallable;
2929
import com.google.api.gax.rpc.TransportChannel;
3030
import com.google.api.gax.rpc.UnaryCallSettings;
31-
import com.google.api.gax.rpc.UnaryCallSettings.Builder;
3231
import com.google.api.gax.rpc.UnaryCallable;
3332
import com.google.cloud.NoCredentials;
3433
import com.google.cloud.ServiceOptions;
@@ -127,7 +126,7 @@ public GrpcFirestoreRpc(final FirestoreOptions options) throws IOException {
127126
clientContext = ClientContext.create(settingsBuilder.build());
128127
}
129128
ApiFunction<UnaryCallSettings.Builder<?, ?>, Void> retrySettingsSetter =
130-
new ApiFunction<Builder<?, ?>, Void>() {
129+
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
131130
@Override
132131
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
133132
builder.setRetrySettings(options.getRetrySettings());
@@ -145,18 +144,43 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
145144

146145
@Override
147146
public void close() throws Exception {
147+
if (!closed) {
148+
firestoreStub.close();
149+
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
150+
resource.close();
151+
}
152+
executorFactory.release(executor);
153+
closed = true;
154+
}
155+
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
156+
resource.awaitTermination(1, TimeUnit.SECONDS);
157+
}
158+
}
159+
160+
@Override
161+
public void shutdown() {
148162
if (closed) {
149163
return;
150164
}
151-
closed = true;
152-
firestoreStub.close();
165+
firestoreStub.shutdown();
153166
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
154-
resource.close();
167+
resource.shutdown();
155168
}
169+
executorFactory.release(executor);
170+
closed = true;
171+
}
172+
173+
@Override
174+
public void shutdownNow() {
175+
if (closed) {
176+
return;
177+
}
178+
firestoreStub.shutdownNow();
156179
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
157-
resource.awaitTermination(1, TimeUnit.SECONDS);
180+
resource.shutdownNow();
158181
}
159182
executorFactory.release(executor);
183+
closed = true;
160184
}
161185

162186
@Override

0 commit comments

Comments
 (0)