KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API (#10840)

Implementation of KIP-744.

Creates new Interfaces for TaskMetadata, ThreadMetadata, and
StreamsMetadata, providing internal implementations for each of them.

Deprecates current TaskMetadata, ThreadMetadata under o.a.k.s.processor,
and StreamsMetadata under o.a.k.s.state.

Updates references on internal classes from deprecated classes to new interfaces.

Deprecates methods on KafkaStreams returning deprecated ThreadMetadata and
StreamsMetadata, and provides new ones returning the new interfaces.

Updates Javadocs that reference deprecated classes and methods to point
to their replacements.

Co-authored-by: Bruno Cadonna <cadonna@apache.org>

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
jlprat committed Jun 25, 2021
1 parent bd72ef1 commit 6655a09
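
The deprecation strategy described in the commit message (keep the old methods, but derive their results from the new ones) can be sketched in isolation. The block below is a minimal illustration, not the code from this commit: `NewTaskView`, `LegacyTaskView`, `task`, and `toLegacy` are hypothetical stand-ins so that the example compiles without Kafka on the classpath, and the `subtopology_partition` string format is assumed for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

class LegacyAdapterSketch {

    // Hypothetical stand-in for a new-style interface that exposes a structured task id.
    interface NewTaskView {
        int subtopology();
        int partition();
    }

    // Hypothetical stand-in for a deprecated class that exposes the task id as a String.
    static final class LegacyTaskView {
        private final String taskId;

        LegacyTaskView(final String taskId) {
            this.taskId = taskId;
        }

        String taskId() {
            return taskId;
        }

        @Override
        public boolean equals(final Object o) {
            return o instanceof LegacyTaskView && taskId.equals(((LegacyTaskView) o).taskId);
        }

        @Override
        public int hashCode() {
            return taskId.hashCode();
        }
    }

    // Helper that builds a new-style view from its two components.
    static NewTaskView task(final int subtopology, final int partition) {
        return new NewTaskView() {
            @Override
            public int subtopology() {
                return subtopology;
            }

            @Override
            public int partition() {
                return partition;
            }
        };
    }

    // Old-style result derived from new-style objects, so both views stay consistent.
    static Set<LegacyTaskView> toLegacy(final Set<NewTaskView> tasks) {
        return tasks.stream()
            .map(t -> new LegacyTaskView(t.subtopology() + "_" + t.partition()))
            .collect(Collectors.toSet());
    }

    public static void main(final String[] args) {
        final Set<NewTaskView> tasks = new HashSet<>(Arrays.asList(task(0, 1), task(1, 0)));
        for (final LegacyTaskView legacy : toLegacy(tasks)) {
            System.out.println(legacy.taskId());
        }
    }
}
```

Deriving the legacy view on every call costs an extra copy, but it leaves a single source of truth, which is presumably why the deprecated methods in this commit take the same shape.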
Showing 38 changed files with 1,473 additions and 190 deletions.
19 changes: 16 additions & 3 deletions docs/streams/upgrade-guide.html
@@ -121,10 +121,23 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processor.TaskMetadata</code> class and introduced a new interface
<code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
of the Kafka codebase.
Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
The <code>org.apache.kafka.streams.processor.ThreadMetadata</code> class is also now deprecated and the newly introduced interface <code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In this new <code>ThreadMetadata</code>
interface, any reference to the deprecated <code>TaskMetadata</code> is replaced by the new interface.
Finally, <code>org.apache.kafka.streams.state.StreamsMetadata</code> has also been deprecated. Please migrate to the new <code>org.apache.kafka.streams.StreamsMetadata</code>.
We have deprecated several methods under <code>org.apache.kafka.streams.KafkaStreams</code> that returned the aforementioned deprecated classes:
</p>
<ul>
<li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForAllStreamsClients</code>.</li>
<li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#streamsMetadataForStore(String)</code>.</li>
<li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForLocalThreads</code>.</li>
</ul>
<p>See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> and <a href="https://cwiki.apache.org/confluence/x/XIrOCg">KIP-744</a> for more details.</p>

<p>
We removed the following deprecated APIs:
</p>
93 changes: 87 additions & 6 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -51,8 +51,6 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -66,7 +64,6 @@
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
@@ -95,6 +92,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@@ -1449,8 +1447,30 @@ public void cleanUp() {
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
* @deprecated since 3.0.0 use {@link KafkaStreams#metadataForAllStreamsClients}
*/
public Collection<StreamsMetadata> allMetadata() {
@Deprecated
public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
streamsMetadata.stateStoreNames(),
streamsMetadata.topicPartitions(),
streamsMetadata.standbyStateStoreNames(),
streamsMetadata.standbyTopicPartitions()))
.collect(Collectors.toSet());
}

/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
* {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
* the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
*/
public Collection<StreamsMetadata> metadataForAllStreamsClients() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
}
@@ -1469,8 +1489,36 @@ public Collection<StreamsMetadata> allMetadata() {
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
* this application
* @deprecated since 3.0.0 use {@link KafkaStreams#streamsMetadataForStore} instead
*/
public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
@Deprecated
public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
streamsMetadata.stateStoreNames(),
streamsMetadata.topicPartitions(),
streamsMetadata.standbyStateStoreNames(),
streamsMetadata.standbyTopicPartitions()))
.collect(Collectors.toSet());
}

/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that
* <ul>
* <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
* instances that belong to the same Kafka Streams application)</li>
* <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
* </ul>
* and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
* this application
*/
public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
}
@@ -1549,12 +1597,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
for (final StreamThread thread : copy) consumer.accept(thread);
}

/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
* @deprecated since 3.0 use {@link #metadataForLocalThreads()}
*/
@Deprecated
@SuppressWarnings("deprecation")
public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
return metadataForLocalThreads().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
threadMetadata.threadName(),
threadMetadata.threadState(),
threadMetadata.consumerClientId(),
threadMetadata.restoreConsumerClientId(),
threadMetadata.producerClientIds(),
threadMetadata.adminClientId(),
threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
taskMetadata.taskId().toString(),
taskMetadata.topicPartitions(),
taskMetadata.committedOffsets(),
taskMetadata.endOffsets(),
taskMetadata.timeCurrentIdlingStarted())
).collect(Collectors.toSet()),
threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
taskMetadata.taskId().toString(),
taskMetadata.topicPartitions(),
taskMetadata.committedOffsets(),
taskMetadata.endOffsets(),
taskMetadata.timeCurrentIdlingStarted())
).collect(Collectors.toSet())))
.collect(Collectors.toSet());
}

/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
public Set<ThreadMetadata> metadataForLocalThreads() {
final Set<ThreadMetadata> threadMetadata = new HashSet<>();
processStreamThread(thread -> {
synchronized (thread.getStateLock()) {
@@ -236,7 +236,7 @@ public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern,
* K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param topic the topic name; cannot be {@code null}
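
The lookup step of the custom RPC mechanism mentioned in the Javadoc above might look like the following sketch. `ClientView` is a hypothetical stand-in for the parts of `StreamsMetadata` used here (`host()`, `port()`, `stateStoreNames()`), and `endpointsFor` and the `http://host:port` URL scheme are assumptions for illustration. The transport itself, and the routing of a key to the one instance that owns it, are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class StoreEndpointSketch {

    // Hypothetical stand-in mirroring three accessors of StreamsMetadata.
    static final class ClientView {
        private final String host;
        private final int port;
        private final Set<String> stateStoreNames;

        ClientView(final String host, final int port, final String... stateStoreNames) {
            this.host = host;
            this.port = port;
            this.stateStoreNames = new HashSet<>(Arrays.asList(stateStoreNames));
        }

        String host() {
            return host;
        }

        int port() {
            return port;
        }

        Set<String> stateStoreNames() {
            return stateStoreNames;
        }
    }

    // Returns the base URLs of all clients whose active tasks host the given store, sorted.
    static List<String> endpointsFor(final Collection<ClientView> clients, final String storeName) {
        final List<String> endpoints = new ArrayList<>();
        for (final ClientView client : clients) {
            if (client.stateStoreNames().contains(storeName)) {
                endpoints.add("http://" + client.host() + ":" + client.port());
            }
        }
        Collections.sort(endpoints);
        return endpoints;
    }

    public static void main(final String[] args) {
        final List<ClientView> clients = Arrays.asList(
            new ClientView("host-a", 8080, "counts"),
            new ClientView("host-b", 8081, "totals"),
            new ClientView("host-c", 8082, "counts", "totals"));
        // prints [http://host-a:8080, http://host-c:8082]
        System.out.println(endpointsFor(clients, "counts"));
    }
}
```

With the real API, the filtering by store name is what `KafkaStreams#streamsMetadataForStore(String)` already returns, so only the URL construction would remain on the caller's side.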
104 changes: 104 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.HostInfo;

import java.util.Set;

/**
* Metadata of a Kafka Streams client.
*/
public interface StreamsMetadata {

/**
* The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the Streams
* client.
*
* @return {@link HostInfo} corresponding to the Streams client
*/
HostInfo hostInfo();

/**
* Names of the state stores assigned to active tasks of the Streams client.
*
* @return names of the state stores assigned to active tasks
*/
Set<String> stateStoreNames();

/**
* Source topic partitions of the active tasks of the Streams client.
*
* @return source topic partitions of the active tasks
*/
Set<TopicPartition> topicPartitions();

/**
* Changelog topic partitions for the state stores the standby tasks of the Streams client replicates.
*
* @return set of changelog topic partitions of the standby tasks
*/
Set<TopicPartition> standbyTopicPartitions();

/**
* Names of the state stores assigned to standby tasks of the Streams client.
*
* @return names of the state stores assigned to standby tasks
*/
Set<String> standbyStateStoreNames();

/**
* Host where the Streams client runs.
*
* This method is equivalent to {@code StreamsMetadata.hostInfo().host();}
*
* @return the host where the Streams client runs
*/
String host();

/**
* Port on which the Streams client listens.
*
* This method is equivalent to {@code StreamsMetadata.hostInfo().port();}
*
* @return the port on which Streams client listens
*/
int port();

/**
* Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
* also a StreamsMetadata and for both {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
* {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
*
* @return {@code true} if this object is the same as the {@code o} argument; {@code false} otherwise.
*/
boolean equals(Object o);

/**
* Returns the hash code value for this StreamsMetadata. The hash code of a StreamsMetadata is defined to be the result of the following calculation:
* <pre>
* {@code
* Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
* }
* </pre>
*
* @return a hash code value for this object.
*/
int hashCode();

}
87 changes: 87 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Map;
import java.util.Optional;
import java.util.Set;


/**
* Metadata of a task.
*/
public interface TaskMetadata {

/**
* Task ID of the task.
*
* @return task ID consisting of subtopology and partition ID
*/
TaskId taskId();

/**
* Source topic partitions of the task.
*
* @return source topic partitions
*/
Set<TopicPartition> topicPartitions();

/**
* Offsets of the source topic partitions committed so far by the task.
*
* @return map from source topic partitions to committed offsets
*/
Map<TopicPartition, Long> committedOffsets();

/**
* End offsets of the source topic partitions of the task.
*
* @return map from source topic partitions to end offsets
*/
Map<TopicPartition, Long> endOffsets();

/**
* Time task idling started. If the task is not currently idling it will return empty.
*
* @return time when task idling started, empty {@code Optional} if the task is currently not idling
*/
Optional<Long> timeCurrentIdlingStarted();

/**
* Compares the specified object with this TaskMetadata. Returns {@code true} if and only if the specified object is
* also a TaskMetadata and both {@code taskId()} and {@code topicPartitions()} are equal.
*
* @return {@code true} if this object is the same as the {@code o} argument; {@code false} otherwise.
*/
boolean equals(final Object o);

/**
* Returns the hash code value for this TaskMetadata. The hash code of a TaskMetadata is defined to be the result of the following calculation:
* <pre>
* {@code
* Objects.hash(taskId(), topicPartitions());
* }
* </pre>
*
* @return a hash code value for this object.
*/
int hashCode();

}
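
The `equals`/`hashCode` contract documented on the `TaskMetadata` interface above takes only `taskId()` and `topicPartitions()` into account. A minimal sketch of a class honoring that contract follows. `SketchTaskMetadata` is hypothetical and uses plain `String` values in place of `TaskId` and `TopicPartition` so that it compiles without Kafka on the classpath; it is not the internal implementation added by this commit.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in: Strings replace TaskId and TopicPartition.
final class SketchTaskMetadata {
    private final String taskId;
    private final Set<String> topicPartitions;
    private final Map<String, Long> committedOffsets;

    SketchTaskMetadata(final String taskId,
                       final Set<String> topicPartitions,
                       final Map<String, Long> committedOffsets) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
        this.committedOffsets = committedOffsets;
    }

    String taskId() {
        return taskId;
    }

    Set<String> topicPartitions() {
        return topicPartitions;
    }

    Map<String, Long> committedOffsets() {
        return committedOffsets;
    }

    // Equality deliberately ignores committedOffsets, as the documented contract does.
    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchTaskMetadata)) {
            return false;
        }
        final SketchTaskMetadata that = (SketchTaskMetadata) o;
        return taskId.equals(that.taskId) && topicPartitions.equals(that.topicPartitions);
    }

    // Same two components as equals, matching the documented Objects.hash(...) formula.
    @Override
    public int hashCode() {
        return Objects.hash(taskId, topicPartitions);
    }

    public static void main(final String[] args) {
        final SketchTaskMetadata a = new SketchTaskMetadata(
            "0_1", Collections.singleton("input-1"), Collections.singletonMap("input-1", 5L));
        final SketchTaskMetadata b = new SketchTaskMetadata(
            "0_1", Collections.singleton("input-1"), Collections.singletonMap("input-1", 9L));
        final SketchTaskMetadata c = new SketchTaskMetadata(
            "0_2", Collections.singleton("input-2"), Collections.singletonMap("input-2", 5L));
        System.out.println(a.equals(b)); // prints true: offsets differ, identity does not
        System.out.println(a.equals(c)); // prints false: different task id and partitions
    }
}
```

One consequence of this contract is that two snapshots of the same task taken at different times compare equal, so a `Set` of such objects deduplicates by task rather than by progress.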
