[POC]: Engine Abstraction #9602

Closed · Zelldon opened this issue Jun 24, 2022 · 9 comments

Zelldon commented Jun 24, 2022

Based on the proposal #9601 I will do a POC to verify the idea and maybe uncover some edge cases. I will use this issue to document my findings, certainties, and uncertainties.

Part of #9600

Zelldon mentioned this issue Jun 24, 2022 · 71 tasks
Zelldon self-assigned this Jun 24, 2022

Zelldon commented Jun 27, 2022

Friday - 2022-06-24

The first day of the POC: just creating some issues and starting to look at the code.

Notes:

  • Created a branch here https://github.com/camunda/zeebe/tree/zell-poc-engine-abstraction
  • We need a new class for the engine which encapsulates the general logic.
  • We have to change how the writing is currently done. Introducing a new interface or changing the general structure would be a huge effort, I think. Alternative idea: it might be easier to rename/refactor the current writer interfaces, e.g. to producers or a record list or something, which can be used to build up a list of records and append new ones. This list is then consumed by the StreamProcessor and written to the dispatcher (see the sketch after this list).
  • TypedRecordProcessor has a position in the interface; this can be completely removed (done in the branch).
  • TypedRecordProcessor has some TODOs about removing the writers -> it seems the processors get the writers in their constructors. It might be possible to remove them completely, with the engine creating its own reusable RecordList structure which it returns after every processing call. The StateWriter could be a listener of the RecordList, so it can react to such new records. This probably needs some more in-depth thinking...
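As a rough illustration of the record-list idea above, here is a minimal Java sketch; RecordBatch, RecordBatchEntry, and all of their members are hypothetical names for illustration, not existing Zeebe types:

```java
import java.util.ArrayList;
import java.util.List;

final class RecordBatchEntry {
  final String valueType;   // e.g. "JOB", "PROCESS_INSTANCE"
  final String intent;      // e.g. "TIME_OUT", "CREATED"
  final Object recordValue; // the not-yet-serialized record value

  RecordBatchEntry(final String valueType, final String intent, final Object recordValue) {
    this.valueType = valueType;
    this.intent = intent;
    this.recordValue = recordValue;
  }
}

final class RecordBatch {
  private final List<RecordBatchEntry> entries = new ArrayList<>();

  /** Called by the processors instead of a writer: appends a follow-up record. */
  void appendRecord(final String valueType, final String intent, final Object value) {
    entries.add(new RecordBatchEntry(valueType, intent, value));
  }

  /** Consumed by the StreamProcessor after processing; resets the batch for reuse. */
  List<RecordBatchEntry> drain() {
    final List<RecordBatchEntry> copy = List.copyOf(entries);
    entries.clear();
    return copy;
  }
}
```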

Zelldon commented Jun 27, 2022

Monday - 2022-06-27

I continued looking at the code and found some more interesting facts. It feels like there is a lot of coupling between the StreamProcessor, the processors, etc. I took several notes again today; I will post them here to free my brain and make it transparent what I'm doing. I will continue looking at the code and try to find out where to start. You can watch the branch for updates as well: https://github.com/camunda/zeebe/tree/zell-poc-engine-abstraction

Notes:

  • In the branch I annotated several methods which need to be moved into the engine class
    • InitProcessors in StreamProcessor has to be moved to the engine
    • The engine's init method should allow initializing the ZeebeState and the processors.
  • The lifecycle listeners are somewhat interesting. I guess we can still support them: the StreamProcessor calls the engine on specific lifecycle methods, and the engine forwards the calls to the different processors and listeners (but manages them on its own).
  • Several listeners implement pause/resume; ideally this shouldn't be necessary later on.
  • Note: CURRENTLY the processingStateMachine uses the currentProcessor as a flag to indicate whether processing is going on. This is used to avoid starting the next stream processing concurrently when triggered by the commit listener. We should introduce a separate flag for this purpose.
  • Currently the stream processor still reads the metadata to determine whether a record is a command or an event, especially for replay and processing. BUT I think we COULD also move this complexity to the engine. This would allow reading every record the same way and passing it to a registered consumer, which could be the engine or an exporter; this might make it possible to combine the ExporterDirector and the StreamProcessor (but I'm not 100% sure; see the consumer sketch after this list). Based on the lifecycle (reaching the replay/processing border), the engine could switch its filtering. This idea can be investigated later; we have to verify whether it is feasible. Currently it kind of makes sense that the metadata is part of the stream processing, since we need the position, source position, etc. there, and the engine doesn't actually care about these.
  • Followed the getActor calls from the ProcessingContext, found ~10 usages
    • DeploymentCreateProcessor (DeploymentDistributionBehavior.distributeDeployment) to write the COMPLETE distribution
    • DeploymentRedistributor (same as above); we need to make sure that we can somehow react to futures on our ScheduleService interface and submit records once they are done
    • JobTimeoutTrigger uses fixedRate in deactivateTimedOutJobs to write the TIME_OUT job
      • On pause/resume the timers are canceled/scheduled (we should remove this knowledge) and only schedule if running; this would simplify the code
    • MessageObserver uses fixedRate: the PendingMessageSubscriptionChecker sends commands - no records are returned!!! And there is no pause/cancel?!!!
    • PendingProcessMessageSubscriptionChecker uses fixedRate to send commands
    • DueDateChecker uses runDelayed; it is used by JobBackoffChecker and DueDateTimerChecker
    • During implementation we should first implement everything with runDelayed, then introduce the StreamScheduleService as in the proposal, which allows scheduling producers (which produce records), called either via a delay or "immediately" (the job is enqueued transparently to the caller). It should allow running an action on a future's completion (here we should use the Java API future). We could also investigate whether we can write the DeploymentDistribution differently, without that need.
    • Side effects:
      • There was a wider discussion about this in the proposal and on Slack. In general we don't want to have this anymore; we could just send responses immediately on commands, since on replay they are not executed anymore anyway. That would of course be a bigger change for now. In order not to have issues with overwriting the responseWriter flush, we can offer a queue which can be filled with the details. I guess we could investigate how much work it would be to just replace the side effects, since adding the queue will also be somewhat of an effort, as far as I can see from the code.
      • The TODO about removing the writers depends on removing the side effects, since otherwise we can't add the response writer flush to the side effects.
    • Remove side effects queue usage from BpmnStreamProcessor #9611
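A tiny sketch of the registered-consumer idea mentioned above; RecordConsumer and TypedRecordView are made-up names for illustration only:

```java
/** Illustrative stand-in for the record handed over by the platform. */
interface TypedRecordView {}

/** A consumer registered at the stream platform; could be the engine or an
 *  exporter. The platform reads every record the same way and keeps the
 *  metadata handling (position, source position, ...) to itself. */
interface RecordConsumer {

  /** Called for every record on the stream; the consumer decides what it
   *  cares about, e.g. the engine filters for commands once the
   *  replay/processing border is reached. */
  void consume(TypedRecordView record);
}
```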

Zelldon commented Jun 28, 2022

Tuesday - 2022-06-28

Spent only half a day on the POC.

Notes:

  • Continued removing the side effects to make the interface cleaner and easier.
    • ❓ Do we still want to retry the response sending? We currently have a retry strategy which retries if the return value is false. The response writer always returns true, so it doesn't really make sense here anymore.
  • Discussion regarding timers: Remove side effects queue usage from BpmnStreamProcessor #9611 (comment)
    • In order to solve this, the stream platform has to guarantee that scheduled tasks/jobs are executed after the transaction/state changes are committed, in order to be safe and have no weird executions. See the comment, and the sketch after this list.
  • Implemented the engine as stream processor lifecycle aware and moved all listeners/processors to the engine.
  • Moved all methods and fields related to the engine implementation 12c0595
    • Thinking about a good abstraction name; currently I just use a class, but ideally I'd have an interface here where I can inject/replace the implementation, e.g. to run multiple stream processors for different use cases like process automation, exporting, etc.
  • Always passing the processing context around, updating it and getting data out of it, feels odd. I would like to replace this with actual method arguments, so the dependencies also become clearer.
    • I think we can remove several things from the context; some are used only once. I see no value in keeping them in the context all the time; it might be easier to pass them directly to the places where they are needed.
  • ⏭️ I will start to work on the scheduling service, so we can remove the actor usage.
  • ⚠️ IF we do the refactoring and introduce the abstraction, we will likely have to rewrite/adjust several tests, at least the EngineRule and the related ones (which has btw been overdue for a long time anyway).
  • We have a lot of indirection, which makes it really hard to understand what is happening and which code is actually called. This also applies to the writers, where we for example have the TypedStreamWriterProxy, which IMHO is no longer necessary. On replay we will not write any records. I remember we introduced something like the proxy to not have the writer during replay, and we also had a fix to open the writer later on (after replay), which means we no longer need the proxy. ℹ️ Removed the proxy 88ed05a
  • ❓ Will RecordValues be part of the engine? Is the contact point then LoggedEvent? In the POC I used it in the StreamProcessor to construct the TypedRecord, which is passed to the engine.
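A sketch of the commit guarantee mentioned in the timer discussion above, assuming a hypothetical PostCommitTaskQueue owned by the stream platform:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Tasks scheduled during processing are buffered here and only run after
 *  the state transaction has been committed. */
final class PostCommitTaskQueue {

  private final Queue<Runnable> pending = new ArrayDeque<>();

  /** Called by the engine during processing; the task does NOT run yet. */
  void schedule(final Runnable task) {
    pending.add(task);
  }

  /** Called by the stream platform strictly after the commit, so tasks never
   *  observe uncommitted state changes. */
  void onCommitted() {
    Runnable task;
    while ((task = pending.poll()) != null) {
      task.run();
    }
  }
}
```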

Zelldon commented Jun 29, 2022

Wednesday - 2022-06-29

Again only half a day to work on this topic. What I plan to investigate today:

  1. Introduce a different interface, RecordListBuilder, which is implemented by the writers and used in the engine (so no writers anymore here)
  2. Error handling in the engine: a callback into the engine if an error occurs.

As every day, I will update this comment as I progress.

Notes:

  • Removed the writers completely from the TypedRecordProcessor; this needed some more preparation, since the Writers class is not used everywhere yet. This would be pre-work. 154b911 🗑️
    • Multiple tests rely on the given writers and their usage.
    • After the writers are removed, the interface looks pretty clean.
  • Renamed all writers to builders, like CommandsBuilder, EventsBuilder, etc. This is of course debatable, but I want to emphasize that this is actually not writing, just building up some record list or buffering the records. 47cb438
  • Split up the LogStreamBatchWriter; this allows the RecordsBuilder implementations to write the records directly into a buffer without further processing. 67ecfb2 (see the sketch after this list)
    • We implement BufferWriter here as well, to write into a separate buffer. The LogStreamBatchWriter allows putting in a BufferWriter, which writes to another buffer and then to the dispatcher; this can of course be simplified/improved.
    • That way we can pre-allocate the builders with the internal buffer, use them in the engine (by reference), and get them back after processing.
    • The StreamProcessor takes the BufferWriter and writes it to the dispatcher at the specific step.
    • We now have to do something similar with the scheduling service, so that it just returns the RecordsBuilder (a BufferWriter).
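A simplified sketch of this buffer-based writing, with ByteBuffer standing in for the real buffer types and SimpleBufferWriter for the BufferWriter interface; sizes and method names are illustrative only:

```java
import java.nio.ByteBuffer;

/** Simplified stand-in for Zeebe's BufferWriter abstraction. */
interface SimpleBufferWriter {
  int getLength();
  void write(ByteBuffer target);
}

/** Engine-side records builder that serializes into a pre-allocated buffer. */
final class BufferedRecordsBuilder implements SimpleBufferWriter {

  private final ByteBuffer internal = ByteBuffer.allocate(4 * 1024);

  /** Appends one already-serialized record, length-prefixed. */
  void appendRecord(final byte[] serializedRecord) {
    internal.putInt(serializedRecord.length).put(serializedRecord);
  }

  @Override
  public int getLength() {
    return internal.position();
  }

  @Override
  public void write(final ByteBuffer target) {
    // copy the buffered records into the segment claimed by the dispatcher
    final ByteBuffer view = internal.duplicate();
    view.flip();
    target.put(view);
  }

  /** Resets the builder so it can be reused for the next processing call. */
  void reset() {
    internal.clear();
  }
}
```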

Zelldon commented Jun 30, 2022

Thursday - 2022-06-30

Again a half day on the POC. Plan for today: add error handling to the engine and introduce the SchedulingService abstraction.

Notes:

  • Added a new method to the engine called onProcessingError, which also returns a ProcessingResult (could also be renamed); this allows taking the same set of actions as after processing, i.e. write the processing result, commit the transaction, send the response, and start anew.
  • 💭 ❓ Thought about the engine interface and its name. I guess we have to find a good name for the engine interface; the engine is more the implementation. Maybe the interface is called StreamProcessor? 🤔 And the current StreamProcessor could be called StreamingPlatform, since we build a distributed streaming platform with Zeebe and partitions 🤷
  • ℹ️ Instead of splitting the implementation as I did yesterday, we could also just introduce a separate interface that only exposes a view of some of the methods, which might be simpler. We have to verify whether this is possible regarding dependencies (for example if we split the modules, but this can also be done later). However, this wouldn't help us if we really want to get results back from the processing call; that only works if we keep the implicit calls, i.e. hand in the builders on init and expect, after calling process, that something is in the builders/writers which we then flush. That is a bit opaque and, in my opinion, the API doesn't make clear that we expect a result. So the solution of splitting the writers also helps: we always create the writer with the buffer, and only where needed the writer which writes to the dispatcher.
  • Similar to the LogStreamBatchWriter, we need something comparable for the response writer, so we can return it in the ProcessingResult; again a BufferWriter, which is then used internally to write to the ServerOutput.
  • Moved error handling to the engine; it produces a ProcessingResult which is written later on. We go down the same path after onError as we do after onProcessing, which is nice since we can just reuse the states/code.
  • Introduced the ProcessingSchedulingService and ProcessingSchedulingServiceImpl (see the sketch after this list)
    • Support runDelayed (used for the checkers) and runOnSuccess (used for deployment)
    • They accept a supplier which returns a ProcessingResult; the result is written by the ProcessingSchedulingServiceImpl (with retry)
    • The impl is quite simple but allows removing the actor usage completely from the engine!
    • Furthermore, due to its guarantees, it allows reducing a lot of code regarding pausing and resuming! 👍
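A sketch of what the ProcessingSchedulingService surface could look like, inferred from the notes above (runOnSuccess is modeled here on Java's CompletionStage; none of this is the final API):

```java
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/** Placeholder for the records a scheduled task produces. */
interface ProcessingResult {}

interface ProcessingSchedulingService {

  /** Used by the checkers (job timeout, message TTL, due-date timers): the
   *  supplier runs after the delay, and the implementation writes the
   *  returned result to the log (with retry). */
  void runDelayed(Duration delay, Supplier<ProcessingResult> task);

  /** Used by deployment distribution: the supplier runs once the given
   *  future completes successfully. */
  <T> void runOnSuccess(CompletionStage<T> future, Supplier<ProcessingResult> task);
}
```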

Next steps:

  • introduce interfaces to make the separation clearer
  • think about missing stuff.
  • summarize the result and arrange what would be needed first etc.
  • schedule a meeting to discuss the result
  • discuss the result

Zelldon commented Jul 1, 2022

Summary

TL;DR:

  • It is possible.
  • It is a lot to do, but we can do it incrementally.
  • I personally like it, but I might be biased.
  • In general I think it will help a lot to split this into more modules and make a clear boundary.
  • It could help us make some changes in the ActorScheduler (without interfering with the engine).
  • It will improve our testing, but first we have to rewrite several things.
  • It allows us to run the same StreamPlatform with a different StreamProcessor; maybe we can reuse it for the exporting later.

Details

In the last week, I spent some time doing the POC and implementing proposal #9601. I was able to implement most of it; of course, some of it is still a bit hacky, and it is not fully working nor complete. At some point, I stopped fixing tests, etc. Still, it gives a good idea of how it can look in the end, especially how it would improve our boundaries. Furthermore, I see a good possibility to completely split the StreamPlatform and the engine into separate modules, which also allows us to better test each component.

How it could look

The following order is not necessarily the order in which to implement it.

Overview:

  1. We rename what is now known as the StreamProcessor to, for example, StreamPlatform (debatable of course, #namingishard)
  2. We introduce a new interface called StreamProcessor, whose implementation is the engine ce20359 8a389e4 (see the sketch below)
  3. We could create the engine completely outside of the StreamPlatform, namely in the Broker, which makes it possible to decouple the two further
  4. This also allows us to test both separately in more detail, with a simple StreamProcessor test implementation
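One possible shape of the new StreamProcessor interface (step 2), assembled as a sketch from the POC notes; the placeholder types and method names are illustrative, not the actual signatures:

```java
/** Placeholder types; the real ones carry metadata, state access, etc. */
interface TypedRecord {}
interface ProcessingResult {}

/** Sketch of the new StreamProcessor interface; the engine is one implementation. */
interface StreamProcessor {

  /** Called once on installation, so the engine can set up its state and
   *  record processors itself (instead of the platform doing it). */
  void init();

  /** Processes a command; all follow-up records, the response, and any
   *  scheduled tasks come back in the result instead of being written
   *  through injected writers. */
  ProcessingResult process(TypedRecord command);

  /** Replays an event to rebuild the state. */
  void replay(TypedRecord event);

  /** Called when process(...) fails; the returned result is written and
   *  committed exactly like a regular processing result. */
  ProcessingResult onProcessingError(Throwable error, TypedRecord command);
}
```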

Details

Writers

One of the bigger parts is removing/replacing the writers in the engine and making it return something. During the POC I came to the conclusion that we can just rename the current writers since they are already an abstraction that we can reuse.

Processing

Error Handling

SchedulingService

Findings

  • The RecordMetadata/RecordValues handling is currently really coupled to both the StreamPlatform and the StreamProcessor / engine. It seems hard to refactor that in a good way.
  • The ProcessingContext is really ugly and can be cleaned up a bit with the changes above, but it is still not optimal.
  • Writers can only be removed from the processor interface if side effects are removed.
  • Retrying the sending of responses seems unnecessary; we always return true.
  • If we do the refactoring and follow the proposal, we have to adjust a lot of tests (or at least the test infrastructure).

npepinpe commented Jul 5, 2022

  • Should the engine implementation actually serialize the outputs?

    • Pros: frictionless, no need to interact with the distributed, easier to extend later if we add different processors.
    • Cons: more complicated for testing, embedding, possibly need to implement own integrity checks, schema evolution, not reusing the expertise of the distributed team regarding persistence.
    • I personally lean with Peter on having "record in -> record(s) out" instead of dealing with (de)serialization.
  • What if an error occurs in onProcessingError? Do we have some escape hatch? How about UnrecoverableError classes? How much control do we give to the engine (i.e. can it shut itself down on unrecoverable error)?

  • Abstracting the actor scheduler sounds good. What is the minimal interface? Can we get rid of runUntilDone and the likes, and just go for two capabilities: defer execution (e.g. run or submit) and schedule (e.g. runDelayed)? That would greatly simplify things. The caller can take care of runUntilDone by implementing their own state machine which ensures no other tasks can be completed in between. However, if we want to keep the queuing mechanism, then this is not possible. Do we need it, though? (A minimal sketch of such an interface follows this list.)

  • Side effects: how to guarantee that a side effect (which may be submitted in between processing) does not affect the state/buffers? It should run in an isolated sandbox in that case.

  • Should the stream processor control the flow of processing? e.g. using a credits based approach, it would notify the underlying platform that it can only accept so many credits. So instead of pausing, you could just set the credits to 0. You could also always just have 1 credit which is added after processing, and consumed when processing starts. Might be overengineered for now since we only always have 1 credit.

  • Exporters? What happens to those? Are they also stream processors, or do they become a special case? After all they modify the state, which means they're somehow touching the engine (though we can somehow abstract the state to be only "part" of RocksDB and hope they are sufficiently isolated).

  • I think the proposal from Deepthi regarding rewriting the records is a good idea and will help here, where then only RecordMetadata and RecordData (or whatever) are handled here, without any envelope. This will impact the serialization point, so when starting we should figure out which to do first.
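To make the two-capability question concrete, a minimal sketch of such an interface (hypothetical, not the actual ActorControl API):

```java
import java.time.Duration;

/** Hypothetical minimal scheduler surface: defer and delay, nothing else. */
interface MinimalScheduler {

  /** Defers execution: the task is enqueued and runs as soon as possible. */
  void submit(Runnable task);

  /** Schedules the task to run once, after the given delay. */
  void runDelayed(Duration delay, Runnable task);
}
```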

saig0 commented Jul 5, 2022

Minor suggestions regarding the naming:

  • in the StreamProcessor interface: we could rename apply() to replay()
    • use the correct terms: replay (event) + process (command)
  • we could rename *Builders like CommandsBuilder to *Buffer like CommandBuffer (see the small sketch after this list)
    • it is a buffer of commands/events/rejections/*
    • you can append items to a buffer
    • a builder doesn't store anything but returns the created object
    • in our case, we use it like a buffer
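A tiny illustration of the semantics behind the suggested rename (CommandBuffer here is purely hypothetical): you append to a buffer and read it back later, whereas a builder would hand you a finished object.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a buffer of commands that processors append to. */
final class CommandBuffer {
  private final List<Object> commands = new ArrayList<>();

  /** Appending matches buffer semantics, unlike a builder's build(). */
  void appendCommand(final Object command) {
    commands.add(command);
  }

  /** Read back what was buffered, e.g. by the stream platform. */
  List<Object> view() {
    return List.copyOf(commands);
  }
}
```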

Zelldon commented Jul 8, 2022

Thanks @saig0 and @npepinpe for your feedback and input! I moved your comments to the related sub-topics. Feel free to discuss further there with the team.

The following are, I think, currently out of scope:

  • Should the stream processor control the flow of processing? e.g. using a credits based approach, it would notify the underlying platform that it can only accept so many credits. So instead of pausing, you could just set the credits to 0. You could also always just have 1 credit which is added after processing, and consumed when processing starts. Might be overengineered for now since we only always have 1 credit.

Not sure whether I get the idea, and yes, it sounds a bit overengineered right now.

  • Exporters? What happens to those? Are they also stream processors, or do they become a special case? After all they modify the state, which means they're somehow touching the engine (though we can somehow abstract the state to be only "part" of RocksDB and hope they are sufficiently isolated).

As mentioned earlier, we will first focus on the stream processing and engine parts and their abstraction. We might later be able to run the StreamProcessor together with both the engine and the exporter, but this is not clear yet; see #9725

  • I think the proposal from Deepthi regarding rewriting the records is a good idea and will help here, where then only RecordMetadata and RecordData (or whatever) are handled here, without any envelope. This will impact the serialization point, so when starting we should figure out which to do first.

Let's take a look at this after we have done most of the needed changes.

With this I will close the POC; we have a breakdown of the topic here: #9600

Zelldon closed this as completed Jul 8, 2022
zeebe-bors-camunda bot added a commit that referenced this issue Jul 13, 2022
9756: Hide scheduling behind interface r=pihme a=pihme

## Description

In terms of the scope defined in #9730 this implements the following:

- [x] Create a new interface for the ProcessingScheduleService (with narrowed scope)
  - [x] Possibly only two methods, runDelayed and runComplete; take a look at the POC #9602
- [x] Implement the interface
- [x] Before migrating to the new abstraction, migrate the ActorControl#runAtFixedRate consumers to #runDelayed usage; this means after each run the job needs to be scheduled again (see the sketch below)
- [x] Migrate step by step the actorControl usage
- [x] Remove the actor control from the ProcessingContext
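
A sketch of that self-rescheduling pattern; ScheduleServiceSketch is a stand-in for the new interface, not the actual type introduced in this PR:

```java
import java.time.Duration;

/** Stand-in for the new scheduling interface from this PR. */
interface ScheduleServiceSketch {
  void runDelayed(Duration delay, Runnable task);
}

/** Replaces an ActorControl#runAtFixedRate consumer: the task re-schedules
 *  itself after each run. */
final class SelfReschedulingTask {

  private final Duration interval = Duration.ofSeconds(30);
  private final ScheduleServiceSketch scheduleService;

  SelfReschedulingTask(final ScheduleServiceSketch scheduleService) {
    this.scheduleService = scheduleService;
  }

  void start() {
    scheduleService.runDelayed(interval, this::run);
  }

  private void run() {
    // ... do the periodic work, e.g. time out jobs ...
    // schedule the next run; this replaces the fixed-rate timer
    scheduleService.runDelayed(interval, this::run);
  }
}
```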

## Related issues

related to #9730

## Review Hints
* This is not the final form of the scheduling interface; instead, the focus of this PR is to hide all the dependencies behind an interface first
* The change is not a pure refactoring. So there is a residual risk that the behavior is different in subtle ways (which is why I would like to have a review by both of you)
* The new code sometimes (indirectly) calls different methods on the `ActorControl`. Therefore there might be differences in the way tasks are scheduled (top of queue/bottom of queue; fast lane or not). The intention of the change was to simplify the interface that is available to the engine. In this regard some subtle changes are unavoidable
* Part of the simplification is also that the engine does not have access to something like a `ScheduledTimer`. Therefore, the engine is unable to cancel tasks which have been scheduled
* `RunAtFixedRate` has been replaced by tasks that reschedule themselves after they are called
* There is a difference in the way exceptions are propagated. See commit message a406d3f for one such example
* Other than that, the tests pass and I just started a QA run, so let's see 🤞 



Co-authored-by: pihme <pihme@users.noreply.github.com>