Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3705: Added a foreignKeyJoin implementation for KTable. #5527

Merged
merged 64 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
d472a94
Merged and resolved differences. Left a number of TODOs, and now tryi…
May 15, 2019
78ccb49
oneToMany join Working, with a number of prefixScans stubbed out. Als…
May 16, 2019
b47b3e3
Changed SubscriptionResolverJoinProcessorSupplier to use the KTableVa…
May 30, 2019
3c7ad51
"Fixed" an issue where the SubscriptionResponseWrapper was sending a…
May 31, 2019
1b07216
Added the code that should support the prefixscan in the timestamped …
May 31, 2019
74f4ae8
prefixScan provisionally working on a number of previously unsupporte…
Jun 3, 2019
40fca2d
Populated AbstractKeyValueStoreTest.java with prefixScan tests equiva…
Jun 3, 2019
cb2e625
Added more tests. Incorrectly used Bytes.MAX_VALUE, replaced it with …
Jun 3, 2019
75ea876
Split serdes and combinedkey into their own test files. Fixed some un…
Jun 4, 2019
beaa82f
Enforcing co-partitioning of source topics to be identical to that of…
Jun 4, 2019
897fa90
Empty test, did not need
Jun 4, 2019
4bde43d
Rebased to the latest trunk
Jun 4, 2019
d687dbb
Fixed a few erroring unit tests with incorrect asserts (not sure how …
Jun 5, 2019
73cc69e
Potential fix for the timestamp store casting issue.
Jun 6, 2019
9c31fcd
Add the missing licenses
Jun 7, 2019
5ad29a6
Potential solution for prefixScan bug that I had introduced previousl…
Jun 25, 2019
8a8e10d
Reworking the test cases to better exercise all of the logical nuance…
Jun 26, 2019
bb84b8a
Added leftJoin for foreignKey joiner. Modified the SubscriptionWrappe…
Jun 27, 2019
6d80434
Added commit hash and sources to Murmur3 and Murmur3Test to indicate …
Jun 27, 2019
2f11d5d
Fixed all the checkstyle and spotbug issues in streams and clients. A…
Jun 27, 2019
fa358ce
Must add exclusions to spotbugs for the copied Murmur3 file. The good…
Jun 27, 2019
9f65880
Fixed a formatting failure with scala-streams. Added some clearer com…
Jun 27, 2019
8695037
Forgot to close the iterator after a prefixScan
Jun 28, 2019
c7ccdc2
...aaaaand I was missing a closing bracket... sheesh
Jun 28, 2019
7ec664b
Added a bunch of comments and attempted to clarify the topology by ad…
Jun 28, 2019
23b4e02
Reworked the Named usage in the FKjoiners to adhere much more closely…
Jun 28, 2019
ed11b9f
Added versions and primaryKey to SubscriptionWrapper. Removed Combine…
Jun 29, 2019
947cf6d
Reworked the Serdes to indicate if the hash is null, without using mo…
Jun 29, 2019
955cf06
Added more tests and debugged SubscriptionWrapper.java and Subscripti…
Jun 29, 2019
c7d8ddf
Remove errant code that wasn't supposed to be committed
Jun 29, 2019
19b11fe
Added handling for key == null based off of logic in KStreamSessionWi…
Jun 29, 2019
aff3cd9
Copartitioning works with multiple partition count for single join, b…
Jun 29, 2019
f15ab35
Ran checkStyleMain and corrected the style errors
Jun 29, 2019
ce51bf4
Removed check to see if old and newvalue are the same in prefixscan. …
Jul 1, 2019
10eacb2
Added some comments to help with the prefixScan readability
Jul 1, 2019
dfdfc34
Not a byte
Jul 1, 2019
6add956
Do not support multiple input topic partitions. The copartitioning lo…
Jul 2, 2019
b67fae8
Changed ValueMapper to Function. Also fixed a number of SpotBugs issu…
Jul 25, 2019
fd8394c
John Roesler's extensive simplification changes
Aug 1, 2019
953da3a
Merged John Roesler's extensive simplification changes into branch.
Aug 1, 2019
2445e87
Fixed a few minor issues with imports, Murmur3 newline, and and comme…
Aug 8, 2019
a8e6b40
Merge branch 'trunk' into trunk-oneToMany
Aug 8, 2019
0dbac68
Undoing the changes made to NamedCache since it's not required.
Aug 8, 2019
1678a54
Fixed a number of issues preventing the pre-build checks and the buil…
Aug 9, 2019
543aa4f
Merge remote-tracking branch 'source/trunk' into trunk-oneToMany
Aug 28, 2019
4e58aa3
Missed a dependency on merge
Aug 28, 2019
c69824a
Cleaning up the commit by reverting a number of unneccessary files. A…
Aug 29, 2019
0536c81
Remove unused code in RocksDBTimestampedStore.java
Aug 29, 2019
259f20e
Updated the docs for KTable
Aug 29, 2019
e93c25f
Reverting some unnecessary changes
Aug 29, 2019
811b477
Reverting more seemingly unnecessary changes.
Aug 29, 2019
7c7581b
Fixed some checkstyle violations
Aug 29, 2019
2a64bf7
Trying to revert file cause I messed it up before.. bah
Aug 29, 2019
ce558e8
Revert unnecessary files
Aug 29, 2019
8b33ffb
Change Bytes.increment to throw an exception if the incremented index…
Sep 3, 2019
9da1cd9
Updated the docs on Bytes.increment()
Sep 3, 2019
67e1ee6
A few cleanups and some extra testing based on Bill's feedback
Sep 12, 2019
6917848
Add javadoc for streams
Sep 12, 2019
1e0aaae
Added the two named options to streamsResetter
Sep 13, 2019
faa3cba
Added prospective code to streamsresetter, need to find the correct p…
Sep 19, 2019
a1e7fbb
Pass in the streamsConfig to builder.build() in the integration tests
Sep 19, 2019
92ab536
Merged in trunk, moved Murmur3 to use the streams version, removed gu…
Oct 2, 2019
4c50463
Added a comment about the foreignKey leftJoin not propagating on a nu…
Oct 2, 2019
6e1b6a1
Exclude tests ending in Suite in :streams project
Oct 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ project(':clients') {
testCompile libs.bcpkix
testCompile libs.junit
testCompile libs.mockitoCore
testCompile libs.guava

testRuntime libs.slf4jlog4j
testRuntime libs.jacksonDatabind
Expand Down
9 changes: 7 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0"?>


<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
Expand Down Expand Up @@ -59,6 +59,8 @@

<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
<suppress checks="(UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>

<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
Expand All @@ -76,6 +78,9 @@
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest"/>

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>

<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
files="DistributedHerder(|Test).java"/>
Expand Down Expand Up @@ -132,7 +137,7 @@
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>

<suppress checks="MethodLength"
files="StreamsPartitionAssignor.java"/>
files="(KTableImpl|StreamsPartitionAssignor.java)"/>

<suppress checks="ParameterNumber"
files="StreamTask.java"/>
Expand Down
27 changes: 26 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,32 @@ private static String toString(final byte[] b, int off, int len) {
}

/**
* A byte array comparator based on lexicographic ordering.
* Increment the underlying byte array by adding 1.
*
* @param input - The byte array to increment
* @return A new copy of the incremented byte array.
*/
public static Bytes increment(Bytes input) throws IndexOutOfBoundsException {
byte[] inputArr = input.get();
byte[] ret = new byte[inputArr.length];
int carry = 1;
for (int i = inputArr.length - 1; i >= 0; i--) {
if (inputArr[i] == (byte) 0xFF && carry == 1) {
ret[i] = (byte) 0x00;
} else {
ret[i] = (byte) (inputArr[i] + carry);
carry = 0;
}
}
if (carry == 0) {
return wrap(ret);
} else {
throw new IndexOutOfBoundsException();
}
}

/**
* A byte array comparator based on lexicograpic ordering.
*/
public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();

Expand Down
Loading