In [None]:
from confluent_kafka import Consumer
import json

# Connection settings for the local broker and this notebook's consumer group.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'stock-consumer-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['stock_topic'])

try:
    print("Starting consumer...")
    while True:
        msg = consumer.poll(1.0)  # timeout in seconds

        if msg is None:
            # Nothing arrived within the poll timeout; poll again.
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        raw = msg.value()
        if raw is None:
            # A message may carry no payload at all; there is nothing to decode,
            # and calling .decode on None would crash the loop.
            print("Received message with empty payload; skipping.")
            continue

        # Decode the JSON payload. Invalid UTF-8 is handled the same way as
        # invalid JSON so a single bad message cannot stop the consumer.
        try:
            payload = json.loads(raw.decode('utf-8'))
            print("Received:", payload)
        except (UnicodeDecodeError, json.JSONDecodeError):
            print("Failed to decode JSON:", raw)

except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop this cell.
    print("Consumer interrupted.")

finally:
    # Always leave the consumer group cleanly, however the loop ended.
    consumer.close()


In [None]:
from confluent_kafka import Consumer
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
import json
from datetime import datetime

import os

# MinIO (S3-compatible) connection.
# Credentials and endpoint can be overridden through environment variables so
# real secrets never have to be typed into the notebook; the defaults keep the
# local development values. The default endpoint uses the IPv4 loopback
# address 127.0.0.1 rather than the name "localhost".
fs = s3fs.S3FileSystem(
    key=os.environ.get('MINIO_ACCESS_KEY', 'admin'),
    secret=os.environ.get('MINIO_SECRET_KEY', 'password'),
    client_kwargs={
        'endpoint_url': os.environ.get('MINIO_ENDPOINT_URL', 'http://127.0.0.1:9000')
    }
)

# Confluent Kafka consumer config (bootstrap connection goes to the IPv4 loopback).
# NOTE(review): the captured cell output shows client threads named
# "localhost:9092", which suggests the broker advertises "localhost"; if so,
# only the bootstrap connection is pinned to 127.0.0.1 - confirm the broker's
# advertised listeners.
conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '127.0.0.1:9092'),
    'group.id': 'stock-consumer-new-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['stockdaily-events'])  # Make sure topic exists and is populated

print("Listening to Confluent Kafka topic...")

# Buffer for partitioned writes: maps trade_date -> list of decoded messages
# that have not been written to MinIO yet.
message_buffer = {}

def write_partition_to_minio(trade_date, records):
    """Write one batch of records as a Parquet file in its trade_date partition.

    Every call creates a new, uniquely named file under
    ``lakehouse/stock_stream/trade_date=<trade_date>/``. A fixed file name
    would make each flush for the same date replace the previous batch, so
    only the most recent batch would survive.

    Args:
        trade_date: Value used for the ``trade_date=...`` partition directory.
        records: List of row dicts to persist; an empty (or None) value is a
            no-op.

    The upload goes through the module-level ``fs`` filesystem object.
    """
    # Imported locally so the function does not depend on cell-level imports.
    import uuid
    from datetime import datetime, timezone

    if not records:
        return
    df = pd.DataFrame(records)
    table = pa.Table.from_pandas(df)

    # UTC timestamp keeps files roughly ordered by write time; the random
    # suffix prevents collisions between flushes in the same microsecond or
    # from different consumers.
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S%fZ')
    file_path = (
        f"lakehouse/stock_stream/trade_date={trade_date}/"
        f"stocks-{stamp}-{uuid.uuid4().hex[:8]}.parquet"
    )
    with fs.open(file_path, 'wb') as f:
        pq.write_table(table, f)

    print(f"Wrote {len(df)} records to MinIO: {file_path}")

# Number of buffered records per trade_date that triggers a write to MinIO.
FLUSH_THRESHOLD = 20

