Skip to content

Commit

Permalink
Implementation for Oracle
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Jun 15, 2023
1 parent eb7a601 commit 7a0e4d2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Expand Up @@ -60,7 +60,8 @@ import org.apache.spark.tags.DockerTest
* This procedure has been validated with Oracle 18.4.0 and 21.3.0 Express Edition.
*/
@DockerTest
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession
with UpsertTests {
import testImplicits._

override val db = new DatabaseOnDocker {
Expand Down Expand Up @@ -157,6 +158,13 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
conn.commit()
}

// Oracle syntax for timestamps is special, need to patch UpsertTests test data
override def getUpsertTestTableInserts(tableName: String): Seq[String] = {
super.getUpsertTestTableInserts(tableName).map(sql =>
sql.replace(", '1996-", ", TIMESTAMP '1996-")
)
}

test("SPARK-16625 : Importing Oracle numeric types") {
val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties)
val rows = df.collect()
Expand Down
Expand Up @@ -17,19 +17,19 @@

package org.apache.spark.sql.jdbc

import java.sql.{Date, Timestamp, Types}
import java.sql.{Date, Statement, Timestamp, Types}
import java.util.{Locale, TimeZone}

import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


private case object OracleDialect extends JdbcDialect {
private case object OracleDialect extends JdbcDialect with MergeByTempTable {
private[jdbc] val BINARY_FLOAT = 100
private[jdbc] val BINARY_DOUBLE = 101
private[jdbc] val TIMESTAMPTZ = -101
Expand Down Expand Up @@ -224,4 +224,25 @@ private case object OracleDialect extends JdbcDialect {
override def supportsLimit: Boolean = true

override def supportsOffset: Boolean = true

override def createTempTable(
statement: Statement,
tableName: String,
strSchema: String,
options: JdbcOptionsInWrite): Unit = {
statement.executeUpdate(s"CREATE GLOBAL TEMPORARY TABLE $tableName ($strSchema) " +
s"ON COMMIT DELETE ROWS")
}

override def getMergeQuery(
sourceTableName: String,
destinationTableName: String,
columns: Array[String],
keyColumns: Array[String]): String = {
// Oracle dialect does not like a few bits of the standard SQL MERGE command
super.getMergeQuery(sourceTableName, destinationTableName, columns, keyColumns)
.replace(" AS dst\n", " dst\n")
.replace(" AS src\n", " src\n")
.replace(");\n", ")\n")
}
}

0 comments on commit 7a0e4d2

Please sign in to comment.