fix for support of top level arrays
arkadius committed Jan 30, 2023
1 parent 4441120 commit 414c9b0
Showing 5 changed files with 61 additions and 6 deletions.
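For context: before this commit, a schema whose top-level type is an array (rather than a record) failed at process compilation with "Unsupported Avro type. Top level Arrays are not supported" (see the removed assertion in the first file's diff below). The fix threads such values through Confluent's `NonRecordContainer`, which pairs a non-record value with its schema. A minimal, self-contained sketch of the wrapper (the object name is illustrative, not part of the commit):

```scala
import io.confluent.kafka.serializers.NonRecordContainer
import org.apache.avro.Schema

import scala.jdk.CollectionConverters._

object TopLevelArrayExample extends App {
  // A top-level array schema: the root type is an array, not a record.
  val arrayOfLongsSchema: Schema =
    new Schema.Parser().parse("""{"type": "array", "items": "long"}""")

  // NonRecordContainer keeps the schema attached to a non-record value,
  // so it is not lost on the way to the Avro encoder/serializer.
  val wrapped = new NonRecordContainer(arrayOfLongsSchema, List(42L).asJava)

  println(wrapped.getSchema) // {"type":"array","items":"long"}
  println(wrapped.getValue)  // [42]
}
```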
SinkValueEditorWithAvroPayloadIntegrationTest.scala
@@ -1,6 +1,7 @@
 package pl.touk.nussknacker.engine.schemedkafka.sink.flink
 
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import io.confluent.kafka.serializers.NonRecordContainer
 import org.apache.avro.{AvroRuntimeException, Schema}
 import org.scalatest.BeforeAndAfter
 import pl.touk.nussknacker.engine.api.validation.ValidationMode
@@ -17,6 +18,8 @@ import pl.touk.nussknacker.engine.schemedkafka.{AvroUtils, KafkaAvroTestProcessC
 import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression
 import pl.touk.nussknacker.engine.testing.LocalModelData
 
+import scala.jdk.CollectionConverters._
+
 private object SinkValueEditorWithAvroPayloadIntegrationTest {
 
   val avroEncoder = BestEffortAvroEncoder(ValidationMode.strict)
@@ -129,9 +132,7 @@ class SinkValueEditorWithAvroPayloadIntegrationTest extends KafkaAvroSpecMixin w
     val sourceParam = SourceAvroParam.forUniversal(topicConfig, ExistingSchemaVersion(1))
     val sinkParam = UniversalSinkParam(topicConfig, ExistingSchemaVersion(1), "{42L}")
     val process = createAvroProcess(sourceParam, sinkParam)
-    val thrown = intercept[IllegalArgumentException] {
-      runAndVerifyResult(process, topicConfig, event = null, expected = null)
-    }
-    thrown.getMessage shouldBe "Compilation errors: CustomNodeError(end,Unsupported Avro type. Top level Arrays are not supported,None)"
+    val encoded = encode(new NonRecordContainer(topicSchemas("array"), List(42L).asJava), topicSchemas("array"))
+    runAndVerifyResult(process, topicConfig, event = encoded, expected = List(42L).asJava)
   }
 }
KafkaAvroPayloadSourceFactorySpec.scala
@@ -4,6 +4,7 @@ import cats.data.NonEmptyList
 import cats.data.Validated.Invalid
 import com.typesafe.config.ConfigFactory
 import io.confluent.kafka.schemaregistry.client.{SchemaRegistryClient => CSchemaRegistryClient}
+import io.confluent.kafka.serializers.NonRecordContainer
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import pl.touk.nussknacker.engine.api.component.SingleComponentConfig
 import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CustomNodeError, InvalidPropertyFixedValue}
@@ -23,6 +24,7 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ExistingSchemaVer
 import pl.touk.nussknacker.engine.spel.Implicits._
 import pl.touk.nussknacker.engine.testing.LocalModelData
 
+import scala.jdk.CollectionConverters._
 import java.nio.charset.StandardCharsets
 import java.time.{LocalDateTime, ZoneOffset}
 import scala.collection.immutable.ListMap
@@ -124,6 +126,28 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
     roundTripKeyValueObject(universalSourceFactory, useStringForKey = true, InvalidDefaultsTopic, ExistingSchemaVersion(1), null, givenObj)
   }
 
+  test("should read array of primitives on top level") {
+    val topic = ArrayOfNumbersTopic
+    val arrayOfInts = List(123).asJava
+    val arrayOfLongs = List(123L).asJava
+    val wrappedObj = new NonRecordContainer(ArrayOfIntsSchema, arrayOfInts)
+    pushMessageWithKey(null, wrappedObj, topic, useStringForKey = true)
+
+    readLastMessageAndVerify(universalSourceFactory(useStringForKey = true), topic, ExistingSchemaVersion(2), null, arrayOfLongs)
+  }
+
+  test("should read array of records on top level") {
+    val topic = ArrayOfRecordsTopic
+    val recordV1 = FullNameV1.createRecord("Jan", "Kowalski")
+    val arrayOfRecordsV1 = List(recordV1).asJava
+    val recordV2 = FullNameV2.createRecord("Jan", null, "Kowalski")
+    val arrayOfRecordsV2 = List(recordV2).asJava
+    val wrappedObj = new NonRecordContainer(ArrayOfRecordsV1Schema, arrayOfRecordsV1)
+    pushMessageWithKey(null, wrappedObj, topic, useStringForKey = true)
+
+    readLastMessageAndVerify(universalSourceFactory(useStringForKey = true), topic, ExistingSchemaVersion(2), null, arrayOfRecordsV2)
+  }
+
   test("should read last generated key-value object, simple type") {
     val givenKey = 123
     val givenValue = 456
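Both new tests push data with the version-1 writer schema but read with `ExistingSchemaVersion(2)`, relying on Avro schema resolution: ints are promoted to longs, and `FullNameV1` records are read as `FullNameV2`. A standalone sketch of that promotion using plain Avro APIs, independent of the test fixtures (the object name is illustrative):

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.jdk.CollectionConverters._

object ArrayEvolutionExample extends App {
  val writerSchema = new Schema.Parser().parse("""{"type": "array", "items": "int"}""")
  val readerSchema = new Schema.Parser().parse("""{"type": "array", "items": "long"}""")

  // Write a top-level array of ints with the old (writer) schema.
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[Any](writerSchema).write(List(123).asJava, encoder)
  encoder.flush()

  // Read it back with the newer schema: Avro resolution promotes int to long.
  val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
  val read = new GenericDatumReader[Any](writerSchema, readerSchema).read(null, decoder)
  println(read) // [123], now as longs
}
```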
KafkaAvroSourceSpecMixin.scala
@@ -31,6 +31,8 @@ trait KafkaAvroSourceSpecMixin {
   val RecordTopicWithKey: String = "testAvroRecordTopic1WithKey"
   val IntTopicWithKey: String = "testAvroIntTopic1WithKey"
   val IntTopicNoKey: String = "testAvroIntTopic1NoKey"
+  val ArrayOfNumbersTopic: String = "testArrayOfNumbersTopic"
+  val ArrayOfRecordsTopic: String = "testArrayOfRecordsTopic"
   val InvalidDefaultsTopic: String = "testAvroInvalidDefaultsTopic1"
   val PaymentDateTopic: String = "testPaymentDateTopic"
   val GeneratedWithLogicalTypesTopic: String = "testGeneratedWithLogicalTypesTopic"
@@ -42,6 +44,21 @@
     """.stripMargin
   )
 
+  val ArrayOfIntsSchema: Schema = arraySchema("\"int\"")
+
+  val ArrayOfLongsSchema: Schema = arraySchema("\"long\"")
+
+  val ArrayOfRecordsV1Schema: Schema = arraySchema(FullNameV1.schema.toString)
+
+  val ArrayOfRecordsV2Schema: Schema = arraySchema(FullNameV2.schema.toString)
+
+  private def arraySchema(itemsType: String) = AvroUtils.parseSchema(
+    s"""{
+       |  "type": "array",
+       |  "items": $itemsType
+       |}
+    """.stripMargin)
+
   val InvalidDefaultsSchema: Schema = AvroUtils.nonRestrictiveParseSchema(
     """{
       |  "type": "record",
@@ -68,6 +85,10 @@ trait KafkaAvroSourceSpecMixin {
     .register(IntTopicWithKey, IntSchema, 1, isKey = false)
     .register(IntTopicWithKey, IntSchema, 1, isKey = true)
     .register(InvalidDefaultsTopic, InvalidDefaultsSchema, 1, isKey = false)
+    .register(ArrayOfNumbersTopic, ArrayOfIntsSchema, 1, isKey = false)
+    .register(ArrayOfNumbersTopic, ArrayOfLongsSchema, 2, isKey = false)
+    .register(ArrayOfRecordsTopic, ArrayOfRecordsV1Schema, 1, isKey = false)
+    .register(ArrayOfRecordsTopic, ArrayOfRecordsV2Schema, 2, isKey = false)
     .register(PaymentDateTopic, PaymentDate.schema, 1, isKey = false)
     .register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypes.getClassSchema, 1, isKey = false)
     .build
BestEffortAvroEncoder.scala
@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.encode
 import cats.data.Validated.{Invalid, Valid}
 import cats.data.{NonEmptyList, ValidatedNel}
 import cats.implicits._
+import io.confluent.kafka.serializers.NonRecordContainer
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.avro.generic.{GenericContainer, GenericData}
 import org.apache.avro.util.Utf8
@@ -37,6 +38,10 @@ class BestEffortAvroEncoder(avroSchemaEvolution: AvroSchemaEvolution, validation
     (schema.getType, value) match {
       case (_, Some(nested)) =>
        encode(nested, schema)
+      case (_, nonRecord: NonRecordContainer) =>
+        // A rather synthetic situation that arises only in tests - we use this class in some places
+        // to prepare input test data, and in doing so we must not lose the schema information.
+        encode(nonRecord.getValue, schema).map(new NonRecordContainer(schema, _))
       case (Schema.Type.RECORD, container: GenericContainer) =>
         encodeGenericContainer(container, schema)
       case (Schema.Type.RECORD, map: collection.Map[String@unchecked, _]) =>
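The comment in the new case above is the crux of the change: a bare `java.util.List` carries no Avro schema (a `[123]` could be an array of ints or of longs), so the tests wrap values in `NonRecordContainer` and the encoder preserves that wrapper. A sketch of the shape of the new case, where `encodeAny` is a hypothetical stand-in for the encoder's recursive `encode` call:

```scala
import cats.data.ValidatedNel
import cats.syntax.validated._
import io.confluent.kafka.serializers.NonRecordContainer
import org.apache.avro.Schema

object NonRecordEncodeSketch {
  // Hypothetical stand-in: the real BestEffortAvroEncoder validates and
  // converts the value against the schema here.
  def encodeAny(value: AnyRef, schema: Schema): ValidatedNel[String, AnyRef] =
    value.validNel

  // The new case in a nutshell: encode the wrapped value against the target
  // schema, then re-wrap so the schema still travels with the result.
  def encodeNonRecord(container: NonRecordContainer, schema: Schema): ValidatedNel[String, NonRecordContainer] =
    encodeAny(container.getValue, schema).map(new NonRecordContainer(schema, _))
}
```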
AbstractConfluentKafkaAvroSerializer.scala
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.seriali
 
 import io.confluent.kafka.schemaregistry.avro.{AvroSchema, AvroSchemaUtils}
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
-import io.confluent.kafka.serializers.{AbstractKafkaAvroSerializer, AbstractKafkaSchemaSerDe}
+import io.confluent.kafka.serializers.{AbstractKafkaAvroSerializer, AbstractKafkaSchemaSerDe, NonRecordContainer}
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericContainer
 import org.apache.avro.io.{Encoder, EncoderFactory}
@@ -35,10 +35,14 @@ class AbstractConfluentKafkaAvroSerializer(avroSchemaEvolution: AvroSchemaEvolut
       case (Some(schema), other) => (schema, other)
       case (None, other) => (new AvroSchema(AvroSchemaUtils.getSchema(data, this.useSchemaReflection, false, false)), other)
     }
+    val extracted = record match {
+      case nonRecord: NonRecordContainer => nonRecord.getValue
+      case other => other
+    }
 
     try {
       val schemaId: Int = autoRegisterSchemaIfNeeded(topic, data, isKey, avroSchema)
-      writeData(record, avroSchema.rawSchema(), schemaId)
+      writeData(extracted, avroSchema.rawSchema(), schemaId)
     } catch {
       case exc@(_: RuntimeException | _: IOException) =>
         throw new SerializationException("Error serializing Avro message", exc)
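The unwrap before `writeData` matters because Avro's datum writer expects the raw value (here, a `java.util.List`), not the Confluent wrapper; the schema itself was already captured when `avroSchema` was derived above. A minimal sketch of that write path with plain Avro APIs (the object name is illustrative):

```scala
import java.io.ByteArrayOutputStream

import io.confluent.kafka.serializers.NonRecordContainer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.EncoderFactory

import scala.jdk.CollectionConverters._

object WriteTopLevelArraySketch extends App {
  val schema: Schema = new Schema.Parser().parse("""{"type": "array", "items": "long"}""")
  val record: Any = new NonRecordContainer(schema, List(42L).asJava)

  // Mirrors the serializer change: unwrap the container before writing.
  val extracted = record match {
    case nonRecord: NonRecordContainer => nonRecord.getValue
    case other => other
  }

  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[Any](schema).write(extracted, encoder)
  encoder.flush()
  println(s"wrote ${out.size()} bytes") // writing `record` itself would fail at runtime
}
```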
