From bc00a50e5163a2f7cd3200b3144e327c75762612 Mon Sep 17 00:00:00 2001 From: Robert Balaban Date: Thu, 20 Oct 2022 12:09:08 +0300 Subject: [PATCH] fix: addressed PR comments --- .../model/sink/JdbcSinkConfig.scala | 46 +++++++++++-------- .../model/sink/JdbcSinkCreateTableTest.scala | 11 +++-- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala index c01056ac..748a68aa 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala @@ -178,8 +178,23 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( } val isTimescale: Boolean = config - .getBooleanOpt(pfx("table.is.timescale")) - .getOrElse(false) + .getObjectOption(pfx("table.timescale")) + .nonEmpty + + val timescaleTimeColumn: Option[String] = config + .getStringOpt(pfx("table.timescale.time.column")) + + val timescaleChunkTimeInterval: String = config + .getStringOpt(pfx("table.timescale.chunk.time.interval")) + .getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL) + + val timescalePartitioningColumn: String = config + .getStringOpt(pfx("table.timescale.partitioning.column")) + .getOrElse("_no_partitioning_column_") + + val timescaleNumberPartitions: Int = config + .getIntOpt(pfx("table.timescale.number.partitions")) + .getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS) val sqlBuilder: SqlBuilder = SqlBuilder(product) @@ -472,28 +487,19 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( * creates hypertable, with a partitioning column. */ if (product == Postgresql && isTimescale) { - val timeColumn: String = config.getString(pfx("table.time.column")) - val chunkTimeInterval: String = config - .getStringOpt(pfx("table.chunk.time.interval")) - .getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL) - - val partitioningColumn: String = config - .getStringOpt(pfx("table.partitioning.column")) - .getOrElse("_no_partitioning_column_") - - val numberPartitions: Int = config - .getIntOpt(pfx("table.number.partitions")) - .getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS) - val createHypertableDml: String = { + timescaleTimeColumn match { + case Some(timeColumn) => sqlBuilder.append(s"SELECT create_hypertable('$table', '$timeColumn'") + case None => throw new RuntimeException(s"timescale.time.column must be present in config") + } + sqlBuilder - .append(s"SELECT create_hypertable('$table', '$timeColumn'") - .append(s", chunk_time_interval => INTERVAL '$chunkTimeInterval'") + .append(s", chunk_time_interval => INTERVAL '$timescaleChunkTimeInterval'") - if (partitioningColumn != "_no_partitioning_column_") { + if (timescalePartitioningColumn != "_no_partitioning_column_") { sqlBuilder - .append(s", partitioning_column => '$partitioningColumn'") - .append(s", number_partitions => $numberPartitions") + .append(s", partitioning_column => '$timescalePartitioningColumn'") + .append(s", number_partitions => $timescaleNumberPartitions") } sqlBuilder.append(");") diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala index 38c0f97a..c26350a7 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkCreateTableTest.scala @@ -38,11 +38,12 @@ class JdbcSinkCreateTableTest extends UnitSpec { | } | table = { | name = test-table - | is.timescale = true - | time.column = time - | chunk.time.interval = "1 hour" - | partitioning.column = device_id - | number.partitions = 4 + | timescale = { + | time.column = time + | chunk.time.interval = "1 hour" + | partitioning.column = device_id + | number.partitions = 4 + | } | columns = [ | { | name = id