
Optimize time between job creation and handling #11231

Closed · 40 tasks done
npepinpe opened this issue Dec 9, 2022 · 20 comments
Labels: component/engine, component/gateway, kind/epic (categorizes an issue as an umbrella issue, e.g. an OKR, which references other, smaller issues)

Comments

@npepinpe (Member) commented Dec 9, 2022

Description

We would like to reduce the time between when a job is created and when it is received by a JobHandler. One idea is to explore a push-based approach, where workers can open long-living streams and receive jobs as soon as they are available, without having to poll for them.

To reiterate, the primary goal is to reduce the time between job creation and client handling. It is not to create a push-based pipeline - that is simply a proposed solution.

As a first step, we'll investigate a push based approach. The first iteration will consist of getting feedback on the performance gain as early as possible. This means ignoring most failure paths, most edge cases, QoL features, etc.

Prototype

(Task list: two completed research sub-issues, labeled component/engine, component/gateway, kind/research - npepinpe.)

Organizational

The initial alpha target will implement end-to-end job pushing (from broker to client), with naive solutions for the more complex problems (log persistence, failure handling, and flow control), and will only support the Java client.

Alpha

(Task list: 29 completed sub-issues spanning component/broker, component/engine, component/gateway, component/transport, component/clients, and the stream platform - mostly kind/toil, delivered across versions 8.2.0 through 8.3.0.)

Production

(Task list: two remaining sub-issues - area/test kind/toil, and kind/toil assigned to npepinpe.)

Timeline

We'll approach this in two phases: alpha and production-ready. The target for the alpha scope is 8.3.0-alpha2 (so the June release); the target for production-ready is 8.3 (so September).

@npepinpe npepinpe added the kind/toil Categorizes an issue or PR as general maintenance, i.e. cleanup, refactoring, etc. label Dec 9, 2022
@npepinpe npepinpe self-assigned this Dec 9, 2022
@npepinpe npepinpe added kind/epic Categorizes an issue as an umbrella issue (e.g. OKR) which references other, smaller issues and removed kind/toil Categorizes an issue or PR as general maintenance, i.e. cleanup, refactoring, etc. labels Dec 9, 2022
@npepinpe npepinpe changed the title Write proposal for push-based job activation pipeline Optimize time between job creation and handling Dec 9, 2022
@npepinpe (Member, Author)

A quick summary (you can read more in the prototype issue): I think the prototype was a success. There are still many issues to solve, but it's clear the approach is much faster:

  • For the common case, you skip having to send an ACTIVATE command. This means removing a required commit from your hot path.
  • For the common case, it removes the need to query every partition from the gateway, so you don't need to wait for each request.

It's however a more complex approach than simple polling, and we would likely want to continue supporting polling, at least for the foreseeable future, which means increasing the complexity of the application. That said, the happy path is much, much faster, so it's worth pursuing.

@npepinpe (Member, Author)

I've updated the issue with a tentative breakdown for the alpha target. Next week, @deepthidevaki, @koevskinikola, and I will fix the scope for the alpha and production-ready targets and finalize a provisional timeline of deliverables. This will be a soft timeline to help us better organize our time, not a fixed one.

@npepinpe (Member, Author)

Alpha scope

Note The main goal of the first alpha is to provide the minimum fundamental parts of the feature on which we can iterate. The idea is that we cannot properly compare various solutions for flow control, failure handling, job routing, etc., without having the base pipeline in place.

Clients will be able to open so-called job streams. A job stream will target a specific job type, and will define a set of job activation properties identical to the existing job worker. A client job stream will consist of a long living unidirectional stream between the client and the gateway, on which the gateway will push ActivatedJob instances. This ensures compatibility with the JobHandler interface and the existing JobWorker API.

The gateway will aggregate clients by their activation properties. For the first client with logically equivalent activation properties, it will open a stream to each broker for this job type, passing these properties and a unique ID identifying the aggregated stream. Clients which open a stream after that, on this gateway, will not cause the gateway to send a request to the broker. When the last client closes a job stream of a given type, the gateway will send a request to each broker to remove the aggregated stream for this type.

Note Both the gateway and the broker will use our membership service to detect membership changes, and add/remove job streams accordingly.

The broker will have a new top-level service: the job stream service. This consists of a shared job stream registry, an API request handler to add/remove streams from the registry, and a streaming client to push activated jobs to registered streams.

The job stream registry will provide a synchronous, thread-safe API to fetch a job stream, if any, for a given job type. This must be synchronous since it will be accessed by the engine during command processing. A job stream in the registry will consist of an association of a job type, a set of job activation properties, and a list of possible recipients (gateway + unique aggregated stream ID).
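As a rough illustration, the registry contract described here could look like the following. This is a minimal sketch with illustrative names and simplified `String`/`UUID` types - not the actual broker code:

```java
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Minimal sketch of the registry contract described above. All names are
// illustrative; the real implementation uses the broker's internal types.
interface JobStreamRegistry {

  // Synchronous and thread-safe by design: the engine calls this during
  // command processing and cannot block on asynchronous lookups.
  Optional<JobStream> streamFor(String jobType);
}

// A registered stream: a job type, the activation properties shared by all
// aggregated clients, and the list of possible recipients.
record JobStream(String jobType, JobActivationProperties properties, List<Recipient> recipients) {}

// A recipient is a gateway plus the unique ID of its aggregated stream.
record Recipient(String gatewayId, UUID aggregatedStreamId) {}

// Simplified stand-in for the activation properties (worker name, timeout,
// fetch variables) mentioned in the description.
record JobActivationProperties(String workerName, long timeoutMillis, List<String> fetchVariables) {}
```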

The API request handler will receive the gateway’s open/close stream requests, and add/remove the gateway from the possible recipients for activated jobs of a given type. When the last recipient is removed from a job stream, it is removed from the registry.

When a job would be made available, the engine will query the registry for an available stream. If there is none, the job is made activate-able and the job type is broadcast; this is important for compatibility with the current long-polling approach. If there is one, the job will be activated immediately during the same command processing, using the stream’s job activation properties. Once activated, the job and its variables are handed over (as a side effect) to a specific consumer, which then forwards the job to the stream's gateway (including the job, its key, its variables, and the unique aggregated stream ID).
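Put together, the engine-side decision could be sketched roughly as follows (a hypothetical shape reusing the registry sketch above; the abstract methods stand in for engine internals):

```java
// Rough sketch of the engine-side decision described above; not the real
// engine code. Reuses the JobStreamRegistry/JobStream sketch from before.
abstract class JobAvailableHandler {

  private final JobStreamRegistry registry;

  JobAvailableHandler(JobStreamRegistry registry) {
    this.registry = registry;
  }

  void onJobAvailable(String jobType) {
    final var stream = registry.streamFor(jobType);
    if (stream.isEmpty()) {
      // No stream registered: leave the job activate-able and broadcast the
      // job type so long polling keeps working as before.
      broadcastJobAvailable(jobType);
      return;
    }
    // A stream exists: activate the job within the same command processing,
    // then hand it over as a side effect to be pushed to the recipient gateway.
    final var activated = activateJob(jobType, stream.get().properties());
    pushAsSideEffect(activated, stream.get());
  }

  // Stand-ins for engine internals.
  abstract void broadcastJobAvailable(String jobType);
  abstract ActivatedJobRecord activateJob(String jobType, JobActivationProperties properties);
  abstract void pushAsSideEffect(ActivatedJobRecord job, JobStream stream);

  // Simplified activated-job payload: key, type, and variables.
  record ActivatedJobRecord(long key, String jobType, String variables) {}
}
```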

In more detail:

Client

  • The Java client can open a long-living stream through which jobs are activated and received. It can pass a worker name, an activation timeout, and a set of fetch variables. The activation semantics will be exactly the same as those of the ActivateJob command. It will receive a stream of ActivatedJob instances, such that the results can be fed to a JobHandler shared with a JobWorker (see the usage sketch after this list).
  • Activation of existing jobs before the stream is created will be handled initially by the existing job worker/poller.
    • As a stretch goal, if possible, we can integrate it as an opt-in feature of the job worker, so that the stream can be used seamlessly with the worker (i.e. the same job handler is reused). As we don't have experimental features yet on the client-side, we should consider whether this is worth doing now or wait.
  • The client will emit a metric of how many jobs it receives (note: if this is already achievable with standard gRPC metric interceptors, this can be omitted).
  • If the stretch goal is met, then we should also implement Introduce JobWorker metrics for the Java client #4700
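As referenced above, a usage sketch of what such a client stream could look like. The exact builder methods here are illustrative of the design, not a guaranteed final API:

```java
import io.camunda.zeebe.client.ZeebeClient;
import java.time.Duration;

public final class JobStreamExample {
  public static void main(String[] args) {
    try (final ZeebeClient client =
        ZeebeClient.newClientBuilder().gatewayAddress("localhost:26500").usePlaintext().build()) {
      // Open a long-living stream with the same activation semantics as the
      // ActivateJob command; received jobs can be fed to the same JobHandler
      // that a JobWorker would use.
      client
          .newStreamJobsCommand()
          .jobType("payment")
          .consumer(job -> System.out.println("received job " + job.getKey()))
          .workerName("payment-worker")
          .timeout(Duration.ofSeconds(30))
          .fetchVariables("orderId", "amount")
          .send()
          .join(); // completes only when the stream is closed
    }
  }
}
```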

Gateway

Stream management

  • The gateway will implement a new JobStream API, which can handle long-living client streams. Streams will be aggregated by their activation properties, and each aggregated stream given a unique ID (see the sketch after this list).
    • When a new aggregated stream is created, the gateway will register it to all brokers.
    • When a new aggregated stream is removed, the gateway will remove it from all brokers.
    • When a new broker is added to the topology, the gateway will register each of its existing aggregated streams.
    • When the gateway is shutting down, it should broadcast a single, best-effort request to all brokers to remove its streams.
    • When a client is detected as dead, if it is the last client for the aggregated stream, the gateway will notify all brokers to remove this stream.
  • The gateway will emit metrics of how many clients are registered and how many unique streams there are.
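A sketch of the aggregation and reference counting described in the first bullet, with hypothetical names (reusing the `JobActivationProperties` record from the registry sketch); not the actual gateway code:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Logically equivalent client streams share one aggregated stream with a
// unique ID; brokers only hear about the first open and the last close.
final class StreamAggregator {

  private final Map<JobActivationProperties, AggregatedStream> streams = new ConcurrentHashMap<>();

  // Adds a client; if it is the first (clients().size() == 1 afterwards), the
  // caller must register the stream's ID and properties with all brokers.
  AggregatedStream addClient(JobActivationProperties properties, ClientStream client) {
    // compute() keeps creation and addition atomic with respect to removeClient().
    return streams.compute(properties, (p, stream) -> {
      if (stream == null) {
        stream = new AggregatedStream(UUID.randomUUID());
      }
      stream.clients().add(client);
      return stream;
    });
  }

  // Returns true if this was the last client, in which case the caller must
  // ask all brokers to remove the aggregated stream.
  boolean removeClient(JobActivationProperties properties, ClientStream client) {
    final var removed = new boolean[1];
    streams.computeIfPresent(properties, (p, stream) -> {
      stream.clients().remove(client);
      if (stream.clients().isEmpty()) {
        removed[0] = true;
        return null; // drop the aggregated stream entirely
      }
      return stream;
    });
    return removed[0];
  }

  record AggregatedStream(UUID id, List<ClientStream> clients) {
    AggregatedStream(UUID id) {
      this(id, new CopyOnWriteArrayList<>());
    }
  }

  // Stand-in for the server-side handle of a single client's gRPC stream.
  interface ClientStream {}
}
```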

Job proxy

  • The gateway will implement a new internal endpoint to receive a job, a job key, and the aggregated stream unique ID.
    • Received jobs will be transformed into ActivatedJob instances and forwarded in a round-robin fashion to one of the client streams associated with the aggregated stream (sketched after this list).
    • If there are no clients available, the gateway will yield the job back to its partition leader.
    • If the client is detected as dead when forwarding the job, the gateway will yield the job back to its partition leader.
  • The gateway will emit metrics reporting how many jobs it receives per partition, the rate of reception, how many jobs it forwards, and the rate of forwarding.
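The forwarding logic, sketched with hypothetical names (not the actual gateway code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin over the live client streams of one aggregated stream; if no
// client can take the job, it is yielded back to its partition leader.
final class JobProxy {

  private final List<ClientStream> clients = new CopyOnWriteArrayList<>();
  private final AtomicInteger robin = new AtomicInteger();

  void onJobPushed(ActivatedJob job) {
    final var snapshot = List.copyOf(clients);
    for (int attempt = 0; attempt < snapshot.size(); attempt++) {
      final var index = Math.floorMod(robin.getAndIncrement(), snapshot.size());
      final var client = snapshot.get(index);
      if (client.isAlive() && client.tryForward(job)) {
        return; // forwarded successfully
      }
      // Dead client or failed forward: try the next one in the rotation.
    }
    // No client available: hand the job back to the broker that pushed it.
    yieldToPartitionLeader(job);
  }

  private void yieldToPartitionLeader(ActivatedJob job) {
    // Placeholder: a request back to the job's partition leader so the job
    // becomes activate-able again (a naive Job.FAIL-style yield in the alpha).
  }

  interface ClientStream {
    boolean isAlive();

    boolean tryForward(ActivatedJob job);
  }

  record ActivatedJob(long key, String jobType, String variables) {}
}
```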

Broker

Stream management

  • The broker will implement a new API endpoint to manage aggregated job streams, where gateways can add and remove unique streams, and remove all streams associated to a gateway.
    • When a gateway is removed from the topology, all associated streams should be removed from the broker.

Job push

  • The broker will expose via an interface the set of registered unique streams and their activation properties to the stream platform, such that the engine can activate jobs with these properties.
  • The broker will implement a new component which will receive an activated job, its key, its variables, and the activation properties which were used to activate the job.
    • This is to be exposed to the engine, indirectly via an API of course.
  • This component will try pushing the job to the recipients associated with the given activation properties (see the sketch after this list).
    • We can implement this in a very naive way initially, best-effort and well distributed, e.g. randomly or round-robin.
    • When a job fails to be pushed, the broker will retry the next possible recipient for the given activation properties.
    • If none of the recipients succeed, we can use a log stream writer to yield the job back to the engine.
    • If there was a leader change in between, the job can be considered "lost".
    • There is initially no flow control.
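A sketch of that naive retry loop (illustrative only; the transport and log-writer calls are placeholders):

```java
import java.util.List;
import java.util.UUID;

// Naive best-effort push: try each recipient at most once; if all fail, yield
// the job back to the engine through the log stream writer.
final class JobPusher {

  void push(ActivatedJob job, List<Recipient> recipients) {
    for (final var recipient : recipients) {
      if (trySend(job, recipient)) {
        return; // pushed; no flow control in this first iteration
      }
      // Push failed: retry with the next possible recipient.
    }
    // Every recipient failed: write a yield command so the engine can make the
    // job activate-able again. If the leader changed in between, this write is
    // rejected and the job is considered "lost" until it times out.
    writeYieldCommand(job);
  }

  private boolean trySend(ActivatedJob job, Recipient recipient) {
    // Placeholder for the transport call to the recipient gateway.
    return false;
  }

  private void writeYieldCommand(ActivatedJob job) {
    // Placeholder for appending the yield command via the log stream writer.
  }

  record ActivatedJob(long key, String jobType, String variables) {}

  record Recipient(String gatewayId, UUID aggregatedStreamId) {}
}
```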

Job activation

  • When a job would be made available, we should always try to activate it immediately if possible using the API described above. This means:
    • When a job fails, if there are retries left
    • When a job is created
    • When a job is retried with backoff
    • When a job times out
    • When a job incident is resolved

@npepinpe (Member, Author) commented Mar 10, 2023

Open questions relating to the production-ready scope. For each, we should come up with plausible test scenarios which we will use to identify and define problems, as well as refine and iterate over potential solutions.

Workload distribution

In the alpha scope, we pick a random stream out of all registered streams, activate the job for it, then push it downstream. This is sub-optimal, as it does not take slower workers into account (e.g. workers in a different network region, with fewer resources, unstable workers, or performance that varies by input/payload). Furthermore, there is a risk of workers being overwhelmed by brokers.

Possible test scenarios:

  • Too few workers to handle throughput of cluster-wide job pushes.
    • Will the workers crash?
    • What is the impact on overall processing latency? With and without flow control?
  • Half the workers are much slower than the other half.
    • What is the impact on throughput?
    • What is the impact on overall processing latency? With and without flow control?

Back-filling job streams

In the alpha scope, we will rely on the existing long-polling solution (without modification) to activate jobs that were made activate-able but could not be pushed downstream. This is potentially sub-optimal if there is a large backlog of such jobs, as we could already start pushing them downstream without having to wait for a JobBatch.ACTIVATE command.

Possible test scenario:

  • Create a large backlog of jobs with no streams or workers available, then register streams and start workers.
    • What is the impact on latency if jobs can be pushed (i.e. back-filled), versus only polled?

Intelligent failure handling

In the alpha scope, we will adopt a naive approach to failures.

  • When the broker fails to push a job downstream, the job is yielded back to the engine, which should make it activate-able again via a Job.FAIL command. This indirectly causes a retry, but only after the command is processed asynchronously.
  • When the gateway fails to forward a job downstream to a client, it will yield it back to the broker by sending a Job.FAIL command.

Since streams can be logically identical, as identified by their type and activation properties, it should be possible to intelligently forward a job to another stream if possible, minimizing interactions between the gateway/broker, or the broker/engine, and potentially removing a complete commit barrier from the latency profile.

NOTE It's a bit difficult here to create controlled test scenarios, so ideas are welcome! But ideally, we would be able to inject failures between the broker/gateway, or gateway/client, and observe the difference in overall latency when retrying immediately to another stream instead of yielding the job back to the engine.

This may end up being quite complex when mixed with the flow control solution, so we will want to approach everything one step at a time.

Batch stream registry API

For the alpha scope, we implemented simple single-item RPCs for the gateway/broker stream API. Gateways can add/remove one stream at a time, or remove all associated streams. It may be useful to allow adding/removing multiple streams at once in the future. For example, when a new broker joins the cluster, every gateway will want to add all of its streams to that broker. With a high enough number of streams, it's much more efficient to send fewer, batched requests.
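A hypothetical shape for such a batched API (the alpha only implements the single-item variants; all names illustrative):

```java
import java.util.Collection;
import java.util.UUID;

// Hypothetical registry API with batched variants; names are illustrative.
interface StreamRegistryApi {

  // Single-item operations, as implemented in the alpha scope.
  void add(UUID streamId, JobActivationProperties properties);
  void remove(UUID streamId);
  void removeAllFor(String gatewayId);

  // Batched operations: one request instead of N, e.g. when a gateway
  // re-registers all of its streams with a broker that just joined.
  void addAll(Collection<StreamDescriptor> streams);
  void removeAll(Collection<UUID> streamIds);

  record StreamDescriptor(UUID streamId, JobActivationProperties properties) {}

  // Simplified stand-in for the activation properties.
  record JobActivationProperties(String workerName, long timeoutMillis) {}
}
```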

Possible test scenario:

  • With a cluster of 3 brokers and 2 gateways, shut down one broker, then create a high number of streams, e.g. 100 per gateway. Then start the broker and measure the impact on overall latency (including on the gateway) and on startup (of the broker) - first without batching, then, if it looks like an issue, with batching.

@deepthidevaki (Contributor)

An open question from #11713 (comment)

  • Optimize retrying of requests sent from the client to the server. Currently AddRequest and RemoveRequest are retried indefinitely. What is the impact of this? Should we stop retrying at some point? When we stop retrying, should we return an error to the client? Should we use backoff for retries?

@npepinpe (Member, Author)

Another open question:

  • When pushing a job from the broker to the gateway times out, how should we handle this? We probably don't want to retry, as this could cause the job to be sent out twice. Do we want some form of handshake between broker/gateway before forwarding the job?

@deepthidevaki (Contributor)

Added #12663 as follow up.

@berkaycanbc (Contributor)

A note from our meeting with @koevskinikola and @npepinpe.

In the initial phase, if pushing a job fails, we yield the job back and make it available (activatable) for long polling. In other words, there will initially be no mechanism to push the job again after a failed push.

@npepinpe (Member, Author)

Another open question: when yielding jobs back on failure, we could run into the following problems:

  • Thundering herd, where we suddenly see a spike of failed jobs, resulting in a lot of commands being appended to the log at once. This leads to delays in further commands across the partition.
  • Related, but yielding a job when the log is backed up could result in this job taking, say, 15 seconds to be yielded. If it will time out in 5, this is just pointless busy work.

@deepthidevaki (Contributor)

Related, but yielding a job when the log is backed up could result in this job taking, say, 15 seconds to be yielded. If it will time out in 5, this is just pointless busy work.

What would happen right now, if this happens?

  1. Push failed
  2. Yield command appended
  3. Job timed-out
  4. Job pushed, or activated by polling
  5. Yield command is processed

@npepinpe (Member, Author) commented May 15, 2023

Exactly, 💥

We might want to include some form of sequencing or lease as part of the job yield (sketched below), i.e.

  • Activate job, returns you a "lease" (can be anything - logical clock, timestamp, the activated record position, etc.)
  • On yield, you yield back that specific lease. If the current holder is not the same, it's a no-op
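A minimal sketch of that lease check (hypothetical, not implemented; here the lease is the record position of the activation, but any monotonic token works):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Activation hands out a lease; a yield only takes effect if the caller still
// holds the current lease for the job.
final class JobLeases {

  private final Map<Long, Long> currentLeaseByJobKey = new ConcurrentHashMap<>();

  // Called when a job is activated/pushed; the activation's record position
  // serves as the lease token.
  long activate(long jobKey, long recordPosition) {
    currentLeaseByJobKey.put(jobKey, recordPosition);
    return recordPosition;
  }

  // A stale yield (e.g. the job already timed out and was re-activated with a
  // newer lease) is a no-op, so it cannot clobber the newer activation.
  boolean tryYield(long jobKey, long lease) {
    return currentLeaseByJobKey.remove(jobKey, lease);
  }
}
```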

@koevskinikola (Member)

I've included #12773 in the tasks related to this ticket so we don't lose track of the issue.

ZPA will try to schedule time for it once work on the gateway/client side for this topic is done.

@npepinpe (Member, Author) commented Jul 3, 2023

Another thought:

We may want to preemptively close long-living gRPC streams once they reach a certain age, to allow rebalancing between gateways and avoid having too many clients on the same gateway. This can be done in a future iteration, but users who keep clients alive long enough are likely to eventually run into the problem of too many clients converging on the same gateways.
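For reference, grpc-java already offers a server-side knob in this direction: connections past a maximum age are drained with a GOAWAY, nudging clients to reconnect (ideally landing on a less loaded gateway). A generic grpc-java sketch, not the gateway's actual configuration:

```java
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public final class MaxAgeGateway {
  public static void main(String[] args) throws IOException, InterruptedException {
    final Server server =
        NettyServerBuilder.forPort(26500)
            // Drain connections older than one hour...
            .maxConnectionAge(1, TimeUnit.HOURS)
            // ...but give in-flight streams a grace period to finish first.
            .maxConnectionAgeGrace(5, TimeUnit.MINUTES)
            .build()
            .start();
    server.awaitTermination();
  }
}
```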

@npepinpe (Member, Author)

As we're nearing the end of the alpha scope, I would propose the following for the production scope:

And of course fixing any bugs and issues we find in the meantime 🙃

@deepthidevaki (Contributor)

Intelligent error handling

Don't we already retry with an identical stream on both the gateway and the broker? What is missing?

@npepinpe (Member, Author)

Hm, I thought I had expanded on that. The missing part (which is not in the linked comment, even though I was sure I'd added it 😄) is about thundering-herd issues. If we pushed out many jobs and they suddenly all fail (e.g. all time out), we would yield many jobs back at once, possibly overloading the writer/processor and leading to high back pressure.

@npepinpe (Member, Author) commented Sep 6, 2023

One caveat for anyone thinking of using this before it's production-ready: we already ran benchmarks with larger clusters, and without flow control, you not only have to scale your workers to accommodate the peak load at all times, but you also have to scale your gateways. There is no failsafe in the gateway to reject requests, so it will keep allocating memory to cope and will eventually crash.

When introducing flow control, we'll likely want to add two forms of limiting: per client and per gateway (since the gateway can only do so much anyway).
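A minimal sketch of such two-level limiting (hypothetical, not the flow-control design itself): a push is only admitted if both the gateway-wide and the per-client budget have capacity; otherwise the job is rejected (and yielded back) instead of buffering unboundedly in gateway memory.

```java
import java.util.concurrent.Semaphore;

// Two-level admission control: a gateway-wide budget plus one budget per client.
final class TwoLevelLimiter {

  private final Semaphore gatewayBudget;

  TwoLevelLimiter(int gatewayLimit) {
    this.gatewayBudget = new Semaphore(gatewayLimit);
  }

  // Returns false if either limit is exhausted; the caller then rejects the
  // push so the broker yields the job instead of the gateway buffering it.
  boolean tryAdmit(Semaphore clientBudget) {
    if (!gatewayBudget.tryAcquire()) {
      return false; // gateway saturated
    }
    if (!clientBudget.tryAcquire()) {
      gatewayBudget.release();
      return false; // this client saturated; the caller may try another client
    }
    return true;
  }

  // Called once the client has handled (or acknowledged) the job.
  void release(Semaphore clientBudget) {
    clientBudget.release();
    gatewayBudget.release();
  }
}
```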

@korthout (Member) commented Jan 2, 2024

@npepinpe Do we have plans to support the stream activated jobs RPC in zeebe-process-test?

zeebe-bors-camunda bot added a commit to camunda/zeebe-process-test that referenced this issue Jan 2, 2024
1012: Support using process instance migration from the zeebe client r=korthout a=korthout

## Description


This adds support for the new `MigrateProcessInstance` RPC to ZPT, such that users can try migrating process instances from ZPT. This allows testing migrations before doing so in production.

> [!NOTE]
> This does not add assertions for the migration. That is out of scope.

Additionally, this fixes a test case that no longer added value. The test case was intended to ensure that we don't forget supporting RPCs in ZPT, but the implementation of the test did not function correctly anymore. Now it is able to correctly detect unsupported RPCs again, which [highlighted that `streamActivatedJobs` is not supported by ZPT](camunda/zeebe#11231 (comment)).

## Related issues


closes #972


## Definition of Done

<!-- Please check the items that apply, before merging or (if possible) before requesting a review. -->

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [x] The changes are backwards compatible with previous versions
* [ ] If it fixes a bug then PRs are created to backport the fix

Testing:
* [x] There are unit/integration tests that verify all acceptance criteria of the issue
* [x] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually

Documentation:
* [ ] Javadoc has been written
* [ ] The documentation is updated


Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
@npepinpe (Member, Author) commented Jan 3, 2024

We should, although it's perfectly usable without.

@npepinpe (Member, Author) commented Jan 4, 2024

All tasks were completed or pushed to the backlog, 🎉
