Skip to content

Commit

Permalink
Merge 0263d72 into bd2e792
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Oct 31, 2022
2 parents bd2e792 + 0263d72 commit 70c374a
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.{
Config,
ConfigFactory,
ConfigObject,
ConfigOriginFactory
}
import com.typesafe.config.{Config, ConfigFactory, ConfigObject, ConfigOriginFactory}
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.epiphanous.flinkrunner.serde.{
JsonKafkaRecordSerializationSchema
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
Expand All @@ -19,6 +20,7 @@ import org.apache.flink.streaming.api.scala.DataStream

import java.time.Duration
import java.util.Properties
import scala.util.Random

/** Kafka sink config.
*
Expand Down Expand Up @@ -47,9 +49,13 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
val bootstrapServers: String =
properties.getProperty("bootstrap.servers")

val topic: String = config.getString(pfx("topic"))
val isKeyed: Boolean =
config.getBooleanOpt(pfx("is.keyed")).getOrElse(true)
val topic: String = config.getString(pfx("topic"))

val isKeyed: Boolean = getFromEither(
pfx(),
Seq("keyed", "is.keyed"),
config.getBooleanOpt
).getOrElse(false)

def deliveryGuarantee: DeliveryGuarantee = config
.getStringOpt(pfx("delivery.guarantee"))
Expand All @@ -63,11 +69,40 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](

/** ensure transaction.timeout.ms is set */
val transactionTimeoutMs: Long = {
val t = properties.getProperty("transaction.timeout.ms", "60000")
properties.setProperty("transaction.timeout.ms", t)
t.toLong
val tms = getFromEither(
pfx(),
Seq("transaction.timeout.ms", "tx.timeout.ms"),
config.getLongOpt
)
val td = getFromEither(
pfx(),
Seq("transaction.timeout", "tx.timeout"),
config.getDurationOpt
)
val t = tms.getOrElse(td.getOrElse(Duration.ofHours(2)).toMillis)
properties.setProperty("transaction.timeout.ms", t.toString)
t
}

val transactionalIdPrefix: String =
getFromEither(
pfx(),
Seq(
"transactional.id.prefix",
"transactional.prefix",
"transactional.id",
"transaction.id.prefix",
"transaction.prefix",
"transaction.id",
"tx.id.prefix",
"tx.prefix",
"tx.id"
),
config.getStringOpt
).getOrElse(
s"${config.jobName}.$name.tx.id"
)

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = false,
config
Expand Down Expand Up @@ -117,7 +152,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
.builder()
.setBootstrapServers(bootstrapServers)
.setDeliverGuarantee(deliveryGuarantee)
.setTransactionalIdPrefix(name)
.setTransactionalIdPrefix(transactionalIdPrefix)
.setKafkaProducerConfig(properties)
.setRecordSerializer(serializer)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ object AvroUtils {
def isSpecific[A <: GenericRecord](typeClass: Class[A]): Boolean =
classOf[SpecificRecordBase].isAssignableFrom(typeClass)

def isGenericInstance[A <: GenericRecord](instance: A): Boolean =
!isSpecificInstance(instance)
def isSpecificInstance[A <: GenericRecord](instance: A): Boolean =
isSpecific(instance.getClass)

def instanceOf[A <: GenericRecord](typeClass: Class[A]): A =
typeClass.getConstructor().newInstance()

Expand Down Expand Up @@ -79,7 +84,7 @@ object AvroUtils {
keyOpt: Option[String] = None,
headers: Map[String, String] = Map.empty)(implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): E =
if (isGeneric(typeClass))
if (isGeneric(typeClass) || isSpecificInstance(genericRecord))
fromKV(
EmbeddedAvroRecordInfo(
genericRecord.asInstanceOf[A],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchemaTest
collected.head shouldEqual aWrapper
}



}
62 changes: 56 additions & 6 deletions src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package io.epiphanous.flinkrunner.util

import io.epiphanous.flinkrunner.PropSpec
import io.epiphanous.flinkrunner.model.{
BRecord,
BWrapper,
FlinkConfig,
MyAvroADT
}
import io.epiphanous.flinkrunner.model.{BRecord, BWrapper, CRecord, FlinkConfig, MyAvroADT}
import io.epiphanous.flinkrunner.util.AvroUtils.GenericToSpecific
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

import java.time.Duration
Expand All @@ -22,6 +18,15 @@ class AvroUtilsTest extends PropSpec {
.set("b3", spec.b3.toEpochMilli)
.build()

def fromSpecCRecord(spec: CRecord): GenericRecord =
new GenericRecordBuilder(CRecord.SCHEMA$)
.set("id", spec.id)
.set("cOptInt", spec.cOptInt.getOrElse(null))
.set("cOptDouble", spec.cOptDouble.getOrElse(null))
.set("bRecord", spec.bRecord.getOrElse(null))
.set("ts", spec.ts.toEpochMilli)
.build()

property("isGeneric property") {
AvroUtils.isGeneric(classOf[GenericRecord]) shouldEqual true
AvroUtils.isGeneric(classOf[BRecord]) shouldEqual false
Expand All @@ -40,6 +45,24 @@ class AvroUtilsTest extends PropSpec {
}
}

property("GenericToSpecific embedded Avro") {
forAll { c: CRecord =>
val v = fromSpecCRecord(c)
val s = v.toSpecific(new CRecord())
print(s)
s shouldEqual c
}
}

property("instanceOf property embedded avro") {
val x = AvroUtils.instanceOf(classOf[CRecord])
val y = new CRecord()
x.id shouldEqual y.id
x.cOptInt shouldEqual y.cOptInt
x.cOptDouble shouldEqual y.cOptDouble
x.bRecord shouldEqual y.bRecord
}

property("instanceOf property") {
val x = AvroUtils.instanceOf(classOf[BRecord])
val y = new BRecord()
Expand All @@ -62,4 +85,31 @@ class AvroUtilsTest extends PropSpec {
ea shouldEqual bw
}

property("isGenericInstance property") {
val b = AvroUtils.instanceOf(classOf[BRecord])
AvroUtils.isGenericInstance(b) shouldBe false
val schema = SchemaBuilder
.record("testrec")
.fields()
.requiredInt("int")
.endRecord()
val g = new GenericRecordBuilder(schema).set("int", 17).build()
AvroUtils.isGenericInstance(g) shouldBe true
}

property("isSpecificInstance property") {
val b = AvroUtils.instanceOf(classOf[BRecord])
AvroUtils.isSpecificInstance(b) shouldBe true
def check(g: GenericRecord) =
AvroUtils.isSpecificInstance(g)
check(b) shouldBe true
val schema = SchemaBuilder
.record("testrec")
.fields()
.requiredInt("int")
.endRecord()
val g = new GenericRecordBuilder(schema).set("int", 17).build()
check(g) shouldBe false
}

}

0 comments on commit 70c374a

Please sign in to comment.