From 12b251c8d32d79f74f74b65eb62d08994f83da5a Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 10 Mar 2017 15:35:41 -0800
Subject: [PATCH 1/3] Log warning rather than throw exception when MemorySink or ForeachSink is used with partitioning

---
 .../apache/spark/sql/streaming/DataStreamWriter.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index c8fda8cd83598..f799255933e55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -35,7 +36,7 @@ import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, Memory
  */
 @Experimental
 @InterfaceStability.Evolving
-final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends Logging {
 
   private val df = ds.toDF()
 
@@ -218,7 +219,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     }
 
     if (source == "memory") {
-      assertNotPartitioned("memory")
+      checkNotPartitioned("memory")
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
       }
@@ -238,7 +239,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       resultDf.createOrReplaceTempView(query.name)
       query
     } else if (source == "foreach") {
-      assertNotPartitioned("foreach")
+      checkNotPartitioned("foreach")
       val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
@@ -345,9 +346,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         s"existing columns (${validColumnNames.mkString(", ")})"))
   }
 
-  private def assertNotPartitioned(operation: String): Unit = {
+  private def checkNotPartitioned(operation: String): Unit = {
     if (partitioningColumns.isDefined) {
-      throw new AnalysisException(s"'$operation' does not support partitioning")
+      logWarning(s"'$operation' does not support partitioning")
     }
   }
 
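The net effect of PATCH 1/3: DataStreamWriter.start() no longer fails fast when partitionBy() is combined with the memory or foreach sink; it logs a warning and otherwise ignores the partitioning. Below is a minimal driver sketch of the new behavior. It is not part of the patch; the object name, the socket source on localhost:9999, and the query name are illustrative assumptions.

    import org.apache.spark.sql.SparkSession

    object PartitionedMemorySinkSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("PartitionedMemorySinkSketch")
          .getOrCreate()

        // The socket source emits a single string column named "value".
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", "9999")
          .load()

        // Before PATCH 1/3, start() threw:
        //   AnalysisException: 'memory' does not support partitioning
        // After PATCH 1/3, the query starts and checkNotPartitioned only logs
        // (roughly): WARN ... 'memory' does not support partitioning
        // The partitionBy clause has no other effect on the memory sink.
        val query = lines.writeStream
          .format("memory")
          .queryName("sketch")  // queryName is still mandatory for the memory sink
          .partitionBy("value")
          .start()

        query.awaitTermination()
      }
    }

PATCH 2/3 below routes the console sink through the same warning path.
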
From 7372aa509453d94db5cf267cc8006c78c1be54ef Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 10 Mar 2017 15:36:01 -0800
Subject: [PATCH 2/3] Log warning when output is partitioned even though the format is console

---
 .../scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index f799255933e55..a5d337cff3c18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -252,6 +252,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends Logging {
     } else {
       val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
         if (source == "console") {
+          checkNotPartitioned("console")
           (true, false)
         } else {
           (false, true)

From 01ad04a84655d779940bdc0487a58028d1ef3771 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 14 Mar 2017 14:12:09 -0700
Subject: [PATCH 3/3] Removed a test case "check foreach() does not support partitioning"

---
 .../test/DataStreamReaderWriterSuite.scala | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f61dcdcbcf718..c97f90ea170f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -401,23 +401,6 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
   }
-
-  test("check foreach() does not support partitioning") {
-    val df = spark.readStream
-      .format("org.apache.spark.sql.streaming.test")
-      .load()
-    val foreachWriter = new ForeachWriter[Row] {
-      override def open(partitionId: Long, version: Long): Boolean = false
-      override def process(value: Row): Unit = {}
-      override def close(errorOrNull: Throwable): Unit = {}
-    }
-    var w = df.writeStream.partitionBy("value")
-    var e = intercept[AnalysisException](w.foreach(foreachWriter).start())
-    Seq("foreach", "partitioning").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-  }
-
   test("ConsoleSink can be correctly loaded") {
     LastOptions.clear()
     val df = spark.readStream