[SPARK-12975][SQL] Throwing Exception when Bucketing Columns are part of Partitioning Columns

When users use `partitionBy` and `bucketBy` at the same time, some bucketing columns might be part of the partitioning columns. For example:
```
        df.write
          .format(source)
          .partitionBy("i")
          .bucketBy(8, "i", "k")
          .saveAsTable("bucketed_table")
```
However, in the above case, adding column `i` to `bucketBy` is useless; it only wastes extra CPU when reading or writing bucketed tables. Thus, like Hive, we can throw an exception and let users make the change themselves.
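
For illustration (not part of the original commit message), here is a minimal sketch of the new behavior, assuming a DataFrame `df` with columns `i`, `j`, `k` and hypothetical table names: the overlapping call now fails with an `AnalysisException`, while bucketing only on a non-partition column still works.
```
import org.apache.spark.sql.AnalysisException

// "i" appears in both partitionBy and bucketBy, so saveAsTable now fails.
try {
  df.write
    .format("parquet")
    .partitionBy("i")
    .bucketBy(8, "i", "k")
    .saveAsTable("bucketed_table")
} catch {
  case e: AnalysisException =>
    // e.g. "bucketBy columns 'i, k' should not be part of partitionBy columns 'i'"
    println(e.getMessage)
}

// No overlap: partition by "i" and bucket only by "k"; this succeeds.
df.write
  .format("parquet")
  .partitionBy("i")
  .bucketBy(8, "k")
  .saveAsTable("bucketed_table_ok")
```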

Also added a test case that checks whether the `sortBy` and `bucketBy` column information is correctly saved in the metastore table.
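
As a rough sketch of what that metadata looks like (the key names are the ones read back by the new test below; the values are simply what `bucketBy(8, "d", "b").sortBy("c")` should produce, shown here for illustration only):
```
// Table properties the metastore is expected to contain for the bucketed table.
val expectedProperties = Map(
  "spark.sql.sources.schema.numBuckets" -> "8",
  "spark.sql.sources.schema.numBucketCols" -> "2",
  "spark.sql.sources.schema.bucketCol.0" -> "d",
  "spark.sql.sources.schema.bucketCol.1" -> "b",
  "spark.sql.sources.schema.numSortCols" -> "1",
  "spark.sql.sources.schema.sortCol.0" -> "c"
)
```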

Could you check if my understanding is correct? cloud-fan rxin marmbrus Thanks!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10891 from gatorsmile/commonKeysInPartitionByBucketBy.
gatorsmile authored and marmbrus committed Jan 25, 2016
1 parent 00026fa commit 9348431
Showing 3 changed files with 83 additions and 3 deletions.
```
@@ -240,6 +240,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       n <- numBuckets
     } yield {
       require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
+
+      // partitionBy columns cannot be used in bucketBy
+      if (normalizedParCols.nonEmpty &&
+        normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
+          throw new AnalysisException(
+            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
+            s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
+      }
+
       BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
     }
   }
```
```
@@ -745,7 +745,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("Saving partition columns information") {
+  test("Saving partitionBy columns information") {
     val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
     val tableName = s"partitionInfo_${System.currentTimeMillis()}"
 
```
```
@@ -776,6 +776,59 @@
     }
   }
 
+  test("Saving information for sortBy and bucketBy columns") {
+    val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
+    val tableName = s"bucketingInfo_${System.currentTimeMillis()}"
+
+    withTable(tableName) {
+      df.write
+        .format("parquet")
+        .bucketBy(8, "d", "b")
+        .sortBy("c")
+        .saveAsTable(tableName)
+      invalidateTable(tableName)
+      val metastoreTable = catalog.client.getTable("default", tableName)
+      val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
+      val expectedSortByColumns = StructType(df.schema("c") :: Nil)
+
+      val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+      assert(numBuckets == 8)
+
+      val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+      assert(numBucketCols == 2)
+
+      val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+      assert(numSortCols == 1)
+
+      val actualBucketByColumns =
+        StructType(
+          (0 until numBucketCols).map { index =>
+            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+          })
+      // Make sure bucketBy columns are correctly stored in metastore.
+      assert(
+        expectedBucketByColumns.sameType(actualBucketByColumns),
+        s"Partitions columns stored in metastore $actualBucketByColumns is not the " +
+          s"partition columns defined by the saveAsTable operation $expectedBucketByColumns.")
+
+      val actualSortByColumns =
+        StructType(
+          (0 until numSortCols).map { index =>
+            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+          })
+      // Make sure sortBy columns are correctly stored in metastore.
+      assert(
+        expectedSortByColumns.sameType(actualSortByColumns),
+        s"Partitions columns stored in metastore $actualSortByColumns is not the " +
+          s"partition columns defined by the saveAsTable operation $expectedSortByColumns.")
+
+      // Check the content of the saved table.
+      checkAnswer(
+        table(tableName).select("c", "b", "d", "a"),
+        df.select("c", "b", "d", "a"))
+    }
+  }
+
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame = {
       (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
```
```
@@ -92,10 +92,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       fail(s"Unable to find the related bucket files.")
     }
 
+    // Remove the duplicate columns in bucketCols and sortCols;
+    // Otherwise, we got analysis errors due to duplicate names
+    val selectedColumns = (bucketCols ++ sortCols).distinct
     // We may lose the type information after write(e.g. json format doesn't keep schema
     // information), here we get the types from the original dataframe.
-    val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
-    val columns = (bucketCols ++ sortCols).zip(types).map {
+    val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType)
+    val columns = selectedColumns.zip(types).map {
       case (colName, dt) => col(colName).cast(dt)
     }
 
```
```
@@ -158,6 +161,21 @@
     }
   }
 
+  test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
+    intercept[AnalysisException](df.write
+      .partitionBy("i", "j")
+      .bucketBy(8, "j", "k")
+      .sortBy("k")
+      .saveAsTable("bucketed_table"))
+  }
+
+  test("write bucketed data with the identical bucketBy and partitionBy columns") {
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i")
+      .saveAsTable("bucketed_table"))
+  }
+
   test("write bucketed data without partitionBy") {
     for (source <- Seq("parquet", "json", "orc")) {
       withTable("bucketed_table") {
```