Skip to content

Commit 20f3366

Browse files
feat: Supports topic partition increase. (#115)
1 parent b7237f8 commit 20f3366

13 files changed

Lines changed: 488 additions & 120 deletions

clirr-ignored-differences.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<difference>
5+
<differenceType>7004</differenceType>
6+
<className>com/google/cloud/pubsublite/spark/*Reader</className>
7+
<method>*</method>
8+
</difference>
9+
<difference>
10+
<differenceType>7005</differenceType>
11+
<className>com/google/cloud/pubsublite/spark/*Reader</className>
12+
<method>*</method>
13+
<to>*</to>
14+
</difference>
15+
</differences>
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.spark;
18+
19+
import com.google.cloud.pubsublite.AdminClient;
20+
import com.google.cloud.pubsublite.PartitionLookupUtils;
21+
import com.google.cloud.pubsublite.TopicPath;
22+
import com.google.common.base.Supplier;
23+
import com.google.common.base.Suppliers;
24+
import java.util.concurrent.TimeUnit;
25+
import javax.annotation.concurrent.ThreadSafe;
26+
27+
@ThreadSafe
28+
public class CachedPartitionCountReader implements PartitionCountReader {
29+
private final AdminClient adminClient;
30+
private final Supplier<Integer> supplier;
31+
32+
public CachedPartitionCountReader(AdminClient adminClient, TopicPath topicPath) {
33+
this.adminClient = adminClient;
34+
this.supplier =
35+
Suppliers.memoizeWithExpiration(
36+
() -> PartitionLookupUtils.numPartitions(topicPath, adminClient), 1, TimeUnit.MINUTES);
37+
}
38+
39+
@Override
40+
public void close() {
41+
adminClient.close();
42+
}
43+
44+
public int getPartitionCount() {
45+
return supplier.get();
46+
}
47+
}

src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import com.google.cloud.pubsublite.internal.TopicStatsClient;
2828
import com.google.cloud.pubsublite.proto.Cursor;
2929
import com.google.common.annotations.VisibleForTesting;
30+
import com.google.common.flogger.GoogleLogger;
3031
import com.google.common.util.concurrent.MoreExecutors;
32+
import java.io.Closeable;
3133
import java.util.HashSet;
3234
import java.util.Map;
3335
import java.util.Set;
@@ -40,18 +42,22 @@
4042
* offsets for the topic at most once per minute.
4143
*/
4244
public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
45+
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
4346

4447
private final TopicStatsClient topicStatsClient;
4548
private final TopicPath topic;
46-
private final long topicPartitionCount;
49+
private final PartitionCountReader partitionCountReader;
4750
private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;
4851

4952
@VisibleForTesting
5053
public LimitingHeadOffsetReader(
51-
TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
54+
TopicStatsClient topicStatsClient,
55+
TopicPath topic,
56+
PartitionCountReader partitionCountReader,
57+
Ticker ticker) {
5258
this.topicStatsClient = topicStatsClient;
5359
this.topic = topic;
54-
this.topicPartitionCount = topicPartitionCount;
60+
this.partitionCountReader = partitionCountReader;
5561
this.cachedHeadOffsets =
5662
Caffeine.newBuilder()
5763
.ticker(ticker)
@@ -82,7 +88,7 @@ public void onSuccess(Cursor c) {
8288
@Override
8389
public PslSourceOffset getHeadOffset() {
8490
Set<Partition> keySet = new HashSet<>();
85-
for (int i = 0; i < topicPartitionCount; i++) {
91+
for (int i = 0; i < partitionCountReader.getPartitionCount(); i++) {
8692
keySet.add(Partition.of(i));
8793
}
8894
CompletableFuture<Map<Partition, Offset>> future = cachedHeadOffsets.getAll(keySet);
@@ -95,6 +101,10 @@ public PslSourceOffset getHeadOffset() {
95101

96102
@Override
97103
public void close() {
98-
topicStatsClient.close();
104+
try (AutoCloseable a = topicStatsClient;
105+
Closeable b = partitionCountReader) {
106+
} catch (Exception e) {
107+
log.atWarning().withCause(e).log("Unable to close LimitingHeadOffsetReader.");
108+
}
99109
}
100110
}

src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,95 @@
2525
import com.google.common.flogger.GoogleLogger;
2626
import com.google.common.util.concurrent.MoreExecutors;
2727
import java.util.HashMap;
28+
import java.util.HashSet;
2829
import java.util.Map;
30+
import java.util.Set;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.ScheduledThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
34+
import javax.annotation.concurrent.GuardedBy;
2935

36+
/**
37+
* A {@link MultiPartitionCommitter} that lazily adjusts for partition changes when {@link
38+
* MultiPartitionCommitter#commit(PslSourceOffset)} is called.
39+
*/
3040
public class MultiPartitionCommitterImpl implements MultiPartitionCommitter {
3141
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
3242

43+
private final CommitterFactory committerFactory;
44+
45+
@GuardedBy("this")
3346
private final Map<Partition, Committer> committerMap = new HashMap<>();
3447

48+
@GuardedBy("this")
49+
private final Set<Partition> partitionsCleanUp = new HashSet<>();
50+
51+
/**
 * Creates a committer for {@code topicPartitionCount} partitions whose periodic clean-up runs on
 * a single-threaded scheduled executor created here.
 *
 * @param topicPartitionCount number of partitions to create committers for up front
 * @param committerFactory factory used to create the per-partition committers
 */
public MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
  // Delegates to the test-visible constructor, supplying the default clean-up executor.
  this(
      topicPartitionCount,
      committerFactory,
      MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)));
}
57+
3558
@VisibleForTesting
36-
MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
59+
MultiPartitionCommitterImpl(
60+
long topicPartitionCount,
61+
CommitterFactory committerFactory,
62+
ScheduledExecutorService executorService) {
63+
this.committerFactory = committerFactory;
3764
for (int i = 0; i < topicPartitionCount; i++) {
3865
Partition p = Partition.of(i);
39-
Committer committer = committerFactory.newCommitter(p);
40-
committer.startAsync().awaitRunning();
41-
committerMap.put(p, committer);
66+
committerMap.put(p, createCommitter(p));
4267
}
68+
executorService.scheduleWithFixedDelay(this::cleanUpCommitterMap, 10, 10, TimeUnit.MINUTES);
4369
}
4470

4571
@Override
4672
public synchronized void close() {
4773
committerMap.values().forEach(c -> c.stopAsync().awaitTerminated());
4874
}
4975

76+
/** Adjust committerMap based on the partitions that needs to be committed. */
77+
private synchronized void updateCommitterMap(PslSourceOffset offset) {
78+
int currentPartitions = committerMap.size();
79+
int newPartitions = offset.partitionOffsetMap().size();
80+
81+
if (currentPartitions == newPartitions) {
82+
return;
83+
}
84+
if (currentPartitions < newPartitions) {
85+
for (int i = currentPartitions; i < newPartitions; i++) {
86+
Partition p = Partition.of(i);
87+
if (!committerMap.containsKey(p)) {
88+
committerMap.put(p, createCommitter(p));
89+
}
90+
partitionsCleanUp.remove(p);
91+
}
92+
return;
93+
}
94+
partitionsCleanUp.clear();
95+
for (int i = newPartitions; i < currentPartitions; i++) {
96+
partitionsCleanUp.add(Partition.of(i));
97+
}
98+
}
99+
100+
private synchronized Committer createCommitter(Partition p) {
101+
Committer committer = committerFactory.newCommitter(p);
102+
committer.startAsync().awaitRunning();
103+
return committer;
104+
}
105+
106+
private synchronized void cleanUpCommitterMap() {
107+
for (Partition p : partitionsCleanUp) {
108+
committerMap.get(p).stopAsync();
109+
committerMap.remove(p);
110+
}
111+
partitionsCleanUp.clear();
112+
}
113+
50114
@Override
51115
public synchronized void commit(PslSourceOffset offset) {
116+
updateCommitterMap(offset);
52117
offset
53118
.partitionOffsetMap()
54119
.forEach(
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.spark;
18+
19+
import java.io.Closeable;
20+
21+
/** Supplies a partition count and can be closed without throwing a checked exception. */
public interface PartitionCountReader extends Closeable {
  /** Returns the partition count as currently known to this reader. */
  int getPartitionCount();

  /** Closes this reader; redeclared from {@link Closeable} without the checked exception. */
  @Override
  void close();
}

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ public class PslContinuousReader implements ContinuousReader {
4141
private final PartitionSubscriberFactory partitionSubscriberFactory;
4242
private final SubscriptionPath subscriptionPath;
4343
private final FlowControlSettings flowControlSettings;
44-
private final long topicPartitionCount;
4544
private SparkSourceOffset startOffset;
45+
private final PartitionCountReader partitionCountReader;
46+
private final long topicPartitionCount;
4647

4748
@VisibleForTesting
4849
public PslContinuousReader(
@@ -51,13 +52,14 @@ public PslContinuousReader(
5152
PartitionSubscriberFactory partitionSubscriberFactory,
5253
SubscriptionPath subscriptionPath,
5354
FlowControlSettings flowControlSettings,
54-
long topicPartitionCount) {
55+
PartitionCountReader partitionCountReader) {
5556
this.cursorClient = cursorClient;
5657
this.committer = committer;
5758
this.partitionSubscriberFactory = partitionSubscriberFactory;
5859
this.subscriptionPath = subscriptionPath;
5960
this.flowControlSettings = flowControlSettings;
60-
this.topicPartitionCount = topicPartitionCount;
61+
this.partitionCountReader = partitionCountReader;
62+
this.topicPartitionCount = partitionCountReader.getPartitionCount();
6163
}
6264

6365
@Override
@@ -126,4 +128,9 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
126128
}
127129
return list;
128130
}
131+
132+
@Override
133+
public boolean needsReconfiguration() {
134+
return partitionCountReader.getPartitionCount() != topicPartitionCount;
135+
}
129136
}

src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.github.benmanes.caffeine.cache.Ticker;
2222
import com.google.auto.service.AutoService;
2323
import com.google.cloud.pubsublite.AdminClient;
24-
import com.google.cloud.pubsublite.PartitionLookupUtils;
2524
import com.google.cloud.pubsublite.SubscriptionPath;
2625
import com.google.cloud.pubsublite.TopicPath;
2726
import java.util.Objects;
@@ -55,17 +54,21 @@ public ContinuousReader createContinuousReader(
5554
PslDataSourceOptions pslDataSourceOptions =
5655
PslDataSourceOptions.fromSparkDataSourceOptions(options);
5756
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
58-
long topicPartitionCount;
57+
TopicPath topicPath;
5958
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
60-
topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
59+
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
60+
} catch (Throwable t) {
61+
throw toCanonical(t).underlying;
6162
}
63+
PartitionCountReader partitionCountReader =
64+
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
6265
return new PslContinuousReader(
6366
pslDataSourceOptions.newCursorClient(),
64-
pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
67+
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
6568
pslDataSourceOptions.getSubscriberFactory(),
6669
subscriptionPath,
6770
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
68-
topicPartitionCount);
71+
partitionCountReader);
6972
}
7073

7174
@Override
@@ -80,25 +83,24 @@ public MicroBatchReader createMicroBatchReader(
8083
PslDataSourceOptions.fromSparkDataSourceOptions(options);
8184
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
8285
TopicPath topicPath;
83-
long topicPartitionCount;
8486
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
8587
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
86-
topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
8788
} catch (Throwable t) {
8889
throw toCanonical(t).underlying;
8990
}
91+
PartitionCountReader partitionCountReader =
92+
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
9093
return new PslMicroBatchReader(
9194
pslDataSourceOptions.newCursorClient(),
92-
pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
95+
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
9396
pslDataSourceOptions.getSubscriberFactory(),
9497
new LimitingHeadOffsetReader(
9598
pslDataSourceOptions.newTopicStatsClient(),
9699
topicPath,
97-
topicPartitionCount,
100+
partitionCountReader,
98101
Ticker.systemTicker()),
99102
subscriptionPath,
100103
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
101-
pslDataSourceOptions.maxMessagesPerBatch(),
102-
topicPartitionCount);
104+
pslDataSourceOptions.maxMessagesPerBatch());
103105
}
104106
}

0 commit comments

Comments
 (0)