In [0]:
%pip install pandas kafka-python

In [0]:
import importlib
from kafka import KafkaProducer, KafkaConsumer
import pandas as pd
import threading
import json
import time
import random

In [0]:
def extract(file_path):
    """Load a CSV source into a pandas DataFrame.

    Args:
        file_path: Anything ``pandas.read_csv`` accepts as its first
            argument (the notebook passes a path string).

    Returns:
        The DataFrame produced by ``pandas.read_csv`` with default options.
    """
    raw_frame = pd.read_csv(file_path)
    return raw_frame

def transform(df):
    """Keep rows with a positive price and add a 10%-discounted price column.

    Args:
        df: DataFrame that contains a numeric ``price`` column.

    Returns:
        A new DataFrame holding only the rows where ``price > 0`` (rows whose
        comparison is false, including missing prices, are dropped) plus a
        ``discounted_price`` column equal to ``price * 0.9``. The input frame
        is left unmodified.

    Raises:
        KeyError: If ``df`` has no ``price`` column.
    """
    # Take an explicit copy of the filtered rows: assigning a column onto a
    # boolean-indexed slice is the chained-assignment pattern that triggers
    # SettingWithCopyWarning and leaves ownership of the data ambiguous.
    valid_rows = df.loc[df['price'] > 0].copy()
    valid_rows['discounted_price'] = valid_rows['price'] * 0.9  # Apply 10% discount
    return valid_rows

def load(df, output_path):
    """Persist a DataFrame as CSV, omitting the index column.

    Args:
        df: The DataFrame to write.
        output_path: Destination handed to ``DataFrame.to_csv`` (the notebook
            passes a path string).
    """
    df.to_csv(path_or_buf=output_path, index=False)

def batch_pipeline(input_file, output_file):
    """Run the batch ETL end to end: extract, then transform, then load.

    Args:
        input_file: Source CSV passed to ``extract``.
        output_file: Destination CSV passed to ``load``.
    """
    raw_frame = extract(input_file)
    load(transform(raw_frame), output_file)

In [0]:
# Producer function
def run_producer(topic_name="transactions", kafka_server="localhost:9092",
                 interval_seconds=1, max_messages=None):
    """Generate simulated transaction events and publish them to Kafka.

    Args:
        topic_name: Kafka topic the events are sent to.
        kafka_server: Bootstrap server address for the producer.
        interval_seconds: Pause between consecutive events, in seconds.
        max_messages: Stop after this many events; ``None`` (the default)
            keeps producing until the thread/process is stopped.

    Each event is a dict with ``user_id``, ``amount`` and ``timestamp`` keys,
    serialized as UTF-8 encoded JSON.
    """
    producer = KafkaProducer(
        bootstrap_servers=kafka_server,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    def generate_transaction():
        """Simulates a real-time transaction event."""
        return {
            "user_id": random.randint(1, 100),
            "amount": round(random.uniform(10, 500), 2),
            "timestamp": time.time(),
        }

    sent_count = 0
    try:
        while max_messages is None or sent_count < max_messages:
            transaction = generate_transaction()
            producer.send(topic_name, transaction)
            print(f"🟢 Sent: {transaction}")
            sent_count += 1
            time.sleep(interval_seconds)  # Simulate real-time events
    finally:
        # send() is asynchronous; closing the producer releases its
        # connections when the loop ends or is interrupted by an exception.
        producer.close()


# Consumer function
def run_consumer(topic_name="transactions", kafka_server="localhost:9092"):
    """Consume JSON messages from a Kafka topic and print each one.

    Args:
        topic_name: Kafka topic to subscribe to.
        kafka_server: Bootstrap server address for the consumer.

    Message values are decoded as UTF-8 and parsed as JSON before printing.
    Iteration continues for as long as the consumer yields messages.
    """
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=kafka_server,
        value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    )

    try:
        for message in consumer:
            transaction = message.value
            print(f"🔵 Received: {transaction}")
    finally:
        # Release the consumer's connections whether the loop ends normally
        # or is interrupted by an exception.
        consumer.close()

In [0]:
if __name__ == "__main__":
    ########################
    # Batch Processing
    ########################
    batch_pipeline('products.csv', 'processed_products.csv')
    print("Batch processing completed.")

    ########################
    # Stream Processing
    ########################
    # Run the producer and the consumer in separate daemon threads so their
    # endless loops never prevent the interpreter from shutting down.
    producer_thread = threading.Thread(target=run_producer, daemon=True)
    consumer_thread = threading.Thread(target=run_consumer, daemon=True)

    producer_thread.start()
    consumer_thread.start()

    # Keep the main thread alive only while a worker is still running; if
    # both workers have died (e.g. the broker is unreachable) there is
    # nothing left to wait for, so stop instead of sleeping forever.
    try:
        while producer_thread.is_alive() or consumer_thread.is_alive():
            time.sleep(1)
        print("Streaming workers have exited.")
    except KeyboardInterrupt:
        # Interrupting the cell/kernel is the expected way to stop the demo.
        print("Streaming stopped by user.")