Skip to content

Commit

Permalink
refactor list hive partitions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangxb1987 committed Oct 6, 2017
1 parent 83488cc commit 8f50c7c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
Expand Up @@ -405,6 +405,11 @@ object CatalogTypes {
* Specifications of a table partition. Mapping column name to column value.
*/
type TablePartitionSpec = Map[String, String]

/**
* Initialize an empty spec.
*/
lazy val emptyTablePartitionSpec: TablePartitionSpec = Map.empty[String, String]
}

/**
Expand Down
Expand Up @@ -638,12 +638,14 @@ private[hive] class HiveClientImpl(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
val partialPartSpec = spec match {
case None => CatalogTypes.emptyTablePartitionSpec
case Some(s) =>
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
s
}
val parts = client.getPartitions(hiveTable, partialPartSpec.asJava).asScala
.map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}
Expand Down
Expand Up @@ -162,21 +162,19 @@ case class HiveTableScanExec(

// exposed for tests
@transient lazy val rawPartitions = {
val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
// Retrieve the original attributes based on expression ID so that capitalization matches.
val normalizedFilters = partitionPruningPred.map(_.transform {
case a: AttributeReference => originalAttributes(a)
})
sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
relation.tableMeta.database,
relation.tableMeta.identifier.table,
normalizedFilters,
sparkSession.sessionState.conf.sessionLocalTimeZone)
} else {
sparkSession.sharedState.externalCatalog.listPartitions(
relation.tableMeta.database,
relation.tableMeta.identifier.table)
}
val prunedPartitions =
if (sparkSession.sessionState.conf.metastorePartitionPruning &&
partitionPruningPred.size > 0) {
// Retrieve the original attributes based on expression ID so that capitalization matches.
val normalizedFilters = partitionPruningPred.map(_.transform {
case a: AttributeReference => originalAttributes(a)
})
sparkSession.sessionState.catalog.listPartitionsByFilter(
relation.tableMeta.identifier,
normalizedFilters)
} else {
sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
}
prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
}

Expand Down

0 comments on commit 8f50c7c

Please sign in to comment.