In [2]:
import pandas as pd
import os
import yfinance as yf
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import json
from time import sleep
import time
from datetime import date, timedelta
from dotenv import load_dotenv

# Load environment variables from the .env file so that settings read later
# through os.getenv (KAFKA_BOOTSTRAP_SERVERS) can be supplied from it.
load_dotenv()

In [None]:
def fetch_realtime_data(tickers=None):
    """Fetch the most recent daily bar for each ticker, shaped as Kafka messages.

    Args:
        tickers: Optional iterable of ticker symbols. When omitted (the
            default), the symbols are taken from the index of
            ``yf.Sector('technology').top_companies``.

    Returns:
        list[dict]: One message per ticker that returned data, with the keys
        ``ticker``, ``date``, ``open``, ``close``, ``low``, ``high``,
        ``volume`` and ``company_name`` (the same schema that
        ``fetch_historical_data`` emits). Tickers with no data, or whose
        fetch/conversion raised, are reported with ``print`` and skipped.
    """
    if tickers is None:
        # Extract information about the tech sector's top companies
        tickers = yf.Sector('technology').top_companies.index.to_list()

    messages = []  # List to hold messages for Kafka

    for ticker in tickers:
        try:
            # Create a Ticker object
            stock = yf.Ticker(ticker)

            # Fetch the latest daily history
            history = stock.history(period="1d")
            if history.empty:
                print(f"No real-time data found for {ticker}")
                continue

            latest = history.iloc[-1]  # Last row = latest available data

            # Fetch company info
            stock_info = stock.info

            # Prepare data for Kafka
            messages.append({
                "ticker": ticker,
                "date": str(latest.name.date()),  # Row label is the bar's timestamp
                "open": float(latest['Open']),
                "close": float(latest['Close']),
                "low": float(latest['Low']),
                "high": float(latest['High']),    # Previously missing from this schema
                "volume": int(latest['Volume']),
                "company_name": stock_info.get('longName', 'N/A')
            })

        except Exception as e:
            # Best effort: one failing ticker must not abort the whole batch
            print(f"Error fetching real-time data for {ticker}: {e}")

    return messages


In [3]:
# Function to fetch historical data (past year by default) for a list of tech companies
def fetch_historical_data(tickers=None, period="1y"):
    """Fetch daily price history per ticker and flatten it into Kafka messages.

    Args:
        tickers: Optional iterable of ticker symbols. When omitted (the
            default), the symbols are taken from the index of
            ``yf.Sector('technology').top_companies``.
        period: History window passed straight to ``Ticker.history``;
            defaults to ``"1y"``.

    Returns:
        list[dict]: One message per (ticker, day) with the keys ``ticker``,
        ``date``, ``open``, ``close``, ``low``, ``high``, ``volume`` and
        ``company_name``. Tickers with no data, or whose fetch raised, are
        reported with ``print`` and skipped. Rows with a missing price or
        volume are dropped individually instead of cutting the ticker short.
    """
    if tickers is None:
        # Extract information about the tech sector's top companies
        tickers = yf.Sector('technology').top_companies.index.to_list()

    historical_data = []
    value_columns = ['Open', 'Close', 'Low', 'High', 'Volume']

    for ticker in tickers:
        try:
            # Create a Ticker object
            stock = yf.Ticker(ticker)

            # Fetch historical data for the requested window
            data = stock.history(period=period)

            # Check for data availability
            if data.empty:
                print(f"No data available for ticker {ticker}")
                continue

            # int()/float() cannot convert NaN, so a single incomplete row
            # would otherwise raise and discard every later row of the ticker.
            complete = data.dropna(subset=value_columns)
            skipped = len(data) - len(complete)
            if skipped:
                print(f"Skipping {skipped} incomplete row(s) for ticker {ticker}")

            # Fetch company info once per ticker rather than once per row
            company_name = stock.info.get('longName', 'N/A')

            # One JSON-serialisable record per row; the row label (Index) is
            # the bar's timestamp, so no reset_index/in-place mutation is needed.
            for row in complete[value_columns].itertuples():
                historical_data.append({
                    "ticker": ticker,
                    "date": str(row.Index.date()),
                    "open": float(row.Open),
                    "close": float(row.Close),
                    "low": float(row.Low),
                    "high": float(row.High),
                    "volume": int(row.Volume),
                    "company_name": company_name
                })

        except Exception as e:
            # Best effort: one failing ticker must not abort the whole batch
            print(f"Error fetching data for ticker {ticker}: {e}")

    return historical_data

In [None]:
# Function to send data to Kafka
def send_to_kafka(producer, message, topic='stock-prices-test', delay=1):
    """Publish one message to a Kafka topic, pause, then flush the producer.

    Args:
        producer: Object exposing ``send(topic, value)`` and ``flush()``
            (a ``KafkaProducer`` in this notebook).
        message: Payload handed to ``producer.send``; serialisation is left
            to the producer's configured ``value_serializer``.
        topic: Destination topic. Defaults to ``'stock-prices-test'``.
        delay: Seconds to sleep between sending and flushing, which throttles
            the stream to roughly one message per ``delay``. Defaults to 1;
            ``0`` or ``None`` disables the pause.

    Returns:
        None. Any exception raised by ``send``/``flush`` is caught and
        reported with ``print`` so a single bad message does not stop the
        caller's loop.

    Note:
        ``send`` is asynchronous in kafka-python; delivery failures reported
        later on the returned future are not observed here.
    """
    try:
        producer.send(topic, message)
        if delay and delay > 0:
            time.sleep(delay)
        producer.flush()
    except Exception as e:
        print(f"Failed to send to Kafka: {e}")
        
        
# Comma-separated broker list, e.g. "host1:9092,host2:9092". Fail with a clear
# message instead of an AttributeError on None when the variable is missing.
raw_bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS") or ""
bootstrap_servers = [
    server.strip() for server in raw_bootstrap_servers.split(",") if server.strip()
]
if not bootstrap_servers:
    raise RuntimeError(
        "KAFKA_BOOTSTRAP_SERVERS is not set; define it in the environment or the .env file"
    )

# Seconds to wait before the next attempt when a cycle failed or produced
# nothing, so the loop does not spin and hammer the data source.
RETRY_DELAY_SECONDS = 5

# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,  # Brokers taken from KAFKA_BOOTSTRAP_SERVERS
    value_serializer=lambda x: dumps(x).encode('utf-8')  # Serialize JSON to bytes
)

try:
    while True:
        try:
            # Use fetch_realtime_data() here instead to stream only the latest bar.
            messages = fetch_historical_data()
            # Send each message to Kafka
            for message in messages:
                send_to_kafka(producer, message)
            if not messages:
                print(f"No messages fetched; retrying in {RETRY_DELAY_SECONDS}s")
                time.sleep(RETRY_DELAY_SECONDS)
        except Exception as e:
            print(f"Error in main loop: {e}")
            time.sleep(RETRY_DELAY_SECONDS)
except KeyboardInterrupt:
    # Interrupting the kernel is the only way out of the loop; exit quietly.
    print("Interrupted; stopping producer loop")
finally:
    # Release sockets/background threads held by the producer.
    producer.close()
