From e65bf828a235d89c1d64468c3178e34ac6a2d804 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 10 May 2017 16:23:19 -0700 Subject: [PATCH] SAMZA-1283: Expose the buffered-message-size metric from BlockingEnvelopeMap --- .../samza/util/BlockingEnvelopeMap.java | 20 +++++-------------- .../samza/util/TestBlockingEnvelopeMap.java | 2 +- .../system/kafka/KafkaSystemConsumer.scala | 3 +-- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java index 8238d2ed3c..0205a4449e 100644 --- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java +++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java @@ -68,7 +68,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { private final ConcurrentHashMap bufferedMessagesSize; // size in bytes per SystemStreamPartition private final Map noMoreMessage; private final Clock clock; - protected final boolean fetchLimitByBytesEnabled; public BlockingEnvelopeMap() { this(new NoOpMetricsRegistry()); @@ -83,17 +82,15 @@ public long currentTimeMillis() { } public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) { - this(metricsRegistry, clock, null, false); + this(metricsRegistry, clock, null); } - public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) { + public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) { metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName; this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry); this.bufferedMessages = new ConcurrentHashMap>(); this.noMoreMessage = new ConcurrentHashMap(); this.clock = clock; - this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled; - // Created when size is disabled for code simplification, and as the overhead is negligible. this.bufferedMessagesSize = new ConcurrentHashMap(); } @@ -103,7 +100,6 @@ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String public void register(SystemStreamPartition systemStreamPartition, String offset) { metrics.initMetrics(systemStreamPartition); bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue()); - // Created when size is disabled for code simplification, and the overhead is negligible. bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0)); } @@ -155,9 +151,7 @@ public Map> poll(Set 0) { messagesToReturn.put(systemStreamPartition, outgoingList); - if (fetchLimitByBytesEnabled) { - subtractSizeOnQDrain(systemStreamPartition, outgoingList); - } + subtractSizeOnQDrain(systemStreamPartition, outgoingList); } } @@ -183,9 +177,7 @@ private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, L */ protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException { bufferedMessages.get(systemStreamPartition).put(envelope); - if (fetchLimitByBytesEnabled) { - bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize()); - } + bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize()); } /** @@ -262,9 +254,7 @@ public void initMetrics(SystemStreamPartition systemStreamPartition) { this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition)); metricsRegistry.newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition)); - if (fetchLimitByBytesEnabled) { - metricsRegistry.newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition)); - } + metricsRegistry.newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition)); } public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) { diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java index afdae163ce..f5394c019f 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java @@ -213,7 +213,7 @@ public MockBlockingEnvelopeMap() { } public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) { - super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled); + super(new NoOpMetricsRegistry(), CLOCK, null); injectedQueue = new MockQueue(); } diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index f25bb683f0..aa13fd8b53 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -127,8 +127,7 @@ private[kafka] class KafkaSystemConsumer( new Clock { def currentTimeMillis = clock() }, - classOf[KafkaSystemConsumerMetrics].getName, - fetchLimitByBytesEnabled) with Toss with Logging { + classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging { type HostPort = (String, Int) val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()