diff --git a/doc/changes/changes_1.0.0.md b/doc/changes/changes_1.0.0.md
index 0cdf720c..18a50c2a 100644
--- a/doc/changes/changes_1.0.0.md
+++ b/doc/changes/changes_1.0.0.md
@@ -21,9 +21,11 @@
 ### Test Dependency Updates
 
 * Added `com.exasol:exasol-testcontainers:3.5.0`
+* Added `com.exasol:test-db-builder-java:3.0.0`
+* Added `com.exasol:hamcrest-resultset-matcher:1.4.0`
 * Removed `org.testcontainers:jdbc`
 * Removed `com.dimafeng:testcontainers-scala`
-* Updated `org.scalatest:scalatest:3.2.2` to `3.2.3`
+* Updated `org.scalatest:scalatest:3.2.2` to `3.2.4`
 * Updated `org.mockito:mockito-core:3.5.13` to `3.7.7`
 * Updated `com.holdenkarau:spark-testing-base:2.4.5_0.14.0` to `3.0.1_1.0.0`
 
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 44f0fc95..c683aa50 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -9,10 +9,12 @@ object Dependencies {
 
   private val DefaultSparkVersion = "3.0.1"
   private val ExasolJdbcVersion = "7.0.7"
-  private val ScalaTestVersion = "3.2.3"
+  private val ScalaTestVersion = "3.2.4"
   private val ScalaTestMockitoVersion = "1.0.0-M2"
   private val MockitoVersion = "3.7.7"
   private val ExasolTestContainersVersion = "3.5.0"
+  private val ExasolTestDBBuilderVersion = "3.0.0"
+  private val ExasolHamcrestMatcherVersion = "1.4.0"
 
   private val sparkCurrentVersion =
     sys.env.getOrElse("SPARK_VERSION", DefaultSparkVersion)
@@ -37,6 +39,8 @@ object Dependencies {
     "org.mockito" % "mockito-core" % MockitoVersion,
     "com.holdenkarau" %% "spark-testing-base" % SparkTestingBaseVersion,
     "com.exasol" % "exasol-testcontainers" % ExasolTestContainersVersion,
+    "com.exasol" % "test-db-builder-java" % ExasolTestDBBuilderVersion,
+    "com.exasol" % "hamcrest-resultset-matcher" % ExasolHamcrestMatcherVersion,
   ).map(_ % Test)
 
   /** The list of all dependencies for the connector */
diff --git a/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala b/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala
index ce7846d9..9377afd6 100644
--- a/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala
+++ b/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala
@@ -32,6 +32,7 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
     jdbcHost = container.getDockerNetworkInternalIpAddress()
     jdbcPort = s"${container.getDefaultInternalDatabasePort()}"
     connectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration()))
+    connectionManager.withExecute(Seq(s"CREATE SCHEMA $EXA_SCHEMA"))
   }
 
   def getConfiguration(): Map[String, String] = Map(
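
Note: `test-db-builder-java` is declared above but not yet exercised in this diff; the test schema is still created with a raw `CREATE SCHEMA` statement in `BaseIntegrationTest`. A minimal sketch of how the builder could take over that setup, assuming its `ExasolObjectFactory` entry point and reusing the `container` and `EXA_SCHEMA` values from the test trait (these calls are the sketch's choice, not part of this PR):

    import com.exasol.dbbuilder.dialects.exasol.ExasolObjectFactory

    // createConnection("") is inherited from the testcontainers JdbcDatabaseContainer.
    val factory = new ExasolObjectFactory(container.createConnection(""))
    // Creates the schema and returns a handle that can also define tables, e.g.
    // schema.createTable(EXA_TABLE, "ID", "INTEGER", "NAME", "VARCHAR(20)").
    val schema = factory.createSchema(EXA_SCHEMA)
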
diff --git a/src/it/scala/com/exasol/spark/SaveIT.scala b/src/it/scala/com/exasol/spark/SaveOptionsIT.scala
similarity index 98%
rename from src/it/scala/com/exasol/spark/SaveIT.scala
rename to src/it/scala/com/exasol/spark/SaveOptionsIT.scala
index b19efeec..1db9d290 100644
--- a/src/it/scala/com/exasol/spark/SaveIT.scala
+++ b/src/it/scala/com/exasol/spark/SaveOptionsIT.scala
@@ -10,7 +10,7 @@ import org.scalatest.BeforeAndAfterEach
 
 /**
  * Integration tests for saving Spark DataFrames into Exasol tables.
  */
-class SaveIT extends BaseIntegrationTest with BeforeAndAfterEach with DataFrameSuiteBase {
+class SaveOptionsIT extends BaseIntegrationTest with BeforeAndAfterEach with DataFrameSuiteBase {
 
   private[this] val tableName = s"$EXA_SCHEMA.$EXA_TABLE"
diff --git a/src/it/scala/com/exasol/spark/SparkDataImportIT.scala b/src/it/scala/com/exasol/spark/SparkDataImportIT.scala
new file mode 100644
index 00000000..3545a7ec
--- /dev/null
+++ b/src/it/scala/com/exasol/spark/SparkDataImportIT.scala
@@ -0,0 +1,186 @@
+package com.exasol.spark
+
+import java.math.BigDecimal
+import java.nio.charset.StandardCharsets.UTF_8
+import java.sql.Date
+import java.sql.ResultSet
+import java.sql.Timestamp
+
+import org.apache.spark.sql.Encoder
+
+import com.exasol.matcher.ResultSetStructureMatcher.table
+import com.exasol.matcher.TypeMatchMode._
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.hamcrest.Matcher
+import org.hamcrest.MatcherAssert.assertThat
+import org.scalatest.BeforeAndAfterEach
+
+class SparkDataImportIT
+    extends BaseIntegrationTest
+    with BeforeAndAfterEach
+    with DataFrameSuiteBase {
+
+  private[this] val INT_MIN = -2147483648
+  private[this] val INT_MAX = 2147483647
+  private[this] val LONG_MIN = -9223372036854775808L
+  private[this] val LONG_MAX = 9223372036854775807L
+  private[this] val tableName = s"$EXA_SCHEMA.$EXA_TABLE"
+
+  import sqlContext.implicits._
+
+  test("saves boolean") {
+    SparkImportChecker(Seq(true, false)).assert(
+      table()
+        .row(java.lang.Boolean.TRUE)
+        .row(java.lang.Boolean.FALSE)
+        .matches()
+    )
+  }
+
+  test("saves integer") {
+    SparkImportChecker(Seq(1, 13, INT_MIN, INT_MAX)).assert(
+      table()
+        .row(java.lang.Integer.valueOf(1))
+        .row(java.lang.Integer.valueOf(13))
+        .row(java.lang.Integer.valueOf(INT_MIN))
+        .row(java.lang.Integer.valueOf(INT_MAX))
+        .matches(NO_JAVA_TYPE_CHECK)
+    )
+  }
+
+  test("saves long") {
+    SparkImportChecker(Seq(1L, LONG_MIN, LONG_MAX)).assert(
+      table()
+        .row(java.lang.Long.valueOf(1))
+        .row(java.lang.Long.valueOf(LONG_MIN))
+        .row(java.lang.Long.valueOf(LONG_MAX))
+        .matches(NO_JAVA_TYPE_CHECK)
+    )
+  }
+
+  test("saves double") {
+    SparkImportChecker(Seq(3.14, 2.71)).assert(
+      table()
+        .row(java.lang.Double.valueOf(3.14))
+        .row(java.lang.Double.valueOf(2.71))
+        .matches()
+    )
+  }
+
+  test("saves float") {
+    SparkImportChecker(Seq(1.01F, 0.45f)).assert(
+      table()
+        .withDefaultNumberTolerance(new BigDecimal(1e-3))
+        .row(java.lang.Float.valueOf("1.01"))
+        .row(java.lang.Float.valueOf("0.45"))
+        .matches(NO_JAVA_TYPE_CHECK)
+    )
+  }
+
+  test("saves short") {
+    SparkImportChecker(Seq(2.toShort, 3.toShort)).assert(
+      table()
+        .row(java.lang.Short.valueOf("2"))
+        .row(java.lang.Short.valueOf("3"))
+        .matches(NO_JAVA_TYPE_CHECK)
+    )
+  }
+
+  test("saves byte") {
+    SparkImportChecker(Seq(13.toByte, 127.toByte)).assert(
+      table()
+        .row(java.lang.Byte.valueOf("13"))
+        .row(java.lang.Byte.valueOf("127"))
+        .matches(NO_JAVA_TYPE_CHECK)
+    )
+  }
+
+  test("saves string") {
+    SparkImportChecker(Seq("str", "abc", null)).assert(
+      table()
+        .row("str")
+        .row("abc")
+        .row(null)
+        .matches()
+    )
+  }
+
+  test("saves bytes") {
+    SparkImportChecker(Seq("hello".getBytes(UTF_8), "world".getBytes(UTF_8), null)).assert(
+      table()
+        .row("hello")
+        .row("world")
+        .row(null)
+        .matches()
+    )
+  }
+
+  test("saves date") {
+    SparkImportChecker(Seq(Date.valueOf("1986-02-25"), Date.valueOf("2021-02-18"), null)).assert(
+      table()
+        .row(java.sql.Date.valueOf("1986-02-25"))
+        .row(java.sql.Date.valueOf("2021-02-18"))
+        .row(null)
+        .matches()
+    )
+  }
+
+  test("saves timestamp") {
+    val timestamp1 = Timestamp.from(java.time.Instant.EPOCH)
+    val timestamp2 = new Timestamp(System.currentTimeMillis())
+    SparkImportChecker(Seq(timestamp1, timestamp2, null)).assert(
+      table()
+        .row(timestamp1)
+        .row(timestamp2)
+        .row(null)
+        .matches()
+    )
+
+  }
+
+  test("saves decimal") {
+    SparkImportChecker(Seq(new BigDecimal("123.333"), new BigDecimal("1.666"), null)).assert(
+      table()
+        .row(java.lang.Double.valueOf(123.333))
+        .row(java.lang.Double.valueOf(1.666))
+        .row(null)
+        .matches(NO_JAVA_TYPE_CHECK)
+    )
+  }
+
+  test("saves unsupported type") {
+    val thrown = intercept[IllegalArgumentException] {
+      SparkImportChecker(Seq(Map("a" -> 1L))).assert(
+        table()
+          .row("""{"a":1}""")
+          .matches()
+      )
+    }
+    assert(thrown.getMessage().contains("Unsupported Spark data type MapType"))
+  }
+
+  case class SparkImportChecker[T: Encoder](input: Seq[T]) {
+    def assert(matcher: Matcher[ResultSet]): Unit = {
+      spark
+        .createDataset(input)
+        .toDF("col_field")
+        .write
+        .mode("overwrite")
+        .options(
+          Map(
+            "host" -> jdbcHost,
+            "port" -> jdbcPort,
+            "table" -> tableName,
+            "drop_table" -> "true"
+          )
+        )
+        .format("exasol")
+        .save()
+
+      connectionManager
+        .withExecuteQuery(s"SELECT * FROM $tableName")(assertThat(_, matcher))
+    }
+  }
+
+}
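
Note: the new `SparkImportChecker` helper writes a one-column Dataset through the connector and verifies the resulting Exasol table with `hamcrest-resultset-matcher`. Each `row(...)` call appends one expected row in order; `matches()` builds the final `Matcher[ResultSet]`, and `matches(NO_JAVA_TYPE_CHECK)` compares values without requiring identical Java classes. The matcher itself is independent of Spark; a standalone sketch against a plain JDBC result set, with an assumed URL and credentials:

    import java.sql.DriverManager

    import com.exasol.matcher.ResultSetStructureMatcher.table
    import com.exasol.matcher.TypeMatchMode._
    import org.hamcrest.MatcherAssert.assertThat

    // Assumed connection details; jdbcHost and jdbcPort as in BaseIntegrationTest.
    val connection = DriverManager.getConnection(s"jdbc:exa:$jdbcHost:$jdbcPort", "sys", "exasol")
    val resultSet = connection.createStatement().executeQuery(s"SELECT * FROM $EXA_SCHEMA.$EXA_TABLE")
    // Expect exactly these two rows, in order, ignoring exact Java value classes.
    assertThat(resultSet, table().row("str").row("abc").matches(NO_JAVA_TYPE_CHECK))
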
diff --git a/src/main/scala/com/exasol/spark/util/Types.scala b/src/main/scala/com/exasol/spark/util/Types.scala
index 720c1920..c02cf733 100644
--- a/src/main/scala/com/exasol/spark/util/Types.scala
+++ b/src/main/scala/com/exasol/spark/util/Types.scala
@@ -174,6 +174,7 @@ object Types extends Logging {
       case dt: DecimalType => convertSparkPrecisionScaleToExasol(dt)
       case BooleanType => "BOOLEAN"
       case StringType => "CLOB"
+      case BinaryType => "CLOB"
       case DateType => "DATE"
       case TimestampType => "TIMESTAMP"
       case _ =>
diff --git a/src/test/scala/com/exasol/spark/util/TypesSuite.scala b/src/test/scala/com/exasol/spark/util/TypesSuite.scala
index 0a90d8a0..3d4a2176 100644
--- a/src/test/scala/com/exasol/spark/util/TypesSuite.scala
+++ b/src/test/scala/com/exasol/spark/util/TypesSuite.scala
@@ -53,6 +53,7 @@ class TypesSuite extends AnyFunSuite with Matchers {
       FloatType -> "FLOAT",
      BooleanType -> "BOOLEAN",
       StringType -> "CLOB",
+      BinaryType -> "CLOB",
       DateType -> "DATE",
       TimestampType -> "TIMESTAMP"
     )
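
Note: with `BinaryType` now mapped to `CLOB`, byte-array columns round-trip as character data, which is what the `saves bytes` test asserts above. A minimal end-to-end write sketch using the same connector options as the tests; the host, port, and table name are placeholders:

    import java.nio.charset.StandardCharsets.UTF_8

    import spark.implicits._

    val df = spark.createDataset(Seq("hello".getBytes(UTF_8), "world".getBytes(UTF_8))).toDF("payload")
    df.write
      .mode("overwrite")
      .options(
        Map(
          "host" -> "10.0.0.11", // placeholder Exasol data node address
          "port" -> "8563",      // placeholder JDBC port
          "table" -> "MY_SCHEMA.BINARY_TABLE",
          "drop_table" -> "true"
        )
      )
      .format("exasol")
      .save()

The column lands in Exasol as a CLOB containing the decoded text (`hello`, `world`), so readers should expect character data rather than a native binary type.
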