In [None]:
# Install required Python packages
!pip install requests kafka-python python-dotenv schedule

In [None]:
import json
import os
import time
from datetime import datetime, timezone

import pandas as pd
import requests
from dotenv import load_dotenv
from kafka import KafkaProducer

# Load environment variables
load_dotenv()
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")

# Check API Key: fail fast, since every request below needs it
if not POLYGON_API_KEY:
    raise ValueError("Polygon.io API Key not found. Please add it to the .env file.")

# Kafka Producer setup
# The broker address can be overridden with KAFKA_BOOTSTRAP_SERVERS (environment
# or .env file); the previously hard-coded address remains the default.
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", '18.219.75.39:9092')
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    # Messages are sent as UTF-8 encoded JSON
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Fetch monthly stock data
def fetch_monthly_data(ticker, start_date, end_date):
    """Fetch monthly aggregate bars for one ticker from the Polygon.io API.

    Args:
        ticker: Ticker symbol placed in the request path (e.g. "AAPL").
        start_date: Start of the range as placed in the request path
            (callers in this file pass "YYYY-MM-DD" strings).
        end_date: End of the range, same format as ``start_date``.

    Returns:
        The list stored under the response's ``results`` key, or an empty
        list when the request fails, the status is not 200, the body is not
        a JSON object, or no results are present.
    """
    url = (
        f"https://api.polygon.io/v2/aggs/ticker/{ticker}"
        f"/range/1/month/{start_date}/{end_date}"
    )
    query = {"adjusted": "true", "apiKey": POLYGON_API_KEY}

    try:
        # A timeout keeps a stalled connection from hanging the whole run.
        response = requests.get(url, params=query, timeout=30)
    except requests.RequestException as exc:
        # Only the exception type is printed: the exception text can embed
        # the full request URL, which contains the API key.
        print(f"Request failed for {ticker}: {type(exc).__name__}")
        return []

    try:
        payload = response.json()
    except ValueError:
        # Error responses are not guaranteed to carry a JSON body.
        payload = {}
    if not isinstance(payload, dict):
        payload = {}

    if response.status_code == 200:
        # `or []` also covers a "results" key that is present but null.
        return payload.get('results') or []

    print(f"Error {response.status_code} for {ticker}: {payload.get('message')}")
    return []

# Fetch data and save to CSV
def save_data_to_csv():
    """Fetch monthly bars for a fixed ticker list and write them to stock_data.csv.

    Each bar returned by ``fetch_monthly_data`` becomes one row with the
    columns ticker, date, open, high, low, close and volume. Tickers that
    yield no data are reported and skipped. The CSV always receives a header
    row, even when nothing could be fetched, so it can be read back later.
    """
    # Set the date range
    start_date = "2024-01-01"
    end_date = "2025-01-01"

    # List of tickers
    tickers = ["AAPL", "MSFT", "GOOG", "NVDA", "AMD"]

    # Fixed column order; also guarantees a header when there are no rows.
    columns = ["ticker", "date", "open", "high", "low", "close", "volume"]

    all_data = []
    for ticker in tickers:
        print(f"Fetching monthly data for ticker: {ticker}")
        monthly_data = fetch_monthly_data(ticker, start_date, end_date)
        if not monthly_data:
            print(f"No data available for ticker: {ticker}")
            continue

        for record in monthly_data:
            # 't' is divided by 1000 to get seconds since the epoch. It is
            # interpreted in UTC so the resulting date does not shift with
            # the timezone of the machine running this code.
            bar_start = datetime.fromtimestamp(record['t'] / 1000, tz=timezone.utc)
            all_data.append({
                "ticker": ticker,
                "date": bar_start.strftime('%Y-%m-%d'),
                "open": record['o'],
                "high": record['h'],
                "low": record['l'],
                "close": record['c'],
                "volume": record['v'],
            })

    # Save data to CSV
    df = pd.DataFrame(all_data, columns=columns)
    df.to_csv("stock_data.csv", index=False)
    print("Data saved to stock_data.csv")

# Stream data to Kafka from CSV
def stream_data_from_csv():
    """Stream randomly chosen rows of stock_data.csv to the 'stockmarket' topic.

    Reads the CSV, prints its first rows for verification, then sends one
    randomly sampled row per second through the module-level ``producer``
    until the loop is interrupted. Pending messages are flushed when the
    loop exits for any reason. If the CSV holds no rows, a message is
    printed and nothing is streamed.
    """
    try:
        df = pd.read_csv("stock_data.csv")
    except pd.errors.EmptyDataError:
        # A file with no header and no rows cannot be parsed at all.
        print("stock_data.csv is empty; nothing to stream.")
        return

    print(df.head())  # Display the first few rows for verification

    if df.empty:
        # Sampling a row from an empty frame would raise on the first iteration.
        print("stock_data.csv is empty; nothing to stream.")
        return

    try:
        while True:
            record = df.sample(1).to_dict(orient="records")[0]
            producer.send('stockmarket', value=record)
            print(f"Sent to Kafka: {record}")
            time.sleep(1)  # Add delay to simulate streaming
    finally:
        # The loop only ends via an exception (e.g. a keyboard interrupt);
        # flush here so buffered messages are still delivered.
        producer.flush()

# Main execution
# Guarded so the pipeline runs when this code is executed directly (script or
# notebook cell) but not when the file is imported as a module.
if __name__ == "__main__":
    save_data_to_csv()      # write stock_data.csv from the API results
    stream_data_from_csv()  # then stream its rows to Kafka until interrupted
