From 159d8418752c789a6cd8d5e4dc4245501f582aa0 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 18 Jan 2017 07:36:43 -0800
Subject: [PATCH] fix.

---
 docs/sql-programming-guide.md              |  7 ++++++
 .../apache/spark/sql/DataFrameWriter.scala | 24 +++++++++++++------
 .../datasources/jdbc/JDBCOptions.scala     | 17 ++++++++++++-
 .../spark/sql/jdbc/JDBCWriteSuite.scala    | 10 ++++++++
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d38aed1db719f..274310fc4c5d4 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1023,6 +1023,13 @@ the Data Sources API. The following options are supported:
       The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
     </td>
   </tr>
+
+  <tr>
+    <td><code>createTableOptions</code></td>
+    <td>
+      This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table. For example: <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>
+    </td>
+  </tr>
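
A rough usage sketch of the option documented above (assuming an existing DataFrame df; the MySQL URL, table name, and credentials are illustrative assumptions, not part of this patch):

    // Pass createTableOptions through DataFrameWriter.option before calling jdbc().
    // When the writer has to create the target table, the value is appended to the
    // generated CREATE TABLE statement.
    val connectionProperties = new java.util.Properties()
    connectionProperties.put("user", "username")        // assumed credentials
    connectionProperties.put("password", "password")
    df.write
      .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8")
      .jdbc("jdbc:mysql://localhost:3306/testdb", "people", connectionProperties)
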
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a4c4a5defa1b3..3cad7df447627 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}

 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -399,6 +399,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")

+    // Add the required options (URL and dbtable) to the user-specified options.
+    val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
+    val jdbcOptions = new JDBCOptions(params)
+    val jdbcUrl = jdbcOptions.url
+    val jdbcTable = jdbcOptions.table
+
     val props = new Properties()
     extraOptions.foreach { case (key, value) =>
       props.put(key, value)
@@ -408,25 +414,29 @@
     val conn = JdbcUtils.createConnectionFactory(url, props)()

     try {
-      var tableExists = JdbcUtils.tableExists(conn, url, table)
+      var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)

       if (mode == SaveMode.Ignore && tableExists) {
         return
       }

       if (mode == SaveMode.ErrorIfExists && tableExists) {
-        sys.error(s"Table $table already exists.")
+        sys.error(s"Table $jdbcTable already exists.")
       }

       if (mode == SaveMode.Overwrite && tableExists) {
-        JdbcUtils.dropTable(conn, table)
+        JdbcUtils.dropTable(conn, jdbcTable)
         tableExists = false
       }

       // Create the table if the table didn't exist.
       if (!tableExists) {
-        val schema = JdbcUtils.schemaString(df, url)
-        val sql = s"CREATE TABLE $table ($schema)"
+        val schema = JdbcUtils.schemaString(df, jdbcUrl)
+        // Allow user-specified options to be appended when creating a new table; these
+        // can be table_options or partition_options.
+        // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+        val createtblOptions = jdbcOptions.createTableOptions
+        val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
         val statement = conn.createStatement
         try {
           statement.executeUpdate(sql)
@@ -438,7 +448,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       conn.close()
     }

-    JdbcUtils.saveTable(df, url, table, props)
+    JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
   }

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6c6ec89746ee1..a9f156d51f964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,14 +20,21 @@ package org.apache.spark.sql.execution.datasources.jdbc

 /**
  * Options for the JDBC data source.
  */
-private[jdbc] class JDBCOptions(
+class JDBCOptions(
     @transient private val parameters: Map[String, String])
   extends Serializable {
+  // ------------------------------------------------------------
+  // Required parameters
+  // ------------------------------------------------------------
   // a JDBC URL
   val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
   // name of table
   val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+
+  // ------------------------------------------------------------
+  // Optional parameter list
+  // ------------------------------------------------------------
   // the column used to partition
   val partitionColumn = parameters.getOrElse("partitionColumn", null)
   // the lower bound of partition column
@@ -36,4 +43,12 @@
   val upperBound = parameters.getOrElse("upperBound", null)
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)
+
+  // ------------------------------------------------------------
+  // The options for DataFrameWriter
+  // ------------------------------------------------------------
+  // the create table option, which can be table_options or partition_options.
+  // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+  // TODO: reuse the existing partition parameters for those partition-specific options
+  val createTableOptions = parameters.getOrElse("createTableOptions", "")
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2c6449fa6870b..a6e63e826d92a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -155,6 +155,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
   }

+  test("createTableOptions") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    val m = intercept[org.h2.jdbc.JdbcSQLException] {
+      df.write.option("createTableOptions", "ENGINE tableEngineName")
+        .jdbc(url1, "TEST.CREATETBLOPTS", properties)
+    }.getMessage
+    assert(m.contains("Class \"TABLEENGINENAME\" not found"))
+  }
+
   test("Incompatible INSERT to append") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
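
A minimal sketch of how the now-public JDBCOptions class resolves these parameters (the map values are illustrative assumptions; only the keys, defaults, and error behavior come from the diff above):

    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

    // "url" and "dbtable" are required; omitting either one triggers sys.error.
    // "createTableOptions" is optional and defaults to the empty string.
    val opts = new JDBCOptions(Map(
      "url" -> "jdbc:h2:mem:testdb",
      "dbtable" -> "TEST.CREATETBLOPTS",
      "createTableOptions" -> "ENGINE=InnoDB"))
    assert(opts.url == "jdbc:h2:mem:testdb")
    assert(opts.table == "TEST.CREATETBLOPTS")
    assert(opts.createTableOptions == "ENGINE=InnoDB")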