Skip to content

Commit

Permalink
Merge 171c6ba into efaef71
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Oct 27, 2022
2 parents efaef71 + 171c6ba commit ae0683d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.epiphanous.flinkrunner.serde.{
JsonKafkaRecordSerializationSchema
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
Expand All @@ -19,6 +20,7 @@ import org.apache.flink.streaming.api.scala.DataStream

import java.time.Duration
import java.util.Properties
import scala.util.Random

/** Kafka sink config.
*
Expand Down Expand Up @@ -47,9 +49,13 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
val bootstrapServers: String =
properties.getProperty("bootstrap.servers")

val topic: String = config.getString(pfx("topic"))
val isKeyed: Boolean =
config.getBooleanOpt(pfx("is.keyed")).getOrElse(true)
val topic: String = config.getString(pfx("topic"))

val isKeyed: Boolean = getFromEither(
pfx(),
Seq("keyed", "is.keyed"),
config.getBooleanOpt
).getOrElse(false)

def deliveryGuarantee: DeliveryGuarantee = config
.getStringOpt(pfx("delivery.guarantee"))
Expand All @@ -63,11 +69,40 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](

/** ensure transaction.timeout.ms is set */
val transactionTimeoutMs: Long = {
val t = properties.getProperty("transaction.timeout.ms", "60000")
properties.setProperty("transaction.timeout.ms", t)
t.toLong
val tms = getFromEither(
pfx(),
Seq("transaction.timeout.ms", "tx.timeout.ms"),
config.getLongOpt
)
val td = getFromEither(
pfx(),
Seq("transaction.timeout", "tx.timeout"),
config.getDurationOpt
)
val t = tms.getOrElse(td.getOrElse(Duration.ofHours(2)).toMillis)
properties.setProperty("transaction.timeout.ms", t.toString)
t
}

val transactionalIdPrefix: String =
getFromEither(
pfx(),
Seq(
"transactional.id.prefix",
"transactional.prefix",
"transactional.id",
"transaction.id.prefix",
"transaction.prefix",
"transaction.id",
"tx.id.prefix",
"tx.prefix",
"tx.id"
),
config.getStringOpt
).getOrElse(
s"${config.jobName}.$name.tx.id"
)

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = false,
config
Expand Down Expand Up @@ -117,7 +152,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
.builder()
.setBootstrapServers(bootstrapServers)
.setDeliverGuarantee(deliveryGuarantee)
.setTransactionalIdPrefix(name)
.setTransactionalIdPrefix(transactionalIdPrefix)
.setKafkaProducerConfig(properties)
.setRecordSerializer(serializer)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ object AvroUtils {
def isSpecific[A <: GenericRecord](typeClass: Class[A]): Boolean =
classOf[SpecificRecordBase].isAssignableFrom(typeClass)

def isGenericInstance[A <: GenericRecord](instance: A): Boolean =
!isSpecificInstance(instance)
def isSpecificInstance[A <: GenericRecord](instance: A): Boolean =
isSpecific(instance.getClass)

def instanceOf[A <: GenericRecord](typeClass: Class[A]): A =
typeClass.getConstructor().newInstance()

Expand Down Expand Up @@ -79,7 +84,7 @@ object AvroUtils {
keyOpt: Option[String] = None,
headers: Map[String, String] = Map.empty)(implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): E =
if (isGeneric(typeClass))
if (isGeneric(typeClass) || isSpecificInstance(genericRecord))
fromKV(
EmbeddedAvroRecordInfo(
genericRecord.asInstanceOf[A],
Expand Down
28 changes: 28 additions & 0 deletions src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.epiphanous.flinkrunner.model.{
MyAvroADT
}
import io.epiphanous.flinkrunner.util.AvroUtils.GenericToSpecific
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

import java.time.Duration
Expand Down Expand Up @@ -62,4 +63,31 @@ class AvroUtilsTest extends PropSpec {
ea shouldEqual bw
}

property("isGenericInstance property") {
val b = AvroUtils.instanceOf(classOf[BRecord])
AvroUtils.isGenericInstance(b) shouldBe false
val schema = SchemaBuilder
.record("testrec")
.fields()
.requiredInt("int")
.endRecord()
val g = new GenericRecordBuilder(schema).set("int", 17).build()
AvroUtils.isGenericInstance(g) shouldBe true
}

property("isSpecificInstance property") {
val b = AvroUtils.instanceOf(classOf[BRecord])
AvroUtils.isSpecificInstance(b) shouldBe true
def check(g: GenericRecord) =
AvroUtils.isSpecificInstance(g)
check(b) shouldBe true
val schema = SchemaBuilder
.record("testrec")
.fields()
.requiredInt("int")
.endRecord()
val g = new GenericRecordBuilder(schema).set("int", 17).build()
check(g) shouldBe false
}

}

0 comments on commit ae0683d

Please sign in to comment.