From a1c723ac692cac0e5b7a5417802f1ca60956f64e Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 3 May 2017 18:16:47 -0700 Subject: [PATCH 1/2] Provides a default coder for PubsubMessage --- .../io/gcp/pubsub/PubsubCoderRegistrar.java | 36 +++++++++++++++++++ .../PubsubMessageWithAttributesCoder.java | 5 +++ .../gcp/pubsub/PubsubUnboundedSinkTest.java | 3 -- 3 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java new file mode 100644 index 000000000000..59443059b0d9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.beam.sdk.coders.CoderFactories; +import org.apache.beam.sdk.coders.CoderFactory; +import org.apache.beam.sdk.coders.CoderRegistrar; + +/** A {@link CoderRegistrar} for standard types used with {@link PubsubIO}. */ +@AutoService(CoderRegistrar.class) +public class PubsubCoderRegistrar implements CoderRegistrar { + @Override + public Map, CoderFactory> getCoderFactoriesToUseForClasses() { + return ImmutableMap., CoderFactory>of( + PubsubIO.PubsubMessage.class, + CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of())); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index 27f0f0276b12..be9493cd5d82 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; /** A coder for PubsubMessage including attributes. */ public class PubsubMessageWithAttributesCoder extends CustomCoder { @@ -35,6 +36,10 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder> ATTRIBUTES_CODER = MapCoder.of( StringUtf8Coder.of(), StringUtf8Coder.of()); + public static Coder of(TypeDescriptor ignored) { + return of(); + } + public static PubsubMessageWithAttributesCoder of() { return new PubsubMessageWithAttributesCoder(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 11e7d831bbdc..f2f40bb37b39 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -115,7 +115,6 @@ public void sendOneMessage() throws IOException { RecordIdMethod.DETERMINISTIC); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp(ATTRIBUTES))) - .setCoder(PubsubMessageWithAttributesCoder.of()) .apply(sink); p.run(); } @@ -145,7 +144,6 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) .apply(sink); p.run(); } @@ -182,7 +180,6 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { RecordIdMethod.DETERMINISTIC); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) .apply(sink); p.run(); } From cc69c3a35edbf19aff3f4d7ee2390f421d996b26 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 3 May 2017 18:17:34 -0700 Subject: [PATCH 2/2] Moves PubsubMessage to upper level and renames payload --- .../beam/runners/dataflow/DataflowRunner.java | 22 +++---- .../io/gcp/pubsub/PubsubCoderRegistrar.java | 3 +- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 49 ++------------- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 61 +++++++++++++++++++ .../pubsub/PubsubMessagePayloadOnlyCoder.java | 10 +-- .../PubsubMessageWithAttributesCoder.java | 12 ++-- .../io/gcp/pubsub/PubsubUnboundedSink.java | 12 ++-- .../io/gcp/pubsub/PubsubUnboundedSource.java | 16 ++--- .../gcp/pubsub/PubsubUnboundedSinkTest.java | 4 +- .../gcp/pubsub/PubsubUnboundedSourceTest.java | 4 +- 10 files changed, 107 insertions(+), 86 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f7455b3da34b..9e5a2fb114d6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -89,7 +89,7 @@ import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; @@ -867,7 +867,7 @@ public PDone expand(PCollection input) { * instead defer to Windmill's implementation. */ private static class StreamingPubsubIORead - extends PTransform> { + extends PTransform> { private final PubsubUnboundedSource transform; /** @@ -883,8 +883,8 @@ PubsubUnboundedSource getOverriddenTransform() { } @Override - public PCollection expand(PBegin input) { - return PCollection.createPrimitiveOutputInternal( + public PCollection expand(PBegin input) { + return PCollection.createPrimitiveOutputInternal( input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) .setCoder(new PubsubMessageWithAttributesCoder()); } @@ -956,9 +956,9 @@ public void translate(StreamingPubsubIORead transform, TranslationContext contex } private static class IdentityMessageFn - extends SimpleFunction { + extends SimpleFunction { @Override - public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) { + public PubsubMessage apply(PubsubMessage input) { return input; } } @@ -968,7 +968,7 @@ public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) { * instead defer to Windmill's implementation. */ private static class StreamingPubsubIOWrite - extends PTransform, PDone> { + extends PTransform, PDone> { private final PubsubUnboundedSink transform; /** @@ -984,7 +984,7 @@ PubsubUnboundedSink getOverriddenTransform() { } @Override - public PDone expand(PCollection input) { + public PDone expand(PCollection input) { return PDone.in(input.getPipeline()); } @@ -1332,7 +1332,7 @@ public Map mapOutputs( private class StreamingPubsubIOWriteOverrideFactory implements PTransformOverrideFactory< - PCollection, PDone, PubsubUnboundedSink> { + PCollection, PDone, PubsubUnboundedSink> { private final DataflowRunner runner; private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) { @@ -1340,9 +1340,9 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) { } @Override - public PTransformReplacement, PDone> + public PTransformReplacement, PDone> getReplacementTransform( - AppliedPTransform, PDone, PubsubUnboundedSink> + AppliedPTransform, PDone, PubsubUnboundedSink> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java index 59443059b0d9..062f3500e582 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java @@ -30,7 +30,6 @@ public class PubsubCoderRegistrar implements CoderRegistrar { @Override public Map, CoderFactory> getCoderFactoriesToUseForClasses() { return ImmutableMap., CoderFactory>of( - PubsubIO.PubsubMessage.class, - CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of())); + PubsubMessage.class, CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 133839c3313a..e023ad0dca07 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; @@ -155,44 +154,6 @@ private static void populateCommonDisplayData(DisplayData.Builder builder, } } - /** - * Class representing a Pub/Sub message. Each message contains a single message payload and - * a map of attached attributes. - */ - public static class PubsubMessage { - - private byte[] message; - private Map attributes; - - public PubsubMessage(byte[] message, Map attributes) { - this.message = message; - this.attributes = attributes; - } - - /** - * Returns the main PubSub message. - */ - public byte[] getMessage() { - return message; - } - - /** - * Returns the given attribute value. If not such attribute exists, returns null. - */ - @Nullable - public String getAttribute(String attribute) { - checkNotNull(attribute, "attribute"); - return attributes.get(attribute); - } - - /** - * Returns the full map of attributes. This is an unmodifiable map. - */ - public Map getAttributeMap() { - return attributes; - } - } - /** * Class representing a Cloud Pub/Sub Subscription. */ @@ -471,7 +432,7 @@ private static Read read() { /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The - * messages will only contain a {@link PubsubMessage#getMessage() payload}, but no {@link + * messages will only contain a {@link PubsubMessage#getPayload() payload}, but no {@link * PubsubMessage#getAttributeMap() attributes}. */ public static Read readPubsubMessagesWithoutAttributes() { @@ -484,7 +445,7 @@ public static Read readPubsubMessagesWithoutAttributes() { /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The - * messages will contain both a {@link PubsubMessage#getMessage() payload} and {@link + * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link * PubsubMessage#getAttributeMap() attributes}. */ public static Read readPubsubMessagesWithAttributes() { @@ -939,7 +900,7 @@ public void startBundle(Context c) throws IOException { public void processElement(ProcessContext c) throws IOException { byte[] payload; PubsubMessage message = getFormatFn().apply(c.element()); - payload = message.getMessage(); + payload = message.getPayload(); Map attributes = message.getAttributeMap(); // NOTE: The record id is always null. output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null)); @@ -981,7 +942,7 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class ParsePayloadAsUtf8 extends SimpleFunction { @Override public String apply(PubsubMessage input) { - return new String(input.getMessage(), StandardCharsets.UTF_8); + return new String(input.getPayload(), StandardCharsets.UTF_8); } } @@ -995,7 +956,7 @@ public ParsePayloadUsingCoder(Coder coder) { @Override public T apply(PubsubMessage input) { try { - return CoderUtils.decodeFromByteArray(coder, input.getMessage()); + return CoderUtils.decodeFromByteArray(coder, input.getPayload()); } catch (CoderException e) { throw new RuntimeException("Could not decode Pubsub message", e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java new file mode 100644 index 000000000000..69f850a5b1a9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Class representing a Pub/Sub message. Each message contains a single message payload and + * a map of attached attributes. + */ +public class PubsubMessage { + + private byte[] message; + private Map attributes; + + public PubsubMessage(byte[] payload, Map attributes) { + this.message = payload; + this.attributes = attributes; + } + + /** + * Returns the main PubSub message. + */ + public byte[] getPayload() { + return message; + } + + /** + * Returns the given attribute value. If not such attribute exists, returns null. + */ + @Nullable + public String getAttribute(String attribute) { + checkNotNull(attribute, "attribute"); + return attributes.get(attribute); + } + + /** + * Returns the full map of attributes. This is an unmodifiable map. + */ + public Map getAttributeMap() { + return attributes; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java index f0dae4652616..81c1a45b4d5b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java @@ -27,22 +27,22 @@ import org.apache.beam.sdk.util.StreamUtils; /** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */ -public class PubsubMessagePayloadOnlyCoder extends CustomCoder { +public class PubsubMessagePayloadOnlyCoder extends CustomCoder { public static PubsubMessagePayloadOnlyCoder of() { return new PubsubMessagePayloadOnlyCoder(); } @Override - public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context) + public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); - outStream.write(value.getMessage()); + outStream.write(value.getPayload()); } @Override - public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException { + public PubsubMessage decode(InputStream inStream, Context context) throws IOException { checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); - return new PubsubIO.PubsubMessage( + return new PubsubMessage( StreamUtils.getBytes(inStream), ImmutableMap.of()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index be9493cd5d82..f70955da92e0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -30,13 +30,13 @@ import org.apache.beam.sdk.values.TypeDescriptor; /** A coder for PubsubMessage including attributes. */ -public class PubsubMessageWithAttributesCoder extends CustomCoder { +public class PubsubMessageWithAttributesCoder extends CustomCoder { private static final Coder PAYLOAD_CODER = NullableCoder.of(ByteArrayCoder.of()); private static final Coder> ATTRIBUTES_CODER = MapCoder.of( StringUtf8Coder.of(), StringUtf8Coder.of()); - public static Coder of(TypeDescriptor ignored) { + public static Coder of(TypeDescriptor ignored) { return of(); } @@ -44,19 +44,19 @@ public static PubsubMessageWithAttributesCoder of() { return new PubsubMessageWithAttributesCoder(); } - public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context) + public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { PAYLOAD_CODER.encode( - value.getMessage(), + value.getPayload(), outStream, context.nested()); ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context); } @Override - public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException { + public PubsubMessage decode(InputStream inStream, Context context) throws IOException { byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested()); Map attributes = ATTRIBUTES_CODER.decode(inStream, context); - return new PubsubIO.PubsubMessage(payload, attributes); + return new PubsubMessage(payload, attributes); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 67530ec64b0b..9d97e91f50bc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -81,7 +81,7 @@ * to dedup messages. * */ -public class PubsubUnboundedSink extends PTransform, PDone> { +public class PubsubUnboundedSink extends PTransform, PDone> { /** * Default maximum number of messages per publish. */ @@ -154,7 +154,7 @@ enum RecordIdMethod { /** * Convert elements to messages and shard them. */ - private static class ShardFn extends DoFn> { + private static class ShardFn extends DoFn> { private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); private final int numShards; private final RecordIdMethod recordIdMethod; @@ -167,8 +167,8 @@ private static class ShardFn extends DoFn attributes = message.getAttributeMap(); long timestampMsSinceEpoch = c.timestamp().getMillis(); @@ -427,11 +427,11 @@ public String getIdAttribute() { } @Override - public PDone expand(PCollection input) { + public PDone expand(PCollection input) { input .apply( "PubsubUnboundedSink.Window", - Window.into(new GlobalWindows()) + Window.into(new GlobalWindows()) .triggering( Repeatedly.forever( AfterFirst.of( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index d366949134db..e5be71ba9e29 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -107,7 +107,7 @@ * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. * */ -public class PubsubUnboundedSource extends PTransform> { +public class PubsubUnboundedSource extends PTransform> { private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); /** @@ -389,7 +389,7 @@ public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOE * but not yet consumed downstream and/or ACKed back to Pubsub. */ @VisibleForTesting - static class PubsubReader extends UnboundedSource.UnboundedReader { + static class PubsubReader extends UnboundedSource.UnboundedReader { /** * For access to topic and checkpointCoder. */ @@ -963,11 +963,11 @@ public boolean advance() throws IOException { } @Override - public PubsubIO.PubsubMessage getCurrent() throws NoSuchElementException { + public PubsubMessage getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return new PubsubIO.PubsubMessage(current.elementBytes, current.attributes); + return new PubsubMessage(current.elementBytes, current.attributes); } @Override @@ -1088,7 +1088,7 @@ public long getSplitBacklogBytes() { // ================================================================================ @VisibleForTesting - static class PubsubSource extends UnboundedSource { + static class PubsubSource extends UnboundedSource { public final PubsubUnboundedSource outer; // The subscription to read from. @VisibleForTesting @@ -1161,7 +1161,7 @@ public Coder getCheckpointMarkCoder() { } @Override - public Coder getDefaultOutputCoder() { + public Coder getDefaultOutputCoder() { return new PubsubMessageWithAttributesCoder(); } @@ -1181,7 +1181,7 @@ public boolean requiresDeduping() { // StatsFn // ================================================================================ - private static class StatsFn extends DoFn { + private static class StatsFn extends DoFn { private final Counter elementCounter = SourceMetrics.elementsRead(); private final PubsubClientFactory pubsubFactory; @@ -1398,7 +1398,7 @@ public boolean getNeedsAttributes() { } @Override - public PCollection expand(PBegin input) { + public PCollection expand(PBegin input) { return input.getPipeline().begin() .apply(Read.from(new PubsubSource(this))) .apply("PubsubUnboundedSource.Stats", diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f2f40bb37b39..cc3c85e17ff2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -60,7 +60,7 @@ public class PubsubUnboundedSinkTest implements Serializable { private static final String ID_ATTRIBUTE = "id"; private static final int NUM_SHARDS = 10; - private static class Stamp extends DoFn { + private static class Stamp extends DoFn { private final Map attributes; private Stamp() { @@ -74,7 +74,7 @@ private Stamp(Map attributes) { @ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp( - new PubsubIO.PubsubMessage( + new PubsubMessage( c.element().getBytes(StandardCharsets.UTF_8), attributes), new Instant(TIMESTAMP)); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index 592dfa3d8ae8..ee467daeff17 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -114,8 +114,8 @@ public void after() throws IOException { factory = null; } - private static String data(PubsubIO.PubsubMessage message) { - return new String(message.getMessage(), StandardCharsets.UTF_8); + private static String data(PubsubMessage message) { + return new String(message.getPayload(), StandardCharsets.UTF_8); } @Test