
Commit

address comments.
gatorsmile committed Jan 25, 2016
1 parent 8c718b3 commit d207813
Showing 2 changed files with 16 additions and 46 deletions.
28 changes: 7 additions & 21 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -242,29 +242,15 @@ final class DataFrameWriter private[sql](df: DataFrame) extends Logging {
     } yield {
       require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
 
-      if (normalizedParCols.isEmpty) {
-        BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
-      } else {
-        // When partitionBy and bucketBy are used at the same time, the overlapping columns are
-        // useless. Thus, we remove these overlapping columns from bucketBy.
-        val bucketColumns: Seq[String] =
-          normalizedBucketColNames.get.filterNot(normalizedParCols.get.contains)
-
-        if (bucketColumns.nonEmpty) {
-          if (bucketColumns.length != normalizedBucketColNames.get.length) {
-            val removedColumns: Seq[String] =
-              normalizedBucketColNames.get.filter(normalizedParCols.get.contains)
-            logInfo(s"bucketBy columns are changed to '${bucketColumnNames.get.mkString(", ")}' " +
-              s"after removing the columns '${removedColumns.mkString(", ")}' that are part of " +
-              s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
-          }
-          BucketSpec(n, bucketColumns, normalizedSortColNames.getOrElse(Nil))
-        } else {
+      // partitionBy columns cannot be used in bucketBy
+      if (normalizedParCols.nonEmpty &&
+        normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
           throw new AnalysisException(
-            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be the " +
-            s"subset of partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
-        }
+            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
+            s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
       }
+
+      BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
     }
   }
 
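In effect, this hunk stops silently dropping bucketBy columns that overlap with partitionBy and instead rejects the write outright. A caller-side sketch of the new behavior, using the `df` and column names (`i`, `j`, `k`) from the test suite below (the table name is illustrative):

    // Now rejected: "i" appears in both partitionBy and bucketBy, so the write
    // throws an AnalysisException instead of pruning "i" from the bucket columns.
    df.write
      .partitionBy("i")
      .bucketBy(8, "i", "k")
      .sortBy("k")
      .saveAsTable("bucketed_table")

    // Still valid: the bucketBy columns are disjoint from the partitionBy column.
    df.write
      .partitionBy("i")
      .bucketBy(8, "j", "k")
      .sortBy("k")
      .saveAsTable("bucketed_table")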
34 changes: 9 additions & 25 deletions sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -162,34 +162,18 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        df.write
-          .format(source)
-          .partitionBy("i")
-          .bucketBy(8, "i", "k")
-          .sortBy("k")
-          .saveAsTable("bucketed_table")
-
-        for (i <- 0 until 5) {
-          // After column pruning, the actual bucketBy columns only contain `k`, which
-          // is identical to the sortBy column.
-          testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("k"), Seq("k"))
-        }
-      }
-    }
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i", "k")
+      .sortBy("k")
+      .saveAsTable("bucketed_table"))
   }
 
   test("write bucketed data with the identical bucketBy and partitionBy columns") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        intercept[AnalysisException](df.write
-          .format(source)
-          .partitionBy("i")
-          .bucketBy(8, "i")
-          .saveAsTable("bucketed_table"))
-      }
-    }
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i")
+      .saveAsTable("bucketed_table"))
   }
 
   test("write bucketed data without partitionBy") {
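Both rewritten tests assert only that the write is rejected. A hedged sketch of how the raised exception could additionally be checked against the new message text from DataFrameWriter (this extra assertion is illustrative, not part of the commit):

    val e = intercept[AnalysisException](df.write
      .partitionBy("i")
      .bucketBy(8, "i")
      .saveAsTable("bucketed_table"))
    // The new error message names the offending overlap.
    assert(e.getMessage.contains("should not be part of partitionBy columns"))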
