
[STORM-2905] Fix KeyNotFoundException when kill a storm and add isRem… #2618

Closed
danny0405 wants to merge 1 commit

Conversation


@danny0405 danny0405 commented Mar 31, 2018

When we kill a topology, at the moment the topology's blob files are removed, a Supervisor executor may still request those blob files and get a KeyNotFoundException.

I dug in and found the cause:

  1. There is no lock guarding AsyncLocalizer#topologyBlobs, which is a singleton per supervisor node.
  2. The jar/code/conf blob keys of a killed storm are removed from topologyBlobs only in a timer task, AsyncLocalizer#cleanUp. The removal condition is: [no one references the blobs] AND [the blobs were removed by the master OR the cache exceeds the maximum configured size]. The default scheduling interval is 30 seconds, so overdue keys in topologyBlobs are only removed when this condition is met.
  3. When a storm has been killed on a node [which means the slot container is empty], the AsyncLocalizer calls releaseSlotFor, which only removes the references on the blobs [the overdue keys are still present in topologyBlobs].
  4. Once the container is empty, Slot.java calls cleanupCurrentContainer, which invokes AsyncLocalizer#releaseSlotFor to release the slot.
  5. AsyncLocalizer has a timer task, updateBlobs, that updates base/user blobs every 30 seconds based on the keys in AsyncLocalizer#topologyBlobs.
  6. Overdue keys in AsyncLocalizer#topologyBlobs are only removed by AsyncLocalizer#cleanUp, which is also a timer task.
  7. So when we kill a storm, AsyncLocalizer#updateBlobs may try to update already-removed jar/code/conf blob keys and throw an exception [if AsyncLocalizer#cleanUp has not cleaned these keys yet], then retry until the configured maximum number of attempts is reached.

Here is how I fixed it:

  1. Remove the base blob keys eagerly in AsyncLocalizer#releaseSlotFor when there is no reference [no one uses them] on the blobs, and remove the overdue keys from AsyncLocalizer#topologyBlobs.
  2. Guard AsyncLocalizer#updateBlobs and AsyncLocalizer#releaseSlotFor with the same lock.
  3. When the container is empty, we no longer need to call AsyncLocalizer#releaseSlotFor [because the keys have already been deleted].
  4. I also added a new RPC API to check whether a remote blob exists. We can use it to decide whether a blob can be removed, instead of calling getBlobMeta and catching a confusing KeyNotFoundException [which shows up in both supervisor and master logs for every base blob]. A sketch of the idea follows below.

This is the JIRA: STORM-2905
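
A minimal sketch of the idea behind fix item 4, assuming a simplified client interface; the types below are invented for illustration and are not Storm's actual API, only the isRemoteBlobExists call reflects what this PR adds:

class KeyNotFoundException extends Exception {}

interface BlobStoreClient {
    Object getBlobMeta(String key) throws KeyNotFoundException; // existing pattern
    boolean isRemoteBlobExists(String key);                     // check added by this PR
}

class CleanupDecision {
    // Before: existence is inferred from an exception, which also leaves a stack
    // trace in the master log for every base blob of a killed topology.
    static boolean existsViaException(BlobStoreClient client, String key) {
        try {
            client.getBlobMeta(key);
            return true;
        } catch (KeyNotFoundException e) {
            return false;
        }
    }

    // After: a plain boolean RPC; no exception and no confusing log noise.
    static boolean existsViaCheck(BlobStoreClient client, String key) {
        return client.isRemoteBlobExists(key);
    }
}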

@@ -497,7 +497,6 @@ static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticSta
assert(dynamicState.container.areAllProcessesDead());

dynamicState.container.cleanUp();
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port);
Contributor

cleanupCurrentContainer gets used in many different locations, not just during the blob update. We need to release the slot when the container is killed, or the reference counting will be off.

Author

Yeah, but staticState.localizer.releaseSlotFor only removes the topology base blobs [jar/code/conf], which are already taken care of by releaseSlotFor itself.

Contributor

That is not true.

https://github.com/danny0405/storm/blob/a4e659b5073794396ea23e3dd7b79c00536fc3fe/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java#L505-L511

The code first removes the base blobs' reference counts, but then it also decrements the reference count for the other blobs.
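
A minimal sketch of why skipping that call would break reference counting, using hypothetical names rather than the real AsyncLocalizer internals: releasing a slot has to decrement the counts for the base blobs and for every other blob the topology uses.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BlobRefCounts {
    private final Map<String, Integer> refs = new ConcurrentHashMap<>();

    void addReference(String blobKey) {
        refs.merge(blobKey, 1, Integer::sum);
    }

    void releaseSlotFor(Iterable<String> baseBlobKeys, Iterable<String> userBlobKeys) {
        // Both groups must be released; dropping either leaves dangling references
        // that keep the periodic cleanup from ever evicting the blob.
        releaseAll(baseBlobKeys);
        releaseAll(userBlobKeys);
    }

    private void releaseAll(Iterable<String> keys) {
        for (String key : keys) {
            refs.computeIfPresent(key, (k, v) -> v > 1 ? v - 1 : null);
        }
    }
}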

@@ -94,17 +93,17 @@ public void cleanup(ClientBlobStore store) {
Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next();
LocallyCachedBlob resource = rsrc.getKey();
try {
resource.getRemoteVersion(store);
if (!store.isRemoteBlobExists(resource.getKey())) {
Contributor

Admittedly the code is cleaner with this, but the change is not needed for this fix; it behaves exactly the same as it did before. I think it is a good change, it would just be nice to have it as a separate pull request and a separate JIRA, since it is not really part of the needed fix.

Author

OK, I will split it out into a separate JIRA.

@revans2
Contributor

revans2 commented Apr 2, 2018

I am trying to understand the reasons behind this change. Is this JIRA just about removing an exception that shows up in the logs, or is that exception actually causing a problem?

The reason I ask is that this is a risk vs. reward situation. The code in AsyncLocalizer is really very complicated, and because it is asynchronous there are lots of races and corner cases. This makes me a bit nervous about changing fundamental things just because of some extra logs. Additionally, this is a distributed system and this particular race is inherent in it. It is possible for someone to delete a blob at any point in time, and the code in the supervisor needs to handle it.

@danny0405
Author

@revans2
I did this patch for the concurrent race condition on AsyncLocalizer#topologyBlobs between updateBlobs and releaseSlotFor. The overdue keys in AsyncLocalizer#topologyBlobs are only cleaned by the AsyncLocalizer#cleanup() timer task; updateBlobs is also a timer task but is not guarded by a lock.

I added the RPC API isRemoteBlobExist only to make the logs less confusing; it is not strictly related to this patch.

@revans2
Contributor

revans2 commented Apr 3, 2018

@danny0405

updateBlobs does not need to be guarded by a lock. This is what I was talking about with the code being complex.

requestDownloadBaseTopologyBlobs is protected by a lock simply because of this non-thread safe code.

CompletableFuture<Void> ret = topologyBasicDownloaded.get(topologyId);
if (ret == null) {
    ret = downloadOrUpdate(topoJar, topoCode, topoConf);
    topologyBasicDownloaded.put(topologyId, ret);
}

Parts of this were written prior to the move to Java 8, so computeIfAbsent was not available. Now that it is, we could replace it and, I believe, remove the lock, but I would want to spend some time to be sure the lock was not accidentally protecting something else in there too.
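
A minimal sketch of what that computeIfAbsent replacement could look like; the surrounding class is hypothetical and downloadOrUpdate is reduced to a placeholder, only the map and future types mirror the snippet above.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class BasicDownloadCache {
    private final ConcurrentMap<String, CompletableFuture<Void>> topologyBasicDownloaded =
        new ConcurrentHashMap<>();

    CompletableFuture<Void> requestDownloadBaseTopologyBlobs(String topologyId) {
        // Atomic check-then-insert: only the first caller triggers the download,
        // every other caller receives the same in-flight future, no lock needed.
        return topologyBasicDownloaded.computeIfAbsent(topologyId, this::downloadOrUpdate);
    }

    private CompletableFuture<Void> downloadOrUpdate(String topologyId) {
        // Placeholder for the real asynchronous download of jar/code/conf.
        return CompletableFuture.completedFuture(null);
    }
}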

requestDownloadTopologyBlobs looks like it does not need to be synchronized at all. It must have been a mistake on my part, but it does look like it might be providing some protection to a bug in

CompletableFuture<Void> localResource = blobPending.get(topologyId);
if (localResource == null) {
    Supplier<Void> supplier = new DownloadBlobs(pna, cb);
    localResource = CompletableFuture.supplyAsync(supplier, execService);
    blobPending.put(topologyId, localResource);
}

This executes outside of a lock but does not look thread safe.

Declaring updateBlobs as synchronized does absolutely nothing except make it conflict with requestDownloadTopologyBlobs and requestDownloadBaseTopologyBlobs. And if we are able to remove the locks there, then it will not be an issue at all. updateBlobs is scheduled using scheduleWithFixedDelay:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay(java.lang.Runnable,%20long,%20long,%20java.util.concurrent.TimeUnit)

The javadocs clearly state that the next execution only starts a fixed delay after the previous one finishes, so there will only ever be one copy of it running at a time. Additionally, everything it does is already asynchronous and would be happening on a separate thread. Making it synchronized would just slow things down.

Having a blob disappear at the wrong time is a race that will always be in the system, and we cannot fix it with synchronization because it happens across separate servers. The only thing we can do is deal with it when it happens. The way the current code deals with it is to try again later. This means a worker coming up for the first time will not come up until the blob is fully downloaded, but if we are trying to update a blob and it has disappeared we simply keep the older version around until we no longer need it. Yes, we may log some exceptions while we do it, but that is the worst thing that will happen.

@revans2
Contributor

revans2 commented Apr 3, 2018

Just FYI, I filed STORM-3020 to address the race that I just found.

@danny0405
Author

danny0405 commented Apr 4, 2018

@revans2

To me the race condition has nothing to do with AsyncLocalizer#requestDownloadBaseTopologyBlobs; it is a race on AsyncLocalizer#topologyBlobs between the timer tasks AsyncLocalizer#cleanup and AsyncLocalizer#updateBlobs.

In my case the exception is thrown from this code segment:

private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
    // ignored
    synchronized (blob) {
        long localVersion = blob.getLocalVersion();
        // KeyNotFoundException is thrown here
        long remoteVersion = blob.getRemoteVersion(blobStore);
        if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
            try {
                long newVersion = blob.downloadToTempLocation(blobStore);
                blob.informAllOfChangeAndWaitForConsensus();
                blob.commitNewVersion(newVersion);
                blob.informAllChangeComplete();
            } finally {
                blob.cleanupOrphanedData();
            }
        }
    }
    // ignored
}

When AsyncLocalizer starts, it schedules these two timer tasks:

execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);

Although scheduleWithFixedDelay is event-triggered and only one execution of a given task runs at a time, we still cannot be sure which of these two timer tasks runs first, because their execution costs differ.

This bug happens when cleanup runs after updateBlobs for a killed storm's overdue keys, so updateBlobs still gets a KeyNotFoundException.

@revans2
Contributor

revans2 commented Apr 4, 2018

@danny0405

I just created #2622 to fix the race condition in AsyncLocalizer. It does conflict a lot with this patch, so I wanted to make sure you saw it and had a chance to give feedback on it.

I understand where the exception is coming from, but what I am saying is that putting synchronized on both cleanup and updateBlobs does not fix the issue. Adding the synchronization only serves to slow down other parts of the processing. Even controlling the order in which they execute is not enough, because cleanup will only happen after the scheduling change has been fully processed.

Perhaps some kind of a message sequence chart would better explain the race here.

[sequence diagram: worker being killed]

The issue is not in the order of cleanup and checking for updates. The race is between nimbus deleting the blobs and the supervisor fully processing the topology being killed.

Any time after nimbus deletes the blobs in the blob store until the supervisor has killed the workers and released all references to those blobs we can still get this issue.

[sequence diagram: worker being killed, bad ordering]

The above sequence is an example of this happening even if we got the ordering right.

The only way to "fix" the race is to make it safe to lose the race. The current code will output an exception stack trace when it loses the race. This is not ideal, but it is safe at least as far as I am able to determine.

That is why I was asking whether the issue is just that we are outputting the stack trace, or whether something else is happening that is worse than having all of the stack traces. If it is just the stack traces, there are things we can do to address them. If it goes beyond that, then I still don't understand the issue yet.
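
A minimal sketch of what "safe to lose the race" can look like in code, with hypothetical types standing in for the real blob classes: a missing remote blob during an update is swallowed and the locally cached version is kept.

class BlobRemovedException extends Exception {}

interface CachedBlob {
    long getLocalVersion();
    long getRemoteVersion() throws BlobRemovedException;
    void downloadNewVersion(long remoteVersion);
}

class SafeUpdater {
    static void updateIfChanged(CachedBlob blob) {
        try {
            long remote = blob.getRemoteVersion();
            if (remote != blob.getLocalVersion()) {
                blob.downloadNewVersion(remote);
            }
        } catch (BlobRemovedException e) {
            // Lost the race with nimbus deleting the blob: keep the old local copy
            // until cleanup decides nothing references it any more.
        }
    }
}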

@danny0405
Author

@revans2
I agree with you that the race condition is between nimbus deleting the blobs and the supervisor fully processing the topology being killed.

But I still think we should handle this race condition gracefully, as a normal and expected case. It's a distributed system, so can the AsyncLocalizer decide based on the master's blob keys' existence when it updates local resources? A thrown stack trace is really confusing and unnecessary, especially when the blob client retries up to the configured maximum number of times to fetch a blob that has already been removed from the cluster.

At least we should not make a download request when the blobs have been removed, and we should not determine a blob's existence based on a thrown KeyNotFoundException [which is confusing too, because the user has already killed/removed them on their own initiative].

@revans2
Contributor

revans2 commented Apr 5, 2018

OK good I do understand the problem.

There really are a few ways that I see we can make the stack trace much less likely to come out in the common case. The following are in my preferred order, but I am open to other ideas.

  1. We don't delete the blobs on the nimbus side for a while after we kill the topology.
    Currently we delete the blobs on a timer that runs every 10 seconds by default, and I would have to trace through things, but I think we may do some other deletions before that happens. If instead we kept a separate map (TOPO_X can be cleaned up after time Y), then when cleanup runs it can check that map; if it does not find the topo it wants to clean up, or if it finds it and the time has passed, it cleans it up.

  2. We don't output the stack trace until it has failed some number of times in a row (a sketch of this idea follows the list). This would mean that we would still output the error if the blob was deleted when it should not have been, but it would not look like an error until it had been gone for 1 or 2 seconds, hopefully long enough to actually have killed the workers.

  3. We have the supervisor inform the AsyncLocalizer about topologies that are in the process of being killed.
    Right now part of the issue with the race is that killing a worker can take a non-trivial amount of time, which makes the window in which the race can happen much larger. If, as soon as the supervisors know that a topology is being killed, they tell the AsyncLocalizer, it could then skip outputting errors for any topology in the process of being killed. The issue here is that informing the supervisors happens in a background thread and is not guaranteed to happen, so it might not work as often as we would like.
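
A minimal sketch of option 2, with hypothetical names rather than the actual Storm logging code: keep quiet until the same blob has failed a few consecutive update attempts, so a blob that briefly disappears during a normal kill never looks like an error.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class QuietRetryLogger {
    private static final int FAILURES_BEFORE_STACK_TRACE = 3;
    private final Map<String, Integer> consecutiveFailures = new ConcurrentHashMap<>();

    void onUpdateFailed(String blobKey, Exception e) {
        int failures = consecutiveFailures.merge(blobKey, 1, Integer::sum);
        if (failures >= FAILURES_BEFORE_STACK_TRACE) {
            // Only now does it look like a real problem, so log the full trace.
            System.err.println("Blob " + blobKey + " still missing after " + failures + " attempts");
            e.printStackTrace();
        }
        // Below the threshold: stay quiet, the blob is probably just being killed.
    }

    void onUpdateSucceeded(String blobKey) {
        consecutiveFailures.remove(blobKey);
    }
}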

@danny0405
Author

I am inclined to choose option 3 because:

  1. We already make an RPC request from master to supervisors for killing/starting workers as soon as the event happens on the master. So the supervisor can inform the AsyncLocalizer [or the AsyncLocalizer can fetch it locally] which topologies/blobs have already been removed.

  2. We can query the local supervisor every time before updating blobs, to cover the small probability that the AsyncLocalizer still holds overdue keys.

But another situation is user archives and normal blob files. There the model is: the master creates an event and the AsyncLocalizer syncs it at a fixed interval; this model is too simple to handle this case. I think there should be an RPC notification of add/remove events between the blob server and the blob client, roughly as sketched below. Having the AsyncLocalizer sync at a fixed interval is OK, but we should not decide whether blobs are removed based on a KeyNotFoundException.
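
A minimal sketch of the kind of notification the previous paragraph is asking for; the interface is hypothetical, since no such API exists in Storm today: the blob server pushes add/remove events so the localizer does not have to discover removals through a KeyNotFoundException on a timer.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

interface BlobEventListener {
    void onBlobAdded(String blobKey);
    void onBlobRemoved(String blobKey);
}

class BlobEventBroadcaster {
    private final List<BlobEventListener> listeners = new CopyOnWriteArrayList<>();

    void register(BlobEventListener listener) {
        listeners.add(listener);
    }

    // Called by the blob server when a user removes a blob, so every localizer
    // can drop the key from its state before the next update pass runs.
    void blobRemoved(String blobKey) {
        for (BlobEventListener listener : listeners) {
            listener.onBlobRemoved(blobKey);
        }
    }
}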

@danny0405
Author

danny0405 commented Apr 20, 2018

@revans2
Sorry for being away so long; I was on a training course. Do you have any good ideas on how we can fix or improve this case?

@revans2
Contributor

revans2 commented May 14, 2018

@danny0405 Sorry about the long delay. I also got rather busy with other things.

My personal choice would be a combination of 1 and 2. We have run into an issue internally where, very rarely, a blob may be uploaded to nimbus as part of submitting a topology and then deleted before the topology itself can be submitted.

We are likely to fix this by using a variant of 1, something where we give ourselves a few minutes after we see a blob with no corresponding topology before we decide it is fine to delete it. That should fix the issue in 99% of the cases, and also fix the upload issue.
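
A minimal sketch of that grace-period idea, with hypothetical names rather than the actual nimbus cleanup code: the first time a blob is seen with no corresponding topology a deadline is recorded, and the blob is only deleted on a later pass once the deadline has passed.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class GracePeriodBlobCleaner {
    private final long gracePeriodMs;
    private final Map<String, Long> deleteAfter = new ConcurrentHashMap<>();

    GracePeriodBlobCleaner(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Called from the periodic cleanup pass for each blob with no live topology.
    void maybeDelete(String blobKey, Consumer<String> deleteBlob) {
        long now = System.currentTimeMillis();
        long deadline = deleteAfter.computeIfAbsent(blobKey, k -> now + gracePeriodMs);
        if (now >= deadline) {
            deleteBlob.accept(blobKey);
            deleteAfter.remove(blobKey);
        }
    }

    // Called when the blob's topology does show up, e.g. the submission finished.
    void blobReferencedAgain(String blobKey) {
        deleteAfter.remove(blobKey);
    }
}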

@agresch
Contributor

agresch commented Jul 18, 2018

Just curious what the plan is for this PR?

@agresch
Contributor

agresch commented Jul 18, 2018

A couple of comments back to @revans2 from Apr 5:

  1. We don't delete the blobs on the nimbus side for a while after we kill the topology. - Would we also prevent the user from doing so in this case?

  2. We don't output the stack trace until it has failed some number of times in a row. - This may be fine for the supervisor, but I see this exception trace polluting the nimbus log non-stop. Would you prevent this as well?

2018-06-25 21:49:03.928 o.a.s.d.n.Nimbus pool-37-thread-483 [WARN] get blob meta exception.
org.apache.storm.utils.WrappedKeyNotFoundException: run-1-1529962766-stormjar.jar
at org.apache.storm.blobstore.LocalFsBlobStore.getStoredBlobMeta(LocalFsBlobStore.java:256) ~[storm-server-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.blobstore.LocalFsBlobStore.getBlobMeta(LocalFsBlobStore.java:286) ~[storm-server-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.daemon.nimbus.Nimbus.getBlobMeta(Nimbus.java:3498) [storm-server-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.generated.Nimbus$Processor$getBlobMeta.getResult(Nimbus.java:4014) [storm-client-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.generated.Nimbus$Processor$getBlobMeta.getResult(Nimbus.java:3993) [storm-client-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.thrift.ProcessFunction.process(ProcessFunction.java:38) [shaded-deps-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.thrift.TBaseProcessor.process(TBaseProcessor.java:39) [shaded-deps-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.security.auth.sasl.SaslTransportPlugin$TUGIWrapProcessor.process(SaslTransportPlugin.java:147) [storm-client-2.0.0.y.jar:2.0.0.y]
at org.apache.storm.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:291) [shaded-deps-2.0.0.y.jar:2.0.0.y]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]

@danny0405
Author

@agresch
Yeah, the problem still exists, but I have no time to fix it now; I will try to take it on.

@danny0405 danny0405 closed this Jan 29, 2019