Commit

fix.

gatorsmile committed Sep 20, 2016
1 parent 8f0c35a commit 061e60b
Showing 2 changed files with 134 additions and 37 deletions.
@@ -109,39 +109,59 @@ private[hive] case class MetastoreRelation(
   }
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
-      sizeInBytes = {
-        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-        // TODO: check if this estimate is valid for tables after partition pruning.
-        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-        // relatively cheap if parameters for the table are populated into the metastore.
-        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-        // (see StatsSetupConst in Hive) that we can look at in the future.
-        BigInt(
-          // When table is external, `totalSize` is always zero, which will influence join strategy
-          // so when `totalSize` is zero, use `rawDataSize` instead
-          // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
-          // which is generated by analyze command.
-          if (totalSize != null && totalSize.toLong > 0L) {
-            totalSize.toLong
-          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
-            rawDataSize.toLong
-          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-            try {
-              val hadoopConf = sparkSession.sessionState.newHadoopConf()
-              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-              fs.getContentSummary(hiveQlTable.getPath).getLength
-            } catch {
-              case e: IOException =>
-                logWarning("Failed to get table size from hdfs.", e)
-                sparkSession.sessionState.conf.defaultSizeInBytes
-            }
-          } else {
-            sparkSession.sessionState.conf.defaultSizeInBytes
-          })
+    catalogTable.stats.getOrElse {
+      // For non-partitioned tables, Hive-generated statistics are stored in table properties.
+      // For partitioned tables, Hive-generated statistics are stored in partition properties.
+      val (totalSize, rawDataSize) = if (catalogTable.partitionColumnNames.isEmpty) {
+        val properties = Option(hiveQlTable.getParameters).map(_.asScala.toMap).orNull
+        (properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong),
+          properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong))
+      } else {
+        (getTotalTableSize(StatsSetupConst.TOTAL_SIZE),
+          getTotalTableSize(StatsSetupConst.RAW_DATA_SIZE))
       }
-    ))
+      Statistics(
+        sizeInBytes = {
+          BigInt(
+            // When table is external, `totalSize` is always zero, which will influence join
+            // strategy, so when `totalSize` is zero, use `rawDataSize` instead. When
+            // `rawDataSize` is also zero, fall back to filesystem to get file sizes, if enabled.
+            if (totalSize.isDefined && totalSize.get > 0L) {
+              totalSize.get
+            } else if (rawDataSize.isDefined && rawDataSize.get > 0L) {
+              rawDataSize.get
+            } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+              try {
+                val hadoopConf = sparkSession.sessionState.newHadoopConf()
+                val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+                fs.getContentSummary(hiveQlTable.getPath).getLength
+              } catch {
+                case e: IOException =>
+                  logWarning("Failed to get table size from hdfs.", e)
+                  sparkSession.sessionState.conf.defaultSizeInBytes
+              }
+            } else {
+              sparkSession.sessionState.conf.defaultSizeInBytes
+            })
+        }
+      )
+    }
   }

+  // For partitioned tables, get the size of all the partitions.
+  // Note: the statistics might not be gathered for all the partitions.
+  // For partial collection, we will not utilize the Hive-generated statistics.
+  private def getTotalTableSize(statType: String): Option[Long] = {
+    allPartitions.foldLeft[Option[Long]](Some(0L)) {
+      (totalSize: Option[Long], cur: CatalogTablePartition) =>
+        val curSize = cur.parameters.get(statType).map(_.toLong)
+        if (totalSize.isEmpty || curSize.isEmpty || curSize.get < 0L) {
+          // return None when hitting the first negative or empty stats.
+          return None
+        } else {
+          Some(totalSize.get + curSize.get)
+        }
+    }
+  }
+
   // When metastore partition pruning is turned off, we cache the list of all partitions to
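For comparison, here is a minimal standalone sketch (not part of this commit) of the aggregation rule that getTotalTableSize above implements, written with pattern matching instead of the non-local return: summing per-partition stats yields None as soon as any partition is missing the stat or reports a negative value, so partially collected Hive statistics are never used.

// Hypothetical helper; `partitionStats` stands in for the per-partition parameter lookups.
def totalFromPartitions(partitionStats: Seq[Option[Long]]): Option[Long] =
  partitionStats.foldLeft(Option(0L)) {
    case (Some(acc), Some(size)) if size >= 0L => Some(acc + size)
    case _ => None // a missing or negative stat invalidates the whole total
  }

assert(totalFromPartitions(Seq(Some(10L), Some(20L))).contains(30L))
assert(totalFromPartitions(Seq(Some(10L), None)).isEmpty)      // stats not gathered
assert(totalFromPartitions(Seq(Some(10L), Some(-1L))).isEmpty) // invalid stat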
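Both the old and the new sizeInBytes logic follow the same fallback chain: prefer Hive's totalSize, then rawDataSize, then an optional filesystem scan, and finally the configured default. A hedged standalone sketch of that chain (names are illustrative, not from the commit):

// Zero and missing values fall through to the next source in the chain.
def estimateSizeInBytes(
    totalSize: Option[Long],
    rawDataSize: Option[Long],
    fallBackToFs: Boolean,
    fsSize: => Option[Long], // computed lazily, e.g. via FileSystem.getContentSummary
    defaultSize: Long): BigInt =
  BigInt(
    totalSize.filter(_ > 0L)
      .orElse(rawDataSize.filter(_ > 0L))
      .orElse(if (fallBackToFs) fsSize else None)
      .getOrElse(defaultSize))

// With no usable Hive stats and the filesystem fallback disabled, the default
// size is returned, which keeps the table from being broadcast in joins.
assert(estimateSizeInBytes(Some(0L), None, fallBackToFs = false, fsSize = None,
  defaultSize = 1024L) == BigInt(1024))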
@@ -119,9 +119,6 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
   }
 
   test("analyze MetastoreRelations") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
-
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
     sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
@@ -154,11 +151,15 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
         |SELECT * FROM src
       """.stripMargin).collect()
 
-    assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
+    // This is from Hive-generated statistics
+    val totalSizeFromHive = queryTotalSize("analyzeTable_part")
 
     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
 
-    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
+    // This is from Spark-generated statistics
+    val totalSizeFromSpark = queryTotalSize("analyzeTable_part")
+
+    assert(totalSizeFromHive == totalSizeFromSpark)
 
     sql("DROP TABLE analyzeTable_part").collect()

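The Hive-generated numbers compared here presumably come from Hive's automatic stats gathering on INSERT; they are stored as plain partition parameters keyed by StatsSetupConst.TOTAL_SIZE ("totalSize"), which the new MetastoreRelation code sums. A hypothetical spark-shell snippet to inspect them, assuming the analyzeTable_part table from this test:

import org.apache.spark.sql.catalyst.TableIdentifier

// Print each partition's spec and its Hive-recorded totalSize parameter.
spark.sessionState.catalog
  .listPartitions(TableIdentifier("analyzeTable_part"))
  .foreach(p => println(s"${p.spec} -> totalSize = ${p.parameters.get("totalSize")}"))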
@@ -171,6 +172,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
       TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
   }
 
+  private def queryTotalSize(tableName: String): BigInt =
+    spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
+
   private def checkStats(
       stats: Option[Statistics],
       hasSizeInBytes: Boolean,
@@ -235,6 +239,79 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
     }
   }
 
+  test("gather table-level statistics of managed partitioned tables from Hive") {
+    val managedTable = "partitionedTable"
+    withTable(managedTable) {
+      sql(
+        s"""
+           |CREATE TABLE $managedTable (key INT, value STRING)
+           |PARTITIONED BY (ds STRING, hr STRING)
+         """.stripMargin)
+
+      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $managedTable
+             |partition (ds='$ds',hr='$hr')
+             |SELECT 1, 'a'
+           """.stripMargin)
+      }
+
+      checkStats(
+        managedTable, isDataSourceTable = false, hasSizeInBytes = false, expectedRowCounts = None)
+
+      // This is from Hive-generated statistics
+      val totalSizeFromHive = queryTotalSize(managedTable)
+
+      sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS noscan")
+
+      // This is from Spark-generated statistics
+      val totalSizeFromSpark = queryTotalSize(managedTable)
+
+      assert(totalSizeFromHive == totalSizeFromSpark)
+    }
+  }
+
+  test("gather statistics for external partitioned table from Hive") {
+    val catalog = spark.sessionState.catalog
+    val externalPartitionedTable = "partitionedTable"
+    withTempDir { tempDir =>
+      val basePath = tempDir.getCanonicalPath
+      withTable(externalPartitionedTable) {
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE $externalPartitionedTable (key INT, value STRING)
+             |PARTITIONED BY (ds STRING, hr STRING)
+             |LOCATION '$basePath'
+           """.stripMargin)
+        for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE $externalPartitionedTable
+               |partition (ds='$ds',hr='$hr')
+               |SELECT 1, 'a'
+             """.stripMargin)
+        }
+        val totalSizeFromHive1 = queryTotalSize(externalPartitionedTable)
+
+        sql(
+          s"""
+             |ALTER TABLE $externalPartitionedTable DROP PARTITION (ds='2008-04-08'),
+             |PARTITION (hr='12')
+           """.stripMargin)
+        assert(
+          catalog.listPartitions(TableIdentifier(externalPartitionedTable)).map(_.spec).toSet ==
+            Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
+        val totalSizeFromHive2 = queryTotalSize(externalPartitionedTable)
+
+        sql(s"ALTER TABLE $externalPartitionedTable ADD PARTITION (ds='2008-04-08', hr='12')")
+        val totalSizeFromHive3 = queryTotalSize(externalPartitionedTable)
+
+        assert(totalSizeFromHive1 > totalSizeFromHive3 && totalSizeFromHive3 > totalSizeFromHive2)
+      }
+    }
+  }
+
   test("test elimination of the influences of the old stats") {
     val textTable = "textTable"
     withTable(textTable) {
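One subtlety the external-table test relies on: each PARTITION clause in ALTER TABLE ... DROP PARTITION is a partial spec that drops every matching partition. A hedged walk-through of the statement used above:

// Partitions before the DROP: (ds, hr) in {2008-04-08, 2008-04-09} x {11, 12}.
spark.sql(
  """
    |ALTER TABLE partitionedTable DROP PARTITION (ds='2008-04-08'),
    |PARTITION (hr='12')
  """.stripMargin)
// (ds='2008-04-08') matches 2008-04-08/11 and 2008-04-08/12;
// (hr='12')         matches 2008-04-09/12.
// Only (ds='2008-04-09', hr='11') survives, which is exactly what the
// listPartitions assertion in the test checks.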
