Skip to content

Commit faf1ece

Browse files
fix: Move Spark constants into its own class. (#127)
1 parent 92cfdfd commit faf1ece

9 files changed

Lines changed: 81 additions & 55 deletions

File tree

clirr-ignored-differences.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,9 @@
3636
<differenceType>8001</differenceType>
3737
<className>com/google/cloud/pubsublite/spark/PslDataSourceOptions*</className>
3838
</difference>
39-
39+
<difference>
40+
<differenceType>6001</differenceType>
41+
<className>com/google/cloud/pubsublite/spark/Constants</className>
42+
<field>DEFAULT_SCHEMA</field>
43+
</difference>
4044
</differences>

src/main/java/com/google/cloud/pubsublite/spark/Constants.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,51 +17,14 @@
1717
package com.google.cloud.pubsublite.spark;
1818

1919
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
20-
import com.google.common.collect.ImmutableMap;
21-
import java.util.Map;
22-
import org.apache.spark.sql.types.ArrayType;
23-
import org.apache.spark.sql.types.DataType;
24-
import org.apache.spark.sql.types.DataTypes;
25-
import org.apache.spark.sql.types.MapType;
26-
import org.apache.spark.sql.types.Metadata;
27-
import org.apache.spark.sql.types.StructField;
28-
import org.apache.spark.sql.types.StructType;
2920

3021
public class Constants {
22+
public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
23+
3124
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
3225
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
3326
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
3427

35-
public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
36-
DataTypes.createArrayType(DataTypes.BinaryType);
37-
public static MapType ATTRIBUTES_DATATYPE =
38-
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
39-
public static Map<String, DataType> PUBLISH_FIELD_TYPES =
40-
ImmutableMap.of(
41-
"key", DataTypes.BinaryType,
42-
"data", DataTypes.BinaryType,
43-
"attributes", ATTRIBUTES_DATATYPE,
44-
"event_timestamp", DataTypes.TimestampType);
45-
public static StructType DEFAULT_SCHEMA =
46-
new StructType(
47-
new StructField[] {
48-
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
49-
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
50-
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
51-
new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
52-
new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
53-
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
54-
new StructField(
55-
"event_timestamp",
56-
PUBLISH_FIELD_TYPES.get("event_timestamp"),
57-
true,
58-
Metadata.empty()),
59-
new StructField(
60-
"attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
61-
});
62-
63-
public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
64-
6528
public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
6629
"pubsublite.flowcontrol.maxmessagesperbatch";
6730
public static String BYTES_OUTSTANDING_CONFIG_KEY =

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void stop() {
109109

110110
@Override
111111
public StructType readSchema() {
112-
return Constants.DEFAULT_SCHEMA;
112+
return SparkStructs.DEFAULT_SCHEMA;
113113
}
114114

115115
@Override

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void stop() {
127127

128128
@Override
129129
public StructType readSchema() {
130-
return Constants.DEFAULT_SCHEMA;
130+
return SparkStructs.DEFAULT_SCHEMA;
131131
}
132132

133133
@Override

src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,31 +123,31 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
123123
inputSchema,
124124
row,
125125
"key",
126-
Constants.PUBLISH_FIELD_TYPES.get("key"),
126+
SparkStructs.PUBLISH_FIELD_TYPES.get("key"),
127127
(byte[] o) -> builder.setKey(ByteString.copyFrom(o)));
128128
extractVal(
129129
inputSchema,
130130
row,
131131
"data",
132-
Constants.PUBLISH_FIELD_TYPES.get("data"),
132+
SparkStructs.PUBLISH_FIELD_TYPES.get("data"),
133133
(byte[] o) -> builder.setData(ByteString.copyFrom(o)));
134134
extractVal(
135135
inputSchema,
136136
row,
137137
"event_timestamp",
138-
Constants.PUBLISH_FIELD_TYPES.get("event_timestamp"),
138+
SparkStructs.PUBLISH_FIELD_TYPES.get("event_timestamp"),
139139
(Long o) -> builder.setEventTime(Timestamps.fromMicros(o)));
140140
extractVal(
141141
inputSchema,
142142
row,
143143
"attributes",
144-
Constants.PUBLISH_FIELD_TYPES.get("attributes"),
144+
SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
145145
(MapData o) -> {
146146
ImmutableListMultimap.Builder<String, ByteString> attributeMapBuilder =
147147
ImmutableListMultimap.builder();
148148
o.foreach(
149149
DataTypes.StringType,
150-
Constants.ATTRIBUTES_PER_KEY_DATATYPE,
150+
SparkStructs.ATTRIBUTES_PER_KEY_DATATYPE,
151151
new FromJavaBiConsumer<>(
152152
(k, v) -> {
153153
String key = ((UTF8String) k).toString();
@@ -170,7 +170,7 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
170170
* @throws IllegalArgumentException if any DataType mismatch detected.
171171
*/
172172
public static void verifyWriteInputSchema(StructType inputSchema) {
173-
Constants.PUBLISH_FIELD_TYPES.forEach(
173+
SparkStructs.PUBLISH_FIELD_TYPES.forEach(
174174
(k, v) -> {
175175
Option<Object> idxOr = inputSchema.getFieldIndex(k);
176176
if (!idxOr.isEmpty()) {

src/main/java/com/google/cloud/pubsublite/spark/SparkStructs.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.spark;
18+
19+
import com.google.common.collect.ImmutableMap;
20+
import java.util.Map;
21+
import org.apache.spark.sql.types.ArrayType;
22+
import org.apache.spark.sql.types.DataType;
23+
import org.apache.spark.sql.types.DataTypes;
24+
import org.apache.spark.sql.types.MapType;
25+
import org.apache.spark.sql.types.Metadata;
26+
import org.apache.spark.sql.types.StructField;
27+
import org.apache.spark.sql.types.StructType;
28+
29+
public class SparkStructs {
30+
31+
public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
32+
DataTypes.createArrayType(DataTypes.BinaryType);
33+
public static MapType ATTRIBUTES_DATATYPE =
34+
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
35+
public static Map<String, DataType> PUBLISH_FIELD_TYPES =
36+
ImmutableMap.of(
37+
"key", DataTypes.BinaryType,
38+
"data", DataTypes.BinaryType,
39+
"attributes", ATTRIBUTES_DATATYPE,
40+
"event_timestamp", DataTypes.TimestampType);
41+
public static StructType DEFAULT_SCHEMA =
42+
new StructType(
43+
new StructField[] {
44+
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
45+
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
46+
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
47+
new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
48+
new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
49+
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
50+
new StructField(
51+
"event_timestamp",
52+
PUBLISH_FIELD_TYPES.get("event_timestamp"),
53+
true,
54+
Metadata.empty()),
55+
new StructField(
56+
"attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
57+
});
58+
}

src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class PslDataWriterTest {
5151
new StructType(
5252
new StructField[] {
5353
new StructField(
54-
"key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
54+
"key", SparkStructs.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
5555
});
5656

5757
private final PslDataWriter writer = new PslDataWriter(1L, 2L, 3L, keyOnly, publisherFactory);

src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ public void testToPubSubMessage() {
145145
new StructField[] {
146146
new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
147147
new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
148-
new StructField("attributes", Constants.ATTRIBUTES_DATATYPE, true, Metadata.empty()),
148+
new StructField(
149+
"attributes", SparkStructs.ATTRIBUTES_DATATYPE, true, Metadata.empty()),
149150
new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
150151
new StructField("random_extra_field", DataTypes.BinaryType, false, Metadata.empty())
151152
});
@@ -171,19 +172,19 @@ public void testToPubSubMessageLongForEventTimestamp() {
171172

172173
@Test
173174
public void testVerifyWriteInputSchema() {
174-
PslSparkUtils.verifyWriteInputSchema(Constants.DEFAULT_SCHEMA);
175+
PslSparkUtils.verifyWriteInputSchema(SparkStructs.DEFAULT_SCHEMA);
175176

176177
StructType goodThoughMissing =
177178
new StructType(
178179
new StructField[] {
179180
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
180181
new StructField(
181-
"key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
182+
"key", SparkStructs.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
182183
new StructField(
183184
"publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
184185
new StructField(
185186
"attributes",
186-
Constants.PUBLISH_FIELD_TYPES.get("attributes"),
187+
SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
187188
true,
188189
Metadata.empty())
189190
});
@@ -199,7 +200,7 @@ public void testVerifyWriteInputSchema() {
199200
"publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
200201
new StructField(
201202
"attributes",
202-
Constants.PUBLISH_FIELD_TYPES.get("attributes"),
203+
SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
203204
true,
204205
Metadata.empty())
205206
});

src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class PslStreamWriterTest {
2424

2525
private final PslStreamWriter writer =
2626
new PslStreamWriter(
27-
Constants.DEFAULT_SCHEMA,
27+
SparkStructs.DEFAULT_SCHEMA,
2828
PslWriteDataSourceOptions.builder()
2929
.setTopicPath(UnitTestExamples.exampleTopicPath())
3030
.build());

0 commit comments

Comments
 (0)