Fix estimation of claimed batch length when truncating job batch activation records #8491
Conversation
To review: I think the test is not very future-proof and needs to be improved. It works now and shows that the fix is correct, but it may stop being meaningful in the future and become useless.
final var jobCopyBuffer = new ExpandableArrayBuffer();
final var unwritableJob = new MutableReference<LargeJob>();

jobState.forEachActivatableJobs(
💭 This lambda is way too big imo. In fact, this method in general has a lot of state, and I think it would be good to decompose it.
@@ -133,6 +133,34 @@ public void shouldNotClaimBeyondPublisherLimit() {
    verify(logBufferPartition0).getTailCounterVolatile();
  }

  @Test
  public void canClaimFragmentBatch() {
💭 I really dislike these tests, so I hope I can discuss with the reviewer a better approach.
@@ -105,7 +105,7 @@ public long getRequestId() {
  }

  @Override
- public long getLength() {
+ public int getLength() {
I switched these things from long to int mostly because we add a bunch of unnecessary casts everywhere. I'm not sure why we ever went with longs - did we expect ByteBuffer to eventually support more than 2GB? Do we want to write buffers > 2GB?
  return variables;
}

record LargeJob(long key, JobRecord record) {}
💭 Unsure about this, but it seems like we should already have a way to group a job and its key
/**
 * This is not actually accurate, as the frame length needs to also be aligned by the same amount
 * of bytes as the batch. However, this would break concerns here, i.e. the writer here would have
 * to become Dispatcher aware.
💭 I guess the comment breaks the abstraction boundary as well 🤡
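For intuition, the framing and alignment the code comment alludes to can be sketched as follows. The constants (`HEADER_LENGTH`, `FRAME_ALIGNMENT`) and the helper names here are illustrative assumptions, not the dispatcher's actual values:

```java
// Hypothetical sketch of per-fragment framing and alignment; the constant
// values are assumptions for illustration, not Zeebe's real dispatcher ones.
final class FrameLengths {
  static final int HEADER_LENGTH = 12;  // assumed frame header size in bytes
  static final int FRAME_ALIGNMENT = 8; // assumed alignment boundary in bytes

  // Rounds value up to the next multiple of alignment (a power of two).
  static int align(final int value, final int alignment) {
    return (value + (alignment - 1)) & ~(alignment - 1);
  }

  // Framed length of a single fragment: payload plus frame header, aligned.
  static int framedLength(final int messageLength) {
    return align(messageLength + HEADER_LENGTH, FRAME_ALIGNMENT);
  }
}
```

The point of the code comment above is that the writer's estimate omits exactly this alignment step, since performing it would require the writer to know the dispatcher's constants.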
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ZeebeStateExtension.class)
final class JobBatchCollectorTest {
💭 I know we never quite finalized the discussion on unit tests and how close we want to test, but I do believe these tests have value if only that when they fail, they'll be much easier to diagnose than when the higher level engine tests fail.

@ExtendWith(ZeebeStateExtension.class)
💭 Just a small refactoring, happy to extract this since it's not really in the scope here.
@@ -284,4 +292,8 @@ private void resetEvent() {
    bufferWriterInstance.reset();
    metadataWriterInstance.reset();
  }

  private int computeBatchLength(final int eventsCount, final int eventsLength) {
💭 Extracted as it was important to me that we keep `canWriteAdditionalEvent` and `tryWrite` as consistent as possible. I'm happy to get other suggestions though.
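The extracted helper could look roughly like the sketch below: a single shared formula for the batch length, so that both call sites stay in sync. The overhead constant and the formula are assumptions for illustration, not the real implementation:

```java
// Hypothetical sketch of the extracted batch-length helper. Keeping the
// formula in one method means tryWrite and canWriteAdditionalEvent cannot
// silently diverge. The constant is an assumed per-event metadata overhead.
final class BatchLengths {
  static final int HEADER_BLOCK_LENGTH = 20; // assumed per-event overhead in bytes

  // Total batch length: accumulated event payload plus per-event overhead.
  static int computeBatchLength(final int eventsCount, final int eventsLength) {
    return eventsLength + eventsCount * HEADER_BLOCK_LENGTH;
  }
}
```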
private final LogStreamBatchWriterImpl writer = new LogStreamBatchWriterImpl(1, dispatcher);

/**
 * This test asserts that {@link LogStreamBatchWriterImpl#canWriteAdditionalEvent(int)} computes
👀 Would love some alternative suggestions here. I tried to ensure that if we ever change how a batch is claimed, i.e. what we pass as fragment count/batch length between `tryWrite` and `canWriteAdditionalEvent`, a test would fail and notify us, ensuring we make the change on purpose. However, I understand this is way too coupled to the implementation... I just couldn't think of something else.
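One way such a consistency check could be sketched, under the assumption of a minimal writer interface (all names here are hypothetical, not the real API):

```java
// Hypothetical consistency check between the probe and the actual write:
// ask the probe first, then verify that tryWrite agrees with its verdict.
final class ConsistencyCheck {
  interface Writer {
    boolean canWriteAdditionalEvent(int eventLength);

    long tryWrite(int eventLength); // assumed to return a negative position on failure
  }

  // Returns true when the probe's prediction matches the write outcome.
  static boolean probeMatchesWrite(final Writer writer, final int eventLength) {
    final boolean predicted = writer.canWriteAdditionalEvent(eventLength);
    final boolean written = writer.tryWrite(eventLength) >= 0;
    return predicted == written;
  }
}
```

This is still coupled to the claim semantics, but the coupling is at least explicit: the test fails only when the probe and the write disagree.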
@@ -230,7 +230,7 @@ public void updateVariables(
        .command()
        .put("scopeKey", scopeKey)
        .put("updateSemantics", updateSemantics)
-       .put("document", MsgPackUtil.asMsgPack(document).byteArray())
+       .put("variables", MsgPackUtil.asMsgPack(document).byteArray())
Just a little mistake I noticed - we renamed the property at some point but never updated it here.
 *
 * @param <L> the left type
 * @param <R> the right type
 */
public final class EitherAssert<L, R>
👀 I just saw we have another `EitherAssert` in the util module's test classes... should we use that one? Should we use the one here? What's our idea?
This is a big PR. We could split it in the following way:
Let me know what you think 👍 Right now it's kind of a mess, so I need to clean it up before setting it for review.
Correctly estimates the size of the batch claim required by the records that will be returned, in order to determine when to cut off the job batch.
Adds a new public API method to the Dispatcher, `#canClaimFragmentBatch(int, int)`. This method determines whether a batch with the given fragment count and of the given length can actually be claimed on this dispatcher instance. It is useful to determine whether a batch will be claimable before you actually claim it, avoiding an unnecessary claim-and-abort cycle when the batch is not yet finished.
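A minimal sketch of what such a predicate might do, assuming agrona-style framing and alignment. All constants and internal names here are illustrative assumptions, not the actual dispatcher internals:

```java
// Hypothetical sketch: frame and align the batch, then compare against the
// maximum claimable fragment length. Constants are assumed for illustration.
final class DispatcherSketch {
  static final int HEADER_LENGTH = 12;  // assumed per-fragment frame header
  static final int FRAME_ALIGNMENT = 8; // assumed alignment boundary

  final int maxFragmentLength;

  DispatcherSketch(final int maxFragmentLength) {
    this.maxFragmentLength = maxFragmentLength;
  }

  boolean canClaimFragmentBatch(final int fragmentCount, final int batchLength) {
    // every fragment carries a frame header; the whole batch is aligned
    final int framedLength = batchLength + fragmentCount * HEADER_LENGTH;
    final int alignedLength =
        (framedLength + (FRAME_ALIGNMENT - 1)) & ~(FRAME_ALIGNMENT - 1);
    return alignedLength <= maxFragmentLength;
  }
}
```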
Replaces the API for computing the batch length with an additional event by a simpler predicate method, `canWriteAdditionalEvent(int)`. This avoids breaking abstractions: the writer no longer needs to know how the dispatcher computes the framed and aligned batch length, and instead delegates to the dispatcher the decision of whether the batch, with an additional event of the given length, can still be written.
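The delegation could be sketched like this, with a hypothetical `DispatcherLike` interface standing in for the real dispatcher (all names here are illustrative):

```java
// Hypothetical sketch of a batch writer delegating the "will it fit?" decision
// to the dispatcher, without knowing anything about framing or alignment.
final class BatchWriterSketch {
  interface DispatcherLike {
    boolean canClaimFragmentBatch(int fragmentCount, int batchLength);
  }

  private final DispatcherLike dispatcher;
  private int eventCount;
  private int batchLength;

  BatchWriterSketch(final DispatcherLike dispatcher) {
    this.dispatcher = dispatcher;
  }

  // Probes whether one more event of the given length would still fit,
  // without mutating the batch or the caller's DTO.
  boolean canWriteAdditionalEvent(final int eventLength) {
    return dispatcher.canClaimFragmentBatch(eventCount + 1, batchLength + eventLength);
  }

  // Records an event in the batch's running totals.
  void addEvent(final int eventLength) {
    eventCount++;
    batchLength += eventLength;
  }
}
```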
8798: Add API to probe the logstream batch writer if more bytes can be written without writing them r=npepinpe a=npepinpe

## Description

This PR adds a new API method, `LogStreamBatchWriter#canWriteAdditionalEvent(int)`. This allows users of the writer to probe if adding the given amount of bytes to the batch would cause it to become un-writable, without actually having to write anything to the batch, or even modify their DTO (e.g. the `TypedRecord<?>` in the engine).

To avoid having dispatcher details leak into the implementation, an analogous method is added to the dispatcher, `Dispatcher#canClaimFragmentBatch(int, int)`, which will compare the given size, framed and aligned, with the max fragment length.

This is the main building block to eventually solve #5525, and enables other use cases (e.g. multi-instance creation) which deal with large batches, until we have a more permanent solution (e.g. chunking follow-up batches).

NOTE: the tests added in the dispatcher are not very good, but I couldn't come up with something else that wouldn't be too coupled to the implementation (i.e. essentially reusing `LogBufferAppender`). I would like some ideas/suggestions.

NOTE: this PR comes out of the larger one, #8491. You can check that one out to see how the new API would be used, e.g. in the `JobBatchCollector`. As such, this is marked for backporting, since we'll backport the complete fix for #5525.

## Related issues

related to #5525

Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
8797: Extend Either/EitherAssert capabilities r=npepinpe a=npepinpe

## Description

This PR extends `Either` by adding a new API, `Either#getOrElse(R)`. This allows extracting the right value of the `Either`, or returning a fallback. I did not add any tests as the implementation is incredibly simple, and I can't foresee it ever getting more complex, but do challenge this.

It also extends the related `EitherAssert` by adding new `left` and `right` extraction capabilities. So you can now assert something like:

```java
EitherAssert.assertThat(either).left().isEqualTo(1);
EitherAssert.assertThat(instantEither)
    .right()
    .asInstanceOf(InstanceOfAssertFactories.INSTANT)
    .isBetween(today, tomorrow);
```

Note that calling `EitherAssert#right()` will, under the hood, still call `EitherAssert#isRight()`.

This PR is related to #5525 and is extracted from the bigger spike in #8491. You can review how it's used there, specifically in the `JobBatchCollectorTest`. As such, this is marked for backporting, since we'll backport the complete fix for #5525.

## Related issues

related to #5525

Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
Co-authored-by: Nicolas Pepin-Perreault <43373+npepinpe@users.noreply.github.com>
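The described `getOrElse(R)` behavior can be sketched with a minimal, illustrative re-implementation (this is not Zeebe's actual `Either` class):

```java
// Hypothetical minimal Either with the getOrElse(R) API described above:
// returns the right value if present, otherwise the supplied fallback.
final class EitherSketch<L, R> {
  private final L left;
  private final R right;
  private final boolean isRight;

  private EitherSketch(final L left, final R right, final boolean isRight) {
    this.left = left;
    this.right = right;
    this.isRight = isRight;
  }

  static <L, R> EitherSketch<L, R> left(final L value) {
    return new EitherSketch<>(value, null, false);
  }

  static <L, R> EitherSketch<L, R> right(final R value) {
    return new EitherSketch<>(null, value, true);
  }

  // Returns the right value, or the given fallback when this is a left.
  R getOrElse(final R fallback) {
    return isRight ? right : fallback;
  }
}
```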
Description

Related issues

closes #5525

Definition of Done

Not all items need to be done depending on the issue and the pull request.

Code changes:
- To backport, add the label (e.g. backport stable/0.25) to the PR; in case that fails you need to create backports manually.

Testing:

Documentation: