Add new RecordBatch classes and interfaces #10118
Conversation
🚀
Nice, I think switching to the engine returning an immutable, ordered batch is much better, and we can be in charge of the complete persistence layer (including serialization/deserialization of records).
- Naming: I like `Mutable` and `Immutable` better than `Modifiable`/`Unmodifiable`. Plus, we use them in other places in the project already.
- ❌ My only concern is regarding the max batch size. See "I can't activate jobs with a high max job count" #5525 and its fix, "Correctly truncate a job activation batch if it will not fit in the dispatcher" #8799. Previously we had to use a function to compute the remaining writable bytes in the dispatcher, since the configured `maxFragmentSize` isn't actually how many bytes you can write (from a producer's point of view). After framing, aligning, etc., you can usually write less, and it varies based on how many entries you write in a single dispatcher batch. Here it seems we're back to using a fixed value? See the `BufferedProcessingResultBuilder`, which has a `capacityCalculator` function to do that. Is there a reason we don't use it here?
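To illustrate the concern, here is a minimal sketch of why a fixed limit differs from a capacity calculator. All constants and names are illustrative assumptions, not the actual Zeebe dispatcher API: the bytes an entry consumes grow with framing and alignment, so a check against the raw `maxFragmentSize` over-estimates the writable space.

```java
import java.util.function.IntUnaryOperator;

/**
 * Sketch only (hypothetical constants, not the real dispatcher values):
 * the usable capacity shrinks with per-fragment framing and alignment,
 * so the remaining writable bytes depend on how many entries the batch
 * already contains.
 */
public final class CapacityCheckSketch {
  // Hypothetical framing constants for illustration only.
  static final int FRAME_HEADER_LENGTH = 8;
  static final int ALIGNMENT = 8;

  /** Aligns a length up to the next multiple of ALIGNMENT. */
  static int align(final int length) {
    return (length + ALIGNMENT - 1) & -ALIGNMENT;
  }

  /** Bytes actually consumed in the dispatcher for one entry of the given size. */
  static int framedLength(final int entryLength) {
    return align(entryLength + FRAME_HEADER_LENGTH);
  }

  /**
   * A capacity calculator as a function: given the number of entries already
   * claimed, returns the remaining writable payload bytes (simplified, it
   * ignores alignment slack per entry).
   */
  static IntUnaryOperator remainingCapacity(final int maxFragmentSize, final int usedBytes) {
    return entryCount -> maxFragmentSize - usedBytes - entryCount * FRAME_HEADER_LENGTH;
  }

  public static void main(final String[] args) {
    // A fixed check would accept a 4096-byte entry against a 4096-byte limit,
    // but after framing and alignment the entry no longer fits.
    final int maxFragmentSize = 4096;
    System.out.println(framedLength(4096) <= maxFragmentSize); // prints false
  }
}
```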
engine/src/main/java/io/camunda/zeebe/streamprocessor/records/ModifiableRecordBatch.java (outdated; resolved)
engine/src/main/java/io/camunda/zeebe/streamprocessor/records/RecordBatchEntry.java (outdated; resolved)
Fine by me. I can change that :)
Tbh. I still don't get why we have it like that. If I as a user configure something, it reminds me of an analogy: I want to build a house with 128 sqm; after it is finished, I get a house with 100 sqm, with the argument that the walls are too thick. Why aren't the walls put outside? 😅 Anyhow, I don't see that as a blocker for the PR. We can adjust that in a follow-up PR. The BufferedResultBuilder, as you said, uses the calculate function; I can also use that, but tbh I tried to avoid it because it doesn't make much sense to me. I would prefer that we either increase the dispatcher size or reduce the max message size for the buffer (by 10% or something, so headers also fit in).
I agree, and I wish the dispatcher would instead figure that out, e.g. if I say I want to write at most 4MB, I should be able to do so 😄 If it was just one entry at a time, this would work fine (just remove some space), but since it depends on the number of items in a dispatcher batch, I'm not sure how you'd pre-calculate it. Maybe by fixing an upper bound on how many items you can add to a batch? Then you can calculate a deterministic upper bound: let D be the difference between the aligned max fragment size and the max fragment size, E the max number of events possible, F the outer frame length, and F_e the single event frame length; then the total overhead is D + F + (E * F_e). I think that works, though again single events are also aligned, so maybe I forgot something.

I wish it was just simpler tbh, or better, that max fragment size was the max length of a single event, not the max length of a batch, and we could handle infinitely large batches (via distributed transactions). But that all seems more complex. If you can figure out a way to calculate a deterministic upper bound, then we can go with that. In the meantime, I think merging this PR and using it will re-open the bug #5525, no? Of course, if we merge and don't use it, it's fine, but I'd be worried we start using it without fixing it, and then we're essentially re-introducing #5525. Or am I missing something? 🤔
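The proposed bound could be computed as in the following sketch. D, F, E and F_e are taken from the comment above; the method and parameter names are illustrative assumptions, not existing Zeebe code:

```java
/**
 * Sketch of the deterministic overhead upper bound discussed above:
 * overhead <= D + F + (E * F_e), where D is the alignment slack of the max
 * fragment size, F the outer batch frame length, E the max number of events
 * per batch, and F_e the per-event frame length. Names are hypothetical.
 */
public final class OverheadBound {
  /** Computes the worst-case overhead D + F + (E * F_e). */
  static int overheadUpperBound(
      final int alignmentSlack,    // D
      final int batchFrameLength,  // F
      final int maxEvents,         // E
      final int eventFrameLength)  // F_e
  {
    return alignmentSlack + batchFrameLength + maxEvents * eventFrameLength;
  }

  /** Usable payload bytes, given the configured max fragment size and the bound. */
  static int usableBytes(final int maxFragmentSize, final int overhead) {
    return Math.max(0, maxFragmentSize - overhead);
  }
}
```

With such a pessimistic bound, a producer could check payload sizes against `usableBytes` up front instead of probing the dispatcher per entry.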
Ok, thanks for your input. With the current PR it is not yet used, it would only be in upcoming PRs, but ok. I will time-box it and take a look at the dispatcher (and a potential replacement). Otherwise I will use a similar approach as we do right now with the BufferedResultBuilder (giving it a calculation function). The problem I see here is that we then can't remove the ResultBuilder from the processing interface, which is bad :/ Only if the RecordBatch has the knowledge of how big a fragment is (which ofc doesn't really make sense).
Yeah, I get that it's frustrating, sorry. I'm not sure what a good intermediate solution is =/ The important thing is that in the job batch collector we can probe, in an accurate way, whether it's safe to add one more job or not. It might be fine if we're even a little too pessimistic, as long as the error reporting is good.

Or perhaps it's time to look into removing the constraint that all follow-up events are contained in a single dispatcher batch. A naive approach would be to just not have a max fragment size for a short while, and re-introduce it later simply as an internal configuration setting once we've figured out how to spread follow-up records across multiple Raft entries. Not sure what the consequences of the intermediate step would be, though.
Couldn't we already spread records into multiple dispatcher fragments, as long as they end up in the same Raft entry? 🤔 Like, the LogStorageAppender could collect all corresponding events together, but I'm not sure how this would help 🤔 The maxMessageSize is also the maximum for the Raft entry, right? 🤔 Or does this no longer exist, because we have the mmap files?
The max entry size is sort of there for transmission, maybe? We'd have to make sure we chunk append entries, and I can't remember if we do or not. But it's a good idea. The journal doesn't really (or shouldn't) care about the max entry size; it's just limited by the max segment size and framing.
I checked, there is no concept of max entry size in the journal. We don't support an entry spanning multiple segments though, so there's an upper bound, namely the segment size (minus some fixed framing). The downside is that by the time you realize that something doesn't fit in a single entry/segment, it's quite far downstream and you can't react on the producer side. To avoid having to buffer multiple dispatcher fragments in memory before writing them to the Raft entry, we would also need to "stream" those directly to file, which we also don't support. I don't know how much easier that is. Or we do buffer them all in memory, I guess, and maybe it's not so bad performance-wise...? :S Could be a memory issue, I guess.

EDIT: my proposal here would be to keep what we have (use the capacity calculator), and create a new epic/issue which we can try to get prioritized sooner rather than later. We could try in the meantime to adapt the interface such that it's not too bad to expose this? Maybe we adapt it to something like …

If you need someone to brainstorm or to focus more on this, I guess this week it's difficult for me, so I trust you and Deepthi to figure out something great :)
Looks good. I'm missing the broad picture of how this will be extended, so my questions are based on that. I approved the PR assuming those will be handled in later PRs.

I agree with Nicolas' suggestion regarding the naming `Mutable` and `Immutable`. Also, the issue with `maxBatchSize` should be fixed, but I'm ok if it is done in a different PR.
engine/src/main/java/io/camunda/zeebe/streamprocessor/records/RecordBatch.java (outdated; resolved)
engine/src/test/java/io/camunda/zeebe/streamprocessor/RecordBatchTest.java (resolved)
engine/src/test/java/io/camunda/zeebe/streamprocessor/RecordBatchTest.java (resolved)
engine/src/test/java/io/camunda/zeebe/streamprocessor/RecordBatchTest.java (resolved)
engine/src/main/java/io/camunda/zeebe/streamprocessor/records/RecordBatchEntry.java (resolved)
engine/src/main/java/io/camunda/zeebe/streamprocessor/records/RecordBatch.java (resolved)
* Modifiable -> Mutable
* Unmodifiable -> Immutable
The predicate allows verifying whether the potential batch count or batch size reaches a certain limit, in which case the entry needs to be rejected.
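A minimal sketch of what such a limit predicate could look like; the actual interface added in the PR may differ, and all names here are hypothetical:

```java
import java.util.function.BiPredicate;

/**
 * Sketch (hypothetical names): given the potential entry count and the
 * potential batch size in bytes after adding an entry, decide whether the
 * entry can still be appended or must be rejected.
 */
public final class BatchLimitSketch {
  /** Returns a predicate over (potentialEntryCount, potentialBatchSizeInBytes). */
  static BiPredicate<Integer, Integer> canAppend(final int maxCount, final int maxBytes) {
    return (count, bytes) -> count <= maxCount && bytes <= maxBytes;
  }

  public static void main(final String[] args) {
    final BiPredicate<Integer, Integer> predicate = canAppend(1000, 4 * 1024 * 1024);
    System.out.println(predicate.test(10, 2048));   // within both limits
    System.out.println(predicate.test(1001, 2048)); // too many entries
  }
}
```

The caller would evaluate the predicate with the size the batch would have after the append, and reject the entry if it returns false.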
Thanks @npepinpe and @deepthidevaki for your review and input!
bors r+
10118: Add new RecordBatch classes and interfaces r=Zelldon a=Zelldon

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Build failed:

bors r+

Build succeeded:
Description
With #9600 and especially #9724 we wanted to create a way to return results (records) from processing and from scheduled tasks, without the need to use the LogStreamWriters. This should allow us to reduce the dependency on the log stream and, in general, make the test setup easier.

We made a first attempt with the [BufferedProcessingResultBuilder](https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/streamprocessor/BufferedProcessingResultBuilder.java), which only contains a buffer into which the needed record details are written. This was also described as a solution in issue #9724.

During the discussion of the proposal, the idea was already raised that it would be nicer to have a list of records returned by the Processing- and TaskResult. This was added as a bonus in #9724.
Today I thought longer about the `BufferedProcessingResultBuilder`, how we could test it, and how to make returning the processing result nicer in general. Checking the code of the result builder and [LogStreamBatchWriterImpl.java](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java), I realized that we do not really need to copy the record details into a big buffer (as we do it [here](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java#L163-L195)). We could create a "struct" (some would call it a Java record, but to avoid confusion I call it a struct here) of the record details which we want to write. Most parts of the struct are primitives (like the enums intent, valueType etc.), and the RecordValue would be the only thing we need to copy into a buffer. This allows better debuggability (e.g. we can see during processing what is already part of the result) and avoids messing around with offsets in the implementation. We can keep references to these "structs" in a list and iterate over it in the StreamProcessor to write the record details, similar to what we [do in the LogStreamBatchWriter here](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java#L240).

One interesting part: after writing the classes and tests, I also realized that the batch entry interface could return the `UnifiedRecordValue`, in order to not have to mess again with buffers (BufferWriters or BufferReaders) and to make the engine tests easier to implement. For example, if we want to split the modules, we could write tests without the stream processor, take the records from the processing result and give them directly back to the engine 🤯

So this PR adds new interfaces and a first implementation of the so-called RecordBatch and RecordBatchEntry, which should later be used in the Processing- and TaskResult. Since the CommandWriter needs similar information as the LogStreamWriter, we can reuse the RecordBatch both for the records to write and for the response, which is part of the Processing Result. No other code was touched/modified.
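The "struct" idea described above can be sketched as a small Java record. The field names and enums here are illustrative assumptions, not the actual `RecordBatchEntry` API: the metadata stays as plain fields, and only the serialized record value is copied once into a buffer.

```java
import java.nio.ByteBuffer;

/**
 * Sketch only (hypothetical fields, not the real RecordBatchEntry): an
 * immutable entry holding primitive/enum metadata plus one copied value
 * buffer, so no offset bookkeeping is needed in the implementation.
 */
public final class RecordBatchEntrySketch {
  enum ValueType { JOB, PROCESS_INSTANCE }
  enum Intent { ACTIVATED, COMPLETED }

  /** Immutable entry; the value bytes are copied once at creation time. */
  record Entry(long key, int sourceIndex, ValueType valueType, Intent intent, ByteBuffer value) {
    static Entry of(final long key, final int sourceIndex, final ValueType valueType,
        final Intent intent, final byte[] valueBytes) {
      // Copy the serialized record value so the caller may reuse its buffer,
      // and expose it read-only to keep the entry immutable.
      return new Entry(key, sourceIndex, valueType, intent,
          ByteBuffer.wrap(valueBytes.clone()).asReadOnlyBuffer());
    }
  }
}
```

Keeping a list of such entries lets the StreamProcessor iterate and write them later, instead of serializing everything into one big buffer up front.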
Right now the interfaces are kept to a minimum, and so is the documentation. I guess we might need to iterate over them later, but I think it is a good step forward.
Feel free to propose other names for the classes and interfaces, nothing is set in stone. #namingishard
Related issues
related to #10001
Definition of Done
Not all items need to be done depending on the issue and the pull request.
Code changes:
Add the label (`backport stable/1.3`) to the PR; in case that fails, you need to create backports manually.
Testing:
Documentation:
Please refer to our review guidelines.