Skip to content

Commit

Permalink
fix: Numerous publish path performance issues (#998)
Browse files Browse the repository at this point in the history
* fix: Numerous publish path performance issues

All of the uses of DirectExecutor are performing simple data-only transformations and are in the publisher hotpath

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] committed Dec 16, 2021
1 parent a9e0870 commit 2cd8b85
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 32 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies

```Groovy
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.5'
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.6'
```

If you are using SBT, add this to your dependencies

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.5"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.6"
```

## Authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
Expand Down Expand Up @@ -74,6 +75,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
return ApiFutures.transform(
wirePublisher.publish(wireMessage),
MessageMetadata::encode,
SystemExecutors.getFuturesExecutor());
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,24 @@ public static void checkArgument(boolean test, String description) throws Checke
if (!test) throw new CheckedApiException(description, Code.INVALID_ARGUMENT);
}

public static void checkArgument(boolean test, String descriptionFormat, Object... args)
throws CheckedApiException {
if (!test)
throw new CheckedApiException(String.format(descriptionFormat, args), Code.INVALID_ARGUMENT);
}

public static void checkState(boolean test) throws CheckedApiException {
checkState(test, "");
}

public static void checkState(boolean test, String description) throws CheckedApiException {
if (!test) throw new CheckedApiException(description, Code.FAILED_PRECONDITION);
}

public static void checkState(boolean test, String descriptionFormat, Object... args)
throws CheckedApiException {
if (!test)
throw new CheckedApiException(
String.format(descriptionFormat, args), Code.FAILED_PRECONDITION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,27 @@
import com.google.cloud.pubsublite.Partition;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.ByteString;
import java.math.BigInteger;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultRoutingPolicy implements RoutingPolicy {
private final long numPartitions;
private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
private long nextWithoutKeyPartition;
// An incrementing counter, when taken mod(numPartitions), gives the partition choice.
private final AtomicLong withoutKeyCounter;

public DefaultRoutingPolicy(long numPartitions) throws ApiException {
checkArgument(numPartitions > 0, "Must have a positive number of partitions.");
this.numPartitions = numPartitions;
this.nextWithoutKeyPartition = ThreadLocalRandom.current().nextLong(numPartitions);
this.nextWithoutKeyPartition = new Random().longs(1, 0, numPartitions).findFirst().getAsLong();
this.withoutKeyCounter = new AtomicLong(ThreadLocalRandom.current().nextLong(numPartitions));
}

@Override
public Partition routeWithoutKey() throws ApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
Partition toReturn = Partition.of(nextWithoutKeyPartition);
long next = nextWithoutKeyPartition + 1;
next = next % numPartitions;
nextWithoutKeyPartition = next;
return toReturn;
}
long index = withoutKeyCounter.incrementAndGet();
return Partition.of(index % numPartitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -53,7 +54,7 @@ public static <T> ApiFuture<T> toClientFuture(ApiFuture<T> source) {
source,
Throwable.class,
t -> ApiFutures.immediateFailedFuture(toCanonical(t).underlying),
SystemExecutors.getFuturesExecutor());
MoreExecutors.directExecutor());
}

public static void addFailureHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,16 @@ protected void handleStreamResponse(SubscribeResponse response) throws CheckedAp
private void onMessages(MessageResponse response) throws CheckedApiException {
checkState(
response.getMessagesCount() > 0,
String.format(
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest));
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest);
List<SequencedMessage> messages =
response.getMessagesList().stream()
.map(SequencedMessage::fromProto)
.collect(Collectors.toList());
checkState(
Predicates.isOrdered(messages),
String.format(
"Received out of order messages on the stream with initial request %s.",
initialRequest));
"Received out of order messages on the stream with initial request %s.",
initialRequest);
sendToClient(messages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiExce
: routingPolicy.route(message.key());
checkState(
publishers.containsKey(routedPartition),
String.format(
"Routed to partition %s for which there is no publisher available.",
routedPartition));
"Routed to partition %s for which there is no publisher available.",
routedPartition);
return publishers.get(routedPartition).publish(message);
} catch (Throwable t) {
throw toCanonical(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ public ApiFuture<Offset> publish(Message message) {
ApiService.State currentState = state();
checkState(
currentState == ApiService.State.RUNNING,
String.format("Cannot publish when Publisher state is %s.", currentState.name()));
"Cannot publish when Publisher state is %s.",
currentState.name());
return batcher.add(proto);
} catch (CheckedApiException e) {
onPermanentError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ public ApiFuture<MessageMetadata> publish(Message message) {
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
checkState(
partitionPublishers.containsKey(routedPartition),
String.format(
"Routed to partition %s for which there is no publisher available.",
routedPartition));
"Routed to partition %s for which there is no publisher available.",
routedPartition);
return partitionPublishers.get(routedPartition).publish(message);
} catch (Throwable t) {
CheckedApiException e = toCanonical(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;

public class SinglePartitionPublisher extends ProxyService implements Publisher<MessageMetadata> {
Expand All @@ -43,7 +44,7 @@ public ApiFuture<MessageMetadata> publish(Message message) {
return ApiFutures.transform(
publisher.publish(message),
offset -> MessageMetadata.of(partition, offset),
SystemExecutors.getFuturesExecutor());
MoreExecutors.directExecutor());
}

@Override
Expand Down

0 comments on commit 2cd8b85

Please sign in to comment.