Setup Avro schemas #15

Closed
inakianduaga opened this issue Feb 2, 2017 · 5 comments
inakianduaga commented Feb 2, 2017

We want

Frontend:

  • Has a toggle to choose between V1 and V2. When V2 is chosen, it forwards the filter type to be used to process the image.

UI-Backend:

  • Produces using either V1 of the Avro schema (where the filter type is not specified) or V2 of the schema, where it is.

Image Processor:

  • Always reads data with V2 of the schema, relying on Avro schema evolution to fill the gap for data produced with V1.

So what we will test initially is the "old producer" case: producers pushing data into the stream with an older version of the schema, while consumers can still read that data properly.
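As a sketch of what the two schema versions might look like (the record name, namespace, and field names here are assumptions, not the final schemas), V2 only adds a filter field with a default so that records written with V1 remain readable:

// Hypothetical V1: an image request without a filter
val imageRequestV1 = """
  {
    "type": "record",
    "name": "ImageRequest",
    "namespace": "com.inakianduaga.schemas",
    "fields": [
      { "name": "url", "type": "string" }
    ]
  }"""

// Hypothetical V2: adds the filter with a default, so V1 records can still be resolved against it
val imageRequestV2 = """
  {
    "type": "record",
    "name": "ImageRequest",
    "namespace": "com.inakianduaga.schemas",
    "fields": [
      { "name": "url",    "type": "string" },
      { "name": "filter", "type": "string", "default": "NONE" }
    ]
  }"""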

TODO

  • Avro schemas for the UI-backend for V1 & V2, and copy V2 over to the processor
  • Install Avro Hugger on the UI-backend / processor to be able to hydrate case classes from the schemas
inakianduaga commented Feb 3, 2017

Notes on schema registry integration

Avro producer example

Example of Akka stream avro record production (without using case classes)

Here is a snippet of my Scala code that failed:

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord

// My simple Avro schema
val topic = "test-topic"   // topic name assumed for this snippet
val key = "key1"
val f1 = "str1"
val f2 = "str2"
val f3 = "int1"

val userSchema = """
  {
    "fields": [
      { "name": "str1", "type": "string" },
      { "name": "str2", "type": "string" },
      { "name": "int1", "type": "int" }
    ],
    "name": "myrecord",
    "type": "record"
  }"""

val parser = new Schema.Parser
val schema = parser.parse(userSchema)

// My schema registry client and a reusable record backed by the schema
val avroRecord: GenericData.Record = new GenericData.Record(schema)
val client = new CachedSchemaRegistryClient("http://hostname:8181", 1000)

// My Kafka producer settings with Akka Stream Kafka (Avro-serializing both key and value);
// context.system assumes this runs inside an actor
val producerSettings =
  ProducerSettings(context.system,
      new io.confluent.kafka.serializers.KafkaAvroSerializer(client),
      new io.confluent.kafka.serializers.KafkaAvroSerializer(client))
    .withBootstrapServers("sherpavm:6667")
    .withProperty("schema.registry.url", "http://sherpavm:8181")

// My producer function with Akka Stream Kafka: emits 21 records reusing the same GenericData.Record
def testProducer(epoch: Int) = Source(epoch to epoch + 20)
  .map { i =>
    addRecords(avroRecord, s"st1-1-${i}", s"st1-2-${i}", i)
    new ProducerRecord[Object, Object](topic, key, avroRecord)
  }
  .runWith(Producer.plainSink(producerSettings))

def addRecords(avroRecord: GenericData.Record, str1: String, str2: String, int1: Int) = {
  if (str1 != null) avroRecord.put(f1, str1)
  if (str2 != null) avroRecord.put(f2, str2)
  avroRecord.put(f3, int1)
}

inakianduaga pushed a commit that referenced this issue Feb 3, 2017
inakianduaga pushed a commit that referenced this issue Feb 3, 2017
… for automatically generating case classes from schema
inakianduaga commented Feb 3, 2017

Problems:

Not sure how to handle reading with custom readers (projections)

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L99

Functionality is provided, but it's not clear how to get it to work automatically when passing the decoder. We might have to extend KafkaAvroDeserializer with a reader schema that, if passed, will be used to deserialize, otherwise defaulting to the standard behavior; a rough sketch is below.
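A minimal sketch of that idea, assuming we extend Confluent's AbstractKafkaAvroDeserializer and hard-wire a reader schema (the class name and wiring are mine, not existing project code):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer

// Always decodes with the given reader schema, letting Avro schema resolution
// fill in fields that are missing from the (older) writer schema.
class ProjectingAvroDeserializer(registry: SchemaRegistryClient, readerSchema: Schema)
    extends AbstractKafkaAvroDeserializer
    with Deserializer[GenericRecord] {

  // schemaRegistry is the protected field inherited from AbstractKafkaAvroSerDe
  this.schemaRegistry = registry

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): GenericRecord =
    // the protected deserialize(payload, readerSchema) overload linked above
    deserialize(data, readerSchema).asInstanceOf[GenericRecord]

  override def close(): Unit = ()
}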

inakianduaga commented Feb 8, 2017

Schema evolution rules

https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/schemaevolution.html

Aliases:

It seems the schema registry doesn't accept aliases as being backwards compatible:

https://groups.google.com/forum/#!topic/confluent-platform/0NlrxFD5FHk
confluentinc/schema-registry#453
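For reference, a field rename via aliases would look something like this (hypothetical names, continuing the sketch above); plain Avro schema resolution honors the alias, but this is exactly the kind of change the registry's compatibility check rejects:

// Hypothetical V3: renames "filter" to "filterType" and declares an alias for the old name
val imageRequestV3 = """
  {
    "type": "record",
    "name": "ImageRequest",
    "namespace": "com.inakianduaga.schemas",
    "fields": [
      { "name": "url",        "type": "string" },
      { "name": "filterType", "type": "string", "default": "NONE", "aliases": ["filter"] }
    ]
  }"""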

inakianduaga commented Feb 9, 2017

Example of writing our own deserializer

http://stackoverflow.com/questions/36697350/avro-with-kafka-deserializing-with-changing-schema
https://blog.knoldus.com/2016/08/02/scala-kafka-avro-producing-and-consuming-avro-messages/

which we might use to get a GenericRecord instead of an IndexedRecord, or something like that; a rough wiring sketch is below.
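For instance, wiring such a deserializer into Akka Stream Kafka could look roughly like this (ProjectingAvroDeserializer, the topic name, and imageRequestV2 come from the sketches above and are assumptions, not project code; as in the producer snippet, context.system assumes we are inside an actor):

import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.avro.Schema
import org.apache.kafka.common.serialization.StringDeserializer

// Reader schema: the consumer always reads with V2, whichever version wrote the record
val imageRequestV2Schema = new Schema.Parser().parse(imageRequestV2)

val consumerSettings =
  ConsumerSettings(context.system,
      new StringDeserializer,
      new ProjectingAvroDeserializer(client, imageRequestV2Schema))
    .withBootstrapServers("sherpavm:6667")
    .withGroupId("image-processor")

Consumer.plainSource(consumerSettings, Subscriptions.topics("image-requests"))
  .map(_.value())   // GenericRecord already projected onto the V2 schema
  .runWith(Sink.foreach(println))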

inakianduaga commented Feb 15, 2017

Compilation error bug

It seems the class ImageRequestDeserializer on the UI backend is breaking compilation for some reason; we need to remove it and investigate.
Update: running sbt last compile yields:

[error] /var/kafka-playground/ui-backend/app/services/Kafka.scala:51: too many arguments for constructor ImageProcessedDeserializer: ()deserializers.ImageProcessedDeserializer
[error]       val avroDeserializer = new ImageProcessedDeserializer(schemaRegistryClient)

inakianduaga pushed a commit that referenced this issue Feb 19, 2017
…g deserialized with old schema, we want to upgrade it to the new schema
inakianduaga pushed a commit that referenced this issue Feb 19, 2017
inakianduaga pushed a commit that referenced this issue Feb 19, 2017
…e64 encoding to json image content pushed back to client