From 8e4b94ffc4222464d3aa0a33e794edf946f926c0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 18 Jul 2017 13:37:56 -0700 Subject: [PATCH 1/6] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex --- .../execution/datasources/DataSource.scala | 40 ++++++++++++++----- .../streaming/FileStreamSource.scala | 2 +- .../streaming/MetadataLogFileIndex.scala | 33 ++++++++++++--- .../ParquetPartitionDiscoverySuite.scala | 33 +++++++++++++++ 4 files changed, 91 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d36a04f1fff8e..5b5f85674e4b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -96,6 +96,24 @@ case class DataSource( bucket.sortColumnNames, "in the sort definition", equality) } + /** + * In the read path, only managed tables by Hive provide the partition columns properly when + * initializing this class. All other file based data sources will try to infer the partitioning, + * and then cast the inferred types to user specified dataTypes if the partition columns exist + * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or + * inconsistent data types as reported in SPARK-21463. + * @param fileIndex A FileIndex that will perform partition inference + * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema` + */ + private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = { + val resolved = fileIndex.partitionSchema.map { partitionField => + // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred + userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( + partitionField) + } + StructType(resolved) + } + /** * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer * it. In the read path, only managed tables by Hive provide the partition columns properly when @@ -139,12 +157,7 @@ case class DataSource( val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - val resolved = tempFileIndex.partitionSchema.map { partitionField => - // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred - userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( - partitionField) - } - StructType(resolved) + combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex) } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning @@ -336,7 +349,14 @@ case class DataSource( caseInsensitiveOptions.get("path").toSeq ++ paths, sparkSession.sessionState.newHadoopConf()) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath) + val tempFileCatalog = MetadataLogFileIndex(sparkSession, basePath) + val fileCatalog = if (userSpecifiedSchema.nonEmpty) { + val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog) + tempFileCatalog.withPartitionSchema(partitionSchema) + } else { + tempFileCatalog + } + val resolvedPartitionSchema = fileCatalog.partitionSchema val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -346,12 +366,12 @@ case class DataSource( throw new AnalysisException( s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " + "It must be specified manually") - } + }.filterNot(field => resolvedPartitionSchema.exists(pf => equality(field.name, pf.name))) HadoopFsRelation( fileCatalog, - partitionSchema = fileCatalog.partitionSchema, - dataSchema = dataSchema, + partitionSchema = resolvedPartitionSchema, + dataSchema = StructType(dataSchema), bucketSpec = None, format, caseInsensitiveOptions)(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index a9e64c640042a..475800a840830 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -195,7 +195,7 @@ class FileStreamSource( private def allFilesUsingMetadataLogFileIndex() = { // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a // non-glob path - new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles() + MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala index aeaa134736937..2bb1ea718fa55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala @@ -21,21 +21,26 @@ import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType /** * A [[FileIndex]] that generates the list of files to processing by reading them from the * metadata log files generated by the [[FileStreamSink]]. + * + * @param userPartitionSchema an optional partition schema that will be use to provide types for + * the discovered partitions */ -class MetadataLogFileIndex(sparkSession: SparkSession, path: Path) - extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) { +class MetadataLogFileIndex( + sparkSession: SparkSession, + path: Path, + private val metadataLog: FileStreamSinkLog, + userPartitionSchema: Option[StructType]) + extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) { - private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) - logInfo(s"Reading streaming file log from $metadataDirectory") - private val metadataLog = - new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) private var cachedPartitionSpec: PartitionSpec = _ @@ -57,4 +62,20 @@ class MetadataLogFileIndex(sparkSession: SparkSession, path: Path) } cachedPartitionSpec } + + private[spark] def withPartitionSchema(schema: StructType): MetadataLogFileIndex = { + new MetadataLogFileIndex(sparkSession, path, metadataLog, Option(schema)) + } +} + +object MetadataLogFileIndex extends Logging { + def apply( + sparkSession: SparkSession, + path: Path): MetadataLogFileIndex = { + val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading streaming file log from $metadataDirectory") + val metadataLog = new FileStreamSinkLog( + FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) + new MetadataLogFileIndex(sparkSession, path, metadataLog, None) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 84b34d5ad26d1..630e1633db1c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition} +import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -1022,4 +1023,36 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } } + + test("SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols") { + withTempDir { tempDir => + val output = new File(tempDir, "output").toString + val checkpoint = new File(tempDir, "chkpoint").toString + try { + val stream = MemoryStream[(String, Int)] + val df = stream.toDS().toDF("time", "value") + val sq = df.writeStream + .option("checkpointLocation", checkpoint) + .format("parquet") + .partitionBy("time") + .start(output) + + stream.addData(("2017-01-01-00", 1), ("2017-01-01-01", 2)) + sq.processAllAvailable() + + val schema = new StructType() + .add("time", StringType) + .add("value", IntegerType) + val readBack = spark.read.schema(schema).parquet(output) + assert(readBack.schema.toSet === schema.toSet) + + checkAnswer( + readBack, + Seq(Row(1, "2017-01-01-00"), Row(2, "2017-01-01-01")) + ) + } finally { + spark.streams.active.foreach(_.stop()) + } + } + } } From 37a967a64750d0aa3d4e269f85fffb2d2b7a8d38 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 18 Jul 2017 13:43:08 -0700 Subject: [PATCH 2/6] simplify code --- .../execution/datasources/DataSource.scala | 4 ++-- .../streaming/FileStreamSource.scala | 2 +- .../streaming/MetadataLogFileIndex.scala | 22 ++++--------------- 3 files changed, 7 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5b5f85674e4b2..176e1c9d06f71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -349,10 +349,10 @@ case class DataSource( caseInsensitiveOptions.get("path").toSeq ++ paths, sparkSession.sessionState.newHadoopConf()) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val tempFileCatalog = MetadataLogFileIndex(sparkSession, basePath) + val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None) val fileCatalog = if (userSpecifiedSchema.nonEmpty) { val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog) - tempFileCatalog.withPartitionSchema(partitionSchema) + new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema)) } else { tempFileCatalog } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 475800a840830..4b1b2520390ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -195,7 +195,7 @@ class FileStreamSource( private def allFilesUsingMetadataLogFileIndex() = { // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a // non-glob path - MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles() + new MetadataLogFileIndex(sparkSession, qualifiedBasePath, None).allFiles() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala index 2bb1ea718fa55..1da703cefd8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.StructType @@ -37,10 +36,13 @@ import org.apache.spark.sql.types.StructType class MetadataLogFileIndex( sparkSession: SparkSession, path: Path, - private val metadataLog: FileStreamSinkLog, userPartitionSchema: Option[StructType]) extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) { + private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading streaming file log from $metadataDirectory") + private val metadataLog = + new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) private var cachedPartitionSpec: PartitionSpec = _ @@ -62,20 +64,4 @@ class MetadataLogFileIndex( } cachedPartitionSpec } - - private[spark] def withPartitionSchema(schema: StructType): MetadataLogFileIndex = { - new MetadataLogFileIndex(sparkSession, path, metadataLog, Option(schema)) - } -} - -object MetadataLogFileIndex extends Logging { - def apply( - sparkSession: SparkSession, - path: Path): MetadataLogFileIndex = { - val metadataDirectory = new Path(path, FileStreamSink.metadataDir) - logInfo(s"Reading streaming file log from $metadataDirectory") - val metadataLog = new FileStreamSinkLog( - FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) - new MetadataLogFileIndex(sparkSession, path, metadataLog, None) - } } From 7589b6634cd0bb36ab915494cc9bfdff04d1c008 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 18 Jul 2017 13:46:04 -0700 Subject: [PATCH 3/6] remove filter --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 176e1c9d06f71..c08ae9cb7c1a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -366,7 +366,7 @@ case class DataSource( throw new AnalysisException( s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " + "It must be specified manually") - }.filterNot(field => resolvedPartitionSchema.exists(pf => equality(field.name, pf.name))) + } HadoopFsRelation( fileCatalog, From 470a6bd2eba43cc0120d2e78f9a4fd15a2cdc4a5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 18 Jul 2017 13:47:32 -0700 Subject: [PATCH 4/6] remove filter --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index c08ae9cb7c1a4..8a852283b6ff6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -371,7 +371,7 @@ case class DataSource( HadoopFsRelation( fileCatalog, partitionSchema = resolvedPartitionSchema, - dataSchema = StructType(dataSchema), + dataSchema = dataSchema, bucketSpec = None, format, caseInsensitiveOptions)(sparkSession) From dce4a753af26cec93a071785fdc9315ace7d9cb9 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 18 Jul 2017 13:48:14 -0700 Subject: [PATCH 5/6] remove filter --- .../apache/spark/sql/execution/datasources/DataSource.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 8a852283b6ff6..cbe8ce421f92b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -356,7 +356,6 @@ case class DataSource( } else { tempFileCatalog } - val resolvedPartitionSchema = fileCatalog.partitionSchema val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -370,7 +369,7 @@ case class DataSource( HadoopFsRelation( fileCatalog, - partitionSchema = resolvedPartitionSchema, + partitionSchema = fileCatalog.partitionSchema, dataSchema = dataSchema, bucketSpec = None, format, From 7cdc864f9b0c50ac8f9d877eb67820569c54776e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 18 Jul 2017 15:56:31 -0700 Subject: [PATCH 6/6] Update ParquetPartitionDiscoverySuite.scala --- .../datasources/parquet/ParquetPartitionDiscoverySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 630e1633db1c5..2f5fd8438f682 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -1048,7 +1048,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha checkAnswer( readBack, - Seq(Row(1, "2017-01-01-00"), Row(2, "2017-01-01-01")) + Seq(Row("2017-01-01-00", 1), Row("2017-01-01-01", 2)) ) } finally { spark.streams.active.foreach(_.stop())