Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
metrics,
clientId,
processId.toString(),
applicationId,
time
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public int hashCode() {
private final Map<Sensor, Sensor> parentSensors;
private final String clientId;
private final String processId;
private final String applicationId;

private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
Expand Down Expand Up @@ -118,6 +119,7 @@ public int hashCode() {

public static final String CLIENT_ID_TAG = "client-id";
public static final String PROCESS_ID_TAG = "process-id";
public static final String APPLICATION_ID_TAG = "application-id";
public static final String THREAD_ID_TAG = "thread-id";
public static final String TASK_ID_TAG = "task-id";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
Expand Down Expand Up @@ -168,11 +170,13 @@ public int hashCode() {
public StreamsMetricsImpl(final Metrics metrics,
final String clientId,
final String processId,
final String applicationId,
final Time time) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.clientId = clientId;
this.processId = processId;
this.applicationId = applicationId;
version = Version.LATEST;
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);

Expand Down Expand Up @@ -284,6 +288,7 @@ public Map<String, String> clientLevelTagMap() {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(CLIENT_ID_TAG, clientId);
tagMap.put(PROCESS_ID_TAG, processId);
tagMap.put(APPLICATION_ID_TAG, applicationId);
return tagMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {

private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
private ChangelogReader changeLogReader;

private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new MockTime());
private final Map<String, Object> properties = mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {

// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", "", time);
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void process(final Record<Object, Object> record) {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
time,
"clientId",
stateRestoreListener,
Expand Down Expand Up @@ -169,7 +169,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
time,
"clientId",
stateRestoreListener,
Expand Down Expand Up @@ -418,7 +418,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
consumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
time,
"clientId",
stateRestoreListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
public class MockStreamsMetrics extends StreamsMetricsImpl {

public MockStreamsMetrics(final Metrics metrics) {
super(metrics, "test", "processId", new MockTime());
super(metrics, "test", "processId", "applicationId", new MockTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public Set<TopicPartition> partitions() {
public void testMetricsWithBuiltInMetricsVersionLatest() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
Expand Down Expand Up @@ -363,7 +363,7 @@ public void process(final Record<Object, Object> record) {
public void testTopologyLevelClassCastExceptionDirect() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
Expand Down Expand Up @@ -441,7 +441,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));

when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()));
when(internalProcessorContext.topic()).thenReturn(TOPIC);
when(internalProcessorContext.partition()).thenReturn(PARTITION);
when(internalProcessorContext.offset()).thenReturn(OFFSET);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class RecordQueueTest {

private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime());

final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) {
public void shouldExposeProcessMetrics() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class StandbyTaskTest {

private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", "applicationId", time);

private File baseDir;
private StreamsConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());

directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test"));
directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", "applicationId", time), new LogContext("test"));

return store;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2604,7 +2604,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
streamsMetrics,
null
);
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", time);
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", "applicationId", time);

// The processor topology is missing the topics
final ProcessorTopology topology = withSources(emptyList(), mkMap());
Expand Down Expand Up @@ -3238,7 +3238,7 @@ private StreamTask createSingleSourceStateless(final StreamsConfig config) {
topology,
consumer,
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
new StreamsMetricsImpl(metrics, "test", "processId", time),
new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time),
stateDirectory,
cache,
time,
Expand Down Expand Up @@ -3275,7 +3275,7 @@ private StreamTask createStatelessTask(final StreamsConfig config) {
topology,
consumer,
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
new StreamsMetricsImpl(metrics, "test", "processId", time),
new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time),
stateDirectory,
cache,
time,
Expand Down Expand Up @@ -3311,7 +3311,7 @@ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<In
topology,
consumer,
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
new StreamsMetricsImpl(metrics, "test", "processId", time),
new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time),
stateDirectory,
cache,
time,
Expand Down
Loading