diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
index b2099c12..748a68aa 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
@@ -1,9 +1,13 @@
 package io.epiphanous.flinkrunner.model.sink
 
 import com.typesafe.scalalogging.LazyLogging
-import io.epiphanous.flinkrunner.model.SupportedDatabase.Snowflake
+import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
 import io.epiphanous.flinkrunner.model._
-import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT
+import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{
+  DEFAULT_CONNECTION_TIMEOUT,
+  DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL,
+  DEFAULT_TIMESCALE_NUMBER_PARTITIONS
+}
 import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
 import io.epiphanous.flinkrunner.util.SqlBuilder
 import org.apache.flink.api.common.functions.RuntimeContext
@@ -12,11 +16,7 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory
 import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
 import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
-import org.apache.flink.connector.jdbc.{
-  JdbcConnectionOptions,
-  JdbcExecutionOptions,
-  JdbcStatementBuilder
-}
+import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcStatementBuilder}
 import org.apache.flink.streaming.api.datastream.DataStreamSink
 import org.apache.flink.streaming.api.scala.DataStream
 
@@ -177,6 +177,25 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
     case _ => Seq.empty
   }
 
+  val isTimescale: Boolean = config
+    .getObjectOption(pfx("table.timescale"))
+    .nonEmpty
+
+  val timescaleTimeColumn: Option[String] = config
+    .getStringOpt(pfx("table.timescale.time.column"))
+
+  val timescaleChunkTimeInterval: String = config
+    .getStringOpt(pfx("table.timescale.chunk.time.interval"))
+    .getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL)
+
+  val timescalePartitioningColumn: String = config
+    .getStringOpt(pfx("table.timescale.partitioning.column"))
+    .getOrElse("_no_partitioning_column_")
+
+  val timescaleNumberPartitions: Int = config
+    .getIntOpt(pfx("table.timescale.number.partitions"))
+    .getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS)
+
   val sqlBuilder: SqlBuilder = SqlBuilder(product)
 
   val dropTableSql: String = sqlBuilder
@@ -463,6 +482,37 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
       }
     }
 
+    /** If the sink is a Postgres database with the TimescaleDB
+      * extension, convert the target table into a hypertable,
+      * optionally with a space-partitioning column.
+      */
+    if (product == Postgresql && isTimescale) {
+      val createHypertableDml: String = {
+        timescaleTimeColumn match {
+          case Some(timeColumn) => sqlBuilder.append(s"SELECT create_hypertable('$table', '$timeColumn'")
+          case None => throw new RuntimeException("timescale.time.column must be present in config")
+        }
+
+        sqlBuilder
+          .append(s", chunk_time_interval => INTERVAL '$timescaleChunkTimeInterval'")
+
+        if (timescalePartitioningColumn != "_no_partitioning_column_") {
+          sqlBuilder
+            .append(s", partitioning_column => '$timescalePartitioningColumn'")
+            .append(s", number_partitions => $timescaleNumberPartitions")
+        }
+
+        sqlBuilder.append(");")
+          .getSqlAndClear
+      }
+
+      logger.info(
+        s"creating hypertable for [$table]: \n====\n$createHypertableDml\n====\n"
+      )
+
+      stmt.executeQuery(createHypertableDml)
+    }
+
     stmt.close()
     conn.commit()
     conn.close()
@@ -558,4 +608,6 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
 
 object JdbcSinkConfig {
   final val DEFAULT_CONNECTION_TIMEOUT = 5
+  final val DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL = "1 day"
+  final val DEFAULT_TIMESCALE_NUMBER_PARTITIONS = 4
 }
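Note: for the test configuration in the next file (time column time, a one-hour chunk interval, space partitioning on device_id across four partitions), the branch above assembles DDL roughly like the sketch below. This is a sketch only, assuming SqlBuilder emits the appended fragments verbatim; the actual output is a single line. Since create_hypertable is an ordinary function whose result comes back as a row of a SELECT, the code submits it with stmt.executeQuery rather than executeUpdate.

    -- sketch of the generated statement, using values from the test config
    SELECT create_hypertable(
      'test-table',
      'time',
      chunk_time_interval => INTERVAL '1 hour',
      partitioning_column => 'device_id',
      number_partitions => 4
    );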
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala
index 5dbfc384..c26350a7 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala
@@ -11,11 +11,67 @@ import org.apache.flink.api.scala.createTypeInformation
 
 class JdbcSinkCreateTableTest extends UnitSpec {
 
-  val mysqlContainer: MySQLContainer       = MySQLContainer()
-  val pgContainer: PostgreSQLContainer     = PostgreSQLContainer()
-  val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer()
+  val mysqlContainer: MySQLContainer            = MySQLContainer()
+  val pgContainer: PostgreSQLContainer          = PostgreSQLContainer()
+  val timescaleDbContainer: PostgreSQLContainer = TimescaleDbContainer()
+  val mssqlContainer: MSSQLServerContainer      = MSSQLServerContainer()
   mssqlContainer.container.acceptLicense()
 
+  def maybeCreateTableTestTimescale(
+      database: String,
+      schema: String,
+      jdbcUrl: String,
+      username: String,
+      password: String) = {
+    val runner = getRunner[MyAvroADT](
+      Array.empty[String],
+      Some(s"""
+              |sinks {
+              |  jdbc-test {
+              |    connector = "jdbc"
+              |    connection = {
+              |      database = "$database"
+              |      schema = "$schema"
+              |      url = "$jdbcUrl"
+              |      username = "$username"
+              |      password = "$password"
+              |    }
+              |    table = {
+              |      name = test-table
+              |      timescale = {
+              |        time.column = time
+              |        chunk.time.interval = "1 hour"
+              |        partitioning.column = device_id
+              |        number.partitions = 4
+              |      }
+              |      columns = [
+              |        {
+              |          name = id
+              |          type = CHAR
+              |          precision = 36
+              |        },
+              |        {
+              |          name = time
+              |          type = TIMESTAMP
+              |          primary.key = 1
+              |        },
+              |        {
+              |          name = device_id
+              |          type = VARCHAR
+              |          precision = 100
+              |          primary.key = 1
+              |        }
+              |      ]
+              |    }
+              |  }
+              |}
+              |""".stripMargin)
+    )
+    val sinkConfig =
+      new JdbcSinkConfig[MyAvroADT]("jdbc-test", runner.config)
+    sinkConfig.maybeCreateTable()
+  }
+
   def maybeCreateTableTest(
       database: String,
       schema: String,
@@ -93,7 +149,19 @@ class JdbcSinkCreateTableTest extends UnitSpec {
     pgContainer.stop()
   }
 
-  // ignoring this test now since it relies on manually setting up a local postgres container
+  it should "maybeCreateTable in timescale" in {
+    timescaleDbContainer.start()
+    maybeCreateTableTestTimescale(
+      timescaleDbContainer.databaseName,
+      "public",
+      timescaleDbContainer.jdbcUrl,
+      timescaleDbContainer.username,
+      timescaleDbContainer.password
+    )
+    timescaleDbContainer.stop()
+  }
+
+  // ignoring this test now since it relies on manually setting up a local postgres container
   ignore should "maybeCreateTable in postgres local" in {
     maybeCreateTableTest(
       "test",
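Note: the new test only exercises maybeCreateTable() end to end; it does not assert that the table was actually registered as a hypertable. If such a check were wanted later, TimescaleDB 2.x (which the latest-pg14 image below resolves to) exposes an information view that could be queried; a hypothetical follow-up assertion, not part of this change:

    -- hypothetical check, assuming TimescaleDB 2.x
    SELECT hypertable_name
    FROM timescaledb_information.hypertables
    WHERE hypertable_name = 'test-table';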
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala
new file mode 100644
index 00000000..472ae4fc
--- /dev/null
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/TimescaleDbContainer.scala
@@ -0,0 +1,13 @@
+package io.epiphanous.flinkrunner.model.sink
+
+import com.dimafeng.testcontainers.PostgreSQLContainer
+import org.testcontainers.utility.DockerImageName
+
+object TimescaleDbContainer {
+  def apply(): PostgreSQLContainer = PostgreSQLContainer(
+    dockerImageNameOverride = DockerImageName
+      .parse("timescale/timescaledb")
+      .withTag("latest-pg14")
+      .asCompatibleSubstituteFor("postgres")
+  )
+}
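Note: when the optional timescale keys are omitted, the defaults added to the JdbcSinkConfig companion object apply: a chunk interval of "1 day", and no space partitioning, since number.partitions is only emitted together with partitioning.column. In that case the generated statement reduces to roughly the following sketch (table and column names taken from the test config above):

    -- sketch of the default, time-only hypertable DDL
    SELECT create_hypertable('test-table', 'time', chunk_time_interval => INTERVAL '1 day');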