From 6b2841a183825aea1d37287b8530bcb37cdee2c5 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 31 Jan 2017 17:09:14 +0900 Subject: [PATCH 1/2] Deduplicate arguments in JdbcUtils --- .../jdbc/JdbcRelationProvider.scala | 22 +++++++--------- .../datasources/jdbc/JdbcUtils.scala | 26 +++++++++---------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index e39d936f3933f..1563d0b4dcce0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -53,33 +53,31 @@ class JdbcRelationProvider extends CreatableRelationProvider parameters: Map[String, String], df: DataFrame): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) - val url = jdbcOptions.url val table = jdbcOptions.table - val createTableOptions = jdbcOptions.createTableOptions val isTruncate = jdbcOptions.isTruncate val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis val conn = JdbcUtils.createConnectionFactory(jdbcOptions)() try { - val tableExists = JdbcUtils.tableExists(conn, url, table) + val tableExists = JdbcUtils.tableExists(conn, jdbcOptions) if (tableExists) { mode match { case SaveMode.Overwrite => - if (isTruncate && isCascadingTruncateTable(url) == Some(false)) { + if (isTruncate && isCascadingTruncateTable(jdbcOptions.url) == Some(false)) { // In this case, we should truncate table and then load. truncateTable(conn, table) - val tableSchema = JdbcUtils.getSchemaOption(conn, url, table) - saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions) + val tableSchema = JdbcUtils.getSchemaOption(conn, jdbcOptions) + saveTable(df, tableSchema, isCaseSensitive, jdbcOptions) } else { // Otherwise, do not truncate the table, instead drop and recreate it dropTable(conn, table) - createTable(df.schema, url, table, createTableOptions, conn) - saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions) + createTable(conn, df.schema, jdbcOptions) + saveTable(df, Some(df.schema), isCaseSensitive, jdbcOptions) } case SaveMode.Append => - val tableSchema = JdbcUtils.getSchemaOption(conn, url, table) - saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions) + val tableSchema = JdbcUtils.getSchemaOption(conn, jdbcOptions) + saveTable(df, tableSchema, isCaseSensitive, jdbcOptions) case SaveMode.ErrorIfExists => throw new AnalysisException( @@ -91,8 +89,8 @@ class JdbcRelationProvider extends CreatableRelationProvider // Therefore, it is okay to do nothing here and then just return the relation below. } } else { - createTable(df.schema, url, table, createTableOptions, conn) - saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions) + createTable(conn, df.schema, jdbcOptions) + saveTable(df, Some(df.schema), isCaseSensitive, jdbcOptions) } } finally { conn.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 0590aec77c3cd..d89f600874177 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -63,14 +63,14 @@ object JdbcUtils extends Logging { /** * Returns true if the table already exists in the JDBC database. */ - def tableExists(conn: Connection, url: String, table: String): Boolean = { - val dialect = JdbcDialects.get(url) + def tableExists(conn: Connection, options: JDBCOptions): Boolean = { + val dialect = JdbcDialects.get(options.url) // Somewhat hacky, but there isn't a good way to identify whether a table exists for all // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overridden by the dialects. Try { - val statement = conn.prepareStatement(dialect.getTableExistsQuery(table)) + val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table)) try { statement.executeQuery() } finally { @@ -235,11 +235,11 @@ object JdbcUtils extends Logging { /** * Returns the schema if the table already exists in the JDBC database. */ - def getSchemaOption(conn: Connection, url: String, table: String): Option[StructType] = { - val dialect = JdbcDialects.get(url) + def getSchemaOption(conn: Connection, options: JDBCOptions): Option[StructType] = { + val dialect = JdbcDialects.get(options.url) try { - val statement = conn.prepareStatement(dialect.getSchemaQuery(table)) + val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table)) try { Some(getSchema(statement.executeQuery(), dialect)) } catch { @@ -697,11 +697,11 @@ object JdbcUtils extends Logging { */ def saveTable( df: DataFrame, - url: String, - table: String, tableSchema: Option[StructType], isCaseSensitive: Boolean, options: JDBCOptions): Unit = { + val url = options.url + val table = options.table val dialect = JdbcDialects.get(url) val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(options) @@ -725,12 +725,12 @@ object JdbcUtils extends Logging { * Creates a table with a given schema. */ def createTable( + conn: Connection, schema: StructType, - url: String, - table: String, - createTableOptions: String, - conn: Connection): Unit = { - val strSchema = schemaString(schema, url) + options: JDBCOptions): Unit = { + val strSchema = schemaString(schema, options.url) + val table = options.table + val createTableOptions = options.createTableOptions // Create the table if the table does not exist. // To allow certain options to append when create a new table, which can be // table_options or partition_options. From 918456ab4b73698eaa229acb1fa723e909db9a13 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 1 Feb 2017 21:09:46 +0900 Subject: [PATCH 2/2] Remove separate variables per comments --- .../jdbc/JdbcRelationProvider.scala | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 1563d0b4dcce0..88f6cb0021305 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -52,36 +52,34 @@ class JdbcRelationProvider extends CreatableRelationProvider mode: SaveMode, parameters: Map[String, String], df: DataFrame): BaseRelation = { - val jdbcOptions = new JDBCOptions(parameters) - val table = jdbcOptions.table - val isTruncate = jdbcOptions.isTruncate + val options = new JDBCOptions(parameters) val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis - val conn = JdbcUtils.createConnectionFactory(jdbcOptions)() + val conn = JdbcUtils.createConnectionFactory(options)() try { - val tableExists = JdbcUtils.tableExists(conn, jdbcOptions) + val tableExists = JdbcUtils.tableExists(conn, options) if (tableExists) { mode match { case SaveMode.Overwrite => - if (isTruncate && isCascadingTruncateTable(jdbcOptions.url) == Some(false)) { + if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) { // In this case, we should truncate table and then load. - truncateTable(conn, table) - val tableSchema = JdbcUtils.getSchemaOption(conn, jdbcOptions) - saveTable(df, tableSchema, isCaseSensitive, jdbcOptions) + truncateTable(conn, options.table) + val tableSchema = JdbcUtils.getSchemaOption(conn, options) + saveTable(df, tableSchema, isCaseSensitive, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it - dropTable(conn, table) - createTable(conn, df.schema, jdbcOptions) - saveTable(df, Some(df.schema), isCaseSensitive, jdbcOptions) + dropTable(conn, options.table) + createTable(conn, df.schema, options) + saveTable(df, Some(df.schema), isCaseSensitive, options) } case SaveMode.Append => - val tableSchema = JdbcUtils.getSchemaOption(conn, jdbcOptions) - saveTable(df, tableSchema, isCaseSensitive, jdbcOptions) + val tableSchema = JdbcUtils.getSchemaOption(conn, options) + saveTable(df, tableSchema, isCaseSensitive, options) case SaveMode.ErrorIfExists => throw new AnalysisException( - s"Table or view '$table' already exists. SaveMode: ErrorIfExists.") + s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.") case SaveMode.Ignore => // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected @@ -89,8 +87,8 @@ class JdbcRelationProvider extends CreatableRelationProvider // Therefore, it is okay to do nothing here and then just return the relation below. } } else { - createTable(conn, df.schema, jdbcOptions) - saveTable(df, Some(df.schema), isCaseSensitive, jdbcOptions) + createTable(conn, df.schema, options) + saveTable(df, Some(df.schema), isCaseSensitive, options) } } finally { conn.close()