From e130e1c0f12e0678623eba2e30b75868e67affd4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 9 Jan 2017 22:12:48 +0800 Subject: [PATCH 1/3] Implement InsertIntoHiveTable with FileCommitProtocol and FileFormatWriter --- .../io/HadoopMapReduceCommitProtocol.scala | 2 +- .../spark/sql/execution/QueryExecution.scala | 33 +- .../spark/sql/hive/HiveSessionState.scala | 2 +- .../spark/sql/hive/HiveStrategies.scala | 77 ++-- .../apache/spark/sql/hive/TableReader.scala | 4 +- .../spark/sql/hive/client/package.scala | 2 +- .../sql/hive/execution/HiveFileFormat.scala | 133 +++++++ .../hive/execution/InsertIntoHiveTable.scala | 187 +++++---- ...n.scala => ScriptTransformationExec.scala} | 2 +- .../spark/sql/hive/hiveWriterContainers.scala | 356 ------------------ .../spark/sql/hive/client/VersionsSuite.scala | 4 +- .../hive/execution/HiveComparisonTest.scala | 10 +- .../sql/hive/execution/HiveDDLSuite.scala | 5 +- .../sql/hive/execution/HiveQuerySuite.scala | 8 +- .../execution/ScriptTransformationSuite.scala | 10 +- 15 files changed, 302 insertions(+), 533 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala rename sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/{ScriptTransformation.scala => ScriptTransformationExec.scala} (99%) delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index b2d9b8d2a012f..2f33f2e4ff8d1 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) } private def getFilename(taskContext: TaskAttemptContext, ext: String): String = { - // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, // the file name is fine and won't overflow. val split = taskContext.getTaskAttemptID.getTaskID.getId diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index b3ef29f6e34c4..dcd9003ec66f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -108,21 +108,18 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { /** - * Returns the result as a hive compatible sequence of strings. For native commands, the - * execution is simply passed back to Hive. + * Returns the result as a hive compatible sequence of strings. This is for testing only. */ def hiveResultString(): Seq[String] = executedPlan match { case ExecutedCommandExec(desc: DescribeTableCommand) => - SQLExecution.withNewExecutionId(sparkSession, this) { - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. 
- desc.run(sparkSession).map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } + // If it is a describe command for a Hive table, we want to have the output format + // be similar with Hive. + desc.run(sparkSession).map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, + Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") } // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp. case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] => @@ -130,13 +127,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { case command: ExecutedCommandExec => command.executeCollect().map(_.getString(0)) case other => - SQLExecution.withNewExecutionId(sparkSession, this) { - val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq - // We need the types so we can output struct field names - val types = analyzed.output.map(_.dataType) - // Reformat to match hive tab delimited output. - result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq - } + val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq + // We need the types so we can output struct field names + val types = analyzed.output.map(_.dataType) + // Reformat to match hive tab delimited output. + result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")) } /** Formats a datum (based on the given data type) and returns the string representation. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 9b4b8b6fcd910..4e30d038b1985 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -66,6 +66,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: new DetermineHiveSerde(conf) :: + new HiveAnalysis(sparkSession) :: new ResolveDataSource(sparkSession) :: Nil override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) @@ -88,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) SpecialLimits, InMemoryScans, HiveTableScans, - DataSinks, Scripts, Aggregation, JoinSelection, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index d1f11e78b4ea2..7987a0a84c728 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,14 +21,14 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec} +import 
org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.types.StructType /** @@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] { } } +class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) + if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) => + InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) + + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => + // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde + // tables yet. + if (mode == SaveMode.Append) { + throw new AnalysisException( + "CTAS for hive serde tables does not support append semantics.") + } + + val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase) + CreateHiveTableAsSelectCommand( + tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), + query, + mode == SaveMode.Ignore) + } + + /** + * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule + * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to + * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and + * fix the schema mismatch by adding Cast. + */ + private def hasBeenPreprocessed( + tableOutput: Seq[Attribute], + partSchema: StructType, + partSpec: Map[String, Option[String]], + query: LogicalPlan): Boolean = { + val partColNames = partSchema.map(_.name).toSet + query.resolved && partSpec.keys.forall(partColNames.contains) && { + val staticPartCols = partSpec.filter(_._2.isDefined).keySet + val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name)) + expectedColumns.toStructType.sameType(query.schema) + } + } +} + private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. self: SparkPlanner => @@ -94,35 +135,9 @@ private[hive] trait HiveStrategies { object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.ScriptTransformation(input, script, output, child, ioschema) => + case ScriptTransformation(input, script, output, child, ioschema) => val hiveIoSchema = HiveScriptIOSchema(ioschema) - ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil - case _ => Nil - } - } - - object DataSinks extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.InsertIntoTable( - table: MetastoreRelation, partition, child, overwrite, ifNotExists) => - InsertIntoHiveTable( - table, partition, planLater(child), overwrite, ifNotExists) :: Nil - - case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => - // Currently `DataFrameWriter.saveAsTable` doesn't support - // the Append mode of hive serde tables yet. 
- if (mode == SaveMode.Append) { - throw new AnalysisException( - "CTAS for hive serde tables does not support append semantics.") - } - - val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val cmd = CreateHiveTableAsSelectCommand( - tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), - query, - mode == SaveMode.Ignore) - ExecutedCommandExec(cmd) :: Nil - + ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index aaf30f41f29c2..b4b63032ab261 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -311,10 +311,10 @@ private[hive] object HiveTableUtil { // that calls Hive.get() which tries to access metastore, but it's not valid in runtime // it would be fixed in next version of hive but till then, we should use this instead def configureJobPropertiesForStorageHandler( - tableDesc: TableDesc, jobConf: JobConf, input: Boolean) { + tableDesc: TableDesc, conf: Configuration, input: Boolean) { val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE) val storageHandler = - org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property) + org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(conf, property) if (storageHandler != null) { val jobProperties = new java.util.LinkedHashMap[String, String] if (input) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index b1b8439efa011..4e2193b6abc3f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive /** Support for interacting with different versions of the HiveMetastoreClient */ package object client { - private[client] abstract class HiveVersion( + private[hive] abstract class HiveVersion( val fullVersion: String, val extraDeps: Seq[String] = Nil, val exclusions: Seq[String] = Nil) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala new file mode 100644 index 0000000000000..bd1fa31dd5536 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} +import org.apache.hadoop.hive.serde2.Serializer +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{JobConf, Reporter} +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil} +import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableJobConf + +/** + * `FileFormat` for writing Hive tables. + * + * TODO: implement the read logic. + */ +class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat { + override def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = None + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + val tableDesc = fileSinkConf.getTableInfo + conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName) + + // Add table properties from storage handler to hadoopConf, so any custom storage + // handler settings can be set to hadoopConf + HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false) + Utilities.copyTableJobPropertiesToConf(tableDesc, conf) + + // Avoid referencing the outer object. 
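// (The anonymous OutputWriterFactory below is serialized and sent to executors, so
// copying the constructor parameter into a local val keeps the factory's closure from
// capturing the enclosing HiveFileFormat instance.)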
+ val fileSinkConfSer = fileSinkConf + new OutputWriterFactory { + private val jobConf = new SerializableJobConf(new JobConf(conf)) + @transient private lazy val outputFormat = + jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] + + override def getFileExtension(context: TaskAttemptContext): String = { + Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat) + } + + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema) + } + } + } +} + +class HiveOutputWriter( + path: String, + fileSinkConf: FileSinkDesc, + jobConf: JobConf, + dataSchema: StructType) extends OutputWriter with HiveInspectors { + + private def tableDesc = fileSinkConf.getTableInfo + + private val serializer = { + val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] + serializer.initialize(null, tableDesc.getProperties) + serializer + } + + private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter( + jobConf, + tableDesc, + serializer.getSerializedClass, + fileSinkConf, + new Path(path), + Reporter.NULL) + + private val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + tableDesc.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + private val fieldOIs = + standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray + private val dataTypes = dataSchema.map(_.dataType).toArray + private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } + private val outputData = new Array[Any](fieldOIs.length) + + override def write(row: InternalRow): Unit = { + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) + i += 1 + } + hiveWriter.write(serializer.serialize(outputData, standardOI)) + } + + override def close(): Unit = { + // Seems the boolean value passed into close does not matter. 
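// (The writer returned by HiveFileFormatUtils.getHiveRecordWriter is a Hive
// FileSinkOperator.RecordWriter; the boolean is its "abort" flag, so `false` here means
// a normal, non-aborting close.)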
+ hiveWriter.close(false) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index aa858e808edf7..ce418ae135dd9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -24,22 +24,22 @@ import java.util.{Date, Locale, Random} import scala.util.control.NonFatal +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.ErrorMsg -import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.sql.hive.client.HiveVersion import org.apache.spark.SparkException -import org.apache.spark.util.SerializableJobConf /** @@ -69,26 +69,20 @@ import org.apache.spark.util.SerializableJobConf * {{{ * Map('a' -> Some('1'), 'b' -> None) * }}}. - * @param child the logical plan representing data to write to. + * @param query the logical plan representing data to write to. * @param overwrite overwrite existing table or partitions. * @param ifNotExists If true, only write if the table or partition does not exist. 
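* (A spec like the Map above corresponds to a statement such as
* `INSERT INTO tbl PARTITION (a=1, b) SELECT ...`: `a` is a static partition value,
* while `b` is a dynamic partition column resolved from the query at write time.)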
*/ case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], - child: SparkPlan, + query: LogicalPlan, overwrite: Boolean, - ifNotExists: Boolean) extends UnaryExecNode { + ifNotExists: Boolean) extends RunnableCommand { - @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - @transient private val externalCatalog = sqlContext.sharedState.externalCatalog + override protected def innerChildren: Seq[LogicalPlan] = query :: Nil - def output: Seq[Attribute] = Seq.empty - - val hadoopConf = sessionState.newHadoopConf() var createdTempDir: Option[Path] = None - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") private def executionId: String = { val rand: Random = new Random @@ -96,7 +90,10 @@ case class InsertIntoHiveTable( "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) } - private def getStagingDir(inputPath: Path): Path = { + private def getStagingDir( + inputPath: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { val inputPathUri: URI = inputPath.toUri val inputPathName: String = inputPathUri.getPath val fs: FileSystem = inputPath.getFileSystem(hadoopConf) @@ -121,17 +118,27 @@ case class InsertIntoHiveTable( throw new RuntimeException( "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) } - return dir + dir } - private def getExternalScratchDir(extURI: URI): Path = { - getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath)) + private def getExternalScratchDir( + extURI: URI, + hadoopConf: Configuration, + stagingDir: String): Path = { + getStagingDir( + new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), + hadoopConf, + stagingDir) } - def getExternalTmpPath(path: Path): Path = { + def getExternalTmpPath( + path: Path, + hiveVersion: HiveVersion, + hadoopConf: Configuration, + stagingDir: String, + scratchDir: String): Path = { import org.apache.spark.sql.hive.client.hive._ - val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under // a common scratch directory. After the writing is finished, Hive will simply empty the table // directory and move the staging directory to it. @@ -142,16 +149,19 @@ case class InsertIntoHiveTable( // staging directory under the table director for Hive prior to 1.1, the staging directory will // be removed by Hive when Hive is trying to empty the table directory. 
if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) { - oldVersionExternalTempPath(path) + oldVersionExternalTempPath(path, hadoopConf, scratchDir) } else if (hiveVersion == v1_1 || hiveVersion == v1_2) { - newVersionExternalTempPath(path) + newVersionExternalTempPath(path, hadoopConf, stagingDir) } else { throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) } } // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - def oldVersionExternalTempPath(path: Path): Path = { + def oldVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + scratchDir: String): Path = { val extURI: URI = path.toUri val scratchPath = new Path(scratchDir, executionId) var dirPath = new Path( @@ -176,54 +186,44 @@ case class InsertIntoHiveTable( } // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - def newVersionExternalTempPath(path: Path): Path = { + def newVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { val extURI: URI = path.toUri if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path.getParent) + getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir) } else { - new Path(getExternalScratchDir(extURI), "-ext-10000") + new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") } } - def getExtTmpPathRelTo(path: Path): Path = { - new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000 - } - - private def saveAsHiveFile( - rdd: RDD[InternalRow], - valueClass: Class[_], - fileSinkConf: FileSinkDesc, - conf: SerializableJobConf, - writerContainer: SparkHiveWriterContainer): Unit = { - assert(valueClass != null, "Output value class not set") - conf.value.setOutputValueClass(valueClass) - - val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName - assert(outputFileFormatClassName != null, "Output format class not set") - conf.value.set("mapred.output.format.class", outputFileFormatClassName) - - FileOutputFormat.setOutputPath( - conf.value, - SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value)) - log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - writerContainer.driverSideSetup() - sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) - writerContainer.commitJob() + def getExtTmpPathRelTo( + path: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 } /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. - * - * Note: this is run once and then kept to avoid double insertions. 
*/ - protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + override def run(sparkSession: SparkSession): Seq[Row] = { + val sessionState = sparkSession.sessionState + val externalCatalog = sparkSession.sharedState.externalCatalog + val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version + val hadoopConf = sessionState.newHadoopConf() + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc val tableLocation = table.hiveQlTable.getDataLocation - val tmpLocation = getExternalTmpPath(tableLocation) + val tmpLocation = + getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean @@ -276,40 +276,31 @@ case class InsertIntoHiveTable( } } - val jobConf = new JobConf(hadoopConf) - val jobConfSer = new SerializableJobConf(jobConf) - - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false) - val outputCommitterClass = jobConf.get("mapred.output.committer.class", "") - if (speculationEnabled && outputCommitterClass.contains("Direct")) { - val warningMessage = - s"$outputCommitterClass may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use an output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." 
- logWarning(warningMessage) + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + jobId = java.util.UUID.randomUUID().toString, + outputPath = tmpLocation.toString, + isAppend = false) + + val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => + query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse { + throw new AnalysisException( + s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]") + }.asInstanceOf[Attribute] } - val writerContainer = if (numDynamicPartitions > 0) { - val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) - new SparkHiveDynamicPartitionWriterContainer( - jobConf, - fileSinkConf, - dynamicPartColNames, - child.output) - } else { - new SparkHiveWriterContainer( - jobConf, - fileSinkConf, - child.output) - } - - @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass - saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) + FileFormatWriter.write( + sparkSession = sparkSession, + queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + fileFormat = new HiveFileFormat(fileSinkConf), + committer = committer, + outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty), + hadoopConf = hadoopConf, + partitionColumns = partitionAttributes, + bucketSpec = None, + refreshFunction = _ => (), + options = Map.empty) - val outputPath = FileOutputFormat.getOutputPath(jobConf) // TODO: Correctly set holdDDLTime. // In most of the time, we should have holdDDLTime = false. // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. @@ -319,7 +310,7 @@ case class InsertIntoHiveTable( externalCatalog.loadDynamicPartitions( db = table.catalogTable.database, table = table.catalogTable.identifier.table, - outputPath.toString, + tmpLocation.toString, partitionSpec, overwrite, numDynamicPartitions, @@ -363,7 +354,7 @@ case class InsertIntoHiveTable( externalCatalog.loadPartition( table.catalogTable.database, table.catalogTable.identifier.table, - outputPath.toString, + tmpLocation.toString, partitionSpec, isOverwrite = doHiveOverwrite, holdDDLTime = holdDDLTime, @@ -375,7 +366,7 @@ case class InsertIntoHiveTable( externalCatalog.loadTable( table.catalogTable.database, table.catalogTable.identifier.table, - outputPath.toString, // TODO: URI + tmpLocation.toString, // TODO: URI overwrite, holdDDLTime, isSrcLocal = false) @@ -391,21 +382,13 @@ case class InsertIntoHiveTable( } // Invalidate the cache. - sqlContext.sharedState.cacheManager.invalidateCache(table) - sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier) + sparkSession.sharedState.cacheManager.invalidateCache(table) + sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. // TODO: implement hive compatibility as rules. 
- Seq.empty[InternalRow] - } - - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray - - protected override def doExecute(): RDD[InternalRow] = { - sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1) + Seq.empty[Row] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala similarity index 99% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala index 50855e48bc8fe..e7c165c5f86c5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala @@ -52,7 +52,7 @@ import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfig * @param script the command that should be executed. * @param output the attributes that are produced by the script. */ -case class ScriptTransformation( +case class ScriptTransformationExec( input: Seq[Expression], script: String, output: Seq[Attribute], diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala deleted file mode 100644 index 0c9321068c4c1..0000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ /dev/null @@ -1,356 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive - -import java.text.NumberFormat -import java.util.{Date, Locale} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.common.FileUtils -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} -import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} -import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred._ -import org.apache.hadoop.mapreduce.TaskType - -import org.apache.spark._ -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.SparkHadoopWriterUtils -import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.UnsafeKVExternalSorter -import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableJobConf -import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter - -/** - * Internal helper class that saves an RDD using a Hive OutputFormat. - * It is based on `SparkHadoopWriter`. - */ -private[hive] class SparkHiveWriterContainer( - @transient private val jobConf: JobConf, - fileSinkConf: FileSinkDesc, - inputSchema: Seq[Attribute]) - extends Logging - with HiveInspectors - with Serializable { - - private val now = new Date() - private val tableDesc: TableDesc = fileSinkConf.getTableInfo - // Add table properties from storage handler to jobConf, so any custom storage - // handler settings can be set to jobConf - if (tableDesc != null) { - HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, false) - Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) - } - protected val conf = new SerializableJobConf(jobConf) - - private var jobID = 0 - private var splitID = 0 - private var attemptID = 0 - private var jID: SerializableWritable[JobID] = null - private var taID: SerializableWritable[TaskAttemptID] = null - - @transient private var writer: FileSinkOperator.RecordWriter = null - @transient protected lazy val committer = conf.value.getOutputCommitter - @transient protected lazy val jobContext = new JobContextImpl(conf.value, jID.value) - @transient private lazy val taskContext = new TaskAttemptContextImpl(conf.value, taID.value) - @transient private lazy val outputFormat = - conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] - - def driverSideSetup() { - setIDs(0, 0, 0) - setConfParams() - committer.setupJob(jobContext) - } - - def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) { - setIDs(jobId, splitId, attemptId) - setConfParams() - committer.setupTask(taskContext) - initWriters() - } - - protected def getOutputName: String = { - val numberFormat = NumberFormat.getInstance(Locale.US) - numberFormat.setMinimumIntegerDigits(5) - numberFormat.setGroupingUsed(false) - val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat) - "part-" + numberFormat.format(splitID) + extension - } - - def close() { - // Seems the boolean value passed into 
close does not matter. - if (writer != null) { - writer.close(false) - commit() - } - } - - def commitJob() { - committer.commitJob(jobContext) - } - - protected def initWriters() { - // NOTE this method is executed at the executor side. - // For Hive tables without partitions or with only static partitions, only 1 writer is needed. - writer = HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - fileSinkConf, - FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), - Reporter.NULL) - } - - protected def commit() { - SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID) - } - - def abortTask(): Unit = { - if (committer != null) { - committer.abortTask(taskContext) - } - logError(s"Task attempt $taskContext aborted.") - } - - private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { - jobID = jobId - splitID = splitId - attemptID = attemptId - - jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobId)) - taID = new SerializableWritable[TaskAttemptID]( - new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) - } - - private def setConfParams() { - conf.value.set("mapred.job.id", jID.value.toString) - conf.value.set("mapred.tip.id", taID.value.getTaskID.toString) - conf.value.set("mapred.task.id", taID.value.toString) - conf.value.setBoolean("mapred.task.is.map", true) - conf.value.setInt("mapred.task.partition", splitID) - } - - def newSerializer(tableDesc: TableDesc): Serializer = { - val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) - serializer - } - - protected def prepareForWrite() = { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray - val dataTypes = inputSchema.map(_.dataType) - val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } - val outputData = new Array[Any](fieldOIs.length) - (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) - } - - // this function is executed on executor side - def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite() - executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - iterator.foreach { row => - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) - i += 1 - } - writer.write(serializer.serialize(outputData, standardOI)) - } - - close() - } -} - -private[hive] object SparkHiveWriterContainer { - def createPathFromString(path: String, conf: JobConf): Path = { - if (path == null) { - throw new IllegalArgumentException("Output path is null") - } - val outputPath = new Path(path) - val fs = outputPath.getFileSystem(conf) - if (outputPath == null || fs == null) { - throw new IllegalArgumentException("Incorrectly formatted output path") - } - outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } -} - -private[spark] object SparkHiveDynamicPartitionWriterContainer { - val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
"mapreduce.fileoutputcommitter.marksuccessfuljobs" -} - -private[spark] class SparkHiveDynamicPartitionWriterContainer( - jobConf: JobConf, - fileSinkConf: FileSinkDesc, - dynamicPartColNames: Array[String], - inputSchema: Seq[Attribute]) - extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) { - - import SparkHiveDynamicPartitionWriterContainer._ - - private val defaultPartName = jobConf.get( - ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultStrVal) - - override protected def initWriters(): Unit = { - // do nothing - } - - override def close(): Unit = { - // do nothing - } - - override def commitJob(): Unit = { - // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4), - // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will - // include _SUCCESS file when glob'ing for dynamic partition data files. - // - // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does: - // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then - // load it with loadDynamicPartitions/loadPartition/loadTable. - val oldMarker = conf.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) - conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false) - super.commitJob() - conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) - } - - // this function is executed on executor side - override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite() - executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length) - val dataOutput = inputSchema.take(fieldOIs.length) - // Returns the partition key given an input row - val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema) - // Returns the data columns to be written given an input row - val getOutputRow = UnsafeProjection.create(dataOutput, inputSchema) - - val fun: AnyRef = (pathString: String) => FileUtils.escapePathName(pathString, defaultPartName) - // Expressions that given a partition key build a string like: col1=val/col2=val/... - val partitionStringExpression = partitionOutput.zipWithIndex.flatMap { case (c, i) => - val escaped = - ScalaUDF(fun, StringType, Seq(Cast(c, StringType)), Seq(StringType)) - val str = If(IsNull(c), Literal(defaultPartName), escaped) - val partitionName = Literal(dynamicPartColNames(i) + "=") :: str :: Nil - if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName - } - - // Returns the partition path given a partition key. - val getPartitionString = - UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionOutput) - - // If anything below fails, we should abort the task. 
- try { - val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter( - StructType.fromAttributes(partitionOutput), - StructType.fromAttributes(dataOutput), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - TaskContext.get().taskMemoryManager().pageSizeBytes, - SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", - UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)) - - while (iterator.hasNext) { - val inputRow = iterator.next() - val currentKey = getPartitionKey(inputRow) - sorter.insertKV(currentKey, getOutputRow(inputRow)) - } - - logInfo(s"Sorting complete. Writing out partition files one at a time.") - val sortedIterator = sorter.sortedIterator() - var currentKey: InternalRow = null - var currentWriter: FileSinkOperator.RecordWriter = null - try { - while (sortedIterator.next()) { - if (currentKey != sortedIterator.getKey) { - if (currentWriter != null) { - currentWriter.close(false) - } - currentKey = sortedIterator.getKey.copy() - logDebug(s"Writing partition: $currentKey") - currentWriter = newOutputWriter(currentKey) - } - - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (sortedIterator.getValue.isNullAt(i)) { - null - } else { - wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i))) - } - i += 1 - } - currentWriter.write(serializer.serialize(outputData, standardOI)) - } - } finally { - if (currentWriter != null) { - currentWriter.close(false) - } - } - commit() - } catch { - case cause: Throwable => - logError("Aborting task.", cause) - abortTask() - throw new SparkException("Task failed while writing rows.", cause) - } - /** Open and returns a new OutputWriter given a partition key. */ - def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = { - val partitionPath = getPartitionString(key).getString(0) - val newFileSinkDesc = new FileSinkDesc( - fileSinkConf.getDirName + partitionPath, - fileSinkConf.getTableInfo, - fileSinkConf.getCompressed) - newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) - newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) - - // use the path like ${hive_tmp}/_temporary/${attemptId}/ - // to avoid write to the same file when `spark.speculation=true` - val path = FileOutputFormat.getTaskOutputPath( - conf.value, - partitionPath.stripPrefix("/") + "/" + getOutputName) - - HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - newFileSinkDesc, - path, - Reporter.NULL) - } - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 5cb8519d2a9af..28b5bfd5819c6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -565,8 +565,8 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet val filePaths = dir.map(_.getName).toList folders.flatMap(listFiles) ++: filePaths } - val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil - assert(listFiles(tmpDir).sorted == expectedFiles) + // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid` + assert(listFiles(tmpDir).length == 2) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 05a15166f815b..4772a264d6fd9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.SQLBuilder import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ @@ -441,23 +442,20 @@ abstract class HiveComparisonTest val executions = queryList.map(new TestHiveQueryExecution(_)) executions.foreach(_.toRdd) val tablesGenerated = queryList.zip(executions).flatMap { - // We should take executedPlan instead of sparkPlan, because in following codes we - // will run the collected plans. As we will do extra processing for sparkPlan such - // as adding exchange, collapsing codegen stages, etc., collecting sparkPlan here - // will cause some errors when running these plans later. - case (q, e) => e.executedPlan.collect { + case (q, e) => e.analyzed.collect { case i: InsertIntoHiveTable if tablesRead contains i.table.tableName => (q, e, i) } } tablesGenerated.map { case (hiveql, execution, insert) => + val rdd = Dataset.ofRows(TestHive.sparkSession, insert.query).queryExecution.toRdd s""" |=== Generated Table === |$hiveql |$execution |== Results == - |${insert.child.execute().collect().mkString("\n")} + |${rdd.collect().mkString("\n")} """.stripMargin }.mkString("\n") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index e3f1667249684..e4c9607f9800e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType} @@ -789,7 +790,7 @@ class HiveDDLSuite test("Create Cataloged Table As Select - Drop Table After Runtime Exception") { withTable("tab") { - intercept[RuntimeException] { + intercept[SparkException] { sql( """ |CREATE TABLE tab @@ -1263,7 +1264,7 @@ class HiveDDLSuite sql("INSERT INTO t SELECT 1") checkAnswer(spark.table("t"), Row(1)) // Check if this is compressed as ZLIB. 
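// With FileFormatWriter the data file now carries the committer's job UUID in its name
// (e.g. part-00000-<uuid>...), so the test can no longer match the fixed name
// "part-00000" and instead picks the first non-checksum file.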
- val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000")) + val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc")) assert(maybeOrcFile.isDefined) val orcFilePath = maybeOrcFile.get.toPath.toString val expectedCompressionKind = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2ae66d1b2f8a6..75ba92cadacd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1043,8 +1043,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd assertResult(1, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { - case _: Project => () - }.size + case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size + }.sum } } @@ -1062,8 +1062,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd assertResult(2, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { - case _: Project => () - }.size + case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size + }.sum } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index d3475a79a7fae..5318b4650b01f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -55,7 +55,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformation( + (child: SparkPlan) => new ScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -71,7 +71,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformation( + (child: SparkPlan) => new ScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -88,7 +88,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { val e = intercept[TestFailedException] { checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformation( + (child: SparkPlan) => new ScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -107,7 +107,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { val e = intercept[TestFailedException] { checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformation( + (child: SparkPlan) => new ScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -126,7 +126,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { val e = intercept[SparkException] { val plan = - new ScriptTransformation( + new ScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "some_non_existent_command", output = Seq(AttributeReference("a", 
StringType)()), From 2f24c10fdda91d8963ccac3e997e840890f85483 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 18 Jan 2017 09:18:20 +0800 Subject: [PATCH 2/3] address comments --- .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index bd1fa31dd5536..e6282ba378a7b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -46,7 +46,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat { override def inferSchema( sparkSession: SparkSession, options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = None + files: Seq[FileStatus]): Option[StructType] = { + throw new UnsupportedOperationException(s"inferSchema is not supported for hive data source.") + } override def prepareWrite( sparkSession: SparkSession, From 150efa2266f298205b272e9347032b2a85ab665c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 18 Jan 2017 12:45:14 +0800 Subject: [PATCH 3/3] address comments --- .../sql/hive/execution/HiveFileFormat.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index e6282ba378a7b..cc2b60bc41963 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{JobConf, Reporter} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory} @@ -42,7 +43,7 @@ import org.apache.spark.util.SerializableJobConf * * TODO: implement the read logic. */ -class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat { +class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging { override def inferSchema( sparkSession: SparkSession, options: Map[String, String], @@ -59,6 +60,19 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat { val tableDesc = fileSinkConf.getTableInfo conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName) + // When speculation is on and output committer class name contains "Direct", we should warn + // users that they may loss data if they are using a direct output committer. + val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false) + val outputCommitterClass = conf.get("mapred.output.committer.class", "") + if (speculationEnabled && outputCommitterClass.contains("Direct")) { + val warningMessage = + s"$outputCommitterClass may be an output committer that writes data directly to " + + "the final location. Because speculation is enabled, this output committer may " + + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + + "committer that does not have this behavior (e.g. FileOutputCommitter)." 
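// (The two inputs to this check are the spark.speculation SparkConf flag and the Hadoop
// "mapred.output.committer.class" setting; committers that stage output first, such as
// FileOutputCommitter, do not trigger the warning.)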
+ logWarning(warningMessage) + } + // Add table properties from storage handler to hadoopConf, so any custom storage // handler settings can be set to hadoopConf HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
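// A minimal, self-contained sketch of exercising the write path introduced by this
// patch from the public API. It is illustrative only: the table and app names are
// made up, and it assumes a Spark build with Hive support and a reachable metastore.
// The INSERT below is planned as the InsertIntoHiveTable command above and written
// through FileFormatWriter with a FileCommitProtocol committer, so the staged data
// files are named like part-00000-<uuid>... before being loaded into the table.
import org.apache.spark.sql.{SaveMode, SparkSession}

object HiveInsertExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("InsertIntoHiveTable via FileFormatWriter")
      .enableHiveSupport()
      .getOrCreate()

    // A plain Hive serde table; inserts into it are written through HiveFileFormat.
    spark.sql(
      "CREATE TABLE IF NOT EXISTS hive_insert_demo (key INT, value STRING) STORED AS TEXTFILE")

    // Data is first written to a Hive staging directory by FileFormatWriter and then
    // moved into the table location via the external catalog (loadTable).
    spark.range(10)
      .selectExpr("CAST(id AS INT) AS key", "CAST(id AS STRING) AS value")
      .write
      .mode(SaveMode.Append)
      .insertInto("hive_insert_demo")

    spark.table("hive_insert_demo").show()
    spark.stop()
  }
}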