This repository has been archived by the owner on Dec 19, 2017. It is now read-only.

CompletableFuture might complete in unexpected thread - Copycat gets unreliable #75

Open
bgloeckle opened this issue Dec 4, 2015 · 3 comments

Comments

@bgloeckle
Contributor

Hi!

Copycat uses the class CompletableFuture heavily and expects the handlers along those "pipelines" to be called on the same thread on which the complete function was called. For example, the server classes (e.g. ServerState, *State, ServerContext, ...) expect to be called only on the single thread that is created by a SingleThreadContext in the constructor of ServerContext. With this assumption, Copycat can keep the code in these classes much simpler, because it does not need to deal with multithreading at all. Jordan and I already discussed this in a thread in the Google Group: https://groups.google.com/d/msg/copycat/p9j8I0SRw3M/xR7fwplvCwAJ.
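The thread-confinement assumption can be sketched roughly as follows. This is a minimal, hypothetical stand-in (the class ThreadConfinedContext and its methods are illustrative, not Copycat's actual SingleThreadContext API): a single-threaded executor plus a checkThread() guard, similar in spirit to Copycat's #checkThread(), that rejects calls made from any other thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a single-threaded context; not Copycat's real API.
public class ThreadConfinedContext {
  private volatile Thread thread;
  private final ExecutorService executor;

  public ThreadConfinedContext(String name) {
    // The factory runs once, when the first task is submitted, and records the worker thread.
    executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, name);
      t.setDaemon(true);
      thread = t;
      return t;
    });
  }

  public void execute(Runnable task) {
    executor.execute(task);
  }

  // Guard for thread-confined state: fail fast if called from any other thread.
  public void checkThread() {
    if (Thread.currentThread() != thread) {
      throw new IllegalStateException("not on context thread: " + Thread.currentThread().getName());
    }
  }

  public void close() {
    executor.shutdown();
  }

  // Returns true if checkThread() passes on the context thread and fails on the caller's thread.
  public static boolean demo() throws Exception {
    ThreadConfinedContext ctx = new ThreadConfinedContext("copycat-server");
    CompletableFuture<Boolean> onContext = new CompletableFuture<>();
    ctx.execute(() -> {
      try {
        ctx.checkThread();
        onContext.complete(true);
      } catch (IllegalStateException e) {
        onContext.complete(false);
      }
    });
    boolean passedOnContext = onContext.get(5, TimeUnit.SECONDS);
    boolean failedOnCaller;
    try {
      ctx.checkThread();
      failedOnCaller = false;
    } catch (IllegalStateException e) {
      failedOnCaller = true;
    }
    ctx.close();
    return passedOnContext && failedOnCaller;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(demo()); // prints true
  }
}
```

Everything that touches the confined state must then arrive through execute(); any callback that sneaks in on another thread trips the guard.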

The exact example I described in that mail thread has now come up again in my implementation in diqube (diqube uses Copycat internally to reliably distribute some internal data across the cluster).

The following lines started to show up occasionally in the logs of some of diqube's tests, and in those cases the Copycat server did not start up correctly:

23:42:20.629 [main] INFO  o.d.consensus.DiqubeCopycatServer [DiqubeCopycatServer.java:143] - Starting up consensus node with local data dir at '/home/diqube/.jenkins/workspace/diqube/diqube-server/data/consensus'.
23:42:20.632 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.server.storage.SegmentManager [SegmentManager.java:318] - Created segment: Segment[id=1, version=1, index=0, length=0]
23:42:20.634 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.s.s.ServerStateMachineExecutor [ServerStateMachineExecutor.java:199] - Registered value operation callback class org.diqube.cluster.ClusterLayoutStateMachine$GetAllTablesServed
23:42:20.634 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.s.s.ServerStateMachineExecutor [ServerStateMachineExecutor.java:199] - Registered value operation callback class org.diqube.cluster.ClusterLayoutStateMachine$GetAllNodes
23:42:20.634 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.s.s.ServerStateMachineExecutor [ServerStateMachineExecutor.java:190] - Registered void operation callback class org.diqube.cluster.ClusterLayoutStateMachine$RemoveNode
23:42:20.634 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.s.s.ServerStateMachineExecutor [ServerStateMachineExecutor.java:190] - Registered void operation callback class org.diqube.cluster.ClusterLayoutStateMachine$SetTablesOfNode
23:42:20.634 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.s.s.ServerStateMachineExecutor [ServerStateMachineExecutor.java:199] - Registered value operation callback class org.diqube.cluster.ClusterLayoutStateMachine$IsNodeKnown
23:42:20.635 [copycat-server-Address[/127.0.0.1:5101]] DEBUG i.a.c.s.s.ServerStateMachineExecutor [ServerStateMachineExecutor.java:199] - Registered value operation callback class org.diqube.cluster.ClusterLayoutStateMachine$FindNodesServingTable
23:42:20.635 [copycat-server-Address[/127.0.0.1:5101]] INFO  i.a.c.server.state.ServerContext [ServerContext.java:87] - Server started successfully!
23:42:20.636 [main] DEBUG i.a.copycat.server.state.ServerState [ServerState.java:461] - Address[/127.0.0.1:5101] - Single member cluster. Transitioning directly to leader.
[...more lines of diqube logging on main thread...]

The first line is logged by diqube code that starts the Copycat server. The server then starts up without any other nodes in the cluster (those tests use a single-node setup). Note that in the last line, Copycat's ServerState logs that it identified a single-member cluster, but after that the server logs nothing at all (in the whole log of that test). That last entry is also made on a different thread: "main" instead of the Copycat server thread. This should never happen, because Copycat expects the ServerState#join() method (which logs that line) to be called on the Copycat server thread; in this case, it is not.
This is hard to debug, because most of the time it works just fine (and ServerState#join() is executed on the Copycat server thread rather than on the main thread). I can therefore only guess that the ServerState#transition() method calls #checkThread(), which then throws an IllegalStateException. I cannot see that exception in the log, though, and diqube continues starting up the test on the "main" thread, so I suspect the exception is swallowed somewhere inside CompletableFuture.
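The suspicion that the exception is swallowed fits how CompletableFuture behaves: if the function passed to thenCompose throws, the exception is neither rethrown to the caller nor logged; it is only stored in the future that thenCompose returns. A minimal sketch (class and method names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SwallowedException {
  // Returns the exception captured by the derived future, or null if there is none.
  public static Throwable demo() {
    CompletableFuture<Void> opened = CompletableFuture.completedFuture(null);

    // Stand-in for a state transition whose thread check fails.
    CompletableFuture<Void> joined = opened.<Void>thenCompose(v -> {
      throw new IllegalStateException("not on context thread");
    });

    // Nothing was thrown or logged above; the failure only lives inside 'joined'.
    try {
      joined.join();
      return null;
    } catch (CompletionException e) {
      return e.getCause();
    }
  }

  public static void main(String[] args) {
    // Inspecting the future (join(), whenComplete, ...) is the only way to notice the failure.
    System.out.println("captured: " + demo()); // prints "captured: java.lang.IllegalStateException: not on context thread"
  }
}
```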

So I dug into why ServerState#join() is sometimes called on the wrong thread. First of all, diqube's implementation of the Catalyst server handles a call to #listen() very quickly, and the future returned by that method is completed quickly as well. I'm now fairly sure that this is what happens:

  1. diqube calls the #open method on CopycatServer
  2. This then executes (on the main thread): openFuture = context.open().thenCompose(completionFunction); (see here)
  3. The call to context.open() is executed first, therefore ServerContext#open() is executed (see here)
  4. That switches to the server thread and calls #listen() on the Catalyst server. The future returned by #open() is completed as soon as #listen() replies.
  5. The main thread continues up until just before the "return" statement of ServerContext#open() - this means it installs the whenComplete handler on the future.
  6. The Catalyst server (on the server thread) then completes the listen future, and ServerContext#open() completes its own future, too (after creating the new ServerState, line 85)
  7. The whenComplete handler of the result future of ServerContext#open() is executed and logs "Server started successfully" (this is logged on the correct thread above!)
  8. The completion of the result future of ServerContext#open() finishes (because everything is done)
  9. ServerContext#open() returns the (already completed) future
  10. CopycatServer receives the result of context.open() and then executes thenCompose(completionFunction);
  11. As the future is already completed, CompletableFuture can only call the completionFunction on the current thread (= "main"), which it does, so the completionFunction is executed on thread "main".
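Steps 4 to 11 can be reproduced deterministically with a short sketch. Here open() is collapsed into a single hypothetical future completed on a thread named "copycat-server"; instead of relying on a fast #listen(), the caller simply waits until the server thread has finished before calling thenCompose, which forces the unlucky interleaving described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class OpenRace {
  // Returns {thread that ran whenComplete, thread that ran the thenCompose function}.
  public static String[] demo() throws Exception {
    ExecutorService server = Executors.newSingleThreadExecutor(r -> new Thread(r, "copycat-server"));
    AtomicReference<String> whenCompleteThread = new AtomicReference<>();
    AtomicReference<String> composeThread = new AtomicReference<>();

    // Steps 4-5: the caller installs whenComplete while the future is still pending.
    CompletableFuture<Void> opened = new CompletableFuture<>();
    opened.whenComplete((result, error) -> whenCompleteThread.set(Thread.currentThread().getName()));

    // Steps 6-8: the server thread completes the future; whenComplete runs there.
    server.execute(() -> opened.complete(null));

    // Step 9: simulate the caller being slower than the server by waiting
    // until the single server thread has processed everything queued so far.
    server.submit(() -> { }).get();

    // Steps 10-11: thenCompose on an already-completed future runs its function
    // immediately, on the calling thread.
    opened.thenCompose(v -> {
      composeThread.set(Thread.currentThread().getName());
      return CompletableFuture.<Void>completedFuture(null);
    }).join();

    server.shutdown();
    return new String[] {whenCompleteThread.get(), composeThread.get()};
  }

  public static void main(String[] args) throws Exception {
    String[] threads = demo();
    System.out.println("whenComplete ran on " + threads[0]); // copycat-server
    System.out.println("thenCompose ran on " + threads[1]);  // main
  }
}
```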

This shows that Copycat cannot rely on handlers registered on a CompletableFuture (thenCompose, whenComplete, ...) being called on the thread that called complete() on that future. This may work "most of the time", but that is most likely not good enough.

I have an immediate workaround for my concrete problem: I can simply delay the completion of the future returned by my server's #listen() call.

But I think that before any release candidate or final release of Copycat is created, all usages of CompletableFuture throughout the codebase (Copycat and Catalyst) have to be inspected and all such problems fixed, so that Copycat does not rely on the timing of when CompletableFutures are completed. (EDIT: I understood the Google Group thread to mean that Copycat generally assumes that the handlers are called on the same thread. As this is obviously not guaranteed, I think all occurrences have to be inspected. If Copycat does not generally assume this, then this issue might just be a single bug rather than a problem with a general implementation pattern.)
One solution could be to call context.executor().execute(() -> ...) again in each result handler, to ensure that everything is executed on the correct thread. Alternatively, all methods that rely on being executed on a specific thread (at least all methods in *State, ServerContext, etc.) could do that themselves.
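The second option, where thread-sensitive methods handle the thread switch themselves, could look roughly like this. SelfDispatchingState and its join() method are hypothetical illustrations, not Copycat code: if join() is invoked from the wrong thread, it re-submits itself to the context executor and returns a future that completes once the real work has run there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SelfDispatchingState {
  private volatile Thread contextThread;
  private final ExecutorService context = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "copycat-server");
    t.setDaemon(true);
    contextThread = t; // remember which thread owns the state
    return t;
  });

  // Hypothetical thread-sensitive operation (think: a state transition).
  public CompletableFuture<String> join() {
    if (Thread.currentThread() != contextThread) {
      // Wrong thread (e.g. "main"): hop onto the context thread and retry there.
      CompletableFuture<String> result = new CompletableFuture<>();
      context.execute(() -> join().whenComplete((value, error) -> {
        if (error != null) {
          result.completeExceptionally(error);
        } else {
          result.complete(value);
        }
      }));
      return result;
    }
    // From here on, the code runs on the context thread only.
    return CompletableFuture.completedFuture(Thread.currentThread().getName());
  }

  public void close() {
    context.shutdown();
  }

  public static void main(String[] args) throws Exception {
    SelfDispatchingState state = new SelfDispatchingState();
    // Called from "main", but the work still runs on the context thread.
    System.out.println(state.join().get(5, TimeUnit.SECONDS)); // prints "copycat-server"
    state.close();
  }
}
```

The caller's thread no longer matters, at the cost of an extra task hop whenever a method is entered from outside the context.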

@bgloeckle
Contributor Author

To quickly illustrate the problem, this is basically what happens:

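import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest {
  public static void main(String[] args) throws InterruptedException {
    // Stands in for Copycat's single server thread.
    ExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    System.out.println(Thread.currentThread().getId() + ": Main thread");

    CompletableFuture<Void> f = new CompletableFuture<>();

    executor.execute(() -> {
      try {
        // Registered before completion: runs on the thread that calls complete().
        f.whenComplete((r, e) -> {
          System.out.println(Thread.currentThread().getId() + ": Complete!");
        });
        Thread.sleep(1000);
        f.complete(null);
      } catch (Exception e) {
        e.printStackTrace();
      }
    });

    // Give the executor time to complete f before the next stage is attached.
    Thread.sleep(3000);
    // Registered after completion: runs immediately on the calling thread ("main").
    f.thenCompose(v -> {
      System.out.println(Thread.currentThread().getId() + ": thenCompose!");
      return new CompletableFuture<>();
    });

    // The executor's worker thread is not a daemon; shut it down so the JVM can exit.
    executor.shutdown();
  }
}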

Output is:

1: Main thread
10: Complete!
1: thenCompose!

This means that the Function passed to thenCompose is executed on the main thread, whereas Copycat expects it to be executed on the same thread as the one that printed "10: Complete!".

@kuujo
Member

kuujo commented Feb 23, 2016

The Copycat client internals have been completely rewritten, but I think we still do need to do some review of that code before a full release to make sure this issue is not present. I've seen some non-intuitive behavior in CompletableFuture, particularly with thenCompose.

@bgloeckle correct me if I'm wrong, but essentially what you're seeing is if the main thread gets to the thenCompose call after the future being composed is completed, CompletableFuture will run the composed callback on the main thread rather than the thread that completed it?

A while back, we actually had similar issues with thenComposeAsync causing deadlocks in the server. I found an obscure reference to an issue in the CompletableFuture implementation that was fixed in later Java releases, and I removed all uses of thenComposeAsync.

I think I may just want to do the same thing and replace thenCompose calls with a helper for internal use that ensures composed callbacks are executed on the proper thread (the thread that completed the future). Using thenCompose directly only seems to be safe inside the Copycat server, where futures are created and completed on the same thread. Most of the other code that completes futures on a thread other than the one on which they were created already accounts for this issue by explicitly completing futures on a specific thread.
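A helper of the kind described here might look like the following sketch. The class name ThreadSafeFutures and the method composeOn are hypothetical. Rather than using thenComposeAsync (given the deadlock issues mentioned above), it registers a plain whenComplete callback and always hands the composed function to the given executor, so the function runs on the context thread regardless of whether the source future was already complete when the helper was called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public final class ThreadSafeFutures {
  private ThreadSafeFutures() {}

  // Like future.thenCompose(fn), but fn always runs on 'executor', even if 'future'
  // is already complete when composeOn() is called.
  public static <T, U> CompletableFuture<U> composeOn(
      CompletableFuture<T> future,
      Function<? super T, ? extends CompletionStage<U>> fn,
      Executor executor) {
    CompletableFuture<U> result = new CompletableFuture<>();
    future.whenComplete((value, error) -> executor.execute(() -> {
      if (error != null) {
        result.completeExceptionally(error);
        return;
      }
      try {
        fn.apply(value).whenComplete((u, e) -> {
          if (e != null) {
            result.completeExceptionally(e);
          } else {
            result.complete(u);
          }
        });
      } catch (Throwable t) {
        // A throwing fn fails the result instead of being silently lost.
        result.completeExceptionally(t);
      }
    }));
    return result;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService server = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "copycat-server");
      t.setDaemon(true);
      return t;
    });
    // Already complete, as in the race described in this issue.
    CompletableFuture<Void> opened = CompletableFuture.completedFuture(null);
    CompletableFuture<String> composed =
        composeOn(opened, v -> CompletableFuture.completedFuture(Thread.currentThread().getName()), server);
    System.out.println(composed.get(5, TimeUnit.SECONDS)); // prints "copycat-server"
    server.shutdown();
  }
}
```

Always dispatching through the executor costs one extra task hop even when the caller is already on the context thread; a variant could run fn inline in that case, but it would then need to know which thread the executor owns.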

I will go through and check all the uses of this method. If this pattern isn't being used in the new client I'll close this.

@bgloeckle
Contributor Author

> @bgloeckle correct me if I'm wrong, but essentially what you're seeing is if the main thread gets to the thenCompose call after the future being composed is completed, CompletableFuture will run the composed callback on the main thread rather than the thread that completed it?

Yes, that's what I was seeing (I think that was on the "beta5" release). I have not yet rechecked all the code for the newer releases (I'm in the process of switching diqube to Copycat rc2), but essentially, all methods that build the "pipeline" that runs once a CompletableFuture is completed should themselves make sure they are executed on the correct thread. I think a single utility class for that in Copycat is a good idea.


2 participants