@@ -281,6 +281,11 @@ public void shouldPassMetrics(final String topologyType, final String groupProto
streamsApplicationProperties = props(groupProtocol);
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();

shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
}

private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

@@ -292,8 +297,8 @@ public void shouldPassMetrics(final String topologyType, final String groupProto



final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();


assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
----------------------------------------
@@ -71,6 +71,17 @@ public interface StateStore {
*/
void init(final StateStoreContext stateStoreContext, final StateStore root);


/**
 * Assigns the store to a stream thread.
 * <p>
 * This method is called from the stream thread that owns the store, so it can be
 * used to initialize resources that need to know the running thread, e.g. metrics.
 * </p>
 * To access the thread, use {@link Thread#currentThread()}.
 */
default void assignThread() { }

/**
* Flush any cached data
*/
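For illustration, a minimal sketch of a custom store using the new hook; MyStore is hypothetical, the remaining StateStore methods are trivial no-ops here, and the exact set of abstract methods depends on the branch. The two-phase pattern (cache the context in init(), create thread-tagged sensors in assignThread()) mirrors the built-in stores changed below:

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;

public class MyStore implements StateStore {
    private StateStoreContext context;
    private Sensor droppedRecordsSensor;
    private boolean open;

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // May run on a thread that will not own the store: only cache the context here.
        this.context = context;
        open = true;
    }

    @Override
    public void assignThread() {
        // Runs on the owning stream thread, so the thread name is now the right one.
        droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
            Thread.currentThread().getName(),
            context.taskId().toString(),
            ProcessorContextUtils.metricsImpl(context));
    }

    @Override
    public String name() { return "my-store"; }

    @Override
    public void flush() { }

    @Override
    public void close() { open = false; }

    @Override
    public boolean persistent() { return false; }

    @Override
    public boolean isOpen() { return open; }
}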
----------------------------------------
@@ -48,6 +48,11 @@ public void init(final StateStoreContext stateStoreContext,
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void assignThread() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
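Design note: because assignThread() has a default no-op body on StateStore, a stub that is meant to reject every interaction must override it explicitly, as done here; otherwise the call would silently succeed. A quick test-side guard could look like this (hypothetical, JUnit 5, with store being the stub under test):

assertThrows(UnsupportedOperationException.class, store::assignThread);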
----------------------------------------
@@ -78,6 +78,11 @@ public void initializeIfNeeded() {
throw new UnsupportedOperationException("This task is read-only");
}

@Override
public void assignThread() {
throw new UnsupportedOperationException("This task is read-only");
}

@Override
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException("This task is read-only");
----------------------------------------
@@ -27,6 +27,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@@ -45,8 +46,8 @@
*/
public class StandbyTask extends AbstractTask implements Task {
private final boolean eosEnabled;
private final Sensor closeTaskSensor;
private final Sensor updateSensor;
private Sensor closeTaskSensor;
private Sensor updateSensor;
private final StreamsMetricsImpl streamsMetrics;

protected final InternalProcessorContext<?, ?> processorContext;
@@ -83,8 +84,6 @@ public class StandbyTask extends AbstractTask implements Task {
this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache);

closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
this.eosEnabled = config.eosEnabled;
}

@@ -129,6 +128,15 @@ public void initializeIfNeeded() {
}
}

@Override
public void assignThread() {
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
for (final StateStore stateStore : topology.stateStores()) {
stateStore.assignThread();
}
}

@Override
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
throw new IllegalStateException("Standby task " + id + " should never be completing restoration");
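The two sensors lose their final modifier because they are now created in assignThread() instead of the constructor: the constructor may run on a different thread (e.g. the one performing the assignment) than the stream thread that ends up running the task, which previously tagged the metrics with the wrong thread name. A self-contained demo of the timing issue (not Kafka code; all names hypothetical):

public class ThreadTagDemo {
    private volatile String threadTag;

    // Anything captured here would be tagged with the *constructing* thread.
    public ThreadTagDemo() { }

    // Called later from the owning thread, as StandbyTask#assignThread now does.
    public void assignThread() {
        threadTag = Thread.currentThread().getName();
    }

    public static void main(final String[] args) throws InterruptedException {
        final ThreadTagDemo demo = new ThreadTagDemo(); // constructed on "main"
        final Thread owner = new Thread(demo::assignThread, "stream-thread-1");
        owner.start();
        owner.join();
        System.out.println(demo.threadTag); // prints "stream-thread-1", not "main"
    }
}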
----------------------------------------
@@ -42,6 +42,7 @@
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
@@ -278,6 +279,13 @@ public void initializeIfNeeded() {
}
}

@Override
public void assignThread() {
for (final StateStore stateStore : topology.stateStores()) {
stateStore.assignThread();
}
}

public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
mainConsumer.pause(partitionsForOffsetReset);
resetOffsetsForPartitions.addAll(partitionsForOffsetReset);
----------------------------------------
@@ -110,6 +110,8 @@ enum TaskType {
*/
void initializeIfNeeded();

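/**
 * Binds this task to the stream thread that will run it; called once the owning
 * thread is known, e.g. right after {@link #initializeIfNeeded()}, so that
 * thread-scoped resources such as metrics sensors are created against that thread.
 */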
void assignThread();

default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException();
}
----------------------------------------
@@ -343,6 +343,8 @@ private Map<Task, Set<TopicPartition>> assignStartupTasks(final Map<TaskId, Set<
final TaskId taskId = entry.getKey();
final Task task = stateDirectory.removeStartupTask(taskId);
if (task != null) {
task.assignThread();

// replace our dummy values with the real ones, now that we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue();
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
@@ -928,6 +930,7 @@ boolean tryToCompleteRestoration(final long now,
for (final Task task : tasks.allTasks()) {
try {
task.initializeIfNeeded();
task.assignThread();
task.clearTaskTimeout();
} catch (final LockException lockException) {
// it is possible that if there are multiple threads within the instance that one thread
@@ -1082,6 +1085,7 @@ private void addTaskToStateUpdater(final Task task) {
try {
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
task.assignThread();
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {
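Across this file, assignThread() is invoked as soon as the owning thread is known: immediately for recycled startup tasks, and right after initializeIfNeeded() on the restoration and state-updater paths. Condensed view of the latter pattern (error handling elided):

task.initializeIfNeeded();  // may throw LockException and be retried later
task.assignThread();        // bind sensors and state stores to Thread.currentThread()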
----------------------------------------
@@ -27,7 +27,6 @@
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -243,16 +242,6 @@ public String name() {
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);

final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();

expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
@@ -276,6 +265,15 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
);
}

@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}

@Override
public void flush() {
segments.flush();
----------------------------------------
@@ -29,7 +29,6 @@
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -294,16 +293,6 @@ public String name() {
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);

final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();

expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
@@ -325,6 +314,15 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
false);
}

@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}

@Override
public void flush() {
segments.flush();
Expand Down Expand Up @@ -404,4 +402,4 @@ Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[
public Position getPosition() {
return position;
}
}
}
----------------------------------------
@@ -104,6 +104,11 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
}
});
super.init(stateStoreContext, root);
}

@Override
public void assignThread() {
super.assignThread();
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
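A sketch of the guard this field enables; maybeFlush is a hypothetical name, but the intent follows the comment above: only the owning stream thread may trigger a flush, while other threads merely read:

private void maybeFlush() {
    // Only flush when evaluated on the owning stream thread; other threads
    // (e.g. interactive queries) must not trigger a downstream flush.
    if (Thread.currentThread().equals(streamThread)) {
        // ... flush the cache ...
    }
}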
----------------------------------------
@@ -25,10 +25,10 @@
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
Expand Down Expand Up @@ -78,6 +78,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {

private StateStoreContext stateStoreContext;
private final Position position;
private TaskId taskId;

InMemorySessionStore(final String name,
final long retentionPeriod,
@@ -97,22 +98,14 @@
public void init(final StateStoreContext stateStoreContext,
final StateStore root) {
this.stateStoreContext = stateStoreContext;
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
taskId = stateStoreContext.taskId();

// The provided context is not required to implement InternalProcessorContext.
// If it doesn't, we can't record this metric.
if (stateStoreContext instanceof InternalProcessorContext) {
this.context = (InternalProcessorContext<?, ?>) stateStoreContext;
final StreamsMetricsImpl metrics = this.context.metrics();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
} else {
this.context = null;
expiredRecordSensor = null;
}

if (root != null) {
@@ -140,6 +133,19 @@ public void init(final StateStoreContext stateStoreContext,
open = true;
}

@Override
public void assignThread() {
if (context != null) {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
taskId.toString(),
this.context.metrics()
);
} else {
expiredRecordSensor = null;
}
}

@Override
public Position getPosition() {
return position;
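When the context does not implement InternalProcessorContext, assignThread() leaves expiredRecordSensor null, so record sites need a null guard along these lines (sketch; currentTimeMs supplied by the caller):

if (expiredRecordSensor != null) {
    expiredRecordSensor.record(1.0d, currentTimeMs);
}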
----------------------------------------
@@ -202,6 +202,14 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
taskId = context.taskId().toString();
streamsMetrics = context.metrics();

this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
updateBufferMetrics();
open = true;
partition = context.taskId().partition();
}

@Override
public void assignThread() {
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(
taskId,
METRIC_SCOPE,
@@ -214,11 +222,6 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
storeName,
streamsMetrics
);

this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
updateBufferMetrics();
open = true;
partition = context.taskId().partition();
}

@Override
----------------------------------------
@@ -29,7 +29,6 @@
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
@@ -104,15 +103,6 @@ public void init(final StateStoreContext stateStoreContext,
final StateStore root) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);

final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);

if (root != null) {
final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
stateStoreContext.appConfigs(),
@@ -142,6 +132,15 @@
open = true;
}

@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}

@Override
public Position getPosition() {
return position;