Skip to content

Commit 92cfdfd

Browse files
feat: PSL Connector Writer support (#121)
1 parent 2188988 commit 92cfdfd

32 files changed

Lines changed: 986 additions & 59 deletions

clirr-ignored-differences.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,29 @@
1212
<method>*</method>
1313
<to>*</to>
1414
</difference>
15+
<difference>
16+
<differenceType>8001</differenceType>
17+
<className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
18+
</difference>
19+
<difference>
20+
<differenceType>8001</differenceType>
21+
<className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
22+
</difference>
23+
<difference>
24+
<differenceType>8001</differenceType>
25+
<className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className>
26+
</difference>
27+
<difference>
28+
<differenceType>8001</differenceType>
29+
<className>com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader</className>
30+
</difference>
31+
<difference>
32+
<differenceType>8001</differenceType>
33+
<className>com/google/cloud/pubsublite/spark/PslCredentialsProvider</className>
34+
</difference>
35+
<difference>
36+
<differenceType>8001</differenceType>
37+
<className>com/google/cloud/pubsublite/spark/PslDataSourceOptions*</className>
38+
</difference>
39+
1540
</differences>

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@
113113
<version>${scala.version}</version>
114114
<scope>provided</scope>
115115
</dependency>
116+
<dependency>
117+
<groupId>org.scala-lang.modules</groupId>
118+
<artifactId>scala-java8-compat_2.11</artifactId>
119+
<version>0.9.1</version>
120+
</dependency>
116121

117122
<!--test dependencies-->
118123
<dependency>

src/main/java/com/google/cloud/pubsublite/spark/Constants.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
package com.google.cloud.pubsublite.spark;
1818

1919
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
20+
import com.google.common.collect.ImmutableMap;
21+
import java.util.Map;
22+
import org.apache.spark.sql.types.ArrayType;
23+
import org.apache.spark.sql.types.DataType;
2024
import org.apache.spark.sql.types.DataTypes;
25+
import org.apache.spark.sql.types.MapType;
2126
import org.apache.spark.sql.types.Metadata;
2227
import org.apache.spark.sql.types.StructField;
2328
import org.apache.spark.sql.types.StructType;
@@ -26,22 +31,33 @@ public class Constants {
2631
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
2732
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
2833
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
34+
35+
public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
36+
DataTypes.createArrayType(DataTypes.BinaryType);
37+
public static MapType ATTRIBUTES_DATATYPE =
38+
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
39+
public static Map<String, DataType> PUBLISH_FIELD_TYPES =
40+
ImmutableMap.of(
41+
"key", DataTypes.BinaryType,
42+
"data", DataTypes.BinaryType,
43+
"attributes", ATTRIBUTES_DATATYPE,
44+
"event_timestamp", DataTypes.TimestampType);
2945
public static StructType DEFAULT_SCHEMA =
3046
new StructType(
3147
new StructField[] {
3248
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
3349
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
3450
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
35-
new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
36-
new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
51+
new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
52+
new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
3753
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
38-
new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
3954
new StructField(
40-
"attributes",
41-
DataTypes.createMapType(
42-
DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
55+
"event_timestamp",
56+
PUBLISH_FIELD_TYPES.get("event_timestamp"),
4357
true,
44-
Metadata.empty())
58+
Metadata.empty()),
59+
new StructField(
60+
"attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
4561
});
4662

4763
public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
@@ -52,6 +68,7 @@ public class Constants {
5268
"pubsublite.flowcontrol.byteoutstandingperpartition";
5369
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
5470
"pubsublite.flowcontrol.messageoutstandingperparition";
71+
public static String TOPIC_CONFIG_KEY = "pubsublite.topic";
5572
public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
5673
public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
5774
}

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2323
import com.google.cloud.pubsublite.internal.CursorClient;
2424
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
25+
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
26+
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
27+
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
2528
import com.google.common.annotations.VisibleForTesting;
2629
import java.util.ArrayList;
2730
import java.util.Arrays;

src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,30 @@
2323
import com.google.cloud.pubsublite.AdminClient;
2424
import com.google.cloud.pubsublite.SubscriptionPath;
2525
import com.google.cloud.pubsublite.TopicPath;
26+
import com.google.cloud.pubsublite.spark.internal.CachedPartitionCountReader;
27+
import com.google.cloud.pubsublite.spark.internal.LimitingHeadOffsetReader;
28+
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
2629
import java.util.Objects;
2730
import java.util.Optional;
2831
import org.apache.spark.sql.sources.DataSourceRegister;
2932
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
3033
import org.apache.spark.sql.sources.v2.DataSourceOptions;
3134
import org.apache.spark.sql.sources.v2.DataSourceV2;
3235
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
36+
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
3337
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
3438
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
39+
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
40+
import org.apache.spark.sql.streaming.OutputMode;
3541
import org.apache.spark.sql.types.StructType;
3642

3743
@AutoService(DataSourceRegister.class)
3844
public final class PslDataSource
39-
implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {
45+
implements DataSourceV2,
46+
ContinuousReadSupport,
47+
MicroBatchReadSupport,
48+
StreamWriteSupport,
49+
DataSourceRegister {
4050

4151
@Override
4252
public String shortName() {
@@ -51,23 +61,24 @@ public ContinuousReader createContinuousReader(
5161
"PubSub Lite uses fixed schema and custom schema is not allowed");
5262
}
5363

54-
PslDataSourceOptions pslDataSourceOptions =
55-
PslDataSourceOptions.fromSparkDataSourceOptions(options);
56-
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
64+
PslReadDataSourceOptions pslReadDataSourceOptions =
65+
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
66+
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
5767
TopicPath topicPath;
58-
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
68+
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
5969
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
6070
} catch (Throwable t) {
6171
throw toCanonical(t).underlying;
6272
}
6373
PartitionCountReader partitionCountReader =
64-
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
74+
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
6575
return new PslContinuousReader(
66-
pslDataSourceOptions.newCursorClient(),
67-
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
68-
pslDataSourceOptions.getSubscriberFactory(),
76+
pslReadDataSourceOptions.newCursorClient(),
77+
pslReadDataSourceOptions.newMultiPartitionCommitter(
78+
partitionCountReader.getPartitionCount()),
79+
pslReadDataSourceOptions.getSubscriberFactory(),
6980
subscriptionPath,
70-
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
81+
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
7182
partitionCountReader);
7283
}
7384

@@ -79,28 +90,38 @@ public MicroBatchReader createMicroBatchReader(
7990
"PubSub Lite uses fixed schema and custom schema is not allowed");
8091
}
8192

82-
PslDataSourceOptions pslDataSourceOptions =
83-
PslDataSourceOptions.fromSparkDataSourceOptions(options);
84-
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
93+
PslReadDataSourceOptions pslReadDataSourceOptions =
94+
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
95+
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
8596
TopicPath topicPath;
86-
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
97+
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
8798
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
8899
} catch (Throwable t) {
89100
throw toCanonical(t).underlying;
90101
}
91102
PartitionCountReader partitionCountReader =
92-
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
103+
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
93104
return new PslMicroBatchReader(
94-
pslDataSourceOptions.newCursorClient(),
95-
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
96-
pslDataSourceOptions.getSubscriberFactory(),
105+
pslReadDataSourceOptions.newCursorClient(),
106+
pslReadDataSourceOptions.newMultiPartitionCommitter(
107+
partitionCountReader.getPartitionCount()),
108+
pslReadDataSourceOptions.getSubscriberFactory(),
97109
new LimitingHeadOffsetReader(
98-
pslDataSourceOptions.newTopicStatsClient(),
110+
pslReadDataSourceOptions.newTopicStatsClient(),
99111
topicPath,
100112
partitionCountReader,
101113
Ticker.systemTicker()),
102114
subscriptionPath,
103-
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
104-
pslDataSourceOptions.maxMessagesPerBatch());
115+
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
116+
pslReadDataSourceOptions.maxMessagesPerBatch());
117+
}
118+
119+
@Override
120+
public StreamWriter createStreamWriter(
121+
String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
122+
PslSparkUtils.verifyWriteInputSchema(schema);
123+
PslWriteDataSourceOptions pslWriteDataSourceOptions =
124+
PslWriteDataSourceOptions.fromSparkDataSourceOptions(options);
125+
return new PslStreamWriter(schema, pslWriteDataSourceOptions);
105126
}
106127
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.spark;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiService;
21+
import com.google.cloud.pubsublite.MessageMetadata;
22+
import com.google.cloud.pubsublite.internal.Publisher;
23+
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
24+
import com.google.common.flogger.GoogleLogger;
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Objects;
29+
import java.util.Optional;
30+
import java.util.concurrent.ExecutionException;
31+
import javax.annotation.concurrent.GuardedBy;
32+
import org.apache.spark.sql.catalyst.InternalRow;
33+
import org.apache.spark.sql.sources.v2.writer.DataWriter;
34+
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
35+
import org.apache.spark.sql.types.StructType;
36+
37+
public class PslDataWriter implements DataWriter<InternalRow> {
38+
39+
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
40+
41+
private final long partitionId, taskId, epochId;
42+
private final StructType inputSchema;
43+
private final PublisherFactory publisherFactory;
44+
45+
@GuardedBy("this")
46+
private Optional<Publisher<MessageMetadata>> publisher = Optional.empty();
47+
48+
@GuardedBy("this")
49+
private final List<ApiFuture<MessageMetadata>> futures = new ArrayList<>();
50+
51+
public PslDataWriter(
52+
long partitionId,
53+
long taskId,
54+
long epochId,
55+
StructType schema,
56+
PublisherFactory publisherFactory) {
57+
this.partitionId = partitionId;
58+
this.taskId = taskId;
59+
this.epochId = epochId;
60+
this.inputSchema = schema;
61+
this.publisherFactory = publisherFactory;
62+
}
63+
64+
@Override
65+
public synchronized void write(InternalRow record) {
66+
if (!publisher.isPresent() || publisher.get().state() != ApiService.State.RUNNING) {
67+
publisher = Optional.of(publisherFactory.newPublisher());
68+
}
69+
futures.add(
70+
publisher
71+
.get()
72+
.publish(Objects.requireNonNull(PslSparkUtils.toPubSubMessage(inputSchema, record))));
73+
}
74+
75+
@Override
76+
public synchronized WriterCommitMessage commit() throws IOException {
77+
for (ApiFuture<MessageMetadata> f : futures) {
78+
try {
79+
f.get();
80+
} catch (InterruptedException | ExecutionException e) {
81+
publisher = Optional.empty();
82+
throw new IOException(e);
83+
}
84+
}
85+
log.atInfo().log(
86+
"All writes for partitionId:%d, taskId:%d, epochId:%d succeeded, committing...",
87+
partitionId, taskId, epochId);
88+
return PslWriterCommitMessage.create(futures.size());
89+
}
90+
91+
@Override
92+
public synchronized void abort() {
93+
log.atWarning().log(
94+
"One or more writes for partitionId:%d, taskId:%d, epochId:%d failed, aborted.",
95+
partitionId, taskId, epochId);
96+
}
97+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.spark;
18+
19+
import com.google.cloud.pubsublite.spark.internal.CachedPublishers;
20+
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
21+
import java.io.Serializable;
22+
import org.apache.spark.sql.catalyst.InternalRow;
23+
import org.apache.spark.sql.sources.v2.writer.DataWriter;
24+
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
25+
import org.apache.spark.sql.types.StructType;
26+
27+
public class PslDataWriterFactory implements Serializable, DataWriterFactory<InternalRow> {
28+
private static final long serialVersionUID = -6904546364310978844L;
29+
30+
private static final CachedPublishers CACHED_PUBLISHERS = new CachedPublishers();
31+
32+
private final StructType inputSchema;
33+
private final PslWriteDataSourceOptions writeOptions;
34+
35+
public PslDataWriterFactory(StructType inputSchema, PslWriteDataSourceOptions writeOptions) {
36+
this.inputSchema = inputSchema;
37+
this.writeOptions = writeOptions;
38+
}
39+
40+
@Override
41+
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
42+
PublisherFactory pf = () -> CACHED_PUBLISHERS.getOrCreate(writeOptions);
43+
return new PslDataWriter(partitionId, taskId, epochId, inputSchema, pf);
44+
}
45+
}

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2525
import com.google.cloud.pubsublite.internal.CursorClient;
2626
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
27+
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
28+
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
29+
import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
2730
import java.util.ArrayList;
2831
import java.util.List;
2932
import java.util.Optional;

0 commit comments

Comments
 (0)