
KAFKA-14936: Add Grace period logic to Stream Table Join (2/N) #13855

Merged: 40 commits into apache:trunk on Jun 29, 2023
Conversation

@wcarlson5 (Contributor) commented Jun 14, 2023:

This PR adds the grace-period interface to the Joined object and wires the join up to use the buffer. The majority of the change is tests and moving some of the existing join logic.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
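The grace-period behavior the PR description refers to — stream-side records wait in a time-ordered buffer, in-grace records are processed in timestamp order, and late records bypass the buffer — can be sketched as a toy model in plain Java. This is an illustration only, not Kafka Streams code; all names below are invented:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model (NOT Kafka Streams code) of the behavior this PR describes:
// stream-side records wait in a time-ordered buffer for the grace period and
// are emitted in timestamp order; records older than the grace period bypass
// the buffer and are processed as they arrive.
public class GraceBufferSketch {
    static class Rec {
        final long ts;
        final String value;
        Rec(final long ts, final String value) { this.ts = ts; this.value = value; }
    }

    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;
    private final PriorityQueue<Rec> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Rec r) -> r.ts));
    final List<String> emitted = new ArrayList<>();

    GraceBufferSketch(final long gracePeriodMs) { this.gracePeriodMs = gracePeriodMs; }

    void process(final Rec record) {
        observedStreamTime = Math.max(observedStreamTime, record.ts);
        final long deadline = observedStreamTime - gracePeriodMs;
        if (record.ts < deadline) {
            emitted.add(record.value + "(late)");  // late: join immediately, out of order
        } else {
            buffer.add(record);                    // within the grace period: buffer it
        }
        // Emit, in timestamp order, everything whose grace period has expired.
        while (!buffer.isEmpty() && buffer.peek().ts <= deadline) {
            emitted.add(buffer.poll().value);
        }
    }
}
```

Note how out-of-order records inside the grace period come out reordered by timestamp, while late records are processed immediately as they arrive — the two cases debated throughout this review.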

@wcarlson5 wcarlson5 changed the title Grace period added KAFKA-14936: Add Grace period logic to Stream Table Join (2/N) Jun 14, 2023
@cadonna (Contributor) left a comment:

Thanks for the PR, @wcarlson5!

Do you plan to add the changelogging in one of the next PRs?

Here is my feedback on this PR.

I think you should move the eviction logic completely to the buffer. That would make the code easier to maintain.


Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();

if (joined.gracePeriod() != null) {
Contributor:

Shouldn't this be

Suggested change
if (joined.gracePeriod() != null) {
if (joined.gracePeriod() != null && joined.gracePeriod() > 0) {

In the KIP discussion we said that a grace period of zero is equal to no grace period.

Note, some conditions later on need to be changed if you agree with this change, like for example gracePeriod.isPresent() ^ buffer.isPresent().

Contributor Author:

I see what you mean but we actually were going to create the store and leave it empty for zero duration. The point was to make it easier to change the grace period if desired so the store isn't getting created and destroyed. Something about making it more backward compatible.

Contributor Author:

So we are going to make a store even when the grace period is zero. This should make changing the grace period to and from zero smoother. So if someone set the grace period to zero then we can evict everything from the buffer even after the grace period is changed.

@cadonna (Contributor), Jun 20, 2023:

So if someone set the grace period to zero then we can evict everything from the buffer even after the grace period is changed.

Are you saying that if I set the grace period > 0 and I run my application, then I stop my application and set the grace period == 0, and restart my application, my application will process records from my first run?
Why should removing the grace period (grace period == null) behave differently?

Contributor:

Another thought here:
What happens if I specify a grace period > 0 and run my application for a while. Then, I remove the grace period and again run my application for a while. Finally, I specify a grace period > 0 or even == 0 and again run my application for a while. Does my application in the last run process records that the first run wrote into the buffer? (Assuming the task is always assigned to the same node).

Contributor:

What I am trying to say is that we should try to put simplicity over partial support for topology updates.

Contributor Author:

This seems to be the main sticking point of the PR.

The thought was that we would need some way to stop using the grace period without losing records, but you bring up a good point.

doJoin(record);
} else {
final long deadline = observedStreamTime - gracePeriod.get().toMillis();
if (record.timestamp() <= deadline) {
Contributor:

Why here record.timestamp() <= deadline and in RocksDBTimeOrderedKeyValueBuffer at line 127 wrapped().observedStreamTime - gracePeriod > record.timestamp()?

I am wondering if it would be simpler to remove the condition in RocksDBTimeOrderedKeyValueBuffer at line 127 and only check late records here. In the end, you do not get any return value from the put() in RocksDBTimeOrderedKeyValueBuffer, so it would silently not add the record to the buffer. Such behavior might become hard to debug in case of a bug, and it is also error-prone because the condition for late records lives in two places instead of one.

I am even in favor of letting RocksDBTimeOrderedKeyValueBuffer#put() return whether the record was added to the buffer. That way, you can move the logic for eviction and late records into the buffer instead of having it half here and half there, which makes the code harder to maintain.
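The "let put() decide" pattern suggested here can be sketched outside of Kafka like this. The class and method shapes are invented for illustration and do not reflect the real RocksDBTimeOrderedKeyValueBuffer API:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the "let put() decide" pattern from this thread: the buffer owns
// the late-record check, and its return value tells the caller whether the
// record was buffered. All names are invented.
class DecidingBuffer {
    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;
    private final NavigableMap<Long, String> records = new TreeMap<>();

    DecidingBuffer(final long gracePeriodMs) { this.gracePeriodMs = gracePeriodMs; }

    /** Returns true if the record was buffered; false if it is late and must be joined now. */
    boolean put(final long streamTime, final long recordTs, final String value) {
        observedStreamTime = Math.max(observedStreamTime, streamTime);
        if (recordTs < observedStreamTime - gracePeriodMs) {
            return false;  // late record: the caller joins it immediately
        }
        records.put(recordTs, value);
        return true;
    }
}
```

With this shape the caller collapses to a single branch, along the lines of the `if (!buffer.get().put(...)) { doJoin(record); }` snippet discussed later in this review.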

Contributor Author:

That is a good idea. I'm also in favor of having put decide.

} else {
buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext());
}
buffer.get().evictWhile(() -> buffer.get().minTimestamp() <= deadline, this::emit);
Contributor:

I am wondering if it might not be simpler to use just () -> true as the predicate instead of () -> buffer.get().minTimestamp() <= deadline and leave the eviction conditions to the buffer. Otherwise, we have half of the eviction logic here and the other half in the buffer, which seems error-prone and hard to maintain and debug.

Contributor Author:

It should be the same, so sure, I can go with the simple approach.

super.prepareEnvironment();
appID = "stream-table-join-integration-test";
builder = new StreamsBuilder();
joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "Grace", Duration.ZERO);
Contributor:

Why do you have only tests with the grace period set to zero? I thought a grace period of zero is equivalent to no grace period. Basically, you are not testing the new code if you look at it from the outside.

Contributor Author:

I'll add some. I think I need to rework what is in the integration tests and the unit tests. Should be fixed.

}

@Test
public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
Contributor:

I am missing assertions in this test. You should call the init() and process() methods directly.

Contributor Author:

Ah this should be removed. It will be added back in the next PR (or the one after)

Comment on lines 157 to 158
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
new KeyValueTimestamp<>(1, "X1+Y1", 1));
Contributor:

nit:

Suggested change
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
new KeyValueTimestamp<>(1, "X1+Y1", 1));
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "X0+Y0", 0),
new KeyValueTimestamp<>(1, "X1+Y1", 1));

Comment on lines 166 to 169
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+YY0", 0),
new KeyValueTimestamp<>(1, "X1+YY1", 1),
new KeyValueTimestamp<>(2, "X2+YY2", 2),
new KeyValueTimestamp<>(3, "X3+YY3", 3));
Contributor:

nit:

Suggested change
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+YY0", 0),
new KeyValueTimestamp<>(1, "X1+YY1", 1),
new KeyValueTimestamp<>(2, "X2+YY2", 2),
new KeyValueTimestamp<>(3, "X3+YY3", 3));
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "X0+YY0", 0),
new KeyValueTimestamp<>(1, "X1+YY1", 1),
new KeyValueTimestamp<>(2, "X2+YY2", 2),
new KeyValueTimestamp<>(3, "X3+YY3", 3));

private final int[] expectedKeys = {0, 1, 2, 3};

private MockApiProcessor<Integer, String, Void, Void> processor;
private TopologyTestDriver driver;
Contributor:

Why do you need the topology test driver for unit tests?

@wcarlson5 (Contributor Author), Jun 16, 2023:

So I was following the normal KStreamKTableJoinTest. I think it's best if they align with each other.

Contributor:

I think it's best if they align with each other

Why is that?

I find it a bit sad that we do not have real unit tests for the processor. But it's not your fault. I just wanted to document it somewhere.

Contributor:

Another related question: What is the difference between KStreamKTableJoinWithGraceTest and StreamTableJoinWithGraceIntegrationTest? They both use the TopologyTestDriver, so it seems they both test the stream-table join on the same level. I would rather expect to have unit tests that test the processor directly and an integration test that tests the integration of the processor.

Contributor:

BTW, using unit tests would make the tests much simpler. You just need to mock the value getter (and maybe some other parts) instead of building up a whole topology.

Contributor Author:

Okay, so I actually tried to set this up and it was NOT simpler. The mocking got way out of hand. Normally I 100 percent agree with you, mocking is way better.

Due to your other comment I just ended up moving this whole class to the unit test class for the normal join anyway, so this is a bit of a moot point.

@wcarlson5 wcarlson5 added the kip Requires or implements a KIP label Jun 15, 2023
@wcarlson5 (Contributor Author) left a comment:

I made updates to the code; still working on rejiggering the tests to see if I can get JUnit 5 working. Hopefully will get that soon.


@wcarlson5 wcarlson5 requested a review from cadonna June 20, 2023 02:54
@wcarlson5 (Contributor Author):

@cadonna I got around to updating the tests as well like you asked

@cadonna (Contributor) left a comment:

Thanks for the updates, @wcarlson5 !

Here is my feedback.

}

buffer.get().setSerdesIfNull(new SerdeGetter(context));
//cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)
Contributor:

Is this comment so important or can you remove it?

Contributor Author:

Yeah we can remove


final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));

assertThrows(IllegalArgumentException.class,
Contributor:

Please do also verify the exception message. We had cases in the past where the test did not fail although a different exception was thrown.

Contributor Author:

sure, good call.

}

@Test
public void shouldNotJoinOnTableUpdates() {
Contributor:

Same here and for the rest of the test class. I think you should just move the first two tests (i.e., the new ones) to KStreamKTableJoinTest.

Contributor Author:

That is a good idea; it massively simplified the code. I resolved the other comments that were pretty much the same as this one.

@wcarlson5 (Contributor Author) left a comment:

Thanks for the review @cadonna ! It looks like we have one thing to figure out.


@wcarlson5 (Contributor Author) commented Jun 22, 2023:

The next PR is the restore logic https://github.com/wcarlson5/kafka/pull/2

I have it targeted to this branch for easy reading and will retarget it when this is merged.

@vcrfxia @cadonna

Also, a tangential PR that checks that the history retention is not less than the grace period can be reviewed and merged at the same time: https://github.com/wcarlson5/kafka/pull/3
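A minimal sketch of that history-retention check, inferred from the test name shouldFailIfGracePeriodIsLongerThanHistoryRetention and the assertThrows(IllegalArgumentException.class, ...) call quoted earlier in this review; the method name and message here are assumptions:

```java
import java.time.Duration;

// Hypothetical sketch of the validation the tangential PR adds: reject a grace
// period longer than the table's history retention, since late lookups beyond
// history retention could only join against null. Name and message are invented.
final class GraceJoinValidation {
    static void validate(final Duration gracePeriod, final Duration historyRetention) {
        if (gracePeriod.compareTo(historyRetention) > 0) {
            throw new IllegalArgumentException(
                "Grace period must not be longer than the table's history retention");
        }
    }
}
```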

@vcrfxia (Collaborator) left a comment:

Thanks @wcarlson5. Some suggestions inline, but nothing blocking.

}

/**
* Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out of order records in the grace period will be processed in timestamp order. Late records, out of the grace period, will be executed right as they come in, if it is past the table history retention this could result in joins on the wrong version or a null join. Long gaps in stream side arriving records will cause records to be delayed in processing, even resulting in be processed out of the grace period window.
Collaborator:

if it is past the table history retention this could result in joins on the wrong version or a null join

I think this should be "if it is past the table history retention this could result in a null join" instead? I don't think it's possible to join with the wrong version, the only possible issue is that it joins with a null instead.

even resulting in be processed out of the grace period window

What does this mean?

Contributor:

Could you please also add a couple of line breaks to the java docs as you did for the javadocs of the with() method?

Contributor Author:

updated and clarified

private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
.withHeaders(toEmit.recordContext().headers());
internalProcessorContext.setRecordContext(toEmit.recordContext());
Collaborator:

Can you unset the record context (i.e., set it back to the original context) after finishing this step, in order to avoid confusion about what context is being used downstream? That's what we do with the suppress buffer, for example:

final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
internalProcessorContext.setRecordContext(toEmit.recordContext());
try {
    internalProcessorContext.forward(toEmit.record()
        .withTimestamp(toEmit.recordContext().timestamp())
        .withHeaders(toEmit.recordContext().headers()));
    suppressionEmitSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
} finally {
    internalProcessorContext.setRecordContext(prevRecordContext);
}

Contributor Author:

certainly


// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X");
processor.checkAndClearProcessResult(
@vcrfxia (Collaborator), Jun 23, 2023:

The reason this produces two output records is because the max timestamp seen so far is still 3, which means only records with timestamp 0 and 1 are emitted (timestamps 2 and 3 are still in the buffer). Can you add another step to this test which now produces a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order which is different from the offset order that they arrived in.

Contributor:

Good call!

Contributor Author:

I added something to flush the buffer at the end of this test. Works just fine :)

@cadonna (Contributor) left a comment:

Thanks for the updates!

I have one question about the need to evict when a late record comes in.

Please also consider @vcrfxia 's suggestions about testing and resetting the record context.

@@ -203,4 +250,4 @@ public Serde<V> valueSerde() {
public Serde<VO> otherValueSerde() {
return otherValueSerde;
}
}
}
Contributor:

nit: Could you remove this change?




Comment on lines 102 to 105
if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
doJoin(record);
}
buffer.get().evictWhile(() -> true, this::emit);
Contributor:

I think it should be the following to avoid an unnecessary range query:

Suggested change
if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
doJoin(record);
}
buffer.get().evictWhile(() -> true, this::emit);
if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
doJoin(record);
} else {
buffer.get().evictWhile(() -> true, this::emit);
}

If the record is a late record, it will not update the observed stream time. If the observed stream time is not updated, the range query will not return records that need to be evicted, since they were already evicted the last time evictWhile() was called.
Does this make sense?

If you agree, could you please also add a test for this?
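The reasoning in this suggestion — a late record cannot move the eviction deadline — can be checked with a tiny toy model (invented names, not the Kafka implementation):

```java
// Toy check of the argument above: a record is "late" only if its timestamp is
// below observedStreamTime minus the grace period, and max() means such a
// record can never advance observedStreamTime, so the eviction deadline cannot
// move and the eviction range query can be skipped. Names are invented.
final class StreamTimeSketch {
    private long observedStreamTime = Long.MIN_VALUE;
    private final long gracePeriodMs;

    StreamTimeSketch(final long gracePeriodMs) { this.gracePeriodMs = gracePeriodMs; }

    /** Returns true if the record was late (the deadline did not move). */
    boolean observe(final long recordTs) {
        observedStreamTime = Math.max(observedStreamTime, recordTs);
        return recordTs < observedStreamTime - gracePeriodMs;
    }

    long evictionDeadline() { return observedStreamTime - gracePeriodMs; }
}
```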

Contributor Author:

Sure, that is just fine with me. I'll add a test adding records outside of the grace period too.

@wcarlson5 (Contributor Author) left a comment:

Thanks @cadonna and @vcrfxia

It doesn't seem like any blockers are left; is it good to go?



@vcrfxia (Collaborator) commented Jun 28, 2023:

Hey @wcarlson5 did you mean to push a new commit to this PR since @cadonna and I last reviewed? Some of your replies mention having made changes but I don't see any.

Nothing blocking from my side, though I do think that the latest round of suggestions (including test coverage improvements and @cadonna 's suggestion for avoiding an unnecessary range query) would be good to incorporate either in this PR or the next one.

@cadonna (Contributor) left a comment:

Thanks for the updates, @wcarlson5 !

LGTM!

@cadonna (Contributor) commented Jun 29, 2023:

Build failures are unrelated:

    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithFrequentOffsetSyncs()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testRestartReplication()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testNoCheckpointsIfNoRecordsAreMirrored()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOneWayReplicationWithFrequentOffsetSyncs()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testNoCheckpointsIfNoRecordsAreMirrored()
    Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String).quorum=kraft
    Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
    Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
    Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterSnapshotTest.testSnapshotsGenerated()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.tests.RelationalSmokeTestTest.verifySmokeTestLogic

I looked into the two Kafka Streams related tests. org.apache.kafka.streams.tests.RelationalSmokeTestTest.verifySmokeTestLogic fails because of a file issue thrown by the JVM. I ran org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions() locally but could not reproduce the issue after multiple runs.

@cadonna cadonna merged commit 12be344 into apache:trunk Jun 29, 2023
1 check failed
@cadonna (Contributor) commented Jun 29, 2023:

Thanks @wcarlson5 !
