feat: Compute head offset for Spark connector micro batch mode. (#439)
jiangmichaellll committed Jan 12, 2021
1 parent 624b123 commit f484754
Showing 10 changed files with 301 additions and 98 deletions.
TopicStatsClient.java
@@ -22,6 +22,7 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;

public interface TopicStatsClient extends ApiBackgroundResource {
static TopicStatsClient create(TopicStatsClientSettings settings) throws ApiException {
@@ -43,4 +44,17 @@ static TopicStatsClient create(TopicStatsClientSettings settings) throws ApiException {
*/
ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
TopicPath path, Partition partition, Offset start, Offset end);

/**
* Compute the head cursor for the partition. The head cursor's offset is guaranteed to be less
* than or equal to the offset of every message whose publish has not yet been acknowledged, and
* greater than the offset of every message whose publish has already been acknowledged. It is 0
* if there have never been messages on the partition.
*
* @param path The topic to compute the head cursor on
* @param partition The partition to compute the head cursor for
* @return A future that completes with the head cursor on success, or fails with an {@link
*     ApiException} on error.
*/
ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition);
}
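
For orientation, the snippet below sketches how a caller could consume the new API. It is a minimal illustration using only types already imported in the files above; the client and topic are supplied by the caller, and it is not code from this commit.

// Minimal usage sketch (illustrative; not part of this commit).
// Logs the head offset of partition 0; `client` and `topic` come from the caller.
static void printHeadOffset(TopicStatsClient client, TopicPath topic) {
  ApiFutures.addCallback(
      client.computeHeadCursor(topic, Partition.of(0)),
      new ApiFutureCallback<Cursor>() {
        @Override
        public void onSuccess(Cursor cursor) {
          System.out.println("head offset: " + cursor.getOffset());
        }

        @Override
        public void onFailure(Throwable t) {
          t.printStackTrace();
        }
      },
      MoreExecutors.directExecutor());
}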
TopicStatsClientImpl.java
@@ -16,14 +16,18 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.v1.TopicStatsServiceClient;
import com.google.common.util.concurrent.MoreExecutors;

public class TopicStatsClientImpl extends ApiResourceAggregation implements TopicStatsClient {
private final CloudRegion region;
@@ -53,4 +57,18 @@ public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
.setEndCursor(Cursor.newBuilder().setOffset(end.value()).build())
.build());
}

@Override
public ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition) {
return ApiFutures.transform(
serviceClient
.computeHeadCursorCallable()
.futureCall(
ComputeHeadCursorRequest.newBuilder()
.setTopic(path.toString())
.setPartition(partition.value())
.build()),
ComputeHeadCursorResponse::getHeadCursor,
MoreExecutors.directExecutor());
}
}
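
Since ApiFuture extends java.util.concurrent.Future, a caller that prefers blocking semantics could equivalently resolve the result in place; a sketch, with an arbitrary timeout and assuming java.util.concurrent.TimeUnit is imported:

// Blocking variant (illustrative): resolve the head cursor with a 10-second timeout.
Cursor head = client.computeHeadCursor(topic, Partition.of(0)).get(10, TimeUnit.SECONDS);
long headOffset = head.getOffset();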
10 changes: 10 additions & 0 deletions pubsublite-spark-sql-streaming/pom.xml
@@ -104,6 +104,11 @@
<artifactId>jackson-core</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -135,6 +140,11 @@
<scope>test</scope>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
LimitingHeadOffsetReader.java (new file)
@@ -0,0 +1,100 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
* Rate-limited head offset reader, backed by an AsyncLoadingCache that refreshes each
* partition's head offset for the topic at most once per minute.
*/
public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {

private final TopicStatsClient topicStatsClient;
private final TopicPath topic;
private final long topicPartitionCount;
private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;

@VisibleForTesting
public LimitingHeadOffsetReader(
TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
this.topicStatsClient = topicStatsClient;
this.topic = topic;
this.topicPartitionCount = topicPartitionCount;
this.cachedHeadOffsets =
Caffeine.newBuilder()
.ticker(ticker)
.expireAfterWrite(1, TimeUnit.MINUTES)
.buildAsync(this::loadHeadOffset);
}

private CompletableFuture<Offset> loadHeadOffset(Partition partition, Executor executor) {

CompletableFuture<Offset> result = new CompletableFuture<>();
ApiFutures.addCallback(
topicStatsClient.computeHeadCursor(topic, partition),
new ApiFutureCallback<Cursor>() {
@Override
public void onFailure(Throwable t) {
result.completeExceptionally(t);
}

@Override
public void onSuccess(Cursor c) {
result.complete(Offset.of(c.getOffset()));
}
},
MoreExecutors.directExecutor());
return result;
}

@Override
public PslSourceOffset getHeadOffset() {
Set<Partition> keySet = new HashSet<>();
for (int i = 0; i < topicPartitionCount; i++) {
keySet.add(Partition.of(i));
}
CompletableFuture<Map<Partition, Offset>> future = cachedHeadOffsets.getAll(keySet);
try {
return PslSourceOffset.builder().partitionOffsetMap(future.get()).build();
} catch (Throwable t) {
throw new IllegalStateException("Unable to compute head offset for topic: " + topic, t);
}
}

@Override
public void close() {
topicStatsClient.close();
}
}
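
The guava-testlib dependency added to the pom above hints at how the Ticker seam is meant to be exercised: Guava's FakeTicker exposes a long read() method that satisfies Caffeine's Ticker functional interface, so a test can advance time manually. Below is a rough sketch under that assumption; the Mockito setup and the topic path are illustrative guesses, not code from this commit.

// Test sketch (illustrative): verify the cache serves the cached head offset for
// one minute and reloads afterwards. Assumes Mockito and guava-testlib on the classpath.
FakeTicker ticker = new FakeTicker();
TopicStatsClient client = mock(TopicStatsClient.class);
TopicPath topic = TopicPath.parse("projects/123/locations/us-central1-a/topics/test-topic");
when(client.computeHeadCursor(topic, Partition.of(0)))
    .thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(10).build()));

LimitingHeadOffsetReader reader =
    new LimitingHeadOffsetReader(client, topic, /*topicPartitionCount=*/ 1, ticker::read);

reader.getHeadOffset(); // First read issues one RPC and caches the result.
ticker.advance(59, TimeUnit.SECONDS);
reader.getHeadOffset(); // Still inside the expiry window: served from cache.
verify(client, times(1)).computeHeadCursor(topic, Partition.of(0));

ticker.advance(2, TimeUnit.SECONDS);
reader.getHeadOffset(); // Entry expired: a second RPC is issued.
verify(client, times(2)).computeHeadCursor(topic, Partition.of(0));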
PerTopicHeadOffsetReader.java
@@ -16,14 +16,12 @@

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import java.io.Closeable;

public interface HeadOffsetReader extends Closeable {
public interface PerTopicHeadOffsetReader extends Closeable {

// Gets the head offsets for all partitions in a topic. Blocks.
PslSourceOffset getHeadOffset(TopicPath topic) throws CheckedApiException;
// Gets the head offsets for all partitions in the topic. Blocks.
PslSourceOffset getHeadOffset();

@Override
void close();
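
Because the interface no longer takes a TopicPath or throws a checked exception, it is trivial to fake in tests. The sketch below mirrors the fixed-offset stub that this commit deletes from PslDataSource (see the removed getHeadOffset helper further down); it is illustrative, not code from this commit.

// Fixed-offset fake (illustrative), mirroring the stub removed from PslDataSource below.
PerTopicHeadOffsetReader fakeReader =
    new PerTopicHeadOffsetReader() {
      @Override
      public PslSourceOffset getHeadOffset() {
        return PslSourceOffset.builder()
            .partitionOffsetMap(
                ImmutableMap.of(
                    Partition.of(0), Offset.of(50),
                    Partition.of(1), Offset.of(50)))
            .build();
      }

      @Override
      public void close() {}
    };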
PslDataSource.java
@@ -16,25 +16,13 @@

package com.google.cloud.pubsublite.spark;

import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.github.benmanes.caffeine.cache.Ticker;
import com.google.auto.service.AutoService;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.ServiceClients;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.apache.spark.sql.sources.DataSourceRegister;
@@ -69,20 +57,10 @@ public ContinuousReader createContinuousReader(
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
MultiPartitionCommitter committer =
new MultiPartitionCommitterImpl(
topicPartitionCount,
(partition) ->
CommitterBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(partition)
.setServiceClient(pslDataSourceOptions.newCursorServiceClient())
.build());

return new PslContinuousReader(
cursorClient,
committer,
getSubscriberFactory(new PslCredentialsProvider(pslDataSourceOptions), subscriptionPath),
pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
pslDataSourceOptions.getSubscriberFactory(),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount);
@@ -98,7 +76,6 @@ public MicroBatchReader createMicroBatchReader(

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
PslCredentialsProvider credentialsProvider = new PslCredentialsProvider(pslDataSourceOptions);
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
@@ -110,72 +87,17 @@ public MicroBatchReader createMicroBatchReader(
"Unable to get topic for subscription " + subscriptionPath, t);
}
long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
MultiPartitionCommitter committer =
new MultiPartitionCommitterImpl(
topicPartitionCount,
(partition) ->
CommitterBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(partition)
.setServiceClient(pslDataSourceOptions.newCursorServiceClient())
.build());

return new PslMicroBatchReader(
cursorClient,
committer,
getSubscriberFactory(new PslCredentialsProvider(pslDataSourceOptions), subscriptionPath),
pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
pslDataSourceOptions.getSubscriberFactory(),
new LimitingHeadOffsetReader(
pslDataSourceOptions.newTopicStatsClient(),
topicPath,
topicPartitionCount,
Ticker.systemTicker()),
subscriptionPath,
PslSparkUtils.toSparkSourceOffset(getHeadOffset(topicPath)),
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount);
}

private static PslSourceOffset getHeadOffset(TopicPath topicPath) {
// TODO(jiangmichael): Replace it with real implementation.
HeadOffsetReader headOffsetReader =
new HeadOffsetReader() {
@Override
public PslSourceOffset getHeadOffset(TopicPath topic) {
return PslSourceOffset.builder()
.partitionOffsetMap(
ImmutableMap.of(
Partition.of(0), com.google.cloud.pubsublite.Offset.of(50),
Partition.of(1), com.google.cloud.pubsublite.Offset.of(50)))
.build();
}

@Override
public void close() {}
};
try {
return headOffsetReader.getHeadOffset(topicPath);
} catch (CheckedApiException e) {
throw new IllegalStateException("Unable to get head offset for topic " + topicPath, e);
}
}

private static PartitionSubscriberFactory getSubscriberFactory(
PslCredentialsProvider credentialsProvider, SubscriptionPath subscriptionPath) {
return (partition, consumer) -> {
PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder().setCredentialsProvider(credentialsProvider);
ServiceClients.addDefaultMetadata(
context, RoutingMetadata.of(subscriptionPath, partition), settingsBuilder);
try {
SubscriberServiceClient serviceClient =
SubscriberServiceClient.create(
addDefaultSettings(subscriptionPath.location().region(), settingsBuilder));
return SubscriberBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(partition)
.setContext(context)
.setServiceClient(serviceClient)
.setMessageConsumer(consumer)
.build();
} catch (IOException e) {
throw new IllegalStateException("Failed to create subscriber service.", e);
}
};
}
}
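
The inline committer construction removed above now lives behind PslDataSourceOptions.newMultiPartitionCommitter (and the subscriber factory behind getSubscriberFactory), whose bodies are not part of this excerpt. Reassembling the deleted inline code suggests a shape roughly like the following; the exact method on PslDataSourceOptions is an assumption.

// Presumed shape of the factory method on PslDataSourceOptions (not shown in this diff);
// reassembled from the inline code this commit removes.
public MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
  return new MultiPartitionCommitterImpl(
      topicPartitionCount,
      (partition) ->
          CommitterBuilder.newBuilder()
              .setSubscriptionPath(this.subscriptionPath())
              .setPartition(partition)
              .setServiceClient(this.newCursorServiceClient())
              .build());
}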
(The remaining four changed files in this commit are not shown.)