In [1]:
from confluent_kafka import Consumer
import pandas as pd
import json
import os
from minio import Minio
from minio.error import S3Error
import io

# MinIO configuration.
# Values are read from environment variables so real credentials never have to be
# committed with the notebook. The fallbacks reproduce the original local-dev
# settings; override them (e.g. `%env MINIO_SECRET_KEY=...`) for any shared setup.
MINIO_ENDPOINT = os.environ.get('MINIO_ENDPOINT', 'myminio:9000')
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', 'minio')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', 'minio123')
MINIO_BUCKET_NAME = os.environ.get('MINIO_BUCKET_NAME', 'kafka')
# HTTPS is off by default, matching the original `secure=False`.
MINIO_SECURE = os.environ.get('MINIO_SECURE', 'false').strip().lower() in ('1', 'true', 'yes')

# Initialize MinIO client
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)

def consume_data_from_kafka(topic, bootstrap_servers='kafka:9092', group_id='my-group', poll_timeout=1.0):
    """Consume JSON messages from a Kafka topic and append each one to MinIO.

    Loops forever until interrupted (KeyboardInterrupt, e.g. the notebook's
    stop button), then closes the consumer. Each message value is decoded as
    UTF-8 JSON and handed to ``save_to_minio`` with the topic name as the
    object-name prefix. Malformed or null-valued messages are reported and
    skipped instead of stopping the loop.

    Args:
        topic: Kafka topic to subscribe to.
        bootstrap_servers: Kafka broker address list.
        group_id: consumer group id used for offset tracking.
        poll_timeout: seconds each ``poll`` call waits for a message.
    """
    conf = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for this group, start from the oldest message.
        'auto.offset.reset': 'earliest',
    }
    consumer = Consumer(conf)
    consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(timeout=poll_timeout)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            raw_value = msg.value()
            if raw_value is None:
                # Null-valued record: nothing to decode, and calling .decode would crash the loop.
                print("Skipping message with empty value")
                continue

            try:
                message_value = raw_value.decode('utf-8')
            except UnicodeDecodeError as e:
                print(f"Failed to decode message as UTF-8: {e}")
                continue
            print(f"Received message: {message_value}")

            # Assuming messages are JSON objects
            try:
                json_data = json.loads(message_value)
            except json.JSONDecodeError as e:
                print(f"Failed to decode JSON: {e}")
                continue

            # Kept outside the JSON try so errors raised while saving are not
            # misreported as JSON decode failures.
            save_to_minio(topic, json_data)

    except KeyboardInterrupt:
        print("Consuming interrupted.")
    finally:
        consumer.close()

def save_to_minio(stock_name, data):
    """Append one record as a new row of ``<stock_name>.csv`` in the MinIO bucket.

    Downloads the existing CSV object (treating a missing or empty object as
    no prior data), appends ``data`` as one row, writes the result to a local
    file under /tmp and uploads it, overwriting the object. ``S3Error``s are
    printed rather than raised.

    NOTE(review): the whole object is re-downloaded and re-uploaded for every
    record, so the cost grows with the file size. This is also not safe if two
    writers update the same object concurrently.

    Args:
        stock_name: object-name prefix (the Kafka topic name in this notebook).
        data: a mapping of column -> value for the new row (presumably a
            parsed JSON object; verify against the producer).
    """
    file_name = f'{stock_name}.csv'

    try:
        # Try to download the existing CSV file from MinIO
        try:
            response = minio_client.get_object(MINIO_BUCKET_NAME, file_name)
        except S3Error as e:
            if e.code != 'NoSuchKey':
                raise
            # File does not exist yet: start from an empty DataFrame.
            df_existing = pd.DataFrame()
        else:
            try:
                df_existing = pd.read_csv(io.BytesIO(response.read()))
            except pd.errors.EmptyDataError:
                # Object exists but has no content; treat it like a missing file.
                df_existing = pd.DataFrame()
            finally:
                # The SDK requires both calls to return the pooled HTTP connection;
                # without them every message leaks a connection.
                response.close()
                response.release_conn()

        df_new = pd.DataFrame([data])
        df_combined = pd.concat([df_existing, df_new], ignore_index=True)

        # fput_object uploads from a path, so stage the CSV on local disk first.
        temp_file_path = f'/tmp/{file_name}'
        df_combined.to_csv(temp_file_path, index=False)

        # Upload the updated CSV file to MinIO (overwrites the previous object).
        minio_client.fput_object(
            MINIO_BUCKET_NAME,
            file_name,
            temp_file_path
        )
        print(f"Uploaded {temp_file_path} to MinIO bucket {MINIO_BUCKET_NAME}")

    except S3Error as e:
        print(f"Error handling file in MinIO: {e}")

In [None]:
# Stream messages from the `<ticker>_stock` topic into MinIO.
# This call blocks until the kernel is interrupted.
ticker = 'AAPL'
stock_topic = f'{ticker}_stock'
consume_data_from_kafka(stock_topic)

Received message: {"Date": "2023-01-03T00:00:00-05:00", "Open": 129.0660783975807, "High": 129.68029653659528, "Low": 123.01300942940523, "Close": 123.90462493896484, "Volume": 112117500, "Dividends": 0.0, "Stock Splits": 0.0}
Uploaded /tmp/AAPL_stock.csv to MinIO bucket kafka
Received message: {"Date": "2023-01-04T00:00:00-05:00", "Open": 125.70766994061988, "High": 127.46118175357807, "Low": 123.91453748902029, "Close": 125.18260955810547, "Volume": 89113600, "Dividends": 0.0, "Stock Splits": 0.0}
Uploaded /tmp/AAPL_stock.csv to MinIO bucket kafka
Received message: {"Date": "2023-01-05T00:00:00-05:00", "Open": 125.94543511820807, "High": 126.57947117064332, "Low": 123.5975229597015, "Close": 123.85509490966797, "Volume": 80962700, "Dividends": 0.0, "Stock Splits": 0.0}
Uploaded /tmp/AAPL_stock.csv to MinIO bucket kafka
Received message: {"Date": "2023-01-06T00:00:00-05:00", "Open": 124.83586055803123, "High": 129.0759714168596, "Low": 123.7262938226401, "Close": 128.41221618652344, "