
STORM-3162: Fix concurrent modification bug #2800

Closed
wants to merge 3 commits

Conversation

zd-project
Contributor

See: https://issues.apache.org/jira/browse/STORM-3162

I managed to alter the Java implementation and it seems to be passing the storm-server tests. However, in storm-core there's a nimbus_test.clj which depends on some of the older implementation that I changed in this PR. I'm not very familiar with Clojure myself, so I don't know how to fix it. If any of you can take a look, that would be great.

Contributor

@srdo srdo left a comment


Nice find. I'll be happy to help update the clojure code.

It's unrelated to this, but I'm wondering if we should replace the Map<String, Object> heartbeat model with a real class. It looks to me like it's currently a map with some magic strings in it.
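
For illustration, a hypothetical sketch of such a class (field names derived from the magic strings quoted elsewhere in this thread: "nimbus-time", "executor-reported-time", "is-timed-out"; this is not proposed Storm code):

```java
// Hypothetical replacement for the Map<String, Object> heartbeat model.
public class ExecutorBeat {
    private Integer nimbusTime;           // was the "nimbus-time" key
    private Integer executorReportedTime; // was the "executor-reported-time" key
    private boolean timedOut;             // was the "is-timed-out" key
    // constructor and accessors omitted for brevity
}
```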

if (cache == null) {
if (executorBeats == null) {
Contributor


The construction here seems a little odd. Initializing cache and executorBeats when null isn't necessary. I think we should keep the "if cache and executorBeats are null" clause, then replace the other two branches with something like Map<String, Object> currBeat = cache == null ? null : cache.get(executor); and equivalent for newBeat.
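
A minimal sketch of that shape, inside updateHeartbeatCache (loop and names taken from the snippets in this thread; the exact signature is an assumption):

```java
// Sketch only: keep the single combined null check, then let each branch
// tolerate a null map instead of eagerly initializing it.
if (cache == null && executorBeats == null) {
    return;
}
for (List<Integer> executor : executors) {
    Map<String, Object> currBeat = cache == null ? null : cache.get(executor);
    Map<String, Object> newBeat = executorBeats == null ? null : executorBeats.get(executor);
    // merge as before, e.g. via updateExecutorCache(currBeat, newBeat, timeout)
}
```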


Sorry, I don't see any reason why this method is not thread safe, because it is almost a utility method: it only initializes a Map cache which is then swapped into the Nimbus heartbeatsCache through heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache)). ConcurrentModificationException happens when we iterate over a collection through an iterator while also modifying it, but here we only iterate over the executor list and do not modify any of its entries.

Contributor Author


The very same map created here will be used in updateHeartbeatCache, where it may be modified concurrently. Hope this answers your question.


Concurrently modifying a HashMap is OK as long as we are not also iterating over it; for heartbeat updates we only need eventual consistency.

//else refresh nimbus-time and executor-reported-time by heartbeats reporting
for (List<Integer> executor : executors) {
cache.put(executor, updateExecutorCache(cache.get(executor), executorBeats.get(executor), timeout));
} else {
Contributor


Nit: an explicit return can help decrease indentation; I liked it better before.

heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
StatsUtil.convertExecutorBeats(stormClusterState.executorBeats(topoId, existingAssignment.get_executor_node_port()));
heartbeatsCache.compute(topoId, (k, v) ->
//Guaranteed side-effect-free
Contributor


We should probably put this requirement in comments on the two methods in StatsUtil rather than here, so they stay side effect free.
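
For example, a note along these lines on the two StatsUtil methods (illustrative wording, not the actual Storm Javadoc):

```java
/**
 * NOTE: This method must remain free of side effects on shared state,
 * because Nimbus invokes it from inside ConcurrentHashMap.compute(...),
 * where the remapping function may be applied under a lock.
 */
```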

@srdo
Contributor

srdo commented Aug 16, 2018

Fixed the nimbus_test zd-project#1

if (executorBeats == null) {
for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
Map<String, Object> beat = executorbeat.getValue();
//If not executor beats, refresh is-timed-out of the cache which is done by master


This code branch can only be invoked by Nimbus, and it is always a single-threaded modification, so please double-check whether it can actually throw a ConcurrentModificationException.

Contributor Author


I believe this is wrapped in another method exposed in the Thrift API; see sendSupervisorWorkerHeartbeat.

Contributor Author


This is actually where the ConcurrentModificationException is thrown. Notice that the old code invokes both cache.entrySet() and cache.put() in this method. Since it's exposed through Thrift, a ConcurrentModificationException is possible. Also see the Travis log here for an example: https://travis-ci.org/apache/storm/jobs/408719153#L1897


@danny0405 danny0405 Aug 25, 2018


Then please check the code invocation when the passed-in executorBeats == null: for sendSupervisorWorkerHeartbeat we will never get a null, but at least an empty map. The code comment already addresses that.

As for the test, I believe there is some bug to fix, but this code modification is not really necessary.

Actually, I have used the 2.0 version of Storm on our 30-node cluster for at least 3 months and I have never seen a ConcurrentModificationException.

@@ -490,7 +493,7 @@ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stor
         stormClusterState = makeStormClusterState(conf);
     }
     this.stormClusterState = stormClusterState;
-    this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
+    this.heartbeatsCache = new ConcurrentHashMap<>();


Please change back to AtomicReference because of multi-thread visibility: the Thrift server actually serves the RPC methods on multiple threads, so modifications to heartbeatsCache should be made as visible as possible.

@danny0405

Please verify again why the ConcurrentModificationException happens and attach the stack trace.

@srdo
Contributor

srdo commented Aug 25, 2018

I'm sorry if this gets a bit verbose, but I'm going to write down what I see as the issue here, so we can hopefully come to a common understanding (and so I don't forget and have to look at this again).

As far as I can tell, the uses of heartbeatsCache in Nimbus are thread safe, because the values are never modified, just overwritten. That is, we don't do heartbeatsCache.get(topoId).put(foo, bar), instead we do heartbeatsCache.getAndUpdate(func), which replaces the value entirely. I don't believe we need further synchronization here, since the AtomicReference ensures that the value changes are propagated to all threads, and two threads reading from an effectively immutable map at the same time should be fine(?)

However in the updateHeartbeatCache method in StatsUtil

public static void updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
we take one of the values from heartbeatsCache and modify it.

There are a couple of problems here. First, the cache value is a regular HashMap and not a ConcurrentHashMap, so modifying it from two threads at once isn't safe. Second, in the branch in updateHeartbeatCache where executorBeats is null, we iterate over the cache parameter. If one thread is in the iteration, and another thread is in the other branch in updateHeartbeatCache, we get the exception.

The reason this exception isn't thrown in a real cluster is that the executorBeats parameter is only null when called from

StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),

This only happens when Nimbus is booting up as part of launchServer, or when someone triggers a rebalance in the topology. We see it in the tests, because Nimbus and the supervisors are started concurrently, so Nimbus can be in one branch in StatsUtil.updateHeartbeatCache while one of the supervisors is in the other branch. It can technically happen in a real cluster, but someone would have to get extremely unlucky with rebalance timing.

I think the fix here should be making sure that StatsUtil.updateHeartbeatCache is thread safe. One option is to make the cache value a ConcurrentHashMap. Another option would be to make updateHeartbeatCache create and return a new map, instead of modifying the existing one.
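
A rough sketch of the second option (signature, parameter names, and helpers approximated from the snippets quoted in this thread, not the actual Storm code):

```java
public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCache(
        Map<List<Integer>, Map<String, Object>> cache,
        Map<List<Integer>, Map<String, Object>> executorBeats,
        Set<List<Integer>> executors,
        Integer timeout) {
    // Work on a private copy so concurrent callers never mutate a shared map.
    Map<List<Integer>, Map<String, Object>> newCache =
            cache == null ? new HashMap<>() : new HashMap<>(cache);
    if (executorBeats == null) {
        // Master path: only refresh the is-timed-out flags.
        for (Map.Entry<List<Integer>, Map<String, Object>> e : newCache.entrySet()) {
            Map<String, Object> beat = new HashMap<>(e.getValue());
            beat.put("is-timed-out", Time.deltaSecs((Integer) beat.get("nimbus-time")) >= timeout);
            e.setValue(beat);
        }
    } else {
        // Worker path: refresh nimbus-time and executor-reported-time.
        for (List<Integer> executor : executors) {
            newCache.put(executor,
                    updateExecutorCache(newCache.get(executor), executorBeats.get(executor), timeout));
        }
    }
    return newCache; // the caller publishes this atomically
}
```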

@danny0405

@srdo @zd-project
Thanks for your explanation, that makes sense to me.

One thing to clarify: the executorBeats parameter of StatsUtil#updateHeartbeatCache is null on every scheduling round of the master, in order to refresh the is-timed-out flag.

There is indeed a possibility that a supervisor/worker walks into this code branch:

//else refresh nimbus-time and executor-reported-time by heartbeats reporting
for (List<Integer> executor : executors) {
    cache.put(executor, updateExecutorCache(cache.get(executor), executorBeats.get(executor), timeout));
}

and Nimbus the other:

//if not executor beats, refresh is-timed-out of the cache which is done by master
if (executorBeats == null) {
    for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
        Map<String, Object> beat = executorbeat.getValue();
        beat.put("is-timed-out", Time.deltaSecs((Integer) beat.get("nimbus-time")) >= timeout);
    }
    return;

I think the key here is that we used a for-each iteration over the cache. We could change it to an iterator loop, which is okay because we only need eventual consistency, instead of using a ConcurrentMap or a copy, which would cause a performance regression.

@srdo
Contributor

srdo commented Aug 26, 2018

I don't think fixing the executorBeats == null branch is enough. As far as I can tell, two supervisors/workers can be in the

//else refresh nimbus-time and executor-reported-time by heartbeats reporting
for (List<Integer> executor : executors) {
    cache.put(executor, updateExecutorCache(cache.get(executor), executorBeats.get(executor), timeout));
}
branch at the same time for the same topology. We won't get an exception if this happens, but we'll still be modifying a HashMap from two threads at the same time, which isn't safe.

Regarding fixing the executorBeats == null branch, it isn't enough to switch to an explicit iterator, since iterators have the same behavior as a for-each loop (they throw an exception if the underlying collection is concurrently modified).
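
For reference, a for-each loop over a map is just sugar for an explicit iterator, so both forms fail identically (a minimal illustration, not Storm code):

```java
// Both loops throw ConcurrentModificationException if another thread
// structurally modifies 'cache' while the iteration is in progress.
for (Map.Entry<List<Integer>, Map<String, Object>> e : cache.entrySet()) {
    // ... read or update e ...
}

Iterator<Map.Entry<List<Integer>, Map<String, Object>>> it = cache.entrySet().iterator();
while (it.hasNext()) {
    Map.Entry<List<Integer>, Map<String, Object>> e = it.next();
    // ... same behavior: next() fail-fasts on concurrent modification ...
}
```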

@srdo
Contributor

srdo commented Aug 26, 2018

Regarding performance, consider that Nimbus is already copying heartbeatCache on writes everywhere else https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java#L4636-L4639.

Changing StatsUtil.updateHeartbeatCache to return a new Map and using Assoc to update heartbeatCache would be my preferred solution.
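
A minimal sketch of that flow, assuming updateHeartbeatCache is changed to return a fresh map (the Assoc updater is the one already used at the getAndUpdate call site quoted earlier; allExecutors and timeout are assumed parameter names):

```java
// Copy-on-write update: build a new per-topology map, then publish it
// atomically; readers keep seeing the old, effectively immutable map.
Map<List<Integer>, Map<String, Object>> updated =
        StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),
                                       executorBeats, allExecutors, timeout);
heartbeatsCache.getAndUpdate(new Assoc<>(topoId, updated));
```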

@danny0405

danny0405 commented Aug 26, 2018

@srdo
Agreed that a copy is the better solution.

Because we only need eventual consistency, fixing the executorBeats == null branch would be enough, but I also agree with keeping the other branch thread safe so that it doesn't confuse anyone.

@srdo
Contributor

srdo commented Aug 26, 2018

@danny0405 Could you elaborate on why fixing the executorBeats == null branch is enough? My concern is that the other branch modifies a HashMap (the cache parameter) from multiple threads with no synchronization. Why is this safe?

@danny0405

@srdo
Because we only modify it from multiple threads but do not iterate over it, and the cache key is the executor id, which will only conflict between master and supervisor.

@srdo
Contributor

srdo commented Aug 27, 2018

@danny0405 That doesn't sound safe to me. I think you're right that it works fine most of the time, but if there are key collisions, or an insert causes the map to be resized, two threads modifying the map at the same time could interfere with each other.

Either way, if you're okay with making the whole function thread safe, I think we should do it.

@srdo
Contributor

srdo commented Sep 11, 2018

@zd-project I'd like to finish this up. Let me know if you want to make the last couple of fixes, otherwise I'll open a new PR containing this fix.

@danny0405 I thought about it a bit more, and while I still think we can fix this by making updateHeartbeatCache thread safe by making it return a new map and keeping the pre-this-PR AtomicReference in Nimbus, I'm not sure why this would be faster than just using a ConcurrentHashMap like the current PR code here does? Using the AtomicReference in Nimbus essentially makes the heartbeat cache a copy-on-write Map due to the way we do updates via Assoc and Dissoc. I would expect a ConcurrentHashMap to provide better parallelism. What do you think?
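
For context, the ConcurrentHashMap approach in this PR boils down to something like the following, based on the compute call quoted earlier (the lambda body is an assumption, and it presumes the StatsUtil method is changed to return the updated map):

```java
// compute() applies the remapping function atomically for the key, which is
// why the function must be side-effect-free: it may run under a lock and
// can't safely touch shared state.
heartbeatsCache.compute(topoId, (k, v) ->
        StatsUtil.updateHeartbeatCache(v, executorBeats, allExecutors, timeout));
```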

@zd-project
Contributor Author

Agreed. I think the AtomicReference is really just there for Clojure compatibility. I'll finish this up.

@danny0405

danny0405 commented Sep 12, 2018

@srdo
I think the only difference is that, compared to AtomicReference, ConcurrentHashMap keeps the map thread safe but cannot ensure that the value read is up to date, which can cause some inconsistent behavior: the scheduling thread reads it every 10s, and with an out-of-date heartbeat cache the master may kill a healthy worker or restart one that has already started.

But ConcurrentHashMap has finer-grained locking, is less susceptible to resource contention, and is likely to be the more performant of the two.

@srdo
Contributor

srdo commented Sep 12, 2018

@danny0405 Thanks for explaining. I'm not sure I understand why the scheduling thread would see older values with ConcurrentHashMap than with AtomicReference? It was my understanding that ConcurrentHashMap has the same happens-before guarantees as volatile variables for reads/writes?

@danny0405

danny0405 commented Sep 13, 2018

@srdo
For thread visibility, I think the two are the same.

I think ConcurrentHashMap is a better choice for performance.

@revans2
Contributor

revans2 commented Sep 15, 2018

Please take a look at #2836 as an alternative. It is a much bigger patch, but I think the refactoring it does will make things much easier long term. Having done the other patch, I think this one does fix the immediate issue, but the current HB cache is so complex that I didn't feel I understood all of the ways it was accessed until I had done the other patch.

STORM-3162: Fix nimbus_test
@zd-project
Contributor Author

As far as my understanding of the code goes, I believe this fix should resolve this particular issue. However, my understanding of the HB cache was limited, and the PR was pushed hastily during the last few days of my internship. I definitely support switching to the alternative if it actually solves the deeper structural issue.

Please let me know if there's anything else I can help with.

@srdo srdo closed this Apr 15, 2019