diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 91ff736cca1c0..2b3fde36225cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -242,29 +242,15 @@ final class DataFrameWriter private[sql](df: DataFrame) extends Logging {
     } yield {
       require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
 
-      if (normalizedParCols.isEmpty) {
-        BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
-      } else {
-        // When partitionBy and bucketBy are used at the same time, the overlapping columns are
-        // useless. Thus, we removed these overlapping columns from bucketBy.
-        val bucketColumns: Seq[String] =
-          normalizedBucketColNames.get.filterNot(normalizedParCols.get.contains)
-
-        if (bucketColumns.nonEmpty) {
-          if (bucketColumns.length != normalizedBucketColNames.get.length) {
-            val removedColumns: Seq[String] =
-              normalizedBucketColNames.get.filter(normalizedParCols.get.contains)
-            logInfo(s"bucketBy columns is changed to '${bucketColumnNames.get.mkString(", ")}' " +
-              s"after removing the columns '${removedColumns.mkString(", ")}' that are part of " +
-              s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
-          }
-          BucketSpec(n, bucketColumns, normalizedSortColNames.getOrElse(Nil))
-        } else {
+      // partitionBy columns cannot be used in bucketBy
+      if (normalizedParCols.nonEmpty &&
+        normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
           throw new AnalysisException(
-            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be the " +
-            s"subset of partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
-        }
+            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
+              s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
       }
+
+      BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 4061456195f2e..40a74e67a67b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -162,34 +162,18 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        df.write
-          .format(source)
-          .partitionBy("i")
-          .bucketBy(8, "i", "k")
-          .sortBy("k")
-          .saveAsTable("bucketed_table")
-
-        for (i <- 0 until 5) {
-          // After column pruning, the actual bucketBy columns only contain `k`, which
-          // is identical to the sortBy column.
-          testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("k"), Seq("k"))
-        }
-      }
-    }
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i", "k")
+      .sortBy("k")
+      .saveAsTable("bucketed_table"))
  }
 
   test("write bucketed data with the identical bucketBy and partitionBy columns") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        intercept[AnalysisException](df.write
-          .format(source)
-          .partitionBy("i")
-          .bucketBy(8, "i")
-          .saveAsTable("bucketed_table"))
-      }
-    }
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i")
+      .saveAsTable("bucketed_table"))
   }
 
   test("write bucketed data without partitionBy") {
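With this change, a bucketBy column that also appears in partitionBy is rejected at analysis time instead of being silently pruned from the bucket spec. A minimal sketch of the resulting behaviour follows; it assumes an active `SparkSession` named `spark`, and the DataFrame and column names (`i`, `j`, `k`) only mirror the test suite's schema for illustration:

```scala
import org.apache.spark.sql.AnalysisException

// Illustrative DataFrame with columns i, j, k, similar to the one used in BucketedWriteSuite.
val df = spark.range(10).selectExpr("id as i", "id * 2 as j", "id * 3 as k")

// "i" is used both as a partition column and as a bucket column, so the write
// now fails up front rather than dropping the overlapping column.
try {
  df.write
    .partitionBy("i")
    .bucketBy(8, "i", "k")
    .sortBy("k")
    .saveAsTable("bucketed_table")
} catch {
  case e: AnalysisException =>
    // Expected message, per the new error string above:
    // bucketBy columns 'i, k' should not be part of partitionBy columns 'i'
    println(e.getMessage)
}
```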