From e2b670af618958610a3002dcc278930f35c557cb Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Thu, 27 Apr 2017 11:52:54 -0700
Subject: [PATCH 1/6] only look at analyzed plan

---
 .../main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index a637d52c933a3..0ba461ee874a2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,8 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.logical.output
+    queryExecution.assertAnalyzed()
+    val schema = queryExecution.analyzed.output
     schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
       if (topic == None) {
         throw new AnalysisException(s"topic option required when no " +

From 703513c10aaf54fb4311f560236610e8a5e1c8be Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Thu, 27 Apr 2017 11:55:35 -0700
Subject: [PATCH 2/6] remove assert

---
 .../main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 0ba461ee874a2..919bbed040713 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,6 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    queryExecution.assertAnalyzed()
     val schema = queryExecution.analyzed.output
     schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
       if (topic == None) {

From ec9e091b02c914f960b25af7e482470d9053f749 Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Thu, 27 Apr 2017 12:46:19 -0700
Subject: [PATCH 3/6] enforce analyzed test

---
 .../spark/sql/kafka010/KafkaSinkSuite.scala | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 4bd052d249eca..a95be6670eeee 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -108,6 +108,31 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       s"save mode overwrite not allowed for kafka"))
   }
 
+  test("batch - enforce analyzed plans SPARK-20496") {
+    val df = Seq(1, 1).toDF("key").selectExpr("key", "2 as value")
+    /* bad dataframe plan */
+    val input = df.union(df)
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    /* No topic field or topic option */
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      ex = intercept[AnalysisException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = "value as key", "value"
+        )
+        writer.processAllAvailable()
+      }
+    } finally {
+    }
+    assert(ex.getMessage
+      .toLowerCase(Locale.ROOT)
+      .contains("can be called only on streaming"))
+
+  }
+
   test("streaming - write to kafka with topic field") {
     val input = MemoryStream[String]
     val topic = newTopic()

From 7f5ecb76d897df712ce2382e0a6077004d1c2a10 Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Thu, 27 Apr 2017 13:18:20 -0700
Subject: [PATCH 4/6] fix try block

---
 .../scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index a95be6670eeee..2e6e25204b74f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -118,15 +118,13 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
     /* No topic field or topic option */
     var writer: StreamingQuery = null
     var ex: Exception = null
-    try {
       ex = intercept[AnalysisException] {
         writer = createKafkaWriter(input.toDF())(
          withSelectExpr = "value as key", "value"
        )
        writer.processAllAvailable()
      }
-    } finally {
-    }
+    println(ex.getMessage)
     assert(ex.getMessage
       .toLowerCase(Locale.ROOT)
       .contains("can be called only on streaming"))

From a3c155663733bddc54811e44ed717ce641eb454f Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Thu, 27 Apr 2017 16:02:07 -0700
Subject: [PATCH 5/6] two tests for it

---
 .../spark/sql/kafka010/KafkaWriter.scala           |  2 +-
 .../spark/sql/kafka010/KafkaSinkSuite.scala        | 41 ++++++++++++-------
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 919bbed040713..61936e32fd837 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.logical.output
+    val schema = queryExecution.analyzed.output
     validateQuery(queryExecution, kafkaParameters, topic)
     SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
       queryExecution.toRdd.foreachPartition { iter =>
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 2e6e25204b74f..e7dac263bfe03 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
 
 class KafkaSinkSuite extends StreamTest with SharedSQLContext {
+
   import testImplicits._
 
   protected var testUtils: KafkaTestUtils = _
@@ -109,25 +110,35 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
   }
 
   test("batch - enforce analyzed plans SPARK-20496") {
-    val df = Seq(1, 1).toDF("key").selectExpr("key", "2 as value")
+    val input = Seq(1, 1).toDF("key").selectExpr("key", "2 as value")
     /* bad dataframe plan */
-    val input = df.union(df)
+    val invalidInput = input.union(input)
     val topic = newTopic()
     testUtils.createTopic(topic)
 
     /* No topic field or topic option */
     var writer: StreamingQuery = null
     var ex: Exception = null
-      ex = intercept[AnalysisException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = "value as key", "value"
-        )
-        writer.processAllAvailable()
-      }
-    println(ex.getMessage)
+    // for improperly formatted DataFrame
+    ex = intercept[AnalysisException] {
+      writer = createKafkaWriter(invalidInput)(
+        withSelectExpr = "value as key", "value"
+      )
+      writer.processAllAvailable()
+    }
     assert(ex.getMessage
       .toLowerCase(Locale.ROOT)
       .contains("can be called only on streaming"))
+    // for invalid Select Expressions
+    ex = intercept[AnalysisException] {
+      writer = createKafkaWriter(input)(
+        withSelectExpr = "val as key", "value"
+      )
+      writer.processAllAvailable()
+    }
+    assert(ex.getMessage
+      .toLowerCase(Locale.ROOT)
+      .contains("cannot resolve"))
 
   }
 
@@ -213,7 +224,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       withTopic = Some(topic),
       withOutputMode = Some(OutputMode.Update()))(
       withSelectExpr = "'foo' as topic",
-      "CAST(value as STRING) key", "CAST(count as STRING) value")
+        "CAST(value as STRING) key", "CAST(count as STRING) value")
     val reader = createKafkaReader(topic)
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
@@ -412,11 +423,11 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
   }
 
   private def createKafkaWriter(
-      input: DataFrame,
-      withTopic: Option[String] = None,
-      withOutputMode: Option[OutputMode] = None,
-      withOptions: Map[String, String] = Map[String, String]())
-      (withSelectExpr: String*): StreamingQuery = {
+    input: DataFrame,
+    withTopic: Option[String] = None,
+    withOutputMode: Option[OutputMode] = None,
+    withOptions: Map[String, String] = Map[String, String]())
+    (withSelectExpr: String*): StreamingQuery = {
     var stream: DataStreamWriter[Row] = null
     withTempDir { checkpointDir =>
       var df = input.toDF()

From 4a426425cadbaa7b143751dd214d97c7633045b3 Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Thu, 27 Apr 2017 16:07:59 -0700
Subject: [PATCH 6/6] formatting

---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index e7dac263bfe03..30e702b52f68d 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -224,7 +224,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       withTopic = Some(topic),
       withOutputMode = Some(OutputMode.Update()))(
       withSelectExpr = "'foo' as topic",
-        "CAST(value as STRING) key", "CAST(count as STRING) value")
+      "CAST(value as STRING) key", "CAST(count as STRING) value")
     val reader = createKafkaReader(topic)
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
@@ -423,11 +423,11 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
   }
 
   private def createKafkaWriter(
-    input: DataFrame,
-    withTopic: Option[String] = None,
-    withOutputMode: Option[OutputMode] = None,
-    withOptions: Map[String, String] = Map[String, String]())
-    (withSelectExpr: String*): StreamingQuery = {
+      input: DataFrame,
+      withTopic: Option[String] = None,
+      withOutputMode: Option[OutputMode] = None,
+      withOptions: Map[String, String] = Map[String, String]())
+      (withSelectExpr: String*): StreamingQuery = {
     var stream: DataStreamWriter[Row] = null
     withTempDir { checkpointDir =>
      var df = input.toDF()
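
The crux of this series is the switch from `queryExecution.logical` (the raw, pre-analysis plan, which may still contain unresolved attributes) to `queryExecution.analyzed` (the plan after the analyzer has bound attributes), so that KafkaWriter's lookup of the topic/key/value columns is done against a trustworthy schema. Below is a minimal, self-contained sketch of the difference, not part of the patch series itself: the object name, app name, and local-master setup are illustrative, and it assumes a Spark 2.x dependency on spark-sql.

import org.apache.spark.sql.SparkSession

object AnalyzedVsLogicalDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("SPARK-20496 demo")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the plan exercised by the regression test: a self-union
    // whose output attributes are only bound once the analyzer has run.
    val df = Seq(1, 1).toDF("key").selectExpr("key", "2 as value")
    val unioned = df.union(df)

    // The raw, pre-analysis plan. Schema checks against this plan (what
    // KafkaWriter did before these patches) can observe unresolved state.
    println(unioned.queryExecution.logical)

    // The analyzed plan: attributes are resolved here, so its `output` is
    // safe to search for the topic/key/value columns by name.
    println(unioned.queryExecution.analyzed.output)

    spark.stop()
  }
}

Note on PATCH 2: `QueryExecution.analyzed` is a lazy val whose evaluation runs the analyzer, so merely referencing it forces analysis, which is presumably why the explicit `assertAnalyzed()` call added in PATCH 1 could be dropped as redundant.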