From 1fc4a954c08296ed24ca90b7dde0f1cfaedf572f Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 16 Jun 2017 16:22:25 +0200
Subject: [PATCH 1/3] [SPARK-19909][SS] Disabling the usage of a temporary
 directory for the checkpoint location if the temporary directory is on a
 filesystem different from the default one.

---
 .../sql/streaming/DataStreamWriter.scala  | 23 +++++++++++++---
 .../spark/sql/streaming/StreamSuite.scala | 27 +++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 14e7df672cc58..a09df1a47670d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.streaming
 
+import java.net.URI
 import java.util.Locale
 
+import org.apache.hadoop.fs.FileSystem
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.InterfaceStability
@@ -235,6 +237,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         "write files of Hive data source directly.")
     }
 
+    val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+    val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+
+    val isTempCheckpointLocationAvailable = tmpFS match {
+      case null | "file" =>
+        if (defaultFS == null || defaultFS.equals("file")) {
+          true
+        } else {
+          false
+        }
+      case `defaultFS` => true
+      case _ => false
+    }
+
     if (source == "memory") {
       assertNotPartitioned("memory")
       if (extraOptions.get("queryName").isEmpty) {
@@ -250,7 +267,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         df,
         sink,
         outputMode,
-        useTempCheckpointLocation = true,
+        useTempCheckpointLocation = isTempCheckpointLocationAvailable,
         recoverFromCheckpointLocation = recoverFromChkpoint,
         trigger = trigger)
       resultDf.createOrReplaceTempView(query.name)
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         df,
         sink,
         outputMode,
-        useTempCheckpointLocation = true,
+        useTempCheckpointLocation = isTempCheckpointLocationAvailable,
         trigger = trigger)
     } else {
       val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
         if (source == "console") {
-          (true, false)
+          (isTempCheckpointLocationAvailable, false)
         } else {
           (false, true)
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 4ede4fd9a035e..58ef08e6638c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -25,6 +25,7 @@ import scala.util.control.ControlThrowable
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -553,6 +554,32 @@ class StreamSuite extends StreamTest {
     }
   }
 
+  test("SPARK-19909: if the checkpoint location is not set and the default filesystem " +
+    "is different from the java.io.tmpdir one, an AnalysisException should be thrown") {
+
+    val defaultFS = spark.conf.getOption(FileSystem.FS_DEFAULT_NAME_KEY)
+    spark.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs:///")
+
+    val input = MemoryStream[Int]
+    var streamingQuery: StreamingQuery = null
+    input.addData(1)
+    try {
+      intercept[AnalysisException](
+        streamingQuery = input.toDF().writeStream.format("console").start()
+      )
+    } finally {
+      if (streamingQuery ne null) {
+        streamingQuery.stop()
+      }
+      // Restore previous state
+      if (defaultFS.isEmpty) {
+        spark.conf.unset(FileSystem.FS_DEFAULT_NAME_KEY)
+      } else {
+        spark.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, defaultFS.get)
+      }
+    }
+  }
+
   test("calling stop() on a query cancels related jobs") {
     val input = MemoryStream[Int]
     val query = input

From 7c4c785e9901e69e7b3c5aee53ec171cfef5a11a Mon Sep 17 00:00:00 2001
From: mark91
Date: Thu, 22 Jun 2017 09:39:55 +0200
Subject: [PATCH 2/3] [SPARK-19909][SS] Fix imports ordering to respect
 scalastyle.

---
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index a09df1a47670d..5ce7fe2893fd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.streaming
 import java.net.URI
 import java.util.Locale
 
-import org.apache.hadoop.fs.FileSystem
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.FileSystem
+
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
@@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
 
+
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
  * key-value stores, etc). Use `Dataset.writeStream` to access this.

From 4a56ecf184de9d19938dd881e1634bef609e7927 Mon Sep 17 00:00:00 2001
From: mark91
Date: Fri, 23 Jun 2017 10:59:39 +0200
Subject: [PATCH 3/3] [SPARK-19909][SS] Fix code style.

---
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala  | 5 ++---
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 5ce7fe2893fd5..13631f9849cc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.streaming
 
-import java.net.URI
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
-
+import org.apache.spark.util.Utils
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -241,7 +240,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
     val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
     val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+    val tmpFS = Utils.resolveURI(System.getProperty("java.io.tmpdir")).getScheme
 
     val isTempCheckpointLocationAvailable = tmpFS match {
       case null | "file" =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 58ef08e6638c8..47947498268b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -568,7 +568,7 @@ class StreamSuite extends StreamTest {
         streamingQuery = input.toDF().writeStream.format("console").start()
       )
     } finally {
-      if (streamingQuery ne null) {
+      if (streamingQuery != null) {
         streamingQuery.stop()
       }
       // Restore previous state