From 0b4461f58a8cd9a36d326590cd8280b72fedf591 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 16 Dec 2021 16:57:17 -0500 Subject: [PATCH 1/2] fix: Numerous publish path performance issues All of the uses of DirectExecutor are performing simple data-only transformations and are in the publisher hotpath --- .../internal/WrappingPublisher.java | 3 ++- .../internal/CheckedApiPreconditions.java | 13 ++++++++++++ .../internal/DefaultRoutingPolicy.java | 20 ++++++------------- .../pubsublite/internal/ExtractStatus.java | 3 ++- .../wire/ConnectedSubscriberImpl.java | 10 ++++------ .../wire/PartitionCountWatchingPublisher.java | 5 ++--- .../internal/wire/PublisherImpl.java | 3 ++- .../internal/wire/RoutingPublisher.java | 5 ++--- .../wire/SinglePartitionPublisher.java | 3 ++- 9 files changed, 35 insertions(+), 30 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index 05eb9183a..81223e9d1 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -29,6 +29,7 @@ import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.common.flogger.GoogleLogger; +import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; // A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant @@ -74,6 +75,6 @@ public ApiFuture publish(PubsubMessage message) { return ApiFutures.transform( wirePublisher.publish(wireMessage), MessageMetadata::encode, - SystemExecutors.getFuturesExecutor()); + MoreExecutors.directExecutor()); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CheckedApiPreconditions.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CheckedApiPreconditions.java index 83a4d788c..caa48b959 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CheckedApiPreconditions.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CheckedApiPreconditions.java @@ -30,6 +30,12 @@ public static void checkArgument(boolean test, String description) throws Checke if (!test) throw new CheckedApiException(description, Code.INVALID_ARGUMENT); } + public static void checkArgument(boolean test, String descriptionFormat, Object... args) + throws CheckedApiException { + if (!test) + throw new CheckedApiException(String.format(descriptionFormat, args), Code.INVALID_ARGUMENT); + } + public static void checkState(boolean test) throws CheckedApiException { checkState(test, ""); } @@ -37,4 +43,11 @@ public static void checkState(boolean test) throws CheckedApiException { public static void checkState(boolean test, String description) throws CheckedApiException { if (!test) throw new CheckedApiException(description, Code.FAILED_PRECONDITION); } + + public static void checkState(boolean test, String descriptionFormat, Object... args) + throws CheckedApiException { + if (!test) + throw new CheckedApiException( + String.format(descriptionFormat, args), Code.FAILED_PRECONDITION); + } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java index d104c1dcc..0a1ca5d93 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java @@ -22,35 +22,27 @@ import com.google.cloud.pubsublite.Partition; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; -import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.protobuf.ByteString; import java.math.BigInteger; -import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; public class DefaultRoutingPolicy implements RoutingPolicy { private final long numPartitions; - private final CloseableMonitor monitor = new CloseableMonitor(); - @GuardedBy("monitor.monitor") - private long nextWithoutKeyPartition; + // An incrementing counter, when taken mod(numPartitions), gives the partition choice. + private final AtomicLong withoutKeyCounter; public DefaultRoutingPolicy(long numPartitions) throws ApiException { checkArgument(numPartitions > 0, "Must have a positive number of partitions."); this.numPartitions = numPartitions; - this.nextWithoutKeyPartition = ThreadLocalRandom.current().nextLong(numPartitions); - this.nextWithoutKeyPartition = new Random().longs(1, 0, numPartitions).findFirst().getAsLong(); + this.withoutKeyCounter = new AtomicLong(ThreadLocalRandom.current().nextLong(numPartitions)); } @Override public Partition routeWithoutKey() throws ApiException { - try (CloseableMonitor.Hold h = monitor.enter()) { - Partition toReturn = Partition.of(nextWithoutKeyPartition); - long next = nextWithoutKeyPartition + 1; - next = next % numPartitions; - nextWithoutKeyPartition = next; - return toReturn; - } + long index = withoutKeyCounter.incrementAndGet(); + return Partition.of(index % numPartitions); } @Override diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java index 0149b273b..c8df19d5c 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java @@ -21,6 +21,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.internal.wire.SystemExecutors; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; @@ -53,7 +54,7 @@ public static ApiFuture toClientFuture(ApiFuture source) { source, Throwable.class, t -> ApiFutures.immediateFailedFuture(toCanonical(t).underlying), - SystemExecutors.getFuturesExecutor()); + MoreExecutors.directExecutor()); } public static void addFailureHandler( diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberImpl.java index ddfdf2576..c9fd5000a 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberImpl.java @@ -100,18 +100,16 @@ protected void handleStreamResponse(SubscribeResponse response) throws CheckedAp private void onMessages(MessageResponse response) throws CheckedApiException { checkState( response.getMessagesCount() > 0, - String.format( - "Received an empty MessageResponse on stream with initial request %s.", - initialRequest)); + "Received an empty MessageResponse on stream with initial request %s.", + initialRequest); List messages = response.getMessagesList().stream() .map(SequencedMessage::fromProto) .collect(Collectors.toList()); checkState( Predicates.isOrdered(messages), - String.format( - "Received out of order messages on the stream with initial request %s.", - initialRequest)); + "Received out of order messages on the stream with initial request %s.", + initialRequest); sendToClient(messages); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java index 8481d089a..f08b4f71c 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java @@ -63,9 +63,8 @@ public ApiFuture publish(Message message) throws CheckedApiExce : routingPolicy.route(message.key()); checkState( publishers.containsKey(routedPartition), - String.format( - "Routed to partition %s for which there is no publisher available.", - routedPartition)); + "Routed to partition %s for which there is no publisher available.", + routedPartition); return publishers.get(routedPartition).publish(message); } catch (Throwable t) { throw toCanonical(t); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index c73f7eb58..bc6dba2b6 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -234,7 +234,8 @@ public ApiFuture publish(Message message) { ApiService.State currentState = state(); checkState( currentState == ApiService.State.RUNNING, - String.format("Cannot publish when Publisher state is %s.", currentState.name())); + "Cannot publish when Publisher state is %s.", + currentState.name()); return batcher.add(proto); } catch (CheckedApiException e) { onPermanentError(e); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java index 966b64b42..97908a8bf 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java @@ -52,9 +52,8 @@ public ApiFuture publish(Message message) { message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key()); checkState( partitionPublishers.containsKey(routedPartition), - String.format( - "Routed to partition %s for which there is no publisher available.", - routedPartition)); + "Routed to partition %s for which there is no publisher available.", + routedPartition); return partitionPublishers.get(routedPartition).publish(message); } catch (Throwable t) { CheckedApiException e = toCanonical(t); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java index 9b9880c57..145e672f7 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java @@ -25,6 +25,7 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; public class SinglePartitionPublisher extends ProxyService implements Publisher { @@ -43,7 +44,7 @@ public ApiFuture publish(Message message) { return ApiFutures.transform( publisher.publish(message), offset -> MessageMetadata.of(partition, offset), - SystemExecutors.getFuturesExecutor()); + MoreExecutors.directExecutor()); } @Override From 8ed3b1d1a3b884abb148530bedf2951dde498048 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 16 Dec 2021 22:00:38 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9afe04061..1a18c0369 100644 --- a/README.md +++ b/README.md @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file: If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-pubsublite:1.4.5' +implementation 'com.google.cloud:google-cloud-pubsublite:1.4.6' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.5" +libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.6" ``` ## Authentication