From c9445efaae0c62a8c648f4a8875c20cf22bf3b8a Mon Sep 17 00:00:00 2001 From: TongruiLi Date: Thu, 7 May 2026 22:24:17 +0000 Subject: [PATCH 01/12] Aded portable runner to python and go runners --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 10 ++++-- .../beam/runners/dataflow/dataflow_test.go | 34 +++++++++++++++++++ .../runners/dataflow/dataflow_runner.py | 4 +++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index e968911fcca1..1519cabf673e 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -335,7 +335,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs // for runner v2. - var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool + var fnApiSet, v2set, uwSet, portableRunnerSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { fnApiSet = true @@ -349,7 +349,10 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { + if strings.Contains(e, "enable_portable_runner") { + portableRunnerSet = true + } + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } } @@ -366,6 +369,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } + if !portableRunnerSet { + // As this option is not documented, we do not set it by default. This behavior will be fixed in later versions. + } // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine. diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 23dcd034120a..529ae5c087cd 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { } } +func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "disable_portable_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "enable_streaming_java_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + func TestGetJobOptions_NoStagingLocation(t *testing.T) { resetGlobals() *stagingLocation = "" diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 0c23e6024dc6..58da4461593c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options): debug_options.add_experiment('use_unified_worker') debug_options.add_experiment('use_runner_v2') debug_options.add_experiment('use_portable_job_submission') + # enable_portable_runner is not added by default as it is not documented. + # This behavior will be fixed in later versions. def _check_and_add_missing_options(options): @@ -662,6 +664,8 @@ def _is_runner_v2_disabled(options): """Returns true if runner v2 is disabled.""" debug_options = options.view_as(DebugOptions) return ( + debug_options.lookup_experiment('disable_portable_runner') or + debug_options.lookup_experiment('enable_streaming_java_runner') or debug_options.lookup_experiment('disable_runner_v2') or debug_options.lookup_experiment('disable_runner_v2_until_2023') or debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or From dbb982648f4cf82adc641db5d635422de989b53a Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:42:28 +0000 Subject: [PATCH 02/12] Removed empty if, added disable unit test for python. --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 4 +--- .../runners/dataflow/dataflow_runner_test.py | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 1519cabf673e..651a86b9ee42 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -369,9 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } - if !portableRunnerSet { - // As this option is not documented, we do not set it by default. This behavior will be fixed in later versions. - } + // As portable_runner is not documented, we do not set it by default. This behavior will be fixed in later versions. // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine. diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index e1b8be6682f9..2d8f840a4d00 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -41,6 +41,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options +from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.internal import names from apache_beam.runners.runner import PipelineState @@ -734,5 +735,27 @@ def test_explicit_streaming_no_unbounded(self): apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING) +class DataflowRunnerV2DisabledTest(unittest.TestCase): + + def test_runner_v2_disabled_experiments_raise(self): + disable_experiments = [ + 'disable_portable_runner', + 'enable_streaming_java_runner', + 'disable_runner_v2', + 'disable_runner_v2_until_2023', + 'disable_runner_v2_until_v2.50', + 'disable_prime_runner_v2', + ] + for experiment in disable_experiments: + options = PipelineOptions([f'--experiments={experiment}']) + self.assertTrue( + _is_runner_v2_disabled(options), + f'Expected {experiment} to disable runner v2') + with self.assertRaisesRegex( + ValueError, + 'Disabling Runner V2 no longer supported'): + DataflowRunner().run_pipeline(None, options) + + if __name__ == '__main__': unittest.main() From ac3716d950fd3420819adfdf359a0be188639012 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:43:19 +0000 Subject: [PATCH 03/12] Removed unnessary class in unit test --- .../apache_beam/runners/dataflow/dataflow_runner_test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 2d8f840a4d00..b3035d38c7c0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -734,9 +734,6 @@ def test_explicit_streaming_no_unbounded(self): p.result.job.proto.type, apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING) - -class DataflowRunnerV2DisabledTest(unittest.TestCase): - def test_runner_v2_disabled_experiments_raise(self): disable_experiments = [ 'disable_portable_runner', From 1069559f292757e3b0cbfe94d7bf590caa003fb4 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:50:35 +0000 Subject: [PATCH 04/12] Removed unused variable --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 651a86b9ee42..852613569801 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -335,7 +335,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs // for runner v2. - var fnApiSet, v2set, uwSet, portableRunnerSet, portaSubmission, seSet, wsSet bool + var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { fnApiSet = true @@ -349,9 +349,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "enable_portable_runner") { - portableRunnerSet = true - } + // enable_portable_runner is not documented and hence wont be set by default. This will be fixed in later versions. if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } @@ -369,7 +367,6 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } - // As portable_runner is not documented, we do not set it by default. This behavior will be fixed in later versions. // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine. From b846ceccaf75045d5fb68624f7c1abd14e6b43a5 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Sat, 16 May 2026 23:39:53 +0000 Subject: [PATCH 05/12] Ran formatter --- .../apache_beam/runners/dataflow/dataflow_runner_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b3035d38c7c0..dd8b9d25712f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -748,9 +748,8 @@ def test_runner_v2_disabled_experiments_raise(self): self.assertTrue( _is_runner_v2_disabled(options), f'Expected {experiment} to disable runner v2') - with self.assertRaisesRegex( - ValueError, - 'Disabling Runner V2 no longer supported'): + with self.assertRaisesRegex(ValueError, + 'Disabling Runner V2 no longer supported'): DataflowRunner().run_pipeline(None, options) From 4013e3d6691ee9535b345b68a8cf679eb067f986 Mon Sep 17 00:00:00 2001 From: TongruiLi Date: Mon, 18 May 2026 23:28:32 +0000 Subject: [PATCH 06/12] Updated All Reference of Runner V2/V1 to portable runner/streaming java runner --- .../arm/build.gradle | 2 +- .../google-cloud-dataflow-java/build.gradle | 10 +++++----- .../beam/runners/dataflow/DataflowRunner.java | 18 +++++++++--------- .../runners/dataflow/DataflowRunnerTest.java | 2 +- .../windmill/src/main/proto/windmill.proto | 2 +- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 6 +++--- .../ordered_window_elements/streaming.py | 2 +- .../apache_beam/examples/kafkataxi/README.md | 2 +- .../io/external/xlang_bigqueryio_it_test.py | 2 +- .../runners/dataflow/dataflow_metrics.py | 2 +- .../runners/dataflow/dataflow_runner.py | 12 ++++++------ .../runners/dataflow/dataflow_runner_test.py | 4 ++-- .../runners/dataflow/internal/apiclient.py | 8 ++++---- .../clients/dataflow/dataflow_v1b3_messages.py | 4 ++-- sdks/python/apache_beam/transforms/core.py | 4 ++-- 15 files changed, 40 insertions(+), 40 deletions(-) diff --git a/runners/google-cloud-dataflow-java/arm/build.gradle b/runners/google-cloud-dataflow-java/arm/build.gradle index 2e74d7727f21..14d558cf82b8 100644 --- a/runners/google-cloud-dataflow-java/arm/build.gradle +++ b/runners/google-cloud-dataflow-java/arm/build.gradle @@ -118,7 +118,7 @@ task printrunnerV2PipelineOptionsARM { dependsOn buildAndPushDockerJavaMultiarchContainer doLast { - println "To run a Dataflow job with runner V2 on ARM, add the following pipeline options to your command-line:" + println "To run a Dataflow job with Portable Runner on ARM, add the following pipeline options to your command-line:" println runnerV2PipelineOptionsARM.join(' ') } } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 93fe9cb227bb..73b78af09c71 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -431,7 +431,7 @@ task printRunnerV2PipelineOptions { dependsOn buildAndPushDockerJavaContainer doLast { - println "To run a Dataflow job with runner V2, add the following pipeline options to your command-line:" + println "To run a Dataflow job with Portable Runner, add the following pipeline options to your command-line:" println runnerV2PipelineOptions.join(' ') println "Please delete your image upon completion with the following command:" println "docker rmi ${dockerJavaImageName}; gcloud container images delete --force-delete-tags ${dockerJavaImageName}" @@ -471,7 +471,7 @@ def validatesRunnerStreamingConfig = [ excludedTests: [ // TODO(https://github.com/apache/beam/issues/21472) 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', - // GroupIntoBatches.withShardedKey not supported on streaming runner v1 + // GroupIntoBatches.withShardedKey not supported on streaming Streaming Java Runner // https://github.com/apache/beam/issues/22592 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', @@ -570,7 +570,7 @@ createCrossLanguageValidatesRunnerTask( task validatesRunnerV2 { group = "Verification" - description = "Runs the ValidatesRunner tests on Dataflow Runner V2" + description = "Runs the ValidatesRunner tests on Dataflow Portable Runner" dependsOn(createRunnerV2ValidatesRunnerTest( name: 'validatesRunnerV2Test', pipelineOptions: runnerV2PipelineOptions, @@ -610,7 +610,7 @@ task validatesRunnerV2 { task validatesRunnerV2Streaming { group = "Verification" - description = "Runs the ValidatesRunner tests on Dataflow Runner V2 forcing streaming mode" + description = "Runs the ValidatesRunner tests on Dataflow Portable Runner forcing streaming mode" dependsOn(createRunnerV2ValidatesRunnerTest( name: 'validatesRunnerV2TestStreaming', pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'], @@ -880,7 +880,7 @@ task postCommit { task postCommitRunnerV2 { group = "Verification" - description = "Various integration tests using the Dataflow runner V2." + description = "Various integration tests using the Dataflow Portable Runner." dependsOn googleCloudPlatformRunnerV2IntegrationTest dependsOn coreSDKJavaRunnerV2IntegrationTest } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 299e7fa21ed1..ae17ceff8d56 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1242,25 +1242,25 @@ private static boolean includesTransformUpgrades(Pipeline pipeline) { @Override public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded - // to Runner v2. + // to Portable Runner. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { if (!useUnifiedWorker(options)) { List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); LOG.info( - "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + "Automatically enabling Dataflow Portable Runner since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade."); options.setExperiments( ImmutableList.builder().addAll(experiments).add("use_runner_v2").build()); } } if (useUnifiedWorker(options)) { - if (hasExperiment(options, "disable_runner_v2") + if (hasExperiment(options, "disable_runner_v2") || hasExperiment(options, "disable_runner_v2_until_2023") || hasExperiment(options, "disable_prime_runner_v2") || hasExperiment(options, "disable_portable_runner") || hasExperiment(options, "enable_streaming_java_runner")) { throw new IllegalArgumentException( - "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); + "Portable Runner both disabled and enabled: at least one of ['enable_portable_runner', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['enable_streaming_java_runner', 'disable_portable_runner', 'disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); } List experiments = new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true @@ -1374,10 +1374,10 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash); if (useUnifiedWorker(options)) { - LOG.info("Skipping v1 transform replacements since job will run on v2."); + LOG.info("Skipping Streaming Java Runner transform replacements since job will run on Portable Runner."); } else { - // Now rewrite things to be as needed for v1 (mutates the pipeline) - // This way the job submitted is valid for v1 and v2, simultaneously + // Now rewrite things to be as needed for Streaming Java Runner (mutates the pipeline) + // This way the job submitted is valid for Streaming Java Runner and Portable Runner, simultaneously replaceV1Transforms(pipeline); } // Capture the SdkComponents for look up during step translations @@ -1388,7 +1388,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) .build()); - // No need to perform transform upgrading for the Runner v1 proto. + // No need to perform transform upgrading for the Streaming Java Runner proto. RunnerApi.Pipeline dataflowV1PipelineProto = PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); @@ -1544,7 +1544,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.setExperiments(experiments); LOG.warn( "The upload_graph experiment was specified, but it does not apply " - + "to runner v2 jobs. Option has been automatically removed."); + + "to Portable Runner jobs. Option has been automatically removed."); } // Upload the job to GCS and remove the graph object from the API call. The graph diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ab3b62a0aa1b..a6ca8ac1c0c7 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1847,7 +1847,7 @@ public void testSettingConflictingEnableAndDisableExperimentsThrowsException() t ExperimentalOptions.addExperiment(options, disabledExperiment); Pipeline p = Pipeline.create(options); p.apply(Create.of("A")); - assertThrows("Runner V2 both disabled and enabled", IllegalArgumentException.class, p::run); + assertThrows("Portable Runner both disabled and enabled", IllegalArgumentException.class, p::run); } } } diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index b7579cbacb8e..8a1f602105cc 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -979,7 +979,7 @@ enum ConnectivityType { CONNECTIVITY_TYPE_DIRECTPATH = 2; } -// Settings to control runtime behavior of the java runner v1 user worker. +// Settings to control runtime behavior of the Streaming Java Runner user worker. message UserWorkerRunnerV1Settings { optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3; diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 852613569801..169439aaafe6 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -334,7 +334,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs - // for runner v2. + // for Portable Runner. var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { @@ -351,7 +351,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions } // enable_portable_runner is not documented and hence wont be set by default. This will be fixed in later versions. if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { - return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") + return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } } // Enable default experiments. @@ -369,7 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions } // Ensure that streaming specific experiments are set for streaming pipelines - // since runner v2 only supports using streaming engine. + // since Portable Runner only supports using streaming engine. if streaming { if !seSet { experiments = append(experiments, "enable_streaming_engine") diff --git a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py index aed1400bc4d8..97e93fb0bc3b 100644 --- a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py +++ b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py @@ -135,7 +135,7 @@ def _state_read_range(self, buffer_state, range_lo, range_hi): def _state_clear_range(self, buffer_state, range_lo, range_hi): """Clears a specified range of elements from the buffer state.""" - # TODO: Dataflow runner v2 gets stuck when MIN_TIMESTAMP is used + # TODO: Dataflow Portable Runner gets stuck when MIN_TIMESTAMP is used # as the lower bound for clear_range. Investigate this further. buffer_state.clear_range(range_lo, range_hi) diff --git a/sdks/python/apache_beam/examples/kafkataxi/README.md b/sdks/python/apache_beam/examples/kafkataxi/README.md index 86924200b1f2..410eb41a62c5 100644 --- a/sdks/python/apache_beam/examples/kafkataxi/README.md +++ b/sdks/python/apache_beam/examples/kafkataxi/README.md @@ -67,7 +67,7 @@ Perform Beam runner specific setup. ℹ️ Note that cross-language transforms require portable implementations of Spark/Flink/Direct runners. Dataflow requires -[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). +[Portable Runner](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). See [here](https://beam.apache.org/documentation/runners/dataflow/) for instructions for setting up Dataflow. diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index bc012bd7be9d..49725d54e990 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -563,7 +563,7 @@ def test_streaming_with_fixed_num_streams(self): @unittest.skip( "Streaming to the Storage Write API sink with autosharding is broken " - "with Dataflow Runner V2.") + "with Dataflow Portable Runner.") def test_streaming_with_auto_sharding(self): self.skip_if_not_dataflow_runner() table = 'streaming_with_auto_sharding' diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index 87456124b816..8f72d45060ca 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -103,7 +103,7 @@ def _translate_step_name(self, internal_name): user_step_name = None if (self._job_graph and internal_name in self._job_graph.proto_pipeline.components.transforms.keys()): - # Dataflow Runner v2 with portable job submission uses proto transform map + # Dataflow Portable Runner with portable job submission uses proto transform map # IDs for step names. Also PTransform.unique_name maps to user step names. # Hence we lookup user step names based on the proto. user_step_name = self._job_graph.proto_pipeline.components.transforms[ diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 58da4461593c..2d0b0a4b4d3a 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -65,7 +65,7 @@ BQ_SOURCE_UW_ERROR = ( 'The Read(BigQuerySource(...)) transform is not supported with newer stack ' - 'features (Fn API, Dataflow Runner V2, etc). Please use the transform ' + 'features (Fn API, Dataflow Portable Runner, etc). Please use the transform ' 'apache_beam.io.gcp.bigquery.ReadFromBigQuery instead.') @@ -320,7 +320,7 @@ def visit_transform(self, applied_transform): raise ValueError( 'CombineFn.setup and CombineFn.teardown are ' 'not supported with non-portable Dataflow ' - 'runner. Please use Dataflow Runner V2 instead.') + 'runner. Please use Dataflow Portable Runner instead.') @staticmethod def _overrides_setup_or_teardown(combinefn): @@ -342,7 +342,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely executes entire pipeline or parts reachable from node.""" if _is_runner_v2_disabled(options): raise ValueError( - 'Disabling Runner V2 no longer supported ' + 'Disabling Portable Runner no longer supported ' 'using Beam Python %s.' % beam.version.__version__) # Label goog-dataflow-notebook if job is started from notebook. @@ -650,8 +650,8 @@ def _check_and_add_missing_streaming_options(options): :param options: PipelineOptions for this pipeline. """ - # Streaming only supports using runner v2 (aka unified worker). - # Runner v2 only supports using streaming engine (aka windmill service) + # Streaming only supports using Portable Runner (aka unified worker). + # Portable Runner only supports using streaming engine (aka windmill service) if options.view_as(StandardOptions).streaming: debug_options = options.view_as(DebugOptions) debug_options.add_experiment('enable_streaming_engine') @@ -661,7 +661,7 @@ def _check_and_add_missing_streaming_options(options): def _is_runner_v2_disabled(options): # Type: (PipelineOptions) -> bool - """Returns true if runner v2 is disabled.""" + """Returns true if Portable Runner is disabled.""" debug_options = options.view_as(DebugOptions) return ( debug_options.lookup_experiment('disable_portable_runner') or diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index dd8b9d25712f..a61eaf72e4b8 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -747,9 +747,9 @@ def test_runner_v2_disabled_experiments_raise(self): options = PipelineOptions([f'--experiments={experiment}']) self.assertTrue( _is_runner_v2_disabled(options), - f'Expected {experiment} to disable runner v2') + f'Expected {experiment} to disable Portable Runner') with self.assertRaisesRegex(ValueError, - 'Disabling Runner V2 no longer supported'): + 'Disabling Portable Runner .* no longer supported'): DataflowRunner().run_pipeline(None, options) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 4a7c61901de3..47087d0fb4cd 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -1153,7 +1153,7 @@ def to_split_int(n): # TODO: Used in legacy batch worker. Move under MetricUpdateTranslators -# after Runner V2 transition. +# after Portable Runner transition. def translate_distribution(distribution_update, metric_update_proto): """Translate metrics DistributionUpdate to dataflow distribution update. @@ -1174,7 +1174,7 @@ def translate_distribution(distribution_update, metric_update_proto): metric_update_proto.distribution = dist_update_proto -# TODO: Used in legacy batch worker. Delete after Runner V2 transition. +# TODO: Used in legacy batch worker. Delete after Portable Runner transition. def translate_value(value, metric_update_proto): metric_update_proto.integer = to_split_int(value) @@ -1203,8 +1203,8 @@ def get_container_image_from_options(pipeline_options): if worker_options.sdk_container_image: return worker_options.sdk_container_image - # Legacy and runner v2 exist in different repositories. - # Set to legacy format, override if runner v2 + # Legacy and Portable Runner exist in different repositories. + # Set to legacy format, override if Portable Runner container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY image_name = '{repository}/beam_python{major}.{minor}_sdk'.format( repository=container_repo, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py index 582fb30b57b1..f185dbb8d43b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -6764,9 +6764,9 @@ class StreamingConfigTask(_messages.Message): format version for streaming engine jobs. userStepToStateFamilyNameMap: Map from user step names to state families. userWorkerRunnerV1Settings: Binary encoded proto to control runtime - behavior of the java runner v1 user worker. + behavior of the Streaming Java Runner user worker. userWorkerRunnerV2Settings: Binary encoded proto to control runtime - behavior of the runner v2 user worker. + behavior of the Portable Runner user worker. windmillServiceEndpoint: If present, the worker must use this endpoint to communicate with Windmill Service dispatchers, otherwise the worker must continue to use whatever endpoint it had been using. diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 55a2cf40b64a..cd01253cade7 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1096,7 +1096,7 @@ def setup(self, *args, **kwargs): before executing any of the other methods. The resources can then be disposed of in ``CombineFn.teardown``. - If you are using Dataflow, you need to enable Dataflow Runner V2 + If you are using Dataflow, you need to enable Dataflow Portable Runner before using this feature. Args: @@ -1194,7 +1194,7 @@ def extract_output(self, accumulator, *args, **kwargs): def teardown(self, *args, **kwargs): """Called to clean up an instance before it is discarded. - If you are using Dataflow, you need to enable Dataflow Runner V2 + If you are using Dataflow, you need to enable Dataflow Portable Runner before using this feature. Args: From 2884ee73c7e338e7885ce6b4662b133ced8a294a Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Tue, 26 May 2026 20:01:38 +0000 Subject: [PATCH 07/12] Specified portable runner as dataflow portable runner --- .../google-cloud-dataflow-java/arm/build.gradle | 2 +- runners/google-cloud-dataflow-java/build.gradle | 4 ++-- .../beam/runners/dataflow/DataflowRunner.java | 14 +++++++------- .../beam/runners/dataflow/DataflowRunnerTest.java | 2 +- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 6 +++--- .../apache_beam/examples/kafkataxi/README.md | 2 +- .../runners/dataflow/dataflow_runner.py | 8 ++++---- .../runners/dataflow/dataflow_runner_test.py | 2 +- .../runners/dataflow/internal/apiclient.py | 8 ++++---- .../clients/dataflow/dataflow_v1b3_messages.py | 4 ++-- 10 files changed, 26 insertions(+), 26 deletions(-) diff --git a/runners/google-cloud-dataflow-java/arm/build.gradle b/runners/google-cloud-dataflow-java/arm/build.gradle index 14d558cf82b8..531300bb63ea 100644 --- a/runners/google-cloud-dataflow-java/arm/build.gradle +++ b/runners/google-cloud-dataflow-java/arm/build.gradle @@ -118,7 +118,7 @@ task printrunnerV2PipelineOptionsARM { dependsOn buildAndPushDockerJavaMultiarchContainer doLast { - println "To run a Dataflow job with Portable Runner on ARM, add the following pipeline options to your command-line:" + println "To run a Dataflow job with Dataflow Portable Runner on ARM, add the following pipeline options to your command-line:" println runnerV2PipelineOptionsARM.join(' ') } } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 73b78af09c71..943a01daa7d0 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -431,7 +431,7 @@ task printRunnerV2PipelineOptions { dependsOn buildAndPushDockerJavaContainer doLast { - println "To run a Dataflow job with Portable Runner, add the following pipeline options to your command-line:" + println "To run a Dataflow job with Dataflow Portable Runner, add the following pipeline options to your command-line:" println runnerV2PipelineOptions.join(' ') println "Please delete your image upon completion with the following command:" println "docker rmi ${dockerJavaImageName}; gcloud container images delete --force-delete-tags ${dockerJavaImageName}" @@ -471,7 +471,7 @@ def validatesRunnerStreamingConfig = [ excludedTests: [ // TODO(https://github.com/apache/beam/issues/21472) 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', - // GroupIntoBatches.withShardedKey not supported on streaming Streaming Java Runner + // GroupIntoBatches.withShardedKey not supported on Streaming Java Runner // https://github.com/apache/beam/issues/22592 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ae17ceff8d56..810d1060531e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1242,7 +1242,7 @@ private static boolean includesTransformUpgrades(Pipeline pipeline) { @Override public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded - // to Portable Runner. + // to Dataflow Portable Runner. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { if (!useUnifiedWorker(options)) { List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); @@ -1260,7 +1260,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { || hasExperiment(options, "disable_portable_runner") || hasExperiment(options, "enable_streaming_java_runner")) { throw new IllegalArgumentException( - "Portable Runner both disabled and enabled: at least one of ['enable_portable_runner', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['enable_streaming_java_runner', 'disable_portable_runner', 'disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); + "Dataflow Portable Runner both disabled and enabled: at least one of ['enable_portable_runner', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['enable_streaming_java_runner', 'disable_portable_runner', 'disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); } List experiments = new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true @@ -1374,10 +1374,10 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash); if (useUnifiedWorker(options)) { - LOG.info("Skipping Streaming Java Runner transform replacements since job will run on Portable Runner."); + LOG.info("Skipping Dataflow Streaming Java Runner transform replacements since job will run on Dataflow Portable Runner."); } else { - // Now rewrite things to be as needed for Streaming Java Runner (mutates the pipeline) - // This way the job submitted is valid for Streaming Java Runner and Portable Runner, simultaneously + // Now rewrite things to be as needed for Dataflow Streaming Java Runner (mutates the pipeline) + // This way the job submitted is valid for Dataflow Streaming Java Runner and Dataflow Portable Runner, simultaneously replaceV1Transforms(pipeline); } // Capture the SdkComponents for look up during step translations @@ -1388,7 +1388,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) .build()); - // No need to perform transform upgrading for the Streaming Java Runner proto. + // No need to perform transform upgrading for the Dataflow Streaming Java Runner proto. RunnerApi.Pipeline dataflowV1PipelineProto = PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); @@ -1544,7 +1544,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.setExperiments(experiments); LOG.warn( "The upload_graph experiment was specified, but it does not apply " - + "to Portable Runner jobs. Option has been automatically removed."); + + "to Dataflow Portable Runner jobs. Option has been automatically removed."); } // Upload the job to GCS and remove the graph object from the API call. The graph diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a6ca8ac1c0c7..12c0c1c44cd1 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1847,7 +1847,7 @@ public void testSettingConflictingEnableAndDisableExperimentsThrowsException() t ExperimentalOptions.addExperiment(options, disabledExperiment); Pipeline p = Pipeline.create(options); p.apply(Create.of("A")); - assertThrows("Portable Runner both disabled and enabled", IllegalArgumentException.class, p::run); + assertThrows("Dataflow Portable Runner both disabled and enabled", IllegalArgumentException.class, p::run); } } } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 169439aaafe6..ce5e99b0a8de 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -334,7 +334,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs - // for Portable Runner. + // for Dataflow Portable Runner. var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { @@ -351,7 +351,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions } // enable_portable_runner is not documented and hence wont be set by default. This will be fixed in later versions. if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { - return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") + return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling Dataflow Portable Runner is no longer supported as of Beam version 2.45.0+") } } // Enable default experiments. @@ -369,7 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions } // Ensure that streaming specific experiments are set for streaming pipelines - // since Portable Runner only supports using streaming engine. + // since Dataflow Portable Runner only supports using streaming engine. if streaming { if !seSet { experiments = append(experiments, "enable_streaming_engine") diff --git a/sdks/python/apache_beam/examples/kafkataxi/README.md b/sdks/python/apache_beam/examples/kafkataxi/README.md index 410eb41a62c5..ae1b6b9ba39e 100644 --- a/sdks/python/apache_beam/examples/kafkataxi/README.md +++ b/sdks/python/apache_beam/examples/kafkataxi/README.md @@ -67,7 +67,7 @@ Perform Beam runner specific setup. ℹ️ Note that cross-language transforms require portable implementations of Spark/Flink/Direct runners. Dataflow requires -[Portable Runner](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). +[Dataflow Portable Runner](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). See [here](https://beam.apache.org/documentation/runners/dataflow/) for instructions for setting up Dataflow. diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 2d0b0a4b4d3a..c6e2a7bec3d8 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -342,7 +342,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely executes entire pipeline or parts reachable from node.""" if _is_runner_v2_disabled(options): raise ValueError( - 'Disabling Portable Runner no longer supported ' + 'Disabling Dataflow Portable Runner no longer supported ' 'using Beam Python %s.' % beam.version.__version__) # Label goog-dataflow-notebook if job is started from notebook. @@ -650,8 +650,8 @@ def _check_and_add_missing_streaming_options(options): :param options: PipelineOptions for this pipeline. """ - # Streaming only supports using Portable Runner (aka unified worker). - # Portable Runner only supports using streaming engine (aka windmill service) + # Streaming only supports using Dataflow Portable Runner (aka unified worker, runner v2). + # Dataflow Portable Runner only supports using streaming engine (aka windmill service) if options.view_as(StandardOptions).streaming: debug_options = options.view_as(DebugOptions) debug_options.add_experiment('enable_streaming_engine') @@ -661,7 +661,7 @@ def _check_and_add_missing_streaming_options(options): def _is_runner_v2_disabled(options): # Type: (PipelineOptions) -> bool - """Returns true if Portable Runner is disabled.""" + """Returns true if Dataflow Portable Runner is disabled.""" debug_options = options.view_as(DebugOptions) return ( debug_options.lookup_experiment('disable_portable_runner') or diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index a61eaf72e4b8..f9f16829cf6d 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -749,7 +749,7 @@ def test_runner_v2_disabled_experiments_raise(self): _is_runner_v2_disabled(options), f'Expected {experiment} to disable Portable Runner') with self.assertRaisesRegex(ValueError, - 'Disabling Portable Runner .* no longer supported'): + 'Disabling Dataflow Portable Runner .* no longer supported'): DataflowRunner().run_pipeline(None, options) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 47087d0fb4cd..2cf0e79731ac 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -1153,7 +1153,7 @@ def to_split_int(n): # TODO: Used in legacy batch worker. Move under MetricUpdateTranslators -# after Portable Runner transition. +# after Dataflow Portable Runner transition. def translate_distribution(distribution_update, metric_update_proto): """Translate metrics DistributionUpdate to dataflow distribution update. @@ -1174,7 +1174,7 @@ def translate_distribution(distribution_update, metric_update_proto): metric_update_proto.distribution = dist_update_proto -# TODO: Used in legacy batch worker. Delete after Portable Runner transition. +# TODO: Used in legacy batch worker. Delete after Dataflow Portable Runner transition. def translate_value(value, metric_update_proto): metric_update_proto.integer = to_split_int(value) @@ -1203,8 +1203,8 @@ def get_container_image_from_options(pipeline_options): if worker_options.sdk_container_image: return worker_options.sdk_container_image - # Legacy and Portable Runner exist in different repositories. - # Set to legacy format, override if Portable Runner + # Dataflow Legacy and Portable Runner exist in different repositories. + # Set to legacy format, override if Dataflow Portable Runner container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY image_name = '{repository}/beam_python{major}.{minor}_sdk'.format( repository=container_repo, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py index f185dbb8d43b..582fb30b57b1 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -6764,9 +6764,9 @@ class StreamingConfigTask(_messages.Message): format version for streaming engine jobs. userStepToStateFamilyNameMap: Map from user step names to state families. userWorkerRunnerV1Settings: Binary encoded proto to control runtime - behavior of the Streaming Java Runner user worker. + behavior of the java runner v1 user worker. userWorkerRunnerV2Settings: Binary encoded proto to control runtime - behavior of the Portable Runner user worker. + behavior of the runner v2 user worker. windmillServiceEndpoint: If present, the worker must use this endpoint to communicate with Windmill Service dispatchers, otherwise the worker must continue to use whatever endpoint it had been using. From 923b4ca988825f4d0083642aa4e6f7ada87734fa Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Tue, 26 May 2026 20:08:05 +0000 Subject: [PATCH 08/12] Updated testing logic to make things more clear --- sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 529ae5c087cd..83d3c0108439 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -20,6 +20,7 @@ import ( "flag" "reflect" "sort" + "strings" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/core" @@ -227,7 +228,7 @@ func TestGetJobOptions_NoExperimentsSetStreaming(t *testing.T) { } } -func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { +func TestGetJobOptions_DisableRunnerV2ExperimentsSetFailJob(t *testing.T) { resetGlobals() *stagingLocation = "gs://testStagingLocation" *gcpopts.Project = "testProject" @@ -238,13 +239,15 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { if err == nil { t.Error("getJobOptions() returned error nil, want an error") + } else if !strings.Contains(err.Error(), "Disabling Dataflow Portable Runner is no longer supported") { + t.Errorf("getJobOptions() returned wrong error %q, want it to mention %q", err.Error(), "Disabling Dataflow Portable Runner is no longer supported") } if opts != nil { t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) } } -func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { +func TestGetJobOptions_DisablePortableRunnerExperimentsSetFailJob(t *testing.T) { resetGlobals() *stagingLocation = "gs://testStagingLocation" *gcpopts.Project = "testProject" @@ -255,13 +258,15 @@ func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { if err == nil { t.Error("getJobOptions() returned error nil, want an error") + } else if !strings.Contains(err.Error(), "Disabling Dataflow Portable Runner is no longer supported") { + t.Errorf("getJobOptions() returned wrong error %q, want it to mention %q", err.Error(), "Disabling Dataflow Portable Runner is no longer supported") } if opts != nil { t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) } } -func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { +func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSetFailJob(t *testing.T) { resetGlobals() *stagingLocation = "gs://testStagingLocation" *gcpopts.Project = "testProject" @@ -272,6 +277,8 @@ func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { if err == nil { t.Error("getJobOptions() returned error nil, want an error") + } else if !strings.Contains(err.Error(), "Disabling Dataflow Portable Runner is no longer supported") { + t.Errorf("getJobOptions() returned wrong error %q, want it to mention %q", err.Error(), "Disabling Dataflow Portable Runner is no longer supported") } if opts != nil { t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) From 711053578164fd126f91868305f133a2f70520b4 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Wed, 27 May 2026 00:11:58 +0000 Subject: [PATCH 09/12] spotless --- .../apache/beam/runners/dataflow/DataflowRunner.java | 11 +++++++---- .../beam/runners/dataflow/DataflowRunnerTest.java | 5 ++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 810d1060531e..d04afb351e44 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1254,7 +1254,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { } } if (useUnifiedWorker(options)) { - if (hasExperiment(options, "disable_runner_v2") + if (hasExperiment(options, "disable_runner_v2") || hasExperiment(options, "disable_runner_v2_until_2023") || hasExperiment(options, "disable_prime_runner_v2") || hasExperiment(options, "disable_portable_runner") @@ -1374,10 +1374,13 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash); if (useUnifiedWorker(options)) { - LOG.info("Skipping Dataflow Streaming Java Runner transform replacements since job will run on Dataflow Portable Runner."); + LOG.info( + "Skipping Dataflow Streaming Java Runner transform replacements since job will run on Dataflow Portable Runner."); } else { - // Now rewrite things to be as needed for Dataflow Streaming Java Runner (mutates the pipeline) - // This way the job submitted is valid for Dataflow Streaming Java Runner and Dataflow Portable Runner, simultaneously + // Now rewrite things to be as needed for Dataflow Streaming Java Runner (mutates the + // pipeline) + // This way the job submitted is valid for Dataflow Streaming Java Runner and Dataflow + // Portable Runner, simultaneously replaceV1Transforms(pipeline); } // Capture the SdkComponents for look up during step translations diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 12c0c1c44cd1..9f812438451b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1847,7 +1847,10 @@ public void testSettingConflictingEnableAndDisableExperimentsThrowsException() t ExperimentalOptions.addExperiment(options, disabledExperiment); Pipeline p = Pipeline.create(options); p.apply(Create.of("A")); - assertThrows("Dataflow Portable Runner both disabled and enabled", IllegalArgumentException.class, p::run); + assertThrows( + "Dataflow Portable Runner both disabled and enabled", + IllegalArgumentException.class, + p::run); } } } From 9a195c6f645c03d312e8bcd371677e5e24d4944f Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Wed, 27 May 2026 00:30:34 +0000 Subject: [PATCH 10/12] python spotless --- .../apache_beam/runners/dataflow/dataflow_runner_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index f9f16829cf6d..21c62510df51 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -748,8 +748,9 @@ def test_runner_v2_disabled_experiments_raise(self): self.assertTrue( _is_runner_v2_disabled(options), f'Expected {experiment} to disable Portable Runner') - with self.assertRaisesRegex(ValueError, - 'Disabling Dataflow Portable Runner .* no longer supported'): + with self.assertRaisesRegex( + ValueError, + 'Disabling Dataflow Portable Runner .* no longer supported'): DataflowRunner().run_pipeline(None, options) From 07e3f0a8a36929f429a18717a55449d8389afa68 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Wed, 27 May 2026 20:13:26 +0000 Subject: [PATCH 11/12] fixed broken test --- .../python/apache_beam/runners/dataflow/dataflow_runner_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 21c62510df51..a6b2fd640294 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -750,7 +750,7 @@ def test_runner_v2_disabled_experiments_raise(self): f'Expected {experiment} to disable Portable Runner') with self.assertRaisesRegex( ValueError, - 'Disabling Dataflow Portable Runner .* no longer supported'): + 'Disabling Dataflow Portable Runner .* no longer supported.*'): DataflowRunner().run_pipeline(None, options) From 028dc303eee1b303aa9954c1e2eaf478d2791732 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Wed, 27 May 2026 23:05:53 +0000 Subject: [PATCH 12/12] removed unnessary regex --- .../python/apache_beam/runners/dataflow/dataflow_runner_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index a6b2fd640294..d848b5ffe595 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -750,7 +750,7 @@ def test_runner_v2_disabled_experiments_raise(self): f'Expected {experiment} to disable Portable Runner') with self.assertRaisesRegex( ValueError, - 'Disabling Dataflow Portable Runner .* no longer supported.*'): + 'Disabling Dataflow Portable Runner no longer supported.*'): DataflowRunner().run_pipeline(None, options)