[Task]: Fill Runner Metrics support gap in Sources #32021

Open
3 of 17 tasks
Abacn opened this issue Jul 30, 2024 · 1 comment

Abacn commented Jul 30, 2024

What needs to happen?

It was found that some runners do not support reporting metrics in:

  1. BoundedSource.split()
  • Direct runner
  • non-portable Flink runner
  • non-portable Spark runner
  • Samza runner: unknown, because the AttemptedMetrics tests are excluded altogether

The Dataflow runner (both legacy and runner v2) is supported.

  2. BoundedReader.advance()
  • Spark Structured Streaming Runner

Notably, portable runners support reporting metrics in split(), because they execute the Source as a splittable DoFn, and at that point the metrics container is present.

This task is created to track these gaps.

Issue Priority

Priority: 2 (default / most normal work should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Abacn commented Jul 30, 2024

By "unsupported" I mean that the following log is seen:

Direct runner:

[Test worker] ERROR org.apache.beam.sdk.metrics.MetricsEnvironment - Unable to update metrics on the current thread. Most likely caused by using metrics outside the managed work-execution thread:
  java.lang.Thread.getStackTrace(Thread.java:1564)
org.apache.beam.sdk.metrics.MetricsEnvironment.getCurrentContainer(MetricsEnvironment.java:140)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:76)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:67)
org.apache.beam.sdk.metrics.MetricsTest$CountingSourceWithMetrics.split(MetricsTest.java:495)
org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:217)
org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:88)
org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:80)
org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:161)
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
...

Flink runner:

[jobmanager-io-thread-1] ERROR org.apache.beam.sdk.metrics.MetricsEnvironment - Unable to update metrics on the current thread. Most likely caused by using metrics outside the managed work-execution thread:
  java.lang.Thread.getStackTrace(Thread.java:1564)
org.apache.beam.sdk.metrics.MetricsEnvironment.getCurrentContainer(MetricsEnvironment.java:140)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:76)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:67)
org.apache.beam.sdk.metrics.MetricsTest$CountingSourceWithMetrics.split(MetricsTest.java:495)
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:135)
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:251)
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:894)
org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:224)
...

This is different from the log usually seen when reporting metrics in a callback thread:

"Reporting metrics are not supported in the current execution environment:\n {}",

Here the Java core's isMetricsSupported() returns true, but reporting is not actually supported by the runner at this call site. So this is a feature gap rather than working as intended (WAI) by spec.
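
To illustrate why split() loses its metric updates, here is a minimal plain-Java sketch of a per-thread metrics container. It is a simplified model under stated assumptions, not Beam's actual MetricsEnvironment: the names MetricsThreadSketch, Container, inc() and runWithContainer() are all hypothetical. The assumption is that a runner installs a container on the threads it manages, so user code called from any other thread finds none.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified model only; every name here is hypothetical, not a Beam API.
class MetricsThreadSketch {
    // Stand-in for a per-thread metrics container.
    static final class Container {
        final AtomicLong count = new AtomicLong();
    }

    // Each thread sees only the container installed on that same thread.
    private static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

    // Returns true if the update was recorded, false if it was dropped.
    static boolean inc() {
        Container c = CURRENT.get();
        if (c == null) {
            // Mirrors the spirit of the error in the logs above.
            System.err.println("Unable to update metrics on the current thread.");
            return false;
        }
        c.count.incrementAndGet();
        return true;
    }

    // Models a runner executing user code with a container installed.
    static void runWithContainer(Container c, Runnable work) {
        Container previous = CURRENT.get();
        CURRENT.set(c);
        try {
            work.run();
        } finally {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Container container = new Container();

        // Managed execution: the container is present, so the update counts.
        runWithContainer(container,
            () -> System.out.println("managed thread recorded: " + inc()));

        // A thread the runner does not manage (compare the thread names in
        // the logs above): no container is installed there.
        boolean[] recorded = new boolean[1];
        Thread other = new Thread(() -> recorded[0] = inc());
        other.start();
        other.join();
        System.out.println("unmanaged thread recorded: " + recorded[0]);
        System.out.println("total count: " + container.count.get());
    }
}
```

In this model, closing the gap for a runner would amount to wrapping its call to split() in something like runWithContainer(), which is roughly what executing the Source as a splittable DoFn gives the portable runners.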
