# Kafka structured messages and avro encoding

A common use case is to send message payloads into Kafka in JSON format.

In [142]:
from kafka import KafkaProducer
import json

In [143]:
# Producer for the plain-JSON topic: every value handed to send() is turned into
# UTF-8 encoded JSON text by the serializer below.
server = "kafka:9092"
topic = "json"
producer = KafkaProducer(
    bootstrap_servers=server,
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
)

First we need to load some data. Alas, the example Apache log file is unstructured text in the Apache combined log format.

In [144]:
src = "/home/jovyan/data/SDM/logs/apache-short.log"

# Peek at the first raw line to see which format we have to parse.
with open(src, "r") as logfile:
    firstLine = logfile.readline()
    print(firstLine)

233.167.247.91 - - [26/Sep/2017:07:54:24 +0000] "POST /app/main/posts HTTP/1.0" 404 4948 "http://www.patel.org/post.asp" "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10_6_4) AppleWebKit/5360 (KHTML, like Gecko) Chrome/14.0.812.0 Safari/5360"



So I will need to parse this file into structured JSON. I will write a simple grok-ish parser for this purpose.

In [145]:
import re


# Matches lines shaped like:
#   ip - user [timestamp] "request" status bytes "referer" "user-agent"
# A raw string is used so the regex escapes (\d, \S, \[, ...) reach `re` untouched
# instead of relying on Python's deprecated invalid-escape passthrough.
grokpattern = re.compile(
    r'((?:\d{1,3}\.){3}\d{1,3}) - (\S+) \[(.+)\] "(.+?)" (\d+) (\d+) "(.+?)" "(.+?)"'
)

grokData = []
skipped = 0

with open(src, "r") as logfile:
    for line in logfile:
        parsed = grokpattern.match(line)
        if parsed is None:
            # e.g. a blank line, or a line whose fields don't fit the pattern (such as a
            # non-numeric byte count). Previously this crashed with AttributeError on None.
            skipped += 1
            continue
        struct = {
            "ipv4": parsed.group(1),
            "user": parsed.group(2),
            "timestamp": parsed.group(3),
            "request": parsed.group(4),
            "response": int(parsed.group(5)),
            "bytes": int(parsed.group(6)),
            "referer": parsed.group(7),
            "ua": parsed.group(8)
        }
        grokData.append(struct)
        producer.send(topic, struct)

# send() only enqueues records in the client buffer; flush so they are actually delivered
# before anything starts consuming the topic.
producer.flush()

if skipped:
    print("Skipped {} line(s) that did not match the log pattern".format(skipped))

In [146]:
from kafka import KafkaConsumer
# Read the JSON topic from the beginning and print the sixth record (index 5) as a sample,
# stopping after ten records.
# consumer_timeout_ms makes the iteration end if no record arrives for 10 s, so the cell
# cannot block forever when the topic holds fewer than ten messages.
consumer = KafkaConsumer(topic, group_id=None, bootstrap_servers=server,
                         auto_offset_reset='earliest', consumer_timeout_ms=10000)

try:
    for i, msg in enumerate(consumer):
        if i == 5:
            print(msg)
        if i == 9:
            break
finally:
    # group_id=None means there are no group offsets to commit.
    consumer.close(autocommit=False)

ConsumerRecord(topic='json', partition=1, offset=5, timestamp=1519578223633, timestamp_type=0, key=None, value=b'{"ipv4": "66.76.195.231", "user": "-", "timestamp": "26/Sep/2017:07:54:24 +0000", "request": "GET /wp-admin HTTP/1.0", "response": 200, "bytes": 4994, "referer": "http://bolton-ali.biz/", "ua": "Mozilla/5.0 (compatible; MSIE 5.0; Windows NT 5.0; Trident/5.1)"}', checksum=-252235342, serialized_key_size=-1, serialized_value_size=260)


As Kafka stores raw bytes, it is essentially agnostic to the message format. Storing such clearly structured data as a bytearray encoded from a string is quite inefficient. For example, if we wanted to store NetFlow records, Kafka disk usage could fill up quite fast.

Avro is a popular big data encoding format that can be used to mitigate this issue.

* https://avro.apache.org/docs/1.8.2/spec.html

Firstly, we will need to specify our message schema.

In [147]:
# Avro record schema for one parsed log line: (field name, Avro type) pairs in the
# exact order they will be written, since Avro binary encoding is positional.
_apacheFields = [
    ("ipv4", "string"),
    ("user", "string"),
    ("timestamp", "string"),
    ("request", "string"),
    ("response", "int"),
    ("bytes", "int"),
    ("referer", "string"),
    ("ua", "string"),
]

