[SPARK-12975] [SQL] Throwing Exception when Bucketing Columns are part of Partitioning Columns #10891

Closed

Changes from 7 commits

DataFrameWriter.scala
@@ -21,6 +21,7 @@ import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -37,7 +38,7 @@ import org.apache.spark.sql.sources.HadoopFsRelation
* @since 1.4.0
*/
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {
final class DataFrameWriter private[sql](df: DataFrame) extends Logging {
Contributor: no need to do this?


/**
* Specifies the behavior when data or table already exists. Options include:
@@ -240,6 +241,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
n <- numBuckets
} yield {
require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")

// partitionBy columns cannot be used in bucketBy
if (normalizedParCols.nonEmpty &&
normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
throw new AnalysisException(
s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
}

BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
}
}
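
For illustration, a minimal sketch of how this new check surfaces to a caller; the DataFrame, column names, and table name are illustrative rather than taken from the patch:

// Illustrative only: assumes a DataFrame `people` with columns "year" and "name".
people.write
  .format("parquet")
  .partitionBy("year")
  .bucketBy(4, "year", "name")  // "year" is also a partitionBy column
  .saveAsTable("people_bucketed")
// With this patch the call fails with an AnalysisException:
//   bucketBy columns 'year, name' should not be part of partitionBy columns 'year'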

MetastoreDataSourcesSuite.scala
@@ -745,7 +745,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}

test("Saving partition columns information") {
test("Saving partitionBy columns information") {
val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
val tableName = s"partitionInfo_${System.currentTimeMillis()}"

Expand Down Expand Up @@ -776,6 +776,59 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}

test("Saving information for sortBy and bucketBy columns") {
val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
val tableName = s"partitionInfo_${System.currentTimeMillis()}"
Contributor: nit: use bucketingInfo to be more accurate.


withTable(tableName) {
df.write
.format("parquet")
.bucketBy(8, "d", "b")
.sortBy("c")
.saveAsTable(tableName)
invalidateTable(tableName)
val metastoreTable = catalog.client.getTable("default", tableName)
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)

val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
assert(numBuckets == 8)

val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
assert(numBucketCols == 2)

val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
assert(numSortCols == 1)

val actualBucketByColumns =
StructType(
(0 until numBucketCols).map { index =>
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
})
// Make sure bucketBy columns are correctly stored in metastore.
assert(
expectedBucketByColumns.sameType(actualBucketByColumns),
s"Partitions columns stored in metastore $actualBucketByColumns is not the " +
s"partition columns defined by the saveAsTable operation $expectedBucketByColumns.")

val actualSortByColumns =
StructType(
(0 until numSortCols).map { index =>
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
})
// Make sure sortBy columns are correctly stored in metastore.
assert(
expectedSortByColumns.sameType(actualSortByColumns),
s"Partitions columns stored in metastore $actualSortByColumns is not the " +
s"partition columns defined by the saveAsTable operation $expectedSortByColumns.")

// Check the content of the saved table.
checkAnswer(
table(tableName).select("c", "b", "d", "a"),
df.select("c", "b", "d", "a"))
}
}
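
To make the assertions above easier to follow, here is a sketch of the table properties the test expects the metastore to hold for this table; the key names are taken from the assertions, and a real metastore entry may carry additional properties:

// Sketch of the expected property layout; values follow from bucketBy(8, "d", "b") and sortBy("c").
val expectedProps = Map(
  "spark.sql.sources.schema.numBuckets" -> "8",
  "spark.sql.sources.schema.numBucketCols" -> "2",
  "spark.sql.sources.schema.bucketCol.0" -> "d",
  "spark.sql.sources.schema.bucketCol.1" -> "b",
  "spark.sql.sources.schema.numSortCols" -> "1",
  "spark.sql.sources.schema.sortCol.0" -> "c")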

test("insert into a table") {
def createDF(from: Int, to: Int): DataFrame = {
(from to to).map(i => i -> s"str$i").toDF("c1", "c2")

BucketedWriteSuite.scala
@@ -92,10 +92,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
fail(s"Unable to find the related bucket files.")
}

// Remove the duplicate columns in bucketCols and sortCols;
// Otherwise, we would get analysis errors due to duplicate names
val selectedColumns = (bucketCols ++ sortCols).distinct
// We may lose the type information after a write (e.g. the json format doesn't keep schema
// information); here we get the types from the original dataframe.
val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
val columns = (bucketCols ++ sortCols).zip(types).map {
val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType)
val columns = selectedColumns.zip(types).map {
case (colName, dt) => col(colName).cast(dt)
}

@@ -158,6 +161,21 @@
}
}

test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
intercept[AnalysisException](df.write
.partitionBy("i")
.bucketBy(8, "i", "k")
Contributor: is it better to use i, j and j, k for "overlapping"?

.sortBy("k")
.saveAsTable("bucketed_table"))
}

test("write bucketed data with the identical bucketBy and partitionBy columns") {
intercept[AnalysisException](df.write
.partitionBy("i")
.bucketBy(8, "i")
.saveAsTable("bucketed_table"))
}
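
For contrast, a sketch of a write that keeps the bucketing columns disjoint from the partitioning column and therefore passes the new check; it assumes, as the review comment above does, that df also has a column "j":

// Illustrative only: "i" is used solely for partitioning, "j" and "k" solely for bucketing.
df.write
  .format("parquet")
  .partitionBy("i")
  .bucketBy(8, "j", "k")
  .sortBy("k")
  .saveAsTable("bucketed_table")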

test("write bucketed data without partitionBy") {
for (source <- Seq("parquet", "json", "orc")) {
withTable("bucketed_table") {