Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -74,9 +76,9 @@ private static Row flattenAnalyticsRow(Row row) {

static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> {
@ProcessElement
public void processElement(ProcessContext c) {
Row row = c.element();
c.output(
public void processElement(@Element Row element, OutputReceiver<KV<String, Long>> receiver) {
Row row = element;
receiver.output(
KV.of(
Preconditions.checkStateNotNull(row.getString("browser")),
Preconditions.checkStateNotNull(row.getInt64("transactions"))));
Expand All @@ -85,13 +87,13 @@ public void processElement(ProcessContext c) {

static class FormatCountsFn extends DoFn<KV<String, Long>, Row> {
@ProcessElement
public void processElement(ProcessContext c) {
public void processElement(@Element KV<String, Long> element, OutputReceiver<Row> receiver) {
Row row =
Row.withSchema(AGGREGATED_SCHEMA)
.withFieldValue("browser", c.element().getKey())
.withFieldValue("transaction_count", c.element().getValue())
.withFieldValue("browser", element.getKey())
.withFieldValue("transaction_count", element.getValue())
.build();
c.output(row);
receiver.output(row);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -101,9 +103,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -95,9 +97,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.beam.sdk.transforms.ApproximateQuantiles;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,9 +72,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.beam.sdk.transforms.CombineFns;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -185,13 +187,16 @@ public Long apply(Long input) {
new DoFn<
KV<Long, CombineFns.CoCombineResult>, KV<Long, Iterable<KV<String, Long>>>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
CombineFns.CoCombineResult e = c.element().getValue();
public void processElement(
@Element KV<Long, CombineFns.CoCombineResult> element,
OutputReceiver<KV<Long, Iterable<KV<String, Long>>>> receiver)
throws Exception {
CombineFns.CoCombineResult e = element.getValue();
ArrayList<KV<String, Long>> o = new ArrayList<KV<String, Long>>();
o.add(KV.of(minTag.getId(), e.get(minTag)));
o.add(KV.of(maxTag.getId(), e.get(maxTag)));
o.add(KV.of(sumTag.getId(), e.get(sumTag)));
c.output(KV.of(c.element().getKey(), o));
receiver.output(KV.of(element.getKey(), o));
}
}));

Expand All @@ -210,9 +215,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
Expand Down Expand Up @@ -84,9 +86,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -68,9 +70,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,9 +65,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -67,9 +69,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -79,9 +81,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -115,18 +117,19 @@ public FilterTextFn(String pattern) {
private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");

@ProcessElement
public void processElement(ProcessContext c) {
if (filter.matcher(c.element().getKey()).matches()) {
public void processElement(
@Element KV<String, Long> element, OutputReceiver<KV<String, Long>> receiver) {
if (filter.matcher(element.getKey()).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
LOG.debug("Matched: {}", c.element().getKey());
LOG.debug("Matched: {}", element.getKey());
matchedWords.inc();
c.output(c.element());
receiver.output(element);
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
LOG.trace("Did not match: {}", c.element().getKey());
LOG.trace("Did not match: {}", element.getKey());
unmatchedWords.inc();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
Expand Down Expand Up @@ -68,9 +70,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.InferableFunction;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -78,9 +80,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -74,9 +76,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
Expand Down Expand Up @@ -109,10 +111,13 @@ public static void main(String[] args) {
ParDo.of(
new DoFn<String, KV<Integer, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws JsonProcessingException {
public void processElement(
@Element String element, OutputReceiver<KV<Integer, Integer>> receiver)
throws JsonProcessingException {
final VendorToPassengerDTO result =
om.readValue(c.element(), new TypeReference<VendorToPassengerDTO>() {});
c.output(KV.of(result.getVendorIdField(), result.getPassengerCountField()));
om.readValue(element, new TypeReference<VendorToPassengerDTO>() {});
receiver.output(
KV.of(result.getVendorIdField(), result.getPassengerCountField()));
}
}))
.apply(
Expand All @@ -124,11 +129,11 @@ public void processElement(ProcessContext c) throws JsonProcessingException {
new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
@ProcessElement
public void processElement(
ProcessContext c, OutputReceiver<KV<Integer, Integer>> out) {
OutputReceiver<KV<Integer, Integer>> out,
@Element KV<Integer, Integer> element) {
System.out.printf(
"Vendor: %s, Passengers: %s%n",
c.element().getKey(), c.element().getValue());
out.output(c.element());
"Vendor: %s, Passengers: %s%n", element.getKey(), element.getValue());
out.output(element);
}
}));
p.run().waitUntilFinish();
Expand Down
Loading
Loading