Merge 0d1c6d1 into ee350b5
nextdude committed Nov 1, 2022
2 parents ee350b5 + 0d1c6d1 commit 79ea6ee
Showing 7 changed files with 178 additions and 49 deletions.
@@ -1,7 +1,10 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
import io.epiphanous.flinkrunner.model.SupportedDatabase.{
Postgresql,
Snowflake
}
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{
DEFAULT_CONNECTION_TIMEOUT,
@@ -16,7 +19,11 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.{
JdbcConnectionOptions,
JdbcExecutionOptions,
JdbcStatementBuilder
}
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream

@@ -57,12 +64,16 @@ import scala.util.{Failure, Success, Try}
* - `table`: required object defining the structure of the database
* table data is inserted into
* - `name`: name of the table (required)
* - `timescale`: optional object defining timescale specific parameters
* - `time.column`: name of the time partitioning column ( required )
* - `timescale`: optional object defining timescale specific
* parameters
* - `time.column`: name of the time partitioning column ( required
* )
* - `chunk.time.interval`: interval in which chunks are aggregated
* ( optional ) default 7 days
* - `partitioning.column`: name of second partitioning column ( optional )
* - `number.partitions`: > 0 ( required if partitioning.column is set )
* - `partitioning.column`: name of second partitioning column (
* optional )
* - `number.partitions`: > 0 ( required if partitioning.column is
* set )
* - `recreate.objects.if.same`: optional boolean (defaults to false)
* that, if true, will drop and recreate objects (tables or indexes)
* that exist in the database even if they are the same as their
@@ -188,7 +199,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
.getObjectOption(pfx("table.timescale"))
.nonEmpty

val timescaleTimeColumn: Option[String] = config
val timescaleTimeColumn: Option[String] = config
.getStringOpt(pfx("table.timescale.time.column"))

val timescaleChunkTimeInterval: String = config
@@ -282,11 +293,13 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
sqlBuilder.append(")")
product match {
case SupportedDatabase.Postgresql =>
sqlBuilder
.append("\nON CONFLICT ON CONSTRAINT ")
.identifier(pkIndex)
.append(" DO UPDATE SET\n")
buildColumnList(nonPkCols, Some("=EXCLUDED."))
if (!isTimescale) {
sqlBuilder
.append("\nON CONFLICT ON CONSTRAINT ")
.identifier(pkIndex)
.append(" DO UPDATE SET\n")
buildColumnList(nonPkCols, Some("=EXCLUDED."))
}

case SupportedDatabase.Mysql =>
sqlBuilder.append("\nON DUPLICATE KEY UPDATE\n")
@@ -488,28 +501,34 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
}
}

/**
* If the sink is a PostgresDB with Timescale extension
* creates hypertable, with a partitioning column.
/** If the sink is a PostgresDB with Timescale extension creates
* hypertable, with a partitioning column.
*/
if (product == Postgresql && isTimescale) {
val createHypertableDml: String = {
if (timescaleTimeColumn.isEmpty) {
throw new RuntimeException(s"timescale.time.column must be present in timescale config block")
throw new RuntimeException(
s"timescale.time.column must be present in timescale config block"
)
}

sqlBuilder
.append(s"SELECT create_hypertable('$table', '${timescaleTimeColumn.get}'")
.append(s", chunk_time_interval => INTERVAL '$timescaleChunkTimeInterval'")
.append(
s"SELECT create_hypertable('$table', '${timescaleTimeColumn.get}'"
)
.append(
s", chunk_time_interval => INTERVAL '$timescaleChunkTimeInterval'"
)

if (timescalePartitioningColumn.isDefined) {
sqlBuilder
.append(s", partitioning_column => '${timescalePartitioningColumn.get}'")
.append(
s", partitioning_column => '${timescalePartitioningColumn.get}'"
)
.append(s", number_partitions => $timescaleNumberPartitions")
}

sqlBuilder.append(");")
.getSqlAndClear
sqlBuilder.append(");").getSqlAndClear
}

logger.info(
@@ -613,7 +632,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
}

object JdbcSinkConfig {
final val DEFAULT_CONNECTION_TIMEOUT = 5
final val DEFAULT_CONNECTION_TIMEOUT = 5
final val DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL = "7 days"
final val DEFAULT_TIMESCALE_NUMBER_PARTITIONS = 4
final val DEFAULT_TIMESCALE_NUMBER_PARTITIONS = 4
}
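Note: with the new `table.timescale` block (keys documented in the scaladoc above: `time.column`, `chunk.time.interval`, `partitioning.column`, `number.partitions`), a Postgresql sink skips the ON CONFLICT upsert clause and instead issues a create_hypertable call. A minimal sketch of the statement the timescale branch assembles, assuming a hypothetical table `sensor_readings` partitioned on `event_time` and `device_id` with the defaults above:

// Hypothetical table and column names; the interval and partition count are the
// DEFAULT_TIMESCALE_* values defined in JdbcSinkConfig above.
val exampleCreateHypertableDml: String =
  "SELECT create_hypertable('sensor_readings', 'event_time'" +
    ", chunk_time_interval => INTERVAL '7 days'" +
    ", partitioning_column => 'device_id'" +
    ", number_partitions => 4" +
    ");"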
@@ -7,6 +7,7 @@ import io.epiphanous.flinkrunner.serde.{
JsonKafkaRecordSerializationSchema
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
@@ -47,9 +48,13 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
val bootstrapServers: String =
properties.getProperty("bootstrap.servers")

val topic: String = config.getString(pfx("topic"))
val isKeyed: Boolean =
config.getBooleanOpt(pfx("is.keyed")).getOrElse(true)
val topic: String = config.getString(pfx("topic"))

val isKeyed: Boolean = getFromEither(
pfx(),
Seq("keyed", "is.keyed"),
config.getBooleanOpt
).getOrElse(false)

def deliveryGuarantee: DeliveryGuarantee = config
.getStringOpt(pfx("delivery.guarantee"))
@@ -63,11 +68,40 @@

/** ensure transaction.timeout.ms is set */
val transactionTimeoutMs: Long = {
val t = properties.getProperty("transaction.timeout.ms", "60000")
properties.setProperty("transaction.timeout.ms", t)
t.toLong
val tms = getFromEither(
pfx(),
Seq("transaction.timeout.ms", "tx.timeout.ms"),
config.getLongOpt
)
val td = getFromEither(
pfx(),
Seq("transaction.timeout", "tx.timeout"),
config.getDurationOpt
)
val t = tms.getOrElse(td.getOrElse(Duration.ofHours(2)).toMillis)
properties.setProperty("transaction.timeout.ms", t.toString)
t
}

val transactionalIdPrefix: String =
getFromEither(
pfx(),
Seq(
"transactional.id.prefix",
"transactional.prefix",
"transactional.id",
"transaction.id.prefix",
"transaction.prefix",
"transaction.id",
"tx.id.prefix",
"tx.prefix",
"tx.id"
),
config.getStringOpt
).getOrElse(
s"${config.jobName}.$name.tx.id"
)

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = false,
config
@@ -117,7 +151,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
.builder()
.setBootstrapServers(bootstrapServers)
.setDeliverGuarantee(deliveryGuarantee)
.setTransactionalIdPrefix(name)
.setTransactionalIdPrefix(transactionalIdPrefix)
.setKafkaProducerConfig(properties)
.setRecordSerializer(serializer)
.build()
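Note: the kafka sink now resolves its transactional settings through `getFromEither`, accepting several alias keys (e.g. `transaction.timeout.ms`/`tx.timeout.ms`, `transactional.id.prefix`/`tx.id`) and falling back to defaults, and `is.keyed` (or `keyed`) now defaults to false. A minimal sketch of the fallback behavior, using plain Options in place of the flinkrunner config accessors (job and sink names here are hypothetical):

import java.time.Duration

// stand-ins for the config lookups done via getFromEither above
val txTimeoutMs: Option[Long]   = None // e.g. the sink's "transaction.timeout.ms" / "tx.timeout.ms"
val txTimeout: Option[Duration] = None // e.g. the sink's "transaction.timeout" / "tx.timeout"
val txIdPrefix: Option[String]  = None // e.g. the sink's "transactional.id.prefix" / "tx.id"

// transaction.timeout.ms falls back to two hours when neither key is set
val resolvedTimeoutMs: Long =
  txTimeoutMs.getOrElse(txTimeout.getOrElse(Duration.ofHours(2)).toMillis) // 7200000

// the transactional id prefix falls back to "<jobName>.<sinkName>.tx.id"
val jobName  = "word-count" // hypothetical
val sinkName = "kafka-sink" // hypothetical
val resolvedTxIdPrefix: String =
  txIdPrefix.getOrElse(s"$jobName.$sinkName.tx.id")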
37 changes: 32 additions & 5 deletions src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala
@@ -1,5 +1,6 @@
package io.epiphanous.flinkrunner.util

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
EmbeddedAvroRecordInfo,
@@ -11,15 +12,21 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecordBase

import scala.collection.JavaConverters._
import scala.util.Try

object AvroUtils {
object AvroUtils extends LazyLogging {

def isGeneric[A <: GenericRecord](typeClass: Class[A]): Boolean =
!isSpecific(typeClass)

def isSpecific[A <: GenericRecord](typeClass: Class[A]): Boolean =
classOf[SpecificRecordBase].isAssignableFrom(typeClass)

def isGenericInstance[A <: GenericRecord](instance: A): Boolean =
!isSpecificInstance(instance)
def isSpecificInstance[A <: GenericRecord](instance: A): Boolean =
isSpecific(instance.getClass)

def instanceOf[A <: GenericRecord](typeClass: Class[A]): A =
typeClass.getConstructor().newInstance()

@@ -79,7 +86,7 @@ object AvroUtils {
keyOpt: Option[String] = None,
headers: Map[String, String] = Map.empty)(implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): E =
if (isGeneric(typeClass))
if (isGeneric(typeClass) || isSpecificInstance(genericRecord))
fromKV(
EmbeddedAvroRecordInfo(
genericRecord.asInstanceOf[A],
@@ -101,9 +108,29 @@
implicit class GenericToSpecific(genericRecord: GenericRecord) {
def toSpecific[A <: GenericRecord](instance: A): A = {
genericRecord.getSchema.getFields.asScala
.map(_.name())
.foldLeft(instance) { (a, f) =>
a.put(f, genericRecord.get(f))
.foldLeft(instance) { (a, field) =>
val f = field.name()
genericRecord.get(f) match {
case rec: GenericRecord =>
val fieldClassName = rec.getSchema.getFullName
Try(Class.forName(fieldClassName)).fold(
error =>
logger.error(
s"can't convert embedded generic record to a $fieldClassName",
error
),
klass => {
if (isGenericInstance(rec)) {
val k = klass
.getDeclaredConstructor()
.newInstance()
.asInstanceOf[GenericRecord]
a.put(f, rec.toSpecific(k))
} else a.put(f, rec)
}
)
case v => a.put(f, v)
}
a
}
}
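Note: `AvroUtils` gains instance-level checks (`isGenericInstance`/`isSpecificInstance`), and `toSpecific` now recursively converts embedded generic records into their specific classes. A small usage sketch of the new checks, built on an ad hoc schema (the record name and field are made up for illustration):

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import io.epiphanous.flinkrunner.util.AvroUtils

// a plain generic record assembled from a hypothetical one-field schema
val schema = SchemaBuilder.record("Example").fields().requiredString("id").endRecord()
val rec: GenericRecord = new GenericData.Record(schema)
rec.put("id", "abc-123")

AvroUtils.isGenericInstance(rec)  // true: not an Avro-generated SpecificRecordBase
AvroUtils.isSpecificInstance(rec) // false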
@@ -35,7 +35,7 @@ object WordCountMain extends LazyLogging {
// config
// val host = "localhost"
// val port = 9999
val windowDuration = 5 // seconds
val windowDuration = 5L // seconds

// source: run `nc -l 9999` in a terminal
// val text = env.socketTextStream(host, port, '\n')
@@ -18,11 +18,11 @@ class JdbcSinkCreateTableTest extends UnitSpec {
mssqlContainer.container.acceptLicense()

def maybeCreateTableTestTimescale(
database: String,
schema: String,
jdbcUrl: String,
username: String,
password: String) = {
database: String,
schema: String,
jdbcUrl: String,
username: String,
password: String) = {
val runner = getRunner[MyAvroADT](
Array.empty[String],
Some(s"""
@@ -161,7 +161,7 @@ class JdbcSinkCreateTableTest extends UnitSpec {
timescaleDbContainer.stop()
}

//ignoring this test now since it relies on manually setting up a local postgres container
// ignoring this test now since it relies on manually setting up a local postgres container
ignore should "maybeCreateTable in postgres local" in {
maybeCreateTableTest(
"test",
@@ -106,7 +106,6 @@ trait SerdeTestFixtures extends PropSpec {
E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation] = {
val ss = {
val avroClass = implicitly[TypeInformation[A]].getTypeClass
new ConfluentAvroRegistryKafkaRecordSerializationSchema[
E,
A,