From cb2d1b920885e8eeba6551d55c68f0b972b9942c Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 3 Feb 2017 21:12:49 +0800 Subject: [PATCH 01/40] [SPARK-19448][SQL]optimize some duplication functions in MetaStoreRelation and process another TODO --- .../spark/sql/hive/MetastoreRelation.scala | 63 +++---------------- .../sql/hive/client/HiveClientImpl.scala | 4 +- .../hive/execution/HiveTableScanExec.scala | 2 +- 3 files changed, 11 insertions(+), 58 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 346757c2047a7..43295e7cf8bca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -60,57 +60,11 @@ private[hive] case class MetastoreRelation( override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil - private def toHiveColumn(c: StructField): FieldSchema = { - val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { - c.metadata.getString(HiveUtils.hiveTypeString) - } else { - c.dataType.catalogString - } - new FieldSchema(c.name, typeString, c.getComment.orNull) - } - - // TODO: merge this with HiveClientImpl#toHiveTable - @transient val hiveQlTable: HiveTable = { - // We start by constructing an API table as Hive performs several important transformations - // internally when converting an API table to a QL table. - val tTable = new org.apache.hadoop.hive.metastore.api.Table() - tTable.setTableName(catalogTable.identifier.table) - tTable.setDbName(catalogTable.database) - - val tableParameters = new java.util.HashMap[String, String]() - tTable.setParameters(tableParameters) - catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) } - - tTable.setTableType(catalogTable.tableType match { - case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString - case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString - case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString - }) - - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() - tTable.setSd(sd) - - // Note: In Hive the schema and partition columns must be disjoint sets - val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c => - catalogTable.partitionColumnNames.contains(c.getName) - } - sd.setCols(schema.asJava) - tTable.setPartitionKeys(partCols.asJava) + // here used to call some helper functions, not used for storing persistent metadata + @transient val hiveClientImpl = HiveUtils.newClientForExecution(sparkSession.sparkContext.conf, + sparkSession.sessionState.newHadoopConf()) - catalogTable.storage.locationUri.foreach(sd.setLocation) - catalogTable.storage.inputFormat.foreach(sd.setInputFormat) - catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) - - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib) - sd.setSerdeInfo(serdeInfo) - - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) - - new HiveTable(tTable) - } + @transient val hiveQlTable: HiveTable = hiveClientImpl.toHiveTable(catalogTable) @transient override def computeStats(conf: CatalystConf): Statistics = { 
catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( @@ -176,7 +130,7 @@ private[hive] case class MetastoreRelation( tPartition.setSd(sd) // Note: In Hive the schema and partition columns must be disjoint sets - val schema = catalogTable.schema.map(toHiveColumn).filter { c => + val schema = catalogTable.schema.map(hiveClientImpl.toHiveColumn).filter { c => !catalogTable.partitionColumnNames.contains(c.getName) } sd.setCols(schema.asJava) @@ -231,18 +185,17 @@ private[hive] case class MetastoreRelation( val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute) /** Non-partitionKey attributes */ - // TODO: just make this hold the schema itself, not just non-partition columns - val attributes = catalogTable.schema + val dataColKeys = catalogTable.schema .filter { c => !catalogTable.partitionColumnNames.contains(c.name) } .map(_.toAttribute) - val output = attributes ++ partitionKeys + val output = dataColKeys ++ partitionKeys /** An attribute map that can be used to lookup original attributes based on expression id. */ val attributeMap = AttributeMap(output.map(o => (o, o))) /** An attribute map for determining the ordinal for non-partition columns. */ - val columnOrdinals = AttributeMap(attributes.zipWithIndex) + val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex) override def inputFiles: Array[String] = { val partLocations = allPartitions diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bf703a5ab6e60..779abf1919d2f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -789,7 +789,7 @@ private[hive] class HiveClientImpl( Utils.classForName(name) .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - private def toHiveColumn(c: StructField): FieldSchema = { + private[hive] def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { c.metadata.getString(HiveUtils.hiveTypeString) } else { @@ -815,7 +815,7 @@ private[hive] class HiveClientImpl( Option(hc.getComment).map(field.withComment).getOrElse(field) } - private def toHiveTable(table: CatalogTable): HiveTable = { + private[hive] def toHiveTable(table: CatalogTable): HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index def6ef3691333..140c352fa6f8d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -113,7 +113,7 @@ case class HiveTableScanExec( .mkString(",") hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(",")) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(",")) } /** From 75e67d5f6267ccbaa3184fc1c03951cba5a7ef67 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 3 Feb 2017 21:20:07 +0800 Subject: [PATCH 02/40] set a func to private --- .../scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 43295e7cf8bca..5ab89476eca74 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -61,8 +61,8 @@ private[hive] case class MetastoreRelation( override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil // here used to call some helper functions, not used for storing persistent metadata - @transient val hiveClientImpl = HiveUtils.newClientForExecution(sparkSession.sparkContext.conf, - sparkSession.sessionState.newHadoopConf()) + @transient private val hiveClientImpl = HiveUtils.newClientForExecution( + sparkSession.sparkContext.conf, sparkSession.sessionState.newHadoopConf()) @transient val hiveQlTable: HiveTable = hiveClientImpl.toHiveTable(catalogTable) From 2fbd7174aa827374c8e9e5b0257b40ecf05c16c3 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 4 Feb 2017 11:02:29 +0800 Subject: [PATCH 03/40] create a object of HiveClientImpl --- .../spark/sql/hive/MetastoreRelation.scala | 12 ++-- .../sql/hive/client/HiveClientImpl.scala | 60 ++++++++++++------- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 5ab89476eca74..e9817d0ae9a4d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -22,10 +22,9 @@ import java.io.IOException import scala.collection.JavaConverters._ import com.google.common.base.Objects + import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hive.common.StatsSetupConst -import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc @@ -36,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation +import org.apache.spark.sql.hive.client.HiveClientImpl import 
org.apache.spark.sql.types.StructField @@ -60,11 +60,7 @@ private[hive] case class MetastoreRelation( override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil - // here used to call some helper functions, not used for storing persistent metadata - @transient private val hiveClientImpl = HiveUtils.newClientForExecution( - sparkSession.sparkContext.conf, sparkSession.sessionState.newHadoopConf()) - - @transient val hiveQlTable: HiveTable = hiveClientImpl.toHiveTable(catalogTable) + @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable) @transient override def computeStats(conf: CatalystConf): Statistics = { catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( @@ -130,7 +126,7 @@ private[hive] case class MetastoreRelation( tPartition.setSd(sd) // Note: In Hive the schema and partition columns must be disjoint sets - val schema = catalogTable.schema.map(hiveClientImpl.toHiveColumn).filter { c => + val schema = catalogTable.schema.map(HiveClientImpl.toHiveColumn).filter { c => !catalogTable.partitionColumnNames.contains(c.getName) } sd.setCols(schema.asJava) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 779abf1919d2f..a848167886d27 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -365,8 +365,8 @@ private[hive] class HiveClientImpl( Option(client.getTable(dbName, tableName, false)).map { h => // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema - val partCols = h.getPartCols.asScala.map(fromHiveColumn) - val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols) + val partCols = h.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn) + val schema = StructType(h.getCols.asScala.map(HiveClientImpl.fromHiveColumn) ++ partCols) // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet) val unsupportedFeatures = ArrayBuffer.empty[String] @@ -435,7 +435,7 @@ private[hive] class HiveClientImpl( } override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { - client.createTable(toHiveTable(table), ignoreIfExists) + client.createTable(HiveClientImpl.toHiveTable(table, conf, shim), ignoreIfExists) } override def dropTable( @@ -447,7 +447,7 @@ private[hive] class HiveClientImpl( } override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" client.alterTable(qualifiedTableName, hiveTable) @@ -516,10 +516,10 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = toHiveTable(catalogTable) + val hiveTable = HiveClientImpl.toHiveTable(catalogTable, conf, shim) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) - .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } + .map { p => HiveClientImpl.toHivePartition(p.copy(spec = newSpec), 
hiveTable) } .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) } client.renamePartition(hiveTable, oldSpec.asJava, hivePart) } @@ -529,8 +529,9 @@ private[hive] class HiveClientImpl( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withHiveState { - val hiveTable = toHiveTable(getTable(db, table)) - client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) + val hiveTable = HiveClientImpl.toHiveTable(getTable(db, table), conf, shim) + client.alterPartitions(table, newParts.map { + p => HiveClientImpl.toHivePartition(p, hiveTable) }.asJava) } /** @@ -557,9 +558,9 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) - Option(hivePartition).map(fromHivePartition) + Option(hivePartition).map(HiveClientImpl.fromHivePartition) } /** @@ -569,12 +570,12 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) val parts = spec match { - case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) + case None => shim.getAllPartitions(client, hiveTable).map(HiveClientImpl.fromHivePartition) case Some(s) => assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid") - client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition) + client.getPartitions(hiveTable, s.asJava).asScala.map(HiveClientImpl.fromHivePartition) } HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts @@ -583,8 +584,9 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) - val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) + val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) + val parts = shim.getPartitionsByFilter(client, hiveTable, predicates) + .map(HiveClientImpl.fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts } @@ -778,6 +780,22 @@ private[hive] class HiveClientImpl( } + +} + +private[hive] object HiveClientImpl { + private lazy val shimDefault = IsolatedClientLoader.hiveVersion( + HiveUtils.hiveExecutionVersion) match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } + + private lazy val hiveConf = new HiveConf(classOf[SessionState]) + /* -------------------------------------------------------- * | Helper methods for converting to and from Hive classes | * -------------------------------------------------------- */ @@ -789,7 +807,8 @@ private[hive] class HiveClientImpl( Utils.classForName(name) .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - private[hive] def toHiveColumn(c: StructField): FieldSchema = { + + def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { c.metadata.getString(HiveUtils.hiveTypeString) } 
else { @@ -798,7 +817,7 @@ private[hive] class HiveClientImpl( new FieldSchema(c.name, typeString, c.getComment().orNull) } - private def fromHiveColumn(hc: FieldSchema): StructField = { + def fromHiveColumn(hc: FieldSchema): StructField = { val columnType = try { CatalystSqlParser.parseDataType(hc.getType) } catch { @@ -815,7 +834,8 @@ private[hive] class HiveClientImpl( Option(hc.getComment).map(field.withComment).getOrElse(field) } - private[hive] def toHiveTable(table: CatalogTable): HiveTable = { + def toHiveTable(table: CatalogTable, conf: HiveConf = hiveConf, shim: Shim = shimDefault) + : HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. @@ -866,7 +886,7 @@ private[hive] class HiveClientImpl( hiveTable } - private def toHivePartition( + def toHivePartition( p: CatalogTablePartition, ht: HiveTable): HivePartition = { val tpart = new org.apache.hadoop.hive.metastore.api.Partition @@ -891,7 +911,7 @@ private[hive] class HiveClientImpl( new HivePartition(ht, tpart) } - private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { + def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), From baf776a774df6ce03a9ccb39c899615f22ccb680 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 4 Feb 2017 11:07:11 +0800 Subject: [PATCH 04/40] remove an empty line --- .../main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index e9817d0ae9a4d..28827569882d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -22,7 +22,6 @@ import java.io.IOException import scala.collection.JavaConverters._ import com.google.common.base.Objects - import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable} From 43138409a6a79abe6c227987b1ebf27cbdfc8786 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 4 Feb 2017 15:26:48 +0800 Subject: [PATCH 05/40] remove HiveConf in Object HiveClientImpl --- .../sql/hive/client/HiveClientImpl.scala | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index a848167886d27..bf57cff40e587 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -435,7 +435,7 @@ private[hive] class HiveClientImpl( } override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { - client.createTable(HiveClientImpl.toHiveTable(table, conf, shim), ignoreIfExists) + client.createTable(HiveClientImpl.toHiveTable(table, Some(conf), shim), ignoreIfExists) } override def dropTable( @@ -447,7 +447,7 @@ private[hive] class HiveClientImpl( } override def alterTable(tableName: String, table: CatalogTable): Unit = 
withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) + val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" client.alterTable(qualifiedTableName, hiveTable) @@ -516,7 +516,7 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = HiveClientImpl.toHiveTable(catalogTable, conf, shim) + val hiveTable = HiveClientImpl.toHiveTable(catalogTable, Some(conf), shim) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) .map { p => HiveClientImpl.toHivePartition(p.copy(spec = newSpec), hiveTable) } @@ -529,7 +529,7 @@ private[hive] class HiveClientImpl( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(getTable(db, table), conf, shim) + val hiveTable = HiveClientImpl.toHiveTable(getTable(db, table), Some(conf), shim) client.alterPartitions(table, newParts.map { p => HiveClientImpl.toHivePartition(p, hiveTable) }.asJava) } @@ -558,7 +558,7 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) + val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) Option(hivePartition).map(HiveClientImpl.fromHivePartition) } @@ -570,7 +570,7 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) + val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) val parts = spec match { case None => shim.getAllPartitions(client, hiveTable).map(HiveClientImpl.fromHivePartition) case Some(s) => @@ -584,7 +584,7 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, conf, shim) + val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) val parts = shim.getPartitionsByFilter(client, hiveTable, predicates) .map(HiveClientImpl.fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) @@ -778,9 +778,6 @@ private[hive] class HiveClientImpl( client.dropDatabase(db, true, false, true) } } - - - } private[hive] object HiveClientImpl { @@ -794,8 +791,6 @@ private[hive] object HiveClientImpl { case hive.v1_2 => new Shim_v1_2() } - private lazy val hiveConf = new HiveConf(classOf[SessionState]) - /* -------------------------------------------------------- * | Helper methods for converting to and from Hive classes | * -------------------------------------------------------- */ @@ -834,7 +829,7 @@ private[hive] object HiveClientImpl { Option(hc.getComment).map(field.withComment).getOrElse(field) } - def toHiveTable(table: CatalogTable, conf: HiveConf = hiveConf, shim: Shim = shimDefault) + def toHiveTable(table: CatalogTable, conf: Option[HiveConf] = None, shim: Shim = shimDefault) : HiveTable = { val hiveTable = 
new HiveTable(table.database, table.identifier.table) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. @@ -865,7 +860,7 @@ private[hive] object HiveClientImpl { hiveTable.setFields(schema.asJava) } hiveTable.setPartCols(partCols.asJava) - hiveTable.setOwner(conf.getUser) + conf.foreach(c => hiveTable.setOwner(c.getUser)) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } From a2ccba2987b90a40a6bf55477e5ecfba022c09f6 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 4 Feb 2017 16:04:07 +0800 Subject: [PATCH 06/40] add some comments --- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bf57cff40e587..384361ed51eff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -781,7 +781,7 @@ private[hive] class HiveClientImpl( } private[hive] object HiveClientImpl { - private lazy val shimDefault = IsolatedClientLoader.hiveVersion( + private lazy val shimForHiveExecution = IsolatedClientLoader.hiveVersion( HiveUtils.hiveExecutionVersion) match { case hive.v12 => new Shim_v0_12() case hive.v13 => new Shim_v0_13() @@ -829,8 +829,13 @@ private[hive] object HiveClientImpl { Option(hc.getComment).map(field.withComment).getOrElse(field) } - def toHiveTable(table: CatalogTable, conf: Option[HiveConf] = None, shim: Shim = shimDefault) - : HiveTable = { + // the default value shimForHiveExecution is only used for hive execution, + // a Shim instance with a specific metastore version should be passed to this function + // to interact with metastore + def toHiveTable( + table: CatalogTable, + conf: Option[HiveConf] = None, + shim: Shim = shimForHiveExecution): HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. 
From c6faaeb459eef065785f58f3b655df845d8d0da1 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 11:58:55 +0800 Subject: [PATCH 07/40] import HiveClientImpl._ --- .../sql/hive/client/HiveClientImpl.scala | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 384361ed51eff..a96f3ff5aeefc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} import org.apache.spark.util.{CircularBuffer, Utils} @@ -365,8 +366,8 @@ private[hive] class HiveClientImpl( Option(client.getTable(dbName, tableName, false)).map { h => // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema - val partCols = h.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn) - val schema = StructType(h.getCols.asScala.map(HiveClientImpl.fromHiveColumn) ++ partCols) + val partCols = h.getPartCols.asScala.map(fromHiveColumn) + val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols) // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet) val unsupportedFeatures = ArrayBuffer.empty[String] @@ -435,7 +436,7 @@ private[hive] class HiveClientImpl( } override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { - client.createTable(HiveClientImpl.toHiveTable(table, Some(conf), shim), ignoreIfExists) + client.createTable(toHiveTable(table, Some(conf), shim), ignoreIfExists) } override def dropTable( @@ -447,7 +448,7 @@ private[hive] class HiveClientImpl( } override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) + val hiveTable = toHiveTable(table, Some(conf), shim) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" client.alterTable(qualifiedTableName, hiveTable) @@ -516,10 +517,10 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = HiveClientImpl.toHiveTable(catalogTable, Some(conf), shim) + val hiveTable = toHiveTable(catalogTable, Some(conf), shim) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) - .map { p => HiveClientImpl.toHivePartition(p.copy(spec = newSpec), hiveTable) } + .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) } client.renamePartition(hiveTable, oldSpec.asJava, hivePart) } @@ -529,9 +530,8 @@ private[hive] class HiveClientImpl( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withHiveState { - val hiveTable = 
HiveClientImpl.toHiveTable(getTable(db, table), Some(conf), shim) - client.alterPartitions(table, newParts.map { - p => HiveClientImpl.toHivePartition(p, hiveTable) }.asJava) + val hiveTable = toHiveTable(getTable(db, table), Some(conf), shim) + client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) } /** @@ -558,9 +558,9 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) + val hiveTable = toHiveTable(table, Some(conf), shim) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) - Option(hivePartition).map(HiveClientImpl.fromHivePartition) + Option(hivePartition).map(fromHivePartition) } /** @@ -570,12 +570,12 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) + val hiveTable = toHiveTable(table, Some(conf), shim) val parts = spec match { - case None => shim.getAllPartitions(client, hiveTable).map(HiveClientImpl.fromHivePartition) + case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) case Some(s) => assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid") - client.getPartitions(hiveTable, s.asJava).asScala.map(HiveClientImpl.fromHivePartition) + client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition) } HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts @@ -584,9 +584,8 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = HiveClientImpl.toHiveTable(table, Some(conf), shim) - val parts = shim.getPartitionsByFilter(client, hiveTable, predicates) - .map(HiveClientImpl.fromHivePartition) + val hiveTable = toHiveTable(table, Some(conf), shim) + val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts } From abe8dc9e4bcd88856b40daf091a48f6207efd197 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 12:00:44 +0800 Subject: [PATCH 08/40] remov an empty line --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index a96f3ff5aeefc..2c579418ca003 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -800,8 +800,7 @@ private[hive] object HiveClientImpl { private def toOutputFormat(name: String) = Utils.classForName(name) .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - - + def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { c.metadata.getString(HiveUtils.hiveTypeString) From 001352d86d1098e29a4c8856c101855155b358b7 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 12:11:54 +0800 Subject: [PATCH 09/40] remove a whitespace --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 2c579418ca003..a05a6b255d381 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -800,7 +800,7 @@ private[hive] object HiveClientImpl { private def toOutputFormat(name: String) = Utils.classForName(name) .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - + def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { c.metadata.getString(HiveUtils.hiveTypeString) From 8681902275ac67bc439d7c6718bdfc5e43dde520 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 15:24:23 +0800 Subject: [PATCH 10/40] fix ambigurous hive --- .../org/apache/spark/sql/hive/HiveUtils.scala | 11 ++------- .../sql/hive/client/HiveClientImpl.scala | 23 ++++++++++++------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 771ae23278905..2508b292c31e8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -62,15 +62,8 @@ private[spark] object HiveUtils extends Logging { /** The version of hive used internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" - private lazy val shimForHiveExecution = IsolatedClientLoader.hiveVersion( - HiveUtils.hiveExecutionVersion) match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } + private lazy val shimForHiveExecution = HiveClientImpl.shim(IsolatedClientLoader.hiveVersion( + HiveUtils.hiveExecutionVersion)) /** * The property key that is used to store the raw hive type string in the metadata of StructField. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e48ad09e39f64..3dca6147fdf3a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -87,14 +87,7 @@ private[hive] class HiveClientImpl( // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() - private val shim = version match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } + private val shim = HiveClientImpl.shim(version) // Create an internal session state for this HiveClientImpl. 
val state: SessionState = { @@ -776,3 +769,17 @@ private[hive] class HiveClientImpl( } } } + +private[hive] object HiveClientImpl { + + def shim(version: HiveVersion): Shim = { + version match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } + } +} From 352856bca3dff862f09253ec5a06cb55fff3e76a Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 15:26:10 +0800 Subject: [PATCH 11/40] remove an empty line --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 - .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3dca6147fdf3a..485ee55b2e122 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -771,7 +771,6 @@ private[hive] class HiveClientImpl( } private[hive] object HiveClientImpl { - def shim(version: HiveVersion): Shim = { version match { case hive.v12 => new Shim_v0_12() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index c1bac5462ea42..b052f1e7e43f5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -57,7 +57,7 @@ import org.apache.spark.util.Utils * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to * avoid runtime errors due to the above guideline. */ -private[hive] sealed abstract class Shim { +private[client] sealed abstract class Shim { /** * Set the current SessionState to the given SessionState. Also, set the context classloader of From 5ef2139a7628ea5d6568f56b3a87ad9b3cf1caed Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 15:42:15 +0800 Subject: [PATCH 12/40] fix compile failed --- .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index b052f1e7e43f5..c1bac5462ea42 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -57,7 +57,7 @@ import org.apache.spark.util.Utils * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to * avoid runtime errors due to the above guideline. */ -private[client] sealed abstract class Shim { +private[hive] sealed abstract class Shim { /** * Set the current SessionState to the given SessionState. 
Also, set the context classloader of From 57521c6edfef58c48c12904ce3b7fb4949a76f82 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 17:50:06 +0800 Subject: [PATCH 13/40] fix test failed --- .../org/apache/spark/sql/hive/HiveUtils.scala | 2 +- .../spark/sql/hive/client/HiveClientImpl.scala | 15 +-------------- .../apache/spark/sql/hive/client/HiveShim.scala | 13 +++++++++++++ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 2508b292c31e8..577b889b546f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging { /** The version of hive used internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" - private lazy val shimForHiveExecution = HiveClientImpl.shim(IsolatedClientLoader.hiveVersion( + private val shimForHiveExecution = HiveShimUtils.shim(IsolatedClientLoader.hiveVersion( HiveUtils.hiveExecutionVersion)) /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 485ee55b2e122..3a11ea0625886 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -87,7 +87,7 @@ private[hive] class HiveClientImpl( // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() - private val shim = HiveClientImpl.shim(version) + private val shim = HiveShimUtils.shim(version) // Create an internal session state for this HiveClientImpl. 
val state: SessionState = { @@ -769,16 +769,3 @@ private[hive] class HiveClientImpl( } } } - -private[hive] object HiveClientImpl { - def shim(version: HiveVersion): Shim = { - version match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index c1bac5462ea42..ea3af1b345a6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -843,3 +843,16 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { } } + +private[hive] object HiveShimUtils { + def shim(version: HiveVersion): Shim = { + version match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } + } +} From d82220973c945e08cd34855972461e96b56ea936 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 19:35:28 +0800 Subject: [PATCH 14/40] move import --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3a11ea0625886..b9154b4f8336f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.HiveUtils._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.CircularBuffer @@ -83,7 +83,6 @@ private[hive] class HiveClientImpl( extends HiveClient with Logging { - import HiveUtils._ // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() From ebf875f6650bc182fbac3986745561ebe90f48d0 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 20:11:02 +0800 Subject: [PATCH 15/40] fix test failed --- .../org/apache/spark/sql/hive/HiveUtils.scala | 2 +- .../spark/sql/hive/client/HiveClientImpl.scala | 15 ++++++++++++++- .../apache/spark/sql/hive/client/HiveShim.scala | 15 +-------------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 577b889b546f9..2508b292c31e8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging { /** The version of hive used internally by Spark SQL. 
*/ val hiveExecutionVersion: String = "1.2.1" - private val shimForHiveExecution = HiveShimUtils.shim(IsolatedClientLoader.hiveVersion( + private lazy val shimForHiveExecution = HiveClientImpl.shim(IsolatedClientLoader.hiveVersion( HiveUtils.hiveExecutionVersion)) /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b9154b4f8336f..326daf9d6f781 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -86,7 +86,7 @@ private[hive] class HiveClientImpl( // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() - private val shim = HiveShimUtils.shim(version) + private val shim = HiveClientImpl.shim(version) // Create an internal session state for this HiveClientImpl. val state: SessionState = { @@ -768,3 +768,16 @@ private[hive] class HiveClientImpl( } } } + +object HiveClientImpl { + def shim(version: HiveVersion): Shim = { + version match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index ea3af1b345a6e..02462f6b6887c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -842,17 +842,4 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { } } -} - -private[hive] object HiveShimUtils { - def shim(version: HiveVersion): Shim = { - version match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } - } -} +} \ No newline at end of file From e971da01e62c2e455504604120544f9a5e78588d Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 20:20:45 +0800 Subject: [PATCH 16/40] fix a code sytle --- .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 02462f6b6887c..fbae4a6805c2d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -841,5 +841,4 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { case e: InvocationTargetException => throw e.getCause() } } - } \ No newline at end of file From 9ccf9e364f0ac57f5b7c91a9b1bed6fb4c24098c Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 20:30:51 +0800 Subject: [PATCH 17/40] add a empty line --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index 9e9894803ce25..23dc21afa1dd4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -277,3 +277,4 @@ private[hive] object HiveShim { } } } + From 922eb9d182e547f0e5706f6ad8c924f4c9ef4496 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 21:36:52 +0800 Subject: [PATCH 18/40] fix code style --- .../src/main/scala/org/apache/spark/sql/hive/HiveShim.scala | 3 +-- .../scala/org/apache/spark/sql/hive/client/HiveShim.scala | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index 23dc21afa1dd4..0db12d4f53517 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -276,5 +276,4 @@ private[hive] object HiveShim { compressType = intermediateCompressType } } -} - +} \ No newline at end of file diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index fbae4a6805c2d..39ae2c30542ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -841,4 +841,6 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { case e: InvocationTargetException => throw e.getCause() } } -} \ No newline at end of file + +} + From 6566a59e915e1a6e9e0a4bef8d4591a7ef6e18c2 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 21:42:11 +0800 Subject: [PATCH 19/40] fix code style --- .../src/main/scala/org/apache/spark/sql/hive/HiveShim.scala | 2 +- .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index 0db12d4f53517..9e9894803ce25 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -276,4 +276,4 @@ private[hive] object HiveShim { compressType = intermediateCompressType } } -} \ No newline at end of file +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 39ae2c30542ac..c1bac5462ea42 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -841,6 +841,5 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { case e: InvocationTargetException => throw e.getCause() } } - -} +} From a674920678dfbf359fd279302076cb31e54307c2 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 22:27:42 +0800 Subject: [PATCH 20/40] fix test failed --- .../org/apache/spark/sql/hive/HiveUtils.scala | 11 ++++++++-- .../sql/hive/client/HiveClientImpl.scala | 21 +++++++------------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 2508b292c31e8..ad19f88ab5f47 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -62,8 +62,15 @@ private[spark] object HiveUtils extends Logging { /** The version of hive used 
internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" - private lazy val shimForHiveExecution = HiveClientImpl.shim(IsolatedClientLoader.hiveVersion( - HiveUtils.hiveExecutionVersion)) + private lazy val shimForHiveExecution = IsolatedClientLoader.hiveVersion( + HiveUtils.hiveExecutionVersion) match { + case org.apache.spark.sql.hive.client.hive.v12 => new Shim_v0_12() + case org.apache.spark.sql.hive.client.hive.v13 => new Shim_v0_13() + case org.apache.spark.sql.hive.client.hive.v14 => new Shim_v0_14() + case org.apache.spark.sql.hive.client.hive.v1_0 => new Shim_v1_0() + case org.apache.spark.sql.hive.client.hive.v1_1 => new Shim_v1_1() + case org.apache.spark.sql.hive.client.hive.v1_2 => new Shim_v1_2() + } /** * The property key that is used to store the raw hive type string in the metadata of StructField. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 326daf9d6f781..bb9d393cf78a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -86,7 +86,14 @@ private[hive] class HiveClientImpl( // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() - private val shim = HiveClientImpl.shim(version) + private val shim = version match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } // Create an internal session state for this HiveClientImpl. 
val state: SessionState = { @@ -769,15 +776,3 @@ private[hive] class HiveClientImpl( } } -object HiveClientImpl { - def shim(version: HiveVersion): Shim = { - version match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } - } -} From b9418d0063be854f4e2cfaae943e4f08d95f7f57 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 23:04:10 +0800 Subject: [PATCH 21/40] fix test failed --- .../org/apache/spark/sql/hive/HiveUtils.scala | 163 +---------------- .../spark/sql/hive/MetastoreRelation.scala | 4 +- .../sql/hive/client/HiveClientImpl.scala | 165 +++++++++++++++++- 3 files changed, 168 insertions(+), 164 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index ad19f88ab5f47..9c344ed09312e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -31,18 +31,13 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor} -import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} -import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.internal.SQLConf @@ -62,16 +57,6 @@ private[spark] object HiveUtils extends Logging { /** The version of hive used internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" - private lazy val shimForHiveExecution = IsolatedClientLoader.hiveVersion( - HiveUtils.hiveExecutionVersion) match { - case org.apache.spark.sql.hive.client.hive.v12 => new Shim_v0_12() - case org.apache.spark.sql.hive.client.hive.v13 => new Shim_v0_13() - case org.apache.spark.sql.hive.client.hive.v14 => new Shim_v0_14() - case org.apache.spark.sql.hive.client.hive.v1_0 => new Shim_v1_0() - case org.apache.spark.sql.hive.client.hive.v1_1 => new Shim_v1_1() - case org.apache.spark.sql.hive.client.hive.v1_2 => new Shim_v1_2() - } - /** * The property key that is used to store the raw hive type string in the metadata of StructField. * For example, in the case where the Hive type is varchar, the type gets mapped to a string type @@ -474,143 +459,6 @@ private[spark] object HiveUtils extends Logging { case (other, tpe) if primitiveTypes contains tpe => other.toString } - /** Converts the native StructField to Hive's FieldSchema. 
*/ - def toHiveColumn(c: StructField): FieldSchema = { - val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { - c.metadata.getString(HiveUtils.hiveTypeString) - } else { - c.dataType.catalogString - } - new FieldSchema(c.name, typeString, c.getComment.orNull) - } - - /** Builds the native StructField from Hive's FieldSchema. */ - def fromHiveColumn(hc: FieldSchema): StructField = { - val columnType = try { - CatalystSqlParser.parseDataType(hc.getType) - } catch { - case e: ParseException => - throw new SparkException("Cannot recognize hive type string: " + hc.getType, e) - } - - val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build() - val field = StructField( - name = hc.getName, - dataType = columnType, - nullable = true, - metadata = metadata) - Option(hc.getComment).map(field.withComment).getOrElse(field) - } - - private def toInputFormat(name: String) = - Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] - - private def toOutputFormat(name: String) = - Utils.classForName(name) - .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - - /** Converts the native table metadata representation format CatalogTable to Hive's Table. - * the default value shimForHiveExecution is only used for hive execution, a Shim instance - * with a specific metastore version should be passed to this function to interact with metastore - */ - def toHiveTable( - table: CatalogTable, - conf: Option[HiveConf] = None, - shim: Shim = shimForHiveExecution): HiveTable = { - val hiveTable = new HiveTable(table.database, table.identifier.table) - // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. - // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. - // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105) - hiveTable.setTableType(table.tableType match { - case CatalogTableType.EXTERNAL => - hiveTable.setProperty("EXTERNAL", "TRUE") - HiveTableType.EXTERNAL_TABLE - case CatalogTableType.MANAGED => - HiveTableType.MANAGED_TABLE - case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW - }) - // Note: In Hive the schema and partition columns must be disjoint sets - val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => - table.partitionColumnNames.contains(c.getName) - } - if (schema.isEmpty) { - // This is a hack to preserve existing behavior. Before Spark 2.0, we do not - // set a default serde here (this was done in Hive), and so if the user provides - // an empty schema Hive would automatically populate the schema with a single - // field "col". However, after SPARK-14388, we set the default serde to - // LazySimpleSerde so this implicit behavior no longer happens. Therefore, - // we need to do it in Spark ourselves. 
- hiveTable.setFields( - Seq(new FieldSchema("col", "array", "from deserializer")).asJava) - } else { - hiveTable.setFields(schema.asJava) - } - hiveTable.setPartCols(partCols.asJava) - conf.foreach(c => hiveTable.setOwner(c.getUser)) - hiveTable.setCreateTime((table.createTime / 1000).toInt) - hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } - table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) - table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) - hiveTable.setSerializationLib( - table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - table.storage.properties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } - table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } - table.comment.foreach { c => hiveTable.setProperty("comment", c) } - // Hive will expand the view text, so it needs 2 fields: viewOriginalText and viewExpandedText. - // Since we don't expand the view text, but only add table properties, we map the `viewText` to - // the both fields in hive table. - table.viewText.foreach { t => - hiveTable.setViewOriginalText(t) - hiveTable.setViewExpandedText(t) - } - hiveTable - } - - /** - * Converts the native partition metadata representation format CatalogTablePartition to - * Hive's Partition. - */ - def toHivePartition( - p: CatalogTablePartition, - ht: HiveTable): HivePartition = { - val tpart = new org.apache.hadoop.hive.metastore.api.Partition - val partValues = ht.getPartCols.asScala.map { hc => - p.spec.get(hc.getName).getOrElse { - throw new IllegalArgumentException( - s"Partition spec is missing a value for column '${hc.getName}': ${p.spec}") - } - } - val storageDesc = new StorageDescriptor - val serdeInfo = new SerDeInfo - p.storage.locationUri.foreach(storageDesc.setLocation) - p.storage.inputFormat.foreach(storageDesc.setInputFormat) - p.storage.outputFormat.foreach(storageDesc.setOutputFormat) - p.storage.serde.foreach(serdeInfo.setSerializationLib) - serdeInfo.setParameters(p.storage.properties.asJava) - storageDesc.setSerdeInfo(serdeInfo) - tpart.setDbName(ht.getDbName) - tpart.setTableName(ht.getTableName) - tpart.setValues(partValues.asJava) - tpart.setSd(storageDesc) - new HivePartition(ht, tpart) - } - - def fromHivePartition(hp: HivePartition): CatalogTablePartition = { - val apiPartition = hp.getTPartition - CatalogTablePartition( - spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), - storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation), - inputFormat = Option(apiPartition.getSd.getInputFormat), - outputFormat = Option(apiPartition.getSd.getOutputFormat), - serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), - compressed = apiPartition.getSd.isCompressed, - properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) - .map(_.asScala.toMap).orNull), - parameters = - if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) - } /** * Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema. 
* When the tables are data source tables or the schema already exists, returns the original @@ -620,11 +468,12 @@ private[spark] object HiveUtils extends Logging { if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) { table } else { - val hiveTable = toHiveTable(table) + val hiveTable = HiveClientImpl.toHiveTable(table) // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema - val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn) - val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols) + val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn) + val schema = StructType(hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn) + ++ partCols) table.copy(schema = schema) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index b2c133fefb372..97b120758ba45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -57,7 +57,7 @@ private[hive] case class MetastoreRelation( override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil - @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable) + @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable) @transient override def computeStats(conf: CatalystConf): Statistics = { catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( @@ -113,7 +113,7 @@ private[hive] case class MetastoreRelation( allPartitions } - rawPartitions.map(HiveUtils.toHivePartition(_, hiveQlTable)) + rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) } /** Only compare database and tablename, not alias. 
*/ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bb9d393cf78a3..d769cb07bae59 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -27,9 +27,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema} +import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} import org.apache.hadoop.hive.ql.Driver -import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.security.UserGroupInformation @@ -43,10 +44,12 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPa import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.hive.HiveUtils._ -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.CircularBuffer +import org.apache.spark.sql.hive.client.HiveClientImpl._ +import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} +import org.apache.spark.util.{CircularBuffer, Utils} /** * A class that wraps the HiveClient and converts its responses to externally visible classes. @@ -776,3 +779,155 @@ private[hive] class HiveClientImpl( } } +private[hive] object HiveClientImpl { + private lazy val shimForHiveExecution = IsolatedClientLoader.hiveVersion( + HiveUtils.hiveExecutionVersion) match { + case hive.v12 => new Shim_v0_12() + case hive.v13 => new Shim_v0_13() + case hive.v14 => new Shim_v0_14() + case hive.v1_0 => new Shim_v1_0() + case hive.v1_1 => new Shim_v1_1() + case hive.v1_2 => new Shim_v1_2() + } + + /** Converts the native StructField to Hive's FieldSchema. */ + def toHiveColumn(c: StructField): FieldSchema = { + val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { + c.metadata.getString(HiveUtils.hiveTypeString) + } else { + c.dataType.catalogString + } + new FieldSchema(c.name, typeString, c.getComment.orNull) + } + + /** Builds the native StructField from Hive's FieldSchema. 
*/ + def fromHiveColumn(hc: FieldSchema): StructField = { + val columnType = try { + CatalystSqlParser.parseDataType(hc.getType) + } catch { + case e: ParseException => + throw new SparkException("Cannot recognize hive type string: " + hc.getType, e) + } + + val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build() + val field = StructField( + name = hc.getName, + dataType = columnType, + nullable = true, + metadata = metadata) + Option(hc.getComment).map(field.withComment).getOrElse(field) + } + + private def toInputFormat(name: String) = + Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] + + private def toOutputFormat(name: String) = + Utils.classForName(name) + .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] + + /** Converts the native table metadata representation format CatalogTable to Hive's Table. + * the default value shimForHiveExecution is only used for hive execution, a Shim instance + * with a specific metastore version should be passed to this function to interact with metastore + */ + def toHiveTable( + table: CatalogTable, + conf: Option[HiveConf] = None, + shim: Shim = shimForHiveExecution): HiveTable = { + val hiveTable = new HiveTable(table.database, table.identifier.table) + // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. + // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. + // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105) + hiveTable.setTableType(table.tableType match { + case CatalogTableType.EXTERNAL => + hiveTable.setProperty("EXTERNAL", "TRUE") + HiveTableType.EXTERNAL_TABLE + case CatalogTableType.MANAGED => + HiveTableType.MANAGED_TABLE + case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW + }) + // Note: In Hive the schema and partition columns must be disjoint sets + val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => + table.partitionColumnNames.contains(c.getName) + } + if (schema.isEmpty) { + // This is a hack to preserve existing behavior. Before Spark 2.0, we do not + // set a default serde here (this was done in Hive), and so if the user provides + // an empty schema Hive would automatically populate the schema with a single + // field "col". However, after SPARK-14388, we set the default serde to + // LazySimpleSerde so this implicit behavior no longer happens. Therefore, + // we need to do it in Spark ourselves. 
+ hiveTable.setFields( + Seq(new FieldSchema("col", "array", "from deserializer")).asJava) + } else { + hiveTable.setFields(schema.asJava) + } + hiveTable.setPartCols(partCols.asJava) + conf.foreach(c => hiveTable.setOwner(c.getUser)) + hiveTable.setCreateTime((table.createTime / 1000).toInt) + hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) + table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) + table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) + hiveTable.setSerializationLib( + table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + table.storage.properties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } + table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } + table.comment.foreach { c => hiveTable.setProperty("comment", c) } + // Hive will expand the view text, so it needs 2 fields: viewOriginalText and viewExpandedText. + // Since we don't expand the view text, but only add table properties, we map the `viewText` to + // the both fields in hive table. + table.viewText.foreach { t => + hiveTable.setViewOriginalText(t) + hiveTable.setViewExpandedText(t) + } + hiveTable + } + + /** + * Converts the native partition metadata representation format CatalogTablePartition to + * Hive's Partition. + */ + def toHivePartition( + p: CatalogTablePartition, + ht: HiveTable): HivePartition = { + val tpart = new org.apache.hadoop.hive.metastore.api.Partition + val partValues = ht.getPartCols.asScala.map { hc => + p.spec.get(hc.getName).getOrElse { + throw new IllegalArgumentException( + s"Partition spec is missing a value for column '${hc.getName}': ${p.spec}") + } + } + val storageDesc = new StorageDescriptor + val serdeInfo = new SerDeInfo + p.storage.locationUri.foreach(storageDesc.setLocation) + p.storage.inputFormat.foreach(storageDesc.setInputFormat) + p.storage.outputFormat.foreach(storageDesc.setOutputFormat) + p.storage.serde.foreach(serdeInfo.setSerializationLib) + serdeInfo.setParameters(p.storage.properties.asJava) + storageDesc.setSerdeInfo(serdeInfo) + tpart.setDbName(ht.getDbName) + tpart.setTableName(ht.getTableName) + tpart.setValues(partValues.asJava) + tpart.setSd(storageDesc) + new HivePartition(ht, tpart) + } + + def fromHivePartition(hp: HivePartition): CatalogTablePartition = { + val apiPartition = hp.getTPartition + CatalogTablePartition( + spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), + storage = CatalogStorageFormat( + locationUri = Option(apiPartition.getSd.getLocation), + inputFormat = Option(apiPartition.getSd.getInputFormat), + outputFormat = Option(apiPartition.getSd.getOutputFormat), + serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), + compressed = apiPartition.getSd.isCompressed, + properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) + .map(_.asScala.toMap).orNull), + parameters = + if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) + } + +} + + From 631cbd5d768d517f7ebb70174940413d012e489a Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 23:07:27 +0800 Subject: [PATCH 22/40] remove empty line --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index d769cb07bae59..6a1a42bd165d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -927,7 +927,4 @@ private[hive] object HiveClientImpl { parameters = if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) } - } - - From 2413ad85474754b51bfabb88945d5303b2d6a636 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 23:09:53 +0800 Subject: [PATCH 23/40] make HiveShim client private --- .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index c1bac5462ea42..b052f1e7e43f5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -57,7 +57,7 @@ import org.apache.spark.util.Utils * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to * avoid runtime errors due to the above guideline. */ -private[hive] sealed abstract class Shim { +private[client] sealed abstract class Shim { /** * Set the current SessionState to the given SessionState. Also, set the context classloader of From 614922817ad771eb24527b2ed27bdec7632017fb Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 23:11:12 +0800 Subject: [PATCH 24/40] code style --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 6a1a42bd165d7..3e6b9ea15abf1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -924,7 +924,7 @@ private[hive] object HiveClientImpl { compressed = apiPartition.getSd.isCompressed, properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) .map(_.asScala.toMap).orNull), - parameters = - if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) + parameters = + if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) } } From 0f43bb51a66dc83b47c70fe570d54967d6bdfdd5 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 23:13:42 +0800 Subject: [PATCH 25/40] code style --- .../spark/sql/hive/client/HiveClientImpl.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3e6b9ea15abf1..7ad6f7f3318a2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -826,9 +826,9 @@ private[hive] object HiveClientImpl { .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] /** Converts the native table metadata representation format CatalogTable to Hive's Table. 
- * the default value shimForHiveExecution is only used for hive execution, a Shim instance - * with a specific metastore version should be passed to this function to interact with metastore - */ + * the default value shimForHiveExecution is only used for hive execution, a Shim instance + * with a specific metastore version should be passed to this function to interact with metastore + */ def toHiveTable( table: CatalogTable, conf: Option[HiveConf] = None, @@ -884,9 +884,9 @@ private[hive] object HiveClientImpl { } /** - * Converts the native partition metadata representation format CatalogTablePartition to - * Hive's Partition. - */ + * Converts the native partition metadata representation format CatalogTablePartition to + * Hive's Partition. + */ def toHivePartition( p: CatalogTablePartition, ht: HiveTable): HivePartition = { From d4155c29b2a077e225a89b56504a0d20ec34b684 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 7 Feb 2017 09:03:40 +0800 Subject: [PATCH 26/40] not allowed to create a empty schema hive table --- .../spark/sql/hive/client/HiveClientImpl.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7ad6f7f3318a2..d48a76dfce9b2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -849,18 +849,8 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - if (schema.isEmpty) { - // This is a hack to preserve existing behavior. Before Spark 2.0, we do not - // set a default serde here (this was done in Hive), and so if the user provides - // an empty schema Hive would automatically populate the schema with a single - // field "col". However, after SPARK-14388, we set the default serde to - // LazySimpleSerde so this implicit behavior no longer happens. Therefore, - // we need to do it in Spark ourselves. 
- hiveTable.setFields( - Seq(new FieldSchema("col", "array", "from deserializer")).asJava) - } else { - hiveTable.setFields(schema.asJava) - } + + hiveTable.setFields(schema.asJava) hiveTable.setPartCols(partCols.asJava) conf.foreach(c => hiveTable.setOwner(c.getUser)) hiveTable.setCreateTime((table.createTime / 1000).toInt) From 7f15d4e32688102be6d22b7badb7e5e5bf75e8e2 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 8 Feb 2017 14:19:59 +0800 Subject: [PATCH 27/40] fix test failed --- .../spark/sql/hive/client/HiveClientImpl.scala | 15 +++++++++++++-- ...xternalCatalogBackwardCompatibilitySuite.scala | 5 +++++ .../sql/hive/MetastoreDataSourcesSuite.scala | 3 +++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index d48a76dfce9b2..e6a7281b94e21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} @@ -849,8 +850,18 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - - hiveTable.setFields(schema.asJava) + if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) { + // This is a hack to preserve existing behavior. Before Spark 2.0, we do not + // set a default serde here (this was done in Hive), and so if the user provides + // an empty schema Hive would automatically populate the schema with a single + // field "col". However, after SPARK-14388, we set the default serde to + // LazySimpleSerde so this implicit behavior no longer happens. Therefore, + // we need to do it in Spark ourselves. 
+ hiveTable.setFields( + Seq(new FieldSchema("col", "array", "from deserializer")).asJava) + } else { + hiveTable.setFields(schema.asJava) + } hiveTable.setPartCols(partCols.asJava) conf.foreach(c => hiveTable.setOwner(c.getUser)) hiveTable.setCreateTime((table.createTime / 1000).toInt) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala index 00fdfbcebbe85..ee632d24b717e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala @@ -134,6 +134,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest storage = CatalogStorageFormat.empty.copy( properties = Map("path" -> defaultTableURI("tbl4").toString)), schema = new StructType(), + provider = Some("json"), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema.numParts" -> "1", @@ -145,6 +146,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest storage = CatalogStorageFormat.empty.copy( properties = Map("path" -> defaultTableURI("tbl5").toString)), schema = simpleSchema, + provider = Some("parquet"), properties = Map( "spark.sql.sources.provider" -> "parquet", "spark.sql.sources.schema.numParts" -> "1", @@ -156,6 +158,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest storage = CatalogStorageFormat.empty.copy( properties = Map("path" -> defaultTableURI("tbl6").toString)), schema = new StructType(), + provider = Some("json"), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema.numParts" -> "1", @@ -170,6 +173,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"), properties = Map("path" -> tempDirUri)), schema = new StructType(), + provider = Some("json"), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema.numParts" -> "1", @@ -194,6 +198,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"), properties = Map("path" -> tempDirUri)), schema = new StructType(), + provider = Some("json"), properties = Map("spark.sql.sources.provider" -> "json")) // A list of all raw tables we want to test, with their expected schema. 
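(A rough, standalone sketch of the behavior the change above restores: the single placeholder column is only injected for data source tables whose schema is empty, which is why the raw test tables above now carry provider = Some("json"). The helper lives in a private[hive] object and the table below is invented for illustration, so treat this as conceptual, not as code from the patch; it assumes spark-hive and the Hive metastore API on the classpath.)

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
    import org.apache.spark.sql.hive.client.HiveClientImpl
    import org.apache.spark.sql.types.StructType

    // A raw catalog entry as old Spark versions wrote it: no schema, but marked as a
    // JSON data source table, so DDLUtils.isDatasourceTable(rawTable) is true.
    val rawTable = CatalogTable(
      identifier = TableIdentifier("tbl", Some("default")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType(),
      provider = Some("json"),
      properties = Map("spark.sql.sources.provider" -> "json"))

    val hiveTable = HiveClientImpl.toHiveTable(rawTable)
    // Hive sees exactly one placeholder column instead of an empty column list.
    assert(hiveTable.getCols.size() == 1)
    assert(hiveTable.getCols.get(0).getName == "col")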
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c262095df65b4..cd29b1961152a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -747,6 +747,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv identifier = TableIdentifier(tableName, Some("default")), tableType = CatalogTableType.MANAGED, schema = new StructType, + provider = Some("json"), storage = CatalogStorageFormat( locationUri = None, inputFormat = None, @@ -1275,6 +1276,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv identifier = TableIdentifier("t", Some("default")), tableType = CatalogTableType.MANAGED, schema = new StructType, + provider = Some("json"), storage = CatalogStorageFormat.empty, properties = Map( DATASOURCE_PROVIDER -> "json", @@ -1329,6 +1331,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv properties = Map("path" -> path.getAbsolutePath) ), schema = new StructType(), + provider = Some("parquet"), properties = Map( HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet")) hiveClient.createTable(tableDesc, ignoreIfExists = false) From 22140f005703541596548d3b0e545cda5e56ee74 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 8 Feb 2017 14:21:30 +0800 Subject: [PATCH 28/40] remove a whitespace --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e6a7281b94e21..b9a8c0852c46b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -860,7 +860,7 @@ private[hive] object HiveClientImpl { hiveTable.setFields( Seq(new FieldSchema("col", "array", "from deserializer")).asJava) } else { - hiveTable.setFields(schema.asJava) + hiveTable.setFields(schema.asJava) } hiveTable.setPartCols(partCols.asJava) conf.foreach(c => hiveTable.setOwner(c.getUser)) From 931b81aba351d13bf0e0f474c4fc5c2de8bfc706 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 8 Feb 2017 14:24:03 +0800 Subject: [PATCH 29/40] add a bracket --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b9a8c0852c46b..45a7bb7e72952 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -798,7 +798,7 @@ private[hive] object HiveClientImpl { } else { c.dataType.catalogString } - new FieldSchema(c.name, typeString, c.getComment.orNull) + new FieldSchema(c.name, typeString, c.getComment().orNull) } /** Builds the native StructField from Hive's FieldSchema. 
*/ From a3c9f5e4a754ceee2ffb71c3da49221001b1bf2c Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 8 Feb 2017 14:26:18 +0800 Subject: [PATCH 30/40] add comments --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 45a7bb7e72952..b173639868f4f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -850,6 +850,8 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } + // after SPARK-19279, it is not allowed to create a hive table with an empty schema, + // so here we should not add a default col schema if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) { // This is a hack to preserve existing behavior. Before Spark 2.0, we do not // set a default serde here (this was done in Hive), and so if the user provides From bf09f15ca7c90138312eb73b819131adf16ac040 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 8 Feb 2017 22:03:37 +0800 Subject: [PATCH 31/40] optimize a code --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 9c344ed09312e..c5becb87f722c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -472,9 +472,8 @@ private[spark] object HiveUtils extends Logging { // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn) - val schema = StructType(hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn) - ++ partCols) - table.copy(schema = schema) + val dataCols = hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn) + table.copy(schema = StructType(dataCols ++ partCols)) } } } From df3597ec28e71dc82c56f87464e6c12f3862ca95 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 9 Feb 2017 11:10:50 +0800 Subject: [PATCH 32/40] setDataLocation by storagedesc --- .../sql/hive/client/HiveClientImpl.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b173639868f4f..49786eb34a04a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -437,7 +437,7 @@ private[hive] class HiveClientImpl( } override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { - client.createTable(toHiveTable(table, Some(conf), shim), ignoreIfExists) + client.createTable(toHiveTable(table, Some(conf)), ignoreIfExists) } override def dropTable( @@ -449,7 +449,7 @@ private[hive] class HiveClientImpl( } override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { - val hiveTable = toHiveTable(table, Some(conf), shim) + val 
hiveTable = toHiveTable(table, Some(conf)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" client.alterTable(qualifiedTableName, hiveTable) @@ -518,7 +518,7 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = toHiveTable(catalogTable, Some(conf), shim) + val hiveTable = toHiveTable(catalogTable, Some(conf)) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } @@ -531,7 +531,7 @@ private[hive] class HiveClientImpl( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withHiveState { - val hiveTable = toHiveTable(getTable(db, table), Some(conf), shim) + val hiveTable = toHiveTable(getTable(db, table), Some(conf)) client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) } @@ -559,7 +559,7 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table, Some(conf), shim) + val hiveTable = toHiveTable(table, Some(conf)) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) Option(hivePartition).map(fromHivePartition) } @@ -571,7 +571,7 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table, Some(conf), shim) + val hiveTable = toHiveTable(table, Some(conf)) val parts = spec match { case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) case Some(s) => @@ -585,7 +585,7 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table, Some(conf), shim) + val hiveTable = toHiveTable(table, Some(conf)) val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts @@ -832,8 +832,7 @@ private[hive] object HiveClientImpl { */ def toHiveTable( table: CatalogTable, - conf: Option[HiveConf] = None, - shim: Shim = shimForHiveExecution): HiveTable = { + conf: Option[HiveConf] = None): HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. 
@@ -868,7 +867,7 @@ private[hive] object HiveClientImpl { conf.foreach(c => hiveTable.setOwner(c.getUser)) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.locationUri.foreach { loc => hiveTable.getSd.setLocation(loc)} table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( From b20d14fb6e70aaf6c4e09c644dd8ec6b8b5569dd Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 9 Feb 2017 13:00:43 +0800 Subject: [PATCH 33/40] fix a nosuch method --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 49786eb34a04a..818b2538239a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -867,7 +867,7 @@ private[hive] object HiveClientImpl { conf.foreach(c => hiveTable.setOwner(c.getUser)) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => hiveTable.getSd.setLocation(loc)} + table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)} table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( From 99d5bb20a3f98220e8370c94b3620e9b2c6c61f2 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 9 Feb 2017 14:54:13 +0800 Subject: [PATCH 34/40] remove some redundant code --- .../spark/sql/hive/client/HiveClientImpl.scala | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 818b2538239a4..4be1608ecb684 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -781,16 +781,6 @@ private[hive] class HiveClientImpl( } private[hive] object HiveClientImpl { - private lazy val shimForHiveExecution = IsolatedClientLoader.hiveVersion( - HiveUtils.hiveExecutionVersion) match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() - } - /** Converts the native StructField to Hive's FieldSchema. */ def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { @@ -826,7 +816,8 @@ private[hive] object HiveClientImpl { Utils.classForName(name) .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - /** Converts the native table metadata representation format CatalogTable to Hive's Table. + /** + * Converts the native table metadata representation format CatalogTable to Hive's Table. 
* the default value shimForHiveExecution is only used for hive execution, a Shim instance * with a specific metastore version should be passed to this function to interact with metastore */ From 2a55c450b6d7a7c107e8c7095fb9a41348ead13b Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 10 Feb 2017 00:15:43 +0800 Subject: [PATCH 35/40] fix some commets --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 4be1608ecb684..8e7561e73ca37 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -818,8 +818,6 @@ private[hive] object HiveClientImpl { /** * Converts the native table metadata representation format CatalogTable to Hive's Table. - * the default value shimForHiveExecution is only used for hive execution, a Shim instance - * with a specific metastore version should be passed to this function to interact with metastore */ def toHiveTable( table: CatalogTable, @@ -905,6 +903,9 @@ private[hive] object HiveClientImpl { new HivePartition(ht, tpart) } + /** + * Build the native partition metadata from Hive's Partition. + */ def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition CatalogTablePartition( From eb77b2eca1b151bc596b118ca03dcbabe8af4b51 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 10 Feb 2017 09:45:16 +0800 Subject: [PATCH 36/40] add a version suit for toHiveTable --- .../apache/spark/sql/hive/client/VersionsSuite.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index ca39c7e8459fc..c15e59e4fde59 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -570,7 +570,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w } } - test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") { withTempDir { dir => val path = dir.toURI.toString @@ -649,6 +648,17 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w } } + test(s"$version: create table should success to test HiveClientImpl.toHiveTable compatible") { + withTable("t", "t1") { + import spark.implicits._ + Seq("1").toDF("a").write.saveAsTable("t") + checkAnswer(spark.table("t"), Row("1") :: Nil) + + spark.sql("create table t1 as select 2 as a") + checkAnswer(spark.table("t1"), Row(2) :: Nil) + + } + } // TODO: add more tests. 
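    // Hedged sketch, not part of this patch series: one direction the TODO above could take.
    // Reading a partitioned data source table back with a partition filter can exercise the
    // catalog's partition listing and hence the fromHivePartition conversion that now lives
    // in HiveClientImpl. Table and column names below are assumptions for illustration only.
    test(s"$version: partitioned data source table round trip (sketch)") {
      withTable("t_part") {
        import spark.implicits._
        Seq((1, "a"), (2, "b")).toDF("value", "part")
          .write.partitionBy("part").saveAsTable("t_part")
        checkAnswer(spark.table("t_part").where("part = 'a'"), Row(1, "a") :: Nil)
      }
    }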
} } From 31bb4d0a647abec073673e9135016fd6d328ee2a Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 10 Feb 2017 09:46:43 +0800 Subject: [PATCH 37/40] remove an empty line --- .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index c15e59e4fde59..f7fb7d70f65f7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -656,7 +656,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w spark.sql("create table t1 as select 2 as a") checkAnswer(spark.table("t1"), Row(2) :: Nil) - } } // TODO: add more tests. From 09184616951e90f25d7f653c009b41b3f79eeea1 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 12 Feb 2017 12:11:33 +0800 Subject: [PATCH 38/40] add location check --- .../spark/sql/hive/client/VersionsSuite.scala | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index f7fb7d70f65f7..0e96703d16311 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -17,19 +17,20 @@ package org.apache.spark.sql.hive.client -import java.io.{ByteArrayOutputStream, File, PrintStream} +import java.io.{File, ByteArrayOutputStream, PrintStream} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred.TextInputFormat import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.{QueryTest, Row, AnalysisException} +import org.apache.spark.sql.catalyst.{TableIdentifier, FunctionIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} +import org.apache.spark.sql.catalyst.expressions.{Literal, AttributeReference, EqualTo} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -37,7 +38,7 @@ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StructType import org.apache.spark.tags.ExtendedHiveTest -import org.apache.spark.util.{MutableURLClassLoader, Utils} +import org.apache.spark.util.{Utils, MutableURLClassLoader} /** * A simple set of tests that call the methods of a [[HiveClient]], loading different version @@ -648,13 +649,26 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w } } - test(s"$version: create table should success to test HiveClientImpl.toHiveTable compatible") { + test(s"$version: CTAS for managed data source tables") { withTable("t", "t1") { import spark.implicits._ + + val tPath = new 
Path(spark.sessionState.conf.warehousePath, "t") Seq("1").toDF("a").write.saveAsTable("t") + val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}" + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + + assert(table.location.stripSuffix("/") == expectedPath) + assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath)) checkAnswer(spark.table("t"), Row("1") :: Nil) - spark.sql("create table t1 as select 2 as a") + val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1") + spark.sql("create table t1 using parquet as select 2 as a") + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}" + + assert(table1.location.stripSuffix("/") == expectedPath1) + assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path)) checkAnswer(spark.table("t1"), Row(2) :: Nil) } } From 547d1da142a4814e16b36d1cc868e07bab01f38a Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 12 Feb 2017 12:19:29 +0800 Subject: [PATCH 39/40] remove an redundant import --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c562309c75238..dc9c3ff33542d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -48,7 +48,6 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ -import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} From d44ff38cc71296e48e3b6fc4a2e4f3908c7508a6 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 12 Feb 2017 12:22:10 +0800 Subject: [PATCH 40/40] fix code style --- .../apache/spark/sql/hive/client/VersionsSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 0e96703d16311..fe14824cf0967 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.client -import java.io.{File, ByteArrayOutputStream, PrintStream} +import java.io.{ByteArrayOutputStream, File, PrintStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -26,11 +26,11 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred.TextInputFormat import org.apache.spark.internal.Logging -import org.apache.spark.sql.{QueryTest, Row, AnalysisException} -import org.apache.spark.sql.catalyst.{TableIdentifier, FunctionIdentifier} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException} import org.apache.spark.sql.catalyst.catalog._ -import 
org.apache.spark.sql.catalyst.expressions.{Literal, AttributeReference, EqualTo} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -38,7 +38,7 @@ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StructType import org.apache.spark.tags.ExtendedHiveTest -import org.apache.spark.util.{Utils, MutableURLClassLoader} +import org.apache.spark.util.{MutableURLClassLoader, Utils} /** * A simple set of tests that call the methods of a [[HiveClient]], loading different version
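With the conversions consolidated in the HiveClientImpl companion object by the patches above, the invariant worth keeping in mind is that the raw Hive type string survives the trip through Catalyst. The sketch below illustrates that round trip; toHiveColumn, fromHiveColumn and HiveUtils.hiveTypeString are the helpers from this series, while the varchar column is an invented example, and since these are private[hive]/private[spark] internals the snippet is conceptual rather than user-facing API.

    import org.apache.spark.sql.hive.HiveUtils
    import org.apache.spark.sql.hive.client.HiveClientImpl
    import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

    // A Hive varchar(10) column: Catalyst models it as StringType, and the original
    // type string is carried in the field metadata under HiveUtils.hiveTypeString.
    val field = StructField("name", StringType, nullable = true,
      metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, "varchar(10)").build())

    val hiveCol = HiveClientImpl.toHiveColumn(field)
    assert(hiveCol.getType == "varchar(10)")   // Hive still sees the original declaration

    val back = HiveClientImpl.fromHiveColumn(hiveCol)
    assert(back.dataType == StringType)        // parsed back to the Catalyst representation
    assert(back.metadata.getString(HiveUtils.hiveTypeString) == "varchar(10)")

A real caller goes through the external catalog and HiveClient rather than these helpers directly; the point is only that these conversions now have a single home in HiveClientImpl.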