@@ -116,7 +116,7 @@ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursi
public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
PCollection<CompletionCandidate> candidates = input
// First count how often each token appears.
.apply(new Count.PerElement<String>())
.apply(Count.<String>perElement())

// Map the KV outputs of Count into our own CompletionCandidate class.
.apply("CreateCompletionCandidates", ParDo.of(
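The hunk above swaps direct construction of the transform (`new Count.PerElement<String>()`) for the static factory method. A minimal sketch of the new call style; the `countTokens` helper and its surrounding pipeline are illustrative, not part of the example code:

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Counts how often each token appears, using the static factory instead of new Count.PerElement<String>(). */
static PCollection<KV<String, Long>> countTokens(PCollection<String> tokens) {
  // Count.<String>perElement() yields one KV<token, occurrence count> per distinct token.
  return tokens.apply(Count.<String>perElement());
}
```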
@@ -64,7 +64,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
registerTransformTranslator(Flatten.PCollections.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
registerTransformTranslator(CreateApexPCollectionView.class,
@@ -35,14 +35,14 @@
import org.apache.beam.sdk.values.TaggedPValue;

/**
* {@link Flatten.FlattenPCollectionList} translation to Apex operator.
* {@link Flatten.PCollections} translation to Apex operator.
*/
class FlattenPCollectionTranslator<T> implements
TransformTranslator<Flatten.FlattenPCollectionList<T>> {
TransformTranslator<Flatten.PCollections<T>> {
private static final long serialVersionUID = 1L;

@Override
public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
List<PCollection<T>> inputCollections = extractPCollections(context.getInputs());

if (inputCollections.isEmpty()) {
@@ -21,15 +21,15 @@
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;

import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
* Apex operator for Beam {@link PCollections}.
*/
public class ApexFlattenOperator<InputT> extends BaseOperator {

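The operator this javadoc describes merges several input streams into one. A minimal sketch of that shape using the DataTorrent/Apex port API from the imports above; the class and port names are illustrative, and the watermark and ApexStreamTuple handling of the real ApexFlattenOperator is omitted:

```java
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

/** Illustrative flatten-style operator: tuples from either input port are forwarded unchanged. */
public class SketchFlattenOperator<T> extends BaseOperator {

  public final transient DefaultOutputPort<T> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<T> data1 = new DefaultInputPort<T>() {
    @Override
    public void process(T tuple) {
      out.emit(tuple);
    }
  };

  public final transient DefaultInputPort<T> data2 = new DefaultInputPort<T>() {
    @Override
    public void process(T tuple) {
      out.emit(tuple);
    }
  };
}
```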
@@ -27,7 +27,6 @@
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -36,11 +35,11 @@

/**
* A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
* Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
* Flatten.PCollections} that takes no input {@link PCollection PCollections}.
*/
public class EmptyFlattenAsCreateFactory<T>
implements PTransformOverrideFactory<
PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> {
private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
new EmptyFlattenAsCreateFactory<>();

@@ -52,7 +51,7 @@ private EmptyFlattenAsCreateFactory() {}

@Override
public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
FlattenPCollectionList<T> transform) {
Flatten.PCollections<T> transform) {
return (PTransform) Create.empty(VoidCoder.of());
}

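For context, the pipeline shape this factory targets is a Flatten applied to an empty PCollectionList: there is nothing to read, so the runner swaps in an empty Create instead. A small sketch, mirroring the DirectGraphTest hunk further down; the helper method is illustrative:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

/** Builds a Flatten over zero inputs, the shape EmptyFlattenAsCreateFactory replaces with Create.empty(...). */
static PCollection<String> emptyFlatten(Pipeline p) {
  PCollectionList<String> emptyList = PCollectionList.empty(p);
  PCollection<String> empty = emptyList.apply(Flatten.<String>pCollections());
  // With no inputs there is nothing to infer a coder from, so set one explicitly.
  empty.setCoder(StringUtf8Coder.of());
  return empty;
}
```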
@@ -165,14 +165,14 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
}

/**
* A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which
* A {@link PTransformMatcher} which matches a {@link Flatten.PCollections} which
* consumes no input {@link PCollection PCollections}.
*/
public static PTransformMatcher emptyFlatten() {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
return (application.getTransform() instanceof Flatten.FlattenPCollectionList)
return (application.getTransform() instanceof Flatten.PCollections)
&& application.getInputs().isEmpty();
}
};
@@ -182,8 +182,8 @@ public static PTransformMatcher writeWithRunnerDeterminedSharding() {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
if (application.getTransform() instanceof Write.Bound) {
return ((Write.Bound) application.getTransform()).getSharding() == null;
if (application.getTransform() instanceof Write) {
return ((Write) application.getTransform()).getSharding() == null;
}
return false;
}
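These matchers are the hooks runners use to find applications that need rewriting; an application matched by emptyFlatten(), for instance, is exactly what EmptyFlattenAsCreateFactory replaces. A hedged sketch of how a runner might consult them; the `needsRunnerRewrite` helper is illustrative:

```java
import org.apache.beam.sdk.transforms.AppliedPTransform;

/** Illustrative check: does this application call for a runner-side replacement? */
static boolean needsRunnerRewrite(AppliedPTransform<?, ?, ?> application) {
  return PTransformMatchers.emptyFlatten().matches(application)
      || PTransformMatchers.writeWithRunnerDeterminedSharding().matches(application);
}
```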
@@ -327,7 +327,7 @@ public void parDoWithFnTypeNotParDo() {
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
.<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
.<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
of(
"EmptyFlatten",
Collections.<TaggedPValue>emptyList(),
@@ -346,7 +346,7 @@ public void emptyFlattenWithEmptyFlatten() {
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
.<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
.<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
of(
"Flatten",
Collections.singletonList(
@@ -369,7 +369,7 @@ public void emptyFlattenWithNonEmptyFlatten() {
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
.<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.FlattenIterables<Object>>
.<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
of(
"EmptyFlatten",
Collections.<TaggedPValue>emptyList(),
@@ -387,7 +387,7 @@ public void emptyFlattenWithNonFlatten() {

@Test
public void writeWithRunnerDeterminedSharding() {
Write.Bound<Integer> write =
Write<Integer> write =
Write.to(
new FileBasedSink<Integer>("foo", "bar") {
@Override
@@ -400,22 +400,22 @@ public FileBasedWriteOperation<Integer> createWriteOperation(
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));

Write.Bound<Integer> withStaticSharding = write.withNumShards(3);
Write<Integer> withStaticSharding = write.withNumShards(3);
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withStaticSharding)),
is(false));

Write.Bound<Integer> withCustomSharding =
Write<Integer> withCustomSharding =
write.withSharding(Sum.integersGlobally().asSingletonView());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withCustomSharding)),
is(false));
}

private AppliedPTransform<?, ?, ?> appliedWrite(Write.Bound<Integer> write) {
return AppliedPTransform.<PCollection<Integer>, PDone, Write.Bound<Integer>>of(
private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
"Write",
Collections.<TaggedPValue>emptyList(),
Collections.<TaggedPValue>emptyList(),
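The Write.Bound removals in this test reflect the user-facing rename: Write.to(...) is now typed as plain Write<T>. A small sketch of the new spelling; the helper method and sink parameter are illustrative:

```java
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.Write;

/** Builds a write with a fixed shard count; before this change the return type was Write.Bound<String>. */
static Write<String> fixedShardWrite(FileBasedSink<String> sink) {
  // Only the enclosing class name changed; the fluent configuration is the same.
  return Write.to(sink).withNumShards(3);
}
```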
@@ -27,7 +27,7 @@

/** A {@link RootInputProvider} that provides a singleton empty bundle. */
class EmptyInputProvider<T>
implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.FlattenPCollectionList<T>> {
implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.PCollections<T>> {
EmptyInputProvider() {}

/**
@@ -37,7 +37,7 @@ class EmptyInputProvider<T>
*/
@Override
public Collection<CommittedBundle<Void>> getInitialInputs(
AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>>
AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>>
transform,
int targetParallelism) {
return Collections.emptyList();
@@ -22,7 +22,7 @@
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +53,7 @@ public void cleanup() throws Exception {}

private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
final AppliedPTransform<
PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
PCollectionList<InputT>, PCollection<InputT>, PCollections<InputT>>
application) {
final UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(
@@ -25,7 +25,7 @@
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;

/**
@@ -42,7 +42,7 @@ public static RootProviderRegistry defaultRegistry(EvaluationContext context) {
.put(
TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
new TestStreamEvaluatorFactory.InputProvider(context))
.put(FlattenPCollectionList.class, new EmptyInputProvider());
.put(PCollections.class, new EmptyInputProvider());
return new RootProviderRegistry(defaultProviders.build());
}

@@ -32,7 +32,7 @@
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -53,7 +53,7 @@ public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt)
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
.put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
.put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
.put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
.put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
.put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
.put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
// Runner-specific primitives used in expansion of GroupByKey
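This registry, like the RootProviderRegistry above, keys its lookups on the concrete transform class, which is why the Flatten.PCollections rename has to be mirrored in every .put(...) call. A rough sketch of that dispatch; the method and type names here are illustrative, not the actual registry API:

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.AppliedPTransform;

/** Illustrative class-keyed dispatch: an unregistered (or renamed) transform class yields null. */
static <F> F factoryFor(Map<Class<?>, F> factories, AppliedPTransform<?, ?, ?> application) {
  return factories.get(application.getTransform().getClass());
}
```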
@@ -29,7 +29,6 @@
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.io.Write.Bound;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
@@ -48,13 +47,15 @@
* of shards is the log base 10 of the number of input records, with up to 2 additional shards.
*/
class WriteWithShardingFactory<InputT>
implements PTransformOverrideFactory<PCollection<InputT>, PDone, Bound<InputT>> {
implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
static final int MAX_RANDOM_EXTRA_SHARDS = 3;
@VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;

@Override
public PTransform<PCollection<InputT>, PDone> getReplacementTransform(Bound<InputT> transform) {
return transform.withSharding(new LogElementShardsWithDrift<InputT>());
public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
Write<InputT> transform) {

return transform.withSharding(new LogElementShardsWithDrift<InputT>());
}

@Override
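The class comment summarizes the heuristic: roughly log base 10 of the input size, with a couple of shards of random drift. A hedged sketch of that arithmetic using the two constants declared above for illustration; the real computation lives in LogElementShardsWithDrift and may round or clamp differently:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative shard-count estimate matching the class comment; details may differ from the real code. */
static int estimateShardCount(long elementCount) {
  // At least MIN_SHARDS_FOR_LOG (3) shards, growing with log10 of the element count...
  int base = Math.max(3, (int) Math.ceil(Math.log10(elementCount + 1)));
  // ...plus up to MAX_RANDOM_EXTRA_SHARDS - 1 (i.e. 0-2) extra shards of drift.
  int extra = ThreadLocalRandom.current().nextInt(3);
  return base + extra;
}
```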
@@ -35,7 +35,7 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -110,7 +110,7 @@ public void getRootTransformsContainsRootTransforms() {

@Test
public void getRootTransformsContainsEmptyFlatten() {
FlattenPCollectionList<String> flatten = Flatten.pCollections();
PCollections<String> flatten = Flatten.pCollections();
PCollectionList<String> emptyList = PCollectionList.empty(p);
PCollection<String> empty = emptyList.apply(flatten);
empty.setCoder(StringUtf8Coder.of());
@@ -117,7 +117,7 @@ public void dynamicallyReshardedWrite() throws Exception {

@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
Write.Bound<Object> original = Write.to(new TestSink());
Write<Object> original = Write.to(new TestSink());
assertThat(factory.getReplacementTransform(original), not(equalTo((Object) original)));
}

@@ -85,7 +85,7 @@ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursi
public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
PCollection<CompletionCandidate> candidates = input
// First count how often each token appears.
.apply(new Count.PerElement<String>())
.apply(Count.<String>perElement())

// Map the KV outputs of Count into our own CompletionCandidate class.
.apply("CreateCompletionCandidates", ParDo.of(
@@ -108,7 +108,7 @@ class FlinkBatchTransformTranslators {
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());

TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());

TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());

@@ -706,12 +706,12 @@ private <T> void pruneOutput(

private static class FlattenPCollectionTranslatorBatch<T>
implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
Flatten.FlattenPCollectionList<T>> {
Flatten.PCollections<T>> {

@Override
@SuppressWarnings("unchecked")
public void translateNode(
Flatten.FlattenPCollectionList<T> transform,
Flatten.PCollections<T> transform,
FlinkBatchTranslationContext context) {

List<TaggedPValue> allInputs = context.getInputs(transform);
@@ -118,14 +118,14 @@ class FlinkStreamingTransformTranslators {
static {
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());

TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());

TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
TRANSLATORS.put(
FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
new CreateViewStreamingTranslator());
@@ -194,10 +194,10 @@ public void flatMap(
}

private static class WriteSinkStreamingTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> {

@Override
public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) {
String name = transform.getName();
PValue input = context.getInput(transform);

@@ -999,11 +999,11 @@ public void flatMap(

private static class FlattenPCollectionTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
Flatten.FlattenPCollectionList<T>> {
Flatten.PCollections<T>> {

@Override
public void translateNode(
Flatten.FlattenPCollectionList<T> transform,
Flatten.PCollections<T> transform,
FlinkStreamingTranslationContext context) {
List<TaggedPValue> allInputs = context.getInputs(transform);

@@ -38,7 +38,7 @@
import org.apache.flink.test.util.JavaProgramTestBase;

/**
* Tests the translation of custom Write.Bound sinks.
* Tests the translation of custom Write sinks.
*/
public class WriteSinkITCase extends JavaProgramTestBase {
