Skip to content

Commit

Permalink
[SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to u…
Browse files Browse the repository at this point in the history
…se new Producer API

Otherwise it will throw exception:

```
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:77)
	at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96)
	at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

Author: jerryshao <saisai.shao@intel.com>

Closes #5936 from jerryshao/SPARK-7396 and squashes the following commits:

270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue
  • Loading branch information
jerryshao authored and tdas committed May 7, 2015
1 parent 4e93042 commit 316a5c0
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.spark.examples.streaming

import java.util.Properties
import java.util.HashMap

import kafka.producer._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
Expand Down Expand Up @@ -77,23 +77,25 @@ object KafkaWordCountProducer {
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

// Zookeeper connection properties
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")

val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
val producer = new KafkaProducer[String, String](props)

// Send some messages
while(true) {
val messages = (1 to messagesPerSec.toInt).map { messageNum =>
(1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
.mkString(" ")

new KeyedMessage[String, String](topic, str)
}.toArray
val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
}

producer.send(messages: _*)
Thread.sleep(100)
}
}
Expand Down

0 comments on commit 316a5c0

Please sign in to comment.