@@ -39,7 +39,6 @@
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -173,15 +172,6 @@ public DirectPipelineResult run(Pipeline pipeline) {
"PipelineOptions specified failed to serialize to JSON.", e);
}

// TODO(BEAM-10670): Remove additional experiments when we address performance issue.
if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
// Populate experiments directly to avoid direct-runner <-> kafka circular dependency.
ExperimentalOptions.addExperiment(
pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
ExperimentalOptions.addExperiment(
pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
}

performRewrites(pipeline);
MetricsEnvironment.setMetricsSupported(true);
try {
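The block deleted above had the DirectRunner push the beam_fn_api_use_deprecated_read and use_deprecated_read experiments into every pipeline's options; after this change the runner no longer injects them, and the legacy-versus-SDF decision for Kafka moves into KafkaIO itself (see runnerRequiresLegacyRead in the last file of this diff). As a minimal usage sketch that is not part of this change (the class name and main method are illustrative; the experiment flag and the ExperimentalOptions calls are the ones visible in the deleted code), a pipeline author who explicitly wants the legacy UnboundedSource-based Kafka read can still set the experiment on their own options:

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ForceLegacyKafkaReadExample {
  public static void main(String[] args) {
    // Parse standard Beam pipeline options from the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    // Opt in to the legacy Kafka read explicitly, mirroring what the deleted
    // runner code used to inject automatically.
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_deprecated_read");
    // ... construct the pipeline with these options and run it ...
  }
}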
@@ -78,13 +78,6 @@ public PipelineResult run(Pipeline pipeline) {
     // Portable flink only support SDF as read.
     // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
     if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
-      if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-        // Populate experiments directly to have Kafka use legacy read.
-        ExperimentalOptions.addExperiment(
-            pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-        ExperimentalOptions.addExperiment(
-            pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
-      }
       SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
@@ -112,15 +112,11 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, J

   @Override
   public SamzaPipelineResult run(Pipeline pipeline) {
-    // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+    // TODO(BEAM-10670): Use SDF read as default for non-portable execution when we address
+    // performance issue.
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
     MetricsEnvironment.setMetricsSupported(true);
 
@@ -171,16 +171,11 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
   private TranslationContext translatePipeline(Pipeline pipeline) {
     PipelineTranslator.detectTranslationMode(pipeline, options);
 
-    // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
+    // Default to using the primitive versions of Read.Bounded and Read.Unbounded for non-portable
+    // execution.
     // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
-
-      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
     PipelineTranslator.replaceTransforms(pipeline, options);
@@ -161,13 +161,8 @@ public SparkPipelineResult run(final Pipeline pipeline) {

     // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
     // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
-      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
     pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));
@@ -82,13 +82,8 @@ public SparkPipelineResult run(Pipeline pipeline) {

     // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
     // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
-      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
     JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
@@ -92,14 +92,9 @@ public PipelineResult run(Pipeline pipeline) {
     pipeline.replaceAll(getDefaultOverrides());
 
     // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
     env.translate(pipeline);
     setupSystem(options);
@@ -1228,12 +1228,28 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
           || ExperimentalOptions.hasExperiment(
               input.getPipeline().getOptions(), "use_deprecated_read")
           || getMaxNumRecords() < Long.MAX_VALUE
-          || getMaxReadTime() != null) {
+          || getMaxReadTime() != null
+          || runnerRequiresLegacyRead(input.getPipeline().getOptions())) {
         return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
       }
       return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
     }

+    private boolean runnerRequiresLegacyRead(PipelineOptions options) {
+      // Only the Dataflow runner requires the SDF read at this moment. Other non-portable runners
+      // use the legacy read unless use_sdf_read is specified, because of the performance concern.
+      // TODO(BEAM-10670): Remove this special check when we address performance issue.
+      if (ExperimentalOptions.hasExperiment(options, "use_sdf_read")) {
+        return false;
+      }
+      if (options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
+        return false;
+      } else if (ExperimentalOptions.hasExperiment(options, "beam_fn_api")) {
+        return false;
+      }
+      return true;
+    }

     /**
      * A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka
      * read if runners doesn't have a good support on executing unbounded Splittable DoFn.
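The helper added above makes the legacy read the default whenever the pipeline is neither running on Dataflow nor using the portable beam_fn_api path, with the use_sdf_read experiment acting as the explicit opt-out. As a minimal sketch that is not part of this change (the class name and main method are illustrative; the experiment flag and option types are the ones used in the code above), a user who wants KafkaIO to keep the SDF-based read on one of these runners can set that experiment before building the pipeline:

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ForceSdfKafkaReadExample {
  public static void main(String[] args) {
    // Parse standard Beam pipeline options from the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    // With use_sdf_read set, runnerRequiresLegacyRead(...) returns false, so KafkaIO expands to
    // ReadFromKafkaViaSDF unless one of the other conditions in expand() forces the unbounded read.
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_sdf_read");
    // ... construct the pipeline with these options and run it ...
  }
}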