From 24e3c3151f1460a4d87d5809513e14a9505dc06e Mon Sep 17 00:00:00 2001 From: Rafal Wojdyla Date: Mon, 27 Mar 2017 14:40:10 -0400 Subject: [PATCH 1/2] Fix typo --- .../main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index 118496835dea..8c396a2ae75c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -1253,7 +1253,7 @@ public void populateDisplayData(Builder builder) { * Subscription to read from. If {@literal null} then {@link #topic} must be given. * Otherwise {@link #topic} must be null. * - *

If no subscription is given a random one will be created when the transorm is + *

If no subscription is given a random one will be created when the transform is * applied. This field will be update with that subscription's path. The created * subscription is never deleted. */ From 6a577da0c993f3c770094c5c616bf712db78138b Mon Sep 17 00:00:00 2001 From: Rafal Wojdyla Date: Wed, 29 Mar 2017 19:11:19 -0400 Subject: [PATCH 2/2] Always reuse reader for direct pipeline runner --- .../apache/beam/runners/direct/TransformEvaluatorRegistry.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 5ad8709f6b9c..9eda07d8794b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -50,7 +50,8 @@ public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) ImmutableMap, TransformEvaluatorFactory> primitives = ImmutableMap., TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) - .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) + .put(Read.Unbounded.class, + new UnboundedReadEvaluatorFactory(ctxt, 1.0 /* always reuse */)) .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt)) .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))