From 582ebc01a3f876a56a336ae5de7b8b3f86eaf67b Mon Sep 17 00:00:00 2001 From: morazow Date: Wed, 3 Mar 2021 11:34:26 +0100 Subject: [PATCH] Apply suggestions from review. --- .../exasol/spark/BaseIntegrationTest.scala | 4 +- .../com/exasol/spark/BaseTableQueryIT.scala | 2 +- .../exasol/spark/PredicatePushdownIT.scala | 22 ++++---- .../com/exasol/spark/QuotedColumnsIT.scala | 2 +- .../com/exasol/spark/ReservedKeywordsIT.scala | 2 +- .../com/exasol/spark/SaveOptionsIT.scala | 40 +++++++------- .../com/exasol/spark/SparkDataImportIT.scala | 2 +- src/it/scala/com/exasol/spark/TypesIT.scala | 2 +- .../scala/com/exasol/spark/util/Filters.scala | 54 ++++++++++--------- .../com/exasol/spark/util/FiltersSuite.scala | 42 ++++++++++----- 10 files changed, 96 insertions(+), 76 deletions(-) diff --git a/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala b/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala index 6d99d723..09d2bf26 100644 --- a/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala +++ b/src/it/scala/com/exasol/spark/BaseIntegrationTest.scala @@ -24,13 +24,13 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll { var jdbcHost: String = _ var jdbcPort: String = _ - var connectionManager: ExasolConnectionManager = _ + var exasolConnectionManager: ExasolConnectionManager = _ def prepareExasolDatabase(): Unit = { container.start() jdbcHost = container.getDockerNetworkInternalIpAddress() jdbcPort = s"${container.getDefaultInternalDatabasePort()}" - connectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration())) + exasolConnectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration())) } def getConfiguration(): Map[String, String] = Map( diff --git a/src/it/scala/com/exasol/spark/BaseTableQueryIT.scala b/src/it/scala/com/exasol/spark/BaseTableQueryIT.scala index 0d467f0b..c1996eda 100644 --- a/src/it/scala/com/exasol/spark/BaseTableQueryIT.scala +++ b/src/it/scala/com/exasol/spark/BaseTableQueryIT.scala @@ -5,7 +5,7 @@ class BaseTableQueryIT extends AbstractTableQueryIT { val schema = "TEST_SCHEMA" override val tableName: String = s"$schema.TEST_TABLE" override def createTable: Unit = - connectionManager.withExecute( + exasolConnectionManager.withExecute( // scalastyle:off nonascii Seq( s"DROP SCHEMA IF EXISTS $schema CASCADE", diff --git a/src/it/scala/com/exasol/spark/PredicatePushdownIT.scala b/src/it/scala/com/exasol/spark/PredicatePushdownIT.scala index 6a837a47..401441a2 100644 --- a/src/it/scala/com/exasol/spark/PredicatePushdownIT.scala +++ b/src/it/scala/com/exasol/spark/PredicatePushdownIT.scala @@ -12,47 +12,47 @@ class PredicatePushdownIT extends BaseTableQueryIT { import spark.implicits._ - test("returns dataframe with equal-to-filter") { + test("returns dataframe with equal-to filter") { val df = getDataFrame().filter($"id" === 1).collect() assert(df.map(r => (r.getLong(0), r.getString(1))) === Seq((1, "Germany"))) } - test("returns dataframe with not equal to filter") { + test("returns dataframe with not-equal-to filter") { val df = getDataFrame().filter($"name" =!= "Germany").collect() assert(df.map(r => r.getString(1)).contains("Germany") === false) } - test("returns dataframe with greater-than-filter") { + test("returns dataframe with greater-than filter") { val df = getDataFrame().filter($"id" > 2).collect() assert(df.map(r => r.getString(1)) === Seq("Portugal")) } - test("returns dataframe with greater than or equal filter") { + test("returns dataframe with greater-than or equal-to filter") { val df = getDataFrame().filter($"id" >= 2).collect() assert(df.map(r => r.getString(2)) === Seq("Paris", "Lisbon")) } - test("returns dataframe with less than filter") { + test("returns dataframe with less-than filter") { val df = getDataFrame().filter($"id" < 2).collect() assert(df.map(r => r.getString(2)) === Seq("Berlin")) } - test("returns dataframe with less than or equal filter") { + test("returns dataframe with less-than or equal-to filter") { val df = getDataFrame().filter($"id" <= 2).collect() assert(df.map(r => r.getString(2)) === Seq("Berlin", "Paris")) } - test("returns dataframe with string ends with filter") { + test("returns dataframe with string-ends-with filter") { val df = getDataFrame().filter($"city".endsWith("bon")).collect() assert(df.map(r => (r.getString(1), r.getString(2))) === Seq(("Portugal", "Lisbon"))) } - test("returns dataframe with string contains filter") { + test("returns dataframe with string-contains filter") { val df = getDataFrame().filter($"name".contains("rma")).collect() assert(df.map(r => r.getString(1)) === Seq("Germany")) } - test("returns dataframe with string starts with filter") { + test("returns dataframe with string-starts-with filter") { val df = getDataFrame().filter($"name".startsWith("Franc")).collect() assert(df.map(r => (r.getString(1), r.getString(2))) === Seq(("France", "Paris"))) } @@ -90,7 +90,7 @@ class PredicatePushdownIT extends BaseTableQueryIT { assert(sqlDF.map(r => (r.getLong(0), r.getString(1))) === Seq((1, "Berlin"), (2, "Paris"))) } - test("returns dataframe with date filter") { + test("returns dataframe with date literal filter") { val filterDate = Date.valueOf("2017-12-31") val df = getDataFrame() .filter(col("date_info") === filterDate) @@ -99,7 +99,7 @@ class PredicatePushdownIT extends BaseTableQueryIT { assert(df.queryExecution.executedPlan.toString().contains("EqualTo(DATE_INFO,")) } - test("returns dataframe with timestamp filter") { + test("returns dataframe with timestamp literal filter") { val minTimestamp = Timestamp.valueOf("2017-12-30 00:00:00.0000") val df = getDataFrame() .filter(col("updated_at") < minTimestamp) diff --git a/src/it/scala/com/exasol/spark/QuotedColumnsIT.scala b/src/it/scala/com/exasol/spark/QuotedColumnsIT.scala index 1eceda94..af7c1947 100644 --- a/src/it/scala/com/exasol/spark/QuotedColumnsIT.scala +++ b/src/it/scala/com/exasol/spark/QuotedColumnsIT.scala @@ -10,7 +10,7 @@ class QuotedColumnsIT extends AbstractTableQueryIT { override val tableName = s"$schema.TEST_TABLE" override def createTable(): Unit = - connectionManager.withExecute( + exasolConnectionManager.withExecute( Seq( s"DROP SCHEMA IF EXISTS $schema CASCADE", s"CREATE SCHEMA $schema", diff --git a/src/it/scala/com/exasol/spark/ReservedKeywordsIT.scala b/src/it/scala/com/exasol/spark/ReservedKeywordsIT.scala index bc868647..b6d30b28 100644 --- a/src/it/scala/com/exasol/spark/ReservedKeywordsIT.scala +++ b/src/it/scala/com/exasol/spark/ReservedKeywordsIT.scala @@ -10,7 +10,7 @@ class ReservedKeywordsIT extends AbstractTableQueryIT { override val tableName = s"$schema.TEST_TABLE" override def createTable(): Unit = - connectionManager.withExecute( + exasolConnectionManager.withExecute( Seq( s"DROP SCHEMA IF EXISTS $schema CASCADE", s"CREATE SCHEMA $schema", diff --git a/src/it/scala/com/exasol/spark/SaveOptionsIT.scala b/src/it/scala/com/exasol/spark/SaveOptionsIT.scala index 083a8d25..b3c1cb51 100644 --- a/src/it/scala/com/exasol/spark/SaveOptionsIT.scala +++ b/src/it/scala/com/exasol/spark/SaveOptionsIT.scala @@ -31,31 +31,31 @@ class SaveOptionsIT extends BaseTableQueryIT { // scalastyle:on nonascii test("`tableExists` should return correct boolean result") { - assert(connectionManager.tableExists(tableName) === true) - assert(connectionManager.tableExists("DUMMY_SCHEMA.DUMMYTABLE") === false) + assert(exasolConnectionManager.tableExists(tableName) === true) + assert(exasolConnectionManager.tableExists("DUMMY_SCHEMA.DUMMYTABLE") === false) } test("`truncateTable` should perform table truncation") { - assert(connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") > 0) - connectionManager.truncateTable(tableName) - assert(connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0) + assert(exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") > 0) + exasolConnectionManager.truncateTable(tableName) + assert(exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0) // Ensure it is idempotent - connectionManager.truncateTable(tableName) - assert(connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0) + exasolConnectionManager.truncateTable(tableName) + assert(exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0) } test("`dropTable` should drop table") { - assert(connectionManager.tableExists(tableName) === true) - connectionManager.dropTable(tableName) - assert(connectionManager.tableExists(tableName) === false) + assert(exasolConnectionManager.tableExists(tableName) === true) + exasolConnectionManager.dropTable(tableName) + assert(exasolConnectionManager.tableExists(tableName) === false) // Ensure it is idempotent - connectionManager.dropTable(tableName) - assert(connectionManager.tableExists(tableName) === false) + exasolConnectionManager.dropTable(tableName) + assert(exasolConnectionManager.tableExists(tableName) === false) } test("`createTable` should create a table") { val newTableName = s"$schema.new_table" - assert(connectionManager.tableExists(newTableName) === false) + assert(exasolConnectionManager.tableExists(newTableName) === false) import sqlContext.implicits._ val df = sc @@ -63,13 +63,13 @@ class SaveOptionsIT extends BaseTableQueryIT { .toDF("str_col", "int_col", "date_col") val newTableSchema = Types.createTableSchema(df.schema) - connectionManager.createTable(newTableName, newTableSchema) - assert(connectionManager.tableExists(newTableName) === true) + exasolConnectionManager.createTable(newTableName, newTableSchema) + assert(exasolConnectionManager.tableExists(newTableName) === true) } test("save mode 'ignore' does not insert data if table exists") { val initialRecordsCount = - connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") + exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") assert(runDataFrameSave("ignore", 1) === initialRecordsCount) } @@ -79,7 +79,7 @@ class SaveOptionsIT extends BaseTableQueryIT { test("save mode 'append' appends data if table exists") { val initialRecordsCount = - connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") + exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") val totalRecords = initialRecordsCount + dataframeTestData.size assert(runDataFrameSave("append", 3) === totalRecords) } @@ -92,7 +92,7 @@ class SaveOptionsIT extends BaseTableQueryIT { } test("save throws without 'create_table' or 'drop_table' option when table does not exist") { - connectionManager.dropTable(tableName) + exasolConnectionManager.dropTable(tableName) saveModes.foreach { case mode => val thrown = intercept[UnsupportedOperationException] { @@ -108,7 +108,7 @@ class SaveOptionsIT extends BaseTableQueryIT { val newOptions = defaultOptions ++ Map("create_table" -> "true") saveModes.foreach { case mode => - connectionManager.dropTable(tableName) + exasolConnectionManager.dropTable(tableName) assert(runDataFrameSave(mode, 2, newOptions) === dataframeTestData.size.toLong) } } @@ -138,7 +138,7 @@ class SaveOptionsIT extends BaseTableQueryIT { .format("exasol") .save() - connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") + exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") } } diff --git a/src/it/scala/com/exasol/spark/SparkDataImportIT.scala b/src/it/scala/com/exasol/spark/SparkDataImportIT.scala index 9f561feb..b7e361a7 100644 --- a/src/it/scala/com/exasol/spark/SparkDataImportIT.scala +++ b/src/it/scala/com/exasol/spark/SparkDataImportIT.scala @@ -172,7 +172,7 @@ class SparkDataImportIT extends BaseTableQueryIT { .format("exasol") .save() - connectionManager + exasolConnectionManager .withExecuteQuery(s"SELECT * FROM $tableName")(assertThat(_, matcher)) } } diff --git a/src/it/scala/com/exasol/spark/TypesIT.scala b/src/it/scala/com/exasol/spark/TypesIT.scala index 7dc4da5a..b99c6dbc 100644 --- a/src/it/scala/com/exasol/spark/TypesIT.scala +++ b/src/it/scala/com/exasol/spark/TypesIT.scala @@ -11,7 +11,7 @@ class TypesIT extends AbstractTableQueryIT { override val tableName = s"$schema.TEST_TABLE" override def createTable(): Unit = { val maxDecimal = " DECIMAL(" + getMaxPrecisionExasol() + "," + getMaxScaleExasol() + ")" - connectionManager.withExecute( + exasolConnectionManager.withExecute( Seq( s"DROP SCHEMA IF EXISTS $schema CASCADE", s"CREATE SCHEMA $schema", diff --git a/src/main/scala/com/exasol/spark/util/Filters.scala b/src/main/scala/com/exasol/spark/util/Filters.scala index 0c4877ca..3544ecc8 100644 --- a/src/main/scala/com/exasol/spark/util/Filters.scala +++ b/src/main/scala/com/exasol/spark/util/Filters.scala @@ -48,36 +48,44 @@ object Filters { BooleanTerm.lt(column(attribute), getLiteral(value)) case LessThanOrEqual(attribute, value) => BooleanTerm.le(column(attribute), getLiteral(value)) - // case IsNull(attribute) => stringLiteral(s"""("$attribute" IS NULL)""") - // case IsNotNull(attribute) => stringLiteral(s"""("$attribute" IS NOT NULL)""") case StringEndsWith(attribute, value) => BooleanTerm.like(column(attribute), stringLiteral(s"%$value")) case StringContains(attribute, value) => BooleanTerm.like(column(attribute), stringLiteral(s"%$value%")) case StringStartsWith(attribute, value) => BooleanTerm.like(column(attribute), stringLiteral(s"$value%")) - // case In(a, vs) => inExpr(a, vs, "IN", dataTypes) - // case Not(In(a, vs)) => inExpr(a, vs, "NOT IN", dataTypes) case Not(notFilter) => filterToBooleanExpression(notFilter).map(BooleanTerm.not(_)).getOrElse(null) - case And(leftFilter, rightFilter) => - val leftExpr = filterToBooleanExpression(leftFilter) - val rightExpr = filterToBooleanExpression(rightFilter) - if (leftExpr.isDefined && rightExpr.isDefined) { - BooleanTerm.and(leftExpr.getOrElse(null), rightExpr.getOrElse(null)) - } else { - null - } - case Or(leftFilter, rightFilter) => - val leftExpr = filterToBooleanExpression(leftFilter) - val rightExpr = filterToBooleanExpression(rightFilter) - if (leftExpr.isDefined && rightExpr.isDefined) { - BooleanTerm.or(leftExpr.getOrElse(null), rightExpr.getOrElse(null)) - } else { - null - } - case _ => null + case And(leftFilter, rightFilter) => andFilterToExpression(leftFilter, rightFilter) + case Or(leftFilter, rightFilter) => orFilterToExpression(leftFilter, rightFilter) + case _ => null }) + + private[this] def andFilterToExpression( + leftFilter: Filter, + rightFilter: Filter + ): BooleanExpression = { + val leftExpr = filterToBooleanExpression(leftFilter) + val rightExpr = filterToBooleanExpression(rightFilter) + if (leftExpr.isDefined && rightExpr.isDefined) { + BooleanTerm.and(leftExpr.getOrElse(null), rightExpr.getOrElse(null)) + } else { + null + } + } + + private[this] def orFilterToExpression( + leftFilter: Filter, + rightFilter: Filter + ): BooleanExpression = { + val leftExpr = filterToBooleanExpression(leftFilter) + val rightExpr = filterToBooleanExpression(rightFilter) + if (leftExpr.isDefined && rightExpr.isDefined) { + BooleanTerm.or(leftExpr.getOrElse(null), rightExpr.getOrElse(null)) + } else { + null + } + } // scalastyle:on null private[this] def getLiteral(value: Any): ValueExpression = @@ -90,9 +98,7 @@ object Filters { case longValue: Long => longLiteral(longValue) case floatValue: Float => floatLiteral(floatValue) case doubleValue: Double => doubleLiteral(doubleValue) - // case bigDecimalValue: BigDecimal => BigDecimalLiteral(bigDecimalValue.underlying()) - // case bigDecimalValue: java.math.BigDecimal => BigDecimalLiteral(bigDecimalValue) - case _ => stringLiteral(s"$value") + case _ => stringLiteral(s"$value") } } diff --git a/src/test/scala/com/exasol/spark/util/FiltersSuite.scala b/src/test/scala/com/exasol/spark/util/FiltersSuite.scala index 28a17b86..5b7a323c 100644 --- a/src/test/scala/com/exasol/spark/util/FiltersSuite.scala +++ b/src/test/scala/com/exasol/spark/util/FiltersSuite.scala @@ -24,12 +24,12 @@ class FiltersSuite extends AnyFunSuite with Matchers { assert(getWhereClause(Seq.empty[Filter]) === "") } - test("renders equal to") { + test("renders equal-to") { assert(getWhereClause(Seq(EqualTo("field", "a"))) === """("field" = 'a')""") } // scalastyle:off nonascii - test("renders equal to with different data types") { + test("renders equal-to with different data types") { val filters = Seq( EqualTo("bool_col", false), EqualTo("str_col", "XYZ"), @@ -57,43 +57,43 @@ class FiltersSuite extends AnyFunSuite with Matchers { } // scalastyle:on nonascii - test("renders not equal to") { + test("renders not-equal-to") { assert(getWhereClause(Seq(Not(EqualTo("field", 1.0)))) === """("field" <> 1.0)""") } - test("renders greater than") { + test("renders greater-than") { assert(getWhereClause(Seq(GreaterThan("field", 1))) === """("field" > 1)""") } - test("renders greater than or equal") { + test("renders greater-than-or-equal") { assert(getWhereClause(Seq(GreaterThanOrEqual("field", 3L))) === """("field" >= 3)""") } - test("renders less than") { + test("renders less-than") { assert(getWhereClause(Seq(LessThan("field", 2.1f))) === """("field" < 2.1)""") } - test("renders less than or equal") { + test("renders less-than-or-equal") { assert(getWhereClause(Seq(LessThanOrEqual("field", "e"))) === """("field" <= 'e')""") } - ignore("renders is null") { + ignore("renders is-null") { assert(getWhereClause(Seq(IsNull("field"))) === """("field" IS NULL)""") } - ignore("renders is not null") { + ignore("renders is-not-null") { assert(getWhereClause(Seq(IsNotNull("field"))) === """("field" IS NOT NULL)""") } - test("renders string ends with") { + test("renders string-ends-with") { assert(getWhereClause(Seq(StringEndsWith("field", "xyz"))) === """("field" LIKE '%xyz')""") } - test("renders string contains") { + test("renders string-contains") { assert(getWhereClause(Seq(StringContains("field", "in"))) === """("field" LIKE '%in%')""") } - test("renders string starts with") { + test("renders string-starts-with") { assert(getWhereClause(Seq(StringStartsWith("field", "abc"))) === """("field" LIKE 'abc%')""") } @@ -117,8 +117,8 @@ class FiltersSuite extends AnyFunSuite with Matchers { test("renders nested list of filters") { val filters = Seq( - Or(EqualTo("str_col", "abc"), EqualTo("int_col", 123)), - Or(Not(LessThan("int_col", 1)), GreaterThan("str_col", "a")), + Or(EqualTo("str_col", "abc"), EqualTo("int_col", 123.toShort)), + Or(Not(LessThan("int_col", 1.toByte)), GreaterThan("str_col", "a")), Or(EqualTo("str_col", "xyz"), And(EqualTo("float_col", 3.14), Not(EqualTo("int_col", 3)))) ) val expected = @@ -130,4 +130,18 @@ class FiltersSuite extends AnyFunSuite with Matchers { assert(getWhereClause(filters) === expected) } + test("returns empty when one of and expressions is null") { + val expr = Filters.filterToBooleanExpression(And(EqualTo("a", 1), EqualNullSafe("b", "abc"))) + assert(expr === None) + } + + test("returns empty when one of or expressions is null") { + val expr = Filters.filterToBooleanExpression(Or(EqualNullSafe("b", "x"), LessThan("c", 1))) + assert(expr === None) + } + + test("returns empty when filter is null") { + assert(Filters.filterToBooleanExpression(EqualNullSafe("b", "x")) === None) + } + }