In [1]:
from kafka import KafkaConsumer
import json
import csv
import os

# Kafka connection settings: tune these here rather than inside the constructor call.
KAFKA_TOPIC = 'skippy'                       # The Kafka topic you're consuming from
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'   # Address of your Kafka broker
KAFKA_GROUP_ID = 'SmsGroup'                  # Consumer group ID
# Without a timeout, iterating the consumer blocks forever once the topic goes quiet,
# which hangs the notebook kernel when fewer messages arrive than expected.
# With a timeout, iteration simply ends after this many ms without a new message.
CONSUMER_TIMEOUT_MS = 30_000


def deserialize_json_value(raw):
    """Decode a Kafka message value from UTF-8 JSON.

    Args:
        raw: The raw message value (bytes), or None for a tombstone record.

    Returns:
        The decoded JSON value, or None when the record has a null value.
        kafka-python passes null values to the deserializer as-is, so
        calling ``.decode`` on them would crash the consuming loop.
    """
    if raw is None:
        return None
    return json.loads(raw.decode('utf-8'))


# Create a Kafka consumer instance
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    group_id=KAFKA_GROUP_ID,
    value_deserializer=deserialize_json_value,  # Deserialize the message from JSON
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
)

# Columns of the output CSV, in the order they are written.
# The consumer loop keeps only these keys from each incoming message.
fieldnames = [
    'Invoice ID',
    'Line',
    'Customer ID',
    'Product ID',
    'Size',
    'Color',
    'Unit Price',
    'Quantity',
    'Date',
    'Discount',
    'Line Total',
    'Store ID',
    'Employee ID',
    'Currency',
    'Currency Symbol',
    'SKU',
    'Transaction Type',
    'Payment Method',
    'Invoice Total',
]

# Destination file for consumed records, and whether it was already on disk
# at the moment this cell ran (checked before it is opened for appending).
csv_file_path = 'saved_from_stream.csv'
file_exists = os.path.exists(csv_file_path)

# Append consumed records to the CSV, stopping once `max_records` rows are saved.
with open(csv_file_path, mode='a', newline='', encoding='utf-8') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)

    # Append mode positions the stream at end-of-file, so tell() == 0 means the
    # file is new OR exists but is empty. Both cases need a header; a bare
    # existence check would leave an empty pre-existing file without one.
    if csv_file.tell() == 0:
        writer.writeheader()

    # Stop after this many rows have been written
    record_count = 0
    max_records = 15

    try:
        # Consume messages from Kafka
        for message in consumer:
            data = message.value

            # A null value (tombstone) or JSON that is not an object cannot be
            # mapped onto CSV columns; skip it instead of crashing on .keys().
            if not isinstance(data, dict):
                print(f"⚠️ Skipping non-object message "
                      f"{message.topic}[{message.partition}]@{message.offset}: {data!r}")
                continue

            # Report keys that are not CSV columns (they are dropped below)
            extra_fields = set(data) - set(fieldnames)
            if extra_fields:
                print(f"⚠️ Extra fields found and ignored: {extra_fields}")

            # Keep only the CSV columns; columns missing from the message become empty cells
            filtered_data = {key: data.get(key, '') for key in fieldnames}

            # Flush after every row so already-saved rows reach the OS even if the
            # kernel is interrupted mid-stream
            writer.writerow(filtered_data)
            csv_file.flush()
            print(f"💾 Saved: {filtered_data}")

            record_count += 1

            # Check right after saving. Testing at the top of the loop would block
            # until one more message arrived and then discard it, even though its
            # offset has already been consumed (and, with auto-commit, committed).
            if record_count >= max_records:
                print(f"✅ Reached {max_records} records. Stopping the consumer.")
                break
        else:
            # The iterator ended on its own (e.g. a configured consumer_timeout_ms expired)
            print(f"⏹️ Stream ended after {record_count} records.")
    finally:
        # Leave the consumer group and release connections even on error/interrupt,
        # so re-running the cell does not compete with a stale group member.
        consumer.close()


⚠️ Extra fields found and ignored: {'', 'Time'}
💾 Saved: {'Invoice ID': 'INV-CN-006-03123323', 'Line': '1', 'Customer ID': '395766', 'Product ID': '15782', 'Size': 'N', 'Color': 'Black', 'Unit Price': '269.5', 'Quantity': '1', 'Date': '2025-04-15', 'Discount': '0', 'Line Total': '269.5', 'Store ID': '6', 'Employee ID': '71', 'Currency': 'CNY', 'Currency Symbol': '', 'SKU': 'CHAC15782--', 'Transaction Type': 'Sale', 'Payment Method': 'Credit Card', 'Invoice Total': '1233.53'}
⚠️ Extra fields found and ignored: {'', 'Time'}
💾 Saved: {'Invoice ID': 'INV-CN-006-03123323', 'Line': '2', 'Customer ID': '395766', 'Product ID': '16474', 'Size': '38', 'Color': 'Black', 'Unit Price': '809', 'Quantity': '1', 'Date': '2025-04-15', 'Discount': '0', 'Line Total': '809', 'Store ID': '6', 'Employee ID': '71', 'Currency': 'CNY', 'Currency Symbol': '', 'SKU': 'MAPA16474-38-', 'Transaction Type': 'Sale', 'Payment Method': 'Credit Card', 'Invoice Total': '1233.53'}
⚠️ Extra fields found and ignored: {'', '