Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.io.Write.Bound;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
Expand Down Expand Up @@ -102,7 +102,7 @@ public PDone apply(PCollection<T> input) {
}

@VisibleForTesting
static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> {
static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> {
@VisibleForTesting
static final int MIN_SHARDS_FOR_LOG = 3;

Expand All @@ -116,7 +116,7 @@ static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> {
this.randomExtraShards = extraShards;
}

@Override
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
if (maxShards == 0) {
maxShards = calculateShards(c.sideInput(numRecords));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
Expand Down Expand Up @@ -60,9 +60,9 @@ public void getViewsReturnsViews() {
p.apply("listCreate", Create.of("foo", "bar"))
.apply(
ParDo.of(
new OldDoFn<String, String>() {
@Override
public void processElement(OldDoFn<String, String>.ProcessContext c)
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
Expand Down Expand Up @@ -107,9 +107,9 @@ public void getValueToConsumersSucceeds() {
PCollection<String> transformed =
created.apply(
ParDo.of(
new OldDoFn<String, String>() {
@Override
public void processElement(OldDoFn<String, String>.ProcessContext c)
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
Expand Down Expand Up @@ -138,9 +138,9 @@ public void getUnfinalizedPValuesContainsDanglingOutputs() {
PCollection<String> transformed =
created.apply(
ParDo.of(
new OldDoFn<String, String>() {
@Override
public void processElement(OldDoFn<String, String>.ProcessContext c)
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
Expand All @@ -155,9 +155,9 @@ public void getUnfinalizedPValuesEmpty() {
p.apply(Create.of("1", "2", "3"))
.apply(
ParDo.of(
new OldDoFn<String, String>() {
@Override
public void processElement(OldDoFn<String, String>.ProcessContext c)
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
Expand All @@ -180,9 +180,9 @@ public void getStepNamesContainsAllTransforms() {
PCollection<String> transformed =
created.apply(
ParDo.of(
new OldDoFn<String, String>() {
@Override
public void processElement(OldDoFn<String, String>.ProcessContext c)
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
Expand Down Expand Up @@ -221,8 +220,8 @@ public KV<Integer, Long> apply(KV<byte[], Long> input) {

@Test
public void transformDisplayDataExceptionShouldFail() {
OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() {
@Override
DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {}

@Override
Expand All @@ -242,7 +241,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
}

/**
* Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
* Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
Expand All @@ -251,8 +250,9 @@ public void testMutatingOutputThenOutputDoFnError() throws Exception {

pipeline
.apply(Create.of(42))
.apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
@Override public void processElement(ProcessContext c) {
.apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
c.output(outputList);
outputList.set(0, 37);
Expand All @@ -267,7 +267,7 @@ public void testMutatingOutputThenOutputDoFnError() throws Exception {
}

/**
* Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
* Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
Expand All @@ -276,8 +276,9 @@ public void testMutatingOutputThenTerminateDoFnError() throws Exception {

pipeline
.apply(Create.of(42))
.apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
@Override public void processElement(ProcessContext c) {
.apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
c.output(outputList);
outputList.set(0, 37);
Expand All @@ -291,7 +292,7 @@ public void testMutatingOutputThenTerminateDoFnError() throws Exception {
}

/**
* Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails
* Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
* in the {@link DirectRunner}.
*/
@Test
Expand All @@ -300,8 +301,9 @@ public void testMutatingOutputCoderDoFnError() throws Exception {

pipeline
.apply(Create.of(42))
.apply(ParDo.of(new OldDoFn<Integer, byte[]>() {
@Override public void processElement(ProcessContext c) {
.apply(ParDo.of(new DoFn<Integer, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) {
byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
c.output(outputArray);
outputArray[0] = 0xa;
Expand All @@ -316,7 +318,7 @@ public void testMutatingOutputCoderDoFnError() throws Exception {
}

/**
* Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the
* Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
Expand All @@ -326,8 +328,9 @@ public void testMutatingInputDoFnError() throws Exception {
pipeline
.apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
.withCoder(ListCoder.of(VarIntCoder.of())))
.apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() {
@Override public void processElement(ProcessContext c) {
.apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
List<Integer> inputList = c.element();
inputList.set(0, 37);
c.output(12);
Expand All @@ -341,7 +344,7 @@ public void testMutatingInputDoFnError() throws Exception {
}

/**
* Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails
* Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
* in the {@link DirectRunner}.
*/
@Test
Expand All @@ -350,8 +353,9 @@ public void testMutatingInputCoderDoFnError() throws Exception {

pipeline
.apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
.apply(ParDo.of(new OldDoFn<byte[], Integer>() {
@Override public void processElement(ProcessContext c) {
.apply(ParDo.of(new DoFn<byte[], Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
byte[] inputArray = c.element();
inputArray[0] = 0xa;
c.output(13);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down Expand Up @@ -179,9 +179,9 @@ public void mutationAfterAddCreateBundleThrows() {
intermediate.commit(Instant.now());
}

private static class IdentityDoFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
private static class IdentityDoFn<T> extends DoFn<T, T> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -57,9 +57,9 @@ public void setup() {
p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
.apply(
ParDo.of(
new OldDoFn<byte[], byte[]>() {
@Override
public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c)
new DoFn<byte[], byte[]>() {
@ProcessElement
public void processElement(ProcessContext c)
throws Exception {
c.element()[0] = 'b';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -177,9 +177,9 @@ public PCollection<K> apply(PCollection<K> input) {
}
}

private static class IdentityFn<K> extends OldDoFn<K, K> {
@Override
public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception {
private static class IdentityFn<K> extends DoFn<K, K> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -168,7 +168,7 @@ private ParDoEvaluator<Integer> createEvaluator(
ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
}

private static class RecorderFn extends OldDoFn<Integer, Integer> {
private static class RecorderFn extends DoFn<Integer, Integer> {
private Collection<Integer> processed;
private final PCollectionView<Integer> view;

Expand All @@ -177,8 +177,8 @@ public RecorderFn(PCollectionView<Integer> view) {
this.view = view;
}

@Override
public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
processed.add(c.element());
c.output(c.element() + c.sideInput(view));
}
Expand Down
Loading