apacheSchema = {
    "name": "apache",
    "type": "record",
    "fields": [
        {"name": fieldName, "type": fieldType}
        for fieldName, fieldType in _apacheFields
    ],
}

In [148]:
import avro.schema, avro.io
import json
import io

In [149]:
# The Avro parser consumes the schema as JSON text, so serialise the dict first.
schema = avro.schema.Parse(
    json.dumps(apacheSchema)
)

Initiate a new producer for another topic. Kafka will not stop you from sending mixed encoded and plain data into the same topic, but it will make your life very difficult.

In [150]:
# Second producer, for the Avro topic. No value_serializer is configured, so send()
# must be given values that are already bytes.
server = "kafka:9092"
topic = "binary"
producer = KafkaProducer(bootstrap_servers=server)

Encode data and send it to kafka.

In [151]:
# Encode every parsed record with the Avro schema and publish the raw bytes.
# The DatumWriter depends only on the schema, so one instance serves every record;
# only the output buffer and its encoder must be fresh per message.
writer = avro.io.DatumWriter(schema)

for log in grokData:
    byteWriter = io.BytesIO()
    encoder = avro.io.BinaryEncoder(byteWriter)
    writer.write(log, encoder)
    raw = byteWriter.getvalue()
    producer.send(topic, raw)

# send() is asynchronous; make sure every record has been delivered before consuming.
producer.flush()

In [152]:
# Read the binary topic from the beginning and print the first record, stopping after ten.
# consumer_timeout_ms makes the iteration end if no record arrives for 10 s, so the cell
# cannot block forever when the topic holds fewer than ten messages.
consumer = KafkaConsumer(topic, group_id=None, bootstrap_servers=server,
                         auto_offset_reset='earliest', consumer_timeout_ms=10000)

try:
    for i, msg in enumerate(consumer):
        # The original `i % 10 == 0` test can only hold at i == 0 because we stop after
        # ten records, so this is the same sample: the first record.
        if i == 0:
            print(msg)
        if i == 9:
            break
finally:
    consumer.close(autocommit=False)

ConsumerRecord(topic='binary', partition=2, offset=0, timestamp=1519579033146, timestamp_type=0, key=None, value=b'\x18117.18.66.28\x02-426/Sep/2017:07:54:24 +0000*GET /explore HTTP/1.0\x90\x03\xfeL6http://www.nguyen-king.com/\xa6\x01Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2016-10-25 04:14:06 Firefox/10.0', checksum=-1814158432, serialized_key_size=-1, serialized_value_size=181)


We now get binary data when consuming the topic. The JSON keys are also missing, because the schema makes them unnecessary. To get the data back in its original structured form, we need to reverse the Avro encoding.

In [153]:
# Decoding counterpart of the DatumWriter used when producing: it is built from the same
# parsed `schema`, which it needs because the encoded bytes carry no field names.
reader = avro.io.DatumReader(schema)

In [154]:
# Read the binary topic from the beginning, Avro-decode each record, and print the tenth
# decoded payload.
# consumer_timeout_ms makes the iteration end if no record arrives for 10 s, so the cell
# cannot block forever when the topic holds fewer than ten messages.
consumer = KafkaConsumer(topic, group_id=None, bootstrap_servers=server,
                         auto_offset_reset='earliest', consumer_timeout_ms=10000)

try:
    for i, msg in enumerate(consumer, start=1):
        # Reverse of the encoding step: wrap the value bytes in a stream and let the
        # schema-driven reader rebuild the record dict.
        bytes_reader = io.BytesIO(msg.value)
        decoder = avro.io.BinaryDecoder(bytes_reader)
        payload = reader.read(decoder)
        if i == 10:
            print(payload)
            break
finally:
    # Previously this consumer was never closed, leaking its connections.
    consumer.close(autocommit=False)

{'ipv4': '85.83.140.71', 'user': '-', 'timestamp': '26/Sep/2017:07:54:24 +0000', 'request': 'GET /search/tag/list HTTP/1.0', 'response': 200, 'bytes': 4939, 'referer': 'http://www.cole.com/', 'ua': 'Mozilla/5.0 (Windows 95) AppleWebKit/5310 (KHTML, like Gecko) Chrome/15.0.806.0 Safari/5310'}
