From 8692d1ca2d0e685dc752e9996637cd1189043557 Mon Sep 17 00:00:00 2001
From: Xin Wu
Date: Fri, 5 Aug 2016 11:52:54 -0700
Subject: [PATCH] Support option("inferSchema", true) for DataStreamReader

---
 .../execution/datasources/DataSource.scala    | 31 +++++++----
 .../sql/streaming/FileStreamSourceSuite.scala | 52 +++++++++++++++++++
 2 files changed, 72 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 79024fda2f8ca..4ad112f23cae5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -174,6 +174,25 @@ case class DataSource(
   private def inferFileFormatSchema(format: FileFormat): StructType = {
     userSpecifiedSchema.orElse {
       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+
+      // Schema inference may be requested in two places: the global SQL conf or the source options.
+      val updatedCaseInsensitiveOptions =
+        if (sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE) ||
+            caseInsensitiveOptions.getOrElse("inferSchema", "false") == "true") {
+          caseInsensitiveOptions + ("inferSchema" -> "true")
+        } else {
+          val isTextSource = providingClass == classOf[text.TextFileFormat]
+          if (!isTextSource) {
+            throw new IllegalArgumentException(
+              "Schema must be specified when creating a streaming source DataFrame. " +
+                "If some files already exist in the directory, then depending on the file format " +
+                "you may be able to create a static DataFrame on that directory with " +
+                "'spark.read.load(directory)' and infer schema from it.")
+          } else {
+            caseInsensitiveOptions
+          }
+        }
+
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)
@@ -184,7 +203,7 @@ case class DataSource(
       val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
       format.inferSchema(
         sparkSession,
-        caseInsensitiveOptions,
+        updatedCaseInsensitiveOptions,
         fileCatalog.allFiles())
     }.getOrElse {
       throw new AnalysisException("Unable to infer schema. It must be specified manually.")
@@ -216,16 +235,6 @@ case class DataSource(
           }
         }

-        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
-        val isTextSource = providingClass == classOf[text.TextFileFormat]
-        // If the schema inference is disabled, only text sources require schema to be specified
-        if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
-          throw new IllegalArgumentException(
-            "Schema must be specified when creating a streaming source DataFrame. " +
" + - "If some files already exist in the directory, then depending on the file format " + - "you may be able to create a static DataFrame on that directory with " + - "'spark.read.load(directory)' and infer schema from it.") - } SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format)) case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 47260a23c7ee3..22af013c01dad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -114,6 +114,21 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { reader.load(path) } + def createFileStreamWithOptions( + format: String, + path: String, + schema: Option[StructType] = None, + options: Map[String, String]): DataFrame = { + + val reader = + if (schema.isDefined) { + spark.readStream.format(format).schema(schema.get).options(options) + } else { + spark.readStream.format(format).options(options) + } + reader.load(path) + } + protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = { val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath df.queryExecution.analyzed @@ -331,8 +346,45 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + // =============== csv file stream tests ================= + test("inforSchema option for csv file") { + withTempDirs { case (src, tmp) => + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "a,b\n1,2\n3,4") + createFileStreamWithOptions("csv", src.getCanonicalPath, None, Map("inferSchema" -> "true")) + } + } + + test("spark.sql.streaming.schemaInference set for csv file") { + withTempDirs { case (src, tmp) => + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "a,b\n1,2\n3,4") + createFileStream("csv", src.getCanonicalPath, None) + } + } + } + // =============== JSON file stream tests ================ + test("inforSchema option for json") { + withTempDirs { case (src, tmp) => + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") + createFileStreamWithOptions("json", src.getCanonicalPath, None, Map("inferSchema" -> "true")) + } + } + + test("spark.sql.streaming.schemaInference set for json file") { + withTempDirs { case (src, tmp) => + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") + createFileStream("json", src.getCanonicalPath, None) + } + } + } + test("read from json files") { withTempDirs { case (src, tmp) => val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))