Skip to content

Commit

Permalink
[BEAM-9500] Refactor load tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Piotr Szuberski committed Mar 20, 2020
1 parent e91659f commit cb70da4
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 45 deletions.
Expand Up @@ -27,7 +27,6 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
Expand Down Expand Up @@ -92,30 +91,27 @@ private CoGroupByKeyLoadTest(String[] args) throws IOException {
}

@Override
void loadTest() throws IOException {
void loadTest(PCollection<KV<byte[], byte[]>> input) throws IOException {
SyntheticSourceOptions coSourceOptions =
fromJsonString(options.getCoSourceOptions(), SyntheticSourceOptions.class);

Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());

PCollection<KV<byte[], byte[]>> input =
pipeline.apply("Read input", readFromSource(sourceOptions));
input = input.apply("Collect start time metrics (input)", ParDo.of(runtimeMonitor));
input = applyWindowing(input);
input = applyStepIfPresent(input, "Synthetic step for input", syntheticStep);

PCollection<KV<byte[], byte[]>> coInput =
pipeline.apply("Read co-input", readFromSource(coSourceOptions));
coInput = coInput.apply("Collect start time metrics (co-input)", ParDo.of(runtimeMonitor));
pipeline
.apply("Read co-input", readFromSource(coSourceOptions))
.apply("Collect start time metrics (co-input)", ParDo.of(runtimeMonitor));
coInput = applyWindowing(coInput, options.getCoInputWindowDurationSec());
coInput = applyStepIfPresent(coInput, "Synthetic step for co-input", syntheticStep);

KeyedPCollectionTuple.of(INPUT_TAG, input)
.and(CO_INPUT_TAG, coInput)
.apply("CoGroupByKey", CoGroupByKey.create())
.apply("Ungroup and reiterate", ParDo.of(new UngroupAndReiterate(options.getIterations())))
.apply(
"Collect total bytes", ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")))
.apply("Collect total bytes", ParDo.of(byteMonitor))
.apply("Collect end time metrics", ParDo.of(runtimeMonitor));
}

Expand Down
Expand Up @@ -25,8 +25,6 @@
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
Expand Down Expand Up @@ -113,27 +111,17 @@ private CombineLoadTest(String[] args) throws IOException {
}

@Override
protected void loadTest() throws IOException {
protected void loadTest(PCollection<KV<byte[], byte[]>> input) throws IOException {
Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());

PCollection<KV<byte[], byte[]>> input =
pipeline
.apply("Read input", readFromSource(sourceOptions))
.apply(
"Collect start time metric",
ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")))
.apply(
"Collect metrics",
ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")));

input = input.apply("Collect metrics", ParDo.of(byteMonitor));
input = applyWindowing(input);

for (int i = 0; i < options.getFanout(); i++) {
applyStepIfPresent(input, format("Step: %d", i), syntheticStep)
.apply(format("Convert to Long: %d", i), MapElements.via(new ByteValueToLong()))
.apply(format("Combine: %d", i), getPerKeyCombiner(options.getPerKeyCombiner()))
.apply(
"Collect end time metric", ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")));
.apply("Collect end time metric", ParDo.of(runtimeMonitor));
}
}

Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -79,17 +78,10 @@ private GroupByKeyLoadTest(String[] args) throws IOException {
}

@Override
void loadTest() throws IOException {
void loadTest(PCollection<KV<byte[], byte[]>> input) throws IOException {
Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());

PCollection<KV<byte[], byte[]>> input =
pipeline
.apply("Read input", readFromSource(sourceOptions))
.apply("Collect start time metrics", ParDo.of(runtimeMonitor))
.apply(
"Total bytes monitor",
ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")));

input = input.apply("Total bytes monitor", ParDo.of(byteMonitor));
input = applyWindowing(input);

for (int branch = 0; branch < options.getFanout(); branch++) {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
Expand Down Expand Up @@ -62,13 +63,16 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {

protected OptionsT options;

protected SyntheticSourceOptions sourceOptions;
private SyntheticSourceOptions sourceOptions;

protected Pipeline pipeline;

protected ByteMonitor byteMonitor;

LoadTest(String[] args, Class<OptionsT> testOptions, String metricsNamespace) throws IOException {
this.metricsNamespace = metricsNamespace;
this.runtimeMonitor = new TimeMonitor<>(metricsNamespace, "runtime");
this.byteMonitor = new ByteMonitor(metricsNamespace, "totalBytes.count");
this.options = LoadTestOptions.readFromArgs(args, testOptions);
this.sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);

Expand All @@ -85,21 +89,22 @@ PTransform<PBegin, PCollection<KV<byte[], byte[]>>> readFromSource(
}

/** The load test pipeline implementation. */
abstract void loadTest() throws IOException;
abstract void loadTest(PCollection<KV<byte[], byte[]>> input) throws IOException;

/**
* Runs the load test, collects and publishes test results to various data store and/or console.
*/
public PipelineResult run() throws IOException {
Timestamp timestamp = Timestamp.now();

loadTest();
PCollection<KV<byte[], byte[]>> input = readSourceFromOptions();
loadTest(input);

PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish(Duration.standardMinutes(options.getLoadTestTimeout()));

String testId = UUID.randomUUID().toString();
List metrics = readMetrics(timestamp, pipelineResult, testId);
List<NamedTestResult> metrics = readMetrics(timestamp, pipelineResult, testId);

ConsoleResultPublisher.publish(metrics, testId, timestamp.toString());

Expand All @@ -112,6 +117,12 @@ public PipelineResult run() throws IOException {
return pipelineResult;
}

private PCollection<KV<byte[], byte[]>> readSourceFromOptions() {
return pipeline
.apply("Read input", readFromSource(sourceOptions))
.apply("Collect start time metrics", ParDo.of(runtimeMonitor));
}

private List<NamedTestResult> readMetrics(
Timestamp timestamp, PipelineResult result, String testId) {
MetricsReader reader = new MetricsReader(result, metricsNamespace);
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -80,12 +79,8 @@ private ParDoLoadTest(String[] args) throws IOException {
}

@Override
protected void loadTest() {
PCollection<KV<byte[], byte[]>> input =
pipeline
.apply("Read input", readFromSource(sourceOptions))
.apply(ParDo.of(runtimeMonitor))
.apply(ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")));
protected void loadTest(PCollection<KV<byte[], byte[]>> input) {
input = input.apply("Total bytes monitor", ParDo.of(byteMonitor));

for (int i = 0; i < options.getIterations(); i++) {
input =
Expand Down

0 comments on commit cb70da4

Please sign in to comment.