Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -59,6 +61,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -176,6 +179,131 @@ public void testSQLSelectsPayloadContent() throws Exception {
resultSignal.waitForSuccess(Duration.standardMinutes(5));
}

/**
 * Verifies that message attributes declared as {@code ARRAY<ROW<key VARCHAR, value VARCHAR>>} can
 * be indexed from SQL.
 *
 * <p>The query selects the payload id together with the keys of the first two attribute entries.
 * The check does not assume an attribute order: whichever of the two keys is not the {@code "ts"}
 * timestamp attribute is paired with the id and compared against the expected id-to-key mapping.
 *
 * @throws Exception if table creation, publishing, or waiting for the success signal fails
 */
@Test
public void testSQLSelectsArrayAttributes() throws Exception {
  String createTableString =
      String.format(
          "CREATE EXTERNAL TABLE message (\n"
              + "event_timestamp TIMESTAMP, \n"
              + "attributes ARRAY<ROW<key VARCHAR, `value` VARCHAR>>, \n"
              + "payload ROW< \n"
              + " id INTEGER, \n"
              + " name VARCHAR \n"
              + " > \n"
              + ") \n"
              + "TYPE '%s' \n"
              + "LOCATION '%s' \n"
              + "TBLPROPERTIES '{ "
              + "%s"
              + "\"timestampAttributeKey\" : \"ts\" }'",
          tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());

  String queryString =
      "SELECT message.payload.id, attributes[1].key AS a1, attributes[2].key AS a2 FROM message";

  // Prepare messages to send later
  List<PubsubMessage> messages =
      ImmutableList.of(
          objectsProvider.messageIdName(ts(1), 3, "foo"),
          objectsProvider.messageIdName(ts(2), 5, "bar"),
          objectsProvider.messageIdName(ts(3), 7, "baz"));

  // Initialize SQL environment and create the pubsub table
  BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
  sqlEnv.executeDdl(createTableString);

  // Apply the PTransform to query the pubsub topic
  PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);

  // Observe the query results and send success signal after seeing the expected messages.
  // NOTE(review): the query projects id, a1 and a2 while the coder is built from PAYLOAD_SCHEMA
  // -- confirm the coder matches the shape of the output rows.
  queryOutput.apply(
      "waitForSuccess",
      resultSignal.signalSuccessWhen(
          SchemaCoder.of(PAYLOAD_SCHEMA),
          observedRows -> {
            Map<Integer, String> entries = new HashMap<>();
            for (Row row : observedRows) {
              // Skip over the timestamp attribute regardless of which slot it landed in.
              String firstKey = row.getString("a1");
              String attributeKey = "ts".equals(firstKey) ? row.getString("a2") : firstKey;
              entries.put(row.getInt32("id"), attributeKey);
            }

            return entries.equals(ImmutableMap.of(3, "foo", 5, "bar", 7, "baz"));
          }));

  // Start the pipeline
  pipeline.run();

  // Block until a subscription for this topic exists
  eventsTopic.assertSubscriptionEventuallyCreated(
      pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));

  // Start publishing the messages when main pipeline is started and signaling topic is ready
  eventsTopic.publish(messages);

  // Poll the signaling topic for success message. Uses the same five-minute budget as the other
  // tests in this file; the previous one-minute limit was the odd one out.
  resultSignal.waitForSuccess(Duration.standardMinutes(5));
}

/**
 * Checks that a table declaring {@code payload VARBINARY} exposes the message payload bytes.
 *
 * <p>Three messages are published and the query output is expected to be exactly the set of
 * single-field rows holding each message's payload.
 *
 * @throws Exception if table creation, publishing, or waiting for the success signal fails
 */
