Skip to content

Conversation

zentol
Copy link
Contributor

@zentol zentol commented Aug 21, 2018

What is the purpose of the change

This PR fixes a severe issue in the metric system where chained batch operators would always operate on the same OperatorMetricGroup. As a result most Flink-provided metrics were not exposed for chained operators at all, while other metrics, like task-level IO metrics, were rendered incorrect.

The problem is that we used the tasks VertexID to identify operators; which is obviously identical for all operators in a chain. We now use the vertexID and operator name to identify them.

Brief change log

  • fix identification in TaskMetricGroup by using both the ID and operator name
  • extend MockEnvironment[Builder] to allow the TaskMetricGroup to be set

Verifying this change

This change added tests:

  • ChainedOperatorsMetricTest
  • run a basic wordcount as described in the JIRA and verify the results via the UI/reporter of your choice

Copy link
Contributor

@azagrebin azagrebin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @zentol !
I have left only one more suggestion which can be addressed before merging.


synchronized (this) {
OperatorMetricGroup previous = operators.put(operatorID, operator);
OperatorMetricGroup previous = operators.put(key, operator);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to reduce the code under synchronized using:

return operators.putIfAbsent(key, operator);

Also, I would rename addOperator function name to getOrAddOperatorMetric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rename the method but leave the code as is. It was intentionally written that way so that we only do a single lookup on the happy path. The default implementation of putIfAbsent is just syntactic sugar for separate get/put calls. While the HashMap implementation of this method is indeed more efficient in this regard this is an implementation detail.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants