Commit
#21: Refactored integration tests using test-db-builder-java (#22)
morazow committed Jan 26, 2021
1 parent 344dec4 commit 28a7a43
Showing 12 changed files with 141 additions and 101 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
@@ -15,7 +15,8 @@ env:

before_install:
- git fetch --tags
- docker pull exasol/docker-db:7.0.4
- docker pull exasol/docker-db:7.0.6
- docker pull localstack/localstack:0.12.5

matrix:
include:
@@ -73,4 +74,4 @@ before_cache:
- du -h -d 4 $HOME/.coursier/
- find $HOME/.sbt -name "*.lock" -type f -delete
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -type f -delete
- find $HOME/.coursier/cache -name "*.lock" -type f -delete
3 changes: 2 additions & 1 deletion doc/changes/changelog.md
@@ -1,4 +1,5 @@
# Releases

* [1.0.0](changes_1.0.0.md)
* [0.1.1](changes_0.1.1.md)
* [0.1.0](changes_0.1.0.md)
2 changes: 1 addition & 1 deletion doc/changes/changes_0.1.1.md
@@ -24,4 +24,4 @@ Code name: Bug fix for Exasol 7 and jackson-core/jackson-dataformat-cbor depende
* Updated `com.exasol:exasol-testcontainers:3.0.0` to `3.4.0`
* Updated `org.testcontainers:localstack:1.14.3` to `1.15.0`
* Updated `com.amazonaws:aws-java-sdk-core:1.11.860` to `1.11.916`
* Updated `com.exasol:import-export-udf-common-scala:0.1.0` to `0.2.0`
21 changes: 21 additions & 0 deletions doc/changes/changes_1.0.0.md
@@ -0,0 +1,21 @@
# Kinesis Connector Extension 1.0.0, released 2021-MM-DD

Code name:

## Refactoring

* #21: Refactored integration tests using test-db-builder-java

## Dependency updates

* Updated `com.amazonaws:aws-java-sdk-core:1.11.916` to `1.11.942`
* Updated `com.amazonaws:aws-java-sdk-kinesis:1.11.916` to `1.11.942`
* Updated `com.exasol:exasol-testcontainers:3.4.0` to `3.4.1`
* Updated `org.testcontainers:localstack:1.15.0` to `1.15.1`
* Updated `org.mockito:mockito-core:3.5.10` to `3.7.7`
* Updated `org.scalatest:scalatest:3.2.2` to `3.2.3`

### Plugin Updates

* Updated `org.wartremover:sbt-wartremover:2.4.10` to `2.4.13`
* Updated `org.wartremover:sbt-wartremover-contrib:1.3.8` to `1.3.11`
12 changes: 7 additions & 5 deletions project/Dependencies.scala
@@ -9,15 +9,16 @@ object Dependencies {
// Runtime dependencies versions
private val ExasolVersion = "6.1.7"
private val ImportExportUDFVersion = "0.2.0"
private val AwsJavaSdkVersion = "1.11.916"
private val AwsJavaSdkVersion = "1.11.942"
private val FasterxmlJacksonVersion = "2.12.0"

// Test dependencies versions
private val ScalaTestVersion = "3.2.2"
private val ScalaTestVersion = "3.2.3"
private val ScalaTestPlusVersion = "1.0.0-M2"
private val MockitoCoreVersion = "3.5.10"
private val ExasolTestContainersVersion = "3.4.0"
private val TestContainersLocalstackVersion = "1.15.0"
private val MockitoCoreVersion = "3.7.7"
private val ExasolTestContainersVersion = "3.4.1"
private val ExasolTestDBBuilderVersion = "3.0.0"
private val TestContainersLocalstackVersion = "1.15.1"

val Resolvers: Seq[Resolver] = Seq(
"Confluent Maven Repo" at "https://packages.confluent.io/maven/",
@@ -39,6 +40,7 @@
"org.scalatestplus" %% "scalatestplus-mockito" % ScalaTestPlusVersion,
"org.mockito" % "mockito-core" % MockitoCoreVersion,
"com.exasol" % "exasol-testcontainers" % ExasolTestContainersVersion,
"com.exasol" % "test-db-builder-java" % ExasolTestDBBuilderVersion,
"org.testcontainers" % "localstack" % TestContainersLocalstackVersion
).map(_ % Test)

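The new `test-db-builder-java` test dependency pairs with `exasol-testcontainers`: the container provides a JDBC connection, and the builder replaces hand-written DDL with typed factory calls. Below is a minimal sketch of that wiring, assuming the `createConnection` helper from `exasol-testcontainers` and the single-argument `ExasolObjectFactory` constructor; the object and schema names are illustrative, not taken from this commit.

import java.sql.Connection

import com.exasol.containers.ExasolContainer
import com.exasol.dbbuilder.dialects.exasol.{ExasolObjectFactory, ExasolSchema}

object TestDbBuilderSketch extends App {
  // Disposable Exasol instance, same image version the tests use.
  val container = new ExasolContainer("7.0.6")
  container.start()
  // exasol-testcontainers hands out JDBC connections to the running container.
  val connection: Connection = container.createConnection("")
  // Every schema, UDF, and connection object is created through this factory.
  val factory = new ExasolObjectFactory(connection)
  val schema: ExasolSchema = factory.createSchema("SKETCH_SCHEMA")
  println(s"Created schema: ${schema.getName()}")
  container.stop()
}

From here on, every database object the tests need goes through `factory` and `schema`, as the refactored trait below shows.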
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -1,10 +1,10 @@
// Adds `wartremover`, a flexible Scala code linting tool
// http://github.com/puffnfresh/wartremover
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.10")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.13")

// Adds Contrib Warts
// http://github.com/wartremover/wartremover-contrib/
addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.8")
addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.11")

// Adds an `assembly` task to create a fat JAR with all of its
// dependencies
KinesisAbstractIntegrationTest.scala
@@ -3,6 +3,9 @@ package com.exasol.cloudetl.kinesis
import com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
import com.exasol.containers.ExasolContainer
import com.exasol.dbbuilder.dialects.Column
import com.exasol.dbbuilder.dialects.exasol._
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.testcontainers.containers.localstack.LocalStackContainer
@@ -17,13 +20,16 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
val JAR_NAME_PATTERN = "exasol-kinesis-connector-extension-"
val DOCKER_IP_ADDRESS = "172.17.0.1"
val TEST_SCHEMA_NAME = "kinesis_schema"
var assembledJarName: String = _

val exasolContainer = new ExasolContainer("7.0.4")
val exasolContainer = new ExasolContainer("7.0.6")
val kinesisLocalStack: LocalStackContainer =
new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.3"))
new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.5"))
.withServices(LocalStackContainer.Service.KINESIS)

var assembledJarName: String = _
var schema: ExasolSchema = _
var factory: ExasolObjectFactory = _

private[this] var connection: java.sql.Connection = _
var statement: java.sql.Statement = _
private[kinesis] var kinesisClient: AmazonKinesis = _
@@ -49,8 +55,22 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
private[kinesis] def setupExasol(): Unit = {
assembledJarName = findAssembledJarName()
uploadJarToBucket(assembledJarName)
statement.execute(s"CREATE SCHEMA $TEST_SCHEMA_NAME")
statement.execute(s"OPEN SCHEMA $TEST_SCHEMA_NAME")
val exasolConfiguration = ExasolObjectConfiguration
.builder()
.withJvmOptions("-Dcom.amazonaws.sdk.disableCbor=true")
.build()
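// The JVM options set here are attached as %jvmoption to every UDF the factory
// creates, which is why the scripts below no longer repeat the CBOR option.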
factory = new ExasolObjectFactory(connection, exasolConfiguration)
schema = factory.createSchema(TEST_SCHEMA_NAME)
createConnectionObject()
()
}

private[this] def createConnectionObject(): Unit = {
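// Builds the KINESIS_CONNECTION object (CREATE CONNECTION ... IDENTIFIED BY the
// LocalStack credentials), replacing the hand-written DDL in each test class.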
val credentials = kinesisLocalStack.getDefaultCredentialsProvider.getCredentials
val awsAccessKey = credentials.getAWSAccessKeyId()
val awsSecretKey = credentials.getAWSSecretKey()
val secret = s"AWS_ACCESS_KEY=$awsAccessKey;AWS_SECRET_KEY=$awsSecretKey"
factory.createConnectionDefinition("KINESIS_CONNECTION", "", "user", secret)
()
}

@@ -95,27 +115,37 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
}

private[kinesis] def createKinesisMetadataScript(): Unit = {
statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...)
|EMITS (KINESIS_SHARD_ID VARCHAR(130), SHARD_SEQUENCE_NUMBER VARCHAR(2000)) AS
| %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
schema
.createUdfBuilder("KINESIS_METADATA")
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.emits(
new Column("KINESIS_SHARD_ID", "VARCHAR(130)"),
new Column("SHARD_SEQUENCE_NUMBER", "VARCHAR(2000)")
)
.bucketFsContent(
"com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader",
s"/buckets/bfsdefault/default/$assembledJarName"
)
.build()
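// bucketFsContent(...) supplies the %scriptclass and %jar entries that the
// removed DDL above declared by hand; build() then issues the CREATE SCRIPT.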
()
}

private[kinesis] def createKinesisImportScript(emits: String): Unit = {
statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_IMPORT (...)
|EMITS ($emits) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisShardDataImporter;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
private[kinesis] def createKinesisImportScript(emittedColumns: Seq[Column]): Unit = {
statement.execute(s"DROP SCRIPT IF EXISTS KINESIS_IMPORT")
val udfScript = schema
.createUdfBuilder("KINESIS_IMPORT")
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.bucketFsContent(
"com.exasol.cloudetl.kinesis.KinesisShardDataImporter",
s"/buckets/bfsdefault/default/$assembledJarName"
)
if (emittedColumns.isEmpty) {
udfScript.emits().build()
} else {
udfScript.emits(emittedColumns: _*).build()
}
()
}
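Both shapes of the refactored helper appear in the test classes below: a fixed column list for direct shard reads, and an empty list for the dynamic-emits consumer. A short usage sketch follows; the column names are illustrative.

import com.exasol.dbbuilder.dialects.Column

// Fixed shape: the script is created with an explicit EMITS column list.
createKinesisImportScript(
  Seq(
    new Column("sensorId", "DECIMAL(18,0)"),
    new Column("kinesis_shard_id", "VARCHAR(2000)"),
    new Column("shard_sequence_number", "VARCHAR(2000)")
  )
)

// Dynamic shape: emits() without columns renders EMITS (...), so the calling
// IMPORT or SELECT statement decides the output columns at run time.
createKinesisImportScript(Seq.empty[Column])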

KinesisImportQueryGeneratorIT.scala
@@ -7,6 +7,8 @@ import com.exasol.cloudetl.kinesis.KinesisConstants.{
KINESIS_SHARD_ID_COLUMN_NAME,
SHARD_SEQUENCE_NUMBER_COLUMN_NAME
}
import com.exasol.dbbuilder.dialects.Column
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript
import org.scalatest.BeforeAndAfterEach
import org.testcontainers.containers.localstack.LocalStackContainer

@@ -19,24 +21,17 @@ class KinesisImportQueryGeneratorIT
prepareContainers()
setupExasol()
createKinesisMetadataScript()
createKinesisImportScript("...")
val credentials = kinesisLocalStack.getDefaultCredentialsProvider.getCredentials
statement.execute(
s"""CREATE OR REPLACE CONNECTION KINESIS_CONNECTION
| TO '' USER '' IDENTIFIED BY
| 'AWS_ACCESS_KEY=${credentials.getAWSAccessKeyId};
| AWS_SECRET_KEY=${credentials.getAWSSecretKey};'""".stripMargin
.replace("'\n", "")
)
statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_CONSUMER (...)
|EMITS (...) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisImportQueryGenerator;
| %jar /buckets/bfsdefault/default/${findAssembledJarName()};
|/
|""".stripMargin
)
createKinesisImportScript(Seq.empty[Column])
schema
.createUdfBuilder("KINESIS_CONSUMER")
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.emits()
.bucketFsContent(
"com.exasol.cloudetl.kinesis.KinesisImportQueryGenerator",
s"/buckets/bfsdefault/default/$findAssembledJarName"
)
.build()
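// KINESIS_CONSUMER emits dynamically (emits() with no columns), since this
// query-generator script produces the per-shard import query at run time.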
()
}

@@ -213,9 +208,9 @@ class KinesisImportQueryGeneratorIT
statement.execute(
s"""IMPORT INTO $TEST_TABLE_NAME
|FROM SCRIPT KINESIS_CONSUMER WITH
| TABLE_NAME = '$TEST_TABLE_NAME'
| CONNECTION_NAME = 'KINESIS_CONNECTION'
| STREAM_NAME = '$streamName'
| REGION = '${endpointConfiguration.getSigningRegion}'
| AWS_SERVICE_ENDPOINT = '$endpointInsideDocker'
""".stripMargin
@@ -233,10 +228,10 @@ class KinesisImportQueryGeneratorIT
s"""IMPORT INTO $TEST_TABLE_NAME
|FROM SCRIPT KINESIS_CONSUMER WITH
| TABLE_NAME = '$TEST_TABLE_NAME'
| AWS_ACCESS_KEY = '${credentials.getAWSAccessKeyId}'
| AWS_SECRET_KEY = '${credentials.getAWSSecretKey}'
| STREAM_NAME = '$streamName'
| REGION = '${endpointConfiguration.getSigningRegion}'
| AWS_SERVICE_ENDPOINT = '$endpointInsideDocker'
""".stripMargin
)
KinesisShardDataImporterIT.scala
@@ -7,6 +7,7 @@ import com.exasol.cloudetl.kinesis.KinesisConstants.{
KINESIS_SHARD_ID_COLUMN_NAME,
SHARD_SEQUENCE_NUMBER_COLUMN_NAME
}
import com.exasol.dbbuilder.dialects.Column
import org.testcontainers.containers.localstack.LocalStackContainer

class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
@@ -16,14 +17,6 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
override final def beforeAll(): Unit = {
prepareContainers()
setupExasol()
val credentials = kinesisLocalStack.getDefaultCredentialsProvider.getCredentials
statement.execute(
s"""CREATE OR REPLACE CONNECTION KINESIS_CONNECTION
| TO '' USER '' IDENTIFIED BY
| 'AWS_ACCESS_KEY=${credentials.getAWSAccessKeyId};
| AWS_SECRET_KEY=${credentials.getAWSSecretKey};'""".stripMargin
.replace("'\n", "")
)
()
}

@@ -32,11 +25,12 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
createKinesisStream(streamName, 1)
putRecordIntoStream(17, 25.3, streamName)
putRecordIntoStream(20, 21.0, streamName)
val columns =
"""sensorId DECIMAL(18,0),
|currentTemperature DOUBLE PRECISION,
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
val columns = Seq(
new Column("sensorId", "DECIMAL(18,0)"),
new Column("currentTemperature", "DOUBLE PRECISION"),
new Column("kinesis_shard_id", "VARCHAR(2000)"),
new Column("shard_sequence_number", "VARCHAR(2000)")
)
createKinesisImportScript(columns)
val expected = List(
(17, 25.3, shardId, true),
@@ -75,11 +69,12 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
createKinesisStream(streamName, 1)
putRecordIntoStream("1", "WARN", streamName)
putRecordIntoStream("2", "OK", streamName)
val columns =
"""sensorId CHAR(1),
|status VARCHAR(100),
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
val columns = Seq(
new Column("sensorId", "CHAR(1)"),
new Column("status", "VARCHAR(100)"),
new Column("kinesis_shard_id", "VARCHAR(2000)"),
new Column("shard_sequence_number", "VARCHAR(2000)")
)
createKinesisImportScript(columns)
val resultSet = this.executeKinesisImportScript(streamName)
val expected = List(
@@ -118,11 +113,12 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
createKinesisStream(streamName, 1)
putRecordIntoStream(true, false, streamName)
putRecordIntoStream(false, true, streamName)
val columns =
"""first_sensor_status BOOLEAN,
|second_sensor_status BOOLEAN,
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
val columns = Seq(
new Column("first_sensor_status", "BOOLEAN"),
new Column("second_sensor_status", "BOOLEAN"),
new Column("kinesis_shard_id", "VARCHAR(2000)"),
new Column("shard_sequence_number", "VARCHAR(2000)")
)
createKinesisImportScript(columns)
val resultSet = this.executeKinesisImportScript(streamName)
val expected = List(
@@ -161,11 +157,12 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
test("returns nested data from a shard") {
val streamName = "Test_stream_nested"
createKinesisStream(streamName, 1)
val columns =
"""sensorId DECIMAL(18,0),
|statuses VARCHAR(1000),
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
val columns = Seq(
new Column("sensorId", "DECIMAL(18,0)"),
new Column("statuses", "VARCHAR(1000)"),
new Column("kinesis_shard_id", "VARCHAR(2000)"),
new Column("shard_sequence_number", "VARCHAR(2000)")
)
createKinesisImportScript(columns)
putRecordWithNestedDataIntoStream(17, 35, 14, 29, partitionKey, streamName)
putRecordWithNestedDataIntoStream(20, 25, 11, 16, partitionKey, streamName)
@@ -210,11 +207,12 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
test("returns array data from a shard") {
val streamName = "Test_stream_array"
createKinesisStream(streamName, 1)
val columns =
"""sensorId DECIMAL(18,0),
|statuses VARCHAR(1000),
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
val columns = Seq(
new Column("sensorId", "DECIMAL(18,0)"),
new Column("statuses", "VARCHAR(1000)"),
new Column("kinesis_shard_id", "VARCHAR(2000)"),
new Column("shard_sequence_number", "VARCHAR(2000)")
)
createKinesisImportScript(columns)
putRecordWithArrayIntoStream(17, 35, 14, 29, partitionKey, streamName)
putRecordWithArrayIntoStream(20, 25, 11, 16, partitionKey, streamName)
(Diffs for the remaining 3 changed files are not shown.)