Skip to content

Commit

Permalink
feat: Adds examples and fixes bugs in spark connector (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Jan 13, 2021
1 parent 48a19d7 commit d1c2a24
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 34 deletions.
33 changes: 33 additions & 0 deletions pubsublite-spark-sql-streaming/examples/simple_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql import SparkSession
import sys

full_subscription_path = sys.argv[1]

spark = SparkSession.builder.appName('Simple PubSub Lite Read').master('yarn').getOrCreate()

spark \
.readStream \
.format('pubsublite') \
.option('pubsublite.subscription', full_subscription_path) \
.load() \
.writeStream \
.format('console') \
.outputMode('append') \
.trigger(processingTime='1 second') \
.start() \
.awaitTermination()
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
Expand Down Expand Up @@ -107,18 +108,21 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {

return startOffset.getPartitionOffsetMap().values().stream()
.map(
v ->
new PslContinuousInputPartition(
(consumer) -> partitionSubscriberFactory.newSubscriber(v.partition(), consumer),
SparkPartitionOffset.builder()
.partition(v.partition())
.offset(v.offset())
.build(),
subscriptionPath,
flowControlSettings))
.collect(Collectors.toList());
List<InputPartition<InternalRow>> list = new ArrayList<>();
for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
SubscriberFactory subscriberFactory =
(consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
list.add(
new PslContinuousInputPartition(
subscriberFactory,
SparkPartitionOffset.builder()
.partition(offset.partition())
.offset(offset.offset())
.build(),
subscriptionPath,
flowControlSettings));
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {

private final SubscriberFactory subscriberFactory;
private final SparkPartitionOffset startOffset;
private final SparkPartitionOffset endOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslMicroBatchInputPartition(
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
SparkPartitionOffset startOffset,
SparkPartitionOffset endOffset,
SubscriberFactory subscriberFactory) {
this.startOffset = startOffset;
this.endOffset = endOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
Expand All @@ -55,7 +58,11 @@ public InputPartitionReader<InternalRow> createPartitionReader() {
subscriberFactory,
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(endOffset.offset()).build())
.setCursor(
Cursor.newBuilder()
.setOffset(
PslSparkUtils.toPslPartitionOffset(startOffset).offset().value())
.build())
.build());
} catch (CheckedApiException e) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
Expand Down Expand Up @@ -112,23 +113,27 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
assert startOffset != null;
return startOffset.getPartitionOffsetMap().values().stream()
.map(
v -> {
SparkPartitionOffset endPartitionOffset =
endOffset.getPartitionOffsetMap().get(v.partition());
if (v.equals(endPartitionOffset)) {
// There is no message to pull for this partition.
return null;
}
return new PslMicroBatchInputPartition(
subscriptionPath,
flowControlSettings,
endPartitionOffset,
(consumer) -> partitionSubscriberFactory.newSubscriber(v.partition(), consumer));
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<InputPartition<InternalRow>> list = new ArrayList<>();

for (SparkPartitionOffset offset :
Objects.requireNonNull(startOffset).getPartitionOffsetMap().values()) {
SparkPartitionOffset endPartitionOffset =
endOffset.getPartitionOffsetMap().get(offset.partition());
if (offset.equals(endPartitionOffset)) {
// There is no message to pull for this partition.
continue;
}
PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
SubscriberFactory subscriberFactory =
(consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
list.add(
new PslMicroBatchInputPartition(
subscriptionPath,
flowControlSettings,
offset,
endPartitionOffset,
subscriberFactory));
}
return list;
}
}

0 comments on commit d1c2a24

Please sign in to comment.