KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77) #1777

Closed
wants to merge 6 commits into from

mjsax
Member

@mjsax mjsax commented Aug 23, 2016

  • fixed leftJoin -> outerJoin test bug
  • simplified to only use values
  • fixed inner KTable-KTable join
  • fixed left KTable-KTable join
  • fixed outer KTable-KTable join
  • fixed inner, left, and outer KStream-KStream joins
  • added inner KStream-KTable join
  • fixed left KStream-KTable join

@mjsax mjsax force-pushed the kafka-4001-joins branch 3 times, most recently from cb2f1e1 to cca15e3 August 24, 2016 00:32
@guozhangwang
Contributor

guozhangwang commented Aug 26, 2016

I tried to re-run the unit tests locally, and it seems the following three test cases fail frequently:

org.apache.kafka.streams.integration.JoinIntegrationTest > testInnerKTableKTable FAILED
    java.lang.AssertionError: Condition not met within timeout 300000. Did not receive 1 number of records
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:256)
        at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:245)
        at org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:169)
        at org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:196)
        at org.apache.kafka.streams.integration.JoinIntegrationTest.testInnerKTableKTable(JoinIntegrationTest.java:366)

org.apache.kafka.streams.integration.JoinIntegrationTest > testLeftKTableKTable FAILED
    java.lang.AssertionError: Condition not met within timeout 300000. Did not receive 1 number of records
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:256)
        at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:245)
        at org.apache.kafka.streams.integration.JoinIntegrationTest.checkResult(JoinIntegrationTest.java:169)
        at org.apache.kafka.streams.integration.JoinIntegrationTest.runTest(JoinIntegrationTest.java:196)
        at org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKTableKTable(JoinIntegrationTest.java:393)

org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest > testNotSendingOldValue FAILED
    java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:86)
        at org.junit.Assert.assertTrue(Assert.java:41)
        at org.junit.Assert.assertTrue(Assert.java:52)
        at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest.testNotSendingOldValue(KTableKTableLeftJoinTest.java:181)

@mjsax
Member Author

mjsax commented Aug 26, 2016

@guozhangwang Just realized that I forgot to commit something, which is why the left-join tests fail...

About the inner join: that is exactly the timing issue we already discussed. The integration test is super slow... not sure why it takes so long for the result consumer to receive output records...
If you watch the System.out output, you will see that it takes multiple seconds, and sometimes even longer...

@mjsax mjsax changed the title from "KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-76) [WiP]" to "KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77) [WiP]" Aug 27, 2016
@mjsax
Member Author

mjsax commented Aug 27, 2016

The last run passed (i.e., no timeout). However, the integration test takes quite long (to process 15 records...).

https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/5304/testReport/org.apache.kafka.streams.integration/JoinIntegrationTest/

@guozhangwang
Contributor

@mjsax When you have time, could you do some profiling on the inner-join integration test to see what the performance bottleneck is? This latency is surprising and may indicate some bugs, AFAIK.

@mjsax mjsax changed the title from "KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77) [WiP]" to "KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77)" Sep 5, 2016
@mjsax mjsax force-pushed the kafka-4001-joins branch 2 times, most recently from 953074a to 6a6f214 September 28, 2016 20:19
@mjsax
Member Author

mjsax commented Sep 29, 2016

Please review @hjafarpour @guozhangwang @enothereska @dguy @miguno

Contributor

@dguy dguy left a comment

One general comment before I look further. There appear to be quite a few changes that are purely formatting changes; did you mean to do this? The problem with this sort of change is that we probably all have different settings in our IDEs, so any formatting rules applied can end up causing unnecessary changes, conflicts, etc.

@mjsax
Member Author

mjsax commented Sep 29, 2016

@dguy I just use the default IntelliJ formatting with some adjustments to meet the code style checks of the build. I thought that is the default we use? No? I don't care about the formatting itself, but I strongly recommend that we agree on a single formatting style and use the same auto-formatting rules...

Completely agree that formatting changes in a PR should be avoided.

Contributor

@dguy dguy left a comment

I'm struggling to find what has actually changed, as most of the changes are formatting and adding final to parameters etc. While I agree with using final everywhere possible, do we really want to be doing it as part of this PR?

}

- if (!testCondition.conditionMet()) {
+ if (!testConditionMet) {
Contributor

Is this meant to be in this PR? I think I saw that you've done it in another one?

Member Author

I extracted this into the other PR and still need to update this one...

@mjsax
Member Author

mjsax commented Sep 30, 2016

@dguy I can revert all formatting changes. However, I am wondering if anyone would do a "reformat only" PR? I guess if we want to get uniformly auto-formatted code, we need to do this gradually over all PRs. WDYT?

@dguy
Contributor

dguy commented Sep 30, 2016

Yes, I'm not sure anyone would do a reformat-only PR. However, I think the best approach is as you suggested: do it gradually, i.e., only change formatting in the area you are currently working in. Otherwise it becomes too much noise to process, hence the struggle to review the real changes.

@mjsax
Member Author

mjsax commented Sep 30, 2016

@dguy updated (reverted reformatting changes and removed test improvement)

Contributor

@dguy dguy left a comment

Some minor comments, but otherwise LGTM

}
}

private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
Contributor

Could we add another param to this method, boolean leftJoin and then get rid of doStreamTableLeftJoin?


private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
final ValueJoiner<V, V1, R> joiner) {
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
Contributor

Should we start adding @SuppressWarnings("unchecked") to the method for these unchecked casts?
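
For context, a minimal sketch of what such an annotation looks like on a method that performs an unchecked cast. The class and method names below are hypothetical and not taken from the PR; only the `@SuppressWarnings("unchecked")` annotation itself is standard Java.

```java
import java.util.ArrayList;
import java.util.List;

final class UncheckedCastSketch {

    // Hypothetical helper, not from the PR: narrows a wildcard list to a concrete element type.
    // Because of type erasure the cast cannot be verified at runtime, so javac reports an
    // "unchecked" warning unless it is suppressed on the enclosing method.
    @SuppressWarnings("unchecked")
    static <T> List<T> narrow(final List<?> raw) {
        return (List<T>) raw;
    }

    public static void main(final String[] args) {
        final List<Object> values = new ArrayList<>();
        values.add("a");
        final List<String> strings = narrow(values);
        System.out.println(strings.get(0));
    }
}
```

Keeping the annotation on the smallest scope that contains the cast avoids hiding unrelated unchecked warnings elsewhere in the class.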


@Override
public void process(final K key, final V1 value) {
// if the key is null, we do not need proceed joining
Contributor

Probably should update or remove the comment.

CLUSTER.deleteTopic(INPUT_TOPIC_2);
CLUSTER.deleteTopic(OUTPUT_TOPIC);

TestUtils.waitForCondition(topicsGotDeleted, 300, "Topics not deleted after 120 seconds.");
Contributor

The wait time is 300 milliseconds, yet the message says 120 seconds...
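
One way to keep the timeout and the message from drifting apart is to derive the message from the timeout value. The sketch below is illustrative only: the constant name, the helper, and the 120-second value are assumptions, and it does not call the real `TestUtils.waitForCondition`.

```java
import java.util.concurrent.TimeUnit;

final class TimeoutMessageSketch {

    // Hypothetical constant; the timeout actually intended by the test is not stated in this thread.
    static final long TOPIC_DELETION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(120);

    // Builds the failure message from the same value that is passed as the timeout,
    // so the two cannot disagree.
    static String timeoutMessage(final long timeoutMs) {
        return "Topics not deleted after " + TimeUnit.MILLISECONDS.toSeconds(timeoutMs) + " seconds.";
    }

    public static void main(final String[] args) {
        System.out.println(timeoutMessage(TOPIC_DELETION_TIMEOUT_MS));
    }
}
```

The same constant would then be passed both as the wait time and to the message helper.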

private KTable<Long, String> leftTable;
private KTable<Long, String> rightTable;

private final long anyUniqueKey = 0L;
Contributor

Do we need this field? It is only used in the constructor of Input

@guozhangwang
Contributor

One meta comment about the re-formatting is that, mixing multiple "changes" in one PR will make the reviews harder, as there will be more diffs to look at that may not be directly related to the core change.

Contributor

@guozhangwang guozhangwang left a comment

Overall LGTM. One meta suggestion: let's leave some comments in each of the join processors explaining the join semantics, regarding null values etc.


- return join(other, joiner, windows, null, null, null, false);
+ return doJoin(other, joiner, windows, null, null, null, new KStreamImplJoin(false, false));
Contributor

nit: call join(other, joiner, windows, null, null, null) instead?


- return join(other, joiner, windows, null, null, null, true);
+ return doJoin(other, joiner, windows, null, null, null, new KStreamImplJoin(true, true));
Contributor

nit: call outerJoin(other, joiner, windows, null, null, null) instead?

- public void process(K key, V1 value) {
- if (key == null)
+ public void process(final K key, final V1 value) {
+ if (key == null || value == null) {
Contributor

Maybe add a comment explaining why we are excluding <key: null> for the stream-stream join, with the pros and cons compared with treating each <key: null> as a different message?

@@ -666,12 +668,12 @@ public boolean test(final K1 key, final V1 value) {
Serde<V2> otherValueSerde) {
String thisWindowStreamName = topology.newName(WINDOWED_NAME);
String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
- String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+ String joinThisName = leftOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
Contributor

should we check on rightOuter in line 671 and leftOuter in line 672?

Member Author

Isn't "this join" the left-hand side of the join and "other join" the right-hand side of the join? Can you clarify?

Contributor

joinThisName is used in the store of thisWindow, which is queried by the other stream, so my personal understanding is that:

  1. for inner-join (rightOuter = false, leftOuter = false): both window stores have a JOINTHIS_NAME-store.
  2. for outer-join (rightOuter = true, leftOuter = true): both window stores have an OUTERTHIS_NAME-store.
  3. for left-join (rightOuter = false, leftOuter = true): the left window store has a THIS_NAME-store and the right window store has an OUTERTHIS_NAME-store, since we will join with null if the right window store returns null (hence "outer"), but not vice versa.

// if the key is null, we do not need proceed joining
// the record with the table
- if (key != null) {
+ if (key != null && value != null) {
Contributor

ditto above: maybe add a comment explaining why we are excluding <key: null> for stream stream join.

context().forward(key, joiner.apply(value, valueGetter.get(key)));
}
}
}

private class KStreamKTableJoinProcessor extends AbstractProcessor<K, V1> {
Contributor

Could we merge KStreamKTableJoinProcessor with KStreamKTableLeftJoinProcessor into a single processor?
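
A minimal sketch of what a single merged processor could look like, assuming a boolean `leftJoin` flag as suggested in the earlier comment. It is self-contained and does not use the Kafka Streams classes: a plain `Map` stands in for the table, a `BiFunction` for the `ValueJoiner`, and a list for `context().forward(...)`; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for a stream-table join processor covering inner and left joins.
final class StreamTableJoinSketch<K, V1, V2, R> {
    private final Map<K, V2> table;              // plays the role of the KTable lookup
    private final BiFunction<V1, V2, R> joiner;  // plays the role of the ValueJoiner
    private final boolean leftJoin;              // true: left join, false: inner join
    final List<R> forwarded = new ArrayList<>(); // plays the role of context().forward(...)

    StreamTableJoinSketch(final Map<K, V2> table,
                          final BiFunction<V1, V2, R> joiner,
                          final boolean leftJoin) {
        this.table = table;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    void process(final K key, final V1 value) {
        // Records with a null key or a null value are skipped,
        // mirroring the condition in the quoted diff.
        if (key == null || value == null) {
            return;
        }
        final V2 tableValue = table.get(key);
        // An inner join drops the record when the table has no entry for the key;
        // a left join still calls the joiner, passing null for the table side.
        if (leftJoin || tableValue != null) {
            forwarded.add(joiner.apply(value, tableValue));
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> table = new HashMap<>();
        table.put("a", "T");
        final StreamTableJoinSketch<String, String, String, String> left =
            new StreamTableJoinSketch<>(table, (l, r) -> l + "+" + r, true);
        left.process("a", "x");
        left.process("b", "y");
        System.out.println(left.forwarded);
    }
}
```

With the flag, the two processors differ only in the single `leftJoin || tableValue != null` condition.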

@@ -312,6 +312,8 @@ public V apply(Change<V> change) {
Objects.requireNonNull(joiner, "joiner can't be null");

Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
enableSendingOldValues();
Contributor

Why do we want to enforce sending old values for the table-table join now?

Member Author

We need the old value to avoid emitting duplicate null (tombstone) result records.

Contributor

I agree that we need to require all the parent processors of the join operator to send old values. Originally I thought we were also requiring the join processor itself to send old values, but checking again, I think you are doing the right thing: only the parent KTable streams are required to send old values, so this is actually correct.
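
To illustrate the point about old values, here is a simplified sketch of an inner table-table join update. It assumes each update carries both the new and the previous value (the `Change` class below is a hypothetical stand-in, as are all other names) and is not the PR's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TombstoneDedupSketch {

    // Hypothetical stand-in for an update carrying the new and the previous value of a key.
    static final class Change {
        final String newValue;
        final String oldValue;

        Change(final String newValue, final String oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Current content of the other table (the right-hand side of the join).
    final Map<String, String> otherTable = new HashMap<>();
    // Records emitted downstream as "key=value"; "key=null" represents a tombstone.
    final List<String> emitted = new ArrayList<>();

    // Inner-join semantics: there is a result only if both sides are present.
    private static String join(final String thisValue, final String otherValue) {
        return (thisValue == null || otherValue == null) ? null : thisValue + "+" + otherValue;
    }

    void process(final String key, final Change change) {
        final String otherValue = otherTable.get(key);
        final String newResult = join(change.newValue, otherValue);
        final String oldResult = join(change.oldValue, otherValue);
        // If there was no join result before and there is none now, a tombstone
        // would delete nothing (or repeat an earlier delete), so it is skipped.
        if (newResult == null && oldResult == null) {
            return;
        }
        emitted.add(key + "=" + newResult);
    }

    public static void main(final String[] args) {
        final TombstoneDedupSketch sketch = new TombstoneDedupSketch();
        sketch.otherTable.put("k", "R");
        sketch.process("k", new Change("L", null)); // first join result for "k"
        sketch.process("k", new Change(null, "L")); // delete: one tombstone
        sketch.process("k", new Change(null, null)); // repeated delete: skipped
        System.out.println(sketch.emitted);
    }
}
```

Without the previous value, the processor could not distinguish the second call from the third, and both would have to emit a tombstone.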

"[D@0]:null",
"[A@0]:null"
);
proc3.checkAndClearProcessResult();
Contributor

Why do we need to change the test for windowed aggregation here?

Member Author

We do test the inner KTable-KTable join, and null values are ignored. (I cannot recall exactly: according to the KIP, this should have been an empty result before this change. However, it seems correct this way to me.)

Contributor

sounds good.


- // push all items to the other stream. this should produce four items.
+ // push all items to the other stream. this should produce two items.
Contributor

Still four items, right?

@mjsax
Member Author

mjsax commented Oct 8, 2016

Updated and rebased.

DO NOT MERGE (need to rebase again after #1992 got merged)

@guozhangwang
Contributor

@mjsax #1992 is merged, could you rebase again?

@mjsax
Member Author

mjsax commented Oct 18, 2016

@guozhangwang Rebased.

@guozhangwang
Contributor

Let's merge #2039 first; I left a comment on that PR about the unit test.

@mjsax
Member Author

mjsax commented Oct 20, 2016

@guozhangwang Rebased, as #2039 got merged.

@guozhangwang
Contributor

Merged to trunk.

@asfgit asfgit closed this in 62c0972 Oct 20, 2016
@mjsax mjsax deleted the kafka-4001-joins branch October 21, 2016 00:03