[FLINK-10247][Metrics] Run MetricQueryService in separate thread pool #6676
yanghua
left a comment
Just a few small suggestions.
private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
private ExecutorService threadpool;
Can we mark this field as final?
Thank you for the reminder.
MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
sender.tell(dump, self);
} else {
LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
Maybe LOG.warn("MetricQueryServiceActor received an invalid message: {}.", message.toString()); would look better?
Good idea.
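For context on the suggestion above, here is a minimal sketch of why a placeholder can be preferable to string concatenation. It uses `java.util.logging` and its `{0}` placeholder as a dependency-free stand-in for the SLF4J-style `{}` in the suggestion; the class `LazyLoggingSketch` and the `CountingMessage` type are hypothetical and not part of the PR. The benefit shown only applies when the object itself is passed as the argument rather than `message.toString()`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class; not part of Flink or of this PR.
class LazyLoggingSketch {

    /** Hypothetical message type that counts how often toString() runs. */
    static final class CountingMessage {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "CountingMessage";
        }
    }

    /** Returns a logger with all levels disabled, so nothing is published. */
    static Logger silencedLogger() {
        Logger log = Logger.getLogger("LazyLoggingSketch");
        log.setLevel(Level.OFF);
        return log;
    }

    /** Concatenation builds the string before the logger checks the level. */
    static int callsWithConcatenation(Logger log) {
        CountingMessage message = new CountingMessage();
        log.warning("Received an invalid message. " + message);
        return message.toStringCalls;
    }

    /** With a placeholder, the argument is only formatted if the record is published. */
    static int callsWithPlaceholder(Logger log) {
        CountingMessage message = new CountingMessage();
        log.log(Level.WARNING, "Received an invalid message: {0}.", message);
        return message.toStringCalls;
    }

    public static void main(String[] args) {
        Logger log = silencedLogger();
        System.out.println("toString() calls with concatenation: " + callsWithConcatenation(log));
        System.out.println("toString() calls with placeholder:   " + callsWithPlaceholder(log));
    }
}
```

With the logger silenced, the concatenating variant still pays for `toString()`, while the placeholder variant does not. For a WARN-level message that is almost always enabled, the difference is mostly one of style and consistency.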
sender.tell(dump, self);
} else {
LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
sender.tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), self);
Same here.
sender.tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), self);
}
} catch (Exception e) {
LOG.warn("An exception occurred while processing a message.", e);
"An exception occurred while processing a metric message."?
That seems a little ambiguous; I think changing it to "... a Metric related message." would be better.
Hi @tillrohrmann, could you please review this PR when you have time?
tillrohrmann
left a comment
Thanks for opening this PR @Clarkkkkk. I think your solution solves the described problem. However, I'm wondering whether we should go one step further and execute the MetricQueryService actor in a separate ActorSystem, independent of the main components' AkkaRpcService. That way, we would separate the components so that they can no longer directly influence each other.
What we would need to do for this is to start and stop an auxiliary ActorSystem in the ClusterEntrypoint and the TaskManagerRunner. This ActorSystem should have a fixed thread pool executor with a single thread. Next, we would need to propagate the metric query service path from the TaskExecutor through the ResourceManager to the Dispatcher. At the moment, the RM constructs the metric query service path from the TaskExecutor's address.
As a follow-up, we could add a special dispatcher which starts the executor threads with a lower priority. That way, we would prioritize the main component threads over the threads fueling the metric query service.
What do you think?
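To make the proposal above more concrete, here is a rough sketch of a fixed thread pool with a single, lower-priority worker thread. It uses plain `java.util.concurrent` rather than an Akka dispatcher, so it illustrates the threading idea only and is not how the ActorSystem would actually be configured. The class name `LowPriorityExecutorSketch` and the thread name are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Flink or of this PR.
class LowPriorityExecutorSketch {

    /** Builds a fixed pool of one daemon thread that runs at the given priority. */
    static ExecutorService newSingleThreadExecutor(String threadName, int priority) {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            thread.setPriority(priority);
            return thread;
        };
        return Executors.newFixedThreadPool(1, factory);
    }

    /** Runs one task on such an executor and reports the worker thread's priority. */
    static int observedPriority(int priority) {
        ExecutorService executor = newSingleThreadExecutor("metric-query-service", priority);
        try {
            Callable<Integer> task = () -> Thread.currentThread().getPriority();
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Task did not complete", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Metric queries would run here instead of on the main component threads.
        System.out.println("Worker priority: " + observedPriority(Thread.MIN_PRIORITY));
    }
}
```

Thread priorities are only a hint to the operating system scheduler, so the isolation comes mainly from the separate pool; the lower priority is a best-effort addition.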
Hi @tillrohrmann, I looked into this after your comment on Jira. Thank you for sharing your idea; it sounds good to me. I will work on this soon.
In order to execute MetricQueryService in a single-thread pool executor, the AkkaRpcService should be refactored to support two executor modes: fork-join by default, and single-thread, which will be used by the MetricQueryService actor system. This commit adds the two modes and adds methods to AkkaRpcServiceUtils and AkkaUtils accordingly.
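The two executor modes described in this commit message could look roughly like the sketch below. This is an illustration with plain JDK executors, not the code in the commit: the enum `ExecutorMode`, the class `ExecutorModeSketch`, and the method names are all hypothetical, and the real change configures Akka dispatchers through AkkaUtils instead.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical demo class; names are not taken from the PR.
class ExecutorModeSketch {

    /** Hypothetical switch between the default mode and the metrics mode. */
    enum ExecutorMode { FORK_JOIN, SINGLE_THREAD }

    /** Creates the executor that backs a service running in the given mode. */
    static ExecutorService create(ExecutorMode mode) {
        switch (mode) {
            case SINGLE_THREAD:
                // Intended for the metric query service: one worker, tasks run in order.
                return Executors.newSingleThreadExecutor();
            case FORK_JOIN:
            default:
                // Default mode for the main components.
                return new ForkJoinPool();
        }
    }

    /** Submits some tasks and counts how many distinct worker threads ran them. */
    static int distinctWorkerThreads(ExecutorMode mode, int tasks) {
        ExecutorService executor = create(mode);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Callable<String> task = () -> Thread.currentThread().getName();
                futures.add(executor.submit(task));
            }
            Set<String> names = new HashSet<>();
            for (Future<String> future : futures) {
                names.add(future.get());
            }
            return names.size();
        } catch (Exception e) {
            throw new IllegalStateException("Tasks did not complete", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Single-thread mode used "
                + distinctWorkerThreads(ExecutorMode.SINGLE_THREAD, 20) + " worker thread(s)");
    }
}
```

In single-thread mode all tasks share one worker, so a slow metric dump can delay other metric queries but cannot occupy threads that the main components depend on.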
What is the purpose of the change
Introduce a dedicated thread pool for MetricQueryService to process messages.
Brief change log
Verifying this change
This change is already covered by existing tests, such as (please describe tests).
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation