From 93e4d0c5a9885f3e41adf27c4e6c18a9cb6a39d8 Mon Sep 17 00:00:00 2001 From: Robert Balaban Date: Tue, 4 Oct 2022 18:28:02 +0300 Subject: [PATCH] feat: timescale hypertable support and testcontainer --- .../model/sink/JdbcSinkConfig.scala | 24 ++++++- .../model/sink/JdbcSinkCreateTableTest.scala | 64 +++++++++++++++++-- .../model/sink/TimescaleDbContainer.scala | 13 ++++ 3 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala index a4704b26..0d795bbe 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala @@ -1,7 +1,7 @@ package io.epiphanous.flinkrunner.model.sink import com.typesafe.scalalogging.LazyLogging -import io.epiphanous.flinkrunner.model.SupportedDatabase.Snowflake +import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake} import io.epiphanous.flinkrunner.model._ import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction @@ -175,6 +175,10 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( case _ => Seq.empty } + val isTimescale: Boolean = config + .getBooleanOpt(pfx("table.isTimescale")) + .getOrElse(false) + val sqlBuilder: SqlBuilder = SqlBuilder(product) val dropTableSql: String = sqlBuilder @@ -461,6 +465,24 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( } } + /** + * If the sink is a PostgresDB with Timescale extension + * creates hypertable, with a time column. + */ + if (product == Postgresql && isTimescale) { + val timeColumn: String = config.getString(pfx("table.timeColumn")) + val createHypertable: String = { + sqlBuilder + .append(s"SELECT create_hypertable('$table', '$timeColumn');") + .getSqlAndClear + } + + logger.info( + s"creating hypertable for [$table] with time column: $timeColumn" + ) + stmt.executeQuery(createHypertable) + } + stmt.close() conn.commit() conn.close() diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala index d7a75f02..05b782cb 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala @@ -10,11 +10,55 @@ import io.epiphanous.flinkrunner.model.{FlinkConfig, MyAvroADT} class JdbcSinkCreateTableTest extends UnitSpec { - val mysqlContainer: MySQLContainer = MySQLContainer() - val pgContainer: PostgreSQLContainer = PostgreSQLContainer() - val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer() + val mysqlContainer: MySQLContainer = MySQLContainer() + val pgContainer: PostgreSQLContainer = PostgreSQLContainer() + val timescaleDbContainer: PostgreSQLContainer = TimescaleDbContainer() + val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer() mssqlContainer.container.acceptLicense() + def maybeCreateTableTestTimescale( + database: String, + schema: String, + jdbcUrl: String, + username: String, + password: String): Unit = { + val config = new FlinkConfig( + Array.empty[String], + Some(s""" + |sinks { + | jdbc-test { + | connector = "jdbc" + | connection = { + | database = "$database" + | schema = "$schema" + | url = "$jdbcUrl" + | username = "$username" + | password = "$password" + | } + | table = { + | name = test-table + | isTimescale = true + | timeColumn = "time" + | columns = [ + | { + | name = id + | type = CHAR + | precision = 36 + | }, + | { + | name = time + | type = TIMESTAMP + | } + | ] + | } + | } + |} + |""".stripMargin) + ) + val sinkConfig = new JdbcSinkConfig[MyAvroADT]("jdbc-test", config) + sinkConfig.maybeCreateTable() + } + def maybeCreateTableTest( database: String, schema: String, @@ -91,7 +135,19 @@ class JdbcSinkCreateTableTest extends UnitSpec { pgContainer.stop() } - // ignoring this test now since it relies on manually setting up a local postgres container + it should "maybeCreateTable in timescale" in { + timescaleDbContainer.start() + maybeCreateTableTestTimescale( + timescaleDbContainer.databaseName, + "public", + timescaleDbContainer.jdbcUrl, + timescaleDbContainer.username, + timescaleDbContainer.password + ) + timescaleDbContainer.stop() + } + + //ignoring this test now since it relies on manually setting up a local postgres container ignore should "maybeCreateTable in postgres local" in { maybeCreateTableTest( "test", diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala new file mode 100644 index 00000000..bb39c0cd --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala @@ -0,0 +1,13 @@ +package io.epiphanous.flinkrunner.model.sink + +import com.dimafeng.testcontainers.PostgreSQLContainer +import org.testcontainers.utility.DockerImageName + +object TimescaleDbContainer { + def apply(): PostgreSQLContainer = PostgreSQLContainer( + dockerImageNameOverride = DockerImageName + .parse("timescale/timescaledb") + .withTag("latest-pg14") + .asCompatibleSubstituteFor("postgres") + ) +} \ No newline at end of file