From ea1d3189c8113cc2888366fa5b24579776279886 Mon Sep 17 00:00:00 2001 From: Robert Hastings Date: Thu, 31 Mar 2016 16:14:47 -0700 Subject: [PATCH] Addresses network flood from KafkaSpout to kafka server. * Allows minBytes in fetch request to be configured from KafkaConfig.fetchMinBytes. * Defaults new configuration KafkaConfig.fetchMinBytes to 1. * Defaults fetchMaxWait to 100ms instead of 10000ms. Discovered 30 megabits of traffic flowing between a set of KafkaSpouts and our kafka servers even though no Kafka messages were moving. Using the wireshark kafka dissector, we were able to see that each FetchRequest had maxWait set to 10000 and minBytes set to 0. When minBytes is set to 0 the kafka server responds immediately when there are no messages. In turn the KafkaSpout polls without any delay causing a constant stream of FetchRequest/FetchResponse messages. Using a non-KafkaSpout client had a similar traffic pattern with two key differences: 1) minBytes was 1 2) maxWait was 100. With these FetchRequest parameters and no messages flowing, the kafka server delays the FetchResponse by 100 ms. This reduces the network traffic from megabits to the low kilobits. It also reduced the CPU utilization of our kafka server from 140% to 2%. 
Hopefully the risk of this change is low because the old behavior can be restored using the API by setting KafkaConfig.fetchMaxWait to 10000 and KafkaConfig.fetchMinBytes to 0. --- external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java | 3 ++- external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java index dd71b5a2151..3b4436008f0 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java @@ -30,7 +30,8 @@ public class KafkaConfig implements Serializable { public int fetchSizeBytes = 1024 * 1024; public int socketTimeoutMs = 10000; - public int fetchMaxWait = 10000; + public int fetchMaxWait = 100; + public int fetchMinBytes = 1; public int bufferSizeBytes = 1024 * 1024; public MultiScheme scheme = new RawMultiScheme(); public boolean ignoreZkOffsets = false; diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index b324d795194..b9af6829870 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -159,7 +159,7 @@ public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsu int partitionId = partition.partition; FetchRequestBuilder builder = new FetchRequestBuilder(); FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). - clientId(config.clientId).maxWait(config.fetchMaxWait).build(); + clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.fetchMinBytes).build(); FetchResponse fetchResponse; try { fetchResponse = consumer.fetch(fetchRequest);