From 1664dcf8bae68630a35d670fe2d30591bfa81656 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 28 Apr 2017 16:51:25 +0100 Subject: [PATCH 1/3] MINOR: Add missing metrics.log.level to Producer --- .../kafka/clients/producer/ProducerConfig.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 4bceb9528b9f0..0d24d92fabb6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; import java.util.HashMap; @@ -180,6 +181,11 @@ public class ProducerConfig extends AbstractConfig { /** metrics.num.samples */ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; + /** + * metrics.log.level + */ + public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; + /** metric.reporters */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; @@ -269,7 +275,6 @@ public class ProducerConfig extends AbstractConfig { MAX_REQUEST_SIZE_DOC) .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, @@ -297,6 +302,17 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) + .define(METRICS_RECORDING_LEVEL_CONFIG, + Type.STRING, + Sensor.RecordingLevel.INFO.toString(), + in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), + Importance.LOW, + CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Type.INT, 5, From 98774af8286ef50c3c373235b4545ed2d0de00dc Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 2 May 2017 03:30:37 +0100 Subject: [PATCH 2/3] Also fix Consumer and add tests --- .../kafka/clients/consumer/KafkaConsumer.java | 6 ++- .../kafka/clients/producer/KafkaProducer.java | 4 +- .../clients/consumer/KafkaConsumerTest.java | 15 ++++++ .../clients/producer/KafkaProducerTest.java | 46 +++++++++++++------ 4 files changed, 54 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9df674da0c125..aea303f082f3b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.requests.IsolationLevel; @@ -519,6 +520,9 @@ public class KafkaConsumer implements Consumer { private static final String JMX_PREFIX = "kafka.consumer"; static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000; + // Visible for testing + final Metrics metrics; + private final String clientId; private final ConsumerCoordinator coordinator; private final Deserializer keyDeserializer; @@ -528,7 +532,6 @@ public class KafkaConsumer implements Consumer { private final Time time; private final ConsumerNetworkClient client; - private final Metrics metrics; private final SubscriptionState subscriptions; private final Metadata metadata; private final long retryBackoffMs; @@ -626,6 +629,7 @@ private KafkaConsumer(ConsumerConfig config, metricsTags.put("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags); List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 286387ba4bb65..c8155132dded3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -149,13 +149,14 @@ public class KafkaProducer implements Producer { private static final String JMX_PREFIX = "kafka.producer"; private String clientId; + // Visible for testing + final Metrics metrics; private final Partitioner partitioner; private final int maxRequestSize; private final long totalMemorySize; private final Metadata metadata; private final RecordAccumulator accumulator; private final Sender sender; - private final Metrics metrics; private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; @@ -234,6 +235,7 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial metricTags.put("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a598b5da873d6..5928a2864e58e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -1234,6 +1235,20 @@ public void closeShouldBeIdempotent() { consumer.close(); } + @Test + public void testMetricConfigRecordingLevel() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel()); + } + + props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); + try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + assertEquals(Sensor.RecordingLevel.DEBUG, consumer.metrics.config().recordLevel()); + } + } + private void consumerCloseTest(final long closeTimeoutMs, List responses, long waitMs, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index ea493d2253841..3a6426a13afe3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ExtendedSerializer; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.Properties; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -87,9 +89,9 @@ public void testConstructorFailureCloseResource() { KafkaProducer producer = new KafkaProducer( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { - Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); - Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); - Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); + assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); + assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); + assertEquals("Failed to construct kafka producer", e.getMessage()); return; } fail("should have caught an exception and returned"); @@ -107,12 +109,12 @@ public void testSerializerClose() throws Exception { KafkaProducer producer = new KafkaProducer( configs, new MockSerializer(), new MockSerializer()); - Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); - Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); + assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); + assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); producer.close(); - Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); - Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get()); + assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); + assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get()); } @Test @@ -126,15 +128,15 @@ public void testInterceptorConstructClose() throws Exception { KafkaProducer producer = new KafkaProducer( props, new StringSerializer(), new StringSerializer()); - Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); - Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); + assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); + assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); // Cluster metadata will only be updated on calling onSend. Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get()); producer.close(); - Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); - Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); + assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); + assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockProducerInterceptor MockProducerInterceptor.resetCounters(); @@ -150,12 +152,12 @@ public void testPartitionerClose() throws Exception { KafkaProducer producer = new KafkaProducer( props, new StringSerializer(), new StringSerializer()); - Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get()); - Assert.assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); + assertEquals(1, MockPartitioner.INIT_COUNT.get()); + assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); producer.close(); - Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get()); - Assert.assertEquals(1, MockPartitioner.CLOSE_COUNT.get()); + assertEquals(1, MockPartitioner.INIT_COUNT.get()); + assertEquals(1, MockPartitioner.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockPartitioner MockPartitioner.resetCounters(); @@ -418,4 +420,18 @@ public void closeShouldBeIdempotent() { producer.close(); } + @Test + public void testMetricConfigRecordingLevel() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel()); + } + + props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); + try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); + } + } + } From b3146c11b64622a2d3452e30dcd748954bf2198b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 2 May 2017 02:55:45 +0100 Subject: [PATCH 3/3] A few clean-ups --- .../kafka/clients/consumer/KafkaConsumer.java | 4 +--- .../kafka/clients/producer/KafkaProducer.java | 4 +--- .../apache/kafka/common/network/Selector.java | 20 ++++++++++--------- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index aea303f082f3b..aad445378063f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -59,7 +59,6 @@ import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -625,8 +624,7 @@ private KafkaConsumer(ConsumerConfig config, if (clientId.length() <= 0) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; - Map metricsTags = new LinkedHashMap<>(); - metricsTags.put("client-id", clientId); + Map metricsTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c8155132dded3..e6adc5eff2669 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -64,7 +64,6 @@ import java.net.InetSocketAddress; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -231,8 +230,7 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); - Map metricTags = new LinkedHashMap(); - metricTags.put("client-id", clientId); + Map metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 312e1f51ced8d..a74a58463ce17 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -26,6 +26,7 @@ import java.nio.channels.UnresolvedAddressException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -96,11 +97,8 @@ public class Selector implements Selectable, AutoCloseable { private final List failedSends; private final Time time; private final SelectorMetrics sensors; - private final String metricGrpPrefix; - private final Map metricTags; private final ChannelBuilder channelBuilder; private final int maxReceiveSize; - private final boolean metricsPerConnection; private final boolean recordTimePerConnection; private final IdleExpiryManager idleExpiryManager; @@ -132,8 +130,6 @@ public Selector(int maxReceiveSize, } this.maxReceiveSize = maxReceiveSize; this.time = time; - this.metricGrpPrefix = metricGrpPrefix; - this.metricTags = metricTags; this.channels = new HashMap<>(); this.completedSends = new ArrayList<>(); this.completedReceives = new ArrayList<>(); @@ -143,9 +139,8 @@ public Selector(int maxReceiveSize, this.connected = new ArrayList<>(); this.disconnected = new ArrayList<>(); this.failedSends = new ArrayList<>(); - this.sensors = new SelectorMetrics(metrics); + this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection); this.channelBuilder = channelBuilder; - this.metricsPerConnection = metricsPerConnection; this.recordTimePerConnection = recordTimePerConnection; this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs); } @@ -162,7 +157,7 @@ public Selector(int maxReceiveSize, } public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) { - this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap(), true, channelBuilder); + this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder); } /** @@ -679,6 +674,10 @@ private void addToCompletedReceives(KafkaChannel channel, Deque private class SelectorMetrics { private final Metrics metrics; + private final String metricGrpPrefix; + private final Map metricTags; + private final boolean metricsPerConnection; + public final Sensor connectionClosed; public final Sensor connectionCreated; public final Sensor bytesTransferred; @@ -691,8 +690,11 @@ private class SelectorMetrics { private final List topLevelMetricNames = new ArrayList<>(); private final List sensors = new ArrayList<>(); - public SelectorMetrics(Metrics metrics) { + public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map metricTags, boolean metricsPerConnection) { this.metrics = metrics; + this.metricGrpPrefix = metricGrpPrefix; + this.metricTags = metricTags; + this.metricsPerConnection = metricsPerConnection; String metricGrpName = metricGrpPrefix + "-metrics"; StringBuilder tagsSuffix = new StringBuilder();