[SPARK-45139][SQL] Add DatabricksDialect to handle SQL type conversion
### What changes were proposed in this pull request?

This PR adds `DatabricksDialect` to Spark so that users can query Databricks clusters and Databricks SQL warehouses with precise SQL type conversion and correctly quoted identifiers, instead of handling both manually in their code.
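
With the dialect registered, a plain JDBC read picks up the Databricks-specific behavior automatically. A minimal read-side sketch (the URL, table, and credentials are placeholders, and `spark` is an existing `SparkSession`):

```scala
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:databricks://<workspace-host>:443")
  .option("dbtable", "<schema>.<table>")
  .option("user", "token")
  .option("password", "<personal-access-token>")
  .load()
// TINYINT, SMALLINT, and REAL columns now map to ByteType, ShortType, and FloatType,
// and generated queries quote identifiers with backticks instead of double quotes.
```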

### Why are the changes needed?

The PR fixes SQL type conversion for Databricks sources and makes it easier to query Databricks clusters and SQL warehouses over JDBC.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added unit tests in `JDBCSuite` covering URL handling and the Catalyst/JDBC type mappings.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42896 from sadikovi/add_databricks_dialect.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
sadikovi authored and dongjoon-hyun committed Sep 13, 2023
1 parent 4f74fc5 commit 2710dbe
Showing 3 changed files with 120 additions and 0 deletions.
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.jdbc

import java.sql.Connection

import scala.collection.mutable.ArrayBuilder

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._

private case object DatabricksDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = {
    url.startsWith("jdbc:databricks")
  }

  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = {
    sqlType match {
      case java.sql.Types.TINYINT => Some(ByteType)
      case java.sql.Types.SMALLINT => Some(ShortType)
      case java.sql.Types.REAL => Some(FloatType)
      case _ => None
    }
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
    case DoubleType => Some(JdbcType("DOUBLE", java.sql.Types.DOUBLE))
    case StringType => Some(JdbcType("STRING", java.sql.Types.VARCHAR))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case _ => None
  }

  override def quoteIdentifier(colName: String): String = {
    s"`$colName`"
  }

  override def supportsLimit: Boolean = true

  override def supportsOffset: Boolean = true

  override def supportsTableSample: Boolean = true

  override def getTableSample(sample: TableSampleInfo): String = {
    s"TABLESAMPLE (${(sample.upperBound - sample.lowerBound) * 100}) REPEATABLE (${sample.seed})"
  }

  // Override schemasExists to run "show schemas" as a PreparedStatement instead of
  // invoking getMetaData.getSchemas as it may not work correctly in older versions of the driver.
  override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
    val stmt = conn.prepareStatement("SHOW SCHEMAS")
    val rs = stmt.executeQuery()
    while (rs.next()) {
      if (rs.getString(1) == schema) {
        return true
      }
    }
    false
  }

  // Override listSchemas to run "show schemas" as a PreparedStatement instead of
  // invoking getMetaData.getSchemas as it may not work correctly in older versions of the driver.
  override def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
    val schemaBuilder = ArrayBuilder.make[Array[String]]
    val stmt = conn.prepareStatement("SHOW SCHEMAS")
    val rs = stmt.executeQuery()
    while (rs.next()) {
      schemaBuilder += Array(rs.getString(1))
    }
    schemaBuilder.result
  }
}
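
For reference, a minimal sketch of what the new overrides produce once the dialect is registered. This is an assumption-laden illustration: the URL is a placeholder, and TableSampleInfo is assumed to take (lowerBound, upperBound, withReplacement, seed), matching its use in getTableSample above.

import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.jdbc.JdbcDialects

// canHandle matches on the URL prefix, so any jdbc:databricks URL resolves to this dialect.
val dialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")

// Identifiers are quoted with backticks, matching Databricks SQL syntax.
assert(dialect.quoteIdentifier("trip distance") == "`trip distance`")

// A [0.0, 0.5) sample with seed 42 renders as a percentage-based TABLESAMPLE clause.
val sample = TableSampleInfo(0.0, 0.5, withReplacement = false, seed = 42L)
assert(dialect.getTableSample(sample) == "TABLESAMPLE (50.0) REPEATABLE (42)")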
@@ -703,6 +703,7 @@ object JdbcDialects {
  registerDialect(TeradataDialect)
  registerDialect(H2Dialect)
  registerDialect(SnowflakeDialect)
  registerDialect(DatabricksDialect)

  /**
   * Fetch the JdbcDialect class corresponding to a given database url.
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -2062,4 +2062,30 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
    val snowflakeDialect = JdbcDialects.get("jdbc:snowflake://account.snowflakecomputing.com")
    assert(snowflakeDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN")
  }

  test("SPARK-45139: DatabricksDialect url handling") {
    assert(JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com") == DatabricksDialect)
  }

  test("SPARK-45139: DatabricksDialect catalyst type mapping") {
    val databricksDialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
    assert(databricksDialect
      .getCatalystType(java.sql.Types.TINYINT, "", 1, null) == Some(ByteType))
    assert(databricksDialect
      .getCatalystType(java.sql.Types.SMALLINT, "", 1, null) == Some(ShortType))
    assert(databricksDialect
      .getCatalystType(java.sql.Types.REAL, "", 1, null) == Some(FloatType))
  }

  test("SPARK-45139: DatabricksDialect JDBC type mapping") {
    val databricksDialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
    assert(databricksDialect
      .getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN")
    assert(databricksDialect
      .getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "DOUBLE")
    assert(databricksDialect
      .getJDBCType(StringType).map(_.databaseTypeDefinition).get == "STRING")
    assert(databricksDialect
      .getJDBCType(BinaryType).map(_.databaseTypeDefinition).get == "BINARY")
  }
}
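
As a usage note, getJDBCType also shapes the DDL Spark generates when it creates the target table on write. A minimal write-side sketch, assuming an existing SparkSession and placeholder URL, table name, and credentials:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("databricks-dialect-demo").getOrCreate()
val df = spark.range(10).selectExpr("id", "CAST(id AS STRING) AS label")

// On table creation the dialect's getJDBCType mapping builds the column DDL,
// so the label column is declared as STRING rather than a generic JDBC default.
df.write
  .format("jdbc")
  .option("url", "jdbc:databricks://<workspace-host>:443")
  .option("dbtable", "<schema>.labels")
  .option("user", "token")
  .option("password", "<personal-access-token>")
  .mode("overwrite")
  .save()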
