[BEAM-7969] Report FnAPI counters as deltas in streaming jobs. #9330
Conversation
FnAPI counters were not reported in the streaming worker. Streaming Dataflow jobs do not tie counters to work items; instead they report them as deltas on the same work item. This change picks only the latest counters per work item from the FnAPI harness, treats them as updates, and reports them to DFE.
cc: @angoenka
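The drain-and-report pattern described above can be sketched roughly as follows. This is a minimal illustration under assumptions, not the actual worker code: `CounterUpdate` here is a simplified stand-in for Dataflow's class, and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class DeltaReportSketch {
  // Simplified stand-in for Dataflow's CounterUpdate; fields are illustrative.
  static class CounterUpdate {
    final String name;
    final long value;
    boolean cumulative = true;
    CounterUpdate(String name, long value) {
      this.name = name;
      this.value = value;
    }
  }

  // Latest counters received from the FnAPI harness, pending report to DFE.
  final ConcurrentLinkedQueue<CounterUpdate> pendingMonitoringInfos =
      new ConcurrentLinkedQueue<>();

  // Called by the thread receiving MonitoringInfos from the SDK harness.
  void enqueue(CounterUpdate update) {
    pendingMonitoringInfos.add(update);
  }

  // Called when sending worker updates: drain the queue and mark each
  // counter as non-cumulative, i.e. report its full value as a delta.
  List<CounterUpdate> drainAsDeltas() {
    List<CounterUpdate> updates = new ArrayList<>();
    while (!pendingMonitoringInfos.isEmpty()) {
      CounterUpdate item = pendingMonitoringInfos.poll();
      item.cumulative = false;
      updates.add(item);
    }
    return updates;
  }
}
```

Reporting the full value as a delta is only safe because, as discussed below, counters are reported once when the work item finishes.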
@@ -1867,6 +1873,28 @@ private void sendWorkerUpdatesToDataflowService(
        cumulativeCounters.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
    counterUpdates.addAll(
        deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
    if (hasExperiment(options, "beam_fn_api")) {
      while (!this.pendingMonitoringInfos.isEmpty()) {
        final CounterUpdate item = this.pendingMonitoringInfos.poll();
The SDK harness will give you the full value of the counter on each MonitoringInfo. Don't you need to convert it to a delta since the last reported value? Or does marking it cumulative let DFE solve this internally?
You might want to confirm with @edre.
It's OK to just report the full value as the delta, but only as long as we report counters solely when finishing the work item.
Non-cumulative means delta.
              + " Please update conversion-to-delta logic"
              + " if a non-cumulative counter type is required.");
    }
This method essentially has 3 inputs, correct? pendingMonitoringInfos, deltaCounters, and cumulativeCounters.
And one output: counterUpdates.
Can you write some unit tests which verify that counterUpdates is populated properly?
Done
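A hypothetical shape for such a test: reduce the method to its three inputs and one output, and assert on the merged result. The names below are illustrative stand-ins, not the real `StreamingDataflowWorker` test, which uses Dataflow's own counter types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SendWorkerUpdatesSketch {
  // Simplified model of sendWorkerUpdatesToDataflowService: three inputs
  // (cumulative counters, delta counters, pending FnAPI monitoring infos)
  // merged into one output list of counter updates.
  static List<String> sendWorkerUpdates(
      List<String> cumulativeCounters,
      List<String> deltaCounters,
      ConcurrentLinkedQueue<String> pendingMonitoringInfos) {
    List<String> counterUpdates = new ArrayList<>();
    counterUpdates.addAll(cumulativeCounters);  // extracted cumulative updates
    counterUpdates.addAll(deltaCounters);       // extracted delta updates
    String item;
    while ((item = pendingMonitoringInfos.poll()) != null) {
      counterUpdates.add(item);                 // FnAPI counters, reported as deltas
    }
    return counterUpdates;
  }
}
```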
  private final MetricTrackingWindmillServerStub metricTrackingWindmillServer;
  private final CounterSet pendingDeltaCounters = new CounterSet();
  private final CounterSet pendingCumulativeCounters = new CounterSet();
  private final java.util.concurrent.ConcurrentLinkedQueue<CounterUpdate> pendingMonitoringInfos =
Can you please add some comments about how pendingMonitoringInfos is concurrently read and written? I.e., which threads run concurrently and use this?
done.
Thanks for the change!
@@ -1867,6 +1873,28 @@ private void sendWorkerUpdatesToDataflowService(
        cumulativeCounters.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
    counterUpdates.addAll(
        deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
    if (hasExperiment(options, "beam_fn_api")) {
      while (!this.pendingMonitoringInfos.isEmpty()) {
        final CounterUpdate item = this.pendingMonitoringInfos.poll();
Let's also check for item == null to avoid a possible race condition, as this is called at start, via a timer, and at stop.
We are good here, since we first initialize the class and metrics and only then start multiple threads.
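For reference, the defensive variant the reviewer suggests can be written without the isEmpty()/poll() pair at all. This is a sketch with an illustrative queue type, not the PR's code: isEmpty() followed by poll() is not atomic, so with multiple consumers another thread could empty the queue between the two calls and poll() would return null; looping on the result of poll() itself is race-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NullSafeDrain {
  // Illustrative stand-in; the real field holds Dataflow CounterUpdates.
  static final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

  // poll() returns null when the queue is empty, so the loop condition
  // doubles as the emptiness check and needs no separate isEmpty() call.
  static List<String> drain() {
    List<String> out = new ArrayList<>();
    String item;
    while ((item = queue.poll()) != null) {
      out.add(item);
    }
    return out;
  }
}
```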
    } else {
      // In the current world, all counters coming from FnAPI are cumulative.
      // This is a safety check in case a new counter type appears in FnAPI.
      throw new UnsupportedOperationException(
Shall we log the error and move on?
It is better to fail fast in this case. A new type of counter appears only when someone modifies the SDK, so it comes from a Beam developer, not a user.
I think you should not fail in this case. What happens if someone writes a custom SDK to run on Dataflow? It may introduce different metrics, but you should not fail.
An SDK can also be written by someone entirely outside the Beam repo, not necessarily by a Beam developer.
https://docs.google.com/document/d/1MtBZYV7NAcfbwyy9Op8STeFNBxtljxgy69FkHMvhTMA/edit
Customization- Custom SDKs+engines can add their own custom metrics and custom monitored state without modifying beam proto files.
That was a bad way to formulate it on my side:
CounterUpdates are generated within the Java DF worker. So if we receive a different type of CounterUpdate here, it means that someone modified the DF worker.
IIUC, to extract metrics from the SDK harness we use BeamFnMapTaskExecutor, which uses a map in many places, e.g.:
Line 459 in b7bca84
Map<MetricKey, MetricUpdate<DistributionData>> snapshotDistributionUpdates =
This map uses MetricKey as the key, which does not contain the sdkHarness id and can potentially have name collisions across multiple SDK harnesses. We use multiple SDK harnesses in the case of the Python SDK, which will have this problem.
Can you elaborate on the problem a bit, please? IIUC, different SDK harnesses will handle different steps, so metric keys will not overlap anyway. Also, I feel fixing that issue is better done in a separate PR.
Thanks for the offline discussion.
@Ardagan do you know what the test failures are?
Seems to be some library import failure. It works on my machine; looking for a fix.
The error in "Google Cloud Dataflow Runner Python ValidatesContainer Tests" seems to be about matching the metrics now. Is this a different error?
For missing
Testing this in a separate PR so as not to pollute this one for reviewers. Current summary:
Hide the pubsub import inside the method to avoid a test failure on initialization. Move the test to the ValidatesRunner test attribute.
SLEEP_TIME_SECS = 1


class StreamingUserMetricsDoFn(beam.DoFn):
Please add some comments to describe what this test does.
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline = beam.Pipeline(options=pipeline_options)

    _ = (pipeline
Is there any reason why you did not reuse the existing pipeline integration test and run it in streaming mode? Why did you need to write a new pipeline rather than reusing the existing code?
Reading from files requires side inputs, which are not properly supported in FnAPI streaming and cause the existing pipeline to fail. Reusing your existing pipeline was the first thing I tried.
  return dataflow_exercise_streaming_metrics_pipeline.run(argv)


@attr('IT', 'ValidatesRunner')
def test_streaming_pipeline_returns_expected_user_metrics_fnapi_it(self):
Add a comment describing what this test does.
Thank you all for fixing this!
…e#9330) * Report FnAPI counters as deltas in streaming jobs. FnAPI counters were not reported in streaming worker. Streaming Dataflow jobs do not tie counters to workitems, but report them as deltas on same workitem instead. This change picks only latest counters per workitem from FnAPI harness, treats them as updates and reports to DFE.
[Tested manually at this point]