In [None]:
All of this code was implemented by Anish Khatvakar.

import pandas as pd
from kafka import KafkaProducer
from time import sleep
from datetime import datetime, timedelta
from json import dumps
import json
import requests

We are performing cleaning during ingest. 

In [19]:
# Alpha Vantage settings.
# Fill in the key before running; keep real keys out of version control.
alphavantage_api_key = ''
# One data_<symbol>.csv file is produced per entry by create_data.
symbols = ['AAPL', 'IBM', 'NVDA']
# Bar size for intraday requests. Alpha Vantage's intraday endpoint accepts
# 1min, 5min, 15min, 30min and 60min; the earlier '2min' value is not one of
# them. '5min' matches create_data's default interval.
interval = '5min'
alphavantage_base_url = 'https://www.alphavantage.co/query'

In [20]:
# Polygon.io key and endpoint.
# In the visible code polygon_api_key is referenced only by the commented-out
# fetch_polygon_data draft; polygon_base_url is also used by the
# non-Alpha-Vantage branch of fetch_alphavantage_data.
polygon_api_key = ''
polygon_base_url = "https://api.polygon.io/v2/aggs/ticker"

In [21]:
# Kafka producer shared by the cells below. Every message value is JSON-encoded
# with dumps() and then UTF-8 encoded before it is handed to the producer.
producer = KafkaProducer(
    bootstrap_servers=['18.219.236.123:9092'],  # change ip here
    value_serializer=lambda payload: dumps(payload).encode('utf-8'),
)

In [22]:
def fetch_alphavantage_data(symbol, interval, api_key, is_alphavantage_api=True, outputsize='compact', timeout=30):
    """Download intraday close prices for one symbol and return them as a DataFrame.

    Args:
        symbol: Ticker symbol sent as the ``symbol`` query parameter.
        interval: Bar size sent as the ``interval`` query parameter. It is also
            used to build the ``"Time Series (<interval>)"`` key that is read
            from the response, so the two always agree.
        api_key: Value sent as the ``apikey`` query parameter.
        is_alphavantage_api: When true the request goes to
            ``alphavantage_base_url``; otherwise it goes to ``polygon_base_url``
            with the same query parameters.
        outputsize: ``'compact'`` or ``'full'``; forwarded unchanged.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        DataFrame with the columns ``timestamp`` (parsed datetimes), ``close``
        (floats) and ``time`` (time of day of ``timestamp``), sorted by ``time``.

    Raises:
        requests.HTTPError: If the server answers with an error status code.
        ValueError: If the response holds no ``"Time Series (<interval>)"``
            entries. The start of the payload is included in the message.
    """
    params = {
        'function': 'TIME_SERIES_INTRADAY',  # You can change this to other functions like 'TIME_SERIES_DAILY'
        'symbol': symbol,
        'interval': interval,  # Options: '1min', '5min', '15min', '30min', '60min', 'daily', etc.
        'apikey': api_key,
        'outputsize': outputsize,  # 'compact' or 'full'
    }

    base_url = alphavantage_base_url if is_alphavantage_api else polygon_base_url
    # A timeout keeps a stalled connection from blocking the notebook forever.
    response = requests.get(base_url, params=params, timeout=timeout)
    response.raise_for_status()
    data = response.json()

    # The series key embeds the requested interval, e.g. "Time Series (5min)".
    series_key = f"Time Series ({interval})"
    time_series = data.get(series_key, {})
    if not time_series:
        # Fail loudly instead of handing back an empty frame that would be
        # written to disk as if the download had worked.
        raise ValueError(
            f"No '{series_key}' data returned for {symbol}: {str(data)[:300]}"
        )

    rows = [
        (timestamp, float(values["4. close"]))
        for timestamp, values in time_series.items()
    ]

    # Convert to DataFrame for easier manipulation
    df = pd.DataFrame(rows, columns=["timestamp", "close"])
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    # Rows are ordered by time of day (not by full timestamp).
    df["time"] = df["timestamp"].dt.time
    return df.sort_values("time")
    

In [23]:
# def fetch_polygon_data(symbol, timespan, multiplier, from_date = str(date.today()), to_date = str(date.today())):
    
#     params = {
#     'apiKey': polygon_api_key
#     }
    
#     url = f"{polygon_base_url}/{symbol}/range/{multiplier}/{timespan}/{from_date}/{to_date}"
#     response = requests.get(url, params=params)
#     data = response.json()
    
#     # Extract time-series data
#     results = data.get("results", [])
#     # print(results)
#     timestamps = [pd.to_datetime(item["t"], unit="ms") for item in results]
#     close_prices = [item["c"] for item in results]

#     # Convert to DataFrame
#     df = pd.DataFrame({"timestamp": timestamps, "close": close_prices})
    
#     return df

In [24]:
def send_to_kafka(producer, topic, data):
    """Publish one payload to a topic, flush the producer, and report it.

    Args:
        producer: Object exposing ``send(topic, value=...)`` and ``flush()``.
        topic: Name of the topic to publish to.
        data: Payload passed to ``producer.send`` as ``value``.
    """
    producer.send(topic, value=data)
    # Flush immediately after every send.
    producer.flush()
    print("Data sent successfully.")

