feat: Spark micro batch processing (#426)
jiangmichaellll committed Dec 16, 2020
1 parent ff05b1a commit 86aecc9
Showing 11 changed files with 669 additions and 37 deletions.
SubscriberFactory.java
@@ -19,9 +19,10 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.function.Consumer;

-public interface SubscriberFactory {
+public interface SubscriberFactory extends Serializable {
Subscriber newSubscriber(Consumer<ImmutableList<SequencedMessage>> message_consumer)
throws ApiException;
}
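
Why the new Serializable bound: Spark serializes InputPartition objects when it ships them from the driver to executors, and PslMicroBatchInputPartition (added later in this commit) captures a SubscriberFactory. A minimal sketch of what the change enables, assuming the factory is assembled on the driver:

// Sketch only. Because SubscriberFactory now extends Serializable, a lambda
// assigned to it is itself serializable (Java serializes a lambda when its
// target interface implements Serializable), so Spark can ship it to executors.
SubscriberFactory factory =
    messageConsumer -> {
      // Build and return a Subscriber that delivers pulled batches of
      // SequencedMessages to messageConsumer; the wiring is omitted here.
      throw new UnsupportedOperationException("sketch only");
    };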
HeadOffsetReader.java (new file)
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import java.io.Closeable;

public interface HeadOffsetReader extends Closeable {

// Gets the head offsets for all partitions in a topic. Blocks until they are available.
PslSourceOffset getHeadOffset(TopicPath topic) throws CheckedApiException;

@Override
void close();
}
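
Only a stub HeadOffsetReader ships in this commit (see the TODO in PslDataSource below). A real implementation would presumably look up the head cursor of every partition and assemble a PslSourceOffset. A hypothetical sketch, with imports as used elsewhere in this commit, where fetchHeadOffset is an assumed helper rather than an existing client API:

// Hypothetical sketch; fetchHeadOffset(...) is an assumed per-partition
// head-cursor lookup, not an existing client call.
public class SketchHeadOffsetReader implements HeadOffsetReader {
  private final AdminClient adminClient;

  public SketchHeadOffsetReader(AdminClient adminClient) {
    this.adminClient = adminClient;
  }

  @Override
  public PslSourceOffset getHeadOffset(TopicPath topic) throws CheckedApiException {
    long partitionCount = PartitionLookupUtils.numPartitions(topic, adminClient);
    ImmutableMap.Builder<Partition, Offset> offsets = ImmutableMap.builder();
    for (int i = 0; i < partitionCount; i++) {
      // Blocks until the head cursor of this partition is known.
      offsets.put(Partition.of(i), fetchHeadOffset(topic, Partition.of(i)));
    }
    return PslSourceOffset.builder().partitionOffsetMap(offsets.build()).build();
  }

  private Offset fetchHeadOffset(TopicPath topic, Partition partition)
      throws CheckedApiException {
    throw new UnsupportedOperationException("assumed lookup; not implemented here");
  }

  @Override
  public void close() {}
}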
PslContinuousReader.java
@@ -16,17 +16,13 @@

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
@@ -84,23 +80,8 @@ public void setStartOffset(Optional<Offset> start) {
startOffset = (SparkSourceOffset) start.get();
return;
}
-try {
-Map<Partition, com.google.cloud.pubsublite.Offset> pslSourceOffsetMap = new HashMap<>();
-for (int i = 0; i < topicPartitionCount; i++) {
-pslSourceOffsetMap.put(Partition.of(i), com.google.cloud.pubsublite.Offset.of(0));
-}
-cursorClient
-.listPartitionCursors(subscriptionPath)
-.get()
-.entrySet()
-.forEach((e) -> pslSourceOffsetMap.replace(e.getKey(), e.getValue()));
-startOffset =
-PslSparkUtils.toSparkSourceOffset(
-PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build());
-} catch (InterruptedException | ExecutionException e) {
-throw new IllegalStateException(
-"Failed to get information from PSL and construct startOffset", e);
-}
+startOffset =
+PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
}
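
PslSparkUtils.getSparkStartOffset is not among the files loaded on this page, but judging from the block removed above, the extracted helper is presumably equivalent to:

// Presumed shape of the extracted helper, reconstructed from the removed
// block: default every partition to offset 0, overlay any committed cursors,
// then convert to a Spark source offset.
public static SparkSourceOffset getSparkStartOffset(
    CursorClient cursorClient, SubscriptionPath subscriptionPath, long topicPartitionCount) {
  try {
    Map<Partition, com.google.cloud.pubsublite.Offset> pslSourceOffsetMap = new HashMap<>();
    for (int i = 0; i < topicPartitionCount; i++) {
      pslSourceOffsetMap.put(Partition.of(i), com.google.cloud.pubsublite.Offset.of(0));
    }
    cursorClient.listPartitionCursors(subscriptionPath).get().forEach(pslSourceOffsetMap::replace);
    return toSparkSourceOffset(
        PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build());
  } catch (InterruptedException | ExecutionException e) {
    throw new IllegalStateException(
        "Failed to get information from PSL and construct startOffset", e);
  }
}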

@Override
@@ -123,13 +104,13 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
-return startOffset.getPartitionOffsetMap().entrySet().stream()
+return startOffset.getPartitionOffsetMap().values().stream()
.map(
-e ->
+v ->
new PslContinuousInputPartition(
SparkPartitionOffset.builder()
-.partition(e.getKey())
-.offset(e.getValue().offset())
+.partition(v.partition())
+.offset(v.offset())
.build(),
subscriptionPath,
flowControlSettings))
PslDataSource.java
@@ -17,22 +17,27 @@
package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.common.collect.ImmutableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

-public class PslDataSource implements DataSourceV2, ContinuousReadSupport, DataSourceRegister {
+public class PslDataSource
+implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {

@Override
public String shortName() {
@@ -52,16 +57,45 @@ public ContinuousReader createContinuousReader(
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
-long topicPartitionCount;
+long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
MultiPartitionCommitter committer =
new MultiPartitionCommitterImpl(
topicPartitionCount,
(partition) ->
CommitterBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(partition)
.setServiceClient(pslDataSourceOptions.newCursorServiceClient())
.build());
return new PslContinuousReader(
cursorClient,
committer,
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount);
}

@Override
public MicroBatchReader createMicroBatchReader(
Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
if (schema.isPresent()) {
throw new IllegalArgumentException(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
+TopicPath topicPath;
try {
-Subscription sub = adminClient.getSubscription(subscriptionPath).get();
-topicPartitionCount =
-adminClient.getTopicPartitionCount(TopicPath.parse(sub.getTopic())).get();
-} catch (InterruptedException | ExecutionException e) {
+topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
+} catch (Throwable t) {
throw new IllegalStateException(
-"Failed to get information of subscription " + pslDataSourceOptions.subscriptionPath(),
-e);
+"Unable to get topic for subscription " + subscriptionPath, t);
}
+long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
MultiPartitionCommitter committer =
new MultiPartitionCommitterImpl(
topicPartitionCount,
@@ -71,11 +105,37 @@
.setPartition(partition)
.setServiceClient(pslDataSourceOptions.newCursorServiceClient())
.build());
-return new PslContinuousReader(
+return new PslMicroBatchReader(
cursorClient,
committer,
subscriptionPath,
PslSparkUtils.toSparkSourceOffset(getHeadOffset(topicPath)),
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount);
}

private static PslSourceOffset getHeadOffset(TopicPath topicPath) {
// TODO(jiangmichael): Replace it with real implementation.
HeadOffsetReader headOffsetReader =
new HeadOffsetReader() {
@Override
public PslSourceOffset getHeadOffset(TopicPath topic) {
return PslSourceOffset.builder()
.partitionOffsetMap(
ImmutableMap.of(
Partition.of(0), com.google.cloud.pubsublite.Offset.of(50),
Partition.of(1), com.google.cloud.pubsublite.Offset.of(50)))
.build();
}

@Override
public void close() {}
};
try {
return headOffsetReader.getHeadOffset(topicPath);
} catch (CheckedApiException e) {
throw new IllegalStateException("Unable to get head offset for topic " + topicPath, e);
}
}
}
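
With MicroBatchReadSupport wired in, the connector can back a structured streaming query in micro-batch mode. A rough usage sketch from the Spark side; the format name and option key are assumptions, since shortName() and the option parsing are not visible in the files loaded here:

// Rough usage sketch; "pubsublite" and the option key are assumed values.
Dataset<Row> df =
    spark
        .readStream()
        .format("pubsublite")
        .option(
            "pubsublite.subscription",
            "projects/my-project/locations/us-central1-a/subscriptions/my-subscription")
        .load();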
PslMicroBatchInputPartition.java (new file)
@@ -0,0 +1,66 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {

private final SubscriberFactory subscriberFactory;
private final SparkPartitionOffset endOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslMicroBatchInputPartition(
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
SparkPartitionOffset endOffset,
SubscriberFactory subscriberFactory) {
this.endOffset = endOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.subscriberFactory = subscriberFactory;
}

@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
BlockingPullSubscriber subscriber;
try {
subscriber =
new BlockingPullSubscriberImpl(
subscriberFactory,
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(endOffset.offset()).build())
.build());
} catch (CheckedApiException e) {
throw new IllegalStateException(
"Unable to create PSL subscriber for " + endOffset.partition(), e);
}
return new PslMicroBatchInputPartitionReader(subscriptionPath, endOffset, subscriber);
}
}
PslMicroBatchInputPartitionReader.java (new file)
@@ -0,0 +1,98 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

public class PslMicroBatchInputPartitionReader implements InputPartitionReader<InternalRow> {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private static final Duration SUBSCRIBER_PULL_TIMEOUT = Duration.ofSeconds(10);

private final SubscriptionPath subscriptionPath;
private final SparkPartitionOffset endOffset;
private final BlockingPullSubscriber subscriber;
@Nullable private SequencedMessage currentMsg = null;
private boolean batchFulfilled = false;

@VisibleForTesting
PslMicroBatchInputPartitionReader(
SubscriptionPath subscriptionPath,
SparkPartitionOffset endOffset,
BlockingPullSubscriber subscriber) {
this.subscriptionPath = subscriptionPath;
this.subscriber = subscriber;
this.endOffset = endOffset;
}

@Override
public boolean next() {
if (batchFulfilled) {
return false;
}
Optional<SequencedMessage> msg;
while (true) {
try {
subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
msg = subscriber.messageIfAvailable();
break;
} catch (TimeoutException e) {
log.atWarning().log("Unable to get any messages in last " + SUBSCRIBER_PULL_TIMEOUT);
} catch (Throwable t) {
throw new IllegalStateException("Failed to retrieve messages.", t);
}
}
// Since next() is called by only one thread at a time, the message signaled by
// onData() above is guaranteed to still be available to this thread.
assert msg.isPresent();
currentMsg = msg.get();
if (currentMsg.offset().value() == endOffset.offset()) {
// This is the last message of the batch.
batchFulfilled = true;
} else if (currentMsg.offset().value() > endOffset.offset()) {
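// The pulled message is already past the batch's end offset; mark the batch
// finished without emitting this message.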
batchFulfilled = true;
return false;
}
return true;
}

@Override
public InternalRow get() {
assert currentMsg != null;
return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, endOffset.partition());
}

@Override
public void close() {
try {
subscriber.close();
} catch (Exception e) {
log.atWarning().log("Subscriber failed to close.");
}
}
}
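
For orientation, Spark drives an InputPartitionReader with a next()/get() loop along these lines (a sketch of the calling side, not code from this commit):

// Sketch of the caller; `partition` is an InputPartition<InternalRow>.
static void drainPartition(InputPartition<InternalRow> partition) throws IOException {
  try (InputPartitionReader<InternalRow> reader = partition.createPartitionReader()) {
    while (reader.next()) {
      InternalRow row = reader.get(); // valid only until the next call to next()
      // ... hand the row off to the query engine ...
    }
  }
}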
