Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API #10840

Conversation

jlprat
Copy link
Contributor

@jlprat jlprat commented Jun 8, 2021

Implementation of KIP-744.

Creates new Interfaces for TaskMetadata, ThreadMetadata, and
StreamsMetadata, providing internal implementations for each of them.

Deprecates current TaskMetadata, ThreadMetadata under o.a.k.s.processor,
and SreamsMetadata under a.o.k.s.state.

Updates references on internal classes from deprecated classes to new interfaces.

Deprecates methods on KStreams returning deprecated ThreadMeatada and
StreamsMetadta, and provides new ones returning the new interfaces.

Update Javadocs referencing to deprecated classes and methods to point
to the right ones.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Contributor Author

@jlprat jlprat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some notes to help navigate the PR

*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
*/
public Collection<StreamsMetadata> allRunningMetadata() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so much convinced about this name, open to suggestions if you don't like it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about allStreamsMetadata? I think that gets at the core difference here, which is that this returns metadata for all Streams instances.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about metadataForAllStreamsClients()? I think this makes it more explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to for for the pattern xxxForxxx to keep consistency among different changes.

It is now metadataForAllStreamsClients but happy to change if anyone has reasons against it.

/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
public Set<ThreadMetadata> threadsMetadata() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly here, name is maybe not the best, happy to change it if there is a better alternative.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think localThreadMetadata sounds better to me (actually even better than the original localThreadsMetadata tbh), but I'm fine with threadsMetadata too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think localThreadMetadata does not sound consistent since metadata of multiple stream threads is returned. What about metadataForLocalThreads()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to for for the pattern xxxForxxx to keep consistency among different changes.

It is now metadataForLocalThreads but happy to change if anyone has reasons against it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna localThreadMetadata still sounds more correct to me than localThreadsMetadata. I really can't explain it other than to say that English is weird, and names/titles like this do not always follow the regular rules of grammar/plurals 🤷‍♀️

But actually I like your suggestion metadataForLocalThreads() even better than any of them, SGTM

