From 917eda2d98ea584e2a64fcc3b942a23b206c290f Mon Sep 17 00:00:00 2001 From: dixingxing Date: Thu, 22 Jun 2017 11:29:52 +0800 Subject: [PATCH] Set kafka clientId while fatch messages --- .../main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index d4881b140df3c..fc00e5f93b094 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -187,7 +187,7 @@ class KafkaRDD[ } private def fetchBatch: Iterator[MessageAndOffset] = { - val req = new FetchRequestBuilder() + val req = new FetchRequestBuilder().clientId(kc.config.clientId) .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes) .build() val resp = consumer.fetch(req)