From 61846debe512d3e43491b5063f13a02af0f68a46 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Fri, 15 May 2015 02:03:39 +0530 Subject: [PATCH 1/2] CreatableRelationProvider support for JDBCDataSource --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 521f3dc821795..8863705ec283f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1099,7 +1099,7 @@ class SQLContext(@transient val sparkContext: SparkContext) table: String, parts: Array[Partition], properties: Properties): DataFrame = { - val relation = JDBCRelation(url, table, parts, properties)(this) + val relation = JDBCRelation(url, table, parts, properties, None)(this) baseRelationToDataFrame(relation) } From 6696c0b42a7013331e8afabc6ab2ce2c8d5fd045 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Fri, 15 May 2015 02:05:06 +0530 Subject: [PATCH 2/2] CreatableRelationProvider support for JDBCDataSource Complete --- .../apache/spark/sql/jdbc/JDBCRelation.scala | 95 ++++++++++++++++++- .../sql/hive/MetastoreDataSourcesSuite.scala | 60 +++++++++++- 2 files changed, 149 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index 93e82549f213b..f7e197870f800 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.jdbc +import java.sql.DatabaseMetaData import java.sql.DriverManager +import java.sql.ResultSet import java.util.Properties import scala.collection.mutable.ArrayBuffer @@ -25,8 +27,10 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.Partition import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.jdbc import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -88,11 +92,22 @@ private[sql] object JDBCRelation { } } -private[sql] class DefaultSource extends RelationProvider { +private[sql] class DefaultSource + extends RelationProvider + with SchemaRelationProvider + with CreatableRelationProvider { /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { + createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters and schema. 
*/ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + userDefinedSchema: StructType): BaseRelation = { val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) val driver = parameters.getOrElse("driver", null) val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) @@ -120,7 +135,68 @@ private[sql] class DefaultSource extends RelationProvider { val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) - JDBCRelation(url, table, parts, properties)(sqlContext) + val userDefinedSchemaOpt = if(userDefinedSchema == null) None else Some(userDefinedSchema) + JDBCRelation(url, table, parts, properties, userDefinedSchemaOpt)(sqlContext) + } + + /** + * Creates a relation with the given parameters based on the contents of the given + * DataFrame. The mode specifies the expected behavior of createRelation when + * data already exists. + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) + val driver = parameters.getOrElse("driver", null) + val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) + + if (driver != null) DriverRegistry.register(driver) + + val properties = new Properties() // Additional properties that we will pass to getConnection + parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) + + val conn = DriverManager.getConnection(url, properties) + var doInsertion = false + try { + val dbm: DatabaseMetaData = conn.getMetaData(); + // check if table already exists + val tables: ResultSet = dbm.getTables(null, null, table, null) + var tableExists = tables.next() + + doInsertion = (mode, tableExists) match { + case (SaveMode.ErrorIfExists, true) => + sys.error(s"Table $table already exists.") + false + case (SaveMode.Overwrite, true) => { + val sql = s"DROP TABLE $table" + conn.prepareStatement(sql).executeUpdate() + tableExists = false + true } + case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => + true + case (SaveMode.Ignore, exists) => + !exists + } + if(!tableExists) { + val schema = JDBCWriteDetails.schemaString(data, url) + val createSql = s"CREATE TABLE $table ($schema)" + conn.prepareStatement(createSql).executeUpdate() + } + } finally { + conn.close() + } + + val relation = createRelation(sqlContext, parameters) + if(doInsertion) relation match { + case ir: InsertableRelation => { + ir.insert(data, false) + } + case _ => sys.error(s"JDBC datasource does not support insert") + } + relation } } @@ -128,14 +204,16 @@ private[sql] case class JDBCRelation( url: String, table: String, parts: Array[Partition], - properties: Properties = new Properties())(@transient val sqlContext: SQLContext) + properties: Properties = new Properties(), + userDefinedSchema: Option[StructType])(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with InsertableRelation { override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = + userDefinedSchema.getOrElse { JDBCRDD.resolveTable(url, table, properties) } override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = { val driver: String = DriverRegistry.getDriverClassName(url) @@ -154,4 +232,13 @@ private[sql] case class JDBCRelation( override def insert(data: DataFrame, overwrite: Boolean): Unit = { data.insertIntoJDBC(url, table, overwrite, properties) } + + override def hashCode(): Int = + 37 * (37 * (37 + url.hashCode) + table.hashCode) + schema.hashCode + + override def equals(other: Any): Boolean = other match { + case that: JDBCRelation => + (this.url == that.url) && (this.table == that.table) && this.schema.sameType(that.schema) + case _ => false + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index da5d203d9d343..573d25ab4e36d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -18,11 +18,14 @@ package org.apache.spark.sql.hive import java.io.File +import java.sql.DriverManager +import java.util.Properties import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.InvalidInputException +import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql._ @@ -37,8 +40,24 @@ import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. */ -class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { - +class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach with BeforeAndAfter { + + val url = "jdbc:h2:mem:testdbsource1" + var conn: java.sql.Connection = null + val properties = new Properties() + properties.setProperty("user", "testUser") + properties.setProperty("password", "testPass") + properties.setProperty("rowId", "false") + + before { + Class.forName("org.h2.Driver") + conn = DriverManager.getConnection(url, properties) + } + + after { + conn.close(); + } + override def afterEach(): Unit = { reset() Utils.deleteRecursively(tempPath) @@ -758,4 +777,41 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT p.c1, c2 FROM insertParquet p"), (70 to 79).map(i => Row(i, s"str$i"))) } + + test("create table and insert into a JDBCDataSource table") { + def createDF(from: Int, to: Int): DataFrame = + createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("C1", "C2") + + val propertiesMap = Map( + "url"->url, + "dbtable"->"INSERTJDBC", + "user"->"testUser", + "password"->"testPass", + "driver"->"org.h2.Driver") + val src = "org.apache.spark.sql.jdbc" + + createDF(0, 9).saveAsTable("INSERTJDBC", src, SaveMode.ErrorIfExists, propertiesMap) + checkAnswer( + sql("SELECT p.c1, p.c2 FROM INSERTJDBC p WHERE p.c1 > 5"), + (6 to 9).map(i => Row(i, s"str$i"))) + + intercept[AnalysisException] { + createDF(10, 19).saveAsTable("INSERTJDBC", src, SaveMode.ErrorIfExists, propertiesMap) + } + + createDF(10, 19).saveAsTable("INSERTJDBC", src, SaveMode.Append, propertiesMap) + checkAnswer( + sql("SELECT p.c1, p.c2 FROM INSERTJDBC p WHERE p.c1 > 5"), + (6 to 19).map(i => Row(i, s"str$i"))) + + createDF(50, 59).saveAsTable("INSERTJDBC", src, SaveMode.Overwrite, propertiesMap) + checkAnswer( + sql("SELECT p.c1, c2 FROM INSERTJDBC p WHERE p.c1 > 51 AND p.c1 < 55"), + (52 to 54).map(i => Row(i, s"str$i"))) + + createDF(60, 69).saveAsTable("INSERTJDBC", src, SaveMode.Ignore, propertiesMap) + checkAnswer( 
+ sql("SELECT p.c1, c2 FROM INSERTJDBC p"), + (50 to 59).map(i => Row(i, s"str$i"))) + } }