From e4aac502d58972063a1ab25f17a1c217abe97b97 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 4 Aug 2017 22:38:15 -0700
Subject: [PATCH] improve message.

---
 .../datasources/jdbc/JDBCOptions.scala        | 11 +++++++----
 .../jdbc/JdbcRelationProvider.scala           |  9 +++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 22 ++++++++++++++++++++++
 .../spark/sql/jdbc/JDBCWriteSuite.scala       |  5 +++--
 4 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 591096d5efd22..96a8a51da18e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -97,10 +97,13 @@ class JDBCOptions(
   val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
   // the upper bound of the partition column
   val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
-  require(partitionColumn.isEmpty ||
-    (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
-    s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
-      s" and '$JDBC_NUM_PARTITIONS' are required.")
+  // numPartitions is also used for data source writing
+  require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
+    (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&
+      numPartitions.isDefined),
+    s"When reading JDBC data sources, users need to specify all or none for the following " +
+      s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
+      s"and '$JDBC_NUM_PARTITIONS'")
   val fetchSize = {
     val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
     require(size >= 0,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 74dcfb06f5c2b..37e7bb0a59bb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -29,6 +29,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
+    import JDBCOptions._
+
     val jdbcOptions = new JDBCOptions(parameters)
     val partitionColumn = jdbcOptions.partitionColumn
     val lowerBound = jdbcOptions.lowerBound
@@ -36,10 +38,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val numPartitions = jdbcOptions.numPartitions
 
     val partitionInfo = if (partitionColumn.isEmpty) {
-      assert(lowerBound.isEmpty && upperBound.isEmpty)
+      assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not specified, " +
+        s"'$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
       null
     } else {
-      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
+      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
+        s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
+          s"'$JDBC_NUM_PARTITIONS' are also required")
       JDBCPartitioningInfo(
         partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 24f46a6a057d9..4c43646889418 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -421,6 +421,28 @@ class JDBCSuite extends SparkFunSuite
     assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
   }
 
+  test("Missing partition columns") {
+    withView("tempPeople") {
+      val e = intercept[IllegalArgumentException] {
+        sql(
+          s"""
+             |CREATE OR REPLACE TEMPORARY VIEW tempPeople
+             |USING org.apache.spark.sql.jdbc
+             |OPTIONS (
+             |  url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
+             |  dbtable 'TEST.PEOPLE',
+             |  lowerBound '0',
+             |  upperBound '52',
+             |  numPartitions '53',
+             |  fetchSize '10000' )
+           """.stripMargin.replaceAll("\n", " "))
+      }.getMessage
+      assert(e.contains("When reading JDBC data sources, users need to specify all or none " +
+        "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+        "'numPartitions'"))
+    }
+  }
+
   test("Basic API with FetchSize") {
     (0 to 4).foreach { size =>
       val properties = new Properties()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2334d5ae32dc3..b7f97f204b24c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -324,8 +324,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         .option("partitionColumn", "foo")
         .save()
     }.getMessage
-    assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
-      " and 'numPartitions' are required."))
+    assert(e.contains("When reading JDBC data sources, users need to specify all or none " +
+      "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+      "'numPartitions'"))
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
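Note (not part of the patch): the suites above exercise the new check through the SQL
path and the write path; for completeness, a minimal read-side sketch of the behavior
this change enforces, assuming a running Spark session `spark` and the same in-memory
H2 test database the suites use. Supplying only some of the partitioning options now
fails fast with the reworded "all or none" message:

  // Sketch, not part of the patch: lowerBound, upperBound, and numPartitions are
  // set, but partitionColumn is missing, so the require(...) in JDBCOptions
  // rejects the read before any connection is attempted.
  spark.read
    .format("jdbc")
    .option("url", "jdbc:h2:mem:testdb0;user=testUser;password=testPass")
    .option("dbtable", "TEST.PEOPLE")
    .option("lowerBound", "0")
    .option("upperBound", "52")
    .option("numPartitions", "53")
    .load()
  // java.lang.IllegalArgumentException: requirement failed: When reading JDBC
  // data sources, users need to specify all or none for the following options:
  // 'partitionColumn', 'lowerBound', 'upperBound', and 'numPartitions'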