Skip to content

Commit

Permalink
fix: addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rbalaban-mdsol committed Oct 20, 2022
1 parent 62997a1 commit bc00a50
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
Expand Up @@ -178,8 +178,23 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
}

val isTimescale: Boolean = config
.getBooleanOpt(pfx("table.is.timescale"))
.getOrElse(false)
.getObjectOption(pfx("table.timescale"))
.nonEmpty

val timescaleTimeColumn: Option[String] = config
.getStringOpt(pfx("table.timescale.time.column"))

val timescaleChunkTimeInterval: String = config
.getStringOpt(pfx("table.timescale.chunk.time.interval"))
.getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL)

val timescalePartitioningColumn: String = config
.getStringOpt(pfx("table.timescale.partitioning.column"))
.getOrElse("_no_partitioning_column_")

val timescaleNumberPartitions: Int = config
.getIntOpt(pfx("table.timescale.number.partitions"))
.getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS)

val sqlBuilder: SqlBuilder = SqlBuilder(product)

Expand Down Expand Up @@ -472,28 +487,19 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
* creates hypertable, with a partitioning column.
*/
if (product == Postgresql && isTimescale) {
val timeColumn: String = config.getString(pfx("table.time.column"))
val chunkTimeInterval: String = config
.getStringOpt(pfx("table.chunk.time.interval"))
.getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL)

val partitioningColumn: String = config
.getStringOpt(pfx("table.partitioning.column"))
.getOrElse("_no_partitioning_column_")

val numberPartitions: Int = config
.getIntOpt(pfx("table.number.partitions"))
.getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS)

val createHypertableDml: String = {
timescaleTimeColumn match {
case Some(timeColumn) => sqlBuilder.append(s"SELECT create_hypertable('$table', '$timeColumn'")
case None => throw new RuntimeException(s"timescale.time.column must be present in config")
}

sqlBuilder
.append(s"SELECT create_hypertable('$table', '$timeColumn'")
.append(s", chunk_time_interval => INTERVAL '$chunkTimeInterval'")
.append(s", chunk_time_interval => INTERVAL '$timescaleChunkTimeInterval'")

if (partitioningColumn != "_no_partitioning_column_") {
if (timescalePartitioningColumn != "_no_partitioning_column_") {
sqlBuilder
.append(s", partitioning_column => '$partitioningColumn'")
.append(s", number_partitions => $numberPartitions")
.append(s", partitioning_column => '$timescalePartitioningColumn'")
.append(s", number_partitions => $timescaleNumberPartitions")
}

sqlBuilder.append(");")
Expand Down
Expand Up @@ -38,11 +38,12 @@ class JdbcSinkCreateTableTest extends UnitSpec {
| }
| table = {
| name = test-table
| is.timescale = true
| time.column = time
| chunk.time.interval = "1 hour"
| partitioning.column = device_id
| number.partitions = 4
| timescale = {
| time.column = time
| chunk.time.interval = "1 hour"
| partitioning.column = device_id
| number.partitions = 4
| }
| columns = [
| {
| name = id
Expand Down

0 comments on commit bc00a50

Please sign in to comment.