
[FLINK-7368][metrics] Make MetricStore ThreadSafe class #4840

Closed
wants to merge 2 commits into from


pnowojski
Contributor

Remove external synchronisation on MetricStore

What is the purpose of the change

This is a refactoring that makes MetricStore thread-safe. It achieves this by pulling all modifying methods into the class and by returning immutable copies from the getters.

Verifying this change

This change is already covered by existing tests, such as MetricStoreTest and MetricFetcherTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)

@pnowojski pnowojski force-pushed the flink7368 branch 3 times, most recently from 7442005 to 27eb459 Compare October 18, 2017 11:15
Remove external synchronization on MetricStore
*
* @param activeTaskManagers to retain.
*/
public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
Contributor

Isn't the method synchronization unnecessary now?

Contributor Author

Strictly speaking, right now it isn't necessary. However, without synchronising those methods, implementing them correctly is much harder. For example, at one point I made a mistake and implemented:

if (!jobs.containsKey(jobID)) {
  return null;
}
return jobs.get(jobID).getTaskMetricStore(taskID);

Also, without synchronisation it is harder to reason about the correctness of code that handles three separate fields: whether the access order is correct, whether the state of the fields is (or should be) coherent, e.g. whether the existence of a key in one map should imply the existence of a key in another, etc.

Long story short, synchronising is not an issue here, but it helps with reasoning about this code.
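
For readers following along, a minimal sketch of the pitfall, assuming the problem with the snippet above is the check-then-act race (the comment does not spell it out). The class `JobLookupSketch`, its fields and its method names are hypothetical and are not the actual MetricStore code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for the store discussed above. */
final class JobLookupSketch {
    // jobID -> (taskID -> metric value); names and types are illustrative only.
    private final Map<String, Map<String, String>> jobs = new ConcurrentHashMap<>();

    void addTask(String jobID, String taskID, String value) {
        jobs.computeIfAbsent(jobID, id -> new ConcurrentHashMap<>()).put(taskID, value);
    }

    void removeJob(String jobID) {
        jobs.remove(jobID);
    }

    // Racy without a lock: another thread may remove the job between
    // containsKey() and get(), so get() can return null and the chained call throws.
    String racyLookup(String jobID, String taskID) {
        if (!jobs.containsKey(jobID)) {
            return null;
        }
        return jobs.get(jobID).get(taskID);
    }

    // Reads the outer map once, so the null check and the use see the same value.
    String singleReadLookup(String jobID, String taskID) {
        Map<String, String> tasks = jobs.get(jobID);
        return tasks == null ? null : tasks.get(taskID);
    }
}
```

Marking both lookups `synchronized` (together with every writer) would also close the race, which is the trade-off being argued in this thread.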

Contributor

But now we're paying the synchronization cost twice: once for the synchronized keyword and once again for every access to the map. If every method is synchronized, the maps don't have to be concurrent hash maps.

Contributor Author

@pnowojski pnowojski Oct 24, 2017

  1. Performance is not that big a deal here, but code correctness is. With synchronized it is simply easier to implement any change here in a thread-safe manner. Without it, any new developer coming here will have to understand many more assumptions about this code: does consistency matter here or not? Is the order of the operations/accesses to the fields important or not? And so on.

  2. Even with synchronized we still need concurrent hash maps, because the getters return the maps and make them visible to the outside world. So we need either concurrent hash maps or getters that return copies.

But I'm not that strongly opinionated about this one. If after this comment you still would like to drop those synchronized, I will do it.
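
Point 2 can be illustrated with a small sketch. It assumes a store backed by a plain `HashMap` whose methods are all `synchronized`; the class `SynchronizedStoreSketch` and its method names are made up for illustration and do not mirror the PR's code. The lock only protects code running inside the store's methods, so a getter that hands out the live map lets callers read it with no lock held, while a getter that copies does not:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical store: every method is synchronized, the map itself is a plain HashMap. */
final class SynchronizedStoreSketch {
    private final Map<String, String> metrics = new HashMap<>();

    synchronized void add(String name, String value) {
        metrics.put(name, value);
    }

    synchronized void retain(List<String> activeNames) {
        metrics.keySet().retainAll(activeNames);
    }

    // Leaks the live HashMap: callers iterate it outside the lock, so a
    // concurrent add()/retain() can corrupt their view. Unsafe with a HashMap.
    synchronized Map<String, String> getLiveMetrics() {
        return metrics;
    }

    // Copies while holding the lock; the caller owns the result and later
    // changes to the store are not visible through it.
    synchronized Map<String, String> getMetricsSnapshot() {
        return new HashMap<>(metrics);
    }
}
```

The snapshot variant trades a copy per read for not needing a concurrent map at all.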


public void addGarbageCollectorName(String name) {
garbageCollectorNames.add(name);
}

public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
Contributor

I'm wondering whether we really need this unmodifiable business. Yes, it's technically a good idea, but the access to the MetricStore is limited and fully under our control; so we know that we never try to modify the map.

Contributor Author

It's a matter of clean design and of maintaining it in the future. You only know that we do not modify this map at the moment. Are you sure that every committer reviewing changes in this part of the code will always spot such modifications without the unmodifiable assertion? On the other hand, removing unmodifiable in a PR is a big yellow warning light that something is going on.

Following this logic, why don't we make all methods/fields public, since we know that we will never try to access fields that are supposed to be private?

Contributor

@zentol zentol Oct 24, 2017

In which case we should rather modify the store to not allow writes in the first place, instead of opting for unmodifiable collections, which are pretty much a hack. "Here, have an object that fails for 50% of the defined methods"; that's hardly good design, is it?

Till suggested dedicated read methods that return metrics, something like List<Tuple<String, String>> getMetrics(List<String> metricNames) instead of exposing the metric maps.

This would make the interface cleaner and would allow us to simplify the synchronization.
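
A rough sketch of what such a read method could look like. This is an assumption about the shape of the suggestion, not code from the PR: `ReadMethodSketch` is hypothetical, and `Map.Entry` from the JDK stands in for the `Tuple` type mentioned above so that the example is self-contained:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical store that never exposes its internal map. */
final class ReadMethodSketch {
    private final Map<String, String> metrics = new HashMap<>();

    synchronized void add(String name, String value) {
        metrics.put(name, value);
    }

    // Returns a fresh list of (name, value) pairs for the requested metrics.
    // Unknown names are skipped. The list is independent of the store, so the
    // caller may modify it freely without side effects.
    synchronized List<Map.Entry<String, String>> getMetrics(List<String> metricNames) {
        List<Map.Entry<String, String>> result = new ArrayList<>(metricNames.size());
        for (String name : metricNames) {
            String value = metrics.get(name);
            if (value != null) {
                result.add(new AbstractMap.SimpleImmutableEntry<>(name, value));
            }
        }
        return result;
    }
}
```

Because only the requested entries are copied, the copy is bounded by the size of the request rather than by the size of the store.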

Contributor Author

Would you rather have a runtime error about illegal/unintended write access, or none at all? Java sucks in this regard (missing const-correctness) :(

Exposing List<...> has the same issue.
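
To make the "runtime error" concrete, here is a minimal sketch using the JDK's `Collections.unmodifiableMap`. The class `UnmodifiableViewSketch` and its methods are hypothetical and only approximate the `unmodifiable(...)` helpers in the diff:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store that hands out a read-only view of its map. */
final class UnmodifiableViewSketch {
    private final Map<String, String> metrics = new HashMap<>();

    synchronized void add(String name, String value) {
        metrics.put(name, value);
    }

    // A view, not a copy: writes through it throw, but later changes to the
    // backing map remain visible. With a plain HashMap behind it, reading the
    // view from another thread is still not safe.
    synchronized Map<String, String> getMetrics() {
        return Collections.unmodifiableMap(metrics);
    }

    // Helper for the demonstration: reports whether a write attempt is rejected.
    static boolean writeIsRejected(Map<String, String> view) {
        try {
            view.put("x", "y");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

The failure only shows up at run time, which is the "missing const-correctness" complaint above; the compiler still accepts the `put` call.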

Contributor

@zentol zentol Oct 24, 2017

If the returned list is a fresh copy, independent of the map, it can be modifiable; the user can do whatever he wants with it. There are no side effects on the MetricStore, which, as I understand it, is the reasoning behind making things unmodifiable.

Contributor Author

Then you pay the cost of copying all of the objects, which is much higher than the cost of synchronisation. But yes, you can do that. In that case you don't need concurrent hash maps.

If you intend to go in a different direction with this and leave the current code on master as it is, please log the follow-up work in Jira after the release (I will close the release-blocker issue for which I started this PR). However, I have the impression that any follow-up work would be easier on top of this change, and in the meantime this is still way better than the current code on the master branch with its external synchronisation.

@@ -79,7 +79,7 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche
fetcher.update();
MetricStore metricStore = fetcher.getMetricStore();
synchronized (metricStore) {
Contributor

leftover synchronization

Contributor Author

Don't know why I have missed it :(

*
* @param activeJobs to retain.
*/
public synchronized void retainJobs(List<String> activeJobs) {
Contributor

can be package private

*
* @param metricDumps to add.
*/
public synchronized void addAll(List<MetricDump> metricDumps) {
Contributor

package private

public void add(MetricDump metric) {

/**
* Returns the {@link ComponentMetricStore}.
Contributor

javadoc is missing any mention of jobmanager

/**
* Returns the {@link ComponentMetricStore}.
*
* @return JobManagerMetricStore
Contributor

outdated class reference

JobMetricStore job = getJobMetricStore(jobID);
if (job == null) {
return null;
public ComponentMetricStore() {
Contributor

all constructors can be private

* Sub-structure containing metrics of the JobManager.
*/
public static class JobManagerMetricStore extends ComponentMetricStore {
public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
Contributor

can be private

assertEquals(0, store.jobManager.metrics.size());
assertEquals(0, store.taskManagers.size());
assertEquals(0, store.jobs.size());
assertTrue(store.getTaskManagers().isEmpty());
Contributor

@zentol zentol Oct 25, 2017

I prefer the previous version, as it prints the actual value on failure.

Contributor

@zentol zentol left a comment

merging.

zentol pushed a commit to zentol/flink that referenced this pull request Oct 25, 2017
Remove external synchronization on MetricStore

This closes apache#4840.
@pnowojski
Contributor Author

Thanks!

@pnowojski pnowojski closed this Oct 25, 2017
@pnowojski pnowojski reopened this Oct 25, 2017
zentol pushed a commit to zentol/flink that referenced this pull request Oct 26, 2017
Remove external synchronization on MetricStore

This closes apache#4840.
@asfgit asfgit closed this in f622de3 Oct 26, 2017
3 participants