In [25]:
def create_data(symbols=["AAPL"], interval="5min"):
    """Fetch intraday data for each symbol and write it to ``data_<symbol>.csv``.

    Every symbol is handled on its own: when fetching or writing fails for one
    symbol the error is printed and the remaining symbols are still processed.

    Args:
        symbols: Iterable of ticker symbols. The default list is only read,
            never modified.
        interval: Bar size forwarded to ``fetch_alphavantage_data``.

    Returns:
        None. Files are written to the current working directory.
    """
    print("Trying Alpha Vantage...")
    for s in symbols:
        # The try block sits inside the loop so that one failing symbol does
        # not prevent the files for the other symbols from being created.
        try:
            df = fetch_alphavantage_data(s, interval, alphavantage_api_key)
            df.to_csv(f"data_{s}.csv", index=False)
            print(f"Created file data_{s}.csv")
        except Exception as e:
            print(f"Alpha Vantage failed for {s}: {e}")

    # TODO: an earlier draft sent the frame straight to Kafka and fell back to
    # Polygon.io when Alpha Vantage failed; that path is not implemented here.

In [26]:
# Fetch every configured symbol and write its data_<symbol>.csv file.
# No interval is passed, so create_data's default interval is used.
# NOTE(review): the final scheduler cell calls create_data(symbols=symbols)
# again, so running both cells fetches the data twice.
create_data(symbols=symbols)

Trying Alpha Vantage...
Created file data_AAPL.csv
Created file data_IBM.csv
Created file data_NVDA.csv


In [27]:
def find_closest_timestamp(df, current_time):
    """
    Find the row whose time of day is closest to ``current_time``.

    Only the ``time`` column is compared; the date part of ``timestamp`` is
    ignored, and times are not wrapped around midnight.

    Args:
    - df: DataFrame with a ``time`` column of "%H:%M:%S" strings plus
      ``timestamp`` and ``close`` columns. It is left unmodified.
    - current_time: Time of day as a "%H:%M:%S" string.

    Returns:
    - Tuple ``(timestamp, close)`` taken from the closest row. When several
      rows are equally close, the first of them in ``df`` is used.

    Raises:
    - ValueError: If ``df`` has no rows.
    """
    if df.empty:
        raise ValueError("Cannot find the closest timestamp in an empty DataFrame.")

    target = pd.to_datetime(current_time, format="%H:%M:%S")
    row_times = pd.to_datetime(df['time'], format="%H:%M:%S")

    # Keep the differences in a separate Series so the caller's frame gets no
    # extra column and keeps its row order; idxmin alone finds the closest
    # row, so no sort is needed.
    time_diff = (row_times - target).abs()
    closest_row = df.loc[time_diff.idxmin()]
    return closest_row['timestamp'], closest_row['close']


In [None]:
def process_and_send_stock_data(stock_files, producer, topic):
    """
    Process and send stock data for multiple stocks.

    For every symbol the CSV file is loaded, the row closest to the current
    wall-clock time of day is picked, and all picked rows are sent together as
    one message. A symbol whose file cannot be read or processed is reported
    and skipped; it does not stop the other symbols.

    Args:
    - stock_files: Dictionary where keys are stock symbols and values are file paths.
    - producer: Kafka producer object.
    - topic: Kafka topic name.

    Returns:
    - None. Nothing is sent when no symbol could be processed.
    """
    current_time = datetime.now().strftime("%H:%M:%S")

    stock_data_to_send = {}

    for symbol, file_path in stock_files.items():
        try:
            # Reading happens inside the try block so a missing or malformed
            # file only skips this symbol instead of aborting the whole job.
            df = pd.read_csv(file_path)  # Load stock data from CSV
            closest_timestamp, close_value = find_closest_timestamp(df, current_time)
            stock_data_to_send[symbol] = {
                "timestamp": str(closest_timestamp),
                # Plain float so the value is always JSON-serializable.
                "close": float(close_value),
            }
        except Exception as e:
            print(f"Error processing {symbol}: {e}")

    if stock_data_to_send:
        send_to_kafka(producer, topic, stock_data_to_send)


Final cell to run. It calls the functions defined above: it fetches the data, builds the symbol-to-file mapping, and starts the scheduler.

In [None]:
import schedule
import time

# Step 1: Call Alphavantage API and create all different stock .csv files
create_data(symbols=symbols)

# Step 2: Map every symbol to the CSV file create_data writes for it.
# The naming must stay in sync with create_data ("data_<symbol>.csv").
stock_files = {s: f'data_{s}.csv' for s in symbols}

# Step 3: Schedule a call every 2 minutes that sends the latest stock data
# from Producer to Consumer.
# The job is tagged and any job with that tag is cleared first: the schedule
# module keeps its job list for the lifetime of the kernel, so re-running this
# cell would otherwise register a second job and send every message twice.
SEND_JOB_TAG = 'stock-producer'
schedule.clear(SEND_JOB_TAG)
schedule.every(2).minutes.do(
    process_and_send_stock_data, stock_files, producer, 'stocks'
).tag(SEND_JOB_TAG)

print("Scheduler started. Waiting for tasks to run...")
# Runs until the kernel is interrupted; pending jobs are checked every 2 seconds.
while True:
    schedule.run_pending()
    time.sleep(2)

Trying Alpha Vantage...
Created file data_AAPL.csv
Created file data_IBM.csv
Created file data_NVDA.csv
Scheduler started. Waiting for tasks to run...
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent successfully.
Data sent

In [8]:
# Manual smoke test: sends a fixed list payload to the "stocks" topic directly
# through the producer, bypassing send_to_kafka (so no flush and no printout).
# NOTE(review): this cell sits after the scheduler cell, whose `while True`
# loop never returns, so it only runs when executed by hand.
producer.send("stocks", value=['hello', 'world', 'i am god'])

<kafka.producer.future.FutureRecordMetadata at 0x1c524ace540>

In [None]:
# Flush the producer by hand; presumably meant to push out the message queued
# by the manual send cell above (confirm).
producer.flush()

: 