Skip to content

Commit 002191f

Browse files
authored
deps: update to google-cloud-pubsublite to v0.15.0 (#192)
Manual update to handle the interface change of BlockingPullSubscriberImpl - the SubscriberBuilder now accepts an initial location when connecting a new Subscribe stream and an additional seek request is not necessary.
1 parent 7d1ac2f commit 002191f

8 files changed

Lines changed: 40 additions & 43 deletions

clirr-ignored-differences.xml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
33
<differences>
4+
<difference>
5+
<differenceType>7004</differenceType>
6+
<className>com/google/cloud/pubsublite/spark/internal/*</className>
7+
<method>*</method>
8+
</difference>
49
<difference>
510
<differenceType>7004</differenceType>
611
<className>com/google/cloud/pubsublite/spark/*Reader</className>
@@ -13,16 +18,18 @@
1318
<to>*</to>
1419
</difference>
1520
<difference>
16-
<differenceType>8001</differenceType>
17-
<className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
21+
<differenceType>7005</differenceType>
22+
<className>com/google/cloud/pubsublite/spark/*InputPartition</className>
23+
<method>*</method>
24+
<to>*</to>
1825
</difference>
1926
<difference>
2027
<differenceType>8001</differenceType>
21-
<className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
28+
<className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
2229
</difference>
2330
<difference>
2431
<differenceType>8001</differenceType>
25-
<className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className>
32+
<className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
2633
</difference>
2734
<difference>
2835
<differenceType>8001</differenceType>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
<dependency>
4444
<groupId>com.google.cloud</groupId>
4545
<artifactId>google-cloud-pubsublite</artifactId>
46-
<version>0.14.2</version>
46+
<version>0.15.0</version>
4747
</dependency>
4848
<dependency>
4949
<groupId>com.google.api.grpc</groupId>

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2323
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
2424
import com.google.cloud.pubsublite.internal.CheckedApiException;
25-
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
26-
import com.google.cloud.pubsublite.proto.Cursor;
27-
import com.google.cloud.pubsublite.proto.SeekRequest;
25+
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
2826
import java.io.Serializable;
2927
import org.apache.spark.sql.catalyst.InternalRow;
3028
import org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition;
@@ -34,13 +32,13 @@
3432
public class PslContinuousInputPartition
3533
implements ContinuousInputPartition<InternalRow>, Serializable {
3634

37-
private final SubscriberFactory subscriberFactory;
35+
private final PartitionSubscriberFactory subscriberFactory;
3836
private final SparkPartitionOffset startOffset;
3937
private final SubscriptionPath subscriptionPath;
4038
private final FlowControlSettings flowControlSettings;
4139

4240
public PslContinuousInputPartition(
43-
SubscriberFactory subscriberFactory,
41+
PartitionSubscriberFactory subscriberFactory,
4442
SparkPartitionOffset startOffset,
4543
SubscriptionPath subscriptionPath,
4644
FlowControlSettings flowControlSettings) {
@@ -63,12 +61,10 @@ public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset
6361
try {
6462
subscriber =
6563
new BlockingPullSubscriberImpl(
66-
subscriberFactory,
67-
flowControlSettings,
68-
SeekRequest.newBuilder()
69-
.setCursor(
70-
Cursor.newBuilder().setOffset(pslPartitionOffset.offset().value()).build())
71-
.build());
64+
(consumer) ->
65+
subscriberFactory.newSubscriber(
66+
pslPartitionOffset.partition(), pslPartitionOffset.offset(), consumer),
67+
flowControlSettings);
7268
} catch (CheckedApiException e) {
7369
throw new IllegalStateException(
7470
"Unable to create PSL subscriber for " + startOffset.toString(), e);

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.pubsublite.SubscriptionPath;
2222
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2323
import com.google.cloud.pubsublite.internal.CursorClient;
24-
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
2524
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
2625
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
2726
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
@@ -116,12 +115,9 @@ public StructType readSchema() {
116115
public List<InputPartition<InternalRow>> planInputPartitions() {
117116
List<InputPartition<InternalRow>> list = new ArrayList<>();
118117
for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
119-
PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
120-
SubscriberFactory subscriberFactory =
121-
(consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
122118
list.add(
123119
new PslContinuousInputPartition(
124-
subscriberFactory,
120+
partitionSubscriberFactory,
125121
SparkPartitionOffset.builder()
126122
.partition(offset.partition())
127123
.offset(offset.offset())

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,14 @@
2121
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
2222
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
2323
import com.google.cloud.pubsublite.internal.CheckedApiException;
24-
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
25-
import com.google.cloud.pubsublite.proto.Cursor;
26-
import com.google.cloud.pubsublite.proto.SeekRequest;
24+
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
2725
import org.apache.spark.sql.catalyst.InternalRow;
2826
import org.apache.spark.sql.sources.v2.reader.InputPartition;
2927
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
3028

3129
public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {
3230

33-
private final SubscriberFactory subscriberFactory;
31+
private final PartitionSubscriberFactory subscriberFactory;
3432
private final SparkPartitionOffset startOffset;
3533
private final SparkPartitionOffset endOffset;
3634
private final SubscriptionPath subscriptionPath;
@@ -41,7 +39,7 @@ public PslMicroBatchInputPartition(
4139
FlowControlSettings flowControlSettings,
4240
SparkPartitionOffset startOffset,
4341
SparkPartitionOffset endOffset,
44-
SubscriberFactory subscriberFactory) {
42+
PartitionSubscriberFactory subscriberFactory) {
4543
this.startOffset = startOffset;
4644
this.endOffset = endOffset;
4745
this.subscriptionPath = subscriptionPath;
@@ -53,17 +51,13 @@ public PslMicroBatchInputPartition(
5351
public InputPartitionReader<InternalRow> createPartitionReader() {
5452
BlockingPullSubscriber subscriber;
5553
try {
54+
PslPartitionOffset pslStartOffset = PslSparkUtils.toPslPartitionOffset(startOffset);
5655
subscriber =
5756
new BlockingPullSubscriberImpl(
58-
subscriberFactory,
59-
flowControlSettings,
60-
SeekRequest.newBuilder()
61-
.setCursor(
62-
Cursor.newBuilder()
63-
.setOffset(
64-
PslSparkUtils.toPslPartitionOffset(startOffset).offset().value())
65-
.build())
66-
.build());
57+
(consumer) ->
58+
subscriberFactory.newSubscriber(
59+
pslStartOffset.partition(), pslStartOffset.offset(), consumer),
60+
flowControlSettings);
6761
} catch (CheckedApiException e) {
6862
throw new IllegalStateException(
6963
"Unable to create PSL subscriber for " + endOffset.partition(), e);

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.cloud.pubsublite.SubscriptionPath;
2424
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2525
import com.google.cloud.pubsublite.internal.CursorClient;
26-
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
2726
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
2827
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
2928
import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
@@ -145,17 +144,13 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
145144
// There is no message to pull for this partition.
146145
continue;
147146
}
148-
PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
149-
SubscriberFactory subscriberFactory =
150-
(consumer) ->
151-
partitionSubscriberFactory.newSubscriber(endPartitionOffset.partition(), consumer);
152147
list.add(
153148
new PslMicroBatchInputPartition(
154149
subscriptionPath,
155150
flowControlSettings,
156151
startPartitionOffset,
157152
endPartitionOffset,
158-
subscriberFactory));
153+
partitionSubscriberFactory));
159154
}
160155
return list;
161156
}

src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
3434
import com.google.cloud.pubsublite.internal.wire.ServiceClients;
3535
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
36+
import com.google.cloud.pubsublite.proto.Cursor;
37+
import com.google.cloud.pubsublite.proto.SeekRequest;
3638
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
3739
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitterImpl;
3840
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
@@ -135,7 +137,7 @@ MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
135137
}
136138

137139
PartitionSubscriberFactory getSubscriberFactory() {
138-
return (partition, consumer) -> {
140+
return (partition, offset, consumer) -> {
139141
PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
140142
SubscriberServiceSettings.Builder settingsBuilder =
141143
SubscriberServiceSettings.newBuilder()
@@ -151,6 +153,10 @@ PartitionSubscriberFactory getSubscriberFactory() {
151153
.setPartition(partition)
152154
.setServiceClient(serviceClient)
153155
.setMessageConsumer(consumer)
156+
.setInitialLocation(
157+
SeekRequest.newBuilder()
158+
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
159+
.build())
154160
.build();
155161
} catch (IOException e) {
156162
throw new IllegalStateException("Failed to create subscriber service.", e);

src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.pubsublite.spark.internal;
1818

1919
import com.google.api.gax.rpc.ApiException;
20+
import com.google.cloud.pubsublite.Offset;
2021
import com.google.cloud.pubsublite.Partition;
2122
import com.google.cloud.pubsublite.SequencedMessage;
2223
import com.google.cloud.pubsublite.internal.wire.Subscriber;
@@ -26,6 +27,8 @@
2627

2728
public interface PartitionSubscriberFactory extends Serializable {
2829
Subscriber newSubscriber(
29-
Partition partition, Consumer<ImmutableList<SequencedMessage>> message_consumer)
30+
Partition partition,
31+
Offset offset,
32+
Consumer<ImmutableList<SequencedMessage>> message_consumer)
3033
throws ApiException;
3134
}

0 commit comments

Comments
 (0)