# Measuring the input rate from Kafka to Parquet Factory


## Description

We need to know the rate at which records arrive from Kafka and compare it with the rate at which the Internal Data Pipeline currently processes them. This comparison will tell us whether the pipeline can also handle reprocessing the large number of stored archives.
    

## About the task

The first part of this task consists of retrieving information from the Kafka partitions of both topics and calculating the input rate for each topic and partition.

## How to get the source data

The relevant data to retrieve from Kafka are the timestamp and the offset of the first and last messages in each partition.

These values matter because the difference between the two timestamps gives the elapsed time, and the difference between the two offsets gives the number of records sent to that partition during that time.
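
The calculation described above can be sketched as a small function. This is only an illustration: the function name `input_rate` and the sample values are hypothetical, and the timestamps are assumed to be in milliseconds since the epoch.

```python
def input_rate(first_offset: int, first_ts_ms: int,
               last_offset: int, last_ts_ms: int) -> float:
    """Return records per second between two messages of one partition."""
    num_records = last_offset - first_offset
    elapsed_s = (last_ts_ms - first_ts_ms) / 1000.0  # ms -> s
    if elapsed_s <= 0:
        raise ValueError("the last message must be newer than the first one")
    return num_records / elapsed_s


# Hypothetical values: offsets 36000 apart, timestamps one hour apart
rate = input_rate(1_000, 1_600_000_000_000, 37_000, 1_600_003_600_000)
print(f"{rate:.1f} records/s")
```

The guard against a non-positive elapsed time avoids a division by zero when a partition holds a single message.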

The following script retrieves the relevant data and writes it to a CSV file:

```bash
# Topics and partitions to inspect
TOPICS="ccx-prod-insights-operator-archive-rules-results ccx-prod-insights-operator-archive-features"
PARTITIONS="0 1"
OUTPUT="kafka_input.csv"
OPTIONS=""
BROKER=kafka:9092
# Left unquoted on use so that the shell splits it into command and arguments
KAFKACAT_CMD="kafkacat -b $BROKER $OPTIONS -C"

# CSV header
echo "msg,topic,partition,offset,timestamp" > "$OUTPUT"

for t in $TOPICS; do
    for p in $PARTITIONS; do
        # First message of the partition
        $KAFKACAT_CMD -t "$t" -p "$p" -o beginning -c 1 -f "beginning,%t,%p,%o,%T\n" >> "$OUTPUT"
        # Last message of the partition (-o -1 starts one message before the end)
        $KAFKACAT_CMD -t "$t" -p "$p" -o -1 -c 1 -f "end,%t,%p,%o,%T\n" >> "$OUTPUT"
    done
done
```

Using `kafka_input.csv` as the input file, we can calculate the number of messages per second received in each topic and partition.
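
Before computing the rates, it may be worth checking that the file contains both a `beginning` and an `end` row for every topic and partition, for instance in case one of the `kafkacat` calls failed. The following is a minimal sketch using only the standard library; the helper name `missing_endpoints` and the sample data are hypothetical.

```python
import csv
import io


def missing_endpoints(csv_text: str) -> list:
    """Return the (topic, partition, msg) triples absent from the CSV."""
    seen = set()
    for row in csv.DictReader(io.StringIO(csv_text)):
        seen.add((row["topic"], row["partition"], row["msg"]))
    pairs = {(topic, partition) for topic, partition, _ in seen}
    return sorted(
        (topic, partition, msg)
        for topic, partition in pairs
        for msg in ("beginning", "end")
        if (topic, partition, msg) not in seen
    )


# Hypothetical sample: partition 1 has no "end" row
sample = """msg,topic,partition,offset,timestamp
beginning,demo-topic,0,0,1600000000000
end,demo-topic,0,500,1600000100000
beginning,demo-topic,1,0,1600000000000
"""
print(missing_endpoints(sample))  # [('demo-topic', '1', 'end')]
```

For the real file, the text can be read with `open("kafka_input.csv").read()` and passed to the same function; an empty list means every partition has both rows.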

In [None]:
import pandas as pd

In [None]:
rows = pd.read_csv("kafka_input.csv")
rows

In [None]:
for topic in sorted(set(rows["topic"])):
    accumulated_rate_topic = 0.0

    for partition in sorted(set(rows.loc[rows["topic"] == topic]["partition"])):
        df = rows.loc[(rows["topic"] == topic) & (rows["partition"] == partition)]
        beginning = df.loc[df["msg"] == "beginning"]
        end = df.loc[df["msg"] == "end"]
        # Take the single matching row explicitly; int() on a Series is deprecated
        initial_offset = int(beginning["offset"].iloc[0])
        initial_ts = int(beginning["timestamp"].iloc[0]) / 1000.0  # ms -> s
        end_offset = int(end["offset"].iloc[0])
        end_ts = int(end["timestamp"].iloc[0]) / 1000.0  # ms -> s
        num_records = end_offset - initial_offset
        time_between = end_ts - initial_ts
        partition_rate = num_records / time_between
        accumulated_rate_topic += partition_rate
        print(f"\t{topic} - {partition}: {partition_rate} records/s")

    print(f"\nAccumulated rate for {topic}: {accumulated_rate_topic} records/s\n")

# Capacity of Parquet Factory to consume records

## How to get the source data

`parquet-factory` runs in an OpenShift cluster, where the pods have CPU and memory limits. For this reason, to get realistic data, I took measurements from the most recently run `parquet-factory` instances.

```bash
#!/bin/bash

# Query the cluster first so that a failure of `oc` itself is detected
if ! ALL_PODS=$(oc get pods); then
    echo "Error retrieving the parquet-factory pods from the cluster. Did you run 'oc login'?"
    exit 1
fi
PODNAMES=$(echo "$ALL_PODS" | awk '$1 ~ /parquet-factory/ { print $1 }')
echo "Parquet Factory pods list retrieved"

OUTPUT="pods_timing.csv"

# Print header of the CSV
echo "pod_name;exit_status;start_time;end_time" > "${OUTPUT}"

for pod in $PODNAMES; do
    # Collect the exit code and the Started/Finished times of the pod
    fields=$(oc describe pod "$pod" | awk -v ORS=";" '/Started:|Finished:/ { $1=""; print } /Exit[[:space:]]Code:/ { print $NF }')
    # Drop the trailing ";" so that every row has as many fields as the header
    echo "${pod};${fields%;}" >> "${OUTPUT}"
done

exit 0
```

In [None]:
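# NOTE: "num_messages" is not written by the script above; this cell assumes
# that column was added to pods_timing.csv separately.
timing_rows = pd.read_csv("pods_timing.csv", sep=";", parse_dates=["start_time", "end_time"])
# total_seconds() keeps whole days, unlike .dt.seconds
timing_rows["duration"] = (timing_rows["end_time"] - timing_rows["start_time"]).dt.total_seconds()
timing_rows["processing_time_per_record"] = timing_rows["duration"] / timing_rows["num_messages"]
timing_rows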
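
To make the duration and per-record calculation concrete, here is a minimal standard-library sketch for a single run. The timestamp layout in `TIME_FORMAT`, the helper name `run_duration_s`, and all sample values are assumptions for illustration; the layout should be checked against the actual `oc describe` output.

```python
from datetime import datetime

# Assumed layout of the Started/Finished values; adjust it if your
# `oc describe` output uses a different one.
TIME_FORMAT = "%a, %d %b %Y %H:%M:%S %z"


def run_duration_s(started: str, finished: str) -> float:
    """Return the seconds elapsed between two timestamp strings."""
    start = datetime.strptime(started.strip(), TIME_FORMAT)
    end = datetime.strptime(finished.strip(), TIME_FORMAT)
    # total_seconds() includes whole days; the .seconds attribute would not
    return (end - start).total_seconds()


# Hypothetical run: 1 day and 5 minutes long, 8670 records processed
duration = run_duration_s("Mon, 01 Mar 2021 10:00:00 +0000",
                          "Tue, 02 Mar 2021 10:05:00 +0000")
num_messages = 8670
print(duration, duration / num_messages)
```

Note that the `seconds` attribute of a timedelta only holds the part of the interval below one day, so it would report 300 for this hypothetical run instead of 86700.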

In [None]:
# Keep only the runs that lasted more than 30 seconds
relevant = timing_rows["duration"] > 30.0
timing_rows[relevant]

In [None]:
avg = timing_rows[relevant]["duration"].mean()
print(f"Average duration: {avg:.2f} s")

In [None]:
avg_processing = timing_rows[relevant]["processing_time_per_record"].mean()
print(f"Average processing time per record: {avg_processing:.2f} s")
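
The two measurements can then be compared to decide whether reprocessing the stored archives is feasible. The sketch below is one possible way to do it, not a definitive model: the function name `capacity_report`, the backlog size, and all numbers are hypothetical, and it assumes a single `parquet-factory` instance processing records continuously at the measured average speed.

```python
def capacity_report(input_rate: float, seconds_per_record: float,
                    backlog_records: int) -> dict:
    """Compare the Kafka input rate with the measured processing speed."""
    processing_rate = 1.0 / seconds_per_record   # records/s the pipeline handles
    spare_rate = processing_rate - input_rate    # records/s left for the backlog
    if spare_rate <= 0:
        # The pipeline cannot even keep up with the live input
        backlog_seconds = float("inf")
    else:
        backlog_seconds = backlog_records / spare_rate
    return {
        "processing_rate": processing_rate,
        "spare_rate": spare_rate,
        "backlog_seconds": backlog_seconds,
    }


# Hypothetical numbers: 3 records/s arriving, 0.25 s per record, 3600 stored archives
report = capacity_report(input_rate=3.0, seconds_per_record=0.25, backlog_records=3600)
print(report)
```

A positive `spare_rate` means there is headroom for reprocessing on top of the live input, and `backlog_seconds` estimates how long the stored archives would take at that headroom.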