Commit 79400ff: Merge 0ea08f8 into ee350b5

nextdude committed Nov 1, 2022
2 parents ee350b5 + 0ea08f8

Showing 6 changed files with 134 additions and 39 deletions.
@@ -1,11 +1,6 @@
 package io.epiphanous.flinkrunner.model

-import com.typesafe.config.{
-  Config,
-  ConfigFactory,
-  ConfigObject,
-  ConfigOriginFactory
-}
+import com.typesafe.config.{Config, ConfigFactory, ConfigObject, ConfigOriginFactory}
 import com.typesafe.scalalogging.LazyLogging
 import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
 import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile
@@ -3,11 +3,7 @@ package io.epiphanous.flinkrunner.model.sink
 import com.typesafe.scalalogging.LazyLogging
 import io.epiphanous.flinkrunner.model.SupportedDatabase.{Postgresql, Snowflake}
 import io.epiphanous.flinkrunner.model._
-import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{
-  DEFAULT_CONNECTION_TIMEOUT,
-  DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL,
-  DEFAULT_TIMESCALE_NUMBER_PARTITIONS
-}
+import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{DEFAULT_CONNECTION_TIMEOUT, DEFAULT_TIMESCALE_CHUNK_TIME_INTERVAL, DEFAULT_TIMESCALE_NUMBER_PARTITIONS}
 import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
 import io.epiphanous.flinkrunner.util.SqlBuilder
 import org.apache.flink.api.common.functions.RuntimeContext
@@ -282,11 +278,13 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
     sqlBuilder.append(")")
     product match {
       case SupportedDatabase.Postgresql =>
-        sqlBuilder
-          .append("\nON CONFLICT ON CONSTRAINT ")
-          .identifier(pkIndex)
-          .append(" DO UPDATE SET\n")
-        buildColumnList(nonPkCols, Some("=EXCLUDED."))
+        if (!isTimescale) {
+          sqlBuilder
+            .append("\nON CONFLICT ON CONSTRAINT ")
+            .identifier(pkIndex)
+            .append(" DO UPDATE SET\n")
+          buildColumnList(nonPkCols, Some("=EXCLUDED."))
+        }

       case SupportedDatabase.Mysql =>
         sqlBuilder.append("\nON DUPLICATE KEY UPDATE\n")
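
Note: for TimescaleDB targets the Postgres branch now emits a plain INSERT with no upsert clause, since ON CONFLICT ON CONSTRAINT names the primary-key constraint and hypertables generally don't accept that form. A rough sketch of the two statements this builder would produce, assuming a hypothetical table "events" with primary-key constraint "events_pk" and one non-key column "value" (all names invented for illustration):

    // Illustrative only: the shape of the DML for a hypothetical table.
    // Plain Postgres: insert plus upsert on the named pk constraint.
    val postgresDml: String =
      """INSERT INTO "events" ("id", "value") VALUES (?, ?)
        |ON CONFLICT ON CONSTRAINT "events_pk" DO UPDATE SET
        |"value"=EXCLUDED."value"
        |""".stripMargin

    // Timescale hypertable: the ON CONFLICT clause is now omitted.
    val timescaleDml: String =
      """INSERT INTO "events" ("id", "value") VALUES (?, ?)"""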
@@ -7,6 +7,7 @@ import io.epiphanous.flinkrunner.serde.{
   JsonKafkaRecordSerializationSchema
 }
 import io.epiphanous.flinkrunner.util.ConfigToProps
+import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither
 import org.apache.avro.generic.GenericRecord
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.connector.base.DeliveryGuarantee
@@ -19,6 +20,7 @@ import org.apache.flink.streaming.api.scala.DataStream

 import java.time.Duration
 import java.util.Properties
+import scala.util.Random

 /** Kafka sink config.
  *
@@ -47,9 +49,13 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
   val bootstrapServers: String =
     properties.getProperty("bootstrap.servers")

-  val topic: String = config.getString(pfx("topic"))
-  val isKeyed: Boolean =
-    config.getBooleanOpt(pfx("is.keyed")).getOrElse(true)
+  val topic: String = config.getString(pfx("topic"))
+
+  val isKeyed: Boolean = getFromEither(
+    pfx(),
+    Seq("keyed", "is.keyed"),
+    config.getBooleanOpt
+  ).getOrElse(false)

   def deliveryGuarantee: DeliveryGuarantee = config
     .getStringOpt(pfx("delivery.guarantee"))
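
Note: getFromEither, imported above from ConfigToProps, resolves the first of several alias keys under the sink's config prefix; its implementation isn't part of this diff. A minimal sketch of the behavior it appears to provide, with an assumed signature (for illustration only, not copied from ConfigToProps):

    // Qualify each candidate key with the prefix and return the first
    // value the getter finds.
    def getFromEither[T](
        prefix: String,
        candidates: Seq[String],
        getter: String => Option[T]): Option[T] =
      candidates
        .map(key => if (prefix.isEmpty) key else s"$prefix.$key")
        .flatMap(qualified => getter(qualified))
        .headOption

So a sink block can set either keyed or is.keyed; note also that the default for isKeyed flips from true to false when neither key is present.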
@@ -63,11 +69,40 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](

   /** ensure transaction.timeout.ms is set */
   val transactionTimeoutMs: Long = {
-    val t = properties.getProperty("transaction.timeout.ms", "60000")
-    properties.setProperty("transaction.timeout.ms", t)
-    t.toLong
+    val tms = getFromEither(
+      pfx(),
+      Seq("transaction.timeout.ms", "tx.timeout.ms"),
+      config.getLongOpt
+    )
+    val td = getFromEither(
+      pfx(),
+      Seq("transaction.timeout", "tx.timeout"),
+      config.getDurationOpt
+    )
+    val t = tms.getOrElse(td.getOrElse(Duration.ofHours(2)).toMillis)
+    properties.setProperty("transaction.timeout.ms", t.toString)
+    t
   }

+  val transactionalIdPrefix: String =
+    getFromEither(
+      pfx(),
+      Seq(
+        "transactional.id.prefix",
+        "transactional.prefix",
+        "transactional.id",
+        "transaction.id.prefix",
+        "transaction.prefix",
+        "transaction.id",
+        "tx.id.prefix",
+        "tx.prefix",
+        "tx.id"
+      ),
+      config.getStringOpt
+    ).getOrElse(
+      s"${config.jobName}.$name.tx.id"
+    )
+
 val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
   isDeserializing = false,
   config
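
Note: taken together, the transaction settings are now configurable under several aliases. An explicit millisecond value wins over a Duration-valued key, the default timeout grows from 60 seconds to two hours, and the resolved value is written back into the producer properties. A hypothetical HOCON fragment exercising the new keys (sink name and nesting are assumed, not taken from this diff):

    import com.typesafe.config.ConfigFactory

    // Hypothetical sink config; aliases per the code above.
    val sinkConf = ConfigFactory.parseString(
      """
        |topic = events
        |keyed = true           // alias of is.keyed; default is now false
        |tx.timeout = 90s       // Duration form; tx.timeout.ms wins if both set
        |tx.id.prefix = my-app  // else defaults to <jobName>.<sinkName>.tx.id
        |""".stripMargin
    )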
@@ -117,7 +152,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
       .builder()
       .setBootstrapServers(bootstrapServers)
       .setDeliverGuarantee(deliveryGuarantee)
-      .setTransactionalIdPrefix(name)
+      .setTransactionalIdPrefix(transactionalIdPrefix)
       .setKafkaProducerConfig(properties)
       .setRecordSerializer(serializer)
       .build()
35 changes: 25 additions & 10 deletions src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala
@@ -1,16 +1,12 @@
 package io.epiphanous.flinkrunner.util

-import io.epiphanous.flinkrunner.model.{
-  EmbeddedAvroRecord,
-  EmbeddedAvroRecordInfo,
-  FlinkConfig,
-  FlinkEvent
-}
+import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, EmbeddedAvroRecordInfo, FlinkConfig, FlinkEvent}
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.specific.SpecificRecordBase

 import scala.collection.JavaConverters._
+import scala.util.Try

 object AvroUtils {

@@ -20,6 +16,11 @@ object AvroUtils {
   def isSpecific[A <: GenericRecord](typeClass: Class[A]): Boolean =
     classOf[SpecificRecordBase].isAssignableFrom(typeClass)

+  def isGenericInstance[A <: GenericRecord](instance: A): Boolean =
+    !isSpecificInstance(instance)
+
+  def isSpecificInstance[A <: GenericRecord](instance: A): Boolean =
+    isSpecific(instance.getClass)
+
   def instanceOf[A <: GenericRecord](typeClass: Class[A]): A =
     typeClass.getConstructor().newInstance()

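Note: the new instance-level predicates complement the existing class-level ones. A value statically typed as GenericRecord may still be a specific record at runtime, and isSpecificInstance checks the runtime class rather than the static type. For example, with BRecord being a generated specific record from the test fixtures:

    val rec: GenericRecord = new BRecord()       // static type generic
    AvroUtils.isSpecific(classOf[GenericRecord]) // false: class-level view
    AvroUtils.isSpecificInstance(rec)            // true: runtime class wins
    AvroUtils.isGenericInstance(rec)             // false
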
@@ -79,7 +80,7 @@ object AvroUtils {
       keyOpt: Option[String] = None,
       headers: Map[String, String] = Map.empty)(implicit
       fromKV: EmbeddedAvroRecordInfo[A] => E): E =
-    if (isGeneric(typeClass))
+    if (isGeneric(typeClass) || isSpecificInstance(genericRecord))
       fromKV(
         EmbeddedAvroRecordInfo(
           genericRecord.asInstanceOf[A],
@@ -101,9 +102,23 @@ object AvroUtils {
   implicit class GenericToSpecific(genericRecord: GenericRecord) {
     def toSpecific[A <: GenericRecord](instance: A): A = {
       genericRecord.getSchema.getFields.asScala
-        .map(_.name())
-        .foldLeft(instance) { (a, f) =>
-          a.put(f, genericRecord.get(f))
+        .foldLeft(instance) { (a, field) =>
+          val f = field.name()
+          genericRecord.get(f) match {
+            case rec: GenericRecord =>
+              if (isGenericInstance(rec)) {
+                Try(
+                  Class
+                    .forName(s"${rec.getSchema.getFullName}")
+                    .getDeclaredConstructor()
+                    .newInstance()
+                    .asInstanceOf[GenericRecord]
+                ).foreach(gi => a.put(f, rec.toSpecific(gi)))
+              } else {
+                a.put(f, rec)
+              }
+            case v => a.put(f, v)
+          }
           a
         }
     }
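
Note: the substantive change to toSpecific is that it now recurses into nested records. When a field's value is a GenericRecord that isn't already a specific instance, the code loads the specific class named by the nested schema's full name, instantiates it reflectively, and converts the nested value as well; if the class can't be loaded, the Try swallows the failure and the field is simply left unset. Usage mirroring the CRecord test below (genericCRecord is an assumed variable holding a generic record built against CRecord.SCHEMA$):

    val specific: CRecord = genericCRecord.toSpecific(new CRecord())
    // specific.bRecord now holds a specific BRecord, converted
    // recursively, rather than a raw GenericRecord.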
@@ -43,4 +43,6 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchemaTest
     collected.head shouldEqual aWrapper
   }

+
+
 }
62 changes: 56 additions & 6 deletions src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala
@@ -1,13 +1,9 @@
 package io.epiphanous.flinkrunner.util

 import io.epiphanous.flinkrunner.PropSpec
-import io.epiphanous.flinkrunner.model.{
-  BRecord,
-  BWrapper,
-  FlinkConfig,
-  MyAvroADT
-}
+import io.epiphanous.flinkrunner.model.{BRecord, BWrapper, CRecord, FlinkConfig, MyAvroADT}
 import io.epiphanous.flinkrunner.util.AvroUtils.GenericToSpecific
+import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

 import java.time.Duration
@@ -22,6 +18,15 @@ class AvroUtilsTest extends PropSpec {
       .set("b3", spec.b3.toEpochMilli)
       .build()

+  def fromSpecCRecord(spec: CRecord): GenericRecord =
+    new GenericRecordBuilder(CRecord.SCHEMA$)
+      .set("id", spec.id)
+      .set("cOptInt", spec.cOptInt.getOrElse(null))
+      .set("cOptDouble", spec.cOptDouble.getOrElse(null))
+      .set("bRecord", spec.bRecord.getOrElse(null))
+      .set("ts", spec.ts.toEpochMilli)
+      .build()
+
   property("isGeneric property") {
     AvroUtils.isGeneric(classOf[GenericRecord]) shouldEqual true
     AvroUtils.isGeneric(classOf[BRecord]) shouldEqual false
@@ -40,6 +45,24 @@ class AvroUtilsTest extends PropSpec {
     }
   }

+  property("GenericToSpecific embedded Avro") {
+    forAll { c: CRecord =>
+      val v = fromSpecCRecord(c)
+      val s = v.toSpecific(new CRecord())
+      print(s)
+      s shouldEqual c
+    }
+  }
+
+  property("instanceOf property embedded avro") {
+    val x = AvroUtils.instanceOf(classOf[CRecord])
+    val y = new CRecord()
+    x.id shouldEqual y.id
+    x.cOptInt shouldEqual y.cOptInt
+    x.cOptDouble shouldEqual y.cOptDouble
+    x.bRecord shouldEqual y.bRecord
+  }
+
   property("instanceOf property") {
     val x = AvroUtils.instanceOf(classOf[BRecord])
     val y = new BRecord()
@@ -62,4 +85,31 @@ class AvroUtilsTest extends PropSpec {
     ea shouldEqual bw
   }

+  property("isGenericInstance property") {
+    val b = AvroUtils.instanceOf(classOf[BRecord])
+    AvroUtils.isGenericInstance(b) shouldBe false
+    val schema = SchemaBuilder
+      .record("testrec")
+      .fields()
+      .requiredInt("int")
+      .endRecord()
+    val g = new GenericRecordBuilder(schema).set("int", 17).build()
+    AvroUtils.isGenericInstance(g) shouldBe true
+  }
+
+  property("isSpecificInstance property") {
+    val b = AvroUtils.instanceOf(classOf[BRecord])
+    AvroUtils.isSpecificInstance(b) shouldBe true
+    def check(g: GenericRecord) =
+      AvroUtils.isSpecificInstance(g)
+    check(b) shouldBe true
+    val schema = SchemaBuilder
+      .record("testrec")
+      .fields()
+      .requiredInt("int")
+      .endRecord()
+    val g = new GenericRecordBuilder(schema).set("int", 17).build()
+    check(g) shouldBe false
+  }
+
 }
