Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies

```Groovy
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.5'
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.6'
```

If you are using SBT, add this to your dependencies

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.5"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.6"
```

## Authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
Expand Down Expand Up @@ -74,6 +75,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
return ApiFutures.transform(
wirePublisher.publish(wireMessage),
MessageMetadata::encode,
SystemExecutors.getFuturesExecutor());
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,24 @@ public static void checkArgument(boolean test, String description) throws Checke
if (!test) throw new CheckedApiException(description, Code.INVALID_ARGUMENT);
}

public static void checkArgument(boolean test, String descriptionFormat, Object... args)
throws CheckedApiException {
if (!test)
throw new CheckedApiException(String.format(descriptionFormat, args), Code.INVALID_ARGUMENT);
}

public static void checkState(boolean test) throws CheckedApiException {
checkState(test, "");
}

public static void checkState(boolean test, String description) throws CheckedApiException {
if (!test) throw new CheckedApiException(description, Code.FAILED_PRECONDITION);
}

public static void checkState(boolean test, String descriptionFormat, Object... args)
throws CheckedApiException {
if (!test)
throw new CheckedApiException(
String.format(descriptionFormat, args), Code.FAILED_PRECONDITION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,27 @@
import com.google.cloud.pubsublite.Partition;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.ByteString;
import java.math.BigInteger;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultRoutingPolicy implements RoutingPolicy {
private final long numPartitions;
private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
private long nextWithoutKeyPartition;
// An incrementing counter, when taken mod(numPartitions), gives the partition choice.
private final AtomicLong withoutKeyCounter;

public DefaultRoutingPolicy(long numPartitions) throws ApiException {
checkArgument(numPartitions > 0, "Must have a positive number of partitions.");
this.numPartitions = numPartitions;
this.nextWithoutKeyPartition = ThreadLocalRandom.current().nextLong(numPartitions);
this.nextWithoutKeyPartition = new Random().longs(1, 0, numPartitions).findFirst().getAsLong();
this.withoutKeyCounter = new AtomicLong(ThreadLocalRandom.current().nextLong(numPartitions));
}

@Override
public Partition routeWithoutKey() throws ApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
Partition toReturn = Partition.of(nextWithoutKeyPartition);
long next = nextWithoutKeyPartition + 1;
next = next % numPartitions;
nextWithoutKeyPartition = next;
return toReturn;
}
long index = withoutKeyCounter.incrementAndGet();
return Partition.of(index % numPartitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -53,7 +54,7 @@ public static <T> ApiFuture<T> toClientFuture(ApiFuture<T> source) {
source,
Throwable.class,
t -> ApiFutures.immediateFailedFuture(toCanonical(t).underlying),
SystemExecutors.getFuturesExecutor());
MoreExecutors.directExecutor());
}

public static void addFailureHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,16 @@ protected void handleStreamResponse(SubscribeResponse response) throws CheckedAp
private void onMessages(MessageResponse response) throws CheckedApiException {
checkState(
response.getMessagesCount() > 0,
String.format(
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest));
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest);
List<SequencedMessage> messages =
response.getMessagesList().stream()
.map(SequencedMessage::fromProto)
.collect(Collectors.toList());
checkState(
Predicates.isOrdered(messages),
String.format(
"Received out of order messages on the stream with initial request %s.",
initialRequest));
"Received out of order messages on the stream with initial request %s.",
initialRequest);
sendToClient(messages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiExce
: routingPolicy.route(message.key());
checkState(
publishers.containsKey(routedPartition),
String.format(
"Routed to partition %s for which there is no publisher available.",
routedPartition));
"Routed to partition %s for which there is no publisher available.",
routedPartition);
return publishers.get(routedPartition).publish(message);
} catch (Throwable t) {
throw toCanonical(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ public ApiFuture<Offset> publish(Message message) {
ApiService.State currentState = state();
checkState(
currentState == ApiService.State.RUNNING,
String.format("Cannot publish when Publisher state is %s.", currentState.name()));
"Cannot publish when Publisher state is %s.",
currentState.name());
return batcher.add(proto);
} catch (CheckedApiException e) {
onPermanentError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ public ApiFuture<MessageMetadata> publish(Message message) {
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
checkState(
partitionPublishers.containsKey(routedPartition),
String.format(
"Routed to partition %s for which there is no publisher available.",
routedPartition));
"Routed to partition %s for which there is no publisher available.",
routedPartition);
return partitionPublishers.get(routedPartition).publish(message);
} catch (Throwable t) {
CheckedApiException e = toCanonical(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;

public class SinglePartitionPublisher extends ProxyService implements Publisher<MessageMetadata> {
Expand All @@ -43,7 +44,7 @@ public ApiFuture<MessageMetadata> publish(Message message) {
return ApiFutures.transform(
publisher.publish(message),
offset -> MessageMetadata.of(partition, offset),
SystemExecutors.getFuturesExecutor());
MoreExecutors.directExecutor());
}

@Override
Expand Down