# NOTE(review): no manual offset commit happens here, so offsets are presumably
# committed automatically while records may still sit in message_buffer -
# confirm whether committing only after a successful write is required.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Poll timed out without a message; try again.
            continue
        if msg.error():
            print("Consumer error:", msg.error())
            continue

        try:
            value = json.loads(msg.value().decode('utf-8'))
            print("Received message:", value)

            trade_date = value.get('trade_date')
            if not trade_date:
                print("Missing trade_date. Skipping message.")
                continue

            message_buffer.setdefault(trade_date, []).append(value)

            if len(message_buffer[trade_date]) >= FLUSH_THRESHOLD:
                write_partition_to_minio(trade_date, message_buffer[trade_date])
                # Only reached when the write succeeded; on failure the batch
                # stays buffered and is retried on the next message.
                message_buffer[trade_date] = []

        except Exception as e:
            # Keep consuming even if one message is malformed or a write fails.
            print("Error processing message:", e)

except KeyboardInterrupt:
    print("Stopping consumer and flushing remaining data...")

finally:
    # Flush on every exit path, not just on KeyboardInterrupt, so buffered
    # records are not silently dropped when the loop dies from another error.
    try:
        for trade_date, records in message_buffer.items():
            write_partition_to_minio(trade_date, records)
    finally:
        # Close the consumer even if the final flush itself fails.
        consumer.close()