public class TaskMetadata {

private final TaskId taskId;
private final String taskId;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reverts TaskMetadata to the state before KIP-740. As this TaskMetadata is deprecated, no need to do any type change here.

@@ -68,17 +61,8 @@ public TaskMetadata(final TaskId taskId,
/**
* @return the basic task metadata such as subtopology and partition id
*/
public TaskId getTaskId() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, this reverts to the state prior KIP-740. Users are anyway discouraged to use this class.

/**
* @return the basic task metadata such as subtopology and partition id
*/
TaskId taskId();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up API here, with proper naming convention for the new TaskMetada

@jlprat
Copy link
Contributor Author

jlprat commented Jun 8, 2021

As @cadonna, @ableegoldman and @guozhangwang showed some interest on the KIP, I guess maybe any of you can review this PR?

@jlprat
Copy link
Contributor Author

jlprat commented Jun 8, 2021

Failure was:

[2021-06-08T14:01:59.439Z] FAILURE: Build failed with an exception.
[2021-06-08T14:01:59.439Z] 
[2021-06-08T14:01:59.439Z] * What went wrong:
[2021-06-08T14:01:59.439Z] Execution failed for task ':core:integrationTest'.
[2021-06-08T14:01:59.439Z] > Process 'Gradle Test Executor 127' finished with non-zero exit value 1
[2021-06-08T14:01:59.439Z]   This problem might be caused by incorrect test process configuration.
[2021-06-08T14:01:59.439Z]   Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.0.2/userguide/java_testing.html#sec:test_execution

On top of the common RaftClusterTest ones

@jlprat
Copy link
Contributor Author

jlprat commented Jun 15, 2021

Following Konstantine's recommendation on the mailing list in regards of the dates for the release of Kafka 3.0.0, I'm pinging again potential reviewers for this KIP's implementation.
@ableegoldman (as creator of the original Jira Ticket), @cadonna and @guozhangwang (as voters of the KIP), I would highly appreciate your feedback in this PR.

Thanks a lot!

Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a quick pass, looks good for the most part but I think we can take the opportunity to clean up the method names a bit. Lmk what you think of the suggestions

/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
public Set<ThreadMetadata> threadsMetadata() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think localThreadMetadata sounds better to me (actually even better than the original localThreadsMetadata tbh), but I'm fine with threadsMetadata too

*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
*/
public Collection<StreamsMetadata> allRunningMetadata() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about allStreamsMetadata? I think that gets at the core difference here, which is that this returns metadata for all Streams instances.

* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
* this application
*/
public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel too strongly about any of these names, but it's probably best to try and keep them (a) consistent with each other, where possible, and (b) consistent with the return type. Assuming we go with something like allStreamsMetadata for the new method above, maybe something like streamsMetadataForStore would be appropriate for this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am +1 for streamsMetadataForStore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to for for the pattern xxxForxxx to keep consistency among different changes.

It is now streamsMetadataForStore but happy to change if anyone has reasons against it.

@@ -27,10 +27,12 @@

/**
* Represents the state of a single task running within a {@link KafkaStreams} application.
* @deprecated since 3.0, not intended for public use, use {@link org.apache.kafka.streams.TaskMetadata} instead.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @deprecated since 3.0, not intended for public use, use {@link org.apache.kafka.streams.TaskMetadata} instead.
* @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead.

I think people might be a bit confused if we say this, just point them to the new interface

Comment on lines 234 to 236
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix alignment (here and below change)

if (o == null || getClass() != o.getClass()) {
return false;
}
final org.apache.kafka.streams.processor.internals.TaskMetadataImpl that = (org.apache.kafka.streams.processor.internals.TaskMetadataImpl) o;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be fully qualified? There's only one TaskMetadataImpl class right?

Comment on lines 41 to 45
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix alignment

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jlprat Thank you for the PR!

Sorry for the late review!

Here my feedback.

/**
* This function will return a set of the current TopicPartitions
*/
Set<TopicPartition> topicPartitions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please use javadoc mark-up like @return and @param for the docs? Here and for the other methods.

*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we deprecate the class, we do not need to deprecate the constructor anymore, right?

/**
* This function will return the set of the {@link TaskMetadata} for the current active tasks
*/
Set<TaskMetadata> activeTasks();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use javadoc mark-up for the docs? Here and for the other methods.

/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
public Set<ThreadMetadata> threadsMetadata() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think localThreadMetadata does not sound consistent since metadata of multiple stream threads is returned. What about metadataForLocalThreads()?

@@ -1972,12 +1971,6 @@ public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
}

private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata, final StreamThread.State state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the clean up!

Comment on lines 46 to 48
this.topicPartitions = topicPartitions;
this.committedOffsets = committedOffsets;
this.endOffsets = endOffsets;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this class is read only, it makes sense to use Collections.unmodifiable*() methods here to avoid modification of the contents of the member fields.
Unit tests as in StreamsMetadataTest would also be good.

final Set<TaskMetadata> standbyTasks) {
this.mainConsumerClientId = mainConsumerClientId;
this.restoreConsumerClientId = restoreConsumerClientId;
this.producerClientIds = producerClientIds;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, you should use Collections.unmodifiableSet() also here.
Also unit tests to verify the immutability as in StreamsMetadataTest would be great!

*/
@Override
public Set<String> stateStoreNames() {
return Collections.unmodifiableSet(stateStoreNames);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to make the collections immutable in the constructor since they should also not be modified within this class (for now).

<code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
of Kafka codebase.
Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#getActiveTasks</code> and <code>ThreadMetadata#getStandbyTasks</code>.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#getActiveTasks</code> and <code>ThreadMetadata#getStandbyTasks</code>.
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.

@jlprat jlprat force-pushed the KAFKA-12849-TaskMetadata-ThreadMetadata-StreamsMetadata-to-Interface branch from d266b98 to 9c3ffe3 Compare June 23, 2021 13:32
Implementation of KIP-744.

Creates new Interfaces for TaskMetadata, ThreadMetadata, and
StreamsMetadata, providing internal implementations for each of them.

Deprecates current TaskMetadata, ThreadMetadata under o.a.k.s.processor,
and SreamsMetadata under a.o.k.s.state.

Updates references on internal classes from deprecated classes to new interfaces.

Deprecates methods on KStreams returning deprecated ThreadMeatada and
StreamsMetadta, and provides new ones returning the new interfaces.

Update Javadocs referencing to deprecated classes and methods to point
to the right ones.
@jlprat jlprat force-pushed the KAFKA-12849-TaskMetadata-ThreadMetadata-StreamsMetadata-to-Interface branch from 9c3ffe3 to a6a0bec Compare June 23, 2021 14:03
Comment on lines 60 to 64
if (producerClientIds != null) {
this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
} else {
this.producerClientIds = Collections.emptySet();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna, I needed to add this guard as ThreadMetadataTest was failing with NPE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not reproduce this issue. the set of producer IDs should never be null. If this is the case the test is wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, now I see. You meant StreamThreadTest. There you need to setup the mock for the TaskManager to return at least an empty set with expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());.

@jlprat
Copy link
Contributor Author

jlprat commented Jun 23, 2021

Needed to rebase as there were some conflicts with trunk, hence the force push.

I applied the changes in separate commits:

  • One for method renames, formattings and small refactors (like the unmodifiable collections)
  • One for Adding tests
  • One to fix the regression caused by the unmodifiable collections in constructor.

Copy link
Contributor Author

@jlprat jlprat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some notes about the hash code and equals testing approach

}

@Test
public void shouldFollowEqualsAndHasCodeContract() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive cases and any of the reasons why it might fall into the negative case.
Please note that objects differing only on committed offsets, end offsets, and/or time curring idling started will be considered equals.

}

