Merge pull request #3 from ericl/add-pruning-feature-flag
Add a conf flag for the lazy pruning (enabled by default)
Michael Allman committed Oct 13, 2016
2 parents 14ca612 + d216d5d commit 59fecdf
Showing 6 changed files with 92 additions and 16 deletions.
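For orientation, a minimal usage sketch of the flag this commit introduces (assuming a Spark 2.x session bound to `spark`; the flag defaults to true, so setting it to false restores the old eager behaviour of listing every partition up front):

// Sketch only, not part of this diff: toggling the new session conf at runtime.
spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "false")  // disable lazy pruning
spark.sql("SET spark.sql.hive.filesourcePartitionPruning=true")       // re-enable via SQL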
@@ -56,15 +56,15 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))

if (partitionKeyFilters.nonEmpty) {
val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileCatalog)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)

// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
val filter = Filter(filterExpression, prunedLogicalRelation)
Project(projects, filter)
} else {
op
}
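A side note on the `filters.reduceLeft(And)` step retained above: it left-folds the pushed-down predicates into one conjunction placed in a Filter over the pruned relation. A tiny self-contained sketch with stand-in expression types (not Catalyst's):

sealed trait Expr
case class Pred(sql: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

val predicates: Seq[Expr] = Seq(Pred("p = 1"), Pred("q > 2"), Pred("r < 3"))
val conjunction: Expr = predicates.reduceLeft(And)
// => And(And(Pred(p = 1), Pred(q > 2)), Pred(r < 3))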
@@ -102,7 +102,10 @@ class TableFileCatalog(
new ListingFileCatalog(sparkSession, rootPaths, parameters, partitionSchema)
}

override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
// Not used in the hot path of queries when metastore partition pruning is enabled
lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions(Nil)

override def inputFiles: Array[String] = cachedAllPartitions.inputFiles

private def listDataLeafFiles(paths: Seq[Path]) =
listLeafFiles(paths).filter(f => isDataPath(f.getPath))
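The change above leans on the fact that a Scala `lazy val` is evaluated at most once and then memoized, so the full partition listing is only paid if something (such as `inputFiles`) actually asks for it. A standalone sketch with hypothetical names, not the Spark class itself:

class CatalogSketch(listAllPartitions: () => Seq[String]) {
  // Mirrors cachedAllPartitions above: computed on first access, then reused.
  lazy val cachedAllPartitions: Seq[String] = listAllPartitions()
  def inputFiles: Array[String] = cachedAllPartitions.toArray
}

var listings = 0
val catalog = new CatalogSketch(() => { listings += 1; Seq("f1=0/f2=0/part-00000.parquet") })
catalog.inputFiles
catalog.inputFiles
assert(listings == 1)  // the expensive full listing ran exactly once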
@@ -325,9 +325,9 @@ case class Partition(values: InternalRow, files: Seq[FileStatus])
trait BasicFileCatalog {

/**
* Returns the list of root input paths from which the catalog will get files. These paths
* should *not* include any table partition directories. Partition directories are discovered or
* provided by a metastore catalog.
* Returns the list of root input paths from which the catalog will get files. There may be a
* single root path from which partitions are discovered, or individual partitions may be
* specified by each path.
*/
def rootPaths: Seq[Path]
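To make the revised contract concrete, the two conventions the new wording allows might look like this (hypothetical warehouse paths, not taken from this change):

import org.apache.hadoop.fs.Path

// One table root; partition directories are discovered beneath it.
val discoveredFromRoot: Seq[Path] = Seq(new Path("/warehouse/events"))

// One path per partition, supplied up front (e.g. by a metastore catalog).
val explicitPartitions: Seq[Path] = Seq(
  new Path("/warehouse/events/ds=2016-10-12"),
  new Path("/warehouse/events/ds=2016-10-13"))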

@@ -366,6 +366,7 @@ trait FileCatalog extends BasicFileCatalog {
/** Returns all the valid files. */
def allFiles(): Seq[FileStatus]

/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
allFiles().map(_.getPath.toUri.toString).toArray

@@ -269,6 +269,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HIVE_FILESOURCE_PARTITION_PRUNING =
SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
.doc("When true, enable metastore partition pruning for file source tables as well. " +
"This is currently implemented for converted Hive tables only.")
.booleanConf
.createWithDefault(true)

val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that uses the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -661,6 +668,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
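As a toy illustration of the entry-plus-accessor pattern added here (a few hypothetical lines, not Spark's SQLConf machinery), the default of true means lazy pruning is on unless a user explicitly turns it off:

class MiniConf(settings: Map[String, String]) {
  private def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
  // Analogue of the filesourcePartitionPruning accessor above.
  def filesourcePartitionPruning: Boolean =
    getBoolean("spark.sql.hive.filesourcePartitionPruning", default = true)
}

assert(new MiniConf(Map.empty).filesourcePartitionPruning)  // unset -> default true
assert(!new MiniConf(Map("spark.sql.hive.filesourcePartitionPruning" -> "false")).filesourcePartitionPruning)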
@@ -136,6 +136,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

private def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[Path],
metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
@@ -145,15 +146,14 @@
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
val metastoreRelationRootPath = metastoreRelation.hiveQlTable.getDataLocation
val cachedRelationFileFormatClass = relation.fileFormat.getClass

expectedFileFormat match {
case `cachedRelationFileFormatClass` =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached relation.
val useCached =
relation.location.rootPaths.toSet == Set(metastoreRelationRootPath) &&
relation.location.rootPaths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(schemaInMetastore) &&
relation.bucketSpec == expectedBucketSpec &&
relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil))
@@ -195,11 +195,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.

val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)

val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
Seq(metastoreRelation.hiveQlTable.getDataLocation)
} else {
// By convention (for example, see TableFileCatalog), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
// Partitioned tables without partitions use the location of the table's base path.
// Partitioned tables with partitions use the locations of those partitions' data
// locations, _omitting_ the table's base path.
val paths = metastoreRelation.getHiveQlPartitions().map { p =>
new Path(p.getLocation)
}
if (paths.isEmpty) {
Seq(metastoreRelation.hiveQlTable.getDataLocation)
} else {
paths
}
}
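// Sketch (hypothetical helper, not part of this change): the selection above as a
// small pure function. With lazy pruning the table root alone keys the cached
// relation; otherwise the concrete partition locations do, falling back to the
// table root when a partitioned table has no partitions yet.
def rootPathsFor(
    lazyPruningEnabled: Boolean,
    tableRoot: Path,
    partitionLocations: Seq[Path]): Seq[Path] = {
  if (lazyPruningEnabled || partitionLocations.isEmpty) Seq(tableRoot)
  else partitionLocations
}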

val cached = getCached(
tableIdentifier,
rootPaths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
@@ -210,8 +230,15 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val db = metastoreRelation.databaseName
val table = metastoreRelation.tableName
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog =
new TableFileCatalog(sparkSession, db, table, Some(partitionSchema), sizeInBytes)
val fileCatalog = {
val catalog = new TableFileCatalog(
sparkSession, db, table, Some(partitionSchema), sizeInBytes)
if (lazyPruningEnabled) {
catalog
} else {
catalog.cachedAllPartitions
}
}
val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
val dataSchema =
StructType(metastoreSchema
@@ -235,6 +262,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val rootPath = metastoreRelation.hiveQlTable.getDataLocation

val cached = getCached(tableIdentifier,
Seq(rootPath),
metastoreRelation,
metastoreSchema,
fileFormatClass,
@@ -59,4 +59,39 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
}
}
}

test("partitioned table is cached when partition pruning is off") {
withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
withTable("test") {
withTempDir { dir =>
spark.range(5).selectExpr("id", "id as f1", "id as f2").write
.partitionBy("f1", "f2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

spark.sql(s"""
|create external table test (id long)
|partitioned by (f1 int, f2 int)
|stored as parquet
|location "${dir.getAbsolutePath}"""".stripMargin)
spark.sql("msck repair table test")

val df = spark.sql("select * from test")
assert(sql("select * from test").count() == 5)

// Delete a file, then assert that we tried to read it. This means the table was cached.
val p = new Path(spark.table("test").inputFiles.head)
assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false))
val e = intercept[SparkException] {
sql("select * from test").count()
}
assert(e.getMessage.contains("FileNotFoundException"))

// Test refreshing the cache.
spark.catalog.refreshTable("test")
assert(sql("select * from test").count() == 4)
}
}
}
}
}
