Skip to content

Commit

Permalink
feat: timescale hypertable support and testcontainer
Browse files Browse the repository at this point in the history
  • Loading branch information
rbalaban-mdsol committed Oct 4, 2022
1 parent 5561c8a commit 93e4d0c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.SupportedDatabase.Snowflake
import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT
import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
Expand Down Expand Up @@ -175,6 +175,10 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
case _ => Seq.empty
}

val isTimescale: Boolean = config
.getBooleanOpt(pfx("table.isTimescale"))
.getOrElse(false)

val sqlBuilder: SqlBuilder = SqlBuilder(product)

val dropTableSql: String = sqlBuilder
Expand Down Expand Up @@ -461,6 +465,24 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
}
}

/**
* If the sink is a PostgresDB with Timescale extension
* creates hypertable, with a time column.
*/
if (product == Postgresql && isTimescale) {
val timeColumn: String = config.getString(pfx("table.timeColumn"))
val createHypertable: String = {
sqlBuilder
.append(s"SELECT create_hypertable('$table', '$timeColumn');")
.getSqlAndClear
}

logger.info(
s"creating hypertable for [$table] with time column: $timeColumn"
)
stmt.executeQuery(createHypertable)
}

stmt.close()
conn.commit()
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,55 @@ import io.epiphanous.flinkrunner.model.{FlinkConfig, MyAvroADT}

class JdbcSinkCreateTableTest extends UnitSpec {

val mysqlContainer: MySQLContainer = MySQLContainer()
val pgContainer: PostgreSQLContainer = PostgreSQLContainer()
val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer()
val mysqlContainer: MySQLContainer = MySQLContainer()
val pgContainer: PostgreSQLContainer = PostgreSQLContainer()
val timescaleDbContainer: PostgreSQLContainer = TimescaleDbContainer()
val mssqlContainer: MSSQLServerContainer = MSSQLServerContainer()
mssqlContainer.container.acceptLicense()

def maybeCreateTableTestTimescale(
database: String,
schema: String,
jdbcUrl: String,
username: String,
password: String): Unit = {
val config = new FlinkConfig(
Array.empty[String],
Some(s"""
|sinks {
| jdbc-test {
| connector = "jdbc"
| connection = {
| database = "$database"
| schema = "$schema"
| url = "$jdbcUrl"
| username = "$username"
| password = "$password"
| }
| table = {
| name = test-table
| isTimescale = true
| timeColumn = "time"
| columns = [
| {
| name = id
| type = CHAR
| precision = 36
| },
| {
| name = time
| type = TIMESTAMP
| }
| ]
| }
| }
|}
|""".stripMargin)
)
val sinkConfig = new JdbcSinkConfig[MyAvroADT]("jdbc-test", config)
sinkConfig.maybeCreateTable()
}

def maybeCreateTableTest(
database: String,
schema: String,
Expand Down Expand Up @@ -91,7 +135,19 @@ class JdbcSinkCreateTableTest extends UnitSpec {
pgContainer.stop()
}

// ignoring this test now since it relies on manually setting up a local postgres container
it should "maybeCreateTable in timescale" in {
timescaleDbContainer.start()
maybeCreateTableTestTimescale(
timescaleDbContainer.databaseName,
"public",
timescaleDbContainer.jdbcUrl,
timescaleDbContainer.username,
timescaleDbContainer.password
)
timescaleDbContainer.stop()
}

//ignoring this test now since it relies on manually setting up a local postgres container
ignore should "maybeCreateTable in postgres local" in {
maybeCreateTableTest(
"test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.epiphanous.flinkrunner.model.sink

import com.dimafeng.testcontainers.PostgreSQLContainer
import org.testcontainers.utility.DockerImageName

object TimescaleDbContainer {
def apply(): PostgreSQLContainer = PostgreSQLContainer(
dockerImageNameOverride = DockerImageName
.parse("timescale/timescaledb")
.withTag("latest-pg14")
.asCompatibleSubstituteFor("postgres")
)
}

0 comments on commit 93e4d0c

Please sign in to comment.