Imports

In [1]:
import pandas as pd
import numpy as np
import os
import json
from confluent_kafka import Consumer, KafkaError

Functions

In [2]:
columns = ['exchange', 'volume_1', 'volume_2', 'volume_3', 'volume_4', 'volume_5', 'volume_6']
trade_volume_df = pd.DataFrame(columns=columns)

def process_and_save_volumes(data):
    global trade_volume_df
    
    exchange_name = data['name']
    volumes = data['volume']
    
    valid_volumes = []
    for item in volumes:
        try:
            volume_value = float(item[1])
            valid_volumes.append(volume_value)
        except ValueError:
            continue
    
    num_chunks = 6
    if len(valid_volumes) < num_chunks:
        return
    
    chunk_size = len(valid_volumes) // num_chunks
    volumes_chunks = [valid_volumes[i:i+chunk_size] for i in range(0, len(valid_volumes), chunk_size)]
    volumes_avg = [np.mean(chunk) for chunk in volumes_chunks]
    new_row = {'exchange': exchange_name, **{f'volume_{i+1}': v for i, v in enumerate(volumes_avg)}}
    trade_volume_df = pd.concat([trade_volume_df, pd.DataFrame([new_row])], ignore_index=True)
    
    if len(trade_volume_df) >= 5:
        save_to_csv(trade_volume_df)
        trade_volume_df = pd.DataFrame(columns=columns)

def save_to_csv(dataframe):
    filename = 'output.csv'      
    dataframe.set_index('exchange', inplace=True)
    
    if os.path.exists(filename):
        existing_data = pd.read_csv(filename, index_col=0)
        combined_data = pd.concat([existing_data, dataframe], axis=0)
        
        combined_data.sort_index(inplace=True)        
        combined_data.to_csv(filename)
    else:
        dataframe.sort_index(inplace=True)        
        dataframe.to_csv(filename, index=True)


Consumer

In [3]:
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'exchange_volume_group',
    'auto.offset.reset': 'earliest'
}

c = Consumer(consumer_conf)
c.subscribe(['exchange_volume_topic'])

try:
    while True:
        msg = c.poll(2.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f'Consumer error: {msg.error()}')
                break
        data = json.loads(msg.value().decode('utf-8'))
        process_and_save_volumes(data)
except KeyboardInterrupt:
    pass
finally:
    c.close()


  trade_volume_df = pd.concat([trade_volume_df, pd.DataFrame([new_row])], ignore_index=True)
  trade_volume_df = pd.concat([trade_volume_df, pd.DataFrame([new_row])], ignore_index=True)
%6|1712746081.312|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 1019501ms in state UP)
%6|1712746081.312|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Disconnected (after 1019500ms in state UP)
%3|1712746081.312|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1712746081.312|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1712746081.526|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms