From 2198f5975a452daa8946ddb0bb084d826a448d54 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 19 Jun 2016 10:00:45 -0700
Subject: [PATCH 1/4] fix

---
 .../datasources/jdbc/JDBCRelation.scala       | 39 +++++++----
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 65 +++++++++++++++++++
 2 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 233b7891d664c..7953babc91470 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -52,29 +52,42 @@ private[sql] object JDBCRelation {
    * @return an array of partitions with where clause for each partition
    */
   def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
-    if (partitioning == null) return Array[Partition](JDBCPartition(null, 0))
+    if (partitioning == null || partitioning.numPartitions <= 1 ||
+      partitioning.lowerBound == partitioning.upperBound) {
+      return Array[Partition](JDBCPartition(null, 0))
+    }
 
-    val numPartitions = partitioning.numPartitions
+    val lowerBound = partitioning.lowerBound
+    val upperBound = partitioning.upperBound
+    if (lowerBound > upperBound) {
+      throw new IllegalArgumentException("Operation not allowed: the lower bound of partitioning " +
+        s"column is larger than the upper bound. lowerBound: $lowerBound; higherBound: $upperBound")
+    }
+    val numPartitions =
+      if ((upperBound - lowerBound) >= partitioning.numPartitions) {
+        partitioning.numPartitions
+      } else {
+        upperBound - lowerBound
+      }
     val column = partitioning.column
-    if (numPartitions == 1) return Array[Partition](JDBCPartition(null, 0))
     // Overflow and silliness can happen if you subtract then divide.
     // Here we get a little roundoff, but that's (hopefully) OK.
-    val stride: Long = (partitioning.upperBound / numPartitions
-                        - partitioning.lowerBound / numPartitions)
+    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+    assert(stride >= 1)
     var i: Int = 0
-    var currentValue: Long = partitioning.lowerBound
+    var currentValue: Long = lowerBound
     var ans = new ArrayBuffer[Partition]()
     while (i < numPartitions) {
-      val lowerBound = if (i != 0) s"$column >= $currentValue" else null
+      val lBound = if (i != 0) s"$column >= $currentValue" else null
       currentValue += stride
-      val upperBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
+      val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
       val whereClause =
-        if (upperBound == null) {
-          lowerBound
-        } else if (lowerBound == null) {
-          s"$upperBound or $column is null"
+        if (uBound == null) {
+          lBound
+        } else if (lBound == null) {
+          s"$uBound or $column is null"
         } else {
-          s"$lowerBound AND $upperBound"
+          s"$lBound AND $uBound"
         }
       ans += JDBCPartition(whereClause, i)
       i = i + 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d6ec40c18be2b..00ffd150fbee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -184,6 +184,16 @@ class JDBCSuite extends SparkFunSuite
       "insert into test.emp values ('kathy', null, null)").executeUpdate()
     conn.commit()
 
+    conn.prepareStatement(
+      "create table test.seq(id INTEGER)").executeUpdate()
+    (0 to 6).foreach { value =>
+      conn.prepareStatement(
+        s"insert into test.seq values ($value)").executeUpdate()
+    }
+    conn.prepareStatement(
+      "insert into test.seq values (null)").executeUpdate()
+    conn.commit()
+
     sql(
       s"""
         |CREATE TEMPORARY TABLE nullparts
@@ -373,6 +383,61 @@ class JDBCSuite extends SparkFunSuite
       .collect().length === 4)
   }
 
+  test("Partitioning on column where numPartitions is zero") {
+    val res = spark.read.jdbc(
+      url = urlWithUserAndPass,
+      table = "TEST.seq",
+      columnName = "id",
+      lowerBound = 0,
+      upperBound = 4,
+      numPartitions = 0,
+      connectionProperties = new Properties
+    )
+    assert(res.count() === 8)
+  }
+
+  test("Partitioning on column where numPartitions are more than the number of total rows") {
+    val res = spark.read.jdbc(
+      url = urlWithUserAndPass,
+      table = "TEST.seq",
+      columnName = "id",
+      lowerBound = 1,
+      upperBound = 5,
+      numPartitions = 10,
+      connectionProperties = new Properties
+    )
+    assert(res.count() === 8)
+  }
+
+  test("Partitioning on column where lowerBound is equal to upperBound") {
+    val res = spark.read.jdbc(
+      url = urlWithUserAndPass,
+      table = "TEST.seq",
+      columnName = "id",
+      lowerBound = 5,
+      upperBound = 5,
+      numPartitions = 4,
+      connectionProperties = new Properties
+    )
+    assert(res.count() === 8)
+  }
+
+  test("Partitioning on column where lowerBound is larger than upperBound") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read.jdbc(
+        url = urlWithUserAndPass,
+        table = "TEST.seq",
+        columnName = "id",
+        lowerBound = 5,
+        upperBound = 1,
+        numPartitions = 3,
+        connectionProperties = new Properties
+      )
+    }.getMessage
+    assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
+      "is larger than the upper bound. lowerBound: 5; higherBound: 1"))
+  }
+
   test("SELECT * on partitioned table with a nullable partition column") {
     assert(sql("SELECT * FROM nullparts").collect().size == 4)
   }
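[PATCH 1/4] changes three behaviors at once: a `numPartitions` of at most one, or equal bounds, now degenerates to a single partition; inverted bounds fail fast; and a request for more partitions than the width of the bound range is clamped to `upperBound - lowerBound`. The following self-contained Scala sketch mirrors that logic so the generated WHERE clauses can be inspected outside Spark; `SketchPartition` and `requestedPartitions` are illustrative stand-ins, not Spark's `JDBCPartition` API:

```scala
// Self-contained sketch of the partition-generation logic after PATCH 1/4.
// SketchPartition is a hypothetical stand-in for Spark's JDBCPartition.
case class SketchPartition(whereClause: String, idx: Int)

def columnPartition(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    requestedPartitions: Int): Array[SketchPartition] = {
  if (requestedPartitions <= 1 || lowerBound == upperBound) {
    return Array(SketchPartition(null, 0))
  }
  require(lowerBound <= upperBound,
    s"lower bound $lowerBound must not exceed upper bound $upperBound")
  // Clamp: never build more partitions than the width of the bound range.
  val numPartitions: Long =
    if (upperBound - lowerBound >= requestedPartitions) requestedPartitions.toLong
    else upperBound - lowerBound
  // Divide before subtracting: a little rounding, but no overflow.
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  var currentValue = lowerBound
  (0L until numPartitions).map { i =>
    val lBound = if (i != 0) s"$column >= $currentValue" else null
    currentValue += stride
    val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
    val whereClause =
      if (uBound == null) lBound
      else if (lBound == null) s"$uBound or $column is null" // nulls land in partition 0
      else s"$lBound AND $uBound"
    SketchPartition(whereClause, i.toInt)
  }.toArray
}
```

Against the suite's TEST.seq table (ids 0 through 6 plus one null row), `columnPartition("id", 1, 5, 10)` clamps ten requested partitions down to four: `id < 2 or id is null`, `id >= 2 AND id < 3`, `id >= 3 AND id < 4`, and `id >= 4`. The `or id is null` clause on partition 0 is why each new test expects all 8 rows back.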
From 94bc09c0b9e0022a4db7fa4f14285991077d7e9b Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 19 Jun 2016 11:45:58 -0700
Subject: [PATCH 2/4] address comments

---
 .../sql/execution/datasources/jdbc/JDBCRelation.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 7953babc91470..f85b316d334a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -59,10 +59,10 @@ private[sql] object JDBCRelation {
 
     val lowerBound = partitioning.lowerBound
     val upperBound = partitioning.upperBound
-    if (lowerBound > upperBound) {
-      throw new IllegalArgumentException("Operation not allowed: the lower bound of partitioning " +
-        s"column is larger than the upper bound. lowerBound: $lowerBound; higherBound: $upperBound")
-    }
+    require (lowerBound <= upperBound,
+      "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
+        s"bound. lowerBound: $lowerBound; higherBound: $upperBound")
+
     val numPartitions =
       if ((upperBound - lowerBound) >= partitioning.numPartitions) {
         partitioning.numPartitions
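[PATCH 2/4] swaps the hand-rolled throw for `Predef.require`. A detail worth noting: `require` also throws `IllegalArgumentException`, but prefixes the message with `requirement failed: `. The suite keeps passing because its assertion uses `contains` rather than an exact match. A minimal demonstration (the message text below is illustrative, not the patch's):

```scala
// Predef.require throws IllegalArgumentException and prepends
// "requirement failed: " to the supplied message.
try require(5 <= 1, "Operation not allowed: lower bound 5 exceeds upper bound 1")
catch {
  case e: IllegalArgumentException =>
    // Prints: requirement failed: Operation not allowed: lower bound 5 exceeds upper bound 1
    println(e.getMessage)
    assert(e.getMessage.contains("Operation not allowed"))
}
```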
From da3720b7a70949e09c3562e6f3a168a690243b6c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 20 Jun 2016 13:32:05 -0700
Subject: [PATCH 3/4] address comments

---
 .../sql/execution/datasources/jdbc/JDBCRelation.scala    | 8 +++++---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index f85b316d334a2..b04c2279b6259 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -61,7 +61,7 @@ private[sql] object JDBCRelation {
     val upperBound = partitioning.upperBound
     require (lowerBound <= upperBound,
       "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
-        s"bound. lowerBound: $lowerBound; higherBound: $upperBound")
+        s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
 
     val numPartitions =
@@ -69,11 +69,13 @@ private[sql] object JDBCRelation {
       if ((upperBound - lowerBound) >= partitioning.numPartitions) {
         partitioning.numPartitions
       } else {
         upperBound - lowerBound
       }
-    val column = partitioning.column
     // Overflow and silliness can happen if you subtract then divide.
     // Here we get a little roundoff, but that's (hopefully) OK.
     val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
-    assert(stride >= 1)
+    // The automatic adjustment of numPartitions can ensure the following checking condition.
+    assert(stride >= 1, "The specified number of partitions should be greater than " +
+      "the difference between upper bound and lower bound")
+    val column = partitioning.column
     var i: Int = 0
     var currentValue: Long = lowerBound
     var ans = new ArrayBuffer[Partition]()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 00ffd150fbee7..fd6671a39b6ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -435,7 +435,7 @@ class JDBCSuite extends SparkFunSuite
       )
     }.getMessage
     assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
-      "is larger than the upper bound. lowerBound: 5; higherBound: 1"))
+      "is larger than the upper bound. Lower bound: 5; Upper bound: 1"))
   }
 
   test("SELECT * on partitioned table with a nullable partition column") {
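[PATCH 3/4] tightens the error message, moves `val column` below the stride computation, and explains in a comment why the assert cannot fire: once `numPartitions` is clamped to at most `upperBound - lowerBound`, integer division yields a stride of at least 1. A quick property-style check of that claim, restricted to non-negative bounds because Scala's `/` truncates toward zero (with `lowerBound = -3`, `upperBound = 1`, `numPartitions = 4` the stride would be 0):

```scala
// Exhaustively confirm stride >= 1 over small non-negative bound ranges,
// applying the same clamp the patch series introduces.
for {
  lower <- 0L to 6L
  upper <- (lower + 1L) to (lower + 8L)
  requested <- 2L to 10L
} {
  val n = math.min(requested, upper - lower) // clamped partition count
  val stride = upper / n - lower / n
  assert(stride >= 1, s"lower=$lower upper=$upper n=$n stride=$stride")
}
```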
From 7d2ae9a0ea061577402aece4c2b49458009d7029 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 20 Jun 2016 18:38:10 -0700
Subject: [PATCH 4/4] address comments

---
 .../sql/execution/datasources/jdbc/JDBCRelation.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index b04c2279b6259..11613dd912eca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
@@ -36,7 +37,7 @@ private[sql] case class JDBCPartitioningInfo(
     upperBound: Long,
     numPartitions: Int)
 
-private[sql] object JDBCRelation {
+private[sql] object JDBCRelation extends Logging {
   /**
    * Given a partitioning schematic (a column of integral type, a number of
    * partitions, and upper and lower bounds on the column's value), generate
@@ -67,14 +68,16 @@ private[sql] object JDBCRelation {
       if ((upperBound - lowerBound) >= partitioning.numPartitions) {
         partitioning.numPartitions
       } else {
+        logWarning("The number of partitions is reduced because the specified number of " +
+          "partitions is less than the difference between upper bound and lower bound. " +
+          s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " +
+          s"partitions: ${partitioning.numPartitions}; Lower bound: $lowerBound; " +
+          s"Upper bound: $upperBound.")
         upperBound - lowerBound
       }
     // Overflow and silliness can happen if you subtract then divide.
     // Here we get a little roundoff, but that's (hopefully) OK.
     val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
-    // The automatic adjustment of numPartitions can ensure the following checking condition.
-    assert(stride >= 1, "The specified number of partitions should be greater than " +
-      "the difference between upper bound and lower bound")
     val column = partitioning.column
     var i: Int = 0
     var currentValue: Long = lowerBound
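[PATCH 4/4] removes the assert, redundant once the clamp is in place, and instead emits a `logWarning` reporting both the requested and the adjusted partition counts, so an oversized `numPartitions` is now adjusted quietly from the caller's perspective. The observable effect, sketched against the suite's fixtures (`spark` and `urlWithUserAndPass` come from JDBCSuite; any JDBC URL works elsewhere):

```scala
import java.util.Properties

// Requesting 10 partitions over a bound range of 4 should trigger the new
// warning and fall back to upperBound - lowerBound = 4 partitions.
val df = spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 1,
  upperBound = 5,
  numPartitions = 10,
  connectionProperties = new Properties)

assert(df.rdd.getNumPartitions == 4)
assert(df.count() == 8) // all rows come back; the null id rides in partition 0
```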