A single Verticle consumer is processing messages concurrently for a worker thread pool #3798

Closed

guidomedina opened this issue Feb 10, 2021 · 39 comments

guidomedina commented Feb 10, 2021

Version

Which version did you encounter this bug? 4.0.2

Context

I'm using Vert.x verticles to process messages sequentially per given address. Up to 3.9.5 a consumer would only process messages in sequential order, but now messages seem to be processed concurrently.

Do you have a reproducer?

Yes, here is a simple unit test; it passes with 3.9.5 and fails with 4.0.2:
https://gist.github.com/guidomedina/ff20d1531bf59e046dd5fd5599918052

Gattag commented Feb 10, 2021

Duplicate of #3790

@guidomedina (Author)

OK, so it is being handled already; at least I hope that unit test gives you guys another perspective.

vietj (Member) commented Feb 10, 2021

Thanks @guidomedina.

Maybe we need to rethink the affinity between the task queue and duplicate contexts to avoid concurrent access like this.

I think we could introduce a new flag for worker execution. Currently we have sequential/unordered; we would add a third mode that is ordered per duplicate context (the current behavior) and roll the default back to sequential for the verticle.

vietj added this to the 4.0.3 milestone Feb 10, 2021
Gattag commented Feb 10, 2021

@vietj did you take a closer look at PR #3793?

Gattag commented Feb 10, 2021

But I do understand what you are saying, which is a little different than the PR.

vietj (Member) commented Feb 10, 2021

#3793 seems to bring a lot of complexity; I will have a look at the original issue and this one.

vietj (Member) commented Feb 10, 2021

It seems to me we should:

  • roll back to the 3.x behavior
  • introduce a new execute blocking mode which is sequential per duplicate context
  • use this new mode in vertx-jdbc-client and other appropriate places
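
For reference, a minimal sketch of the two modes that exist today, using the executeBlocking overload that takes the ordered flag (the blocking bodies are placeholders, not part of the proposal); the proposed third mode would be ordered per duplicate context:

vertx.executeBlocking(promise -> {
  // blocking work, serialized on the context's task queue
  promise.complete();
}, true /* ordered */, res -> {});

vertx.executeBlocking(promise -> {
  // blocking work that may run in parallel on the worker pool
  promise.complete();
}, false /* unordered */, res -> {});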

Gattag commented Feb 10, 2021

@vietj So I like that idea on the surface, but I do see some issues with it.

  • As more aspects of Vert.x get tracing support, more handlers will boot the TaskQueue back to the parent context or to a new DuplicatedContext, as each new duplicate would be its own TaskQueue.
  • Supporting this new mode of blocking execution will not be possible with the RxHelper schedulers; the behavior will be undefined. The behavior is always going to be undefined once we start adding multiple queues to a single context, but I believe that limiting it to only where needed, rather than giving it to the user, will work out better.

While the PR does add some complexity, it does properly handle the issue of new duplicates being produced for tracing, as those will propagate the TaskQueue of a SubstitutedContext through new duplicates.

Gattag commented Feb 10, 2021

context0.executeBlocking(ignored0 -> {
  // executes on the main TaskQueue for context0
  JDBCClient jdbc = /* whatever */;
  jdbc.getConnection(con -> {
    // executes on a new TaskQueue per invocation
    vertx.eventBus().consumer("addr", msg -> {
      vertx.executeBlocking(ignored1 -> {
        // executes on the main TaskQueue for context0 because there is a new DuplicatedContext
      });
    });
  });
});

If you don't completely separate the DuplicatedContext from the TaskQueue, that will be an issue; that's why I made the SubstitutedContext. You could also just tell users to save the context, but I don't know if that will work perfectly with other things, and it requires them to keep track of the context.

I know it is a weird case with the placement of the event bus consumer, but I imagine more things will get tracing support and need a DuplicatedContext, so more handlers will be returning one in the future. And I think the issue is of even more importance on worker verticles.

andrm commented Feb 10, 2021

IMHO there should be no guarantees at all with executeBlocking/workers, at least with regard to ordering. I think ordered=true should be deprecated, because it implies one (identifiable) TaskQueue, which a Context would need to enforce.

Maybe it would be a good idea to let the user tell executeBlocking on which Context to execute, if the user should have control over it.
(Just my 2 cents.)

guidomedina (Author) commented Feb 10, 2021

Even though ordering and concurrency are closely related, the issue I'm reporting is that a single consumer on a single verticle instance is able to consume messages concurrently. IMHO this breaks the core concept of what a verticle originally is and does, which is similar to the guarantees you get in Akka, for example: you expect a single actor modifying state, but here that contract is broken, as demonstrated by my unit test.

vietj (Member) commented Feb 10, 2021 via email

andrm commented Feb 10, 2021

Even though ordering and concurrency are closely related, the issue I'm reporting is that a single consumer on a single verticle instance is able to consume messages concurrently. IMHO this breaks the core concept of what a verticle originally is and does, which is similar to the guarantees you get in Akka, for example: you expect a single actor modifying state, but here that contract is broken, as demonstrated by my unit test.

@guidomedina The contract is valid if you are executing on the event loop; it is broken as soon as you use executeBlocking(). If you insist on ordered blocking execution, all the benefits of the Vert.x model go down the drain. I would advise the following (see the sketch after this list):

  • Please re-think whether you really need total order. Very often this is not the case. Whatever you are doing in your blocking code, does it really have a relationship with the previous and next message? Multimedia streams and files/blobs broken up into pieces are cases where I know it is needed, but you wouldn't use the event loop/EventBus for that, rather a normal TCP stream. Logging is the other case, but there are low-latency solutions for that, so you don't need executeBlocking().
  • If you decide that you need it, consider using a CompletableFuture with a single-threaded Executor, chain them, and store the latest CompletableFuture in the Context. This should give you the desired behavior.
  • If you don't want to do that, maybe Vert.x is not the right tool for your use case. You are basically forcing everything onto one thread. If the arrival rate of messages on the event bus exceeds the execution rate of the executeBlocking() code, the application will have high-latency problems. I doubt that Akka can do it better. Project Loom is the only thing that may help here.
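
A minimal sketch of the second bullet, assuming a hypothetical blocking step process(msg); this is not the reporter's actual code:

import io.vertx.core.Context;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// One single-threaded executor guarantees in-order execution of the chain.
Executor single = Executors.newSingleThreadExecutor();

vertx.eventBus().<String>consumer("addr", msg -> {
  Context ctx = vertx.getOrCreateContext();
  CompletableFuture<Void> previous = ctx.get("cf.addr");
  CompletableFuture<Void> next = (previous == null)
      ? CompletableFuture.runAsync(() -> process(msg), single)   // process(msg) is a hypothetical blocking step
      : previous.thenRunAsync(() -> process(msg), single);       // chain after the previous message
  ctx.put("cf.addr", next); // store the tail of the chain in the Context
});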

BTW: reduce setWorkerPoolSize() to 1 and your test will pass.

@Gattag The old behavior is OK, but it is still confusing, and it's not easy for the Vert.x maintainers to support it, code-wise and user-wise.

@vietj and Vert.x maintainers: please keep this as simple as possible, keep the model simple. Otherwise you will get a lot of bug reports/issues with hard-to-reproduce concurrency bugs. I've been using Vert.x since 2016, very successfully. It's better not to support all cases and not to over-promise. With DuplicatedContext I think you've worked around old JDBC code and got better behavior, but the truth is that these problems need to be fixed in the driver, preferably an asynchronous one.

guidomedina (Author) commented Feb 10, 2021

I have a complex queuing system sitting on top of Vert.x where each message processor does some heavy I/O. Let me describe it:

  • each processor has N addresses: address[0 - (N - 1)]
  • each processor can have X threads processing messages
  • messages with the same hash will go to the same address

The promises of my system (see the sketch after this list):

  • messages sent to a specific address need to be processed sequentially, so order is required
  • messages sent to different addresses can be processed in parallel
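
A minimal sketch of that routing, with hypothetical names (addresses, key); the point is only that the same key always maps to the same address, so one consumer verticle sees all messages for that key:

import io.vertx.core.Vertx;

// Hypothetical router for illustration, not the reporter's actual code.
class HashRouter {
  private final Vertx vertx;
  private final String[] addresses; // address[0 .. N-1], one consumer verticle each

  HashRouter(Vertx vertx, String[] addresses) {
    this.vertx = vertx;
    this.addresses = addresses;
  }

  void route(String key, Object body) {
    // floorMod avoids a negative index for negative hash codes
    int slot = Math.floorMod(key.hashCode(), addresses.length);
    vertx.eventBus().send(addresses[slot], body);
  }
}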

I also have HTTP and TCP servers using Vert.x. This system has been in place for 3 years now; it was fine up to 3.9.5, but now it isn't working with 4.0.x.

I simply came here because something was working and now it is not. I'm not asking the Vert.x guys to over-promise on anything; it will just take me some time to migrate it to something like Akka.

I have used Vert.x and Akka in 2 projects now, including a low-latency distributed trading system; I don't mean to over-burden Vert.x with things that aren't its specialty.

andrm commented Feb 10, 2021

This is weird: how did you make sure that Vert.x was able to parallelize the messages to different addresses but not to the same address? Did you deploy a verticle for each address?

guidomedina (Author) commented Feb 10, 2021

Did you deploy a verticle for each address?

Yes. There are not too many addresses, but some parallelism is required, so an actor system was too much for me.

andrm commented Feb 10, 2021

OK, but that would mean you can afford a single-threaded Executor in each of your verticles; this way you are always sure about in-order processing. Are you sure that the heavy I/O code cannot become a bottleneck if too many messages arrive?

@guidomedina (Author)

OK, but that would mean you can afford a single-threaded Executor in each of your verticles; this way you are always sure about in-order processing. Are you sure that the heavy I/O code cannot become a bottleneck if too many messages arrive?

If a message with the same hash (mod addresses.length) blocks another, that's fine, but we can't have one thread per address because that would kill our system's I/O resource sharing; that's why a worker thread pool with a limited number of threads is assigned to each type of processor (see the sketch below).

Some of the processors have something like 64 addresses with 4 threads, so they are maxed out with minimal hash collisions. I know, I know, Akka offers this, but it was easier to do with Vert.x.
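
For illustration, a minimal sketch of that setup, assuming a hypothetical AddressVerticle and pool name; because all deployments use the same worker pool name, the 64 verticles share one 4-thread pool:

import io.vertx.core.DeploymentOptions;

DeploymentOptions opts = new DeploymentOptions()
    .setWorker(true)
    .setWorkerPoolName("processor-pool") // same name => one shared pool across deployments
    .setWorkerPoolSize(4);
for (int i = 0; i < 64; i++) {
  // AddressVerticle is a hypothetical per-address consumer verticle
  vertx.deployVerticle(new AddressVerticle("addr-" + i), opts);
}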

andrm commented Feb 10, 2021

I understand. Did it work with 3.9.5 and workerPoolSize set to 4 for each verticle?

guidomedina (Author) commented Feb 10, 2021

But I do believe the idea of worker threads is kind of dumb; maybe it should be eliminated, and the user should just pick between the main loop thread(s) or another thread pool. For example, it would be something like:

  • if non-blocking, then the main loop thread(s) is fine
  • if blocking, pass your deployment options to use a different thread pool

guidomedina (Author) commented Feb 10, 2021

I understand. Did it work with 3.9.5 and workerPoolSize set to 4 for each verticle?

Up to 3.9.5 the system was working flawlessly, but you have to remember that if you name the worker thread pool it doesn't create a thread pool per verticle, so it is basically one thread pool handling N addresses with N verticles and X threads.

andrm commented Feb 10, 2021

Yes, that's the idea.

I have a different idea; I did a similar thing a while ago:

  • run all addresses in one verticle
  • when a message arrives (in the message handler), check with context.get("address") whether there is a CompletableFuture object there
  • if not, create one, run the heavy I/O code in it, and store it in the Context: context.put("address", cf)
  • if yes, check whether it has completed: if not, chain a new CF (.thenRun()) with the new data for this address onto the old CF
  • put the new CF into the context (context.put(...))
  • done

(Maybe this can be done with Vert.x Futures/Promises too; a sketch follows below.)
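
A sketch of the same per-address chaining with Vert.x futures, assuming a hypothetical blocking step process(msg); executeBlocking is called unordered here because the chaining itself enforces per-address order:

import io.vertx.core.Context;
import io.vertx.core.Future;

vertx.eventBus().<String>consumer("addr", msg -> {
  Context ctx = vertx.getOrCreateContext();
  Future<Void> previous = ctx.get("addr");
  Future<Void> tail = (previous == null ? Future.<Void>succeededFuture() : previous)
      .compose(v -> vertx.executeBlocking(p -> {
        process(msg); // hypothetical heavy I/O for this address
        p.complete();
      }, false /* unordered: chaining already serializes */));
  ctx.put("addr", tail); // store the tail of the chain
});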

andrm commented Feb 11, 2021

From the docs https://vertx.io/docs/vertx-core/java/#_verticles:

Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can be executed by different threads at different times.

@guidomedina this is the documentation you were referring to. I apologize to you; the contract seems to be exactly what you said. I still believe it is wrong and should be deprecated.

vietj (Member) commented Feb 11, 2021 via email

vietj (Member) commented Feb 11, 2021

I think there are two things to address here for duplicate contexts:

1/ the worker execution, which is currently broken
2/ the executeBlocking case, which I believe is correct, i.e. each duplicate context has its own execute blocking order by default, but that might not be sufficient in some cases

vietj (Member) commented Feb 11, 2021

I've made a draft branch to address the worker verticle execution here: #3802

We can use it for execute blocking execution too.

vietj (Member) commented Feb 11, 2021

@guidomedina can you check the branch I created? I ran your test and now it passes again.

vietj (Member) commented Feb 11, 2021

@Gattag you said this is a duplicate of the execute blocking issue, but it is not; executeBlocking has different semantics than worker execution (it also allows parallel execution).

vietj (Member) commented Feb 11, 2021

I think the execute blocking contract should be clarified with regard to how it works for duplicate contexts; perhaps ordered execution per duplicate context is not the right default. I believe it is, since:

1/ this is most of the time what the user intends to have (e.g. this solves work-arounds we used to have for the JDBC client)
2/ this is more efficient
3/ the user should not concurrently update state in the executeBlocking block, but instead in the callback of executeBlocking, which will be executed according to the verticle semantics (see the sketch below)
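
A minimal sketch of point 3, with a hypothetical expensiveIo() call and a plain counter field on the verticle:

vertx.executeBlocking(promise -> {
  promise.complete(expensiveIo()); // runs on a worker thread; don't touch verticle state here
}, res -> {
  counter += (Integer) res.result(); // safe: the callback runs back on the verticle's context
});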

Gattag commented Feb 11, 2021

@vietj I said it was a duplicate because they are caused by the same thing under the hood, but they do have different semantics to an extent, which I hadn't fully conceptualized.

I agree that it is more efficient than what I came up with, but the more I think about it, the more I think that the added task queue behavior should be reverted in its entirety.

The biggest thing I don't get, after looking more at the JDBC client now, is why storing a task queue per connection was considered a work-around. At this point all I see is DuplicatedContext with the extra task queue causing way more hassle down the line than it's worth.

andrm commented Feb 11, 2021

@vietj I agree, but how easy is it to tell the user not to update the state?

My main concern is the Context object itself, which has a lot of state. Does a duplicate context have its own Map for put/get?

andrm commented Feb 11, 2021

The biggest thing I don't get, after looking more at the JDBC client now, is why storing a task queue per connection was considered a work-around. At this point all I see is DuplicatedContext with the extra task queue causing way more hassle down the line than it's worth.

I fully agree @Gattag! Thanks!

Gattag commented Feb 11, 2021

The 3.9 concurrency behavior is what we should have in 4.x. I'm sorry to sort of change gears here from earlier; it just really doesn't seem like this will play out cleanly, and it adds cases for weird things to occur. I'm not sure whether I'm seeing the full picture here, and I'm sorry if I'm wasting time with all these questions. I'm just concerned.

vietj (Member) commented Feb 11, 2021 via email

vietj (Member) commented Feb 11, 2021 via email

Gattag commented Feb 11, 2021

@vietj Yes

Gattag commented Feb 11, 2021

But really what I mean is that DuplicatedContext should have its own localData and that's it; there should be no special context with a special task queue.

Gattag commented Feb 11, 2021

The exception would be that internally JDBC could use a special context for that (a context that is not reachable outside of the internal workings of JDBC, so callbacks would not provide this context). But as I was saying earlier, I'm not grasping what was wrong with the solution of storing an independent TaskQueue with each JDBCConnection. I thought I understood before, but I realized I don't.

@guidomedina (Author)

@guidomedina can you check the branch I created? I ran your test and now it passes again.

Looks good, the test is spot on (except for the comment I made); I probably should have made my tests much simpler, like you did.
Moving forward, maybe in a future 4.1.x or 4.2.x, here are my two cents on what we should probably do:

  • Eliminate the concept of a worker thread pool; instead allow verticles to be run on the default or a provided thread pool, and maybe provide different thread pool implementations more fit for blocking vs non-blocking tasks; for example, Akka provides a modified version of fork-join vs a traditional thread pool
  • Keep the contract of sequential execution per verticle/consumer, to be able to work with verticle state as if it were single-threaded
  • Don't promise thread affinity, but do promise ordering: a message can be processed by any thread of the assigned thread pool, but always in sequential order if messages are sent to the same verticle/address/consumer
  • I assume that since Java 5, state modified by operations that happen before is visible to operations that happen after, so it is safe to promise state visibility from the previous message to the next

In other words, for any kind of thread pool, the following being true will make Vert.x very reliable. Given one producer and one verticle with one consumer, the following should hold regardless of how many threads its assigned thread pool has (see the sketch after this list):

  • if the same producer sends 2 messages to the same verticle/address/consumer, both messages should arrive in the same order in which they were sent, and
  • they should be processed sequentially in the same order they were sent
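
A minimal sketch of that contract as a check, with a hypothetical expected field on the verticle; under the proposed guarantee the handler is never entered concurrently, so the plain field is safe:

vertx.eventBus().<Integer>consumer("addr", msg -> {
  // messages from one producer must be observed in send order,
  // one at a time, no matter how large the worker pool is
  assert msg.body() == expected++;
});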

vietj self-assigned this Feb 19, 2021
vietj closed this as completed Feb 22, 2021