Skip to content

Commit

Permalink
Apply suggestions from review.
Browse files Browse the repository at this point in the history
  • Loading branch information
morazow committed Mar 3, 2021
1 parent c8bca37 commit 582ebc0
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 76 deletions.
4 changes: 2 additions & 2 deletions src/it/scala/com/exasol/spark/BaseIntegrationTest.scala
Expand Up @@ -24,13 +24,13 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {

var jdbcHost: String = _
var jdbcPort: String = _
var connectionManager: ExasolConnectionManager = _
var exasolConnectionManager: ExasolConnectionManager = _

def prepareExasolDatabase(): Unit = {
container.start()
jdbcHost = container.getDockerNetworkInternalIpAddress()
jdbcPort = s"${container.getDefaultInternalDatabasePort()}"
connectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration()))
exasolConnectionManager = ExasolConnectionManager(ExasolConfiguration(getConfiguration()))
}

def getConfiguration(): Map[String, String] = Map(
Expand Down
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/spark/BaseTableQueryIT.scala
Expand Up @@ -5,7 +5,7 @@ class BaseTableQueryIT extends AbstractTableQueryIT {
val schema = "TEST_SCHEMA"
override val tableName: String = s"$schema.TEST_TABLE"
override def createTable: Unit =
connectionManager.withExecute(
exasolConnectionManager.withExecute(
// scalastyle:off nonascii
Seq(
s"DROP SCHEMA IF EXISTS $schema CASCADE",
Expand Down
22 changes: 11 additions & 11 deletions src/it/scala/com/exasol/spark/PredicatePushdownIT.scala
Expand Up @@ -12,47 +12,47 @@ class PredicatePushdownIT extends BaseTableQueryIT {

import spark.implicits._

test("returns dataframe with equal-to-filter") {
test("returns dataframe with equal-to filter") {
val df = getDataFrame().filter($"id" === 1).collect()
assert(df.map(r => (r.getLong(0), r.getString(1))) === Seq((1, "Germany")))
}

test("returns dataframe with not equal to filter") {
test("returns dataframe with not-equal-to filter") {
val df = getDataFrame().filter($"name" =!= "Germany").collect()
assert(df.map(r => r.getString(1)).contains("Germany") === false)
}

test("returns dataframe with greater-than-filter") {
test("returns dataframe with greater-than filter") {
val df = getDataFrame().filter($"id" > 2).collect()
assert(df.map(r => r.getString(1)) === Seq("Portugal"))
}

test("returns dataframe with greater than or equal filter") {
test("returns dataframe with greater-than or equal-to filter") {
val df = getDataFrame().filter($"id" >= 2).collect()
assert(df.map(r => r.getString(2)) === Seq("Paris", "Lisbon"))
}

test("returns dataframe with less than filter") {
test("returns dataframe with less-than filter") {
val df = getDataFrame().filter($"id" < 2).collect()
assert(df.map(r => r.getString(2)) === Seq("Berlin"))
}

test("returns dataframe with less than or equal filter") {
test("returns dataframe with less-than or equal-to filter") {
val df = getDataFrame().filter($"id" <= 2).collect()
assert(df.map(r => r.getString(2)) === Seq("Berlin", "Paris"))
}

test("returns dataframe with string ends with filter") {
test("returns dataframe with string-ends-with filter") {
val df = getDataFrame().filter($"city".endsWith("bon")).collect()
assert(df.map(r => (r.getString(1), r.getString(2))) === Seq(("Portugal", "Lisbon")))
}

test("returns dataframe with string contains filter") {
test("returns dataframe with string-contains filter") {
val df = getDataFrame().filter($"name".contains("rma")).collect()
assert(df.map(r => r.getString(1)) === Seq("Germany"))
}

test("returns dataframe with string starts with filter") {
test("returns dataframe with string-starts-with filter") {
val df = getDataFrame().filter($"name".startsWith("Franc")).collect()
assert(df.map(r => (r.getString(1), r.getString(2))) === Seq(("France", "Paris")))
}
Expand Down Expand Up @@ -90,7 +90,7 @@ class PredicatePushdownIT extends BaseTableQueryIT {
assert(sqlDF.map(r => (r.getLong(0), r.getString(1))) === Seq((1, "Berlin"), (2, "Paris")))
}

test("returns dataframe with date filter") {
test("returns dataframe with date literal filter") {
val filterDate = Date.valueOf("2017-12-31")
val df = getDataFrame()
.filter(col("date_info") === filterDate)
Expand All @@ -99,7 +99,7 @@ class PredicatePushdownIT extends BaseTableQueryIT {
assert(df.queryExecution.executedPlan.toString().contains("EqualTo(DATE_INFO,"))
}

test("returns dataframe with timestamp filter") {
test("returns dataframe with timestamp literal filter") {
val minTimestamp = Timestamp.valueOf("2017-12-30 00:00:00.0000")
val df = getDataFrame()
.filter(col("updated_at") < minTimestamp)
Expand Down
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/spark/QuotedColumnsIT.scala
Expand Up @@ -10,7 +10,7 @@ class QuotedColumnsIT extends AbstractTableQueryIT {

override val tableName = s"$schema.TEST_TABLE"
override def createTable(): Unit =
connectionManager.withExecute(
exasolConnectionManager.withExecute(
Seq(
s"DROP SCHEMA IF EXISTS $schema CASCADE",
s"CREATE SCHEMA $schema",
Expand Down
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/spark/ReservedKeywordsIT.scala
Expand Up @@ -10,7 +10,7 @@ class ReservedKeywordsIT extends AbstractTableQueryIT {

override val tableName = s"$schema.TEST_TABLE"
override def createTable(): Unit =
connectionManager.withExecute(
exasolConnectionManager.withExecute(
Seq(
s"DROP SCHEMA IF EXISTS $schema CASCADE",
s"CREATE SCHEMA $schema",
Expand Down
40 changes: 20 additions & 20 deletions src/it/scala/com/exasol/spark/SaveOptionsIT.scala
Expand Up @@ -31,45 +31,45 @@ class SaveOptionsIT extends BaseTableQueryIT {
// scalastyle:on nonascii

test("`tableExists` should return correct boolean result") {
assert(connectionManager.tableExists(tableName) === true)
assert(connectionManager.tableExists("DUMMY_SCHEMA.DUMMYTABLE") === false)
assert(exasolConnectionManager.tableExists(tableName) === true)
assert(exasolConnectionManager.tableExists("DUMMY_SCHEMA.DUMMYTABLE") === false)
}

test("`truncateTable` should perform table truncation") {
assert(connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") > 0)
connectionManager.truncateTable(tableName)
assert(connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0)
assert(exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") > 0)
exasolConnectionManager.truncateTable(tableName)
assert(exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0)
// Ensure it is idempotent
connectionManager.truncateTable(tableName)
assert(connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0)
exasolConnectionManager.truncateTable(tableName)
assert(exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName") === 0)
}

test("`dropTable` should drop table") {
assert(connectionManager.tableExists(tableName) === true)
connectionManager.dropTable(tableName)
assert(connectionManager.tableExists(tableName) === false)
assert(exasolConnectionManager.tableExists(tableName) === true)
exasolConnectionManager.dropTable(tableName)
assert(exasolConnectionManager.tableExists(tableName) === false)
// Ensure it is idempotent
connectionManager.dropTable(tableName)
assert(connectionManager.tableExists(tableName) === false)
exasolConnectionManager.dropTable(tableName)
assert(exasolConnectionManager.tableExists(tableName) === false)
}

test("`createTable` should create a table") {
val newTableName = s"$schema.new_table"
assert(connectionManager.tableExists(newTableName) === false)
assert(exasolConnectionManager.tableExists(newTableName) === false)

import sqlContext.implicits._
val df = sc
.parallelize(Seq(("a", 103, Date.valueOf("2019-01-14"))))
.toDF("str_col", "int_col", "date_col")

val newTableSchema = Types.createTableSchema(df.schema)
connectionManager.createTable(newTableName, newTableSchema)
assert(connectionManager.tableExists(newTableName) === true)
exasolConnectionManager.createTable(newTableName, newTableSchema)
assert(exasolConnectionManager.tableExists(newTableName) === true)
}

test("save mode 'ignore' does not insert data if table exists") {
val initialRecordsCount =
connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")
exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")
assert(runDataFrameSave("ignore", 1) === initialRecordsCount)
}

Expand All @@ -79,7 +79,7 @@ class SaveOptionsIT extends BaseTableQueryIT {

test("save mode 'append' appends data if table exists") {
val initialRecordsCount =
connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")
exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")
val totalRecords = initialRecordsCount + dataframeTestData.size
assert(runDataFrameSave("append", 3) === totalRecords)
}
Expand All @@ -92,7 +92,7 @@ class SaveOptionsIT extends BaseTableQueryIT {
}

test("save throws without 'create_table' or 'drop_table' option when table does not exist") {
connectionManager.dropTable(tableName)
exasolConnectionManager.dropTable(tableName)
saveModes.foreach {
case mode =>
val thrown = intercept[UnsupportedOperationException] {
Expand All @@ -108,7 +108,7 @@ class SaveOptionsIT extends BaseTableQueryIT {
val newOptions = defaultOptions ++ Map("create_table" -> "true")
saveModes.foreach {
case mode =>
connectionManager.dropTable(tableName)
exasolConnectionManager.dropTable(tableName)
assert(runDataFrameSave(mode, 2, newOptions) === dataframeTestData.size.toLong)
}
}
Expand Down Expand Up @@ -138,7 +138,7 @@ class SaveOptionsIT extends BaseTableQueryIT {
.format("exasol")
.save()

connectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")
exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")
}

}
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/spark/SparkDataImportIT.scala
Expand Up @@ -172,7 +172,7 @@ class SparkDataImportIT extends BaseTableQueryIT {
.format("exasol")
.save()

connectionManager
exasolConnectionManager
.withExecuteQuery(s"SELECT * FROM $tableName")(assertThat(_, matcher))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/spark/TypesIT.scala
Expand Up @@ -11,7 +11,7 @@ class TypesIT extends AbstractTableQueryIT {
override val tableName = s"$schema.TEST_TABLE"
override def createTable(): Unit = {
val maxDecimal = " DECIMAL(" + getMaxPrecisionExasol() + "," + getMaxScaleExasol() + ")"
connectionManager.withExecute(
exasolConnectionManager.withExecute(
Seq(
s"DROP SCHEMA IF EXISTS $schema CASCADE",
s"CREATE SCHEMA $schema",
Expand Down
54 changes: 30 additions & 24 deletions src/main/scala/com/exasol/spark/util/Filters.scala
Expand Up @@ -48,36 +48,44 @@ object Filters {
BooleanTerm.lt(column(attribute), getLiteral(value))
case LessThanOrEqual(attribute, value) =>
BooleanTerm.le(column(attribute), getLiteral(value))
// case IsNull(attribute) => stringLiteral(s"""("$attribute" IS NULL)""")
// case IsNotNull(attribute) => stringLiteral(s"""("$attribute" IS NOT NULL)""")
case StringEndsWith(attribute, value) =>
BooleanTerm.like(column(attribute), stringLiteral(s"%$value"))
case StringContains(attribute, value) =>
BooleanTerm.like(column(attribute), stringLiteral(s"%$value%"))
case StringStartsWith(attribute, value) =>
BooleanTerm.like(column(attribute), stringLiteral(s"$value%"))
// case In(a, vs) => inExpr(a, vs, "IN", dataTypes)
// case Not(In(a, vs)) => inExpr(a, vs, "NOT IN", dataTypes)
case Not(notFilter) =>
filterToBooleanExpression(notFilter).map(BooleanTerm.not(_)).getOrElse(null)
case And(leftFilter, rightFilter) =>
val leftExpr = filterToBooleanExpression(leftFilter)
val rightExpr = filterToBooleanExpression(rightFilter)
if (leftExpr.isDefined && rightExpr.isDefined) {
BooleanTerm.and(leftExpr.getOrElse(null), rightExpr.getOrElse(null))
} else {
null
}
case Or(leftFilter, rightFilter) =>
val leftExpr = filterToBooleanExpression(leftFilter)
val rightExpr = filterToBooleanExpression(rightFilter)
if (leftExpr.isDefined && rightExpr.isDefined) {
BooleanTerm.or(leftExpr.getOrElse(null), rightExpr.getOrElse(null))
} else {
null
}
case _ => null
case And(leftFilter, rightFilter) => andFilterToExpression(leftFilter, rightFilter)
case Or(leftFilter, rightFilter) => orFilterToExpression(leftFilter, rightFilter)
case _ => null
})

/**
 * Converts the two sides of an `And` filter into a single conjunction.
 *
 * Both sub-filters must be convertible; if either side yields no
 * expression, `null` is returned so the caller can skip the pushdown
 * (file-local convention, see the surrounding `scalastyle` null fence).
 *
 * @param leftFilter the left operand of the `And` filter
 * @param rightFilter the right operand of the `And` filter
 * @return the combined `AND` expression, or `null` if either side is unsupported
 */
private[this] def andFilterToExpression(
  leftFilter: Filter,
  rightFilter: Filter
): BooleanExpression =
  (filterToBooleanExpression(leftFilter), filterToBooleanExpression(rightFilter)) match {
    case (Some(left), Some(right)) => BooleanTerm.and(left, right)
    case _                         => null
  }

/**
 * Converts the two sides of an `Or` filter into a single disjunction.
 *
 * Both sub-filters must be convertible; if either side yields no
 * expression, `null` is returned so the caller can skip the pushdown
 * (file-local convention, see the surrounding `scalastyle` null fence).
 *
 * @param leftFilter the left operand of the `Or` filter
 * @param rightFilter the right operand of the `Or` filter
 * @return the combined `OR` expression, or `null` if either side is unsupported
 */
private[this] def orFilterToExpression(
  leftFilter: Filter,
  rightFilter: Filter
): BooleanExpression =
  (filterToBooleanExpression(leftFilter), filterToBooleanExpression(rightFilter)) match {
    case (Some(left), Some(right)) => BooleanTerm.or(left, right)
    case _                         => null
  }
// scalastyle:on null

private[this] def getLiteral(value: Any): ValueExpression =
Expand All @@ -90,9 +98,7 @@ object Filters {
case longValue: Long => longLiteral(longValue)
case floatValue: Float => floatLiteral(floatValue)
case doubleValue: Double => doubleLiteral(doubleValue)
// case bigDecimalValue: BigDecimal => BigDecimalLiteral(bigDecimalValue.underlying())
// case bigDecimalValue: java.math.BigDecimal => BigDecimalLiteral(bigDecimalValue)
case _ => stringLiteral(s"$value")
case _ => stringLiteral(s"$value")
}

}

0 comments on commit 582ebc0

Please sign in to comment.