diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
index 0d795bb..938e0a5 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
@@ -3,7 +3,11 @@ package io.epiphanous.flinkrunner.model.sink
 import com.typesafe.scalalogging.LazyLogging
 import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
 import io.epiphanous.flinkrunner.model._
-import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT
+import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{
+  DEFAULT_CONNECTION_TIMEOUT,
+  DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL,
+  DEFAULT_TIMESCALE_NUMBER_PARTITIONS
+}
 import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
 import io.epiphanous.flinkrunner.util.SqlBuilder
 import org.apache.flink.api.common.functions.RuntimeContext
@@ -12,11 +16,7 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory
 import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
 import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
-import org.apache.flink.connector.jdbc.{
-  JdbcConnectionOptions,
-  JdbcExecutionOptions,
-  JdbcStatementBuilder
-}
+import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcStatementBuilder}
 import org.apache.flink.streaming.api.datastream.DataStreamSink
 import org.apache.flink.streaming.api.scala.DataStream
 
@@ -176,7 +176,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
   }
 
   val isTimescale: Boolean = config
-    .getBooleanOpt(pfx("table.isTimescale"))
+    .getBooleanOpt(pfx("table.is.timescale"))
     .getOrElse(false)
 
   val sqlBuilder: SqlBuilder = SqlBuilder(product)
@@ -467,20 +467,42 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
 
     /**
      * If the sink is a PostgresDB with Timescale extension
-     * creates hypertable, with a time column.
+     * creates a hypertable keyed on the time column, optionally space-partitioned.
      */
     if (product == Postgresql && isTimescale) {
-      val timeColumn: String = config.getString(pfx("table.timeColumn"))
-      val createHypertable: String = {
+      val timeColumn: String = config.getString(pfx("table.time.column"))
+      val chunkTimeInterval: String = config
+        .getStringOpt(pfx("table.chunk.time.interval"))
+        .getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL)
+
+      val partitioningColumn: String = config
+        .getStringOpt(pfx("table.partitioning.column"))
+        .getOrElse("_no_partitioning_column_")
+
+      val numberPartitions: Int = config
+        .getIntOpt(pfx("table.number.partitions"))
+        .getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS)
+
+      val createHypertableDml: String = {
         sqlBuilder
-          .append(s"SELECT create_hypertable('$table', '$timeColumn');")
+          .append(s"SELECT create_hypertable('$table', '$timeColumn'")
+          .append(s", chunk_time_interval => INTERVAL '$chunkTimeInterval'")
+
+        if (partitioningColumn != "_no_partitioning_column_") {
+          sqlBuilder
+            .append(s", partitioning_column => '$partitioningColumn'")
+            .append(s", number_partitions => $numberPartitions")
+        }
+
+        sqlBuilder.append(");")
          .getSqlAndClear
       }
 
       logger.info(
-        s"creating hypertable for [$table] with time column: $timeColumn"
+        s"creating hypertable for [$table]: \n====\n$createHypertableDml\n====\n"
       )
-      stmt.executeQuery(createHypertable)
+
+      stmt.executeQuery(createHypertableDml)
     }
 
     stmt.close()
@@ -578,4 +600,6 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
 
 object JdbcSinkConfig {
   final val DEFAULT_CONNECTION_TIMEOUT = 5
+  final val DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL = "1 day"
+  final val DEFAULT_TIMESCALE_NUMBER_PARTITIONS = 4
 }
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala
index cf3d0e7..a987a5e 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala
@@ -37,8 +37,11 @@ class JdbcSinkCreateTableTest extends UnitSpec {
        |  }
        |  table = {
        |    name = test-table
-       |    isTimescale = true
-       |    timeColumn = "time"
+       |    is.timescale = true
+       |    time.column = time
+       |    chunk.time.interval = "1 hour"
+       |    partitioning.column = device_id
+       |    number.partitions = 4
        |    columns = [
        |      {
        |        name = id
@@ -48,6 +51,13 @@ class JdbcSinkCreateTableTest extends UnitSpec {
        |      {
        |        name = time
        |        type = TIMESTAMP
+       |        primary.key = 1
+       |      },
+       |      {
+       |        name = device_id
+       |        type = VARCHAR
+       |        precision = 100
+       |        primary.key = 1
        |      }
        |    ]
        |  }
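
For reference, a minimal, self-contained sketch of the statement the new createHypertableDml block assembles. It stands a plain StringBuilder in for flinkrunner's SqlBuilder (assumed here to concatenate appended fragments verbatim) and an Option in for the "_no_partitioning_column_" sentinel; the object and method names are illustrative only, not part of this change.

// Sketch only: mirrors the create_hypertable statement construction above.
// buildCreateHypertable is a hypothetical helper; defaults mirror
// DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL ("1 day") and
// DEFAULT_TIMESCALE_NUMBER_PARTITIONS (4).
object CreateHypertableSketch extends App {
  def buildCreateHypertable(
      table: String,
      timeColumn: String,
      chunkTimeInterval: String = "1 day",
      partitioningColumn: Option[String] = None,
      numberPartitions: Int = 4
  ): String = {
    val sql = new StringBuilder()
      .append(s"SELECT create_hypertable('$table', '$timeColumn'")
      .append(s", chunk_time_interval => INTERVAL '$chunkTimeInterval'")
    // space partitioning is only added when a partitioning column is configured
    partitioningColumn.foreach { col =>
      sql
        .append(s", partitioning_column => '$col'")
        .append(s", number_partitions => $numberPartitions")
    }
    sql.append(");").toString
  }

  // With the values from JdbcSinkCreateTableTest this prints:
  // SELECT create_hypertable('test-table', 'time', chunk_time_interval => INTERVAL '1 hour', partitioning_column => 'device_id', number_partitions => 4);
  println(
    buildCreateHypertable(
      table = "test-table",
      timeColumn = "time",
      chunkTimeInterval = "1 hour",
      partitioningColumn = Some("device_id"),
      numberPartitions = 4
    )
  )
}

Modelling the partitioning column as an Option rather than a sentinel string keeps the "no partitioning" case out of the generated SQL by construction; the sentinel approach in the patch behaves the same as long as no real column is ever named _no_partitioning_column_.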