# STOCK MARKET DATA STREAMING FOR REAL-TIME ANALYTICS

## KAFKA PRODUCER CODE

### The intraday (60-minute interval) data for the top 100 US stocks used in this project is requested from the Alpha Vantage API (API key available at "https://www.alphavantage.co/support/#api-key")
#### The following code streams the stock market data to a Kafka topic for real-time analysis on the consumer side.
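As a sketch of what each API request in this notebook looks like, the intraday query URL can be assembled from its parameters with the standard library instead of string concatenation. The helper name `build_intraday_url` is hypothetical; the parameter names (`function`, `symbol`, `interval`, `apikey`) are the ones the notebook's own request URL uses, and `YOUR_KEY` is a placeholder.

```python
from urllib.parse import urlencode

# Base endpoint used by the notebook's request URL
BASE_URL = "https://www.alphavantage.co/query"


def build_intraday_url(symbol: str, interval_min: int, api_key: str) -> str:
    """Hypothetical helper: build the TIME_SERIES_INTRADAY query URL."""
    params = {
        "function": "TIME_SERIES_INTRADAY",
        "symbol": symbol,
        "interval": f"{interval_min}min",
        "apikey": api_key,
    }
    # urlencode percent-escapes unsafe characters; "-" is safe, so "BRK-B" is unchanged
    return f"{BASE_URL}?{urlencode(params)}"


print(build_intraday_url("AAPL", 60, "YOUR_KEY"))
```

Letting `urlencode` (or the `params=` argument of `requests.get`) build the query string avoids manual escaping mistakes if a symbol ever contains a reserved character.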

### IMPORT LIBRARIES

In [1]:
import pandas as pd
from kafka import KafkaProducer
from time import sleep
from json import dumps
import requests

### CREATE PRODUCER

In [2]:
# Connect to the Kafka broker and serialize each record (a dict) as UTF-8 encoded JSON
producer = KafkaProducer(bootstrap_servers=['18.220.172.23:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
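The `value_serializer` above turns each Python dict into UTF-8 encoded JSON bytes before it is sent. A minimal sketch of the same idea as named functions, plus a hypothetical `serialize_key` so that records could be keyed by symbol. kafka-python's `KafkaProducer` accepts a `key_serializer` argument and `send()` accepts a `key`; keying by symbol is an assumption, not something this notebook does, and the record values below are placeholders.

```python
from json import dumps, loads


def serialize_value(record: dict) -> bytes:
    # Same transformation as the notebook's lambda: dict -> JSON text -> UTF-8 bytes
    return dumps(record).encode("utf-8")


def serialize_key(symbol: str) -> bytes:
    # Hypothetical: encode the stock symbol so it can serve as the message key
    return symbol.encode("utf-8")


# Placeholder record with the notebook's first three column names
record = {"Date": "2024-01-02", "Time": "10:00:00", "Stock_Index": "AAPL"}
payload = serialize_value(record)
print(payload)

# A consumer would reverse the value serialization like this
restored = loads(payload.decode("utf-8"))
print(restored == record)
```

These could be passed as `KafkaProducer(..., key_serializer=serialize_key, value_serializer=serialize_value)` and used with `producer.send('demo_top100_us_stock', key=stock_index, value=dict_stock)`. With a key set, the default partitioner routes all records for one symbol to the same partition, which preserves their order for the consumer.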

### LOAD THE DATA USING API & SEND IT FROM PRODUCER USING KAFKA

#### Top 100 US Stock Symbols

In [3]:
top_100_us_stock_symbols = [
    "AAPL", "MSFT", "AMZN", "GOOGL", "GOOG", "META", "TSLA", "NVDA", "BRK-A", "BRK-B"]

# The remaining symbols are omitted for demonstration purposes:

#     "JPM", "JNJ", "V", "PG", "UNH", "MA", "HD", "BAC", "DIS", "PYPL",
#     "CMCSA", "VZ", "NFLX", "ADBE", "T", "CSCO", "PEP", "INTC", "KO", "ABT",
#     "PFE", "XOM", "WMT", "CVX", "NKE", "MRK", "WFC", "MCD", "ABNB", "CRM",
#     "BMY", "HON", "BA", "CMG", "COST", "TXN", "TMO", "AMGN", "PM", "MO",
#     "AVGO", "C", "GILD", "ORCL", "IBM", "AMD", "UNP", "GE", "PDD", "NEE",
#     "DHR", "D", "LIN", "TGT", "NOW", "NVO", "CAT", "TMUS", "LMT", "SAP",
#     "MMM", "LOW", "ADP", "AMD", "AMT", "AXP", "BLK", "CCI", "CME", "COF",
#     "COP", "CSX", "DIA", "DOCU", "EL", "FIS", "GD", "GM", "GS", "HAL",
#     "HDB", "HON", "IBM", "ICE", "IEFA", "IEMG", "IETC", "IEUR", "IEUS", "IEX"
#     ]
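The commented-out list repeats some symbols ("AMD", "HON" and "IBM" each appear twice), so restoring the full list as-is would make the loop request and send the same data twice. A minimal sketch of removing duplicates while keeping the original order, using `dict.fromkeys` (dicts preserve insertion order in Python 3.7+); the short `symbols` list here is illustrative.

```python
# Illustrative subset containing the duplicated symbols
symbols = ["AAPL", "AMD", "HON", "IBM", "AMD", "HON", "IBM", "MSFT"]

# dict.fromkeys keeps the first occurrence of each symbol, in order
unique_symbols = list(dict.fromkeys(symbols))
print(unique_symbols)  # ['AAPL', 'AMD', 'HON', 'IBM', 'MSFT']
```

Since the request loop iterates over `sorted(...)` anyway, `sorted(set(top_100_us_stock_symbols))` would work equally well there.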


### Request intraday data for each stock symbol using the Alpha Vantage API (https://www.alphavantage.co/support/#api-key)
#### Send the data to the consumer using KafkaProducer

In [4]:
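interval       = 60 # in minutes (the API accepts 1, 5, 15, 30 or 60)
api_key        = "XV1DON5HG5EZUICZ"  # Consider loading the key from an environment variable instead of hard-coding it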

data_all = []

for stock_index in sorted(top_100_us_stock_symbols):
   
    print(stock_index, "Stock Data Requested Using API")
    
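    url = 'https://www.alphavantage.co/query'
    params = {
        'function': 'TIME_SERIES_INTRADAY',
        'symbol': stock_index,
        'interval': str(interval) + 'min',
        'apikey': api_key,
    }
    
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    
    request_data = r.json()
    
    # Transform the data into a DataFrame.
    # Error or rate-limit responses lack the time-series key, so skip the symbol instead of raising a KeyError.
    series_key = 'Time Series (' + str(interval) + 'min)'
    if series_key not in request_data:
        print(stock_index, "skipped, API response:", request_data)
        continue
    request_data = request_data[series_key]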

    data = pd.DataFrame.from_dict(request_data, orient='index').reset_index()
    
    data.columns = ['DateTime'] + list(data.columns)[1:]
        
    
    # Add the symbol and split DateTime into separate Date and Time columns for readability
    data.insert(1, 'Stock_Index', stock_index)
    
    data['Date'] = data['DateTime'].str.split(' ').str[0]
    
    data['Time'] = data['DateTime'].str.split(' ').str[1]
    
    date_col = data.pop('Date')
    time_col = data.pop('Time')
    
    data.insert(1, 'Date', date_col)
    data.insert(2, 'Time', time_col)
    
    data = data.drop(columns=['DateTime'])
    
    
    # Send each row to the Kafka topic as one JSON message
    for _, row in data.iterrows():
        dict_stock = row.to_dict()
        
        producer.send('demo_top100_us_stock', value=dict_stock)
        
        sleep(0.1) # To avoid overloading the server
    
    print(stock_index, " Stock Data Sent to Consumer using Kafka\n")
    
    producer.flush()
    
    sleep(2) # Optional pause between requests. The API limit of 5 requests per minute requires requests to be at least 12 s apart on average; the per-row sleep above also contributes to this spacing.

AAPL Stock Data Requested Using API
AAPL  Stock Data Sent to Consumer using Kafka

AMZN Stock Data Requested Using API
AMZN  Stock Data Sent to Consumer using Kafka

BRK-A Stock Data Requested Using API
BRK-A  Stock Data Sent to Consumer using Kafka

BRK-B Stock Data Requested Using API
BRK-B  Stock Data Sent to Consumer using Kafka

GOOG Stock Data Requested Using API
GOOG  Stock Data Sent to Consumer using Kafka

GOOGL Stock Data Requested Using API
GOOGL  Stock Data Sent to Consumer using Kafka

META Stock Data Requested Using API
META  Stock Data Sent to Consumer using Kafka

MSFT Stock Data Requested Using API
MSFT  Stock Data Sent to Consumer using Kafka

NVDA Stock Data Requested Using API
NVDA  Stock Data Sent to Consumer using Kafka

TSLA Stock Data Requested Using API
TSLA  Stock Data Sent to Consumer using Kafka
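The request-and-transform steps in the cell above can be sketched as a single pure function, which makes them testable without a network connection or a Kafka broker. This is a simplified sketch using only the standard library (no pandas). The function name `payload_to_records` is hypothetical, and the sample payload uses made-up placeholder values, not real market data; its field names are only illustrative, since the function copies whatever fields each timestamp carries.

```python
def payload_to_records(payload: dict, symbol: str, interval_min: int) -> list:
    """Hypothetical helper: turn an intraday API payload into flat records.

    Mirrors the notebook's DataFrame steps: split the timestamp into Date and
    Time, add the symbol, and keep the remaining fields unchanged.
    """
    series_key = f"Time Series ({interval_min}min)"
    if series_key not in payload:
        # Error or rate-limit responses lack the time-series key
        raise ValueError(f"{symbol}: no time series in response: {payload}")

    records = []
    for timestamp, fields in payload[series_key].items():
        date_part, time_part = timestamp.split(" ")
        # Same column order as the notebook: Date, Time, Stock_Index, then the fields
        record = {"Date": date_part, "Time": time_part, "Stock_Index": symbol}
        record.update(fields)
        records.append(record)
    return records


# Placeholder payload in the shape the notebook indexes into (values are made up)
sample = {
    "Time Series (60min)": {
        "2024-01-02 10:00:00": {"1. open": "1.0", "4. close": "1.5"},
    }
}
print(payload_to_records(sample, "AAPL", 60))
```

Each returned dict could then be passed to `producer.send('demo_top100_us_stock', value=record)`, just as the notebook does with `row.to_dict()`.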

