Skip to content

Commit

Permalink
feat: add support for partitioning column + refactored config
Browse files Browse the repository at this point in the history
  • Loading branch information
rbalaban-mdsol committed Oct 19, 2022
1 parent e1ae3aa commit faacd1c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package io.epiphanous.flinkrunner.model.sink
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{
DEFAULT_CONNECTION_TIMEOUT,
DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL,
DEFAULT_TIMESCALE_NUMBER_PARTITIONS
}
import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
import io.epiphanous.flinkrunner.util.SqlBuilder
import org.apache.flink.api.common.functions.RuntimeContext
Expand All @@ -12,11 +16,7 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.{
JdbcConnectionOptions,
JdbcExecutionOptions,
JdbcStatementBuilder
}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcStatementBuilder}
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream

Expand Down Expand Up @@ -176,7 +176,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
}

val isTimescale: Boolean = config
.getBooleanOpt(pfx("table.isTimescale"))
.getBooleanOpt(pfx("table.is.timescale"))
.getOrElse(false)

val sqlBuilder: SqlBuilder = SqlBuilder(product)
Expand Down Expand Up @@ -467,20 +467,42 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](

/**
* If the sink is a PostgresDB with Timescale extension
* creates hypertable, with a time column.
* creates a hypertable on the time column, with an optional partitioning column.
*/
if (product == Postgresql && isTimescale) {
val timeColumn: String = config.getString(pfx("table.timeColumn"))
val createHypertable: String = {
val timeColumn: String = config.getString(pfx("table.time.column"))
val chunkTimeInterval: String = config
.getStringOpt(pfx("table.chunk.time.interval"))
.getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL)

val partitioningColumn: String = config
.getStringOpt(pfx("table.partitioning.column"))
.getOrElse("_no_partitioning_column_")

val numberPartitions: Int = config
.getIntOpt(pfx("table.number.partitions"))
.getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS)

val createHypertableDml: String = {
sqlBuilder
.append(s"SELECT create_hypertable('$table', '$timeColumn');")
.append(s"SELECT create_hypertable('$table', '$timeColumn'")
.append(s", chunk_time_interval => INTERVAL '$chunkTimeInterval'")

if (partitioningColumn != "_no_partitioning_column_") {
sqlBuilder
.append(s", partitioning_column => '$partitioningColumn'")
.append(s", number_partitions => $numberPartitions")
}

sqlBuilder.append(");")
.getSqlAndClear
}

logger.info(
s"creating hypertable for [$table] with time column: $timeColumn"
s"creating hypertable for [$table]: \n====\n$createHypertableDml\n====\n"
)
stmt.executeQuery(createHypertable)

stmt.executeQuery(createHypertableDml)
}

stmt.close()
Expand Down Expand Up @@ -578,4 +600,6 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](

/** Companion object holding the default values used by the JDBC sink
* configuration when the corresponding settings are not provided.
*/
object JdbcSinkConfig {
// Default connection timeout. NOTE(review): the unit is not visible in
// this part of the file (presumably seconds) — confirm against the caller.
final val DEFAULT_CONNECTION_TIMEOUT = 5
// Fallback for `table.chunk.time.interval`; interpolated into the
// `chunk_time_interval => INTERVAL '...'` argument of create_hypertable.
final val DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL = "1 day"
// Fallback for `table.number.partitions`; only emitted in the
// create_hypertable statement when a partitioning column is configured.
final val DEFAULT_TIMESCALE_NUMBER_PARTITIONS = 4
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ class JdbcSinkCreateTableTest extends UnitSpec {
| }
| table = {
| name = test-table
| isTimescale = true
| timeColumn = "time"
| is.timescale = true
| time.column = time
| chunk.time.interval = "1 hour"
| partitioning.column = device_id
| number.partitions = 4
| columns = [
| {
| name = id
Expand All @@ -48,6 +51,13 @@ class JdbcSinkCreateTableTest extends UnitSpec {
| {
| name = time
| type = TIMESTAMP
| primary.key = 1
| },
| {
| name = device_id
| type = VARCHAR
| precision = 100
| primary.key = 1
| }
| ]
| }
Expand Down

0 comments on commit faacd1c

Please sign in to comment.