# consumer

In [1]:
import json
import socket

import numpy as np
import pandas as pd
from confluent_kafka import Consumer, KafkaError, KafkaException

In [2]:
import os, os.path

# Directory that the rest of this notebook lists and writes CSV files into.
DIR = './data'

# Show how many entries the directory currently contains.
print(len(os.listdir(DIR)))


1


In [None]:

# Directory whose entry count is used to number the CSV output files.
DIR = './data/'

# Kafka client settings: local broker, fixed consumer group, start from the
# earliest offset when the group has no committed position, and identify this
# client by the machine's host name.
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest',
    'client.id': socket.gethostname(),
}

# When this cell is re-run, close the consumer left over from the previous run
# before creating a new one. This is best-effort cleanup only.
try:
    consumer.close()
except NameError:
    # First run of the cell: no previous consumer exists yet.
    pass
except Exception as exc:
    # Do not let a failed cleanup block the new consumer, but do not hide it
    # either. KeyboardInterrupt/SystemExit are deliberately not caught here.
    print(f"Ignoring error while closing previous consumer: {exc}")

consumer = Consumer(config)

topic = 'coins_stock'
consumer.subscribe([topic])

def _average_in_chunks(values, chunk_size):
    """Return the means of consecutive ``chunk_size``-sized groups of *values*.

    Values that cannot be converted to ``float`` are reported and skipped, so
    they do not count towards a group. A shorter trailing group is averaged
    as well.
    """
    numeric = []
    for value in values:
        try:
            numeric.append(float(value))
        except (TypeError, ValueError):
            # TypeError covers None / containers, ValueError covers bad text.
            print(f"Non-numeric value encountered: {value}")

    return [
        np.mean(numeric[start:start + chunk_size])
        for start in range(0, len(numeric), chunk_size)
    ]


def message_splitter(msgs: list, count_files: int, chunk_size: int = 6) -> pd.DataFrame:
    """Average each message's values in chunks and store the result as CSV.

    Each element of *msgs* is read as a mapping with a ``'name'`` key and a
    ``'values'`` key. The values are averaged in groups of *chunk_size*; one
    row per message is written to ``output{count_files}.csv`` inside ``DIR``,
    with columns ``value1``, ``value2``, ... Rows shorter than the longest
    row are padded with NaN. If the target file already exists, the new rows
    are appended to its current content.

    Args:
        msgs: Messages to process.
        count_files: Number used in the output file name.
        chunk_size: How many numeric values are averaged into one output
            value. Must be at least 1.

    Returns:
        The DataFrame that was written to the file (including previously
        stored rows when the file already existed). An empty DataFrame is
        returned, and nothing is written, when *msgs* is empty.

    Raises:
        ValueError: If *chunk_size* is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    if not msgs:
        # Nothing to store; max() below would fail on an empty sequence.
        return pd.DataFrame()

    names = [msg['name'] for msg in msgs]
    averaged_rows = [_average_in_chunks(msg['values'], chunk_size) for msg in msgs]

    # Pad every row to the same width so the rows form a rectangular table.
    max_len = max(len(row) for row in averaged_rows)
    padded_rows = [row + [np.nan] * (max_len - len(row)) for row in averaged_rows]

    new_df = pd.DataFrame(padded_rows, index=names)
    new_df.columns = [f'value{i + 1}' for i in range(max_len)]

    file_path = os.path.join(DIR, f'output{count_files}.csv')

    if os.path.exists(file_path):
        existing_df = pd.read_csv(file_path, index_col='name')
        result_df = pd.concat([existing_df, new_df])
    else:
        result_df = new_df

    result_df.to_csv(file_path, index_label='name')
    return result_df

# Number of messages collected before they are written out as one batch.
BATCH_SIZE = 5

counter = 0   # messages collected in the current batch
buffer = []   # decoded message dicts of the current batch
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Poll timed out without a message; keep waiting.
            continue

        error = msg.error()
        if error is not None:
            if error.code() == KafkaError._PARTITION_EOF:
                # Not a failure: the consumer simply caught up with the partition.
                print(f"End of partition reached {msg.partition()}")
                continue
            raise KafkaException(error)

        message_value = msg.value().decode('utf-8')
        buffer.append(json.loads(message_value))
        counter += 1
        print(counter)

        # Flush as soon as the batch is full, so the message that completes
        # the batch is stored together with it instead of being discarded.
        if counter >= BATCH_SIZE:
            count_files = len(os.listdir(DIR))
            message_splitter(buffer, count_files)
            counter = 0
            buffer = []
except KeyboardInterrupt:
    # Manual stop: store the partially filled batch rather than losing it.
    if buffer:
        message_splitter(buffer, len(os.listdir(DIR)))
finally:
    consumer.close()