
Commit

Merge bc00a50 into e71e44c
rbalaban-mdsol committed Oct 20, 2022
2 parents e71e44c + bc00a50 commit b48d135
Showing 3 changed files with 144 additions and 11 deletions.
JdbcSinkConfig.scala
@@ -1,9 +1,13 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.SupportedDatabase.Snowflake
import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{
DEFAULT_CONNECTION_TIMEOUT,
DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL,
DEFAULT_TIMESCALE_NUMBER_PARTITIONS
}
import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
import io.epiphanous.flinkrunner.util.SqlBuilder
import org.apache.flink.api.common.functions.RuntimeContext
@@ -12,11 +16,7 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.{
JdbcConnectionOptions,
JdbcExecutionOptions,
JdbcStatementBuilder
}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcStatementBuilder}
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream

@@ -177,6 +177,25 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
case _ => Seq.empty
}

val isTimescale: Boolean = config
.getObjectOption(pfx("table.timescale"))
.nonEmpty

val timescaleTimeColumn: Option[String] = config
.getStringOpt(pfx("table.timescale.time.column"))

val timescaleChunkTimeInterval: String = config
.getStringOpt(pfx("table.timescale.chunk.time.interval"))
.getOrElse(DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL)

val timescalePartitioningColumn: String = config
.getStringOpt(pfx("table.timescale.partitioning.column"))
.getOrElse("_no_partitioning_column_")

val timescaleNumberPartitions: Int = config
.getIntOpt(pfx("table.timescale.number.partitions"))
.getOrElse(DEFAULT_TIMESCALE_NUMBER_PARTITIONS)
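// Note (illustration only, not part of the committed code): via pfx(...) the
// timescale settings above resolve to HOCON keys of the form
// <sink-config-path>.table.timescale.time.column, .chunk.time.interval,
// .partitioning.column and .number.partitions; see the sink config in
// JdbcSinkCreateTableTest further down for a complete example.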

val sqlBuilder: SqlBuilder = SqlBuilder(product)

val dropTableSql: String = sqlBuilder
@@ -463,6 +482,37 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
}
}

/**
 * If the sink is a PostgreSQL database with the TimescaleDB extension,
 * creates a hypertable, optionally with a space-partitioning column.
 */
if (product == Postgresql && isTimescale) {
val createHypertableDml: String = {
timescaleTimeColumn match {
case Some(timeColumn) => sqlBuilder.append(s"SELECT create_hypertable('$table', '$timeColumn'")
case None => throw new RuntimeException("table.timescale.time.column must be present in config")
}

sqlBuilder
.append(s", chunk_time_interval => INTERVAL '$timescaleChunkTimeInterval'")

if (timescalePartitioningColumn != "_no_partitioning_column_") {
sqlBuilder
.append(s", partitioning_column => '$timescalePartitioningColumn'")
.append(s", number_partitions => $timescaleNumberPartitions")
}

sqlBuilder.append(");")
.getSqlAndClear
}

logger.info(
s"creating hypertable for [$table]: \n====\n$createHypertableDml\n====\n"
)

stmt.executeQuery(createHypertableDml)
}
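// Illustration only (not part of the committed code): with the test config
// below (time.column = time, chunk.time.interval = "1 hour",
// partitioning.column = device_id, number.partitions = 4), the DML built
// above would look roughly like
//   SELECT create_hypertable('test-table', 'time',
//     chunk_time_interval => INTERVAL '1 hour',
//     partitioning_column => 'device_id', number_partitions => 4);
// with exact table quoting depending on how `table` is resolved.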

stmt.close()
conn.commit()
conn.close()
@@ -558,4 +608,6 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](

object JdbcSinkConfig {
final val DEFAULT_CONNECTION_TIMEOUT = 5
final val DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL = "1 day"
final val DEFAULT_TIMESCALE_NUMBER_PARTITIONS = 4
}
JdbcSinkCreateTableTest.scala
@@ -11,11 +11,67 @@ import org.apache.flink.api.scala.createTypeInformation

class JdbcSinkCreateTableTest extends UnitSpec {

val mysqlContainer: MySQLContainer = MySQLContainer()
val pgContainer: PostgreSQLContainer = PostgreSQLContainer()
val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer()
val mysqlContainer: MySQLContainer = MySQLContainer()
val pgContainer: PostgreSQLContainer = PostgreSQLContainer()
val timescaleDbContainer: PostgreSQLContainer = TimescaleDbContainer()
val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer()
mssqlContainer.container.acceptLicense()

def maybeCreateTableTestTimescale(
database: String,
schema: String,
jdbcUrl: String,
username: String,
password: String) = {
val runner = getRunner[MyAvroADT](
Array.empty[String],
Some(s"""
|sinks {
| jdbc-test {
| connector = "jdbc"
| connection = {
| database = "$database"
| schema = "$schema"
| url = "$jdbcUrl"
| username = "$username"
| password = "$password"
| }
| table = {
| name = test-table
| timescale = {
| time.column = time
| chunk.time.interval = "1 hour"
| partitioning.column = device_id
| number.partitions = 4
| }
| columns = [
| {
| name = id
| type = CHAR
| precision = 36
| },
| {
| name = time
| type = TIMESTAMP
| primary.key = 1
| },
| {
| name = device_id
| type = VARCHAR
| precision = 100
| primary.key = 1
| }
| ]
| }
| }
|}
|""".stripMargin)
)
val sinkConfig =
new JdbcSinkConfig[MyAvroADT]("jdbc-test", runner.config)
sinkConfig.maybeCreateTable()
}

def maybeCreateTableTest(
database: String,
schema: String,
@@ -93,7 +149,19 @@ class JdbcSinkCreateTableTest extends UnitSpec {
pgContainer.stop()
}

// ignoring this test now since it relies on manually setting up a local postgres container
it should "maybeCreateTable in timescale" in {
timescaleDbContainer.start()
maybeCreateTableTestTimescale(
timescaleDbContainer.databaseName,
"public",
timescaleDbContainer.jdbcUrl,
timescaleDbContainer.username,
timescaleDbContainer.password
)
timescaleDbContainer.stop()
}

//ignoring this test now since it relies on manually setting up a local postgres container
ignore should "maybeCreateTable in postgres local" in {
maybeCreateTableTest(
"test",
TimescaleDbContainer.scala (new file)
@@ -0,0 +1,13 @@
package io.epiphanous.flinkrunner.model.sink

import com.dimafeng.testcontainers.PostgreSQLContainer
import org.testcontainers.utility.DockerImageName

object TimescaleDbContainer {
def apply(): PostgreSQLContainer = PostgreSQLContainer(
dockerImageNameOverride = DockerImageName
.parse("timescale/timescaledb")
.withTag("latest-pg14")
.asCompatibleSubstituteFor("postgres")
)
}
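A minimal usage sketch, not part of this commit: one way a test could confirm the hypertable was actually created, assuming a TimescaleDB 2.x image (which exposes the timescaledb_information.hypertables view) and the com.dimafeng.testcontainers API already used above. The object name TimescaleDbContainerSketch and the verification query are illustrative only.

// Sketch only: start the container, create the table/hypertable through the
// sink config (as JdbcSinkCreateTableTest does), then list hypertables over
// plain JDBC to verify the DDL took effect.
import java.sql.DriverManager

object TimescaleDbContainerSketch extends App {
  val container = TimescaleDbContainer()
  container.start()
  // ... run JdbcSinkConfig.maybeCreateTable() against container.jdbcUrl here ...
  val conn = DriverManager.getConnection(
    container.jdbcUrl,
    container.username,
    container.password
  )
  val rs = conn
    .createStatement()
    .executeQuery(
      "SELECT hypertable_name FROM timescaledb_information.hypertables"
    )
  while (rs.next()) println(rs.getString("hypertable_name"))
  conn.close()
  container.stop()
}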
