In [1]:
import os
import json

from confluent_kafka import Consumer
import pandas as pd

In [2]:
# Name of the Kafka topic this notebook subscribes to.
topic = "module3"

# Client settings handed to the confluent_kafka Consumer, filled in key by key.
config = {}
config["bootstrap.servers"] = "localhost:9092"  # broker address
config["group.id"] = "module3_consumer"         # consumer group identifier
# Offset-reset policy; per the Kafka client configuration docs, "earliest"
# starts from the oldest available message when the group has no stored offset.
config["auto.offset.reset"] = "earliest"

# Create the consumer and register interest in the single topic above.
consumer = Consumer(config)
consumer.subscribe([topic])

In [3]:
# Output schema: the exchange name followed by one mean volume per 4-hour bin.
columns = ["exchange", "volume_1", "volume_2", "volume_3", "volume_4", "volume_5", "volume_6"]
# Rows buffered in memory until the next flush to exchange.csv.
df_result = pd.DataFrame(columns=columns)

def process_message(message):
    """Fold one Kafka message into the module-level ``df_result`` buffer.

    The message value is decoded as UTF-8 JSON. A payload with a truthy
    ``"exit"`` key is treated as a stop signal and nothing is buffered.
    Any other payload must carry ``"exchange"`` and ``"volume_day"``; the
    latter is read as ``[timestamp_ms, volume]`` pairs, resampled into
    4-hour means, and stored as one row (exchange + one value per
    ``volume_*`` column).

    When the buffer reaches 5 rows it is appended to ``exchange.csv``
    (the header is written only if the file does not exist yet) and the
    buffer is reset to an empty frame.

    Parameters
    ----------
    message : object exposing ``value()`` that returns the payload bytes.

    Returns
    -------
    bool
        ``True`` if the payload asked the consumer to exit, else ``False``.

    Raises
    ------
    KeyError
        If a non-exit payload lacks ``"exchange"`` or ``"volume_day"``.
    """
    global df_result

    data = json.loads(message.value().decode("utf-8"))

    if data.get("exit", False):
        return True

    df_data = pd.DataFrame(data["volume_day"], columns=["timestap", "volume_day"])
    df_data['timestap'] = pd.to_datetime(df_data['timestap'], unit='ms')
    df_data['volume_day'] = df_data['volume_day'].astype(float)
    df_data = df_data.set_index('timestap')

    df_aggregated = df_data.resample('4h').mean().reset_index()

    # One value per volume_* column. Extra bins are dropped; missing bins are
    # padded with NaN so a payload covering less than a full day still yields
    # a well-formed row instead of failing on a fixed-size reshape.
    n_bins = len(columns) - 1
    values = df_aggregated["volume_day"].to_numpy()[:n_bins].tolist()
    values += [float("nan")] * (n_bins - len(values))

    new_row = pd.DataFrame([[data["exchange"]] + values], columns=columns)

    # Avoid concatenating onto an empty frame; just adopt the first row.
    if df_result.empty:
        df_result = new_row
    else:
        df_result = pd.concat([df_result, new_row], ignore_index=True)

    if df_result.shape[0] == 5:
        # Append mode creates the file when it is missing, so a single call
        # covers both cases; only the header flag depends on prior existence.
        df_result.to_csv(
            "exchange.csv", mode='a',
            index=False, header=not os.path.exists("exchange.csv"),
            columns=columns
        )
        df_result = pd.DataFrame(columns=df_result.columns)
    return False

In [4]:
# Poll until a message asks us to exit. The cleanup in `finally` runs on a
# normal exit, on a kernel interrupt, and when message handling raises, so
# buffered rows are persisted and the consumer is always closed.
try:
    while True:
        message = consumer.poll(1.0)
        # poll() returned nothing within the 1.0 timeout; try again.
        if message is None:
            continue

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        if process_message(message):
            print("Exiting consumer")
            break
except KeyboardInterrupt:
    # Interrupting the kernel is the usual way to stop an endless poll loop.
    print("Consumer interrupted")
finally:
    try:
        # Persist rows that never reached the in-function flush threshold.
        if not df_result.empty:
            df_result.to_csv(
                "exchange.csv", mode='a',
                index=False,
                # Write the header when this flush is what creates the file.
                header=not os.path.exists("exchange.csv"),
                columns=columns
            )
            # Clear the buffer so re-running this cell cannot append the
            # same rows a second time.
            df_result = pd.DataFrame(columns=columns)
    finally:
        consumer.close()

Exiting consumer
