
KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest #14523

Conversation

@hanyuzheng7 (Contributor) commented on Oct 10, 2023:

  • Update test and add test cases in IQv2StoreIntegrationTest.
  • Originally, all key-value pairs were confined to a single window, with all data added at WINDOW_START. To improve our testing, we've expanded to multiple windows.
  • There are 10 records in total. Each record's key (0 through 4) determines its partition, either 0 or 1, and each consecutive pair of records shares the same key: for example, <0,0> and <0,1> both carry key 0 and go to partition 0, while <1,2> and <1,3> carry key 1 and go to partition 1. This pattern holds across all records (see the sketch below).
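
A minimal sketch of the layout described above (the loop shape, variable names, and arithmetic are illustrative assumptions based only on this description, not the actual producer code in the test):

    // assumed illustration: value = i, key = i / 2, partition = key % 2
    for (int i = 0; i < 10; i++) {
        final int value = i;            // values 0..9
        final int key = i / 2;          // keys 0..4, shared by each consecutive pair of records
        final int partition = key % 2;  // key 0 -> partition 0, key 1 -> partition 1, ...
        System.out.printf("record <%d,%d> -> partition %d%n", key, value, partition);
    }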

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax added the streams and tests (Test fixes, including flaky tests) labels on Oct 10, 2023
@hanyuzheng7 changed the title from "Update test and add test cases in IQv2StoreIntegrationTest" to "KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest" on Oct 11, 2023
@mjsax (Member) left a comment:

Thanks for extending this test. This is very helpful.

I am wondering if it would be beneficial to add more input records, so that a few windows have more than one record in them?

@@ -716,7 +716,7 @@ public void process(final Record<Integer, Integer> record) {
     final SessionStore<Integer, Integer> stateStore =
         context().getStateStore(sessionStoreStoreBuilder.name());
     stateStore.put(
-        new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+        new Windowed<>(record.key(), new SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis() + WINDOW_SIZE.toMillis())),
@mjsax (Member):

We use a session window gap of WINDOW_SIZE -- this means that every record falls into its own session, and session-start = session-end = record.ts.

@hanyuzheng7 (Contributor Author):

So it should be
new SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis())
but then their session window size seems to become 0:

 /**
     * Create a new window for the given start time and end time (both inclusive).
     *
     * @param startMs the start timestamp of the window
     * @param endMs   the end timestamp of the window
     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
     */
    public SessionWindow(final long startMs, final long endMs) throws IllegalArgumentException {
        super(startMs, endMs);
    }

So you mean each session window's size is 0, their interval is WINDOW_SIZE, and in the end each record just falls into its own session window.

OK, I got it. It should be like this picture, where each record stands for its own session:
[attached image: each record forms its own session]

@mjsax (Member):

I think it should be new SessionWindow(record.timestamp(), record.timestamp())

> but their session window size seems to become 0

Yes, the size would become 1 (note: for session windows, both the upper and lower bound are inclusive, in contrast to tumbling/hopping windows). For session windows there is no fixed size, only the "inactivity gap" parameter (the test just reuses the WINDOW_SIZE variable as the gap -- this might be a little confusing). The size of the created windows depends on the data: in the current test setup, the records are more than the gap apart from each other, so each record creates its own session, and thus each session has size 1. Only if two (or more) records fell into the same session (i.e., their timestamps were close enough to each other, i.e., at most the gap apart) would the session size become larger.
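
A small sketch of that gap semantics (an illustration only, assuming WINDOW_SIZE is reused as the inactivity gap; this is not code from the PR):

    // two records merge into one session only if their timestamps are at most "gap" ms apart
    final long gap = WINDOW_SIZE.toMillis();
    final long ts1 = WINDOW_START;            // first record
    final long ts2 = WINDOW_START + gap + 1L; // second record, just outside the gap
    final boolean sameSession = (ts2 - ts1) <= gap; // false -> each record gets its own size-1 session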

@@ -863,6 +863,14 @@ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extract
mkSet()
);

shouldHandleWindowKeyQuery(
2,
@mjsax (Member):

nit: fix indentation -- should only be 4 spaces (similar below)

What is the intent of adding this test case?

@hanyuzheng7 (Contributor Author):

OK, I will fix it. This isn't a new test case I introduced; it's the pre-existing one, but its outcome has now changed.
The previous version looked like this:

 // tightest possible start range
        shouldHandleWindowKeyQuery(
            2,
            Instant.ofEpochMilli(WINDOW_START),
            Instant.ofEpochMilli(WINDOW_START),
            extractor,
            mkSet(2)
        );

but this time the answer is mkSet().
Should I add a comment to this one, like:

 // tightest possible start range
        shouldHandleWindowKeyQuery(
            2,
            Instant.ofEpochMilli(WINDOW_START),
            Instant.ofEpochMilli(WINDOW_START),
            extractor,
            mkSet()
        );

@mjsax (Member):

Well, yes and no. The original test was updated to use key 0 now -- the goal of the test is to verify "miss the window start range". This test with key 2 does not test "miss the window start range" -- so what's the purpose of it?

@hanyuzheng7 (Contributor Author):

// tightest possible start range
        shouldHandleWindowKeyQuery(
            2,
            Instant.ofEpochMilli(WINDOW_START),
            Instant.ofEpochMilli(WINDOW_START),
            extractor,
            mkSet()
        );

The comment for this test could maybe be changed to "cannot find key 2 in the first window".

    Instant.ofEpochMilli(WINDOW_START),
    Instant.ofEpochMilli(WINDOW_START),
    extractor,
-   mkSet(2)
+   mkSet(0)
);

// miss the window start range
@mjsax (Member):

The key below should be updated to 0? Otherwise we don't miss the window of the key by 1 ms, as intended by the test?

@hanyuzheng7 (Contributor Author):

Yes, the value should be 0. In this case, <0,0> is aligned with the WINDOW_START, making it retrievable.
https://github.com/apache/kafka/pull/14523/files#r1356043842

@@ -880,25 +888,113 @@ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extract
extractor,
mkSet()
);

shouldHandleWindowKeyQuery(
@mjsax (Member):

For this and the other added queries: how do you pick the query search bounds? It's not obvious to me.

@hanyuzheng7 (Contributor Author):

If we have three windows: 0-5, 5-10, and 10-15, any effective query must encompass the window's start time. For instance, with a key-value pair at time 6, the query range must include 5. We could have ranges like query(5, 6) or query(5,7). If the query's lower bound exceeds 5, we won't retrieve the value for that key-value pair.

When selecting a query range, I ensure the lower bound is less than or equal to the window's start time. For instance, if I want to retrieve the data at timestamp 6 (represented as <1,1>), I would set the lower bound to a value less than or equal to 5. The upper bound, on the other hand, is selected randomly from values greater than or equal to 5.
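
A small illustration of that rule (a sketch, not code from this PR; it assumes the IQv2 WindowKeyQuery factory method withKeyAndWindowStartRange and a record with key 1 at timestamp 6, i.e. in the window starting at 5):

    // the query's window-start range must cover 5, the start of the window containing the record
    final WindowKeyQuery<Integer, Integer> hit =
        WindowKeyQuery.withKeyAndWindowStartRange(1, Instant.ofEpochMilli(5), Instant.ofEpochMilli(6));
    // lower bound 6 is past the window start 5, so this query misses the record
    final WindowKeyQuery<Integer, Integer> miss =
        WindowKeyQuery.withKeyAndWindowStartRange(1, Instant.ofEpochMilli(6), Instant.ofEpochMilli(7));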

@mjsax (Member):

Thanks. Maybe add some (short) comments about it, similar to the existing test cases.

The "confusing" thing is that the bounds are not "strict": for example, you use the range 4 to 11, which gives the same result as 5 to 11, or 4 to 10... (so why is 4,11 used and not some other bound? If it's random, that's also OK, but it should be clarified). For testing, the interesting part is usually when the "bounds" are crossed:

For example, we could have a query range 5,9 that only returns the window starting at 5, plus a range 5,10 that returns two windows (thus we test the upper bound from 9 and 10), plus a range 6,10 that would only return the window at 10 (and thus verify the lower bound from 5 to 6). Does this make sense?
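
A sketch of those three boundary-crossing cases using the test's helper (the key, extractor, and expected values are placeholders, not the actual test data):

    // upper bound just below the second window start: only the window at 5 matches
    shouldHandleWindowKeyQuery(key, Instant.ofEpochMilli(5), Instant.ofEpochMilli(9), extractor, mkSet(valueInWindowAt5));
    // upper bound raised to 10: both windows match, verifying the upper-bound crossing
    shouldHandleWindowKeyQuery(key, Instant.ofEpochMilli(5), Instant.ofEpochMilli(10), extractor, mkSet(valueInWindowAt5, valueInWindowAt10));
    // lower bound raised to 6: only the window at 10 matches, verifying the lower-bound crossing
    shouldHandleWindowKeyQuery(key, Instant.ofEpochMilli(6), Instant.ofEpochMilli(10), extractor, mkSet(valueInWindowAt10));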

);

// miss the window start
shouldHandleWindowRangeQuery(
@mjsax (Member):

Similar to above -- it's not obvious why you picked the time bounds for the queries. Can you explain?

@hanyuzheng7 (Contributor Author):

#14523 (comment)
I might consider choosing the query bounds more methodically. Additionally, I'll annotate my code with comments to clarify the rationale behind my boundary selections.

@mjsax (Member):

Sweet -- yes, that is what I was getting at (cf. my other comment above).

@hanyuzheng7 (Contributor Author) commented on Oct 12, 2023:

> Thanks for extending this test. This is very helpful.
>
> I am wondering if it would be beneficial to add more input records, so that a few windows have more than one record in them?

Yes, we can include additional records within the same window. I'll insert more records and adjust their interval from 6 minutes to 2 minutes.

@@ -413,7 +413,7 @@ public static void before()
     new ProducerRecord<>(
         INPUT_TOPIC_NAME,
         i % partitions,
-        RECORD_TIME,
+        WINDOW_START + Duration.ofMinutes(6).toMillis() * i,
@hanyuzheng7 (Contributor Author):

Now the first record's time is the same as WINDOW_START.

@hanyuzheng7 force-pushed the KAFKA-15569-Update-test-and-add-test-cases-in-IQv2StoreIntegrationTest branch from 1c18527 to 95aac46 on October 13, 2023, 15:48
@mjsax (Member) left a comment:

Overall LGTM. Just a few nits.

@@ -671,7 +673,7 @@ public void process(final Record<Integer, Integer> record) {
 public void process(final Record<Integer, Integer> record) {
     final WindowStore<Integer, Integer> stateStore =
         context().getStateStore(windowStoreStoreBuilder.name());
-    stateStore.put(record.key(), record.value(), WINDOW_START);
+    stateStore.put(record.key(), record.value(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis());
@mjsax (Member):

We should add the same comment as above.

@hanyuzheng7 (Contributor Author):

ok

@@ -716,7 +718,7 @@ public void process(final Record<Integer, Integer> record) {
     final SessionStore<Integer, Integer> stateStore =
         context().getStateStore(sessionStoreStoreBuilder.name());
     stateStore.put(
-        new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+        new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())),
@mjsax (Member):

We should add a comment above this line:

// we do not re-implement the actual session-window logic from the DSL here to keep the test simple,
// but instead just put each record into its own session

@hanyuzheng7 (Contributor Author):

ok

@@ -655,7 +657,7 @@ public void process(final Record<Integer, Integer> record) {
ValueAndTimestamp.make(
record.value(), record.timestamp()
@mjsax (Member):

We should add a comment here:

// We don't re-implement the DSL logic (which implements sum) but instead just keep the latest value per window

@hanyuzheng7 (Contributor Author):

ok

}
}

public <V> void shouldHandleKeyPAPIQuery(
@mjsax (Member):

Maybe I missed it, but how is this method different from shouldHandleKeyDSLQuery? -- Do we really need two versions of the existing shouldHandleKeyQuery?

@hanyuzheng7 (Contributor Author):

At this time, the two methods are the same, so we can use a single method to handle both topologies -- i.e., use shouldHandleKeyQuery for both. I will update it.

@mjsax merged commit 732bffc into apache:trunk on Oct 17, 2023 (1 check failed)
@hanyuzheng7 (Contributor Author):

@mjsax Thank you for reviewing the code.

AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024