# 02_ETL_Processing.ipynb

### Purpose

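- Consume stock price messages from the `stock_prices` Kafka topic.
- Parse each message into typed fields, skip duplicate records, and compute a rolling moving average of the closing price.
- Periodically save the processed data to `processed_stock_data.csv` for further analysis.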

In [None]:
# 02_ETL_Processing.ipynb

# Import necessary libraries
from kafka import KafkaConsumer
import json
import pandas as pd
from datetime import datetime

# Kafka configuration
KAFKA_BROKER = "localhost:9092"  # Replace with your Kafka broker's address
TOPIC_NAME = "stock_prices"

# Initialize the Kafka consumer; message values are UTF-8 JSON decoded to dicts.
# No group_id is passed, so (per kafka-python) no offsets are committed, and
# with auto_offset_reset="earliest" every run replays the topic from the start.
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset="earliest",
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)

# Empty DataFrame with explicit per-column dtypes. A bare
# pd.DataFrame(columns=...) makes every column object dtype, which (depending
# on the pandas version) can leak into later concatenations and triggers
# pandas' deprecation warning about concatenating empty entries.
columns = ["timestamp", "symbol", "open", "high", "low", "close", "volume"]
COLUMN_DTYPES = {
    "timestamp": "datetime64[ns]",
    "symbol": "object",
    "open": "float64",
    "high": "float64",
    "low": "float64",
    "close": "float64",
    "volume": "int64",
}
data = pd.DataFrame({name: pd.Series(dtype=COLUMN_DTYPES[name]) for name in columns})

# Moving-average window length, counted in rows (records), not wall-clock time.
# 5 rows span 5 minutes only if the stream delivers exactly one bar per
# minute — TODO confirm against the producer notebook.
MOVING_AVG_WINDOW = 5

def calculate_moving_average(df, column, window, by=None, min_periods=None):
    """Add a rolling-mean column named ``f"{column}_moving_avg"`` to *df* in place.

    Args:
        df: DataFrame to modify; the result column is added or overwritten.
        column: Name of the column to average.
        window: Number of rows in the rolling window (a row count, not a
            time span).
        by: Optional column name (or list of names) to group by before
            rolling, e.g. ``"symbol"`` so rows of different tickers are not
            averaged together. ``None`` (the default) rolls over the whole
            frame in row order.
        min_periods: Minimum number of non-NaN observations needed to emit a
            value. ``None`` uses pandas' default (equal to *window*), so the
            first ``window - 1`` rows (per group) are NaN.

    Returns:
        None; *df* is mutated.
    """
    target = f"{column}_moving_avg"
    if by is None:
        df[target] = df[column].rolling(window=window, min_periods=min_periods).mean()
    else:
        # transform() returns a Series aligned with df's original index, so
        # results for interleaved symbols land back on their own rows.
        df[target] = df.groupby(by, sort=False)[column].transform(
            lambda s: s.rolling(window=window, min_periods=min_periods).mean()
        )

# Process and transform the data.
#
# Each Kafka message is parsed into a typed row and skipped if an identical
# row was already seen. Every SAVE_EVERY new rows, `data` is rebuilt with the
# rolling close average and written to OUTPUT_PATH; rows collected since the
# last save are flushed once more when the loop stops (e.g. when the notebook
# cell is interrupted).
OUTPUT_PATH = "processed_stock_data.csv"
SAVE_EVERY = 10  # new (non-duplicate) rows between periodic saves


def _parse_stock_message(stock_data):
    """Convert one raw message payload into a row dict keyed by `columns`.

    Raises KeyError for a missing field, ValueError for an unparsable value
    (e.g. a timestamp not formatted as "%Y-%m-%d %H:%M:%S"), and TypeError if
    a field has an unconvertible type such as None.
    """
    return {
        "timestamp": datetime.strptime(stock_data["timestamp"], "%Y-%m-%d %H:%M:%S"),
        "symbol": stock_data["symbol"],
        "open": float(stock_data["open"]),
        "high": float(stock_data["high"]),
        "low": float(stock_data["low"]),
        "close": float(stock_data["close"]),
        "volume": int(stock_data["volume"]),
    }


def _build_frame(rows):
    """Return a DataFrame of *rows* with the rolling close average added."""
    frame = pd.DataFrame(rows, columns=columns)
    calculate_moving_average(frame, "close", MOVING_AVG_WINDOW)
    return frame


def _save(frame):
    """Write *frame* to OUTPUT_PATH (overwriting it) and report the row count."""
    frame.to_csv(OUTPUT_PATH, index=False)
    print(f"Saved {len(frame)} records to '{OUTPUT_PATH}'.")


# Rows accumulate in a plain list (cheap append) rather than a pd.concat per
# message, which copies the whole frame every time. `seen` holds each row's
# values as a tuple for O(1) duplicate checks. Scanning data.to_dict('records')
# is O(n) per message and can never match once the derived close_moving_avg
# column exists, because the dicts then have different keys.
records = data[columns].to_dict("records")
seen = {tuple(r[c] for c in columns) for r in records}
saved_count = len(records)

print("Starting ETL process...")
try:
    for message in consumer:
        # Get the stock data from Kafka
        stock_data = message.value
        print(f"Consumed message: {stock_data}")

        try:
            row = _parse_stock_message(stock_data)
        except (KeyError, ValueError, TypeError) as err:
            # One bad message should not stop the whole stream.
            print(f"Malformed message skipped ({err!r}): {stock_data}")
            continue

        # Append the data only if it's not a duplicate
        key = tuple(row[c] for c in columns)
        if key in seen:
            print(f"Duplicate data skipped: {row}")
            continue
        seen.add(key)
        records.append(row)

        # Save periodically (every SAVE_EVERY new records). Duplicates never
        # reach this point, so the same snapshot is not re-written repeatedly.
        if len(records) % SAVE_EVERY == 0:
            data = _build_frame(records)
            _save(data)
            saved_count = len(records)
finally:
    # Runs however the loop ends, including KeyboardInterrupt from stopping
    # the cell: bring `data` up to date, persist unsaved rows, and release the
    # consumer.
    data = _build_frame(records)
    if len(records) != saved_count:
        _save(data)
    consumer.close()


Starting ETL process...
Consumed message: {'symbol': 'AAPL', 'timestamp': '2024-12-11 18:20:00', 'open': '246.7300', 'high': '246.8200', 'low': '246.7000', 'close': '246.7200', 'volume': '1631'}
Consumed message: {'symbol': 'AAPL', 'timestamp': '2024-12-11 18:20:00', 'open': '246.7300', 'high': '246.8200', 'low': '246.7000', 'close': '246.7200', 'volume': '1631'}
Consumed message: {'symbol': 'AAPL', 'timestamp': '2024-12-11 18:20:00', 'open': '246.7300', 'high': '246.8200', 'low': '246.7000', 'close': '246.7200', 'volume': '1631'}
Consumed message: {'symbol': 'AAPL', 'timestamp': '2024-12-11 18:20:00', 'open': '246.7300', 'high': '246.8200', 'low': '246.7000', 'close': '246.7200', 'volume': '1631'}
Consumed message: {'symbol': 'AAPL', 'timestamp': '2024-12-11 18:20:00', 'open': '246.7300', 'high': '246.8200', 'low': '246.7000', 'close': '246.7200', 'volume': '1631'}
Consumed message: {'symbol': 'AAPL', 'timestamp': '2024-12-11 18:20:00', 'open': '246.7300', 'high': '246.8200', 'low': '