@Test
public void testSQLWithBytePayload() throws Exception {
  String ddl =
      String.format(
          "CREATE EXTERNAL TABLE message (\n"
              + "event_timestamp TIMESTAMP, \n"
              + "attributes MAP<VARCHAR, VARCHAR>, \n"
              + "payload VARBINARY \n"
              + ") \n"
              + "TYPE '%s' \n"
              + "LOCATION '%s' \n"
              + "TBLPROPERTIES '{ "
              + "\"timestampAttributeKey\" : \"ts\" }'",
          tableProvider.getTableType(), eventsTopic.topicPath());
  String selectBytes = "SELECT message.payload AS some_bytes FROM message";

  // Build the messages now; they are only published once the pipeline is up.
  List<PubsubMessage> outgoing =
      ImmutableList.of(
          objectsProvider.messageIdName(ts(1), 3, "foo"),
          objectsProvider.messageIdName(ts(2), 5, "bar"),
          objectsProvider.messageIdName(ts(3), 7, "baz"));

  // Register the pubsub-backed table in a fresh in-memory SQL environment.
  BeamSqlEnv env = BeamSqlEnv.inMemory(new PubsubTableProvider());
  env.executeDdl(ddl);

  // Run the query against the topic.
  PCollection<Row> results = query(env, pipeline, selectBytes);

  // One expected row per outgoing message, each wrapping that message's payload.
  Schema bytesOnlySchema =
      Schema.builder().addField("some_bytes", FieldType.BYTES.withNullable(true)).build();
  ImmutableSet.Builder<Row> expectedRows = ImmutableSet.builder();
  for (PubsubMessage sent : outgoing) {
    expectedRows.add(row(bytesOnlySchema, (Object) sent.getPayload()));
  }
  Set<Row> expected = expectedRows.build();

  // Signal success once the observed rows equal the expected set.
  results.apply(
      "waitForSuccess",
      resultSignal.signalSuccessWhen(
          SchemaCoder.of(bytesOnlySchema), observedRows -> observedRows.equals(expected)));

  // Launch the pipeline.
  pipeline.run();

  // Wait for the pipeline's subscription on the topic before publishing anything.
  eventsTopic.assertSubscriptionEventuallyCreated(
      pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));

  // Publish, then wait for the success signal.
  eventsTopic.publish(outgoing);
  resultSignal.waitForSuccess(Duration.standardMinutes(5));
}

@Test
@SuppressWarnings("unchecked")
public void testUsesDlq() throws Exception {
Expand Down Expand Up @@ -210,8 +338,8 @@ public void testUsesDlq() throws Exception {
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz"),
messagePayload(ts(4), "{ - }"), // invalid message, will go to DLQ
messagePayload(ts(5), "{ + }")); // invalid message, will go to DLQ
messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
messagePayload(ts(5), "{ + }", ImmutableMap.of())); // invalid message, will go to DLQ

// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
Expand Down Expand Up @@ -606,8 +734,14 @@ private Row row(Schema schema, Object... values) {
return Row.withSchema(schema).addValues(values).build();
}

/**
 * Builds a {@link PubsubMessage} from {@code payload} and {@code attributes}, adding a {@code "ts"}
 * attribute whose value is {@code timestamp.getMillis()} rendered as a string.
 *
 * <p>NOTE(review): the first two lines below are the earlier two-argument form of this helper and
 * sit next to the three-argument form that follows -- confirm which text is current.
 */
