From bc7873a71eb0f9eddf4a1622e55cb6812bde9055 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 15 Apr 2016 14:43:30 -0700 Subject: [PATCH 1/3] Move analyze table logic to AnalyzeTable --- .../apache/spark/sql/hive/HiveContext.scala | 87 ------------------- .../spark/sql/hive/execution/commands.scala | 81 ++++++++++++++++- .../spark/sql/hive/StatisticsSuite.scala | 2 +- 3 files changed, 80 insertions(+), 90 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ff93bfc4a3d16..280ba84a496fe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -29,12 +29,9 @@ import scala.collection.mutable.HashMap import scala.language.implicitConversions import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo @@ -45,7 +42,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical._ @@ -223,89 +219,6 @@ class HiveContext private[hive]( sessionState.catalog.invalidateTable(tableIdent) } - /** - * Analyzes the given table in the current database to generate statistics, which will be - * used in query optimizations. - * - * Right now, it only supports Hive tables and it only updates the size of a Hive table - * in the Hive metastore. - * - * @since 1.2.0 - */ - def analyze(tableName: String) { - val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) - - relation match { - case relation: MetastoreRelation => - // This method is mainly based on - // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) - // in Hive 0.13 (except that we do not use fs.getContentSummary). - // TODO: Generalize statistics collection. - // TODO: Why fs.getContentSummary returns wrong size on Jenkins? - // Can we use fs.getContentSummary in future? - // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use - // countFileSize to count the table size. 
- val stagingDir = metadataHive.getConf(HiveConf.ConfVars.STAGINGDIR.varname, - HiveConf.ConfVars.STAGINGDIR.defaultStrVal) - - def calculateTableSize(fs: FileSystem, path: Path): Long = { - val fileStatus = fs.getFileStatus(path) - val size = if (fileStatus.isDirectory) { - fs.listStatus(path) - .map { status => - if (!status.getPath().getName().startsWith(stagingDir)) { - calculateTableSize(fs, status.getPath) - } else { - 0L - } - } - .sum - } else { - fileStatus.getLen - } - - size - } - - def getFileSizeForTable(conf: HiveConf, table: Table): Long = { - val path = table.getPath - var size: Long = 0L - try { - val fs = path.getFileSystem(conf) - size = calculateTableSize(fs, path) - } catch { - case e: Exception => - logWarning( - s"Failed to get the size of table ${table.getTableName} in the " + - s"database ${table.getDbName} because of ${e.toString}", e) - size = 0L - } - - size - } - - val tableParameters = relation.hiveQlTable.getParameters - val oldTotalSize = - Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)) - .map(_.toLong) - .getOrElse(0L) - val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable) - // Update the Hive metastore if the total size of the table is different than the size - // recorded in the Hive metastore. - // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). - if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - sessionState.catalog.alterTable( - relation.table.copy( - properties = relation.table.properties + - (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) - } - case otherRelation => - throw new UnsupportedOperationException( - s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}") - } - } - override def setConf(key: String, value: String): Unit = { super.setConf(key, value) executionHive.runSqlHive(s"SET $key=$value") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 06badff474f49..0c06608ff9903 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -17,7 +17,11 @@ package org.apache.spark.sql.hive.execution +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.MetaStoreUtils +import org.apache.hadoop.hive.ql.metadata.Table import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier @@ -26,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation} -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -41,7 +45,80 @@ private[hive] case class AnalyzeTable(tableName: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.asInstanceOf[HiveContext].analyze(tableName) + val sessionState = sqlContext.sessionState + val hiveContext = sqlContext.asInstanceOf[HiveContext] + val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) + val relation = 
+      EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    relation match {
+      case relation: MetastoreRelation =>
+        // This method is mainly based on
+        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+        // Can we use fs.getContentSummary in future?
+        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+        // countFileSize to count the table size.
+        val stagingDir = hiveContext.metadataHive.getConf(
+          HiveConf.ConfVars.STAGINGDIR.varname,
+          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
+
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDirectory) {
+            fs.listStatus(path)
+              .map { status =>
+                if (!status.getPath().getName().startsWith(stagingDir)) {
+                  calculateTableSize(fs, status.getPath)
+                } else {
+                  0L
+                }
+              }
+              .sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
+        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+          val path = table.getPath
+          var size: Long = 0L
+          try {
+            val fs = path.getFileSystem(conf)
+            size = calculateTableSize(fs, path)
+          } catch {
+            case e: Exception =>
+              logWarning(
+                s"Failed to get the size of table ${table.getTableName} in the " +
+                  s"database ${table.getDbName} because of ${e.toString}", e)
+              size = 0L
+          }
+
+          size
+        }
+
+        val tableParameters = relation.hiveQlTable.getParameters
+        val oldTotalSize =
+          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
+            .map(_.toLong)
+            .getOrElse(0L)
+        val newTotalSize = getFileSizeForTable(hiveContext.hiveconf, relation.hiveQlTable)
+        // Update the Hive metastore if the total size of the table is different than the size
+        // recorded in the Hive metastore.
+        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) { + sessionState.catalog.alterTable( + relation.table.copy( + properties = relation.table.properties + + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) + } + case otherRelation => + throw new UnsupportedOperationException( + s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}") + } Seq.empty[Row] } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 05318f51af01e..65c70290ddc0d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -116,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { // Try to analyze a temp table sql("""SELECT * FROM src""").registerTempTable("tempTable") intercept[UnsupportedOperationException] { - hiveContext.analyze("tempTable") + AnalyzeTable("tempTable").run(hiveContext) } hiveContext.sessionState.catalog.dropTable( TableIdentifier("tempTable"), ignoreIfNotExists = true) From fa80269c1f35c942f0a4604854c01696f1788d2d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 15 Apr 2016 17:20:04 -0700 Subject: [PATCH 2/3] Add back the thing --- .../org/apache/spark/sql/hive/HiveContext.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 280ba84a496fe..73b5a63c8856a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ +import org.apache.spark.sql.hive.execution.AnalyzeTable import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ @@ -219,6 +220,19 @@ class HiveContext private[hive]( sessionState.catalog.invalidateTable(tableIdent) } + /** + * Analyzes the given table in the current database to generate statistics, which will be + * used in query optimizations. + * + * Right now, it only supports Hive tables and it only updates the size of a Hive table + * in the Hive metastore. 
+ * + * @since 1.2.0 + */ + def analyze(tableName: String) { + AnalyzeTable(tableName).run(self) + } + override def setConf(key: String, value: String): Unit = { super.setConf(key, value) executionHive.runSqlHive(s"SET $key=$value") From 9b4fdd7af23f1b32e2327de7037ee5fe21575fe0 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 16 Apr 2016 14:20:38 -0700 Subject: [PATCH 3/3] style --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 3 +-- .../test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 73b5a63c8856a..b16aa1cf3364e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -49,8 +49,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.hive.execution.AnalyzeTable -import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} +import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.types._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 65c70290ddc0d..05318f51af01e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -116,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { // Try to analyze a temp table sql("""SELECT * FROM src""").registerTempTable("tempTable") intercept[UnsupportedOperationException] { - AnalyzeTable("tempTable").run(hiveContext) + hiveContext.analyze("tempTable") } hiveContext.sessionState.catalog.dropTable( TableIdentifier("tempTable"), ignoreIfNotExists = true)
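
Reviewer note: the sketch below distills the size computation this series moves into
AnalyzeTable. It is a minimal, self-contained approximation, not the patch's code: the
object name TableSizeSketch, the helper calculateSize, and the table location
"/user/hive/warehouse/src" are hypothetical; the staging-directory prefix is passed as a
plain parameter (the patch reads it from HiveConf's STAGINGDIR), a bare Hadoop
Configuration stands in for a live metastore connection, and println stands in for
logWarning.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object TableSizeSketch {
      // Recursively sum file sizes under `path`, skipping children whose name
      // starts with the Hive staging prefix so in-flight writes (for example
      // ".hive-staging...") do not inflate the measured table size.
      def calculateSize(fs: FileSystem, path: Path, stagingDir: String): Long = {
        val status = fs.getFileStatus(path)
        if (status.isDirectory) {
          fs.listStatus(path).map { child =>
            if (child.getPath.getName.startsWith(stagingDir)) 0L
            else calculateSize(fs, child.getPath, stagingDir)
          }.sum
        } else {
          status.getLen
        }
      }

      def main(args: Array[String]): Unit = {
        val tablePath = new Path("/user/hive/warehouse/src") // hypothetical location
        val newTotalSize =
          try {
            val fs = tablePath.getFileSystem(new Configuration())
            calculateSize(fs, tablePath, ".hive-staging")
          } catch {
            // The patch logs a warning and falls back to 0L; mirrored here.
            case e: Exception => println(s"Failed to compute table size: $e"); 0L
          }
        // As in the patch, a metastore update would only happen when the new
        // size is positive and differs from the recorded totalSize property.
        println(s"computed totalSize = $newTotalSize")
      }
    }

Walking the filesystem (rather than trusting fs.getContentSummary) is the deliberate
design choice called out in the patch's TODO comments: getContentSummary was observed to
report wrong sizes on Jenkins, so the command counts file lengths itself.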
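Reviewer note: after patch 3, both entry points remain and converge on AnalyzeTable.run,
which is why the StatisticsSuite test can revert to hiveContext.analyze. A usage sketch,
assuming an in-scope HiveContext named hiveContext, an existing Hive table src, and the
ANALYZE TABLE ... COMPUTE STATISTICS noscan syntax of this era of Spark:

    // Programmatic API; after patch 2 this delegates to AnalyzeTable(tableName).run(self).
    hiveContext.analyze("src")

    // SQL route, parsed to the same command. Running either against a temp
    // table throws UnsupportedOperationException, as the suite asserts.
    hiveContext.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")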