# Standardizing Data for Kafka Consumers

_A single pipeline approach to data standardization and serialization._

Wesley Jones – Distributed Systems and Middleware – CPRE 550

# Background

Kafka is a publish and subscribe messaging system that is "naturally" clustered. A single node can hold one or both of two basic roles:
* broker
* controller

These nodes coordinate through either an external consensus service (Apache ZooKeeper) or a built-in one (KRaft, Kafka's own implementation of the Raft consensus protocol).

In this way, one or more nodes form a cluster.

## Data Serializers

_Using Apache Avro_

* Allows custom schema types to be defined using an IDL
* Has native APIs for common languages (including Python, which this project uses)
* Data is encoded against a schema into a compact binary form and can be decoded with the original (writer's) schema
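
Avro's binary form carries no field names or type tags, which is why the writer's schema is needed to decode it. As an illustration only (not a replacement for an Avro library), the sketch below hand-implements two primitive encodings from the Avro specification with the standard library: zig-zag variable-length `long`s and length-prefixed UTF-8 `string`s. The sample values are taken from the log line used later in this write-up.

```python
def zigzag(n: int) -> int:
    """Map signed 64-bit ints to unsigned so small magnitudes stay short."""
    return (n << 1) ^ (n >> 63)


def unzigzag(z: int) -> int:
    return (z >> 1) ^ -(z & 1)


def write_long(n: int) -> bytes:
    """Avro long: zig-zag, then base-128 varint (low 7 bits first)."""
    z = zigzag(n)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def read_long(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode a long starting at pos; return (value, next position)."""
    shift = z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return unzigzag(z), pos


def write_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return write_long(len(data)) + data  # length prefix, then raw bytes


def read_string(buf: bytes, pos: int) -> tuple[str, int]:
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# The bytes alone are ambiguous; knowing the layout is (string, long)
# is what makes them decodable.
encoded = write_string("quas") + write_long(5150)
program, pos = read_string(encoded, 0)
pid, _ = read_long(encoded, pos)
```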

<img src="typical-pipeline.png" alt="A common Kafka+Avro setup" width="300"/>

## Components of Kafka

* Controllers and brokers
* Topics
* Partitions
* Keys
* Producers and consumers
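
Keys and partitions are linked: records with the same key go to the same partition, which is what preserves per-key ordering. The sketch below reimplements, for illustration, the approach the Java client's built-in partitioner takes for keyed records: a 32-bit murmur2 hash of the key bytes with Kafka's fixed seed, sign bit masked off, modulo the partition count. Using a hostname as the key and 3 partitions are assumptions made only for the example.

```python
M = 0x5BD1E995        # murmur2 multiplier
MASK32 = 0xFFFFFFFF   # emulate Java's 32-bit int arithmetic


def murmur2(data: bytes) -> int:
    """32-bit murmur2 with Kafka's seed, returned as an unsigned int."""
    length = len(data)
    h = (0x9747B28C ^ length) & MASK32
    body = length - length % 4
    for i in range(0, body, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * M) & MASK32
        k ^= k >> 24
        k = (k * M) & MASK32
        h = ((h * M) & MASK32) ^ k
    tail = data[body:]  # 0-3 leftover bytes
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * M) & MASK32
    h ^= h >> 13
    h = (h * M) & MASK32
    h ^= h >> 15
    return h


def partition_for(key: bytes, num_partitions: int) -> int:
    """Same key -> same partition, as long as the partition count is fixed."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


partition = partition_for(b"powlowski2164", 3)  # hypothetical key choice
```

Because the mapping depends on the partition count, adding partitions to a topic changes where existing keys land.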

# Problem

There are _inherent_ inefficiencies in a typical data pipeline. For example:

1. A producer creates a record.
2. Kafka receives the record and stores it in a topic.
3. A consumer receives the message from the topic.
4. An application parses the message into a usable form.
   1. This repeats for each application.

_Step 4, repeated for every consuming application, creates additional overhead for consuming data!_
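
A toy count makes the overhead concrete: when every application parses every raw message itself, parse work grows with the number of applications, while parsing once upstream keeps it at one parse per message. The parser here is a stub that only counts calls, and the application names are hypothetical.

```python
from collections import Counter

calls = Counter()


def parse(raw: str) -> dict:
    """Stub for an expensive parser (e.g. grok); it only counts invocations."""
    calls["parse"] += 1
    return {"message": raw}


messages = [f"log line {i}" for i in range(1000)]
apps = ["alerting", "dashboard", "archiver"]  # hypothetical consuming apps

# Classic pipeline: every application parses every raw message itself.
calls.clear()
per_app = {app: [parse(m) for m in messages] for app in apps}
parses_per_app_design = calls["parse"]

# Single pipeline: parse once upstream; applications share the result.
calls.clear()
shared = [parse(m) for m in messages]
per_app = {app: shared for app in apps}
parses_single_pipeline = calls["parse"]

print(parses_per_app_design, parses_single_pipeline)  # 3000 1000
```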

# Objectives

1. Design a system that runs on top of Kafka and ensures data offered to consumers is standardized based on the content of data ingested.
2. Analyze each message so that all data within a defined Kafka topic can be consumed with the same technique.
3. Construct this to happen within the Kafka cluster so that no external services (aside from producers and consumers) are required.

# Related Work

* Kafka was introduced at LinkedIn in 2011 as an efficient, centralized system for online and offline log processing. Its original engineers provided a good foundation of research on this design.
* Many researchers have compared Kafka's performance with that of similar products such as RabbitMQ.
* For log ingestion, some of Kafka's weaknesses (commit ordering and general streaming inconsistencies) matter less.
* Data serialization as an add-on to a messaging system is a well-established topic; I take some of its performance considerations into account in my system design.
* Online log parsing efficiency was also a point of interest.

# Methodology

## Cluster

I set up the cluster in Docker containers, which had no noticeable performance impact at the scale of my testing. To support metrics collection, the full cluster consisted of the following containers:
* 3 Kafka nodes, all eligible for both the controller and broker roles.
* 1 metrics exporter (kafka-exporter)
* 1 metrics collector (Prometheus)
* 1 metrics grapher (Grafana)
* 1 UI viewer (kafka-ui)

These were run with Podman on a small Red Hat Enterprise Linux 8.9 server with 2 CPUs, 8 GB of RAM, and less than 10 GB of disk space.
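
The write-up does not include the node configuration, but three nodes that are each eligible for both roles correspond to KRaft "combined mode". The sketch below renders a `server.properties` file per node using standard KRaft property names (`process.roles`, `node.id`, `controller.quorum.voters`, and the listener settings) for a static controller quorum. The hostnames, ports, and `log.dirs` path are hypothetical.

```python
def kraft_properties(node_id: int, hosts: list[str]) -> str:
    """Render server.properties for a node acting as both broker and controller."""
    # Every node votes in the controller quorum: "<id>@<host>:<controller port>".
    voters = ",".join(f"{i}@{h}:9093" for i, h in enumerate(hosts, start=1))
    host = hosts[node_id - 1]
    props = {
        "process.roles": "broker,controller",
        "node.id": str(node_id),
        "controller.quorum.voters": voters,
        "listeners": "PLAINTEXT://:9092,CONTROLLER://:9093",
        "advertised.listeners": f"PLAINTEXT://{host}:9092",
        "controller.listener.names": "CONTROLLER",
        "listener.security.protocol.map": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
        "log.dirs": "/var/lib/kafka/data",  # hypothetical path
    }
    return "\n".join(f"{k}={v}" for k, v in props.items()) + "\n"


hosts = ["kafka1", "kafka2", "kafka3"]  # hypothetical container hostnames
configs = {n: kraft_properties(n, hosts) for n in (1, 2, 3)}
```

Each node's storage directory still has to be formatted with `kafka-storage.sh format` and a shared cluster ID before the first start.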

## Data Serialization

Serialization is a two-phase process:

1. Parsing data into standard fields
2. Labelling the data and creating a schema to apply (and then applying it).

### Phase 1: Parsing Data

Data is parsed using grok, a regex-based pattern-matching technique.

* To increase _capability_, several patterns are tried and the "winning" one is selected.

#### Input Example

```plain
<15>April 12 19:32:44 powlowski2164 quas[5150]: The PNG pixel is down, synthesize the back-end bandwidth so we can transmit the HTTP firewall!
```

#### Output Example

```json
{
  "timestamp": "April 12 19:32:44",
  "timestamp8601": null,
  "facility": null,
  "priority": null,
  "logsource": "powlowski2164",
  "program": "quas",
  "pid": "5150",
  "message": "The PNG pixel is down, synthesize the back-end bandwidth so we can transmit the HTTP firewall!"
}
```
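
A minimal sketch of the multi-pattern approach, using Python's `re` module with named groups as a stand-in for grok (grok patterns expand to regular expressions of this kind). The pattern set and the "winner" rule (the match that fills the most fields, earliest pattern on ties) are assumptions; the write-up specifies neither. Because the output example above leaves `facility` and `priority` null, the BSD-style pattern here consumes the `<15>` prefix without capturing it, so the input example reproduces that output.

```python
import re
from typing import Optional

# Output shape from the example above; no pattern here captures "facility".
FIELDS = ("timestamp", "timestamp8601", "facility", "priority",
          "logsource", "program", "pid", "message")

# Hypothetical regex stand-ins for the project's grok patterns.
PATTERNS = [
    # ISO 8601 timestamp variant that also captures the PRI value.
    re.compile(
        r"^<(?P<priority>\d+)>(?P<timestamp8601>\d{4}-\d{2}-\d{2}T\S+) "
        r"(?P<logsource>\S+) (?P<program>[\w./-]+)(?:\[(?P<pid>\d+)\])?: "
        r"(?P<message>.*)$"
    ),
    # BSD-style timestamp ("April 12 19:32:44"); PRI matched, not captured.
    re.compile(
        r"^(?:<\d+>)?(?P<timestamp>[A-Z][a-z]+ +\d{1,2} \d{2}:\d{2}:\d{2}) "
        r"(?P<logsource>\S+) (?P<program>[\w./-]+)(?:\[(?P<pid>\d+)\])?: "
        r"(?P<message>.*)$"
    ),
    # Fallback: keep the whole line as the message.
    re.compile(r"^(?P<message>.*)$"),
]


def filled(groups: dict) -> int:
    return sum(v is not None for v in groups.values())


def parse_line(line: str) -> dict:
    """Try every pattern; keep the match that fills the most fields."""
    best: Optional[dict] = None
    for pattern in PATTERNS:
        m = pattern.match(line)
        if m is None:
            continue
        groups = m.groupdict()  # unmatched optional groups are None
        if best is None or filled(groups) > filled(best):
            best = groups
    record = dict.fromkeys(FIELDS)  # every field defaults to None
    record.update(best or {})
    return record


SAMPLE = ("<15>April 12 19:32:44 powlowski2164 quas[5150]: The PNG pixel is "
          "down, synthesize the back-end bandwidth so we can transmit the "
          "HTTP firewall!")
record = parse_line(SAMPLE)
```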

### Phase 2: Creating and Applying a Schema

Schemas are auto-generated and applied to the data before it is submitted to a topic for consumption.

* The data is condensed (null fields are dropped and `pid` becomes an integer, as the prepared data below shows) and then identified. This ensures that common log types are encoded with the same schema.

#### Prepared Data

```json
{
    "timestamp": "April 12 19:32:44",
    "logsource": "powlowski2164",
    "program": "quas",
    "pid": 5150,
    "message": "The PNG pixel is down, synthesize the back-end bandwidth so we can transmit the HTTP firewall!"
}
```
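
A sketch of the condense-and-identify step that turns the parsed output into the prepared data above. Which fields count as numeric (`NUMERIC_FIELDS`), the `log_<hash>` naming scheme, and the nullable union layout are hypothetical choices for illustration. The unions list the non-null branch first because every value in the encoded output that follows is preceded by a `\x00` byte, which is what Avro writes for union branch 0; the actual schema is not shown in the write-up.

```python
import hashlib

NUMERIC_FIELDS = {"pid"}                  # assumption: fields coerced to int
AVRO_TYPE = {str: "string", int: "long"}  # Python type -> Avro primitive


def condense(parsed: dict) -> dict:
    """Drop null fields and coerce numeric ones, preserving field order."""
    return {
        name: int(value) if name in NUMERIC_FIELDS else value
        for name, value in parsed.items()
        if value is not None
    }


def identify_schema(record: dict) -> dict:
    """Derive an Avro record schema whose name depends only on the layout.

    Records with the same field names and types in the same order get the
    same name, so one schema is reused for every log line of that shape.
    """
    fields = [{"name": n, "type": [AVRO_TYPE[type(v)], "null"]}
              for n, v in record.items()]
    layout = ",".join(f"{f['name']}:{f['type'][0]}" for f in fields)
    digest = hashlib.sha256(layout.encode("utf-8")).hexdigest()[:12]
    return {"type": "record", "name": f"log_{digest}", "fields": fields}


parsed = {
    "timestamp": "April 12 19:32:44", "timestamp8601": None,
    "facility": None, "priority": None, "logsource": "powlowski2164",
    "program": "quas", "pid": "5150",
    "message": "The PNG pixel is down, synthesize the back-end bandwidth "
               "so we can transmit the HTTP firewall!",
}
prepared = condense(parsed)
schema = identify_schema(prepared)
```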

#### Encoded Output (Avro Binary)

```plain
b'\x00"April 12 19:32:44\x00\x1apowlowski2164\x00\x08quas\x00\xbcP\x00\xbc\x01The PNG pixel is down, synthesize the back-end bandwidth so we can transmit the HTTP firewall!'
```
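
To show where each byte above comes from, here is a hand-written Avro binary encoder for only the types involved (string, long, and two-branch unions with null), following the Avro specification. It is not the project's code; an Avro library would normally do this. The schema is a hypothetical reconstruction consistent with the bytes: field order from the prepared data, each field a `[type, "null"]` union.

```python
def write_long(n: int) -> bytes:
    """Avro int/long: zig-zag encode, then base-128 varint, low bits first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def write_value(avro_type: str, value) -> bytes:
    if avro_type == "null":
        return b""                           # null occupies zero bytes
    if avro_type in ("int", "long"):
        return write_long(value)
    if avro_type == "string":
        data = value.encode("utf-8")
        return write_long(len(data)) + data  # length prefix, then UTF-8
    raise ValueError(f"unsupported Avro type: {avro_type}")


PY_TO_AVRO = {type(None): "null", str: "string", int: "long"}


def encode_record(schema: dict, record: dict) -> bytes:
    """Concatenate field encodings in schema order; no names or tags."""
    out = bytearray()
    for field in schema["fields"]:
        value = record.get(field["name"])
        branch = PY_TO_AVRO[type(value)]
        index = field["type"].index(branch)  # union: branch index as a long
        out += write_long(index) + write_value(branch, value)
    return bytes(out)


# Hypothetical reconstruction of the schema behind the bytes above.
SCHEMA = {
    "type": "record",
    "name": "syslog_record",
    "fields": [
        {"name": name, "type": [avro_type, "null"]}
        for name, avro_type in [("timestamp", "string"), ("logsource", "string"),
                                ("program", "string"), ("pid", "long"),
                                ("message", "string")]
    ],
}

prepared = {
    "timestamp": "April 12 19:32:44",
    "logsource": "powlowski2164",
    "program": "quas",
    "pid": 5150,
    "message": "The PNG pixel is down, synthesize the back-end bandwidth "
               "so we can transmit the HTTP firewall!",
}
encoded = encode_record(SCHEMA, prepared)
```

Reading the bytes with this in mind: each `\x00` selects union branch 0, `"` (0x22) is the zig-zag length 17 of the timestamp, `\x1a` is 13, `\x08` is 4, `\xbcP` is the varint for `pid` 5150, and `\xbc\x01` is the message length 94.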


## Resulting System Design

![A "single-stream" pipeline, in which all consumers pull data through the same process.](./redefined-pipeline.png)
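
The preprocessing stage in the figure can be sketched as a loop that reads raw lines, parses and prepares them, and produces Avro bytes to one standardized topic. To keep it self-contained, `InMemoryTopic` is a hypothetical stand-in for a Kafka topic and the stage functions are injected; in a deployment they would be a Kafka consumer/producer plus the parsing and encoding steps from the methodology. Keying each output record by schema name is likewise an assumption, not something the write-up specifies.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable


@dataclass
class InMemoryTopic:
    """Hypothetical stand-in for a Kafka topic: an append-only record list."""
    records: list = field(default_factory=list)

    def produce(self, key: bytes, value: bytes) -> None:
        self.records.append((key, value))


def standardize(
    raw_lines: Iterable[str],
    parse: Callable[[str], dict],
    prepare: Callable[[dict], tuple[dict, dict]],  # -> (schema, record)
    encode: Callable[[dict, dict], bytes],
    out: InMemoryTopic,
) -> int:
    """Run each raw line through parse -> prepare -> encode, then produce it."""
    produced = 0
    for line in raw_lines:
        schema, record = prepare(parse(line))
        # Assumption: key by schema name so a consumer can pick the
        # matching decoder before touching the value bytes.
        out.produce(schema["name"].encode("utf-8"), encode(schema, record))
        produced += 1
    return produced


# Usage with trivial stub stages (the real ones would be grok + Avro).
topic = InMemoryTopic()
count = standardize(
    ["first line", "second line"],
    parse=lambda line: {"message": line},
    prepare=lambda rec: ({"name": "log_demo"}, rec),
    encode=lambda schema, rec: rec["message"].encode("utf-8"),
    out=topic,
)
```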


# Performance

* Processing kept up with only about 20% (or less) of the real-time log rate (bad)

| Step | Time |
| --- | --- |
| Matching grok pattern | 1.97 × 10⁻³ s (1970 μs) |
| Schema application | 3.57 × 10⁻⁵ s (35.7 μs) |
| Producer | 2.45 × 10⁻⁵ s (24.5 μs) |
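
A quick calculation on the table's numbers shows how strongly parsing dominates. Assuming the three stages run sequentially in a single worker and their times simply add (the write-up does not state how they were measured), one message costs about 2.03 ms, capping that worker at roughly 490 messages per second, with grok accounting for about 97% of the time.

```python
# Stage times from the table above, in seconds per message.
stages = {
    "grok matching": 1.97e-3,
    "schema application": 3.57e-5,
    "producer": 2.45e-5,
}

# Assumption: stages run one after another in one worker, so times add.
total = sum(stages.values())   # about 2.03e-3 s per message
max_rate = 1 / total           # about 490 messages/s for that worker
share = {name: t / total for name, t in stages.items()}
parse_vs_schema = stages["grok matching"] / stages["schema application"]
```

The ratio of about 55 is the "over 50 times" figure in the discussion. Halving the schema or producer time would barely move the total, while any parsing speedup translates almost directly into throughput.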
  

# Discussion

## Parsing is the problem

Message parsing is the major delay, taking over 50 times longer to complete than the next step in the process (schema application).

The Kafka stages themselves (producing, broker operation, and consuming) were never a significant factor in performance.

## Schemas

The schema process was very fast and introduced an acceptable delay into the chain. Avro is worth retaining as the standardization tool for this process.

## Improvements

1. Find a faster parser
2. Make field enumeration for schema generation more efficient
3. Reimplement in C or Java

# Conclusion & Future Work

The single-pipeline system design is likely sound, but latency must improve.

* More research is needed on efficient online (streaming) log parsing
* A generalized preparsing framework for Apache Kafka
* More robust, feature-rich Apache Avro schema auto-generation tools could be created