<div style="text-align: center; line-height: 0; padding-top: 2px;">
  <img src="https://www.quantiaconsulting.com/logos/quantia_logo_orizz.png" alt="Quantia Consulting" style="width: 600px; height: 250px">
</div>

# Kafka Avro Consumer 

**Technical Accomplishments:**
- Consume data from a Kafka Avro topic

## Getting Started

Let's start by importing the libraries and creating a few useful variables.

In [None]:
%load_ext autotime

import os
import qcutils
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import io
from avro.io import DatumReader, BinaryDecoder
from pyspark.sql.functions import *
import time
import json
import avro.schema
import struct
import requests 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.apache.kafka:kafka-clients:2.4.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.5 pyspark-shell'

spark = (SparkSession.builder 
    .master("local[*]")
    .appName("test")
    .getOrCreate()
        )
qcutils.init_spark_session(spark)

spark

### Best Solution

In this solution we use `MessageSerializer`, an internal component of the `confluent_kafka` library that hides most of the deserialization complexity.

In [None]:
from pyspark.sql.functions import *
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

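@udf("string")
def from_avro(value, sr_url):
    # NOTE: the registry client is rebuilt on every call; simple, but not efficient
    sr_conf = {'url': sr_url}
    schema_registry = CachedSchemaRegistryClient(sr_conf)
    deserializer = MessageSerializer(schema_registry)
    # decode_message returns a Python object (a dict for records):
    # serialize it to JSON so that it matches the "string" return type
    return json.dumps(deserializer.decode_message(value), default=str)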

In [None]:
topic = ''

assert len(topic) > 0, "To avoid conflicts during write operations, name the topic <surname>-topic"

servers=qcutils.read_config_value("kafka.server") + ":" + str(qcutils.read_config_value("kafka.port"))
sr_url=qcutils.read_config_value("kafka.schema_registry.url")

avro_df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("startingOffsets", "earliest")
  .option("subscribe", topic)
  .load())

output_df = (avro_df.select(from_avro("value", lit(sr_url)).alias("v")))

query = (output_df
    .writeStream
    .outputMode("append")
    .format("parquet") 
    .option("path", "/home/jovyan/data/pyspark/best_solution.parquet")
    .option("checkpointLocation", "/home/jovyan/data/pyspark/checkpoint/best_solution")
    .start())

#time.sleep(30)
#query.stop()

query.awaitTermination()

### Ugly Solution

In this solution we build the deserializer from scratch.

In particular, the `from_avro` UDF has to take care of:
* parsing the `value` byte array;
* extracting the `magic byte` and the `schema id` (the first 5 bytes of the encoded message);
* requesting the schema from the schema registry;
* deserializing the message.
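
The header handling described above can be tried in isolation, without Spark or a schema registry. The sketch below assumes the Confluent wire format (one magic byte equal to 0, then a 4-byte big-endian schema id, then the Avro body); `split_confluent_message` and the sample bytes are hypothetical, introduced only for illustration.

```python
import struct

MAGIC_BYTE = 0   # value assumed for the Confluent wire format
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema id


def split_confluent_message(value):
    """Split an encoded message into (schema_id, avro_payload)."""
    if len(value) < HEADER_SIZE:
        raise ValueError("message shorter than the 5-byte header")
    # '>bI': big-endian, one signed byte followed by one unsigned 32-bit int
    magic, schema_id = struct.unpack(">bI", value[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, value[HEADER_SIZE:]


# Hypothetical message: schema id 42 followed by two payload bytes
sample = struct.pack(">bI", 0, 42) + b"\x02a"
schema_id, payload = split_confluent_message(sample)
print(schema_id, payload)
```

Checking the magic byte before decoding makes a non-Avro message fail with an explicit error instead of a confusing decoding failure further down.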

In [None]:
@udf("string")
def from_avro(value,sr_base_url): 
    message_bytes = io.BytesIO(value)
    
    # extraction of the schema id and magic byte (5 bytes)
    magic, schema_id = struct.unpack('>bI', message_bytes.read(5))
    
    # schema request
    schemaRegistry = "{}/schemas/ids/".format(sr_base_url)
    URL = schemaRegistry + str(schema_id)
    r = requests.get(url = URL) 
    data = r.json() 
    avroSchema = str(data["schema"])
    schema = avro.schema.Parse(avroSchema)
    
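    # decoding: reader.read returns a Python object (a dict for records),
    # serialized to JSON so that it matches the "string" return type
    reader = DatumReader(schema)
    decoder = BinaryDecoder(message_bytes)
    return json.dumps(reader.read(decoder), default=str)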
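
The UDF above sends one HTTP request to the schema registry for every row. Since a schema id is expected to always map to the same schema, the lookup can be cached. The following is a minimal sketch, not part of the original notebook: `fetch_schema_json`, `make_cached_lookup` and `cached_schema_json` are hypothetical helpers, and the standard library `urllib` is used in place of `requests` only to keep the example self-contained.

```python
import json
from functools import lru_cache
from urllib.request import urlopen


def fetch_schema_json(sr_base_url, schema_id):
    """Ask the schema registry for the schema registered under schema_id."""
    url = "{}/schemas/ids/{}".format(sr_base_url, schema_id)
    with urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))["schema"]


def make_cached_lookup(fetch, maxsize=128):
    """Wrap a fetch(sr_base_url, schema_id) function with an in-memory cache."""
    return lru_cache(maxsize=maxsize)(fetch)


# The cache lives in the Python process that runs the UDF,
# so each Spark worker process fills its own copy.
cached_schema_json = make_cached_lookup(fetch_schema_json)
```

Inside the UDF, `cached_schema_json(sr_base_url, schema_id)` would then replace the `requests.get` call, so that only the first message for each schema id triggers a request.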

In [None]:
topic = ''

assert len(topic) > 0, "To avoid conflicts during write operations, name the topic <surname>-topic"

servers=qcutils.read_config_value("kafka.server") + ":" + str(qcutils.read_config_value("kafka.port"))
sr_url=qcutils.read_config_value("kafka.schema_registry.url")

avro_df2 = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("startingOffsets", "earliest")
  .option("subscribe", topic)
  .load())

output_df2 = (avro_df2.select(from_avro("value", lit(sr_url)).alias("v")))

query2 = (output_df2
    .writeStream
    .outputMode("append")
    .format("parquet") 
    .option("path", "/home/jovyan/data/pyspark/ugly_solution.parquet")
    .option("checkpointLocation", "/home/jovyan/data/pyspark/checkpoint/ugly_solution")
    .start())

query2.awaitTermination()

##### ![Quantia Tiny Logo](https://www.quantiaconsulting.com/logos/quantia_logo_tiny.png) 2020 Quantia Consulting, srl. All rights reserved.