From 445bc27e5890d6a9ce5dc0f234da798cac8a42a9 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 3 Jun 2026 14:33:13 +0200 Subject: [PATCH] FNHarness, Dataflow Runner v1 - set open telemetry settings. --- .../dataflow/worker/StreamingDataflowWorker.java | 13 +++++++++++++ .../java/org/apache/beam/fn/harness/FnHarness.java | 11 +++++++++++ 2 files changed, 24 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 4d070da995b3..4c3e58978ec3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -115,6 +115,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.SdkHarnessOptions; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials; @@ -1042,6 +1043,18 @@ public static void main(String[] args) throws Exception { WindowedValues.FullWindowedValueCoder.setMetadataSupported(); } + SdkHarnessOptions sdkHarnessOptions = options.as(SdkHarnessOptions.class); + Map openTelemetryProperties = sdkHarnessOptions.getOpenTelemetryProperties(); + if (openTelemetryProperties != null && !openTelemetryProperties.isEmpty()) { + openTelemetryProperties.forEach( + (k, v) -> { + if (k != null && v != null) { + System.setProperty(k, v); + } + }); + LOG.info("Enabled Open Telemetry with properties: {}", openTelemetryProperties); + } + LOG.debug("Creating StreamingDataflowWorker from options: {}", options); StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index d0724432f3c9..703e726739a0 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -296,6 +296,17 @@ public static void main( // Register standard file systems. FileSystems.setDefaultPipelineOptions(options); CoderTranslation.verifyModelCodersRegistered(); + SdkHarnessOptions sdkHarnessOptions = options.as(SdkHarnessOptions.class); + Map openTelemetryProperties = sdkHarnessOptions.getOpenTelemetryProperties(); + if (openTelemetryProperties != null && !openTelemetryProperties.isEmpty()) { + openTelemetryProperties.forEach( + (k, v) -> { + if (k != null && v != null) { + System.setProperty(k, v); + } + }); + LOG.info("Enabled Open Telemetry with properties: {}", openTelemetryProperties); + } EnumMap< BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction>