[SPARK-30215][SQL] Remove PrunedInMemoryFileIndex and merge its functionality into InMemoryFileIndex

### What changes were proposed in this pull request?
Remove PrunedInMemoryFileIndex and merge its functionality into InMemoryFileIndex.

### Why are the changes needed?
PrunedInMemoryFileIndex is used only in CatalogFileIndex.filterPartitions, and its name is somewhat confusing. Its functionality can be merged entirely into InMemoryFileIndex and the class removed. A sketch of the resulting call site is shown below.
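
As a quick sketch (mirroring the diff further down; `partitionSpec`, `timeNs`, and `fileStatusCache` are the locals already available in `filterPartitions`), the call site becomes a direct `InMemoryFileIndex` construction:

```scala
// Sketch of the new call site in CatalogFileIndex.filterPartitions: the metastore-derived
// PartitionSpec is handed straight to InMemoryFileIndex, so no subclass is needed.
new InMemoryFileIndex(sparkSession,
  rootPathsSpecified = partitionSpec.partitions.map(_.path),
  parameters = Map.empty,
  userSpecifiedSchema = Some(partitionSpec.partitionColumns),
  fileStatusCache = fileStatusCache,
  userSpecifiedPartitionSpec = Some(partitionSpec),
  metadataOpsTimeNs = Some(timeNs))
```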

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #26850 from fuwhu/SPARK-30215.

Authored-by: fuwhu <bestwwg@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
fuwhu authored and cloud-fan committed Jan 8, 2020
1 parent b2ed6d0 commit 047bff0
Showing 3 changed files with 18 additions and 27 deletions.
@@ -81,10 +81,15 @@ class CatalogFileIndex(
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val timeNs = System.nanoTime() - startTime
-      new PrunedInMemoryFileIndex(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
+      new InMemoryFileIndex(sparkSession,
+        rootPathsSpecified = partitionSpec.partitions.map(_.path),
+        parameters = Map.empty,
+        userSpecifiedSchema = Some(partitionSpec.partitionColumns),
+        fileStatusCache = fileStatusCache,
+        userSpecifiedPartitionSpec = Some(partitionSpec),
+        metadataOpsTimeNs = Some(timeNs))
     } else {
-      new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties,
+      new InMemoryFileIndex(sparkSession, rootPaths, parameters = table.storage.properties,
         userSpecifiedSchema = None, fileStatusCache = fileStatusCache)
     }
   }
@@ -101,23 +106,3 @@ class CatalogFileIndex(
 
   override def hashCode(): Int = table.identifier.hashCode()
 }
-
-/**
- * An override of the standard HDFS listing based catalog, that overrides the partition spec with
- * the information from the metastore.
- *
- * @param tableBasePath The default base path of the Hive metastore table
- * @param partitionSpec The partition specifications from Hive metastore
- */
-private class PrunedInMemoryFileIndex(
-    sparkSession: SparkSession,
-    tableBasePath: Path,
-    fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec,
-    override val metadataOpsTimeNs: Option[Long])
-  extends InMemoryFileIndex(
-    sparkSession,
-    partitionSpec.partitions.map(_.path),
-    Map.empty,
-    Some(partitionSpec.partitionColumns),
-    fileStatusCache)
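
For orientation, a hypothetical usage sketch (names such as `tableMeta`, `sizeInBytes`, and `partitionFilters` are placeholders, not from this patch) of the one code path that used the removed class:

```scala
// Hypothetical illustration: partition pruning through the metastore still returns an
// InMemoryFileIndex, now built directly with the catalog-provided PartitionSpec.
val catalogIndex = new CatalogFileIndex(sparkSession, tableMeta, sizeInBytes)
val prunedIndex = catalogIndex.filterPartitions(partitionFilters)
// The returned index reports the metastore partitions; it does not re-infer
// partitioning from the file listing.
val spec = prunedIndex.partitionSpec()
```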
@@ -50,7 +50,9 @@ class InMemoryFileIndex(
     rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     userSpecifiedSchema: Option[StructType],
-    fileStatusCache: FileStatusCache = NoopCache)
+    fileStatusCache: FileStatusCache = NoopCache,
+    userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+    override val metadataOpsTimeNs: Option[Long] = None)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
 
@@ -69,7 +71,11 @@
 
   override def partitionSpec(): PartitionSpec = {
     if (cachedPartitionSpec == null) {
-      cachedPartitionSpec = inferPartitioning()
+      if (userSpecifiedPartitionSpec.isDefined) {
+        cachedPartitionSpec = userSpecifiedPartitionSpec.get
+      } else {
+        cachedPartitionSpec = inferPartitioning()
+      }
     }
     logTrace(s"Partition spec: $cachedPartitionSpec")
     cachedPartitionSpec
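
The added branch could equivalently be collapsed with `getOrElse`; a behavior-preserving sketch (not part of the patch) of the cached lookup above:

```scala
override def partitionSpec(): PartitionSpec = {
  if (cachedPartitionSpec == null) {
    // Prefer a caller-supplied spec (e.g. metastore partitions passed by CatalogFileIndex);
    // otherwise fall back to inferring partitioning from the listed paths.
    cachedPartitionSpec = userSpecifiedPartitionSpec.getOrElse(inferPartitioning())
  }
  logTrace(s"Partition spec: $cachedPartitionSpec")
  cachedPartitionSpec
}
```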
@@ -256,10 +256,10 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
       "PartitionFilters: \\[isnotnull\\(k#xL\\), dynamicpruningexpression\\(k#xL " +
         "IN subquery#x\\)\\]"
     val expected_pattern3 =
-      "Location: PrunedInMemoryFileIndex \\[.*org.apache.spark.sql.ExplainSuite" +
+      "Location: InMemoryFileIndex \\[.*org.apache.spark.sql.ExplainSuite" +
         "/df2/.*, ... 99 entries\\]"
     val expected_pattern4 =
-      "Location: PrunedInMemoryFileIndex \\[.*org.apache.spark.sql.ExplainSuite" +
+      "Location: InMemoryFileIndex \\[.*org.apache.spark.sql.ExplainSuite" +
         "/df1/.*, ... 999 entries\\]"
     withNormalizedExplain(sqlText) { normalizedOutput =>
       assert(expected_pattern1.r.findAllMatchIn(normalizedOutput).length == 1)
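
To illustrate what the updated patterns match, a self-contained check against a hypothetical explain-output line (the path is invented for this example):

```scala
// Hypothetical sample of the explain output after this change: the scan location is
// labeled InMemoryFileIndex rather than PrunedInMemoryFileIndex.
val sampleLine =
  "Location: InMemoryFileIndex [file:/tmp/org.apache.spark.sql.ExplainSuite/df2/part=0, ... 99 entries]"
val pattern =
  ("Location: InMemoryFileIndex \\[.*org.apache.spark.sql.ExplainSuite" +
    "/df2/.*, ... 99 entries\\]").r
assert(pattern.findFirstIn(sampleLine).isDefined)
```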