Listening to Confluent Kafka topic...
Received message: {'trade_date': '2025-05-16', 'ticker': 'AAPL', 'open_price': 212.36, 'close_price': 211.26, 'pct_change': -0.52}
Received message: {'trade_date': '2025-05-16', 'ticker': 'MSFT', 'open_price': 452.05, 'close_price': 454.27, 'pct_change': 0.49}
Received message: {'trade_date': '2025-05-16', 'ticker': 'GOOGL', 'open_price': 167.73, 'close_price': 166.19, 'pct_change': -0.92}
Received message: {'trade_date': '2025-05-16', 'ticker': 'AMZN', 'open_price': 206.85, 'close_price': 205.59, 'pct_change': -0.61}
Received message: {'trade_date': '2025-05-16', 'ticker': 'NVDA', 'open_price': 136.22, 'close_price': 135.4, 'pct_change': -0.6}
Received message: {'trade_date': '2025-05-16', 'ticker': 'TSLA', 'open_price': 346.24, 'close_price': 349.98, 'pct_change': 1.08}
Received message: {'trade_date': '2025-05-16', 'ticker': 'META', 'open_price': 637.96, 'close_price': 640.34, 'pct_change': 0.37}
Received message: {'trade_date': '2025-05-16', 't

%4|1747687297.539|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state steady) after 606297 ms without a successful response from the group coordinator (broker 1, last error was Success): revoking assignment and rejoining group
%5|1747687297.540|REQTMOUT|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Timed out FetchRequest in flight (after 603708ms, timeout #0)
%4|1747687297.540|REQTMOUT|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1747687297.540|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: 1 request(s) timed out: disconnect (average rtt 502.602ms) (after 17065788ms in state UP)
%5|1747687930.123|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out JoinGroupRequest in flight (after 632400ms, timeout #0)
%4|1747687930.123|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoo

Received message: {'trade_date': '2025-05-16', 'ticker': 'AAPL', 'open_price': 212.36, 'close_price': 211.26, 'pct_change': -0.52}
Wrote 21 records to MinIO: lakehouse/stock_stream/trade_date=2025-05-16/stocks.parquet
Received message: {'trade_date': '2025-05-16', 'ticker': 'MSFT', 'open_price': 452.05, 'close_price': 454.27, 'pct_change': 0.49}
Received message: {'trade_date': '2025-05-16', 'ticker': 'GOOGL', 'open_price': 167.73, 'close_price': 166.19, 'pct_change': -0.92}
Received message: {'trade_date': '2025-05-16', 'ticker': 'AMZN', 'open_price': 206.85, 'close_price': 205.59, 'pct_change': -0.61}
Received message: {'trade_date': '2025-05-16', 'ticker': 'NVDA', 'open_price': 136.22, 'close_price': 135.4, 'pct_change': -0.6}
Received message: {'trade_date': '2025-05-16', 'ticker': 'TSLA', 'open_price': 346.24, 'close_price': 349.98, 'pct_change': 1.08}
Received message: {'trade_date': '2025-05-16', 'ticker': 'META', 'open_price': 637.96, 'close_price': 640.34, 'pct_change': 0.37}


In [None]:
import s3fs
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Smoke test: upload a one-row Parquet file to the S3-compatible endpoint.
fs = s3fs.S3FileSystem(
    key='admin',
    secret='password',
    client_kwargs=dict(endpoint_url='http://localhost:9000'),
)

# One sample row, expressed as a record and converted to an Arrow table.
df = pd.DataFrame([{"symbol": "AAPL", "price": 185.32}])
table = pa.Table.from_pandas(df)

# Stream the table straight into the remote object.
with fs.open('lakehouse/test/aapl.parquet', 'wb') as f:
    pq.write_table(table, f)

print("Test file written.")


In [None]:
from confluent_kafka import Consumer
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
import json

import os

# S3-compatible object store connection. Credentials and endpoint can be
# supplied through environment variables so secrets do not have to live in the
# notebook; the defaults keep the local development values.
fs = s3fs.S3FileSystem(
    key=os.environ.get('MINIO_ACCESS_KEY', 'admin'),
    secret=os.environ.get('MINIO_SECRET_KEY', 'password'),
    client_kwargs={
        'endpoint_url': os.environ.get('MINIO_ENDPOINT_URL', 'http://localhost:9000')
    }
)

# Kafka consumer settings; the broker address is overridable the same way.
conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'group.id': 'stock-consumer-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['stock_topic'])

print("Listening to Confluent Kafka topic...")

# Maps trade_date -> list of decoded messages waiting to be written out.
message_buffer = {}

def write_partition_to_minio(trade_date, records):
    """Persist a batch of records as a new Parquet file for one trade date.

    The file is created under
    ``lakehouse/stock_stream/trade_date=<trade_date>/`` with a unique name, so
    repeated writes for the same date (for example from successive runs of the
    consumer) add files to the partition instead of overwriting the one
    written before.

    Args:
        trade_date: Value used for the ``trade_date=...`` partition directory.
        records: List of row dicts to write; nothing happens when it is empty
            or None.

    Writes through the module-level ``fs`` filesystem object.
    """
    # Imported locally so the function does not depend on cell-level imports.
    import uuid
    from datetime import datetime, timezone

    if not records:
        return
    df = pd.DataFrame(records)
    table = pa.Table.from_pandas(df)

    # Timestamp plus a short random suffix gives each write its own object key.
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S%fZ')
    file_path = (
        f"lakehouse/stock_stream/trade_date={trade_date}/"
        f"stocks-{stamp}-{uuid.uuid4().hex[:8]}.parquet"
    )
    with fs.open(file_path, 'wb') as f:
        pq.write_table(table, f)

    print(f"Wrote {len(df)} records to MinIO: {file_path}")

# Consume until interrupted, grouping messages by trade_date in message_buffer.
# Nothing is written while the loop runs; the buffer is flushed once on exit.
# NOTE(review): the buffer therefore grows for as long as the cell runs -
# consider a periodic flush if the topic is large.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Poll timed out without a message; try again.
            continue
        if msg.error():
            print("Consumer error:", msg.error())
            continue

        try:
            value = json.loads(msg.value().decode('utf-8'))
            print("Received message:", value)
            trade_date = value.get('trade_date')
        except (AttributeError, ValueError) as e:
            # AttributeError: empty payload (None) or JSON that is not an object.
            # ValueError: invalid UTF-8 or invalid JSON.
            # Skip the message instead of killing the loop and losing the buffer.
            print("Error processing message:", e)
            continue

        if not trade_date:
            print("Missing trade_date. Skipping message.")
            continue

        message_buffer.setdefault(trade_date, []).append(value)

except KeyboardInterrupt:
    print("Stopping consumer and flushing remaining data...")

finally:
    # Flush on every exit path so an unexpected error does not discard
    # everything collected so far.
    try:
        for trade_date, records in message_buffer.items():
            write_partition_to_minio(trade_date, records)
    finally:
        # Close the consumer even if the flush fails.
        consumer.close()
