KAFKA-7106: remove deprecated Windows APIs #10378
cc @vvcephei @ableegoldman to review.
/** Maximum time difference for tuples that are before the join tuple. */
public final long beforeMs;
/** Maximum time difference for tuples that are after the join tuple. */
public final long afterMs;

// By default grace period is 24 hours,
I chose 24h, not 24h - size, as the default value, since it is simpler to set at construction time and I think the difference is insignificant. Ditto for the other classes.
I think that's extremely reasonable.
@@ -42,66 +38,10 @@
*/
public abstract class Windows<W extends Window> {

private long maintainDurationMs = DEFAULT_RETENTION_MS;
@Deprecated public int segments = 3;
protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L; // one day
This class becomes a pure abstract one, so I'm moving the default value here from WindowsDefault (and renaming it) and removing that other class.
Great!
Do you think we can write down that this class should effectively be treated as an interface (no instance members), but that it's kept as an abstract class purely for compatibility with old versions?
Even better than a comment would be to add a unit test that will fail if someone declares a non-static member in this class. Looking at the reflection API, it looks like you can take the union of getDeclaredFields() ++ getFields() ++ getDeclaredMethods() ++ getMethods() (cf. https://docs.oracle.com/javase/tutorial/reflect/class/classMembers.html).
This will give you a set of Members (Methods and Fields). You could then use Member#getModifiers() (https://docs.oracle.com/javase/8/docs/api/java/lang/reflect/Member.html#getModifiers--) and verify that Modifier.isStatic(modifier) (https://docs.oracle.com/javase/8/docs/api/java/lang/reflect/Modifier.html#isStatic-int-) is true for every member.
Not sure if there's a more direct way. Also, it's up to you whether you go this way or not... I'd just be a little sad if we get right back into this mess immediately after finally untangling it just because some reviewer didn't remember the history here.
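A minimal sketch of the reflection check suggested above might look like the following. It is not code from this PR: the class names `InstanceFieldCheck`, `StatelessBase`, and `StatefulBase` are hypothetical stand-ins, and the check is narrowed to declared fields, on the assumption that "no instance state" is the property worth guarding (an abstract class necessarily declares non-static abstract methods, so requiring every member to be static would always fail).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class InstanceFieldCheck {

    // Hypothetical stand-in for a class that should carry no instance state.
    abstract static class StatelessBase {
        protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
        public abstract long size();
    }

    // Hypothetical counter-example: declares an instance field.
    abstract static class StatefulBase {
        private long maintainDurationMs = 0L;
        public abstract long size();
    }

    // Returns the names of all non-static fields declared directly on clazz.
    static List<String> instanceFields(final Class<?> clazz) {
        final List<String> names = new ArrayList<>();
        for (final Field field : clazz.getDeclaredFields()) {
            // Skip compiler- or tool-generated fields (for example, coverage instrumentation).
            if (field.isSynthetic()) {
                continue;
            }
            if (!Modifier.isStatic(field.getModifiers())) {
                names.add(field.getName());
            }
        }
        return names;
    }

    public static void main(final String[] args) {
        System.out.println(instanceFields(StatelessBase.class)); // prints []
        System.out.println(instanceFields(StatefulBase.class));  // prints [maintainDurationMs]
    }
}
```

In a unit test, the assertion would simply be that the returned list is empty for the class under guard.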
When trimming the other inherited classes, I actually feel that some of the common fields like grace period may be better moved here in the future (I did not do this immediately to keep the PR small). WDYT?
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
This class is no longer needed since Windows is now a pure abstract class.
Oooh! Or maybe we could use it to enforce that Windows is really a pure abstract class (my idea above). :)
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized")
I kept this test, but moved the retention setting from the window to Materialized.
@@ -185,8 +185,8 @@ private void runTest(final String optimizationConfig, final int expectedNumberRe
.filter((k, v) -> k.equals("A"), Named.as("join-filter"))
.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
JoinWindows.of(ofMillis(5000)),
StreamJoined.<String, String, Long>with(Stores.inMemoryWindowStore("join-store", ofDays(1), ofMillis(10000), true),
Stores.inMemoryWindowStore("other-join-store", ofDays(1), ofMillis(10000), true))
StreamJoined.<String, String, Long>with(Stores.inMemoryWindowStore("join-store", ofDays(1).plus(ofMillis(10000)), ofMillis(10000), true),
This is a by-product of this change: we now default the grace period to 24h, and the retention to grace + window size.
We can consider whether to keep the default grace period at 24h. cc @ableegoldman
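To make the arithmetic behind the updated store definition concrete, here is a small sketch using only `java.time.Duration`. It assumes the join window size is the sum of the "before" and "after" bounds (so `JoinWindows.of(ofMillis(5000))` spans 10000 ms), and the class and method names are hypothetical, not part of Kafka Streams.

```java
import java.time.Duration;

public class JoinStoreRetention {

    // Retention as described in the comment above: grace period plus window size,
    // with the join window size assumed to be before + after.
    static Duration requiredRetention(final Duration before, final Duration after, final Duration grace) {
        final Duration windowSize = before.plus(after);
        return grace.plus(windowSize);
    }

    public static void main(final String[] args) {
        final Duration diff = Duration.ofMillis(5000);   // symmetric bound, as in JoinWindows.of(ofMillis(5000))
        final Duration grace = Duration.ofDays(1);       // the 24h default discussed in this PR
        final Duration retention = requiredRetention(diff, diff, grace);

        System.out.println(retention.toMillis()); // prints 86410000
        // Same value as the store definition in the updated test.
        System.out.println(retention.equals(Duration.ofDays(1).plus(Duration.ofMillis(10000)))); // prints true
    }
}
```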
Thanks so much for this, @guozhangwang !
It LGTM, although I had a couple of thoughts. It's up to you if you want to take them up or not.
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate.
It feels so good to see this go...
// By default grace period is 24 hours,
// in other words we allow out-of-order data for up to a day
Not to be nitpicky, but should we have comments like this in places like this? If we change the default later, it might be hard to track down all the comments that need to be updated.
That's actually a great point. I will remove all of these from the inheriting classes and leave only a comment on top of DEFAULT_GRACE_PERIOD_MS. When we change it later, we only need to change the single comment there.
Bumped into https://issues.apache.org/jira/browse/KAFKA-12557; there's already a PR fixing it.
1. Remove all deprecated APIs in KIP-328. 2. Remove deprecated APIs in Windows in KIP-358. Reviewers: John Roesler <vvcephei@apache.org>
… / inactivity-gap (#10953)

In 2.8 and before, we computed the default grace period with Math.max(maintainDurationMs - sizeMs, 0); in method gracePeriodMs() in TimeWindows, SessionWindows, and JoinWindows. That means that the default grace period has never been 24 hours, but 24 hours - window size.

Since gracePeriodMs() is used to compute the retention time of the changelog topic for the corresponding window state store, as well as the segments for the window state store, it is important to keep the same computation for the deprecated methods. Otherwise, Streams apps that run with 2.8 and before might not be compatible with Streams 3.0, because the retention time of the changelog topics created with older Streams apps will be smaller than the retention time assumed by Streams apps in 3.0.

For example, with a window size of 10 hours, an old Streams app would have created a changelog topic with a retention time of 10 hours (window size) + 14 hours (default grace period, 24 hours - 10 hours). A 3.0 Streams app would assume a retention time of 10 hours (window size) + 24 hours (deprecated default grace period as currently specified on trunk). In the presence of failures, where a state store needs to be recreated, records might get lost, because before the failure the state store of the 3.0 Streams app contained 10 hours + 24 hours of records, whereas the changelog topic that was created with the old Streams app would only contain 10 hours + 14 hours of records.

All this happened because we always stated that the default grace period was 24 hours, although that was not completely correct, and because of a related, unfortunate misunderstanding when we removed the deprecated windows APIs (#10378).

Co-authors: Bruno Cadonna <cadonna@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Matthias J. Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>
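The 10-hour example in the commit message can be worked through as plain arithmetic. The sketch below is illustrative only: the class and method names are hypothetical, and just the `Math.max(maintainDurationMs - sizeMs, 0)` expression and the "retention = window size + grace" relation are taken from the text above.

```java
import java.time.Duration;

public class GraceRetentionExample {

    // 24 hours, the historical default retention (maintainDurationMs).
    static final long DEFAULT_RETENTION_MS = Duration.ofHours(24).toMillis();

    // Default grace period as computed in 2.8 and before.
    static long legacyDefaultGraceMs(final long sizeMs) {
        return Math.max(DEFAULT_RETENTION_MS - sizeMs, 0);
    }

    // Changelog retention: window size plus grace period.
    static long retentionMs(final long sizeMs, final long graceMs) {
        return sizeMs + graceMs;
    }

    public static void main(final String[] args) {
        final long sizeMs = Duration.ofHours(10).toMillis();

        final long legacyGraceMs = legacyDefaultGraceMs(sizeMs);
        final long legacyRetentionMs = retentionMs(sizeMs, legacyGraceMs);
        final long fixed24hRetentionMs = retentionMs(sizeMs, DEFAULT_RETENTION_MS);

        System.out.println(Duration.ofMillis(legacyGraceMs).toHours());       // prints 14
        System.out.println(Duration.ofMillis(legacyRetentionMs).toHours());   // prints 24
        System.out.println(Duration.ofMillis(fixed24hRetentionMs).toHours()); // prints 34
    }
}
```

The 10-hour difference between the last two values is the span of records a restored state store could be missing if the app assumes the longer retention while the existing changelog topic was created with the shorter one.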