From 415da5e19482ed441c459550542df1dfd0992447 Mon Sep 17 00:00:00 2001 From: vincent Date: Tue, 29 Mar 2022 11:17:19 +0200 Subject: [PATCH 1/2] Cancel blocking pull request on shutdown --- .../google/pubsub/GooglePubsubConsumer.java | 45 ++++++++++++++++--- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index b4aa82bb6de51..379b3eb07cb5a 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -17,11 +17,17 @@ package org.apache.camel.component.google.pubsub; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import com.google.api.core.AbstractApiService; +import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; @@ -49,13 +55,14 @@ public class GooglePubsubConsumer extends DefaultConsumer { private final Processor processor; private ExecutorService executor; private List subscribers; + private final Set> pendingSynchronousPullResponses; GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; this.subscribers = new LinkedList<>(); - + this.pendingSynchronousPullResponses = Collections.synchronizedSet(new HashSet<>()); String loggerId = endpoint.getLoggerId(); if (Strings.isNullOrEmpty(loggerId)) { @@ -85,6 +92,8 @@ protected void doStop() throws Exception { subscribers.forEach(AbstractApiService::stopAsync); } + safeCancelSynchronousPullResponses(); + if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor); @@ -95,6 +104,19 @@ protected void doStop() throws Exception { executor = null; } + private void safeCancelSynchronousPullResponses() { + synchronized (pendingSynchronousPullResponses) { + for (ApiFuture pullResponseApiFuture : pendingSynchronousPullResponses) { + try { + pullResponseApiFuture.cancel(true); + } catch (Exception e) { + localLog.warn("Exception while cancelling pending synchronous pull response", e); + } + } + pendingSynchronousPullResponses.clear(); + } + } + private class SubscriberWrapper implements Runnable { private final String subscriptionName; @@ -140,8 +162,9 @@ private void asynchronousPull(String subscriptionName) throws IOException { } } - private void synchronousPull(String subscriptionName) { + private void synchronousPull(String subscriptionName) throws ExecutionException, InterruptedException { while (isRunAllowed() && !isSuspendingOrSuspended()) { + ApiFuture synchronousPullResponseFuture = null; try (SubscriberStub subscriber = endpoint.getComponent().getSubscriberStub(endpoint)) { PullRequest pullRequest = PullRequest.newBuilder() @@ -150,7 +173,9 @@ private void synchronousPull(String subscriptionName) { .setSubscription(subscriptionName) .build(); - PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); + synchronousPullResponseFuture = subscriber.pullCallable().futureCall(pullRequest); + pendingSynchronousPullResponses.add(synchronousPullResponseFuture); + PullResponse pullResponse = synchronousPullResponseFuture.get(); for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { PubsubMessage pubsubMessage = message.getMessage(); Exchange exchange = createExchange(true); @@ -175,14 +200,20 @@ private void synchronousPull(String subscriptionName) { getExceptionHandler().handleException(e); } } + } catch (CancellationException e) { + localLog.info("PubSub synchronous pull request cancelled", e); } catch (IOException e) { - localLog.error("Failure getting messages from PubSub", e); - } catch (ApiException e) { - if (e.isRetryable()) { - localLog.error("Retryable API exception in getting messages from PubSub", e); + localLog.error("I/O exception while getting messages from PubSub. Reconnecting.", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof ApiException && ((ApiException) (e.getCause())).isRetryable()) { + localLog.error("Retryable API exception in getting messages from PubSub", e.getCause()); } else { throw e; } + } finally { + if (synchronousPullResponseFuture != null) { + pendingSynchronousPullResponses.remove(synchronousPullResponseFuture); + } } } } From 5aa7f9b53726130d697f18372661d8d4fc0f6e1f Mon Sep 17 00:00:00 2001 From: vincent Date: Tue, 29 Mar 2022 12:40:15 +0200 Subject: [PATCH 2/2] Lower log severity for cancellation exception --- .../camel/component/google/pubsub/GooglePubsubConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index 379b3eb07cb5a..06a9942de4a49 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -201,7 +201,7 @@ private void synchronousPull(String subscriptionName) throws ExecutionException, } } } catch (CancellationException e) { - localLog.info("PubSub synchronous pull request cancelled", e); + localLog.debug("PubSub synchronous pull request cancelled", e); } catch (IOException e) { localLog.error("I/O exception while getting messages from PubSub. Reconnecting.", e); } catch (ExecutionException e) {