private static PubsubMessage message(Instant timestamp, byte[] payload) {
return new PubsubMessage(payload, ImmutableMap.of("ts", String.valueOf(timestamp.getMillis())));
private static PubsubMessage message(
Instant timestamp, byte[] payload, Map<String, String> attributes) {
return new PubsubMessage(
payload,
ImmutableMap.<String, String>builder()
// Caller-supplied attributes go in first; the timestamp attribute is appended after them.
.putAll(attributes)
.put("ts", String.valueOf(timestamp.getMillis()))
.build());
}

private Matcher<PubsubMessage> matcherTsNameHeightKnowsJS(
Expand All @@ -627,8 +761,9 @@ private Instant ts(long millis) {
return Instant.ofEpochMilli(millis);
}

/**
 * Builds a message from a string payload by encoding it as US-ASCII bytes and delegating to
 * {@code message}.
 *
 * <p>NOTE(review): the first two lines below are the earlier two-argument form and sit next to
 * the three-argument form that follows -- confirm which text is current.
 */
private PubsubMessage messagePayload(Instant timestamp, String payload) {
return message(timestamp, payload.getBytes(StandardCharsets.US_ASCII));
private PubsubMessage messagePayload(
Instant timestamp, String payload, Map<String, String> attributes) {
return message(timestamp, payload.getBytes(StandardCharsets.US_ASCII), attributes);
}

private abstract static class PubsubObjectProvider implements Serializable {
Expand Down Expand Up @@ -657,7 +792,7 @@ protected String getPayloadFormat() {
/**
 * Builds a message whose payload is a JSON object with an {@code id} and a {@code name} member.
 */
@Override
protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
String jsonString = "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
// NOTE(review): two return statements appear back to back here; the first is the earlier
// attribute-less call, the second attaches a single attribute mapping name to the id rendered
// as a string -- confirm which text is current.
return message(timestamp, jsonString);
return message(timestamp, jsonString, ImmutableMap.of(name, Integer.toString(id)));
}

@Override
Expand All @@ -681,8 +816,9 @@ protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) thro
return hasProperty("payload", toJsonByteLike(jsonString));
}

/**
 * Encodes {@code jsonPayload} as UTF-8 bytes and delegates to {@code
 * PubsubTableProviderIT.message}.
 *
 * <p>NOTE(review): the first two lines below are the earlier two-argument form and sit next to
 * the three-argument form that follows -- confirm which text is current.
 */
private PubsubMessage message(Instant timestamp, String jsonPayload) {
return PubsubTableProviderIT.message(timestamp, jsonPayload.getBytes(UTF_8));
private PubsubMessage message(
Instant timestamp, String jsonPayload, Map<String, String> attributes) {
return PubsubTableProviderIT.message(timestamp, jsonPayload.getBytes(UTF_8), attributes);
}

private Matcher<byte[]> toJsonByteLike(String jsonString) throws IOException {
Expand Down Expand Up @@ -717,7 +853,7 @@ protected PubsubMessage messageIdName(Instant timestamp, int id, String name)
PAYLOAD_SCHEMA,
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(
id, name));
return message(timestamp, encodedRecord);
return message(timestamp, encodedRecord, ImmutableMap.of(name, Integer.toString(id)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,31 @@

import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.TIMESTAMP_FIELD;

import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.transforms.DropFields;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link PTransform} to convert {@link Row} to {@link PubsubMessage} with JSON/AVRO payload.
*
* <p>Currently only supports writing a flat schema into a JSON/AVRO payload. This means that all
* Row field values are written to the {@link PubsubMessage} payload, except for {@code
* event_timestamp}, which is either ignored or written to the message attributes, depending on
* whether config.getValue("timestampAttributeKey") is set.
* Adds a timestamp attribute if desired and filters it out of the underlying row if no timestamp
* attribute exists.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
private static final Logger LOG = LoggerFactory.getLogger(RowToPubsubMessage.class);
class AddTimestampAttribute extends PTransform<PCollection<Row>, PCollection<Row>> {
private static final Logger LOG = LoggerFactory.getLogger(AddTimestampAttribute.class);
private final boolean useTimestampAttribute;
private final PayloadSerializer serializer;

private RowToPubsubMessage(boolean useTimestampAttribute, PayloadSerializer serializer) {
AddTimestampAttribute(boolean useTimestampAttribute) {
this.useTimestampAttribute = useTimestampAttribute;
this.serializer = serializer;
}

public static RowToPubsubMessage of(boolean useTimestampAttribute, PayloadSerializer serializer) {
return new RowToPubsubMessage(useTimestampAttribute, serializer);
}

@Override
public PCollection<PubsubMessage> expand(PCollection<Row> input) {
public PCollection<Row> expand(PCollection<Row> input) {
// If a timestamp attribute is used, make sure the TIMESTAMP_FIELD is propagated to the
// element's event time. PubSubIO will populate the attribute from there.
PCollection<Row> withTimestamp =
Expand All @@ -82,16 +67,6 @@ public PCollection<PubsubMessage> expand(PCollection<Row> input) {
rows = withTimestamp;
}

return rows.apply(
"MapRowToBytes",
MapElements.into(new TypeDescriptor<byte[]>() {}).via(serializer::serialize))
.apply("MapToPubsubMessage", MapElements.via(new ToPubsubMessage()));
}

private static class ToPubsubMessage extends SimpleFunction<byte[], PubsubMessage> {
@Override
public PubsubMessage apply(byte[] bytes) {
return new PubsubMessage(bytes, ImmutableMap.of());
}
return rows;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider.ATTRIBUTE_ARRAY_FIELD_TYPE;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider.ATTRIBUTE_MAP_FIELD_TYPE;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Collection;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/**
 * Converts a {@link Row} that carries an {@code ATTRIBUTES_FIELD} and a {@code PAYLOAD_FIELD} into
 * a {@link PubsubMessage}.
 *
 * <p>The extraction strategy for each field is chosen once, at construction time, from the row
 * schema:
 *
 * <ul>
 *   <li>attributes are read either from a map field ({@code ATTRIBUTE_MAP_FIELD_TYPE}) or from an
 *       array of {@code key}/{@code value} rows ({@code ATTRIBUTE_ARRAY_FIELD_TYPE});
 *   <li>the payload is either taken verbatim from a {@code BYTES} field or produced by running the
 *       nested payload row through the supplied {@link PayloadSerializer}.
 * </ul>
 */
class NestedRowToMessage extends SimpleFunction<Row, PubsubMessage> {
  private static final long serialVersionUID = 65176815766314684L;

  private final PayloadSerializer serializer;
  private final SerializableFunction<Row, Map<String, String>> attributesExtractor;
  private final SerializableFunction<Row, byte[]> payloadExtractor;

  /**
   * Selects the attribute and payload extractors that match {@code schema}.
   *
   * @param serializer serializer applied to the nested payload row when the payload field is a row
   * @param schema schema of the rows this function will be applied to
   * @throws IllegalArgumentException if the attributes field is neither the map nor the array
   *     attribute type, or if the payload field is neither {@code BYTES} nor a row
   */
  @SuppressWarnings("methodref.receiver.bound.invalid")
  NestedRowToMessage(PayloadSerializer serializer, Schema schema) {
    this.serializer = serializer;

    // NOTE(review): the branches below rely on exact FieldType equality -- confirm whether
    // nullable variants of these types are expected to be accepted as well.
    FieldType attributesType = schema.getField(ATTRIBUTES_FIELD).getType();
    if (attributesType.equals(ATTRIBUTE_MAP_FIELD_TYPE)) {
      attributesExtractor = NestedRowToMessage::getAttributesFromMap;
    } else {
      checkArgument(
          attributesType.equals(ATTRIBUTE_ARRAY_FIELD_TYPE),
          "Unsupported type for field '%s': %s. Expected %s or %s.",
          ATTRIBUTES_FIELD,
          attributesType,
          ATTRIBUTE_MAP_FIELD_TYPE,
          ATTRIBUTE_ARRAY_FIELD_TYPE);
      attributesExtractor = NestedRowToMessage::getAttributesFromArray;
    }

    FieldType payloadType = schema.getField(PAYLOAD_FIELD).getType();
    if (payloadType.equals(FieldType.BYTES)) {
      payloadExtractor = NestedRowToMessage::getPayloadFromBytes;
    } else {
      checkArgument(
          payloadType.getTypeName().equals(TypeName.ROW),
          "Unsupported type for field '%s': %s. Expected %s or a row type.",
          PAYLOAD_FIELD,
          payloadType,
          FieldType.BYTES);
      // Bound to this instance because serializing the nested row needs the serializer field.
      payloadExtractor = this::getPayloadFromNested;
    }
  }

  /** Copies the map-typed attributes field of {@code row} into an immutable map. */
  private static Map<String, String> getAttributesFromMap(Row row) {
    return ImmutableMap.<String, String>builder()
        .putAll(checkArgumentNotNull(row.getMap(ATTRIBUTES_FIELD)))
        .build();
  }

  /**
   * Collects the {@code key}/{@code value} entry rows of the array-typed attributes field into an
   * immutable map. Duplicate keys are rejected by {@code ImmutableMap.Builder#build()}.
   */
  private static Map<String, String> getAttributesFromArray(Row row) {
    ImmutableMap.Builder<String, String> attributes = ImmutableMap.builder();
    Collection<Row> attributeEntries = checkArgumentNotNull(row.getArray(ATTRIBUTES_FIELD));
    for (Row entry : attributeEntries) {
      attributes.put(
          checkArgumentNotNull(entry.getString("key")),
          checkArgumentNotNull(entry.getString("value")));
    }
    return attributes.build();
  }

  /** Returns the bytes-typed payload field of {@code row} unchanged. */
  private static byte[] getPayloadFromBytes(Row row) {
    return checkArgumentNotNull(row.getBytes(PAYLOAD_FIELD));
  }

  /** Serializes the row-typed payload field of {@code row} with the configured serializer. */
  private byte[] getPayloadFromNested(Row row) {
    return serializer.serialize(checkArgumentNotNull(row.getRow(PAYLOAD_FIELD)));
  }

  /**
   * Builds the message for {@code row} from the extractors chosen at construction time.
   *
   * @param row input row; its attributes and payload fields must be non-null
   * @return a message holding the extracted payload bytes and attribute map
   */
  @Override
  public PubsubMessage apply(Row row) {
    return new PubsubMessage(payloadExtractor.apply(row), attributesExtractor.apply(row));
  }
}
Loading