SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload even less confluent implementation dependent
arkadius committed Jan 30, 2023
1 parent 66947d1 commit 6a77674
Showing 10 changed files with 70 additions and 26 deletions.
@@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.Confluen
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.DefaultConfluentSchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.schemaid.SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload.ValueSchemaIdHeaderName
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalKafkaDeserializer, UniversalSchemaBasedSerdeProvider}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaId, SchemaIdFromMessageExtractor}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ChainedSchemaIdFromMessageExtractor, SchemaId, SchemaIdFromMessageExtractor}

import java.io.OutputStream

@@ -28,10 +28,12 @@ class UniversalKafkaDeserializerTest extends SchemaRegistryMixin with TableDrive

type CreateSetup = RuntimeSchemaData[ParsedSchema] => SchemaRegistryProviderSetup

private val schemaIdExtractor: ChainedSchemaIdFromMessageExtractor = UniversalSchemaBasedSerdeProvider.createSchemaIdFromMessageExtractor(isConfluent = true)

lazy val payloadWithSchemaIdSetup: CreateSetup = readerSchema => SchemaRegistryProviderSetup(SchemaRegistryProviderSetupType.avro,
UniversalSchemaBasedSerdeProvider.create(MockSchemaRegistry.factory),
new SimpleKafkaAvroSerializer(MockSchemaRegistry.schemaRegistryMockClient, isKey = false),
new UniversalKafkaDeserializer(confluentSchemaRegistryClient, kafkaConfig, UniversalSchemaBasedSerdeProvider.schemaIdFromMessageExtractor, Some(readerSchema), isKey = false))
new UniversalKafkaDeserializer(confluentSchemaRegistryClient, kafkaConfig, schemaIdExtractor, Some(readerSchema), isKey = false))

lazy val payloadWithoutSchemaIdSetup: CreateSetup = readerSchema => payloadWithSchemaIdSetup(readerSchema).copy(valueSerializer = new SimpleKafkaAvroSerializer(MockSchemaRegistry.schemaRegistryMockClient, isKey = false) {
override def writeHeader(data: Any, avroSchema: Schema, schemaId: Int, out: OutputStream): Unit = ()
@@ -5,7 +5,7 @@ import org.apache.kafka.common.header.Headers
import java.nio.ByteBuffer
import scala.collection.compat.immutable.LazyList

trait SchemaIdFromMessageExtractor {
trait SchemaIdFromMessageExtractor extends Serializable {

final def getSchemaId(headers: Headers, data: Array[Byte], isKey: Boolean): Option[SchemaIdWithPositionedBuffer] =
getSchemaId(GetSchemaIdArgs(headers, data, isKey))
@@ -18,7 +18,7 @@ case class SchemaIdWithPositionedBuffer(value: SchemaId, buffer: ByteBuffer)

case class GetSchemaIdArgs(headers: Headers, data: Array[Byte], isKey: Boolean)

class ChainedSchemaIdFromMessageExtractor(chain: List[SchemaIdFromMessageExtractor]) extends SchemaIdFromMessageExtractor with Serializable {
class ChainedSchemaIdFromMessageExtractor(chain: List[SchemaIdFromMessageExtractor]) extends SchemaIdFromMessageExtractor {
override private[schemedkafka] def getSchemaId(args: GetSchemaIdArgs): Option[SchemaIdWithPositionedBuffer] = {
chain.to(LazyList)
.map(f => f.getSchemaId(args))
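The chain above is evaluated lazily: a later extractor runs only if every earlier one returned None (the truncated body presumably picks the first defined result). A minimal, self-contained sketch of that pattern in Scala 2.13 — the stub extractors below are hypothetical, not part of this commit:

// Simplified stand-in for SchemaIdFromMessageExtractor, so the sketch compiles on its own.
trait Extractor { def extract(data: Array[Byte]): Option[Int] }

val fromHeaders: Extractor = _ => None // pretend no schema-id header was present
val fromPayload: Extractor = data =>
  if (data.length >= 5 && data(0) == 0) Some(data(4).toInt) else None // toy framing: magic byte, then id

// LazyList + collectFirst stops at the first extractor that yields Some,
// which implements the precedence order of the chain.
val schemaId: Option[Int] = List(fromHeaders, fromPayload)
  .to(LazyList)
  .map(_.extract(Array[Byte](0, 0, 0, 0, 42)))
  .collectFirst { case Some(id) => id } // Some(42)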
@@ -2,12 +2,19 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry

import pl.touk.nussknacker.engine.kafka.{KafkaConfig, SchemaRegistryClientKafkaConfig}

import scala.reflect.ClassTag

trait SchemaRegistryClientFactory extends Serializable {

type SchemaRegistryClientT <: SchemaRegistryClient

def create(config: KafkaConfig): SchemaRegistryClientT = create(config.schemaRegistryClientKafkaConfig)

def create(config: SchemaRegistryClientKafkaConfig): SchemaRegistryClientT


def clientClassTag: ClassTag[SchemaRegistryClientT]

}

trait SchemaRegistryClientFactoryWithRegistration extends SchemaRegistryClientFactory {
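For illustration, a sketch (not code from this commit) of the kind of check the new clientClassTag member enables: deciding, without creating a client, whether a factory may produce a Confluent client. It mirrors the check added to UniversalSchemaBasedSerdeProvider later in this commit; the simplified types here are hypothetical:

import scala.reflect.{ClassTag, classTag}

// Simplified stand-ins for SchemaRegistryClient and its Confluent subtype.
trait Client
trait ConfluentClient extends Client

trait Factory {
  type ClientT <: Client
  def clientClassTag: ClassTag[ClientT]
}

// Reads as: "is ConfluentClient a subtype of (or equal to) the declared client type?"
// A factory declaring the broad supertype Client therefore also counts as potentially Confluent.
def mayProduceConfluentClient(factory: Factory): Boolean =
  factory.clientClassTag.runtimeClass.isAssignableFrom(classOf[ConfluentClient])

object BroadFactory extends Factory {
  type ClientT = Client
  def clientClassTag: ClassTag[Client] = classTag[Client]
}
// mayProduceConfluentClient(BroadFactory) == true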
@@ -7,6 +7,7 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaId, SchemaR
import pl.touk.nussknacker.engine.util.cache.{CacheConfig, DefaultCache, SingleValueCache}

import scala.collection.mutable
import scala.reflect.{ClassTag, classTag}

object CachedConfluentSchemaRegistryClientFactory extends CachedConfluentSchemaRegistryClientFactory

@@ -28,6 +29,8 @@ class CachedConfluentSchemaRegistryClientFactory extends SchemaRegistryClientFac
}

protected def confluentClient(config: SchemaRegistryClientKafkaConfig): CSchemaRegistryClient = CachedSchemaRegistryClient(config)

override def clientClassTag: ClassTag[ConfluentSchemaRegistryClient] = classTag[ConfluentSchemaRegistryClient]
}

class SchemaRegistryCaches(cacheConfig: SchemaRegistryCacheConfig) extends LazyLogging {
@@ -2,7 +2,9 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client

import io.confluent.kafka.schemaregistry.client.{SchemaRegistryClient => CSchemaRegistryClient}
import pl.touk.nussknacker.engine.kafka.SchemaRegistryClientKafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClientWithRegistration, SchemaRegistryClientFactoryWithRegistration}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClientFactoryWithRegistration, SchemaRegistryClientWithRegistration}

import scala.reflect.{ClassTag, classTag}

/**
* SchemaRegistryClient must be passed by name, because schemaRegistryMockClient is not serializable.
@@ -18,4 +20,7 @@ class MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient: => CSch
new DefaultConfluentSchemaRegistryClient(schemaRegistryMockClient, config)
}

override def clientClassTag: ClassTag[SchemaRegistryClientWithRegistration with ConfluentSchemaRegistryClient] =
classTag[SchemaRegistryClientWithRegistration with ConfluentSchemaRegistryClient]

}
@@ -3,36 +3,51 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.schemai
import org.apache.kafka.common.header.Headers
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{GetSchemaIdArgs, SchemaId, SchemaIdFromMessageExtractor, SchemaIdWithPositionedBuffer}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
import SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload._

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import scala.util.Try

object SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload {

val ValueSchemaIdHeaderName = "value.schemaId"
val KeySchemaIdHeaderName = "key.schemaId"

}

/**
* This class extracts the schema id from our Nu-specific headers: key.schemaId / value.schemaId.
* Because we always produce those headers, we can end up in a situation where the schema id is in the headers
* but the payload is also in the Confluent format (magic byte + schema id + bytes). In that case we need to shift
* the payload so that the next step (the payload deserializer) has a clear situation: the buffer pointer points at the message bytes
*/
object SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload extends SchemaIdFromMessageExtractor with Serializable {

val ValueSchemaIdHeaderName = "value.schemaId"
val KeySchemaIdHeaderName = "key.schemaId"
class SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload(intSchemaId: Boolean,
potentiallyShiftConfluentPayload: Boolean)
extends SchemaIdFromMessageExtractor {

implicit class RichHeaders(h: Headers) {
def getSchemaId(headerName: String): Option[SchemaId] = Option(h.lastHeader(headerName))
.map(h => new String(h.value()))
.map { v =>
Try(v.toInt).fold(
e => throw new InvalidSchemaIdHeader(headerName, v, e),
// TODO: handle string schema ids
v => SchemaId.fromInt(v))
.map(header => new String(header.value(), StandardCharsets.UTF_8))
.map { stringValue =>
if (intSchemaId) {
Try(stringValue.toInt).fold(
e => throw new InvalidSchemaIdHeader(headerName, stringValue, e),
SchemaId.fromInt)
} else {
SchemaId.fromString(stringValue)
}
}
}

override def getSchemaId(args: GetSchemaIdArgs): Option[SchemaIdWithPositionedBuffer] = {
val headerName = if (args.isKey) KeySchemaIdHeaderName else ValueSchemaIdHeaderName
args.headers.getSchemaId(headerName).map { idFromHeader =>
val buffer = ConfluentUtils.readIdAndGetBuffer(args.data).toOption.map(_._2).getOrElse(ByteBuffer.wrap(args.data))
val buffer = if (potentiallyShiftConfluentPayload) {
ConfluentUtils.readIdAndGetBuffer(args.data).toOption.map(_._2).getOrElse(ByteBuffer.wrap(args.data))
} else {
ByteBuffer.wrap(args.data)
}
SchemaIdWithPositionedBuffer(idFromHeader, buffer)
}
}
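To make the scaladoc scenario concrete, here is a self-contained sketch (not code from this commit) of a record that carries a Nu schema-id header while its payload is still Confluent-framed, and of the buffer shift performed above. readConfluentFramedPayload is a hypothetical stand-in for ConfluentUtils.readIdAndGetBuffer; the framing ([magic byte 0x00][4-byte schema id][message bytes]) is the standard Confluent wire format:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}

// Hypothetical stand-in for ConfluentUtils.readIdAndGetBuffer: on success the returned
// buffer is positioned just past the 5-byte Confluent prefix.
def readConfluentFramedPayload(data: Array[Byte]): Option[(Int, ByteBuffer)] = {
  val buffer = ByteBuffer.wrap(data)
  if (data.length >= 5 && buffer.get() == 0x00) Some((buffer.getInt(), buffer)) else None
}

// A record produced "the Nu way": schema id in a header *and* a Confluent-framed payload.
val headers = new RecordHeaders()
headers.add(new RecordHeader("value.schemaId", "42".getBytes(StandardCharsets.UTF_8)))
val payload = Array[Byte](0x00, 0, 0, 0, 42, 1, 2, 3)

// The extractor takes the id from the header but, with potentiallyShiftConfluentPayload = true,
// still skips the prefix so the deserializer sees only the message bytes (1, 2, 3).
val shifted = readConfluentFramedPayload(payload).map(_._2).getOrElse(ByteBuffer.wrap(payload))
assert(shifted.remaining() == 3)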
@@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.schemai
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{GetSchemaIdArgs, SchemaIdFromMessageExtractor, SchemaIdWithPositionedBuffer}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils

object SchemaIdFromPayloadInConfluentFormat extends SchemaIdFromMessageExtractor with Serializable {
object SchemaIdFromPayloadInConfluentFormat extends SchemaIdFromMessageExtractor {
override private[schemedkafka] def getSchemaId(args: GetSchemaIdArgs): Option[SchemaIdWithPositionedBuffer] = {
ConfluentUtils.readIdAndGetBuffer(args.data).toOption.map(SchemaIdWithPositionedBuffer.apply _ tupled)
}
@@ -1,20 +1,14 @@
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.ConfluentSchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.schemaid.{SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload, SchemaIdFromPayloadInConfluentFormat}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.{KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory, KafkaSchemaRegistryBasedValueSerializationSchemaFactory}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ChainedSchemaIdFromMessageExtractor, SchemaBasedSerdeProvider, SchemaRegistryClientFactory}

object UniversalSchemaBasedSerdeProvider {

// SchemaId can be obtained in several ways. Precedence:
// * from kafka headers - our own Nussknacker standard header format: key.schemaId and value.schemaId headers
// * from payload serialized in the 'Confluent way' ([magicbyte][schemaid][payload])
// * (fallback) from the source editor version param - this is just an assumption we make (when processing non-schemed data, anything can happen)
val schemaIdFromMessageExtractor = new ChainedSchemaIdFromMessageExtractor(List(
SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload,
SchemaIdFromPayloadInConfluentFormat))

def create(schemaRegistryClientFactory: SchemaRegistryClientFactory): SchemaBasedSerdeProvider = {
val schemaIdFromMessageExtractor = createSchemaIdFromMessageExtractor(schemaRegistryClientFactory)
SchemaBasedSerdeProvider(
new KafkaSchemaRegistryBasedValueSerializationSchemaFactory(schemaRegistryClientFactory, UniversalSerializerFactory),
new KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
@@ -25,4 +19,19 @@ object UniversalSchemaBasedSerdeProvider {
)
}

private def createSchemaIdFromMessageExtractor(schemaRegistryClientFactory: SchemaRegistryClientFactory): ChainedSchemaIdFromMessageExtractor = {
val isConfluent = schemaRegistryClientFactory.clientClassTag.runtimeClass.isAssignableFrom(classOf[ConfluentSchemaRegistryClient])
createSchemaIdFromMessageExtractor(isConfluent)
}

// SchemaId can be obtained in several ways. Precedence:
// * from Nu kafka headers - our own Nussknacker standard header format: key.schemaId and value.schemaId headers
// * from payload serialized in the 'Confluent way' ([magicbyte][schemaid][payload])
// * (fallback) from the source editor version param - this is just an assumption we make (when processing non-schemed data, anything can happen)
private[schemaregistry] def createSchemaIdFromMessageExtractor(isConfluent: Boolean): ChainedSchemaIdFromMessageExtractor = {
val chain = new SchemaIdFromNuHeadersPotentiallyShiftingConfluentPayload(intSchemaId = isConfluent, potentiallyShiftConfluentPayload = isConfluent) ::
List(SchemaIdFromPayloadInConfluentFormat).filter(_ => isConfluent)
new ChainedSchemaIdFromMessageExtractor(chain)
}

}
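A hedged summary of the construction above, as a self-contained sketch (the stub types below stand in for the real extractors): with isConfluent = true the chain reads int schema ids from headers, may shift a Confluent-framed payload, and falls back to the bare Confluent payload format; with isConfluent = false it reads string schema ids from headers, never shifts, and has no payload fallback:

// Stubs standing in for the two extractors wired up by createSchemaIdFromMessageExtractor.
sealed trait Ex
case class NuHeaders(intSchemaId: Boolean, potentiallyShiftConfluentPayload: Boolean) extends Ex
case object ConfluentPayload extends Ex

def buildChain(isConfluent: Boolean): List[Ex] =
  NuHeaders(intSchemaId = isConfluent, potentiallyShiftConfluentPayload = isConfluent) ::
    List(ConfluentPayload).filter(_ => isConfluent)

assert(buildChain(isConfluent = true) == List(NuHeaders(true, true), ConfluentPayload))
assert(buildChain(isConfluent = false) == List(NuHeaders(false, false)))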
@@ -4,6 +4,8 @@ import pl.touk.nussknacker.engine.kafka.SchemaRegistryClientKafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryClientFactory}

import scala.reflect.{ClassTag, classTag}

object UniversalSchemaRegistryClientFactory extends UniversalSchemaRegistryClientFactory

class UniversalSchemaRegistryClientFactory extends SchemaRegistryClientFactory {
@@ -17,4 +19,5 @@ class UniversalSchemaRegistryClientFactory extends SchemaRegistryClientFactory {
CachedConfluentSchemaRegistryClientFactory
}

override def clientClassTag: ClassTag[SchemaRegistryClient] = classTag[SchemaRegistryClient]
}
@@ -12,7 +12,7 @@ import java.io.ByteArrayOutputStream

class SchemaIdFromMessageExtractorTest extends AnyFunSuite with Matchers with OptionValues {

private val extractor = UniversalSchemaBasedSerdeProvider.schemaIdFromMessageExtractor
private val extractor = UniversalSchemaBasedSerdeProvider.createSchemaIdFromMessageExtractor(isConfluent = true)

test("extract schema id from nu specific headers") {
extractor.getSchemaId(