Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
package org.apache.camel.component.google.pubsub;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
Expand Down Expand Up @@ -49,13 +55,14 @@ public class GooglePubsubConsumer extends DefaultConsumer {
private final Processor processor;
private ExecutorService executor;
private List<Subscriber> subscribers;
private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;

GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
this.subscribers = new LinkedList<>();

this.pendingSynchronousPullResponses = Collections.synchronizedSet(new HashSet<>());
String loggerId = endpoint.getLoggerId();

if (Strings.isNullOrEmpty(loggerId)) {
Expand Down Expand Up @@ -85,6 +92,8 @@ protected void doStop() throws Exception {
subscribers.forEach(AbstractApiService::stopAsync);
}

safeCancelSynchronousPullResponses();

if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
Expand All @@ -95,6 +104,19 @@ protected void doStop() throws Exception {
executor = null;
}

private void safeCancelSynchronousPullResponses() {
synchronized (pendingSynchronousPullResponses) {
for (ApiFuture<PullResponse> pullResponseApiFuture : pendingSynchronousPullResponses) {
try {
pullResponseApiFuture.cancel(true);
} catch (Exception e) {
localLog.warn("Exception while cancelling pending synchronous pull response", e);
}
}
pendingSynchronousPullResponses.clear();
}
}

private class SubscriberWrapper implements Runnable {

private final String subscriptionName;
Expand Down Expand Up @@ -140,8 +162,9 @@ private void asynchronousPull(String subscriptionName) throws IOException {
}
}

private void synchronousPull(String subscriptionName) {
private void synchronousPull(String subscriptionName) throws ExecutionException, InterruptedException {
while (isRunAllowed() && !isSuspendingOrSuspended()) {
ApiFuture<PullResponse> synchronousPullResponseFuture = null;
try (SubscriberStub subscriber = endpoint.getComponent().getSubscriberStub(endpoint)) {

PullRequest pullRequest = PullRequest.newBuilder()
Expand All @@ -150,7 +173,9 @@ private void synchronousPull(String subscriptionName) {
.setSubscription(subscriptionName)
.build();

PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
synchronousPullResponseFuture = subscriber.pullCallable().futureCall(pullRequest);
pendingSynchronousPullResponses.add(synchronousPullResponseFuture);
PullResponse pullResponse = synchronousPullResponseFuture.get();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
PubsubMessage pubsubMessage = message.getMessage();
Exchange exchange = createExchange(true);
Expand All @@ -175,14 +200,20 @@ private void synchronousPull(String subscriptionName) {
getExceptionHandler().handleException(e);
}
}
} catch (CancellationException e) {
localLog.debug("PubSub synchronous pull request cancelled", e);
} catch (IOException e) {
localLog.error("Failure getting messages from PubSub", e);
} catch (ApiException e) {
if (e.isRetryable()) {
localLog.error("Retryable API exception in getting messages from PubSub", e);
localLog.error("I/O exception while getting messages from PubSub. Reconnecting.", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof ApiException && ((ApiException) (e.getCause())).isRetryable()) {
localLog.error("Retryable API exception in getting messages from PubSub", e.getCause());
} else {
throw e;
}
} finally {
if (synchronousPullResponseFuture != null) {
pendingSynchronousPullResponses.remove(synchronousPullResponseFuture);
}
}
}
}
Expand Down