From 5bcb8c579656bc2d6e4d2a8dd5dcb2a46875812f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 12 Apr 2017 14:49:58 +0200 Subject: [PATCH] [BEAM-1722] Move PubsubIO into the google-cloud-platform module --- .../examples/complete/game/GameStats.java | 2 +- .../examples/complete/game/LeaderBoard.java | 2 +- .../triggers/AfterWatermarkStateMachine.java | 14 +++--- runners/google-cloud-dataflow-java/pom.xml | 5 +-- .../beam/runners/dataflow/DataflowRunner.java | 4 +- sdks/java/core/pom.xml | 44 ------------------- .../beam/sdk/transforms/GroupByKey.java | 4 +- .../transforms/windowing/AfterWatermark.java | 14 +++--- .../org/apache/beam/sdk/util/Transport.java | 3 +- sdks/java/io/google-cloud-platform/pom.xml | 44 +++++++++++++++++++ .../beam/sdk/io/gcp/pubsub}/PubsubClient.java | 10 ++--- .../sdk/io/gcp/pubsub}/PubsubGrpcClient.java | 4 +- .../beam/sdk/io/gcp/pubsub}/PubsubIO.java | 12 +++-- .../sdk/io/gcp/pubsub}/PubsubJsonClient.java | 6 ++- .../sdk/io/gcp/pubsub}/PubsubTestClient.java | 6 +-- .../io/gcp/pubsub}/PubsubUnboundedSink.java | 14 +++--- .../io/gcp/pubsub}/PubsubUnboundedSource.java | 16 +++---- .../beam/sdk/io/gcp/pubsub/package-info.java | 24 ++++++++++ .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 5 ++- .../sdk/io/gcp/pubsub}/PubsubClientTest.java | 8 ++-- .../io/gcp/pubsub}/PubsubGrpcClientTest.java | 11 ++--- .../beam/sdk/io/gcp/pubsub}/PubsubIOTest.java | 2 +- .../io/gcp/pubsub}/PubsubJsonClientTest.java | 11 +++-- .../io/gcp/pubsub}/PubsubTestClientTest.java | 12 ++--- .../gcp/pubsub}/PubsubUnboundedSinkTest.java | 12 +++-- .../pubsub}/PubsubUnboundedSourceTest.java | 18 ++++---- 26 files changed, 159 insertions(+), 148 deletions(-) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubClient.java (98%) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubGrpcClient.java (99%) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/io => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubIO.java (98%) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubJsonClient.java (98%) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubTestClient.java (98%) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/io => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubUnboundedSink.java (97%) rename sdks/java/{core/src/main/java/org/apache/beam/sdk/io => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubUnboundedSource.java (99%) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java rename sdks/java/{core/src/test/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubClientTest.java (95%) rename sdks/java/{core/src/test/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubGrpcClientTest.java (95%) rename sdks/java/{core/src/test/java/org/apache/beam/sdk/io => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubIOTest.java (99%) rename sdks/java/{core/src/test/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubJsonClientTest.java (94%) rename sdks/java/{core/src/test/java/org/apache/beam/sdk/util => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubTestClientTest.java (91%) rename sdks/java/{core/src/test/java/org/apache/beam/sdk/io => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubUnboundedSinkTest.java (95%) rename sdks/java/{core/src/test/java/org/apache/beam/sdk/io => io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub}/PubsubUnboundedSourceTest.java (96%) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 93e8254787a1..6874953ade41 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 35b586bd8932..96f42919e702 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 0b1200555abe..1b117d2e6923 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -31,18 +31,16 @@ * lower-bound, sometimes heuristically established, on event times that have been fully processed * by the pipeline. * - *

For sources that provide non-heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the - * watermark is a strict guarantee that no data with an event time earlier than + *

For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as + * event times), the watermark is a strict guarantee that no data with an event time earlier than * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end * of the window will be the last pane ever for that window. * - *

For sources that provide heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the - * watermark itself becomes an estimate that no data with an event time earlier than that - * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can - * often be quite accurate, but the chance of seeing late data for any given window is non-zero. + *

For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event + * times), the watermark itself becomes an estimate that no data with an event time earlier + * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics + * can often be quite accurate, but the chance of seeing late data for any given window is non-zero. * Thus, if absolute correctness over time is important to your use case, you may want to consider * using a trigger that accounts for late data. The default trigger, * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index a57744cd969b..96eced80b765 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -188,13 +188,12 @@ org.apache.beam - beam-runners-core-construction-java + beam-sdks-java-io-google-cloud-platform org.apache.beam - beam-sdks-java-io-google-cloud-platform - test + beam-runners-core-construction-java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 8726635d1b2c..684dc1468fa5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -82,11 +82,11 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.PubsubUnboundedSink; -import org.apache.beam.sdk.io.PubsubUnboundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index d117d5a63a5c..0ac40f4008fb 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -139,40 +139,6 @@ beam-sdks-common-runner-api - - io.grpc - grpc-auth - - - - io.grpc - grpc-core - - - - io.grpc - grpc-netty - - - - io.grpc - grpc-stub - - - - - io.grpc - grpc-all - runtime - - - - io.grpc - grpc-protobuf - runtime - - com.google.auth google-auth-library-credentials @@ -183,16 +149,6 @@ google-auth-library-oauth2-http - - io.netty - netty-handler - - - - com.google.api.grpc - grpc-google-pubsub-v1 - - com.google.api-client google-api-client diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index d228dbb78b3a..cc9210238d71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -101,9 +101,7 @@ * The output {@code PCollection} will have the same {@link WindowFn} * as the input. * - *

If the input {@code PCollection} contains late data (see - * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel} - * for an example of how this can occur) or the + *

If the input {@code PCollection} contains late data or the * {@link Window#triggering requested TriggerFn} can fire before * the watermark, then there may be multiple elements * output by a {@code GroupByKey} that correspond to the same key and window. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 7cd2601b5cc3..fb6053878987 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -33,18 +33,16 @@ * lower-bound, sometimes heuristically established, on event times that have been fully processed * by the pipeline. * - *

For sources that provide non-heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the - * watermark is a strict guarantee that no data with an event time earlier than + *

For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as + * event times), the watermark is a strict guarantee that no data with an event time earlier than * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end * of the window will be the last pane ever for that window. * - *

For sources that provide heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the - * watermark itself becomes an estimate that no data with an event time earlier than that - * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can - * often be quite accurate, but the chance of seeing late data for any given window is non-zero. + *

For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event + * times), the watermark itself becomes an estimate that no data with an event time earlier + * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics + * can often be quite accurate, but the chance of seeing late data for any given window is non-zero. * Thus, if absolute correctness over time is important to your use case, you may want to consider * using a trigger that accounts for late data. The default trigger, * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index 1edfa1d4ba2f..80c093bb73f4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -107,8 +107,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { /** * Returns a Pubsub client builder using the specified {@link PubsubOptions}. * - * @deprecated Use an appropriate - * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory} + * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory */ @Deprecated public static Pubsub.Builder diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 1a2e100b8277..d22c6c51fc85 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -83,6 +83,16 @@ google-api-services-storage + + com.google.apis + google-api-services-pubsub + + + + com.google.api.grpc + grpc-google-pubsub-v1 + + com.google.auto.service auto-service @@ -104,11 +114,45 @@ datastore-v1-protos + + io.grpc + grpc-auth + + io.grpc grpc-core + + io.grpc + grpc-netty + + + + io.netty + netty-handler + + + + io.grpc + grpc-stub + + + + + io.grpc + grpc-all + runtime + + + + io.grpc + grpc-protobuf + runtime + + joda-time joda-time diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index fc840577a157..750178cfe9a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -37,7 +37,7 @@ /** * An (abstract) helper class for talking to Pubsub via an underlying transport. */ -public abstract class PubsubClient implements Closeable { +abstract class PubsubClient implements Closeable { /** * Factory for creating clients. */ @@ -121,7 +121,7 @@ protected static long extractTimestamp( /** * Path representing a cloud project id. */ - public static class ProjectPath implements Serializable { + static class ProjectPath implements Serializable { private final String projectId; /** @@ -304,7 +304,7 @@ public static TopicPath topicPathFromName(String projectId, String topicName) { *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. * Java serialization is never used for non-test clients. */ - public static class OutgoingMessage implements Serializable { + static class OutgoingMessage implements Serializable { /** * Underlying (encoded) element. */ @@ -368,7 +368,7 @@ public int hashCode() { *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. * Java serialization is never used for non-test clients. */ - public static class IncomingMessage implements Serializable { + static class IncomingMessage implements Serializable { /** * Underlying (encoded) element. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java similarity index 99% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 4a6ddaceab12..912d59cde5fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkState; @@ -70,7 +70,7 @@ *

CAUTION: Currently uses the application default credentials and does not respect any * credentials-related arguments in {@link GcpOptions}. */ -public class PubsubGrpcClient extends PubsubClient { +class PubsubGrpcClient extends PubsubClient { private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; private static final int PUBSUB_PORT = 443; private static final int LIST_BATCH_SIZE = 1000; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 67ab2ec35f29..8fc1c19ce74f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -30,6 +30,10 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -43,12 +47,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.ProjectPath; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubJsonClient; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index ef8abfdff594..e290a6ba481f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkState; @@ -48,11 +48,13 @@ import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Transport; /** * A Pubsub client using JSON transport. */ -public class PubsubJsonClient extends PubsubClient { +class PubsubJsonClient extends PubsubClient { private static class PubsubJsonClientFactory implements PubsubClientFactory { private static HttpRequestInitializer chainHttpRequestInitializer( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index 61479f907476..c88576ed11bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkState; @@ -40,7 +40,7 @@ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline} * methods. Relies on statics to mimic the Pubsub service, though we try to hide that. */ -public class PubsubTestClient extends PubsubClient implements Serializable { +class PubsubTestClient extends PubsubClient implements Serializable { /** * Mimic the state of the simulated Pubsub 'service'. * @@ -123,7 +123,7 @@ public interface PubsubTestClientFactory * The factory must be closed when the test is complete, at which point final validation will * occur. */ - public static PubsubTestClientFactory createFactoryForPublish( + static PubsubTestClientFactory createFactoryForPublish( final TopicPath expectedTopic, final Iterable expectedOutgoingMessages, final Iterable failingOutgoingMessages) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java similarity index 97% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 55605b32255c..3ce9224d8db7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -16,14 +16,13 @@ * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.hash.Hashing; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -33,7 +32,6 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -44,7 +42,9 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.PubsubIO.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PubsubOptions; @@ -63,10 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -166,7 +162,7 @@ private static class ShardFn extends DoFn> { private final Coder elementCoder; private final int numShards; private final RecordIdMethod recordIdMethod; - private final SimpleFunction formatFn; + private final SimpleFunction formatFn; ShardFn(Coder elementCoder, int numShards, SimpleFunction formatFn, RecordIdMethod recordIdMethod) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java similarity index 99% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 0e6bec8e6931..9d8763b80997 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.io.gcp.pubsub; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -52,7 +52,12 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubIO.PubsubMessage; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; @@ -70,11 +75,6 @@ import org.apache.beam.sdk.util.BucketingFunction; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MovingFunction; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.ProjectPath; -import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -1316,7 +1316,7 @@ public void populateDisplayData(Builder builder) { * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes. */ @Nullable - SimpleFunction parseFn; + SimpleFunction parseFn; @VisibleForTesting PubsubUnboundedSource( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java new file mode 100644 index 000000000000..55befbaa5969 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines transforms for reading and writing from + * Google Cloud Pub/Sub. + * @see org.apache.beam.sdk.io.gcp.pubsub.PubsubIO + */ +package org.apache.beam.sdk.io.gcp.pubsub; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 0987140e7d8a..f468ec06e052 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -44,11 +44,14 @@ public void testGcpApiSurface() throws Exception { ApiSurface.ofPackage(thisPackage, thisClassLoader) .pruningPattern("org[.]apache[.]beam[.].*Test.*") .pruningPattern("org[.]apache[.]beam[.].*IT") - .pruningPattern("java[.]lang.*"); + .pruningPattern("java[.]lang.*") + .pruningPattern("java[.]util.*"); @SuppressWarnings("unchecked") final Set>> allowedClasses = ImmutableSet.of( + classesInPackage("com.google.api.client.googleapis"), + classesInPackage("com.google.api.client.http"), classesInPackage("com.google.api.client.json"), classesInPackage("com.google.api.client.util"), classesInPackage("com.google.api.services.bigquery.model"), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java similarity index 95% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java index 1a99d38c714f..14c36f93ed9f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java @@ -16,15 +16,15 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableMap; import java.util.Map; -import org.apache.beam.sdk.util.PubsubClient.ProjectPath; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java similarity index 95% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 6d4cf4efc4bc..28e07e229b6a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; @@ -44,10 +44,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.TestCredential; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java similarity index 99% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index c996409a7564..6e9922c4fcd1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.io.gcp.pubsub; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.hasItem; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java similarity index 94% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 019190bdafee..d2909947ec70 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; @@ -33,11 +33,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java similarity index 91% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java index a1b7daf191c5..18180afedb77 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.util; +package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; @@ -27,11 +27,11 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java similarity index 95% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 7a4be62b599f..be425d4cea4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.io.gcp.pubsub; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -28,7 +28,10 @@ import java.util.Map; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.NeedsRunner; @@ -37,11 +40,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubTestClient; -import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java similarity index 96% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index d9df2ca1c57c..d2e88c3f0dcf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.io.gcp.pubsub; import static junit.framework.TestCase.assertFalse; import static org.hamcrest.Matchers.equalTo; @@ -40,21 +40,19 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint; -import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader; -import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubCheckpoint; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubReader; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubTestClient; -import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; import org.joda.time.Instant; import org.junit.After; import org.junit.Rule;