Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
Expand All @@ -51,7 +51,7 @@
* <p>New Concepts:
* <pre>
* 1. Logging using SLF4J, even in a distributed environment
* 2. Creating a custom aggregator (runners have varying levels of support)
* 2. Creating a custom metric (runners have varying levels of support)
* 3. Testing your Pipeline via PAssert
* </pre>
*
Expand Down Expand Up @@ -90,29 +90,27 @@ public FilterTextFn(String pattern) {
}

/**
* Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
* runner provides varying levels of support for aggregators, and may expose them
* Concept #2: A custom metric can track values in your pipeline as it runs. Each
* runner provides varying levels of support for metrics, and may expose them
* in a dashboard, etc.
*/
private final Aggregator<Long, Long> matchedWords =
createAggregator("matchedWords", Sum.ofLongs());
private final Aggregator<Long, Long> unmatchedWords =
createAggregator("unmatchedWords", Sum.ofLongs());
private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");

@ProcessElement
public void processElement(ProcessContext c) {
if (filter.matcher(c.element().getKey()).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
LOG.debug("Matched: " + c.element().getKey());
matchedWords.addValue(1L);
matchedWords.inc();
c.output(c.element());
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
LOG.trace("Did not match: " + c.element().getKey());
unmatchedWords.addValue(1L);
unmatchedWords.inc();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Expand Down Expand Up @@ -86,13 +86,12 @@ public class WordCount {
* to a ParDo in the pipeline.
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", Sum.ofLongs());
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
emptyLines.inc();
}

// Split the line into words.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Expand Down Expand Up @@ -79,8 +79,7 @@ public class CombinePerKeyExamples {
* outputs word, play_name.
*/
static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
private final Aggregator<Long, Long> smallerWords =
createAggregator("smallerWords", Sum.ofLongs());
private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");

@ProcessElement
public void processElement(ProcessContext c){
Expand All @@ -92,7 +91,7 @@ public void processElement(ProcessContext c){
} else {
// Track how many smaller words we're not including. This information will be
// visible in the Monitoring UI.
smallerWords.addValue(1L);
smallerWords.inc();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
Expand Down Expand Up @@ -127,16 +128,15 @@ public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>>
// use the derived mean total score as a side input
.withSideInputs(globalMeanScore)
.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
private final Aggregator<Long, Long> numSpammerUsers =
createAggregator("SpammerUsers", Sum.ofLongs());
private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
@ProcessElement
public void processElement(ProcessContext c) {
Integer score = c.element().getValue();
Double gmc = c.sideInput(globalMeanScore);
if (score > (gmc * SCORE_WEIGHT)) {
LOG.info("user " + c.element().getKey() + " spammer score " + score
+ " with mean " + gmc);
numSpammerUsers.addValue(1L);
numSpammerUsers.inc();
c.output(c.element());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -125,8 +126,7 @@ static class ParseEventFn extends DoFn<String, GameActionInfo> {

// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
private final Aggregator<Long, Long> numParseErrors =
createAggregator("ParseErrors", Sum.ofLongs());
private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");

@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -139,7 +139,7 @@ public void processElement(ProcessContext c) {
GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
c.output(gInfo);
} catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
numParseErrors.addValue(1L);
numParseErrors.inc();
LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@

import org.apache.apex.api.Launcher.AppHandle;
import org.apache.apex.api.Launcher.ShutdownMode;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;

/**
Expand All @@ -49,12 +46,6 @@ public State getState() {
return state;
}

@Override
public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
throws AggregatorRetrievalException {
return null;
}

@Override
public State cancel() throws IOException {
apexApp.shutdown(ShutdownMode.KILL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,17 @@
import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
Expand Down Expand Up @@ -313,7 +309,7 @@ public void setup(OperatorContext context) {
mainOutputTag,
sideOutputTags,
new NoOpStepContext(),
new NoOpAggregatorFactory(),
null,
windowingStrategy
);

Expand All @@ -337,45 +333,6 @@ public void beginWindow(long windowId) {
public void endWindow() {
}

/**
* TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
* It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}.
*/
public static class NoOpAggregatorFactory implements AggregatorFactory {

private NoOpAggregatorFactory() {
}

@Override
public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
Class<?> fnClass, ExecutionContext.StepContext step,
String name, CombineFn<InputT, AccumT, OutputT> combine) {
return new NoOpAggregator<>();
}

private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
java.io.Serializable {
private static final long serialVersionUID = 1L;

@Override
public void addValue(InputT value) {
}

@Override
public String getName() {
// TODO Auto-generated method stub
return null;
}

@Override
public CombineFn<InputT, ?, OutputT> getCombineFn() {
// TODO Auto-generated method stub
return null;
}

};
}

private static class LongMin {
long state = Long.MAX_VALUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -67,13 +67,12 @@ public void processElement(ProcessContext c) {

static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", Sum.ofLongs());
private final Counter emptyLines = Metrics.counter("main", "emptyLines");

@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
emptyLines.inc(1);
}

// Split the line into words.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ public void testAssertionFailure() throws Exception {
PCollection<Integer> pcollection = pipeline
.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7);

Throwable exc = runExpectingAssertionFailure(pipeline);
Pattern expectedPattern = Pattern.compile(
"Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -115,7 +115,7 @@ DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner,
StepContext stepContext,
WindowingStrategy<?, W> windowingStrategy,
Aggregator<Long, Long> droppedDueToLatenessAggregator) {
Counter droppedDueToLatenessAggregator) {
return new LateDataDroppingDoFnRunner<>(
wrappedRunner,
windowingStrategy,
Expand All @@ -136,9 +136,8 @@ DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
Sum.ofLongs());
Counter droppedDueToLateness = Metrics.counter(
"main", StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER);

CleanupTimer cleanupTimer =
new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
Expand Down
Loading