
feat: PSL Connector Writer support #121

Merged (16 commits) on Apr 2, 2021
src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java (17 additions, 6 deletions)
@@ -26,9 +26,9 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.flogger.GoogleLogger;
import com.google.common.math.LongMath;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -50,9 +51,13 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;
import scala.Option;
import scala.compat.java8.functionConverterImpls.FromJavaBiConsumer;

public class PslSparkUtils {

private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

@VisibleForTesting
public static ArrayBasedMapData convertAttributesToSparkMap(
ListMultimap<String, ByteString> attributeMap) {
@@ -102,12 +107,16 @@ private static <T> void extractVal(
String fieldName,
DataType expectedDataType,
Consumer<T> consumer) {
if (!inputSchema.getFieldIndex(fieldName).isEmpty()) {
Integer idx = (Integer) inputSchema.getFieldIndex(fieldName).get();
Option<Object> idxOr;
if (!(idxOr = inputSchema.getFieldIndex(fieldName)).isEmpty()) {
Integer idx = (Integer) idxOr.get();
try {
consumer.accept((T) row.get(idx, expectedDataType));
} catch (ClassCastException e) {
// This means the field has a wrong class type.
log.atInfo().atMostEvery(5, TimeUnit.MINUTES).log(
"Col %s was dropped since the type doesn't match. Actual type: %s, expected type: %s.",
fieldName, inputSchema.apply(idx).dataType(), expectedDataType);
}
}
}
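
For context, a hedged caller sketch of the helper above (not part of this diff): extractVal now resolves the field index once through Scala's Option, and a type mismatch only drops the column with a rate-limited log instead of failing the write. The "key" column name, the expected type, and the Message.Builder consumer below are illustrative assumptions.

// Sketch only; assumes the surrounding PslSparkUtils context and the Pub/Sub Lite Message builder API.
Message.Builder builder = Message.builder();
extractVal(
    inputSchema,
    row,
    "key",                // assumed column name
    DataTypes.BinaryType, // expected Spark type for that column
    (byte[] o) -> builder.setKey(ByteString.copyFrom(o)));
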
@@ -138,7 +147,8 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
"attributes",
Constants.ATTRIBUTES_DATATYPE,
(MapData o) -> {
ListMultimap<String, ByteString> attributeMap = ArrayListMultimap.create();
ImmutableListMultimap.Builder<String, ByteString> attributeMapBuilder =
ImmutableListMultimap.builder();
o.foreach(
DataTypes.StringType,
Constants.ATTRIBUTES_PER_KEY_DATATYPE,
@@ -149,9 +159,10 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
values.foreach(
DataTypes.BinaryType,
new FromJavaBiConsumer<>(
(idx, a) -> attributeMap.put(key, ByteString.copyFrom((byte[]) a))));
(idx, a) ->
attributeMapBuilder.put(key, ByteString.copyFrom((byte[]) a))));
}));
builder.setAttributes(ImmutableListMultimap.copyOf(attributeMap));
builder.setAttributes(attributeMapBuilder.build());
});
return builder.build();
}
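
As a standalone sketch of the builder pattern adopted above (keys and values are illustrative): ImmutableListMultimap.Builder preserves repeated keys in insertion order and produces the immutable multimap directly, which removes the intermediate mutable ArrayListMultimap and the ImmutableListMultimap.copyOf call.

// Sketch only; illustrative attribute names and values.
ImmutableListMultimap.Builder<String, ByteString> attrs = ImmutableListMultimap.builder();
attrs.put("attr", ByteString.copyFromUtf8("v1"));
attrs.put("attr", ByteString.copyFromUtf8("v2")); // duplicate keys are kept, in order
ImmutableListMultimap<String, ByteString> attributeMap = attrs.build();
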
src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java
@@ -83,51 +83,48 @@ public PslCredentialsProvider getCredentialProvider() {
return new PslCredentialsProvider(credentialsKey());
}

public static Publisher<MessageMetadata> createNewPublisher(
PslWriteDataSourceOptions writeOptions) {
public Publisher<MessageMetadata> createNewPublisher() {
return PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(writeOptions.topicPath())
.setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(writeOptions.topicPath())
.setTopic(topicPath())
.setPartition(partition)
.setServiceClient(newServiceClient(writeOptions, partition))
.setServiceClient(newServiceClient(partition))
.build())
.setAdminClient(getAdminClient(writeOptions))
.setAdminClient(getAdminClient())
.build()
.instantiate();
}

private static PublisherServiceClient newServiceClient(
PslWriteDataSourceOptions writeOptions, Partition partition) throws ApiException {
private PublisherServiceClient newServiceClient(Partition partition) throws ApiException {
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
settingsBuilder = settingsBuilder.setCredentialsProvider(writeOptions.getCredentialProvider());
settingsBuilder = settingsBuilder.setCredentialsProvider(getCredentialProvider());
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(writeOptions.topicPath(), partition),
RoutingMetadata.of(topicPath(), partition),
settingsBuilder);
try {
return PublisherServiceClient.create(
addDefaultSettings(writeOptions.topicPath().location().region(), settingsBuilder));
addDefaultSettings(topicPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private static AdminClient getAdminClient(PslWriteDataSourceOptions writeOptions)
throws ApiException {
private AdminClient getAdminClient() throws ApiException {
try {
return AdminClient.create(
AdminClientSettings.newBuilder()
.setServiceClient(
AdminServiceClient.create(
addDefaultSettings(
writeOptions.topicPath().location().region(),
topicPath().location().region(),
AdminServiceSettings.newBuilder()
.setCredentialsProvider(writeOptions.getCredentialProvider()))))
.setRegion(writeOptions.topicPath().location().region())
.setCredentialsProvider(getCredentialProvider()))))
.setRegion(topicPath().location().region())
.build());
} catch (Throwable t) {
throw toCanonical(t).underlying;
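
A hedged usage sketch of the refactor in this file (the topic path string is a placeholder): createNewPublisher, newServiceClient, and getAdminClient become instance methods, so a caller holding a PslWriteDataSourceOptions no longer passes the options object back in as a parameter.

// Sketch only; assumes the Pub/Sub Lite TopicPath.parse path format and the ApiService lifecycle API.
PslWriteDataSourceOptions writeOptions =
    PslWriteDataSourceOptions.builder()
        .setTopicPath(TopicPath.parse(
            "projects/my-project/locations/us-central1-a/topics/my-topic")) // placeholder path
        .build();
Publisher<MessageMetadata> publisher = writeOptions.createNewPublisher();
publisher.startAsync().awaitRunning(); // assumed lifecycle call; CachedPublishers below manages real publishers
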
src/main/java/com/google/cloud/pubsublite/spark/CachedPublishers.java
@@ -29,7 +29,7 @@
/** Cached {@link Publisher}s to reuse publisher of same settings in the same task. */
public class CachedPublishers {

// TODO(b/182322450): Use com.google.cloud.pubsublite.internal.wire.SystemExecutors
// TODO(jiangmichaellll): Use com.google.cloud.pubsublite.internal.wire.SystemExecutors
// once new PSL client library is released.
private final Executor listenerExecutor = Executors.newSingleThreadExecutor();

@@ -44,7 +44,7 @@ public synchronized Publisher<MessageMetadata> getOrCreate(
return publisher;
}

publisher = PslWriteDataSourceOptions.createNewPublisher(writeOptions);
publisher = writeOptions.createNewPublisher();
publishers.put(writeOptions, publisher);
publisher.addListener(
new ApiService.Listener() {
src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java (new file)
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.junit.Test;

public class PslStreamWriterTest {

private final PslStreamWriter writer =
new PslStreamWriter(
Constants.DEFAULT_SCHEMA,
PslWriteDataSourceOptions.builder()
.setTopicPath(UnitTestExamples.exampleTopicPath())
.build());
private final PslWriterCommitMessage message1 = PslWriterCommitMessage.create(10);
private final PslWriterCommitMessage message2 = PslWriterCommitMessage.create(5);

private static class AbortCommitMessage implements WriterCommitMessage {}

@Test
public void testCommit() {
writer.commit(100, new WriterCommitMessage[] {message1, message2});
}

@Test
public void testAbort() {
writer.abort(100, new WriterCommitMessage[] {message1, message2, new AbortCommitMessage()});
}

@Test
public void testCreateFactory() {
writer.createWriterFactory();
}
}
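
A possible companion sketch (not in this PR) exercising the attribute conversion that PslSparkUtils marks @VisibleForTesting above; the Truth assertion, the imports noted in the comment, and the expectation of one map entry per distinct attribute key are assumptions.

// Assumed imports: com.google.common.collect.ImmutableListMultimap, com.google.protobuf.ByteString,
// org.apache.spark.sql.catalyst.util.ArrayBasedMapData, static com.google.common.truth.Truth.assertThat.
@Test
public void convertAttributesToSparkMap_singleKey() {
  ImmutableListMultimap<String, ByteString> attributes =
      ImmutableListMultimap.of("k", ByteString.copyFromUtf8("v"));
  ArrayBasedMapData mapData = PslSparkUtils.convertAttributesToSparkMap(attributes);
  assertThat(mapData.numElements()).isEqualTo(1); // one distinct attribute key expected
}
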