
zentol
Contributor

@zentol zentol commented Nov 29, 2022

Capturing the checkpointId for a generated record in a subsequent map function is impossible since the notifyCheckpointComplete notification may arrive at any time (or not at all). Instead just assert that each subtask got exactly as many records as expected, which can only happen (reliably) if the rate-limiting works as expected.

Capturing the checkpointId for a generated record is impossible since the notifyCheckpointComplete notification may arrive at any time (or not at all).
Instead just assert that each subtask got exactly as many records as expected, which can only happen (reliably) if the rate-limiting works as expected.
@zentol zentol requested a review from XComp November 29, 2022 08:55
@flinkbot
Collaborator

flinkbot commented Nov 29, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Contributor

@XComp XComp left a comment


which can only happen (reliably) if the rate-limiting works as expected.

You're saying that we would introduce a test instability here if the RateLimitedStrategy wouldn't perform as expected?

@zentol
Contributor Author

zentol commented Nov 29, 2022

You're saying that we would introduce a test instability here if the RateLimitedStrategy wouldn't perform as expected?

Yes. At least that was the idea. Now I'm not so sure anymore whether this makes sense. Given that we limit the count we invariably end up with capacityPerCycle * numCycles elements, regardless of whether rate-limiting was applied or not.
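To illustrate the concern, here is a minimal sketch (plain Java, no Flink; `CountInvarianceSketch` and `run` are hypothetical names) of a count-limited source. The total number of emitted records is identical with and without rate-limiting; only the number of cycles differs, so asserting on the total alone cannot detect a broken limiter.

```java
// Hypothetical simulation of a count-limited source; not Flink code.
final class CountInvarianceSketch {

    /** Returns {totalEmitted, cyclesUsed} for a source capped at totalCount records. */
    static int[] run(int totalCount, int capacityPerCycle, boolean rateLimited) {
        int emitted = 0;
        int cycles = 0;
        while (emitted < totalCount) {
            cycles++;
            // Without rate-limiting, a single cycle may drain everything.
            int budget = rateLimited ? capacityPerCycle : totalCount;
            emitted += Math.min(budget, totalCount - emitted);
        }
        return new int[] {emitted, cycles};
    }

    public static void main(String[] args) {
        int capacityPerCycle = 4;
        int numCycles = 3;
        int total = capacityPerCycle * numCycles;
        int[] limited = run(total, capacityPerCycle, true);
        int[] unlimited = run(total, capacityPerCycle, false);
        System.out.println("limited:   total=" + limited[0] + " cycles=" + limited[1]);
        System.out.println("unlimited: total=" + unlimited[0] + " cycles=" + unlimited[1]);
    }
}
```

A test that wants to catch a missing limiter would therefore need to observe something per cycle (or per checkpoint), not just the final count.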

@XComp
Contributor

XComp commented Nov 29, 2022

yeah, that's something I was wondering as well. But the behavior of the RateLimitedStrategy doesn't necessarily need to be tested here, I guess. It feels like we're missing a RateLimitedSourceReaderTest for that kind of functionality. 🤔

@zentol
Contributor Author

zentol commented Nov 29, 2022

There is a RateLimitedSourceReaderITCase.

I'll try finding another way to test this; my current thinking goes towards using a FlatMapFunction that stops emitting values after the first call to snapshotState, so it should truly only emit the values of the first checkpoint (and then you can assert the total number of records emitted in a single checkpoint). but that's so far also not working; too many values get emitted...

@zentol
Contributor Author

zentol commented Nov 29, 2022

I think we actually found a bug.

If a split was already assigned to a reader, then the first call to SourceReader#pollNext (which happens before SourceReader#isAvailable) circumvents rate-limiting.
We need to force this first call to also go through isAvailable.
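A rough sketch of that idea, assuming a heavily simplified reader (plain Java; `GatedReaderSketch`, `Status`, and the integer permit counter are hypothetical and not Flink's actual classes): `pollNext` refuses to emit anything until `isAvailable` has been called and has handed out a permit, so a pre-assigned split cannot bypass the limiter on the first call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified stand-in for a rate-limited reader.
// It is NOT Flink's RateLimitedSourceReader; only the gating idea is shown.
final class GatedReaderSketch {
    enum Status { NOTHING_AVAILABLE, END_OF_INPUT }

    private final Queue<Integer> records = new ArrayDeque<>();
    private int permits; // records still allowed in the current cycle
    // null means isAvailable() has not been called since the last emitted record
    private CompletableFuture<Void> availabilityFuture;

    GatedReaderSketch(List<Integer> preAssignedSplit, int permits) {
        records.addAll(preAssignedSplit);
        this.permits = permits;
    }

    CompletableFuture<Void> isAvailable() {
        if (availabilityFuture == null) {
            if (permits > 0) {
                permits--;
                availabilityFuture = CompletableFuture.completedFuture(null);
            } else {
                // A real limiter would complete this when the next cycle starts.
                availabilityFuture = new CompletableFuture<>();
            }
        }
        return availabilityFuture;
    }

    Status pollNext(List<Integer> output) {
        if (availabilityFuture == null || !availabilityFuture.isDone()) {
            // Covers the very first call: nothing is emitted before isAvailable() ran.
            return Status.NOTHING_AVAILABLE;
        }
        availabilityFuture = null; // the next record needs a fresh permit
        Integer next = records.poll();
        if (next != null) {
            output.add(next);
        }
        return records.isEmpty() ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        GatedReaderSketch reader = new GatedReaderSketch(List.of(1, 2, 3), 2);
        List<Integer> out = new ArrayList<>();
        reader.pollNext(out);  // first call, before isAvailable(): emits nothing
        reader.isAvailable();  // takes a permit
        reader.pollNext(out);  // emits one record
        System.out.println(out);
    }
}
```

Returning `NOTHING_AVAILABLE` after every emitted record is a deliberate choice in this sketch: it forces the caller back through `isAvailable` so each record is counted against the limit.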

@zentol
Contributor Author

zentol commented Nov 29, 2022

Additionally, the RateLimitedSourceReader may reset the checkpoint limit at the wrong time. We don't really want that to happen when the checkpoint is complete, but rather when the next checkpoint starts (== when snapshotState was called).
That said I haven't seen a test failure because of this (yet).
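As a sketch of the behavior described here (plain Java; `PerCheckpointBudgetSketch` and `tryEmit` are hypothetical, and this is the proposed behavior rather than what the code currently does): the per-checkpoint budget is refilled in `snapshotState`, while `notifyCheckpointComplete` deliberately leaves it untouched because that notification may arrive late or never.

```java
// Hypothetical sketch of a per-checkpoint record budget. The method names
// mirror the callbacks discussed above, but this is not Flink code.
final class PerCheckpointBudgetSketch {
    private final int capacityPerCheckpoint;
    private int remaining;

    PerCheckpointBudgetSketch(int capacityPerCheckpoint) {
        this.capacityPerCheckpoint = capacityPerCheckpoint;
        this.remaining = capacityPerCheckpoint;
    }

    /** Returns true if a record may be emitted within the current checkpoint interval. */
    boolean tryEmit() {
        if (remaining == 0) {
            return false;
        }
        remaining--;
        return true;
    }

    /** A new checkpoint starts: refill the budget here. */
    void snapshotState(long checkpointId) {
        remaining = capacityPerCheckpoint;
    }

    /** Intentionally no reset: completion may be reported at any time, or not at all. */
    void notifyCheckpointComplete(long checkpointId) {
        // no-op in this sketch
    }

    public static void main(String[] args) {
        PerCheckpointBudgetSketch budget = new PerCheckpointBudgetSketch(2);
        System.out.println(budget.tryEmit());
        System.out.println(budget.tryEmit());
        System.out.println(budget.tryEmit());
        budget.notifyCheckpointComplete(1L);
        System.out.println(budget.tryEmit());
        budget.snapshotState(2L);
        System.out.println(budget.tryEmit());
    }
}
```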

Contributor

@XComp XComp left a comment


I really should push up reading up on FLIP-27 in my todo list. 8) Anyway, after some code reading, the change in pollNext() makes sense. Initially, I thought of initializing availabilityFuture in pollNext() instead of returning NOTHING_AVAILABLE. But that was a wrong train of thought. I still don't get your 2nd comment, though. Please find my remarks below.

Contributor

@XComp XComp left a comment


The issue where we complete the gatingFuture when receiving the completed checkpoint notification instead of when a new checkpoint is triggered sounds like a separate issue. I think it would make sense to create a new Jira ticket for that. WDYT?

@zentol
Contributor Author

zentol commented Nov 30, 2022

I think it would make sense to create a new Jira ticket for that. WDYT?

yes, it's a separate issue (and for the one in this ticket we at least already have a test that shows the issue).

- the test was never calling isAvailable(), relying on the previous (bugged) behavior of rate-limiting not being enforced
- The loop was difficult to understand in terms of how many records are actually being processed and was refactored accordingly
- there was a series of math errors in here: 563-177=386, but 128 (elementsPerCycle) * 3 = 384. This was hidden by the final call to pollNext() in the while loop (emitting 1 additional record), and by the final range assertion also incrementing by 1.
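
The arithmetic in the last bullet can be checked with a small sketch (plain Java; `CycleMathSketch` and its helpers are hypothetical, and the computed range ends are illustrative rather than the test's actual bounds). A range starting at 177 that holds exactly 128 * 3 records ends at 561 exclusive or 560 inclusive, not 563.

```java
// Hypothetical helper for sanity-checking range sizes; not part of the PR.
final class CycleMathSketch {
    static final int ELEMENTS_PER_CYCLE = 128;

    /** Number of records expected after the given number of full cycles. */
    static int expectedRecords(int cycles) {
        return ELEMENTS_PER_CYCLE * cycles;
    }

    /** End of a half-open range [first, end) that holds exactly count values. */
    static int exclusiveEnd(int first, int count) {
        return first + count;
    }

    /** End of a closed range [first, end] that holds exactly count values. */
    static int inclusiveEnd(int first, int count) {
        return first + count - 1;
    }

    public static void main(String[] args) {
        int expected = expectedRecords(3);
        System.out.println("expected records: " + expected);
        System.out.println("563 - 177 = " + (563 - 177));
        System.out.println("exclusive end from 177: " + exclusiveEnd(177, expected));
        System.out.println("inclusive end from 177: " + inclusiveEnd(177, expected));
    }
}
```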
@zentol
Contributor Author

zentol commented Nov 30, 2022

Another test relied on the previous (bugged) behavior :(

- use 0-383 to make off-by-one error obvious (the splits included 385 values, not 384)
- assert that we reach END_OF_INPUT
- correctly assert all 384 elements
@zentol zentol requested a review from XComp December 1, 2022 12:45
@zentol zentol requested a review from XComp December 2, 2022 13:15
Contributor

@XComp XComp left a comment


LGTM 👍 ...just a few minor things

@zentol zentol merged commit 81ed6c6 into apache:master Dec 4, 2022
@zentol zentol deleted the 30202 branch April 20, 2023 07:51