@Test
public void shouldFollowHashCodeAndEqualsContract() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive case and any of the reasons why it might fall into the negative case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to have separate tests, because when something fails you immediately know the cause. But if you like to keep like that, I am fine with it.

@@ -55,6 +62,63 @@ public void shouldNotAllowModificationOfInternalStateViaGetters() {
assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
}

@Test
public void shouldFollowHashCodeAndEqualsContract() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive case and any of the reasons why it might fall into the negative case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my other comment about separate tests.

@jlprat
Copy link
Contributor Author

jlprat commented Jun 23, 2021

@ableegoldman and @cadonna thanks a lot for your reviews! I think I addressed all of your comments. Let me know what do you think about them.

@jlprat
Copy link
Contributor Author

jlprat commented Jun 24, 2021

I see some usual suspects on test failures, but org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabledis new. Looking at the stack trace, doesn't seem relevant to the changes made in this PR.

java.lang.NullPointerException
	at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
	at org.reflections.Store.getAllIncluding(Store.java:82)
	at org.reflections.Store.getAll(Store.java:93)
	at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
	at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
	at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
	at org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
	at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174)
	at org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.lambda$testSwitchingToTopicCreationEnabled$1(SourceConnectorsIntegrationTest.java:197)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
	at org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabled(SourceConnectorsIntegrationTest.java:197)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:121)
	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
	at java.lang.Thread.run(Thread.java:748)

Is there some relation I'm missing, or should I create a new issue for this flaky test?

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jlprat Thank you for the update!

Here my comments!

Regarding the docs of the interfaces, could you provide at least one initial sentence for each javadoc? I know that the old classes were not documented that well. If you do not have time, we can also postpone the improvements of the docs to a separate PR and get this PR merged as soon as possible.

Comment on lines 135 to 137
<li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#allRunningMetadata</code>.</li>
<li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#allMetadataForGivenStore(String)</code>.</li>
<li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#threadsMetadata</code>.</li>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to adapt this text to the new method names.

Comment on lines 233 to 238
localMetadata.set(new StreamsMetadataImpl(thisHost,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet()
));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it should be 4 not 8 spaces

Suggested change
localMetadata.set(new StreamsMetadataImpl(thisHost,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet()
));
localMetadata.set(new StreamsMetadataImpl(
thisHost,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet()
));

Comment on lines 262 to 266
final StreamsMetadata metadata = new StreamsMetadataImpl(hostInfo,
activeStoresOnHost,
activePartitionsOnHost,
standbyStoresOnHost,
standbyPartitionsOnHost);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final StreamsMetadata metadata = new StreamsMetadataImpl(hostInfo,
activeStoresOnHost,
activePartitionsOnHost,
standbyStoresOnHost,
standbyPartitionsOnHost);
final StreamsMetadata metadata = new StreamsMetadataImpl(
hostInfo,
activeStoresOnHost,
activePartitionsOnHost,
standbyStoresOnHost,
standbyPartitionsOnHost
);

