@@ -33,6 +33,10 @@ private[sql] case class Partition(values: Row, path: String)

private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])

private[sql] object PartitionSpec {
val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
}

private[sql] object PartitioningUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
// depend on Hive.
@@ -68,20 +72,37 @@ private[sql] object PartitioningUtils {
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String): PartitionSpec = {
val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
val fields = {
val (PartitionValues(columnNames, literals)) = partitionValues.head
columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
StructField(name, dataType, nullable = true)
}
// First, we need to parse every partition's path and see if we can find partition values.
val pathsWithPartitionValues = paths.flatMap { path =>
parsePartition(path, defaultPartitionName).map(path -> _)
}

val partitions = partitionValues.zip(paths).map {
case (PartitionValues(_, literals), path) =>
Partition(Row(literals.map(_.value): _*), path.toString)
}
if (pathsWithPartitionValues.isEmpty) {
// This dataset is not partitioned.
PartitionSpec.emptySpec
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))

// Creates the StructType which represents the partition columns.
val fields = {
val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
// We always assume partition columns are nullable since we've no idea whether null values
// will be appended in the future.
StructField(name, dataType, nullable = true)
}
}

// Finally, we create `Partition`s based on paths and resolved partition values.
val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
case (PartitionValues(_, literals), (path, _)) =>
Partition(Row.fromSeq(literals.map(_.value)), path.toString)
}

PartitionSpec(StructType(fields), partitions)
PartitionSpec(StructType(fields), partitions)
}
}

/**
@@ -111,7 +132,7 @@ private[sql] object PartitioningUtils {
while (!finished) {
// Sometimes (e.g., when speculative execution is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
if (chopped.getName == "_temporary") {
if (chopped.getName.toLowerCase == "_temporary") {
return None
}

@@ -121,8 +142,12 @@
finished = maybeColumn.isEmpty || chopped.getParent == null
}

val (columnNames, values) = columns.reverse.unzip
Some(PartitionValues(columnNames, values))
if (columns.isEmpty) {
None
} else {
val (columnNames, values) = columns.reverse.unzip
Some(PartitionValues(columnNames, values))
}
}

private def parsePartitionColumn(
@@ -156,20 +181,25 @@ private[sql] object PartitioningUtils {
private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
// Column names of all partitions must match
val distinctPartitionsColNames = values.map(_.columnNames).distinct
assert(distinctPartitionsColNames.size == 1, {
val list = distinctPartitionsColNames.mkString("\t", "\n", "")
s"Conflicting partition column names detected:\n$list"
})

// Resolves possible type conflicts for each column
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
}

// Fills resolved literals back to each partition
values.zipWithIndex.map { case (d, index) =>
d.copy(literals = resolvedValues.map(_(index)))
if (distinctPartitionsColNames.isEmpty) {
Seq.empty
} else {
assert(distinctPartitionsColNames.size == 1, {
val list = distinctPartitionsColNames.mkString("\t", "\n", "")
s"Conflicting partition column names detected:\n$list"
})

// Resolves possible type conflicts for each column
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
}

// Fills resolved literals back to each partition
values.zipWithIndex.map { case (d, index) =>
d.copy(literals = resolvedValues.map(_(index)))
}
}
}
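To make the resolution step above concrete, here is a small self-contained sketch of the idea (an editor's illustration with hypothetical names and a deliberately crude widening order; resolveTypeConflicts itself is not shown in this diff): column names must agree across all partitions, each column's values are widened to a single common type, and an empty input now short-circuits to an empty result.

object ResolvePartitionsSketch {
  // Values are modeled as (typeName, rawValue) pairs; the widening order below is
  // purely illustrative and does not mirror Spark's actual type coercion rules.
  final case class PartitionValues(columnNames: Seq[String], values: Seq[(String, String)])

  private val wideningOrder = Seq("int", "long", "double", "string")

  def resolvePartitions(partitions: Seq[PartitionValues]): Seq[PartitionValues] = {
    if (partitions.isEmpty) {
      // Non-partitioned input: nothing to resolve (mirrors the new early return).
      Seq.empty
    } else {
      // All partitions must agree on the partition column names.
      require(partitions.map(_.columnNames).distinct.size == 1,
        "Conflicting partition column names detected")

      // For each column, pick the widest type observed across all partitions.
      val columnCount = partitions.head.columnNames.size
      val resolvedTypes = (0 until columnCount).map { i =>
        partitions.map(_.values(i)._1).maxBy(t => wideningOrder.indexOf(t))
      }

      // Fill the resolved types back into every partition's values.
      partitions.map { p =>
        p.copy(values = p.values.zip(resolvedTypes).map { case ((_, value), tpe) => (tpe, value) })
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val resolved = resolvePartitions(Seq(
      PartitionValues(Seq("a", "b"), Seq("int" -> "10", "string" -> "20")),
      PartitionValues(Seq("a", "b"), Seq("double" -> "10.5", "string" -> "hello"))))
    println(resolved) // column "a" is widened to "double" in both partitions
  }
}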

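Summing up the parsing rules established by the changes above, here is a minimal standalone sketch (an editor's illustration; parsePartitionPath and ColumnValue are hypothetical stand-ins, not the Spark API): _temporary directories are skipped regardless of case, a path without any column=value segment yields no partition values, and a dataset whose paths yield no partition values at all is treated as non-partitioned.

object ParsePartitionSketch {
  final case class ColumnValue(name: String, value: String)

  // Parse one leaf directory path into column/value pairs, mirroring the rules in this
  // patch: ignore anything at or below a "_temporary" directory (case-insensitively),
  // and return None when the path contains no "column=value" segment at all.
  def parsePartitionPath(path: String): Option[Seq[ColumnValue]] = {
    val segments = path.split('/').filter(_.nonEmpty)
    if (segments.exists(_.equalsIgnoreCase("_temporary"))) {
      None
    } else {
      val columns = segments.toSeq.flatMap { segment =>
        segment.split("=", 2) match {
          case Array(name, value) => Some(ColumnValue(name, value))
          case _                  => None
        }
      }
      if (columns.isEmpty) None else Some(columns)
    }
  }

  def main(args: Array[String]): Unit = {
    println(parsePartitionPath("hdfs://host:9000/path/a=10/b=20"))        // partition columns a=10, b=20
    println(parsePartitionPath("hdfs://host:9000/path/a=10.5/_TeMpOrArY")) // None: temporary directory
    println(parsePartitionPath("hdfs://host:9000/path1"))                  // None: non-partitioned
  }
}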
@@ -460,12 +460,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio

private def discoverPartitions(): PartitionSpec = {
val leafDirs = fileStatusCache.leafDirs.keys.toSeq

if (leafDirs.nonEmpty) {
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
} else {
PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
}
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
}

/**
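The simplification above works because parsePartitions now returns the shared, Seq-backed PartitionSpec.emptySpec itself whenever no partition values are found, so discoverPartitions no longer needs to special-case empty leaf directories and callers (including the new tests) can compare against emptySpec directly. A minimal sketch of the pattern, with simplified stand-in types rather than the Spark classes:

object EmptySpecSketch {
  // Stand-in types for illustration only (not the Spark classes).
  final case class Spec(columnNames: Seq[String], partitionPaths: Seq[String])
  object Spec { val empty: Spec = Spec(Nil, Nil) }

  // The parser owns the degenerate case...
  def parse(leafDirs: Seq[String]): Spec =
    if (leafDirs.isEmpty) Spec.empty
    else Spec(columnNames = Seq("a"), partitionPaths = leafDirs) // "a" is a placeholder column

  // ...so the caller collapses to a single unconditional call.
  def discover(leafDirs: Seq[String]): Spec = parse(leafDirs)

  def main(args: Array[String]): Unit = {
    // Structural equality against the single shared empty value now just works.
    assert(discover(Nil) == Spec.empty)
    println(discover(Seq("/data/a=1", "/data/a=2")))
  }
}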
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.sources.PartitioningUtils._
import org.apache.spark.sql.sources.{Partition, PartitionSpec}
import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLContext}
@@ -66,12 +66,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
assert(message.contains(expected))
}

check("file:///", Some {
PartitionValues(
ArrayBuffer.empty[String],
ArrayBuffer.empty[Literal])
})

check("file://path/a=10", Some {
PartitionValues(
ArrayBuffer("a"),
@@ -93,6 +87,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
ArrayBuffer(Literal.create(1.5, FloatType)))
})

check("file:///", None)
check("file:///path/_temporary", None)
check("file:///path/_temporary/c=1.5", None)
check("file:///path/_temporary/path", None)
check("file://path/a=10/_temporary/c=1.5", None)
check("file://path/a=10/c=1.5/_temporary", None)

@@ -124,6 +122,25 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))

check(Seq(
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/a=10.5/b=hello",
"hdfs://host:9000/path/a=10.5/_temporary",
"hdfs://host:9000/path/a=10.5/_TeMpOrArY",
"hdfs://host:9000/path/a=10.5/b=hello/_temporary",
"hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
"hdfs://host:9000/path/_temporary/path",
"hdfs://host:9000/path/a=11/_temporary/path",
"hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
PartitionSpec(
StructType(Seq(
StructField("a", FloatType),
StructField("b", StringType))),
Seq(
Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))

check(Seq(
s"hdfs://host:9000/path/a=10/b=20",
s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
@@ -145,6 +162,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
Seq(
Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))

check(Seq(
s"hdfs://host:9000/path1",
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)
}

test("read partitioned table - normal case") {
@@ -334,4 +356,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
}
}
}

test("SPARK-7749 Non-partitioned table should have empty partition spec") {
withTempPath { dir =>
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
case LogicalRelation(relation: ParquetRelation2) =>
assert(relation.partitionSpec === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
}
}
}
}
@@ -22,14 +22,15 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils

// The data where the partitioning key exists only in the directory structure.
@@ -385,6 +386,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE ms_convert")
}

def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
val plan = df.queryExecution.analyzed
plan.collectFirst {
case LogicalRelation(r: ParquetRelation2) => r
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$plan")
}
}

test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
sql(
s"""CREATE TABLE nonPartitioned (
| key INT,
| value STRING
|)
|STORED AS PARQUET
""".stripMargin)

// First lookup fills the cache
val r1 = collectParquetRelation(table("nonPartitioned"))
// Second lookup should reuse the cache
val r2 = collectParquetRelation(table("nonPartitioned"))
// They should be the same instance
assert(r1 eq r2)

sql("DROP TABLE nonPartitioned")
}

test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
sql(
s"""CREATE TABLE partitioned (
| key INT,
| value STRING
|)
|PARTITIONED BY (part INT)
|STORED AS PARQUET
""".stripMargin)

// First lookup fills the cache
val r1 = collectParquetRelation(table("partitioned"))
// Second lookup should reuse the cache
val r2 = collectParquetRelation(table("partitioned"))
// They should be the same instance
assert(r1 eq r2)

sql("DROP TABLE partitioned")
}

test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.