From 8c2022755c08828edea3ef61f35a8490d1d6af24 Mon Sep 17 00:00:00 2001
From: zhangxinyu1 <342689740@qq.com>
Date: Tue, 11 Oct 2016 17:15:09 +0800
Subject: [PATCH 1/3] Add KafkaForeachWriter for Structured Streaming. It can
 be used with ForeachSink
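
A minimal usage sketch (hypothetical names: assumes a Kafka 0.8 broker at
localhost:9092 and a streaming DataFrame `df` whose single StringType column
holds the message value):

    import java.util.Properties
    import org.apache.spark.sql.{KafkaForeachWriter, Row}

    val producerProps = new Properties()
    // The kafka-0-8 producer locates brokers through metadata.broker.list
    producerProps.setProperty("metadata.broker.list", "localhost:9092")

    val query = df.writeStream
      .outputMode("append")
      .foreach(new KafkaForeachWriter[Row](producerProps, Set("output-topic"), df.schema))
      .start()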
---
external/kafka-0-8/pom.xml | 6 ++
.../apache/spark/sql/KafkaForeachWriter.scala | 88 +++++++++++++++++++
2 files changed, 94 insertions(+)
create mode 100644 external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
index 91ccd4a927e98..4ea52e2a4809c 100644
--- a/external/kafka-0-8/pom.xml
+++ b/external/kafka-0-8/pom.xml
@@ -39,6 +39,12 @@
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
new file mode 100644
index 0000000000000..89be76757643e
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Properties
+
+import kafka.producer.OldProducer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class KafkaForeachWriter[T](
+    kafkaParams: Properties,
+    topics: Set[String],
+    schema: StructType) extends ForeachWriter[T] with Logging {
+  private var producer: OldProducer = null
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+    verifySchema(schema)
+    producer = new OldProducer(kafkaParams)
+    logInfo(s"Producer created. PartitionId: $partitionId, version: $version")
+    true
+  }
+
+  override def process(value: T): Unit = {
+    logDebug("Starting to send message")
+    val data = value.asInstanceOf[Row]
+    verifySchema(data.schema)
+    val keyIsNull = schema.size == 1
+    if (keyIsNull) {
+      logDebug(s"Sending message to Kafka. Value: ${data.getString(0)}")
+      topics.foreach(producer.send(_, null, data.getString(0).getBytes()))
+    } else {
+      val messageKey: String = data.getAs[String]("key")
+      val messageValue: String = data.getAs[String]("value")
+      logDebug(s"Sending message to Kafka. Key/value: ($messageKey, $messageValue)")
+      topics.foreach(producer.send(_, messageKey.getBytes(), messageValue.getBytes()))
+    }
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+    if (errorOrNull != null) logError("Error occurred while writing to Kafka", errorOrNull)
+    logInfo("Closing producer.")
+    if (producer != null) producer.close()
+  }
+
+  private def verifySchema(schema: StructType): Unit = {
+    if (schema.size == 1) {
+      if (schema(0).dataType != StringType) {
+        throw new AnalysisException(
+          "Failed to send messages to Kafka! KafkaForeachWriter supports only StringType values, " +
+          s"but the value column is of type ${schema(0).dataType}.")
+      }
+    } else if (schema.size == 2) {
+      if (schema(0).dataType != StringType || schema(1).dataType != StringType) {
+        throw new AnalysisException(
+          "Failed to send messages to Kafka! KafkaForeachWriter supports only StringType values, " +
+          s"but the columns are of types ${schema(0).dataType} and ${schema(1).dataType}.")
+      }
+      if (schema.fieldNames.contains("key") && schema.fieldNames.contains("value")) {
+      } else {
+        throw new AnalysisException(
+          """Failed to send messages to Kafka! If a Row has two columns, the field names
+             should be "key" and "value".
+           """)
+      }
+    } else {
+      throw new AnalysisException(
+        "Failed to send messages to Kafka! KafkaForeachWriter supports only one or two columns, " +
+        s"but the schema has ${schema.size} columns.")
+    }
+  }
+}
From 1ad774919598aca5cf126c2c6e19fa618eaaa5d1 Mon Sep 17 00:00:00 2001
From: zhangxinyu1 <342689740@qq.com>
Date: Fri, 14 Oct 2016 18:29:33 +0800
Subject: [PATCH 2/3] Add KafkaForeachWriter to write results to Kafka in
 Structured Streaming. It can be used with ForeachSink
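
For the keyed case, a sketch of the expected input shape (hypothetical topic
name; the two StringType columns must be named "key" and "value", mirroring
the new test):

    val data = input.toDS()
      .map(word => (word, word))
      .as[(String, String)]
      .toDF("key", "value")

    val query = data.writeStream
      .outputMode("append")
      .foreach(new KafkaForeachWriter[Row](producerProps, topics, data.schema))
      .start()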
---
external/kafka-0-8/pom.xml | 7 +
.../apache/spark/sql/KafkaForeachWriter.scala | 11 +-
.../streaming/kafka/KafkaTestUtils.scala | 2 +-
.../spark/sql/KafkaForeachWriterSuite.scala | 134 ++++++++++++++++++
4 files changed, 150 insertions(+), 4 deletions(-)
create mode 100644 external/kafka-0-8/src/test/scala/org/apache/spark/sql/KafkaForeachWriterSuite.scala
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
index 4ea52e2a4809c..2a6fee5dff942 100644
--- a/external/kafka-0-8/pom.xml
+++ b/external/kafka-0-8/pom.xml
@@ -45,6 +45,13 @@
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
index 89be76757643e..13b674448db68 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
@@ -20,10 +20,15 @@ package org.apache.spark.sql
import java.util.Properties
import kafka.producer.OldProducer
-
import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.{StringType, StructType}
-
+/**
+ * A [[ForeachWriter]] that outputs streaming query results to Kafka clusters.
+ * The streaming query result must have one or two [[StringType]] columns. When the
+ * output is a single string column, that column is treated as the message value and
+ * the key is null by default. If the output has two [[StringType]] columns, the
+ * field names of the columns must be "key" and "value".
+ */
class KafkaForeachWriter[T](
kafkaParams: Properties,
topics: Set[String],
@@ -39,7 +44,7 @@ class KafkaForeachWriter[T](
   override def process(value: T): Unit = {
     logDebug("Starting to send message")
-    val data = value.asInstanceOf[Row]
+    val data: Row = value.asInstanceOf[Row]
     verifySchema(data.schema)
     val keyIsNull = schema.size == 1
     if (keyIsNull) {
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 03c9ca7524e5d..5d894cf0695cc 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -48,7 +48,7 @@ import org.apache.spark.util.Utils
*
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
-private[kafka] class KafkaTestUtils extends Logging {
+private[spark] class KafkaTestUtils extends Logging {
// Zookeeper related configurations
private val zkHost = "localhost"
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/sql/KafkaForeachWriterSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/sql/KafkaForeachWriterSuite.scala
new file mode 100644
index 0000000000000..b93cd9b557b85
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/sql/KafkaForeachWriterSuite.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+
+import kafka.consumer.{Consumer, ConsumerConfig}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.streaming.kafka.KafkaTestUtils
+
+class KafkaForeachWriterSuite
+ extends SparkFunSuite
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with Eventually
+  with SharedSQLContext {
+ import testImplicits._
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+    super.afterAll()
+  }
+
+ test("Basic usage: Write kafka without key"){
+ val topic = "testWithoutKey"
+ val topics = Set(topic)
+ kafkaTestUtils.createTopic(topic)
+ val topicCountMap = Map(topic -> 1)
+
+ //create streaming query, write results to kafka
+ val input = MemoryStream[String]
+ val producerProps : Properties = new Properties()
+ producerProps.setProperty("metadata.broker.list", kafkaTestUtils.brokerAddress)
+ val query = input.toDF().writeStream
+ .outputMode("append")
+ .foreach(new KafkaForeachWriter[Row](producerProps, topics, input.toDF().schema))
+ .start()
+ val source = Array("A","B","C","E","F","G")
+ input.addData(source)
+ query.processAllAvailable()
+
+    // Create a consumer and read the data back from Kafka
+    val consumerProps: Properties = new Properties()
+    consumerProps.setProperty("zookeeper.connect", kafkaTestUtils.zkAddress)
+    consumerProps.setProperty("auto.offset.reset", "smallest")
+    consumerProps.setProperty("group.id", "spark")
+    val connector = Consumer.create(new ConsumerConfig(consumerProps))
+    val stream = connector.createMessageStreams(topicCountMap)
+    val result = ArrayBuffer[String]()
+    for (messageStreams <- stream.get(topic)) {
+      messageStreams.foreach { kafkaStream =>
+        val it = kafkaStream.iterator()
+        while (result.size < source.length) {
+          result += new String(it.next.message())
+        }
+      }
+    }
+    assert(source === result.sorted.toArray)
+  }
+
+  test("Basic usage: write to Kafka with key") {
+    val topic = "testWithKey"
+    kafkaTestUtils.createTopic(topic)
+    val topics = Set(topic)
+    val topicCountMap = Map(topic -> 1)
+
+    // Create a streaming query that writes its results to Kafka
+    val input = MemoryStream[String]
+    val producerProps: Properties = new Properties()
+    producerProps.setProperty("metadata.broker.list", kafkaTestUtils.brokerAddress)
+    val data = input.toDS()
+      .map(word => (word, word))
+      .as[(String, String)]
+      .toDF("key", "value")
+    val query = data.writeStream
+      .outputMode("append")
+      .foreach(new KafkaForeachWriter[Row](producerProps, topics, data.schema))
+      .start()
+    val source = Array("A", "B", "C")
+    input.addData(source)
+    query.processAllAvailable()
+
+    // Create a consumer and read the data back from Kafka
+    val consumerProps: Properties = new Properties()
+    consumerProps.setProperty("zookeeper.connect", kafkaTestUtils.zkAddress)
+    consumerProps.setProperty("auto.offset.reset", "smallest")
+    consumerProps.setProperty("group.id", "spark")
+    val connector = Consumer.create(new ConsumerConfig(consumerProps))
+    val stream = connector.createMessageStreams(topicCountMap)
+    val result = ArrayBuffer[String]()
+    for (messageStreams <- stream.get(topic)) {
+      messageStreams.foreach { kafkaStream =>
+        val it = kafkaStream.iterator()
+        while (result.size < source.length) {
+          result += new String(it.next.message())
+        }
+      }
+    }
+    assert(source === result.sorted.toArray)
+  }
+
+}
From b8e41e5861954f2ea9c0ca0f7b78023442e8110f Mon Sep 17 00:00:00 2001
From: zhangxinyu1 <342689740@qq.com>
Date: Fri, 14 Oct 2016 19:17:06 +0800
Subject: [PATCH 3/3] Correct a code-style issue in verifySchema
---
.../main/scala/org/apache/spark/sql/KafkaForeachWriter.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
index 13b674448db68..b78a7fff51256 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/sql/KafkaForeachWriter.scala
@@ -77,8 +77,7 @@ class KafkaForeachWriter[T](
           "Failed to send messages to Kafka! KafkaForeachWriter supports only StringType values, " +
           s"but the columns are of types ${schema(0).dataType} and ${schema(1).dataType}.")
       }
-      if (schema.fieldNames.contains("key") && schema.fieldNames.contains("value")) {
-      } else {
+      if (!schema.fieldNames.contains("key") || !schema.fieldNames.contains("value")) {
         throw new AnalysisException(
           """Failed to send messages to Kafka! If a Row has two columns, the field names
              should be "key" and "value".