Comment on lines 60 to 64
if (producerClientIds != null) {
this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
} else {
this.producerClientIds = Collections.emptySet();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not reproduce this issue. the set of producer IDs should never be null. If this is the case the test is wrong.

Comment on lines 40 to 44
public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet());
public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(
HostInfo.unavailable(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet()
);

Comment on lines 52 to 57
public static final TaskMetadata TM_0 = new TaskMetadataImpl(
TASK_ID_0,
mkSet(TP_0_0, TP_1_0),
mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
Optional.of(3L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
public static final TaskMetadata TM_0 = new TaskMetadataImpl(
TASK_ID_0,
mkSet(TP_0_0, TP_1_0),
mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
Optional.of(3L));
public static final TaskMetadata TM_0 = new TaskMetadataImpl(
TASK_ID_0,
mkSet(TP_0_0, TP_1_0),
mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
Optional.of(3L)
);

here and below.


/**
* Compares the specified object with this ThreadMetadata. Returns {@code true} if and only if the specified object is
* also a TaskMetadata and both {@code threadName()} are equal, {@code threadState()} are equal, {@code activeTasks()} contain the same
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskMetadata -> ThreadMetadata

}

@Test
public void shouldFollowHashCodeAndEqualsContract() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to have separate tests, because when something fails you immediately know the cause. But if you like to keep like that, I am fine with it.

Comment on lines 48 to 53
streamsMetadata = new StreamsMetadataImpl(
HOST_INFO,
STATE_STORE_NAMES,
TOPIC_PARTITIONS,
STAND_BY_STORE_NAMES,
STANDBY_TOPIC_PARTITIONS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The indentation was actually correct before.

@@ -55,6 +62,63 @@ public void shouldNotAllowModificationOfInternalStateViaGetters() {
assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
}

@Test
public void shouldFollowHashCodeAndEqualsContract() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my other comment about separate tests.

@jlprat
Copy link
Contributor Author

jlprat commented Jun 24, 2021

Hi @cadonna I'll try to fix address the comments either today or tomorrow.

Regarding the SourceConnectorsIntegrationTest failing in Jenkins, do you agree that is unrelated?

@cadonna
Copy link
Contributor

cadonna commented Jun 24, 2021

Regarding the SourceConnectorsIntegrationTest failing in Jenkins, do you agree that is unrelated?

Yes, I agree!

It includes
- Split equality tests by cases
- Add JavaDoc on methods
- Fix formatting
- Use proper assertThat instead of other means
- Fix names, use the proper names consistently
@jlprat
Copy link
Contributor Author

jlprat commented Jun 25, 2021

Hi @cadonna I applied all the feedback you provided. I gave my best at adding some Extra sentences on the JavaDocs for the interfaces. Let me know if they suffice.

@jlprat jlprat requested a review from cadonna June 25, 2021 10:47
Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jlprat I did a pass over the javadocs.

Co-authored-by: Bruno Cadonna <cadonna@apache.org>
@jlprat
Copy link
Contributor Author

jlprat commented Jun 25, 2021

@cadonna accepted your suggestions. Thanks a bunch!a

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jlprat, Thanks a lot!

LGTM!

@cadonna
Copy link
Contributor

cadonna commented Jun 25, 2021

I am just waiting for the builds to finish and then I will merge.

@jlprat
Copy link
Contributor Author

jlprat commented Jun 25, 2021

It seems that the test failures were RaftClusterTest and SslTransportLayerTest ones. Both groups being known flaky ones AFAIU.

@cadonna
Copy link
Contributor

cadonna commented Jun 25, 2021

The test failures are unrelated and known to be flaky:

Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
Build  / JDK 11 and Scala 2.13 /  org.apache.kafka.common.network.SslTransportLayerTest.[1]  tlsProtocol=TLSv1.2, useInlinePem=false
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
Build  / JDK 11 and Scala 2.13 /  org.apache.kafka.common.network.SslTransportLayerTest.[3]  tlsProtocol=TLSv1.3, useInlinePem=false
Build  / JDK 11 and Scala 2.13 /  org.apache.kafka.common.network.SslTransportLayerTest.[1]  tlsProtocol=TLSv1.2, useInlinePem=false
Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
Build  / JDK 11 and Scala 2.13 /  org.apache.kafka.common.network.SslTransportLayerTest.[3]  tlsProtocol=TLSv1.3, useInlinePem=false
Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()

@cadonna cadonna merged commit 6655a09 into apache:trunk Jun 25, 2021
@jlprat jlprat deleted the KAFKA-12849-TaskMetadata-ThreadMetadata-StreamsMetadata-to-Interface branch June 25, 2021 16:43
@jlprat
Copy link
Contributor Author

jlprat commented Jun 25, 2021

Thanks a lot @cadonna and @ableegoldman for the reviews!

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…PI (apache#10840)

Implementation of KIP-744.

Creates new Interfaces for TaskMetadata, ThreadMetadata, and
StreamsMetadata, providing internal implementations for each of them.

Deprecates current TaskMetadata, ThreadMetadata under o.a.k.s.processor,
and SreamsMetadata under a.o.k.s.state.

Updates references on internal classes from deprecated classes to new interfaces.

Deprecates methods on KafkaStreams returning deprecated ThreadMeatada and
StreamsMetadata, and provides new ones returning the new interfaces.

Update Javadocs referencing to deprecated classes and methods to point
to the right ones.

Co-authored-by: Bruno Cadonna <cadonna@apache.org>

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants