Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#84: Added Exasol SQL statement builder #88

Merged
merged 4 commits into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions doc/changes/changes_1.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

## Features / Improvements

## Bug Fixes

* #14: Fixed issue with using Exasol reserved keywords in Spark queries (PR #88).
* #39: Fixed issue related to quoted columns in Spark queries (PR #88).

## Refactoring

* #40: Added Exasol testcontainers, refactored test environment (PR #87).
* #84: Added Exasol SQL Statement Builder for building SQL queries (PR #88).

## Documentation

Expand All @@ -14,19 +20,20 @@

### Runtime Dependency Updates

* Added `com.exasol:sql-statement-builder:4.3.0`
* Updated `com.exasol:exasol-jdbc:7.0.0` to `7.0.7`
* Updated `org.apache.spark:spark-core:2.4.5` to `3.0.1`
* Updated `org.apache.spark:spark-sql:2.4.5` to `3.0.1`

### Test Dependency Updates

* Added `com.exasol:exasol-testcontainers:3.5.1`
* Added `com.exasol:test-db-builder-java:3.1.0`
* Added `com.exasol:hamcrest-resultset-matcher:1.4.0`
* Removed `org.testcontainers:jdbc`
* Removed `com.dimafeng:testcontainers-scala`
* Updated `org.scalatest:scalatest:3.2.2` to `3.2.5`
* Updated `org.mockito:mockito-core:3.5.13` to `3.8.0`
* Updated `com.holdenkarau:spark-testing-base:2.4.5_0.14.0` to `3.0.1_1.0.0`

### Plugin Updates
Expand Down
10 changes: 6 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ object Dependencies {
// Versions
private val DefaultSparkVersion = "3.0.1"
private val ExasolJdbcVersion = "7.0.7"
private val ExasolSQLStmtBuilderVersion = "4.3.0"

// Test dependency versions. The diff residue duplicated several of these
// vals (e.g. ScalaTestVersion appeared twice); Scala forbids redefining a
// val in the same scope, so only the post-update versions are kept.
private val ScalaTestVersion = "3.2.5"
private val ScalaTestMockitoVersion = "1.0.0-M2"
private val MockitoVersion = "3.8.0"
private val ExasolTestContainersVersion = "3.5.1"
private val ExasolTestDBBuilderVersion = "3.1.0"
private val ExasolHamcrestMatcherVersion = "1.4.0"

private val sparkCurrentVersion =
Expand All @@ -28,6 +29,7 @@ object Dependencies {
/** Core dependencies needed for connector */
private val CoreDependencies: Seq[ModuleID] = Seq(
"com.exasol" % "exasol-jdbc" % ExasolJdbcVersion,
// Used to build SQL statements programmatically instead of string concatenation.
"com.exasol" % "sql-statement-builder" % ExasolSQLStmtBuilderVersion,
// Spark artifacts are "provided": the Spark runtime on the cluster supplies
// them, so they are compile-time-only and excluded from the assembly jar.
"org.apache.spark" %% "spark-core" % sparkCurrentVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkCurrentVersion % "provided"
)
Expand Down
29 changes: 29 additions & 0 deletions src/it/scala/com/exasol/spark/AbstractTableQueryIT.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.exasol.spark

import org.apache.spark.sql.DataFrame

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.BeforeAndAfterEach

/**
 * Base class for integration tests that read an Exasol table through the
 * Spark `exasol` data source. Concrete suites provide the table name and a
 * way to (re-)create the table before each test.
 */
abstract class AbstractTableQueryIT
  extends BaseIntegrationTest
  with DataFrameSuiteBase
  with BeforeAndAfterEach {

  /** Fully qualified name (`SCHEMA.TABLE`) of the table under test. */
  val tableName: String

  /** Re-creates the table so every test starts from a known state. */
  def createTable(): Unit

  override def beforeEach(): Unit = {
    createTable()
    () // discard any result; BeforeAndAfterEach expects Unit
  }

  /**
   * Loads a dataframe via the connector.
   *
   * @param query optional user query; defaults to a full scan of [[tableName]]
   */
  private[spark] def getDataFrame(query: Option[String] = None): DataFrame =
    spark.read
      .format("exasol")
      .option("host", jdbcHost)
      .option("port", jdbcPort)
      // `getOrElse` is the idiomatic form of `fold(default)(identity)`.
      .option("query", query.getOrElse(s"SELECT * FROM $tableName"))
      .load()

}
70 changes: 2 additions & 68 deletions src/it/scala/com/exasol/spark/BaseIntegrationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.exasol.spark
import com.exasol.containers.ExasolContainer
import com.exasol.spark.util.ExasolConfiguration
import com.exasol.spark.util.ExasolConnectionManager
import com.exasol.spark.util.Types._

import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
Expand All @@ -25,14 +24,13 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {

var jdbcHost: String = _
var jdbcPort: String = _
var connectionManager: ExasolConnectionManager = _
var exasolConnectionManager: ExasolConnectionManager = _

def prepareExasolDatabase(): Unit = {
container.start()
jdbcHost = container.getDockerNetworkInternalIpAddress()
jdbcPort = s"${container.getDefaultInternalDatabasePort()}"
connectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration()))
connectionManager.withExecute(Seq(s"CREATE SCHEMA $EXA_SCHEMA"))
exasolConnectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration()))
}

def getConfiguration(): Map[String, String] = Map(
Expand All @@ -43,9 +41,6 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
"max_nodes" -> "200"
)

def getConnection(): java.sql.Connection =
container.createConnection("")

override def beforeAll(): Unit =
prepareExasolDatabase()

Expand All @@ -54,67 +49,6 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
network.close()
}

val EXA_SCHEMA = "TEST_SCHEMA"
val EXA_TABLE = "TEST_TABLE"
val EXA_ALL_TYPES_TABLE = "TEST_ALL_TYPES_TABLE"
val EXA_TYPES_NOT_COVERED_TABLE = "TEST_TYPES_NOT_COVERED_TABLE"

// scalastyle:off nonascii
def createDummyTable(): Unit = {
val queries = Seq(
s"DROP SCHEMA IF EXISTS $EXA_SCHEMA CASCADE",
s"CREATE SCHEMA $EXA_SCHEMA",
s"""|CREATE OR REPLACE TABLE $EXA_SCHEMA.$EXA_TABLE (
| ID INTEGER IDENTITY NOT NULL,
| NAME VARCHAR(100) UTF8,
| CITY VARCHAR(2000) UTF8,
| DATE_INFO DATE,
| UNICODE_COL VARCHAR(100) UTF8,
| UPDATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
|)""".stripMargin,
s"""|INSERT INTO $EXA_SCHEMA.$EXA_TABLE (name, city, date_info, unicode_col)
| VALUES ('Germany', 'Berlin', '2017-12-31', 'öäüß')
|""".stripMargin,
s"""|INSERT INTO $EXA_SCHEMA.$EXA_TABLE (name, city, date_info, unicode_col)
| VALUES ('France', 'Paris', '2018-01-01','\u00d6')
|""".stripMargin,
s"""|INSERT INTO $EXA_SCHEMA.$EXA_TABLE (name, city, date_info, unicode_col)
| VALUES ('Portugal', 'Lisbon', '2018-10-01','\u00d9')
|""".stripMargin,
"commit"
)
connectionManager.withExecute(queries)
}
// scalastyle:on nonascii

def createAllTypesTable(): Unit = {
val maxDecimal = " DECIMAL(" + getMaxPrecisionExasol() + "," + getMaxScaleExasol() + ")"
val queries = Seq(
s"DROP SCHEMA IF EXISTS $EXA_SCHEMA CASCADE",
s"CREATE SCHEMA $EXA_SCHEMA",
s"""|CREATE OR REPLACE TABLE $EXA_SCHEMA.$EXA_ALL_TYPES_TABLE (
| MYID INTEGER,
| MYTINYINT DECIMAL(3,0),
| MYSMALLINT DECIMAL(9,0),
| MYBIGINT DECIMAL(36,0),
| MYDECIMALSystemDefault DECIMAL,
| MYDECIMALMAX $maxDecimal,
| MYNUMERIC DECIMAL( 5,2 ),
| MYDOUBLE DOUBLE PRECISION,
| MYCHAR CHAR,
| MYNCHAR CHAR(2000),
| MYLONGVARCHAR VARCHAR( 2000000),
| MYBOOLEAN BOOLEAN,
| MYDATE DATE,
| MYTIMESTAMP TIMESTAMP,
| MYGEOMETRY Geometry,
| MYINTERVAL INTERVAL YEAR TO MONTH
|)""".stripMargin,
"commit"
)
connectionManager.withExecute(queries)
}

private[this] def getExasolDockerImageVersion(): String =
System.getProperty("EXASOL_DOCKER_VERSION", DEFAULT_EXASOL_DOCKER_IMAGE)

Expand Down
35 changes: 35 additions & 0 deletions src/it/scala/com/exasol/spark/BaseTableQueryIT.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.exasol.spark

/**
 * Provides a `TEST_SCHEMA.TEST_TABLE` fixture with three rows, re-created
 * before each test by [[AbstractTableQueryIT]].
 */
class BaseTableQueryIT extends AbstractTableQueryIT {

  val schema = "TEST_SCHEMA"
  override val tableName: String = s"$schema.TEST_TABLE"

  // Keep the empty parens: the abstract member is declared as
  // `def createTable(): Unit`; overriding it without parens is deprecated
  // and rejected under Scala 3's explicit-nullary-override rules.
  override def createTable(): Unit =
    exasolConnectionManager.withExecute(
      // scalastyle:off nonascii
      Seq(
        s"DROP SCHEMA IF EXISTS $schema CASCADE",
        s"CREATE SCHEMA $schema",
        s"""|CREATE OR REPLACE TABLE $tableName (
            |   ID INTEGER IDENTITY NOT NULL,
            |   NAME VARCHAR(100) UTF8,
            |   CITY VARCHAR(2000) UTF8,
            |   DATE_INFO DATE,
            |   UNICODE_COL VARCHAR(100) UTF8,
            |   UPDATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
            |)""".stripMargin,
        s"""|INSERT INTO $tableName (name, city, date_info, unicode_col)
            | VALUES ('Germany', 'Berlin', '2017-12-31', 'öäüß')
            |""".stripMargin,
        s"""|INSERT INTO $tableName (name, city, date_info, unicode_col)
            | VALUES ('France', 'Paris', '2018-01-01','\u00d6')
            |""".stripMargin,
        s"""|INSERT INTO $tableName (name, city, date_info, unicode_col)
            | VALUES ('Portugal', 'Lisbon', '2018-10-01','\u00d9')
            |""".stripMargin,
        "commit"
      )
      // scalastyle:on nonascii
    )

}
22 changes: 5 additions & 17 deletions src/it/scala/com/exasol/spark/ColumnPruningIT.scala
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
package com.exasol.spark

import com.holdenkarau.spark.testing.DataFrameSuiteBase

/**
 * Tests column pruning for user queries.
 */
class ColumnPruningIT extends BaseTableQueryIT {

  // Fixed typo in the test name: "datarame" -> "dataframe".
  test("returns dataframe with selected columns") {
    val df = getDataFrame().select("city")
    // Pruning must push down so only the requested column comes back.
    assert(df.columns.size === 1)
    assert(df.columns.head === "city")
    // Row order follows the fixture's identity-ordered inserts.
    assert(df.collect().map(x => x.getString(0)) === Seq("Berlin", "Paris", "Lisbon"))
  }

}