From 901a552c5e973262fddbf70ee2d4078c948bc668 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 6 Jun 2015 22:59:31 -0700
Subject: [PATCH 1/4] [SPARK-8004][SQL] Enclose column names by JDBC Dialect

JIRA: https://issues.apache.org/jira/browse/SPARK-8004

Author: Liang-Chi Hsieh

Closes #6577 from viirya/enclose_jdbc_columns and squashes the following commits:

614606a [Liang-Chi Hsieh] For comment.
bc50182 [Liang-Chi Hsieh] Enclose column names by JDBC Dialect.
---
 .../scala/org/apache/spark/sql/jdbc/JDBCRDD.scala   |  4 +++-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala    | 13 +++++++++++++
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++++++++++
 3 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 40b604d710dce..2930f7bb4cae1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -211,12 +211,14 @@ private[sql] object JDBCRDD extends Logging {
       requiredColumns: Array[String],
       filters: Array[Filter],
       parts: Array[Partition]): RDD[Row] = {
+    val dialect = JdbcDialects.get(url)
+    val enclosedColumns = requiredColumns.map(dialect.columnEnclosing(_))
     new JDBCRDD(
       sc,
       getConnector(driver, url, properties),
       pruneSchema(schema, requiredColumns),
       fqTable,
-      requiredColumns,
+      enclosedColumns,
       filters,
       parts,
       properties)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 6a169e106b968..04052f80f5e78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -80,6 +80,15 @@ abstract class JdbcDialect {
    * @return The new JdbcType if there is an override for this DataType
    */
   def getJDBCType(dt: DataType): Option[JdbcType] = None
+
+  /**
+   * Encloses the column name.
+   * @param colName The column name
+   * @return The enclosed column name
+   */
+  def columnEnclosing(colName: String): String = {
+    s""""$colName""""
+  }
 }
 
 /**
@@ -208,4 +217,8 @@ case object MySQLDialect extends JdbcDialect {
       Some(BooleanType)
     } else None
   }
+
+  override def columnEnclosing(colName: String): String = {
+    s"`$colName`"
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7931854db27c1..a228543953536 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -410,6 +410,17 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
     assert(JdbcDialects.get("test.invalid") == NoopDialect)
   }
 
+  test("Enclosing column names by jdbc dialect") {
+    val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
+    val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
+
+    val columns = Seq("abc", "key")
+    val MySQLColumns = columns.map(MySQL.columnEnclosing(_))
+    val PostgresColumns = columns.map(Postgres.columnEnclosing(_))
+    assert(MySQLColumns === Seq("`abc`", "`key`"))
+    assert(PostgresColumns === Seq(""""abc"""", """"key""""))
+  }
+
   test("Dialect unregister") {
     JdbcDialects.registerDialect(testH2Dialect)
     JdbcDialects.unregisterDialect(testH2Dialect)
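Before patch 2 renames things, it may help to see the hook added in patch 1 in action. The sketch below is not part of the series: EnclosureDemo and the connection URLs are illustrative only, while JdbcDialects.get and columnEnclosing are the APIs touched above. Dialect lookup is keyed off the JDBC URL prefix, so no live database is needed.

    import org.apache.spark.sql.jdbc.JdbcDialects

    object EnclosureDemo {
      def main(args: Array[String]): Unit = {
        // "key" is a reserved word in MySQL; without enclosing it, the
        // SELECT list built by JDBCRDD would fail to parse server-side.
        val mysql = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
        println(mysql.columnEnclosing("key"))     // `key`

        // The default JdbcDialect behavior uses standard SQL double quotes.
        val postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
        println(postgres.columnEnclosing("key"))  // "key"
      }
    }
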
From db9a8e028930e1b24123ce2680383df78a06a72c Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 6 Jun 2015 23:04:58 -0700
Subject: [PATCH 2/4] [SPARK-8004][SQL] Quote identifier in JDBC data source.

This is a follow-up patch to #6577 that replaces columnEnclosing with
quoteIdentifier.
---
 .../apache/spark/sql/jdbc/JdbcDialects.scala | 34 +++++++++----------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 04052f80f5e78..8849fc2f1f0ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.sql.Types
+
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.DeveloperApi
 
-import java.sql.Types
-
 /**
  * :: DeveloperApi ::
  * A database type definition coupled with the jdbc type needed to send null
@@ -82,11 +82,10 @@ abstract class JdbcDialect {
   def getJDBCType(dt: DataType): Option[JdbcType] = None
 
   /**
-   * Encloses the column name.
-   * @param colName The column name
-   * @return The enclosed column name
+   * Quotes the identifier. This is used to put quotes around the identifier in case the column
+   * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
    */
-  def columnEnclosing(colName: String): String = {
+  def quoteIdentifier(colName: String): String = {
     s""""$colName""""
   }
 }
@@ -150,18 +149,19 @@ object JdbcDialects {
 @DeveloperApi
 class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
 
-  require(!dialects.isEmpty)
+  require(dialects.nonEmpty)
 
-  def canHandle(url : String): Boolean =
+  override def canHandle(url : String): Boolean =
     dialects.map(_.canHandle(url)).reduce(_ && _)
 
   override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
-    dialects.map(_.getCatalystType(sqlType, typeName, size, md)).flatten.headOption
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] =
-    dialects.map(_.getJDBCType(dt)).flatten.headOption
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
+  }
 
+  override def getJDBCType(dt: DataType): Option[JdbcType] = {
+    dialects.flatMap(_.getJDBCType(dt)).headOption
+  }
 }
 
 /**
@@ -170,7 +170,7 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
  */
 @DeveloperApi
 case object NoopDialect extends JdbcDialect {
-  def canHandle(url : String): Boolean = true
+  override def canHandle(url : String): Boolean = true
 }
 
 /**
@@ -179,7 +179,7 @@ case object NoopDialect extends JdbcDialect {
  */
 @DeveloperApi
 case object PostgresDialect extends JdbcDialect {
-  def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
@@ -205,7 +205,7 @@ case object PostgresDialect extends JdbcDialect {
  */
 @DeveloperApi
 case object MySQLDialect extends JdbcDialect {
-  def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
+  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
@@ -218,7 +218,7 @@ case object MySQLDialect extends JdbcDialect {
     } else None
   }
 
-  override def columnEnclosing(colName: String): String = {
+  override def quoteIdentifier(colName: String): String = {
     s"`$colName`"
   }
 }

From e39e14ed03eb993f4914ae24d88fc069b0684328 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 6 Jun 2015 23:16:19 -0700
Subject: [PATCH 3/4] Fixed compilation.

---
 .../src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 2930f7bb4cae1..db68b9c86db1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -212,13 +212,13 @@ private[sql] object JDBCRDD extends Logging {
       filters: Array[Filter],
       parts: Array[Partition]): RDD[Row] = {
     val dialect = JdbcDialects.get(url)
-    val enclosedColumns = requiredColumns.map(dialect.columnEnclosing(_))
+    val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName))
     new JDBCRDD(
       sc,
       getConnector(driver, url, properties),
       pruneSchema(schema, requiredColumns),
       fqTable,
-      enclosedColumns,
+      quotedColumns,
       filters,
       parts,
       properties)

From bad365f13b8b628e34c1bb8d95b0a8a46d975460 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 6 Jun 2015 23:37:57 -0700
Subject: [PATCH 4/4] Fixed test compilation...

---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a228543953536..49d348c3ed21b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -410,13 +410,13 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
     assert(JdbcDialects.get("test.invalid") == NoopDialect)
   }
 
-  test("Enclosing column names by jdbc dialect") {
+  test("quote column names by jdbc dialect") {
     val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
 
     val columns = Seq("abc", "key")
-    val MySQLColumns = columns.map(MySQL.columnEnclosing(_))
-    val PostgresColumns = columns.map(Postgres.columnEnclosing(_))
+    val MySQLColumns = columns.map(MySQL.quoteIdentifier(_))
+    val PostgresColumns = columns.map(Postgres.quoteIdentifier(_))
     assert(MySQLColumns === Seq("`abc`", "`key`"))
     assert(PostgresColumns === Seq(""""abc"""", """"key""""))
   }
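
With the full series applied, a third-party dialect can plug into the same quoting path that JDBCRDD now uses. A minimal sketch under the same caveats as the earlier one: BracketDialect, its jdbc:sqlserver URL prefix, and QuoteDemo are hypothetical, while JdbcDialect, JdbcDialects.registerDialect, JdbcDialects.get, and quoteIdentifier come from the files patched above.

    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

    // Hypothetical dialect for a database that quotes identifiers with
    // square brackets. Once registered, JDBCRDD routes every projected
    // column through quoteIdentifier for URLs this dialect handles.
    case object BracketDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
      override def quoteIdentifier(colName: String): String = s"[$colName]"
    }

    object QuoteDemo {
      def main(args: Array[String]): Unit = {
        JdbcDialects.registerDialect(BracketDialect)
        val dialect = JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db")
        println(dialect.quoteIdentifier("key"))  // [key]
      }
    }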