Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: fix broken master #448

Merged
merged 3 commits into from Oct 6, 2023
Merged

MINOR: fix broken master #448

merged 3 commits into from Oct 6, 2023

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Oct 5, 2023

StreamsResetter got some methods renamed breaking the build.

In addition, this PR cleansup warning, and migrates off deprecated APIs.

StreamsResetter got some methods renamed breaking the build.

In addition, this PR cleansup warning, and migrates off deprecated APIs.
@mjsax mjsax requested a review from a team as a code owner October 5, 2023 03:36
@cla-assistant
Copy link

cla-assistant bot commented Oct 5, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

1 similar comment
@cla-assistant
Copy link

cla-assistant bot commented Oct 5, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning that the created Serde instance might not be closed -- we actually don't need to create an object to get the class though.

@@ -134,7 +134,7 @@ public static void main(final String[] args) {
// count users, using one-minute tumbling windows;
// no need to specify explicit serdes because the resulting key and value types match our default serde settings
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deprecated -- by default I switch to use "no grace".

}

@Override
public void close() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() has a default impl now and we can remove it.

@@ -40,7 +40,7 @@
* <br>
* HOW TO RUN THIS EXAMPLE
* <p>
* 1) Start Zookeeper and Kafka. Please refer to <a href='http://docs.confluent.io/current/quickstart.html#quickstart'>QuickStart</a>.
* 1) Start Zookeeper and Kafka. Please refer to <a href="http://docs.confluent.io/current/quickstart.html#quickstart">QuickStart</a>.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some HTML cleanup.

@@ -164,7 +164,7 @@ static Topology buildTopology(final Map<String, String> serdeConfig) {
// group by key so we can count by session windows
.groupByKey(Grouped.with(Serdes.String(), playEventSerde))
// window by session
.windowedBy(SessionWindows.with(INACTIVITY_GAP))
.windowedBy(SessionWindows.ofInactivityGapAndGrace(INACTIVITY_GAP, Duration.ofMillis(100)))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this case, we need a grace to make the test pass -- without a grace, store history is limited and cleaned up too quickly, and IQ that is used in the test cannot find the data any longer.

@@ -456,31 +429,26 @@ public TopFiveSongs deserialize(final String s, final byte[] bytes) {
}
return result;
}

@Override
public void close() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has a default impl now and can be removed.

continue;
} catch (final NotFoundException swallow) {
// swallow
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an actually rewrite -- hope I got it right...

@@ -248,11 +247,6 @@ private HostStoreInfo getKeyLocationOrBlock(final String id, final AsyncResponse
return locationOfKey;
}

private boolean locationMetadataIsUnavailable(final HostStoreInfo hostWithKey) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method does not work any longer because NOT_AVAILABLE is not available any more -- triggered the rewrite from above

@@ -134,7 +134,7 @@ public void shouldReprocess() throws Exception {
}

// reset application
final int exitCode = new StreamsResetter().run(
final int exitCode = new StreamsResetter().execute(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was renamed what broke the build.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: apache/kafka#13983 (comment)
Using the StreamsResetter class directly is actually not the intended use.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use StreamsResetter directly in multiple places like the benchmarks, I think. So I am fine with it. That also means that we might run into breaking changes once in a while.

@@ -241,7 +241,9 @@ public void shouldCreateChartsAndAccessThemViaInteractiveQueries() throws Except
final ReadOnlyKeyValueStore<Long, Song> songsStore;
try {
songsStore = streams.store(fromNameAndType(KafkaMusicExample.ALL_SONGS, QueryableStoreTypes.keyValueStore()));
return songsStore.all().hasNext();
try (final KeyValueIterator<Long, Song> it = songsStore.all()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterator must be closed to avoid resource leak.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @mjsax !

Here my feedback!

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually do not use wildcard imports, do we?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. IntelliJ... Fixing.

};
}
}

/**
* Used in aggregations to keep track of the Top five songs
*
* Warning: this aggregator relies on the current order of execution
* <p></p>Warning: this aggregator relies on the current order of execution
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be only <p> ?

import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above about wildcard imports.

@@ -134,7 +134,7 @@ public void shouldReprocess() throws Exception {
}

// reset application
final int exitCode = new StreamsResetter().run(
final int exitCode = new StreamsResetter().execute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: apache/kafka#13983 (comment)
Using the StreamsResetter class directly is actually not the intended use.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Thanks @mjsax

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks!

@mjsax mjsax merged commit e9494c9 into master Oct 6, 2023
3 of 4 checks passed
@mjsax mjsax deleted the fix-master branch October 6, 2023 20:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants