From 8eb85e141922971a108fd1ab27358de9a4d1c975 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 21 May 2015 16:17:46 +0800 Subject: [PATCH 01/11] support writing data into the filesystem from queries --- .../apache/spark/sql/hive/HiveContext.scala | 1 + .../spark/sql/hive/HiveMetastoreCatalog.scala | 21 ++++ .../org/apache/spark/sql/hive/HiveQl.scala | 97 ++++++++++++++- .../spark/sql/hive/HiveStrategies.scala | 2 + .../spark/sql/hive/SaveAsHiveFile.scala | 97 +++++++++++++++ .../hive/execution/InsertIntoHiveTable.scala | 78 ++----------- .../sql/hive/execution/WriteToDirectory.scala | 110 ++++++++++++++++++ .../sql/hive/execution/SQLQuerySuite.scala | 75 ++++++++++++ .../org/apache/spark/sql/hive/Shim12.scala | 2 +- .../org/apache/spark/sql/hive/Shim13.scala | 5 + 10 files changed, 418 insertions(+), 70 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 863a5db1bf98c..390a9e1b8e7d3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -368,6 +368,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.ParquetConversions :: catalog.CreateTables :: catalog.PreInsertionCasts :: + catalog.WriteToDirs :: ExtractPythonUdfs :: ResolveHiveWindowFunction :: sources.PreInsertCastAndRename :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2aa80b47a97e2..8519b584ecd15 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -469,6 +469,27 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } } + object WriteToDirs extends Rule[LogicalPlan] with HiveInspectors { + import org.apache.hadoop.hive.ql.Context + import org.apache.hadoop.hive.ql.parse.{ASTNode, SemanticAnalyzer} + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Wait until children are resolved. + case p: LogicalPlan if !p.childrenResolved => p + + case WriteToDirectory(path, child, isLocal, tableDesc) + if !tableDesc.getProperties.containsKey("columns.types") => + // generate column name and related type info as hive style + val Array(cols, types) = child.output.foldLeft(Array("", ""))((r, a) => { + r(0) = r(0) + a.name + "," + r(1) = r(1) + a.dataType.toTypeInfo.getTypeName + ":" + r + }) + tableDesc.getProperties.setProperty("columns", cols.dropRight(1)) + tableDesc.getProperties.setProperty("columns.types", types.dropRight(1)) + WriteToDirectory(path, child, isLocal, tableDesc) + } + } /** * Casts input data to correct data types according to table definition before inserting into * that table. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 2cbb5ca4d2e0c..6e364c4dc4194 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.{ErrorMsg, Context} import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo} import org.apache.hadoop.hive.ql.lib.Node import org.apache.hadoop.hive.ql.parse._ -import org.apache.hadoop.hive.ql.plan.PlanUtils +import org.apache.hadoop.hive.ql.plan.{TableDesc, PlanUtils} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.spark.sql.AnalysisException @@ -72,6 +72,22 @@ case class CreateTableAsSelect( childrenResolved } +/** + * Logical node for "INSERT OVERWRITE [LOCAL] DIRECTORY directory + * [ROW FORMAT row_format] STORED AS file_format SELECT ... FROM ..." + * @param path the target path to write data. + * @param child the child logical plan. + * @param isLocal whether to write data to local file system. + * @param desc describe the write property such as file format. + */ +case class WriteToDirectory( + path: String, + child: LogicalPlan, + isLocal: Boolean, + desc: TableDesc) extends UnaryNode { + override def output: Seq[Attribute] = Seq.empty[Attribute] +} + /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ private[hive] object HiveQl { protected val nativeCommands = Seq( @@ -1217,6 +1233,22 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C Token("TOK_TMP_FILE", Nil) :: Nil) :: Nil) => query + case Token(destinationToken(), + Token("TOK_LOCAL_DIR", path :: formats) :: Nil) => + WriteToDirectory( + BaseSemanticAnalyzer.unescapeSQLString(path.getText), + query, + true, + parseTableDesc(formats)) + + case Token(destinationToken(), + Token("TOK_DIR", path :: formats) :: Nil) => + WriteToDirectory( + BaseSemanticAnalyzer.unescapeSQLString(path.getText), + query, + false, + parseTableDesc(formats)) + case Token(destinationToken(), Token("TOK_TAB", tableArgs) :: Nil) => @@ -1675,6 +1707,69 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } } + def parseTableDesc(nodeList: Seq[ASTNode]): TableDesc = { + import org.apache.hadoop.hive.ql.plan._ + + val createTableDesc = new CreateTableDesc() + + nodeList.collect { + case Token("TOK_TBLRCFILE", Nil) => + createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.RCFileOutputFormat") + createTableDesc.setSerName(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)) + + case Token("TOK_TBLORCFILE", Nil) => + createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat") + createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.orc.OrcSerde") + + case Token("TOK_TBLPARQUETFILE", Nil) => + createTableDesc + .setOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") + createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + + case Token("TOK_TBLSEQUENCEFILE", Nil) => + createTableDesc.setOutputFormat("org.apache.hadoop.mapred.SequenceFileOutputFormat") + + case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil)=> + val serdeParams = new java.util.HashMap[String, String]() + child match { + case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) => + val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString 
(rowChild1.getText()) + serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim) + serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim) + if (rowChild2.length > 1) { + val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString (rowChild2(0).getText) + serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape) + } + case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) => + val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText) + serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim) + case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) => + val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText) + serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim) + case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) => + val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText) + if (!(lineDelim == "\n") && !(lineDelim == "10")) { + throw new AnalysisException( + SemanticAnalyzer.generateErrorMessage( + rowChild, + ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg)) + } + serdeParams.put(serdeConstants.LINE_DELIM, lineDelim) + + case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) => + val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText) + // TODO support the nullFormat + case _ => assert(false) + } + createTableDesc.setSerdeProps(serdeParams) + + case _ => // Unsupport features + } + // note: we do not know the columns and column types when parsing, so here + // just input `null` for column types. column types will be set in analyzer. + PlanUtils.getDefaultTableDesc(createTableDesc, "", null) + } + def dumpTree(node: Node, builder: StringBuilder = new StringBuilder, indent: Int = 0) : StringBuilder = { node match { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c6b65106452bf..46ab02a5e6f5e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -189,6 +189,8 @@ private[hive] trait HiveStrategies { table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil + case hive.WriteToDirectory(path, child, isLocal, desc) => + execution.WriteToDirectory(path, planLater(child), isLocal, desc) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala new file mode 100644 index 0000000000000..af1bf58e677ba --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde2.Serializer +import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} +import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorUtils} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc } +import org.apache.spark.sql.hive.HiveShim._ +import org.apache.spark.{Logging, SparkContext, TaskContext, SerializableWritable} + +/** + * A trait for subclasses that write data using arbitrary SerDes to a file system . + */ +private[hive] trait SaveAsHiveFile extends HiveInspectors with Logging { + def newSerializer(tableDesc: TableDesc): Serializer = { + val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] + serializer.initialize(null, tableDesc.getProperties) + serializer + } + + def saveAsHiveFile( + sparkContext: SparkContext, + rdd: RDD[Row], + valueClass: Class[_], + fileSinkConf: FileSinkDesc, + conf: SerializableWritable[JobConf], + writerContainer: SparkHiveWriterContainer): Unit = { + assert(valueClass != null, "Output value class not set") + conf.value.setOutputValueClass(valueClass) + + val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName + assert(outputFileFormatClassName != null, "Output format class not set") + conf.value.set("mapred.output.format.class", outputFileFormatClassName) + + FileOutputFormat.setOutputPath( + conf.value, + SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) + log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + + writerContainer.driverSideSetup() + sparkContext.runJob(rdd, writeToFile _) + writerContainer.commitJob() + + // Note that this function is executed on executor side + def writeToFile(context: TaskContext, iterator: Iterator[Row]): Unit = { + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val wrappers = fieldOIs.map(wrapperFor) + val outputData = new Array[Any](fieldOIs.length) + + writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) + + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row(i)) + i += 1 + } + + writerContainer + .getLocalFileWriter(row) + .write(serializer.serialize(outputData, standardOI)) + } + + writerContainer.close() + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c0b0b104e9142..9983942b41420 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -24,13 +24,8 @@ import 
scala.collection.JavaConversions._ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.ql.{Context, ErrorMsg} -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} +import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} @@ -38,7 +33,7 @@ import org.apache.spark.sql.execution.{UnaryNode, SparkPlan} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ -import org.apache.spark.{SerializableWritable, SparkException, TaskContext} +import org.apache.spark.{SerializableWritable, SparkException} private[hive] case class InsertIntoHiveTable( @@ -46,74 +41,15 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], child: SparkPlan, overwrite: Boolean, - ifNotExists: Boolean) extends UnaryNode with HiveInspectors { + ifNotExists: Boolean) extends UnaryNode with SaveAsHiveFile { @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @transient private lazy val catalog = sc.catalog - private def newSerializer(tableDesc: TableDesc): Serializer = { - val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) - serializer - } - def output: Seq[Attribute] = child.output - def saveAsHiveFile( - rdd: RDD[Row], - valueClass: Class[_], - fileSinkConf: FileSinkDesc, - conf: SerializableWritable[JobConf], - writerContainer: SparkHiveWriterContainer): Unit = { - assert(valueClass != null, "Output value class not set") - conf.value.setOutputValueClass(valueClass) - - val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName - assert(outputFileFormatClassName != null, "Output format class not set") - conf.value.set("mapred.output.format.class", outputFileFormatClassName) - - FileOutputFormat.setOutputPath( - conf.value, - SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) - log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - - writerContainer.driverSideSetup() - sc.sparkContext.runJob(rdd, writeToFile _) - writerContainer.commitJob() - - // Note that this function is executed on executor side - def writeToFile(context: TaskContext, iterator: Iterator[Row]): Unit = { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val wrappers = fieldOIs.map(wrapperFor) - val outputData = new Array[Any](fieldOIs.length) - - writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - 
iterator.foreach { row => - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row(i)) - i += 1 - } - - writerContainer - .getLocalFileWriter(row) - .write(serializer.serialize(outputData, standardOI)) - } - - writerContainer.close() - } - } - /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -182,7 +118,13 @@ case class InsertIntoHiveTable( new SparkHiveWriterContainer(jobConf, fileSinkConf) } - saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) + saveAsHiveFile( + sc.sparkContext, + child.execute(), + outputClass, + fileSinkConf, + jobConfSer, + writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala new file mode 100644 index 0000000000000..c0f8cfae61389 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.mapred.JobConf + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc, _} +import org.apache.spark.SerializableWritable + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class WriteToDirectory( + path: String, + child: SparkPlan, + isLocal: Boolean, + desc: TableDesc) extends UnaryNode with SaveAsHiveFile { + + @transient val hiveContext = sqlContext.asInstanceOf[HiveContext] + @transient private lazy val context = new Context(hiveContext.hiveconf) + @transient lazy val outputClass = newSerializer(desc).getSerializedClass + + def output:Seq[Attribute] = child.output + + protected[sql] lazy val sideEffectResult: Seq[Row] = { + val jobConf = new JobConf(hiveContext.hiveconf) + val jobConfSer = new SerializableWritable(jobConf) + val targetPath = new Path(path) + + val (tmpPath, destPath) = if (isLocal) { + val localFileSystem = FileSystem.getLocal(jobConf) + val localPath = localFileSystem.makeQualified(targetPath) + // remove old dir + if (localFileSystem.exists(localPath)) { + localFileSystem.delete(localPath, true) + } + (HiveShim.getExternalTmpPath(context, localPath.toUri), localPath) + } else { + val qualifiedPath = FileUtils.makeQualified(targetPath, hiveContext.hiveconf) + val dfs = qualifiedPath.getFileSystem(jobConf) + if (dfs.exists(qualifiedPath)) { + dfs.delete(qualifiedPath, true) + } else { + dfs.mkdirs(qualifiedPath.getParent) + } + (HiveShim.getExternalTmpPath(context, qualifiedPath.toUri), qualifiedPath) + } + + val fileSinkConf = new FileSinkDesc(tmpPath.toString, desc, false) + val isCompressed = hiveContext.hiveconf.getBoolean( + ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) + + if (isCompressed) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. 
+ hiveContext.hiveconf.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hiveContext.hiveconf.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(hiveContext.hiveconf.get("mapred.output.compression.type")) + } + + val writerContainer = new SparkHiveWriterContainer(jobConf, fileSinkConf) + + saveAsHiveFile(hiveContext.sparkContext, child.execute(), outputClass, + fileSinkConf, jobConfSer, writerContainer) + + val fs = tmpPath.getFileSystem(jobConf) + + // move tmp file to dest dir + if (isLocal) { + fs.moveToLocalFile(tmpPath, destPath) + } else if (!fs.rename(tmpPath, destPath)) { + throw new IOException("Unable to write data to " + destPath) + } + + Seq.empty[Row] + } + + override def executeCollect(): Array[Row] = sideEffectResult.toArray + + override def doExecute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index fbbf6ba5947dc..91b8ffbd8b34d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -422,6 +422,81 @@ class SQLQuerySuite extends QueryTest { sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) } + test("test insert overwrite to dir from hive metastore table") { + import org.apache.spark.util.Utils + + val path = Utils.createTempDir() + path.delete() + checkAnswer( + sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}' SELECT * FROM src where key < 10"), + Seq.empty[Row]) + + checkAnswer( + sql(s"""INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}' + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc + |SELECT * FROM src where key < 10""".stripMargin), + Seq.empty[Row]) + + // use orc data source to check the data of path is right. + sql( + s"""CREATE TEMPORARY TABLE orc_source + |USING org.apache.spark.sql.hive.orc + |OPTIONS ( + | PATH '${path.getCanonicalPath}' + |) + """.stripMargin) + checkAnswer( + sql("select * from orc_source"), + sql("select * from src where key < 10").collect() + ) + + Utils.deleteRecursively(path) + dropTempTable("orc_source") + } + + test("test insert overwrite to dir from temp table") { + import org.apache.spark.util.Utils + + sparkContext + .parallelize(1 to 10) + .map(i => TestData(i, i.toString)) + .toDF() + .registerTempTable("test_insert_table") + + val path = Utils.createTempDir() + path.delete() + checkAnswer( + sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}' + |SELECT * FROM test_insert_table + """.stripMargin), + Seq.empty[Row]) + + checkAnswer( + sql(s""" + INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}' + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc + |SELECT * FROM test_insert_table""".stripMargin), + Seq.empty[Row]) + + // use orc data source to check the data of path is right. 
+ sql( + s"""CREATE TEMPORARY TABLE orc_source + |USING org.apache.spark.sql.hive.orc + |OPTIONS ( + | PATH '${path.getCanonicalPath}' + |) + """.stripMargin) + checkAnswer( + sql("select * from orc_source"), + sql("select * from test_insert_table").collect() + ) + Utils.deleteRecursively(path) + dropTempTable("test_insert_table") + dropTempTable("orc_source") + } + test("SPARK-4825 save join to table") { val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() sql("CREATE TABLE test1 (key INT, value STRING)") diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala index 33e96eaabfbf6..a053f52100827 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala @@ -205,7 +205,7 @@ private[hive] object HiveShim { } def getExternalTmpPath(context: Context, uri: URI) = { - context.getExternalTmpFileURI(uri) + new Path(context.getExternalTmpFileURI(uri)) } def getDataLocationPath(p: Partition) = p.getPartitionPath diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala index dbc5e029e2047..7e854aa44a495 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import java.rmi.server.UID import java.util.{Properties, ArrayList => JArrayList} +import java.net.URI import java.io.{OutputStream, InputStream} import scala.collection.JavaConversions._ @@ -340,6 +341,10 @@ private[hive] object HiveShim { } } + def getExternalTmpPath(context: Context, uri: URI) = { + context.getExternalTmpPath(uri) + } + def getExternalTmpPath(context: Context, path: Path) = { context.getExternalTmpPath(path.toUri) } From 9aa4b848893b85482f0938ec511ab0f2fa61b646 Mon Sep 17 00:00:00 2001 From: scwf Date: Fri, 22 May 2015 09:01:03 +0800 Subject: [PATCH 02/11] remove imports --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 -- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8519b584ecd15..ab44b0d1246c3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -470,8 +470,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } object WriteToDirs extends Rule[LogicalPlan] with HiveInspectors { - import org.apache.hadoop.hive.ql.Context - import org.apache.hadoop.hive.ql.parse.{ASTNode, SemanticAnalyzer} def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 6e364c4dc4194..4f9978b2eccf7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1765,7 +1765,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case _ => // Unsupport features } - // note: we do not know the columns and column types when parsing, so here + // Note: we do not know the columns and column types when parsing, so here // just input `null` for column types. column types will be set in analyzer. PlanUtils.getDefaultTableDesc(createTableDesc, "", null) } From d485beb6bffafcf3b854b9a726ee4991a46f3110 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 25 Jun 2015 10:48:03 +0800 Subject: [PATCH 03/11] make WriteToDirectory extends command --- .../org/apache/spark/sql/catalyst/plans/logical/commands.scala | 2 -- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index 246f4d7e34d3d..75a5b10d9ed04 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.expressions.Attribute - /** * A logical node that represents a non-query command to be executed by the system. For example, * commands can be used by parsers to represent DDL operations. 
Commands, unlike queries, are diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 9259344341d33..3b267681635be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -84,7 +84,7 @@ case class WriteToDirectory( path: String, child: LogicalPlan, isLocal: Boolean, - desc: TableDesc) extends UnaryNode { + desc: TableDesc) extends UnaryNode with Command { override def output: Seq[Attribute] = Seq.empty[Attribute] } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala index 1b1af60ed0745..7b542330ddb64 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala @@ -69,7 +69,7 @@ private[hive] trait SaveAsHiveFile extends HiveInspectors with Logging { writerContainer.commitJob() // Note that this function is executed on executor side - def writeToFile(context: TaskContext, iterator: Iterator[Row]): Unit = { + def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils .getStandardObjectInspector( From 48d633ed4a1d3e67e1d4804ddf06e6fc01a8f426 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 25 Jun 2015 18:34:38 +0800 Subject: [PATCH 04/11] style --- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../apache/spark/sql/hive/execution/WriteToDirectory.scala | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 3b267681635be..fac367d5e8ecc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1704,7 +1704,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_TBLSEQUENCEFILE", Nil) => createTableDesc.setOutputFormat("org.apache.hadoop.mapred.SequenceFileOutputFormat") - case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil)=> + case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) => val serdeParams = new java.util.HashMap[String, String]() child match { case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala index bc494071fd1a3..4c49682c83aa6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -49,7 +49,7 @@ case class WriteToDirectory( @transient private lazy val context = new Context(hiveContext.hiveconf) @transient lazy val outputClass = newSerializer(desc).getSerializedClass - def output:Seq[Attribute] = child.output + def output: Seq[Attribute] = child.output protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { val jobConf = new JobConf(hiveContext.hiveconf) @@ -58,7 +58,7 @@ case class WriteToDirectory( val (tmpPath, destPath) = if (isLocal) { val localFileSystem = FileSystem.getLocal(jobConf) - val localPath = 
localFileSystem.makeQualified(targetPath) + val localPath = localFileSystem.makeQualified(targetPath) // remove old dir if (localFileSystem.exists(localPath)) { localFileSystem.delete(localPath, true) @@ -112,5 +112,6 @@ case class WriteToDirectory( Seq.empty[InternalRow] } - override def doExecute(): RDD[InternalRow] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) + override def doExecute(): RDD[InternalRow] = + sqlContext.sparkContext.parallelize(sideEffectResult, 1) } From 9f0fdb51a5919826e5b3b04aa42d5a27bbb1e39b Mon Sep 17 00:00:00 2001 From: scwf Date: Mon, 20 Jul 2015 14:46:23 +0800 Subject: [PATCH 05/11] compile fix --- .../scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala | 6 ++---- .../apache/spark/sql/hive/execution/WriteToDirectory.scala | 1 + 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala index 7b542330ddb64..8597f18c29408 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.spark.util.SerializableJobConf - import scala.collection.JavaConversions._ import org.apache.hadoop.hive.ql.plan.TableDesc @@ -28,8 +26,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, Obj import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.util.SerializableJobConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.{Logging, SparkContext, TaskContext} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala index 4c49682c83aa6..aaa4601c4eb3e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.JobConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} From e6ade744aacd5386838cc3e641f0808871557228 Mon Sep 17 00:00:00 2001 From: scwf Date: Mon, 20 Jul 2015 19:00:11 +0800 Subject: [PATCH 06/11] fix saveashfile --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index a8f88f8a3c0fb..866729b420c61 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -124,7 +124,7 @@ case class InsertIntoHiveTable( saveAsHiveFile( sc.sparkContext, child.execute(), - 
StructType.fromAttributes(output), + table.schema, outputClass, fileSinkConf, jobConfSer, From 1faa6c4e14f8ecdb822f8fbe49bedcdcac13eed5 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 23 Jul 2015 14:27:48 +0800 Subject: [PATCH 07/11] fix comments --- .../apache/spark/sql/hive/HiveContext.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 7 ++++++ .../org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../spark/sql/hive/HiveStrategies.scala | 2 -- .../hive/execution/InsertIntoHiveTable.scala | 2 -- .../sql/hive/execution/WriteToDirectory.scala | 22 ++++++++----------- 6 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 96912c38fe12a..8ad5cb457a0b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -436,7 +436,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { } @transient - private val hivePlanner = new SparkPlanner with HiveStrategies { + private[hive] val hivePlanner = new SparkPlanner with HiveStrategies { val hiveContext = self override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 51e0f62298b8c..2bdf27e5620bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -502,6 +502,10 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } } + /** + * Resolve hive.WriteToDirectory node,to set the properties + * of columns and columns.types in tableDesc. + */ object WriteToDirs extends Rule[LogicalPlan] with HiveInspectors { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -519,8 +523,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive tableDesc.getProperties.setProperty("columns", cols.dropRight(1)) tableDesc.getProperties.setProperty("columns.types", types.dropRight(1)) WriteToDirectory(path, child, isLocal, tableDesc) + case WriteToDirectory(path, child, isLocal, tableDesc) => + execution.WriteToDirectory(path, hive.executePlan(child).executedPlan, isLocal, tableDesc) } } + /** * Casts input data to correct data types according to table definition before inserting into * that table. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index bee74e7d465e7..b53159fd0fa6d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -81,7 +81,7 @@ private[hive] case class CreateTableAsSelect( * @param isLocal whether to write data to local file system. * @param desc describe the write property such as file format. 
*/ -case class WriteToDirectory( +private[hive] case class WriteToDirectory( path: String, child: LogicalPlan, isLocal: Boolean, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 36ffb85f82d3a..ed359620a5f7f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -190,8 +190,6 @@ private[hive] trait HiveStrategies { table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil - case hive.WriteToDirectory(path, child, isLocal, desc) => - execution.WriteToDirectory(path, planLater(child), isLocal, desc) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 866729b420c61..91e324a5fbc23 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -26,13 +26,11 @@ import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.SparkException import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{UnaryNode, SparkPlan} import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive._ -import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableJobConf import scala.collection.JavaConversions._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala index aaa4601c4eb3e..3612cc04dc332 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -27,10 +27,9 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.mapred.JobConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan} import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive._ import org.apache.spark.sql.types.StructType @@ -44,15 +43,14 @@ case class WriteToDirectory( path: String, child: SparkPlan, isLocal: Boolean, - desc: TableDesc) extends UnaryNode with SaveAsHiveFile { + desc: TableDesc) extends RunnableCommand with SaveAsHiveFile { - @transient val hiveContext = sqlContext.asInstanceOf[HiveContext] - @transient private lazy val context = new Context(hiveContext.hiveconf) - @transient lazy val outputClass = newSerializer(desc).getSerializedClass + override def output: Seq[Attribute] = child.output - def output: Seq[Attribute] = child.output - - protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + def run(sqlContext: SQLContext): Seq[Row] = { + 
@transient val hiveContext = sqlContext.asInstanceOf[HiveContext] + @transient lazy val context = new Context(hiveContext.hiveconf) + @transient lazy val outputClass = newSerializer(desc).getSerializedClass val jobConf = new JobConf(hiveContext.hiveconf) val jobConfSer = new SerializableJobConf(jobConf) val targetPath = new Path(path) @@ -110,9 +108,7 @@ case class WriteToDirectory( throw new IOException("Unable to write data to " + destPath) } - Seq.empty[InternalRow] + Seq.empty[Row] } - override def doExecute(): RDD[InternalRow] = - sqlContext.sparkContext.parallelize(sideEffectResult, 1) } From 33a2b0aa5693c0100e022977c1cf472d08e53010 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 29 Jul 2015 09:29:39 +0800 Subject: [PATCH 08/11] compile fix accordng to new changes of inernalrow --- .../main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala | 4 +++- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 1 + .../apache/spark/sql/hive/execution/WriteToDirectory.scala | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala index 8597f18c29408..c3e804bbb1707 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala @@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.util.SerializableJobConf import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.{Logging, SparkContext, TaskContext} @@ -46,6 +47,7 @@ private[hive] trait SaveAsHiveFile extends HiveInspectors with Logging { sparkContext: SparkContext, rdd: RDD[InternalRow], schema: StructType, + dataTypes: Array[DataType], valueClass: Class[_], fileSinkConf: FileSinkDesc, conf: SerializableJobConf, @@ -84,7 +86,7 @@ private[hive] trait SaveAsHiveFile extends HiveInspectors with Logging { iterator.foreach { row => var i = 0 while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row(i)) + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) i += 1 } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c61ef5fb41de0..502b8738befc2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -124,6 +124,7 @@ case class InsertIntoHiveTable( sc.sparkContext, child.execute(), table.schema, + child.output.map(_.dataType).toArray, outputClass, fileSinkConf, jobConfSer, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala index 3612cc04dc332..a23ea3e57f8a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -94,6 +94,7 @@ case class WriteToDirectory( hiveContext.sparkContext, child.execute(), StructType.fromAttributes(output), + child.output.map(_.dataType).toArray, outputClass, fileSinkConf, 
jobConfSer, From b049e3a7e3d0863d63a8c4ccad03ec9f1d37e5d4 Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 8 Aug 2015 16:54:42 +0800 Subject: [PATCH 09/11] compile --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +- .../apache/spark/sql/hive/execution/WriteToDirectory.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 502b8738befc2..b92507b0fc391 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -64,7 +64,7 @@ case class InsertIntoHiveTable( // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc val tableLocation = table.hiveQlTable.getDataLocation - val tmpLocation = hiveContext.getExternalTmpPath(tableLocation.toUri) + val tmpLocation = hiveContext.getExternalTmpPath(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val isCompressed = sc.hiveconf.getBoolean( ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala index a23ea3e57f8a8..8a050305d6bb2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -62,7 +62,7 @@ case class WriteToDirectory( if (localFileSystem.exists(localPath)) { localFileSystem.delete(localPath, true) } - (context.getExternalTmpPath(localPath.toUri), localPath) + (context.getExternalTmpPath(localPath), localPath) } else { val qualifiedPath = FileUtils.makeQualified(targetPath, hiveContext.hiveconf) val dfs = qualifiedPath.getFileSystem(jobConf) @@ -71,7 +71,7 @@ case class WriteToDirectory( } else { dfs.mkdirs(qualifiedPath.getParent) } - (context.getExternalTmpPath(qualifiedPath.toUri), qualifiedPath) + (context.getExternalTmpPath(qualifiedPath), qualifiedPath) } val fileSinkConf = new FileSinkDesc(tmpPath.toString, desc, false) From c6f4b4b74c29db2a9a666a3c59dd98cb26a5a90a Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 12 Aug 2015 10:35:10 +0800 Subject: [PATCH 10/11] merge bug fix --- .../org/apache/spark/sql/hive/HiveQl.scala | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 26e5c5ff419c7..ee8a7d93adda5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1225,20 +1225,17 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C Token("TOK_TMP_FILE", Nil) :: Nil) :: Nil) => query - case Token(destinationToken(), - Token("TOK_LOCAL_DIR", path :: formats) :: Nil) => - WriteToDirectory( - BaseSemanticAnalyzer.unescapeSQLString(path.getText), - query, - true, - parseTableDesc(formats)) - case Token(destinationToken(), Token("TOK_DIR", path :: formats) :: Nil) => + var isLocal = false + formats.collect { + case Token("LOCAL", others) => + isLocal = true + } WriteToDirectory( 
BaseSemanticAnalyzer.unescapeSQLString(path.getText), query, - false, + isLocal, parseTableDesc(formats)) case Token(destinationToken(), @@ -1691,21 +1688,32 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val createTableDesc = new CreateTableDesc() nodeList.collect { - case Token("TOK_TBLRCFILE", Nil) => - createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.RCFileOutputFormat") - createTableDesc.setSerName(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)) + case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) => + child.getText().toLowerCase(Locale.ENGLISH) match { + case "orc" => + createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat") + createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.orc.OrcSerde") + + case "parquet" => + createTableDesc + .setOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") + createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + + case "rcfile" => + createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.RCFileOutputFormat") + createTableDesc.setSerName(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)) - case Token("TOK_TBLORCFILE", Nil) => - createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat") - createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.orc.OrcSerde") + case "textfile" => + createTableDesc + .setOutputFormat("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat") - case Token("TOK_TBLPARQUETFILE", Nil) => - createTableDesc - .setOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") - createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + case "sequencefile" => + createTableDesc.setOutputFormat("org.apache.hadoop.mapred.SequenceFileOutputFormat") - case Token("TOK_TBLSEQUENCEFILE", Nil) => - createTableDesc.setOutputFormat("org.apache.hadoop.mapred.SequenceFileOutputFormat") + case _ => + throw new SemanticException( + s"Unrecognized file format in STORED AS clause: ${child.getText}") + } case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) => val serdeParams = new java.util.HashMap[String, String]() From 9cc8474fbe172fc28085e06c2c4e37ee967b038b Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 12 Aug 2015 11:32:40 +0800 Subject: [PATCH 11/11] fix write path --- .../org/apache/spark/sql/hive/HiveQl.scala | 3 ++- .../sql/hive/execution/WriteToDirectory.scala | 17 ++++------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 9547c4110a800..5c84a4b9cfd92 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1722,7 +1722,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case "parquet" => createTableDesc .setOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") - createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + createTableDesc + .setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") case "rcfile" => createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.RCFileOutputFormat") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala index 8a050305d6bb2..c250e59481fb9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala @@ -55,14 +55,14 @@ case class WriteToDirectory( val jobConfSer = new SerializableJobConf(jobConf) val targetPath = new Path(path) - val (tmpPath, destPath) = if (isLocal) { + val writeToPath = if (isLocal) { val localFileSystem = FileSystem.getLocal(jobConf) val localPath = localFileSystem.makeQualified(targetPath) // remove old dir if (localFileSystem.exists(localPath)) { localFileSystem.delete(localPath, true) } - (context.getExternalTmpPath(localPath), localPath) + localPath } else { val qualifiedPath = FileUtils.makeQualified(targetPath, hiveContext.hiveconf) val dfs = qualifiedPath.getFileSystem(jobConf) @@ -71,10 +71,10 @@ case class WriteToDirectory( } else { dfs.mkdirs(qualifiedPath.getParent) } - (context.getExternalTmpPath(qualifiedPath), qualifiedPath) + qualifiedPath } - val fileSinkConf = new FileSinkDesc(tmpPath.toString, desc, false) + val fileSinkConf = new FileSinkDesc(writeToPath.toString, desc, false) val isCompressed = hiveContext.hiveconf.getBoolean( ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) @@ -100,15 +100,6 @@ case class WriteToDirectory( jobConfSer, writerContainer) - val fs = tmpPath.getFileSystem(jobConf) - - // move tmp file to dest dir - if (isLocal) { - fs.moveToLocalFile(tmpPath, destPath) - } else if (!fs.rename(tmpPath, destPath)) { - throw new IOException("Unable to write data to " + destPath) - } - Seq.empty[Row] }
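For reference, below is a minimal end-to-end sketch of the syntax this series adds, modelled on the new SQLQuerySuite tests above. The SparkContext/HiveContext setup, the output path, and the presence of a Hive table named src are illustrative assumptions, not part of the patch; only the INSERT OVERWRITE [LOCAL] DIRECTORY statement and the ORC data source read-back mirror the tested behavior.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object WriteToDirectoryExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("write-to-dir-example"))
    val hiveContext = new HiveContext(sc)

    // Write a query result directly to a local directory as ORC. With this
    // series applied, the statement is parsed into the WriteToDirectory plan
    // instead of requiring an INSERT into a metastore table.
    hiveContext.sql(
      """INSERT OVERWRITE LOCAL DIRECTORY '/tmp/write_to_dir_example'
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |STORED AS orc
        |SELECT key, value FROM src WHERE key < 10""".stripMargin)

    // Read the written files back through the ORC data source, mirroring the
    // verification step used in the test suite.
    hiveContext.sql(
      """CREATE TEMPORARY TABLE orc_out
        |USING org.apache.spark.sql.hive.orc
        |OPTIONS (PATH '/tmp/write_to_dir_example')""".stripMargin)
    hiveContext.sql("SELECT * FROM orc_out").show()

    sc.stop()
  }
}

As in the tests, reading the directory back through the ORC data source is simply a convenient way to check the output files; any ORC-capable reader could be used instead.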