Skip to content

Commit 74a8504

Browse files
feat: Add the PartitionCountWatchingPublisher (#387)
* feat: Add the PartitionCountWatchingPublisher * changes to address comments * more changes
1 parent 6890f2a commit 74a8504

File tree

5 files changed

+546
-5
lines changed

5 files changed

+546
-5
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,27 @@
2626
import com.google.protobuf.ByteString;
2727
import java.math.BigInteger;
2828
import java.util.Random;
29+
import java.util.concurrent.ThreadLocalRandom;
2930

3031
public class DefaultRoutingPolicy implements RoutingPolicy {
31-
private final int numPartitions;
32+
private final long numPartitions;
3233
private final CloseableMonitor monitor = new CloseableMonitor();
3334

3435
@GuardedBy("monitor.monitor")
35-
private int nextWithoutKeyPartition;
36+
private long nextWithoutKeyPartition;
3637

37-
public DefaultRoutingPolicy(int numPartitions) throws ApiException {
38+
public DefaultRoutingPolicy(long numPartitions) throws ApiException {
3839
checkArgument(numPartitions > 0, "Must have a positive number of partitions.");
3940
this.numPartitions = numPartitions;
40-
this.nextWithoutKeyPartition = new Random().nextInt(this.numPartitions);
41+
this.nextWithoutKeyPartition = ThreadLocalRandom.current().nextLong(numPartitions);
42+
this.nextWithoutKeyPartition = new Random().longs(1, 0, numPartitions).findFirst().getAsLong();
4143
}
4244

4345
@Override
4446
public Partition routeWithoutKey() throws ApiException {
4547
try (CloseableMonitor.Hold h = monitor.enter()) {
4648
Partition toReturn = Partition.of(nextWithoutKeyPartition);
47-
int next = nextWithoutKeyPartition + 1;
49+
long next = nextWithoutKeyPartition + 1;
4850
next = next % numPartitions;
4951
nextWithoutKeyPartition = next;
5052
return toReturn;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/RoutingPolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
// Route the user message key to a given partition.
2323
public interface RoutingPolicy {
24+
interface Factory {
25+
RoutingPolicy newPolicy(long numPartitions);
26+
}
2427
// Route a message without a key to a partition.
2528
Partition routeWithoutKey() throws CheckedApiException;
2629
// Route a message with a key to a partition.
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
20+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
21+
22+
import com.google.api.core.ApiFuture;
23+
import com.google.api.core.ApiFutures;
24+
import com.google.api.core.ApiService;
25+
import com.google.cloud.pubsublite.Message;
26+
import com.google.cloud.pubsublite.Partition;
27+
import com.google.cloud.pubsublite.PublishMetadata;
28+
import com.google.cloud.pubsublite.internal.*;
29+
import com.google.common.collect.ImmutableMap;
30+
import com.google.common.flogger.GoogleLogger;
31+
import com.google.common.util.concurrent.MoreExecutors;
32+
import com.google.errorprone.annotations.concurrent.GuardedBy;
33+
import java.io.IOException;
34+
import java.util.Optional;
35+
import java.util.stream.LongStream;
36+
37+
public class PartitionCountWatchingPublisher extends ProxyService
38+
implements Publisher<PublishMetadata> {
39+
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
40+
private final PartitionPublisherFactory publisherFactory;
41+
private final RoutingPolicy.Factory policyFactory;
42+
43+
private static class PartitionsWithRouting {
44+
public final ImmutableMap<Partition, Publisher<PublishMetadata>> publishers;
45+
private final RoutingPolicy routingPolicy;
46+
47+
private PartitionsWithRouting(
48+
ImmutableMap<Partition, Publisher<PublishMetadata>> publishers,
49+
RoutingPolicy routingPolicy) {
50+
this.publishers = publishers;
51+
this.routingPolicy = routingPolicy;
52+
}
53+
54+
public ApiFuture<PublishMetadata> publish(Message message) throws CheckedApiException {
55+
try {
56+
Partition routedPartition =
57+
message.key().isEmpty()
58+
? routingPolicy.routeWithoutKey()
59+
: routingPolicy.route(message.key());
60+
checkState(
61+
publishers.containsKey(routedPartition),
62+
String.format(
63+
"Routed to partition %s for which there is no publisher available.",
64+
routedPartition));
65+
return publishers.get(routedPartition).publish(message);
66+
} catch (Throwable t) {
67+
throw toCanonical(t);
68+
}
69+
}
70+
71+
public void flush() throws IOException {
72+
for (Publisher<PublishMetadata> publisher : publishers.values()) {
73+
publisher.flush();
74+
}
75+
}
76+
77+
public void stop() {
78+
publishers.values().forEach(ApiService::stopAsync);
79+
publishers.values().forEach(ApiService::awaitTerminated);
80+
}
81+
}
82+
83+
private final CloseableMonitor monitor = new CloseableMonitor();
84+
85+
@GuardedBy("monitor.monitor")
86+
private boolean shutdown = false;
87+
88+
@GuardedBy("monitor.monitor")
89+
private Optional<PartitionsWithRouting> partitionsWithRouting = Optional.empty();
90+
91+
public PartitionCountWatchingPublisher(PartitionCountWatchingPublisherSettings settings) {
92+
this.publisherFactory = settings.publisherFactory();
93+
this.policyFactory = settings.routingPolicyFactory();
94+
PartitionCountWatcher configWatcher =
95+
settings.configWatcherFactory().newWatcher(this::handleConfig);
96+
addServices(configWatcher);
97+
}
98+
99+
@Override
100+
public ApiFuture<PublishMetadata> publish(Message message) {
101+
Optional<PartitionsWithRouting> partitions;
102+
try (CloseableMonitor.Hold h = monitor.enter()) {
103+
partitions = partitionsWithRouting;
104+
}
105+
if (!partitions.isPresent()) {
106+
throw new IllegalStateException("Publish called before start or after shutdown");
107+
}
108+
try {
109+
return partitions.get().publish(message);
110+
} catch (CheckedApiException e) {
111+
onPermanentError(e);
112+
return ApiFutures.immediateFailedFuture(e);
113+
}
114+
}
115+
116+
@Override
117+
public void flush() throws IOException {
118+
Optional<PartitionsWithRouting> partitions;
119+
try (CloseableMonitor.Hold h = monitor.enter()) {
120+
partitions = partitionsWithRouting;
121+
}
122+
if (!partitions.isPresent()) {
123+
throw new IllegalStateException("Publish called before start or after shutdown");
124+
}
125+
partitions.get().flush();
126+
}
127+
128+
private ImmutableMap<Partition, Publisher<PublishMetadata>> getNewPartitionPublishers(
129+
LongStream newPartitions) {
130+
ImmutableMap.Builder<Partition, Publisher<PublishMetadata>> mapBuilder = ImmutableMap.builder();
131+
newPartitions.forEach(
132+
i -> {
133+
Publisher<PublishMetadata> p = publisherFactory.newPublisher(Partition.of(i));
134+
p.addListener(
135+
new Listener() {
136+
@Override
137+
public void failed(State from, Throwable failure) {
138+
onPermanentError(toCanonical(failure));
139+
}
140+
},
141+
MoreExecutors.directExecutor());
142+
mapBuilder.put(Partition.of(i), p);
143+
p.startAsync();
144+
});
145+
ImmutableMap<Partition, Publisher<PublishMetadata>> partitions = mapBuilder.build();
146+
partitions.values().forEach(ApiService::awaitRunning);
147+
return partitions;
148+
}
149+
150+
private void handleConfig(long partitionCount) {
151+
try (CloseableMonitor.Hold h = monitor.enter()) {
152+
if (shutdown) {
153+
return;
154+
}
155+
Optional<PartitionsWithRouting> current = partitionsWithRouting;
156+
long currentSize = current.map(withRouting -> withRouting.publishers.size()).orElse(0);
157+
if (partitionCount == currentSize) {
158+
return;
159+
}
160+
if (partitionCount < currentSize) {
161+
log.atWarning().log(
162+
"Received an unexpected decrease in partition count. Previous partition count {}, new count {}",
163+
currentSize,
164+
partitionCount);
165+
return;
166+
}
167+
ImmutableMap.Builder<Partition, Publisher<PublishMetadata>> mapBuilder =
168+
ImmutableMap.builder();
169+
current.ifPresent(p -> p.publishers.forEach(mapBuilder::put));
170+
getNewPartitionPublishers(LongStream.range(currentSize, partitionCount))
171+
.forEach(mapBuilder::put);
172+
ImmutableMap<Partition, Publisher<PublishMetadata>> newMap = mapBuilder.build();
173+
174+
partitionsWithRouting =
175+
Optional.of(
176+
new PartitionsWithRouting(
177+
mapBuilder.build(), policyFactory.newPolicy(partitionCount)));
178+
}
179+
}
180+
181+
@Override
182+
protected void start() {}
183+
184+
@Override
185+
protected void stop() {
186+
Optional<PartitionsWithRouting> current;
187+
try (CloseableMonitor.Hold h = monitor.enter()) {
188+
shutdown = true;
189+
current = partitionsWithRouting;
190+
partitionsWithRouting = Optional.empty();
191+
}
192+
current.ifPresent(PartitionsWithRouting::stop);
193+
}
194+
195+
@Override
196+
protected void handlePermanentError(CheckedApiException error) {
197+
try {
198+
stop();
199+
} catch (Exception e) {
200+
log.atWarning().withCause(e).log("Encountered exception while trying to handle failure");
201+
}
202+
}
203+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal.wire;
17+
18+
import com.google.auto.value.AutoValue;
19+
import com.google.cloud.pubsublite.*;
20+
import com.google.cloud.pubsublite.internal.DefaultRoutingPolicy;
21+
import com.google.cloud.pubsublite.internal.RoutingPolicy;
22+
import java.time.Duration;
23+
import java.util.Optional;
24+
25+
@AutoValue
26+
public abstract class PartitionCountWatchingPublisherSettings {
27+
// Required parameters.
28+
abstract TopicPath topic();
29+
30+
abstract PartitionPublisherFactory publisherFactory();
31+
32+
// Optional parameters
33+
abstract PartitionCountWatcher.Factory configWatcherFactory();
34+
35+
abstract RoutingPolicy.Factory routingPolicyFactory();
36+
37+
abstract Duration configPollPeriod();
38+
39+
public static Builder newBuilder() {
40+
return new AutoValue_PartitionCountWatchingPublisherSettings.Builder()
41+
.setConfigPollPeriod(Duration.ofMinutes(10));
42+
}
43+
44+
@AutoValue.Builder
45+
public abstract static class Builder {
46+
// Required parameters.
47+
public abstract Builder setTopic(TopicPath path);
48+
49+
public abstract Builder setPublisherFactory(PartitionPublisherFactory factory);
50+
51+
// Optional parameters.
52+
public abstract Builder setConfigWatcherFactory(PartitionCountWatcher.Factory factory);
53+
54+
public abstract Builder setRoutingPolicyFactory(RoutingPolicy.Factory factory);
55+
56+
public abstract Builder setConfigPollPeriod(Duration period);
57+
58+
abstract Optional<PartitionCountWatcher.Factory> configWatcherFactory();
59+
60+
abstract Optional<RoutingPolicy.Factory> routingPolicyFactory();
61+
62+
abstract Duration configPollPeriod();
63+
64+
abstract TopicPath topic();
65+
66+
abstract PartitionCountWatchingPublisherSettings autoBuild();
67+
68+
public PartitionCountWatchingPublisherSettings build() {
69+
if (!configWatcherFactory().isPresent()) {
70+
setConfigWatcherFactory(
71+
new PartitionCountWatcherImpl.Factory(
72+
topic(),
73+
AdminClient.create(
74+
AdminClientSettings.newBuilder()
75+
.setRegion(topic().location().region())
76+
.build()),
77+
configPollPeriod()));
78+
}
79+
if (!routingPolicyFactory().isPresent()) {
80+
setRoutingPolicyFactory(DefaultRoutingPolicy::new);
81+
}
82+
return autoBuild();
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)