diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ab63fe4aa88b7..12eb2393634a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -240,6 +240,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       n <- numBuckets
     } yield {
       require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
+
+      // partitionBy columns cannot be used in bucketBy
+      if (normalizedParCols.nonEmpty &&
+        normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
+          throw new AnalysisException(
+            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
+              s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
+      }
+
       BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 253f13c598520..211932fea00ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -745,7 +745,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("Saving partition columns information") {
+  test("Saving partitionBy columns information") {
     val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
     val tableName = s"partitionInfo_${System.currentTimeMillis()}"
 
@@ -776,6 +776,59 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
+  test("Saving information for sortBy and bucketBy columns") {
+    val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
+    val tableName = s"bucketingInfo_${System.currentTimeMillis()}"
+
+    withTable(tableName) {
+      df.write
+        .format("parquet")
+        .bucketBy(8, "d", "b")
+        .sortBy("c")
+        .saveAsTable(tableName)
+      invalidateTable(tableName)
+      val metastoreTable = catalog.client.getTable("default", tableName)
+      val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
+      val expectedSortByColumns = StructType(df.schema("c") :: Nil)
+
+      val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+      assert(numBuckets == 8)
+
+      val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+      assert(numBucketCols == 2)
+
+      val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+      assert(numSortCols == 1)
+
+      val actualBucketByColumns =
+        StructType(
+          (0 until numBucketCols).map { index =>
+            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+          })
+      // Make sure bucketBy columns are correctly stored in metastore.
+      assert(
+        expectedBucketByColumns.sameType(actualBucketByColumns),
+        s"Bucket columns stored in the metastore $actualBucketByColumns do not match the " +
+          s"bucketBy columns defined by the saveAsTable operation $expectedBucketByColumns.")
+
+      val actualSortByColumns =
+        StructType(
+          (0 until numSortCols).map { index =>
+            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+          })
+      // Make sure sortBy columns are correctly stored in metastore.
+      assert(
+        expectedSortByColumns.sameType(actualSortByColumns),
+        s"Sort columns stored in the metastore $actualSortByColumns do not match the " +
+          s"sortBy columns defined by the saveAsTable operation $expectedSortByColumns.")
+
+      // Check the content of the saved table.
+      checkAnswer(
+        table(tableName).select("c", "b", "d", "a"),
+        df.select("c", "b", "d", "a"))
+    }
+  }
+
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame = {
       (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 59b74d2b4c5ea..a32f8fb4c5a1d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -92,10 +92,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       fail(s"Unable to find the related bucket files.")
     }
 
+    // Remove the duplicate columns in bucketCols and sortCols;
+    // otherwise, we get analysis errors due to duplicate names.
+    val selectedColumns = (bucketCols ++ sortCols).distinct
     // We may lose the type information after write(e.g. json format doesn't keep schema
     // information), here we get the types from the original dataframe.
-    val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
-    val columns = (bucketCols ++ sortCols).zip(types).map {
+    val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType)
+    val columns = selectedColumns.zip(types).map {
       case (colName, dt) => col(colName).cast(dt)
     }
 
@@ -158,6 +161,21 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     }
   }
 
+  test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
+    intercept[AnalysisException](df.write
+      .partitionBy("i", "j")
+      .bucketBy(8, "j", "k")
+      .sortBy("k")
+      .saveAsTable("bucketed_table"))
+  }
+
+  test("write bucketed data with the identical bucketBy and partitionBy columns") {
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i")
+      .saveAsTable("bucketed_table"))
+  }
+
   test("write bucketed data without partitionBy") {
     for (source <- Seq("parquet", "json", "orc")) {
       withTable("bucketed_table") {
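Example usage: a minimal sketch of how the new check surfaces to callers, assuming a Hive-enabled `sqlContext` from this era of the codebase is in scope and using illustrative column names (`i`, `j`, `k`). With this change, a write that lists the same column under both partitionBy and bucketBy fails at analysis time with the message added in DataFrameWriter above.

```scala
import org.apache.spark.sql.AnalysisException

import sqlContext.implicits._

val df = Seq((1, "a", 10), (2, "b", 20)).toDF("i", "j", "k")

try {
  // "i" appears in both partitionBy and bucketBy, which is now rejected.
  df.write
    .partitionBy("i")
    .bucketBy(8, "i", "j")
    .saveAsTable("bucketed_table")
} catch {
  case e: AnalysisException =>
    // Expected message:
    // bucketBy columns 'i, j' should not be part of partitionBy columns 'i'
    println(e.getMessage)
}
```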