Commit

fix tests
nextdude committed Apr 23, 2023
1 parent 6600e5c commit 2342fc4
Showing 4 changed files with 31 additions and 43 deletions.
@@ -1,14 +1,16 @@
package io.epiphanous.flinkrunner.serde

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.{
CachedSchemaRegistryClient,
SchemaRegistryClient
}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import io.epiphanous.flinkrunner.model.sink.KafkaSinkConfig
import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, FlinkEvent}
import io.epiphanous.flinkrunner.util.SinkDestinationNameUtils.RichSinkDestinationName
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeaders

@@ -26,43 +28,27 @@ case class ConfluentAvroRegistryKafkaRecordSerializationSchema[
A <: GenericRecord,
ADT <: FlinkEvent
](
sinkConfig: KafkaSinkConfig[ADT]
sinkConfig: KafkaSinkConfig[ADT],
schemaRegistryClientOpt: Option[SchemaRegistryClient] = None
) extends KafkaRecordSerializationSchema[E]
with LazyLogging {

@transient lazy val serializerCacheLoader
: CacheLoader[Schema, ConfluentRegistryAvroSerializationSchema[
GenericRecord
]] =
new CacheLoader[Schema, ConfluentRegistryAvroSerializationSchema[
GenericRecord
]] {
override def load(schema: Schema)
: ConfluentRegistryAvroSerializationSchema[GenericRecord] =
ConfluentRegistryAvroSerializationSchema
.forGeneric(
s"${schema.getFullName}-value",
schema,
sinkConfig.schemaRegistryConfig.url,
sinkConfig.schemaRegistryConfig.props
)

}
@transient
lazy val schemaRegistryClient: SchemaRegistryClient =
schemaRegistryClientOpt.getOrElse(
new CachedSchemaRegistryClient(
sinkConfig.schemaRegistryConfig.url,
sinkConfig.schemaRegistryConfig.cacheCapacity,
sinkConfig.schemaRegistryConfig.props,
sinkConfig.schemaRegistryConfig.headers
)
)

@transient lazy val serializerCache
: LoadingCache[Schema, ConfluentRegistryAvroSerializationSchema[
GenericRecord
]] = {
val cacheBuilder = CacheBuilder
.newBuilder()
.concurrencyLevel(sinkConfig.cacheConcurrencyLevel)
.maximumSize(sinkConfig.cacheMaxSize)
.expireAfterWrite(sinkConfig.cacheExpireAfter)
if (sinkConfig.cacheRecordStats) cacheBuilder.recordStats()
cacheBuilder.build[Schema, ConfluentRegistryAvroSerializationSchema[
GenericRecord
]](serializerCacheLoader)
}
@transient
lazy val serializer: KafkaAvroSerializer = new KafkaAvroSerializer(
schemaRegistryClient,
sinkConfig.schemaRegistryConfig.props
)

override def serialize(
element: E,
@@ -80,14 +66,14 @@ case class ConfluentAvroRegistryKafkaRecordSerializationSchema[

val topic = sinkConfig.expandTemplate(info.record)

val key = info.keyOpt.map(_.getBytes(StandardCharsets.UTF_8))
val key = info.keyOpt.map(k => serializer.serialize(topic, k))

logger.trace(
s"serializing ${info.record.getSchema.getFullName} record ${element.$id} to $topic ${if (sinkConfig.isKeyed) "with key"
s"serializing ${info.record.getSchema.getFullName} record ${element.$id} to topic <$topic> ${if (key.nonEmpty) "with key"
else "without key"}, headers=${info.headers}"
)

val value =
serializerCache.get(info.record.getSchema).serialize(info.record)
val value = serializer.serialize(topic, info.record)

new ProducerRecord(
topic,
@@ -14,7 +14,6 @@ import scala.collection.mutable
class ConfluentAvroRegistryKafkaRecordDeserializationSchemaTest
extends SerdeTestFixtures {

// ignore until set up testcontainers schema registry
property("deserialize works for bwrapper") {
val serde = getDeserializerFor[BWrapper, BRecord]
val collected = mutable.ArrayBuffer.empty[BWrapper]
@@ -18,13 +18,15 @@ class ConfluentAvroRegistryKafkaRecordSerializationSchemaTest
}

// ignore this until we set up testcontainers schema registry testing
ignore("serialize a MyAvroADT instance to a producer record") {
property("serialize a MyAvroADT instance to a producer record") {
val serializer = getSerializerFor[BWrapper, BRecord]
val serialized = serializer.serialize(
bWrapper,
null,
Instant.now().toEpochMilli
)
// showBytes("serialized key:", serialized.key())
// showBytes("serialized value:", serialized.value())
serialized.key() shouldEqual bKeyBytes
serialized.value() shouldEqual bValueBytes
serialized.timestamp() shouldEqual bWrapper.$timestamp
@@ -114,7 +114,8 @@ trait SerdeTestFixtures extends PropSpec {
A,
MyAvroADT
](
kafkaSinkConfig
kafkaSinkConfig,
Some(schemaRegistryClient)
)
}
ss.open(null, null)
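
For context, here is a minimal sketch (not part of this commit) of how the new `schemaRegistryClientOpt` constructor parameter introduced above can be exercised from a test. It assumes a fixture that already exposes `kafkaSinkConfig: KafkaSinkConfig[MyAvroADT]`, as `SerdeTestFixtures` does in the last hunk, and uses Confluent's in-memory `MockSchemaRegistryClient` in place of whatever client the fixture actually injects:

```scala
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordSerializationSchema

// Inject an in-memory registry client so the schema does not build a
// CachedSchemaRegistryClient against a live schema registry.
val registryClient = new MockSchemaRegistryClient()

val serializationSchema =
  ConfluentAvroRegistryKafkaRecordSerializationSchema[
    BWrapper,  // E: the event wrapper type under test
    BRecord,   // A: the embedded avro record type
    MyAvroADT  // ADT: the flink event algebraic data type
  ](
    kafkaSinkConfig,         // assumed to come from the test fixture
    Some(registryClient)     // overrides the default None
  )
```

With `None` (the default), the schema falls back to constructing a `CachedSchemaRegistryClient` from the sink's `schemaRegistryConfig`, which is the production path shown in the first file of this diff.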
