# Stock Market Prediction Model of the Magnificent Seven

### Install the dependencies from requirements.txt with pip. This may take up to 12 minutes.

In [90]:
!python3 -m pip install -r requirements.txt


Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m


### Importing our libraries after the pip install

In [91]:

#python data manipulation
import pandas as pd
import numpy as np

#data visualization tools, EDA
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px

#mysql and Cassandra
import mysql.connector                         
from cassandra.cluster import Cluster          

#ARIMA
from statsmodels.tsa.arima.model import ARIMA
import statsmodels.api as sm

#LSTM Neural network
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import tensorflow as tf

# our finance data
import yfinance as yf



## Extracting the Historical Data.


### The historical data runs from January 1st, 2020 up to April 6th, 2025 (the day before this notebook was run); the end date can be adjusted so that it always reflects the most recent day. We extract the 7 companies and start with the first 7 features from our project proposal: date/time, open price, high price, low price, close price, adjusted close price, and trading volume.

In [103]:
def extract_data(tickers, start_date="2020-01-01", end_date="2025-04-06", interval="1d"):
    """Download price history with yfinance and return it as one tidy frame.

    Args:
        tickers: Ticker symbol(s) handed straight to ``yf.download``.
        start_date: Start of the download range (passed as ``start``).
        end_date: End of the download range (passed as ``end``).
        interval: Bar interval passed to ``yf.download``.

    Returns:
        A DataFrame with one row per date/ticker pair, restricted to whichever
        of Date, Ticker, Open, High, Low, Close, Adj_Close, Previous_Close and
        Volume are present after reshaping.
    """
    # auto_adjust stays off; per the original note, turning it on overrides the
    # Close value.
    prices = yf.download(
        tickers,
        start=start_date,
        end=end_date,
        interval=interval,
        auto_adjust=False,
    )

    # Swap the two column levels, sort on the new outer level, then stack that
    # level into the rows so each row holds a single (date, ticker) pair.
    # NOTE(review): assumes yfinance returns two-level columns whose ticker level
    # is named 'Ticker' - confirm for the installed yfinance version.
    prices.columns = prices.columns.swaplevel(0, 1)
    tidy = (
        prices.sort_index(axis=1, level=0)
        .stack(level=0, future_stack=True)
        .reset_index()
        # rename ignores keys that are absent, so this covers both the unnamed
        # index case ('level_0') and the optional 'Adj Close' column.
        .rename(columns={'level_0': 'Date', 'Adj Close': 'Adj_Close'})
    )

    # Close of the preceding row for the same ticker.
    tidy['Previous_Close'] = tidy.groupby('Ticker')['Close'].shift(1)

    wanted = (
        'Date', 'Ticker', 'Open', 'High', 'Low',
        'Close', 'Adj_Close', 'Previous_Close', 'Volume',
    )
    return tidy[[name for name in wanted if name in tidy.columns]]


In [102]:
# The seven ticker symbols to download. extract_data is called with its default
# date range and interval, and the resulting frame is displayed.
tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "META", "NVDA", "TSLA"]
df_raw = extract_data(tickers)
display(df_raw)

[*********************100%***********************]  7 of 7 completed


Price,Date,Ticker,Open,High,Low,Close,Adj_Close,Previous_Close,Volume
0,2020-01-02,AAPL,74.059998,75.150002,73.797501,75.087502,72.716064,,135480400
1,2020-01-02,AMZN,93.750000,94.900497,93.207497,94.900497,94.900497,,80580000
2,2020-01-02,GOOGL,67.420502,68.433998,67.324501,68.433998,68.108368,,27278000
3,2020-01-02,META,206.750000,209.789993,206.270004,209.779999,208.795944,,12077100
4,2020-01-02,MSFT,158.779999,160.729996,158.330002,160.619995,153.323257,,22622100
...,...,...,...,...,...,...,...,...,...
9249,2025-04-04,GOOGL,148.009995,151.070007,145.380005,145.600006,145.600006,150.720001,62138600
9250,2025-04-04,META,506.619995,518.000000,494.200012,504.730011,504.730011,531.619995,38515100
9251,2025-04-04,MSFT,364.130005,374.589996,359.480011,359.839996,359.839996,373.109985,49138700
9252,2025-04-04,NVDA,98.910004,100.129997,92.110001,94.309998,94.309998,101.800003,529707700


## Historical Data: Data Cleaning: 
### We will clean the data by taking care of any null values. If a float/int value is missing, we can back-fill or forward-fill it, or use the average of the past few days for that column. If the missing value is in the date or ticker column, we will drop the row. Additionally, if there are any duplicate records for a company on a specific date, one will be kept and the rest dropped.

In [104]:
def cleaning(df):
    """Placeholder for the historical-data cleaning step.

    Currently a no-op: the object passed in is handed back unchanged.
    Note that a later cell in this notebook defines another function with the
    same name, which replaces this one once that cell has run.
    """
    unchanged = df
    return unchanged

## Historical Data, Feature Engineering:

### We are going to transform the tidy dataframe with some feature engineering. We will create the following fields in our table by manipulating the pre-existing data from df_cleaned:
1. Simple Moving Average (SMA)
2. Exponential Moving Average (EMA)
3. Relative Strength Index (RSI)
4. Bollinger Bands
5. MACD (Moving Average Convergence Divergence)
6. On-Balance Volume (OBV)
7. Volatility (ATR - Average True Range)


In [105]:
def transform(df):
    """Placeholder for the historical-data feature engineering step.

    Currently a no-op: the object passed in is handed back unchanged.
    """
    unchanged = df
    return unchanged

## Loading into MySQL with historical data 

## Extracting Real time data
### Using the Finnhub API to get real-time stock market data, polling every 10 seconds and loading each batch of quotes into the Cassandra database (the loop below runs 14 polls, i.e. a little over two minutes of data)

In [106]:
import os
from datetime import datetime
from getpass import getpass

import finnhub

# The Finnhub API key must not be committed to the notebook. It is read from the
# FINNHUB_API_KEY environment variable; when that is unset or empty the user is
# prompted for it (getpass does not echo the input).
# NOTE(review): a key was previously hard-coded in this cell - it should be
# treated as leaked and rotated.
_finnhub_api_key = os.environ.get("FINNHUB_API_KEY") or getpass("Finnhub API key: ")
finnhub_client = finnhub.Client(api_key=_finnhub_api_key)

# Symbols polled for real-time quotes.
tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "META", "NVDA", "TSLA"]

def get_realtime_quotes(symbols=None, client=None):
    """Fetch one quote per symbol and return them as a DataFrame.

    Args:
        symbols: Iterable of ticker symbols to query. Defaults to the
            module-level ``tickers`` list.
        client: Object exposing ``quote(symbol)`` that returns a mapping with
            the keys 't', 'o', 'h', 'l', 'c' and 'pc'. Defaults to the
            module-level ``finnhub_client``.

    Returns:
        A DataFrame with the columns Date, Ticker, Open, High, Low, Close,
        Adj_Close, Previous_Close and Volume, one row per symbol. The columns
        are present even when ``symbols`` is empty.
    """
    if symbols is None:
        symbols = tickers
    if client is None:
        client = finnhub_client

    columns = [
        "Date", "Ticker", "Open", "High", "Low",
        "Close", "Adj_Close", "Previous_Close", "Volume",
    ]

    quotes = []
    for symbol in symbols:
        data = client.quote(symbol)
        quotes.append({
            # 't' is interpreted as a timestamp in seconds.
            "Date": pd.to_datetime(data['t'], unit='s'),
            "Ticker": symbol,
            "Open": data['o'],
            "High": data['h'],
            "Low": data['l'],
            "Close": data['c'],
            # The quote carries no separate adjusted close; 'c' is reused.
            "Adj_Close": data['c'],
            "Previous_Close": data['pc'],
            # No volume is read from the quote payload.
            "Volume": None,
        })
    return pd.DataFrame(quotes, columns=columns)


## Loading into Cassandra with Real time data

In [107]:
from cassandra.cluster import Cluster

# Connect to the local Cassandra node exactly once. A failure is reported and
# then re-raised: every statement below needs a live session, so continuing
# after a failed connection would only fail later with a less helpful error.
try:
    cluster = Cluster(['localhost'])
    session = cluster.connect()
except Exception as e:
    print("Connection error:", e)
    raise
else:
    print("Cassandra connection established.")

# Create the "stock_data" keyspace if it does not exist yet and switch to it.
# The real_time_quotes table inside it receives the quotes pulled from the
# Finnhub API.
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS stock_data 
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
""")
session.set_keyspace('stock_data')
# Schema of the CQL table: rows are keyed by (ticker, date).
session.execute("""
    CREATE TABLE IF NOT EXISTS real_time_quotes (
    ticker TEXT,
    date TIMESTAMP,
    open DOUBLE,
    high DOUBLE,
    low DOUBLE,
    close DOUBLE,
    adj_close DOUBLE,
    volume BIGINT,
    previous_close DOUBLE,
    PRIMARY KEY (ticker, date)
);
""")


Cassandra connection established.


<cassandra.cluster.ResultSet at 0x15974da60>

In [108]:
#from cassandra.query import PreparedStatement
import time
#inserting each row of quotes from the API into real_time_quotes, storing the columns ticker, date, open, high, low, close, adj_close, volume, and previous_close
def insert_to_cassandra(df, db_session=None):
    """Insert every row of ``df`` into the ``real_time_quotes`` table.

    Args:
        df: DataFrame with the columns Ticker, Date, Open, High, Low, Close,
            Adj_Close, Volume and Previous_Close. ``Date`` values must offer
            ``to_pydatetime()``.
        db_session: Object exposing ``execute(query, params)``. Defaults to the
            module-level ``session``.

    Missing values (None/NaN) in any price column are written as null, which
    extends the guard that previously covered only Previous_Close. A missing
    Volume is written as 0, as before. Present values are converted to plain
    ``float``/``int`` before being handed to the session.
    """
    if db_session is None:
        db_session = session

    query = """
            INSERT INTO real_time_quotes (ticker, date, open, high, low, close, adj_close, volume, previous_close)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """

    def _price(value):
        # NaN/None become None so that no NaN is stored in a price column.
        return float(value) if pd.notna(value) else None

    for _, row in df.iterrows():
        volume = row['Volume']
        db_session.execute(query, (
            row['Ticker'],
            row['Date'].to_pydatetime(),
            _price(row['Open']),
            _price(row['High']),
            _price(row['Low']),
            _price(row['Close']),
            _price(row['Adj_Close']),
            # The volume column is declared BIGINT, so send a plain int.
            int(volume) if pd.notna(volume) else 0,
            _price(row['Previous_Close']),
        ))

#making a backup incase the cassandra server acts up
#def backup_to_csv(df, path='realtime_backup.csv'):
    #header = not os.path.exists(path)
    #df.to_csv(path, mode='a', index=False, header=header)



In [109]:
# Poll the quotes POLL_COUNT times, waiting POLL_INTERVAL_SECONDS between polls
# to avoid rate limiting with the API. Each poll writes one batch of quotes to
# Cassandra. There is no wait after the final batch, since nothing follows it.
POLL_COUNT = 14
POLL_INTERVAL_SECONDS = 10

for poll_number in range(POLL_COUNT):
    df_tenseconds = get_realtime_quotes()
    insert_to_cassandra(df_tenseconds)
    print(" For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.")
    if poll_number < POLL_COUNT - 1:
        time.sleep(POLL_INTERVAL_SECONDS)


 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.
 For 10 seconds: Adding a new batch of 7 rows into Cassandra DB.


In [110]:
# Read every row of stock_data.real_time_quotes back from Cassandra, build a
# pandas DataFrame from the result set for data manipulation, and display it.
query = "SELECT * FROM stock_data.real_time_quotes"
rows = session.execute(query)
df_cassandra = pd.DataFrame(rows)
display(df_cassandra)

Unnamed: 0,ticker,date,adj_close,close,high,low,open,previous_close,volume
0,META,2025-03-28 20:00:00,576.7400,576.7400,601.7500,573.92,600.31,602.58,0
1,META,2025-04-07 19:23:59,517.0383,517.0383,539.3700,481.90,485.10,504.73,0
2,META,2025-04-07 19:24:12,516.4800,516.4800,539.3700,481.90,485.10,504.73,0
3,META,2025-04-07 19:24:21,516.4800,516.4800,539.3700,481.90,485.10,504.73,0
4,META,2025-04-07 19:24:29,516.4800,516.4800,539.3700,481.90,485.10,504.73,0
...,...,...,...,...,...,...,...,...,...
100,MSFT,2025-04-07 19:25:31,355.8300,355.8300,370.9999,344.79,350.88,359.84,0
101,MSFT,2025-04-07 19:25:43,355.8300,355.8300,370.9999,344.79,350.88,359.84,0
102,MSFT,2025-04-07 19:25:52,355.8300,355.8300,370.9999,344.79,350.88,359.84,0
103,MSFT,2025-04-07 19:26:01,355.9100,355.9100,370.9999,344.79,350.88,359.84,0


## Real Time Data: Data Cleaning 


In [111]:
def cleaning(df):
    """Return a cleaned copy of a real-time quotes frame.

    The input frame is left untouched. Steps, in order:

    1. Parse ``date`` with ``pd.to_datetime``.
    2. Drop rows that have no ``ticker`` or no ``date``.
    3. For each of open/high/low/close/adj_close/volume/previous_close that is
       present, forward-fill then back-fill gaps *within each ticker*, so one
       ticker's value is never used to fill another ticker's gap.
    4. As a last resort, fill anything still missing (a ticker with no value at
       all in that column) with the overall column mean.
    5. Keep only the last row of every (ticker, date) pair and reset the index.

    Args:
        df: DataFrame with at least ``ticker`` and ``date`` columns.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    # assign() builds a new frame, so the caller's 'date' column is not modified;
    # the trailing copy() makes the later column assignments unambiguous.
    cleaned = (
        df.assign(date=pd.to_datetime(df['date']))
        .dropna(subset=['ticker', 'date'])
        .copy()
    )

    float_cols = ['open', 'high', 'low', 'close', 'adj_close', 'volume', 'previous_close']
    # Only touch columns that exist, for both the fill and the mean fallback.
    present = [col for col in float_cols if col in cleaned.columns]

    if present and not cleaned.empty:
        filled = cleaned.groupby('ticker', sort=False)[present].ffill()
        filled = filled.groupby(cleaned['ticker'], sort=False).bfill()
        cleaned[present] = filled

        for col in present:
            if cleaned[col].isna().any():
                cleaned[col] = cleaned[col].fillna(cleaned[col].mean())

    cleaned = cleaned.drop_duplicates(subset=['ticker', 'date'], keep='last')
    return cleaned.reset_index(drop=True)


# Apply the cleaning function to the df_cassandra frame from the previous cell,
# display the result, and print its per-column null counts.
df_cassandracleaned = cleaning(df_cassandra)
display(df_cassandracleaned)
print("\nNull Count for Real Time Data (Extracted from Cassandra):\n")
print(df_cassandracleaned.isnull().sum())

Unnamed: 0,ticker,date,adj_close,close,high,low,open,previous_close,volume
0,META,2025-03-28 20:00:00,576.7400,576.7400,601.7500,573.92,600.31,602.58,0
1,META,2025-04-07 19:23:59,517.0383,517.0383,539.3700,481.90,485.10,504.73,0
2,META,2025-04-07 19:24:12,516.4800,516.4800,539.3700,481.90,485.10,504.73,0
3,META,2025-04-07 19:24:21,516.4800,516.4800,539.3700,481.90,485.10,504.73,0
4,META,2025-04-07 19:24:29,516.4800,516.4800,539.3700,481.90,485.10,504.73,0
...,...,...,...,...,...,...,...,...,...
100,MSFT,2025-04-07 19:25:31,355.8300,355.8300,370.9999,344.79,350.88,359.84,0
101,MSFT,2025-04-07 19:25:43,355.8300,355.8300,370.9999,344.79,350.88,359.84,0
102,MSFT,2025-04-07 19:25:52,355.8300,355.8300,370.9999,344.79,350.88,359.84,0
103,MSFT,2025-04-07 19:26:01,355.9100,355.9100,370.9999,344.79,350.88,359.84,0



Null Count for Real Time Data (Extracted from Cassandra):

ticker            0
date              0
adj_close         0
close             0
high              0
low               0
open              0
previous_close    0
volume            0
dtype: int64


### Note: The Finnhub API does not provide real-time volume in its free tier. Additionally, per-second volume would be hard to predict accurately. We may need to drop volume or exclude it entirely from the subsequent machine learning.

## Real Time Data: Feature Engineering
### This can only be performed on a weekday between 8am EST and 4pm EST. Otherwise we will not be able to get more than one iteration of results, since the stock market is closed and no new real-time quotes are produced.
1. Simple Moving Average (SMA)
2. Exponential Moving Average (EMA)
3. Relative Strength Index (RSI)
4. Bollinger Bands
5. MACD (Moving Average Convergence Divergence)
6. On-Balance Volume (OBV)
7. Volatility (ATR - Average True Range)


In [115]:
def add_technical_indicators(df, window=5):
    """Return a copy of ``df`` sorted by ticker/date with indicator columns added.

    All indicators are computed per ticker, in date order.

    Args:
        df: DataFrame with the columns ticker, date, close, high, low and volume.
        window: Look-back length, in rows, used for the SMA, EMA span, RSI,
            Bollinger Bands and ATR. It also determines the suffix of the
            SMA/EMA/RSI/ATR column names; the default of 5 yields SMA_5, EMA_5,
            RSI_5 and ATR_5.

    Returns:
        A new DataFrame (the input is not modified) that keeps the original
        index labels and adds, in this order: SMA_<window>, EMA_<window>,
        RSI_<window>, BB_Middle, BB_Std, BB_Upper, BB_Lower, MACD, MACD_Signal,
        OBV and ATR_<window>.
    """
    # sort_values returns a new frame, so the caller's frame is left untouched.
    df = df.sort_values(by=['ticker', 'date'])
    ticker_key = df['ticker']
    close_by_ticker = df['close'].groupby(ticker_key)

    # Simple Moving Average (SMA)
    sma = close_by_ticker.transform(lambda x: x.rolling(window=window).mean())
    df[f'SMA_{window}'] = sma

    # Exponential Moving Average (EMA)
    df[f'EMA_{window}'] = close_by_ticker.transform(
        lambda x: x.ewm(span=window, adjust=False).mean()
    )

    # Relative Strength Index (RSI), based on rolling means of gains and losses.
    def compute_rsi(series, period=window):
        delta = series.diff()
        gain = delta.where(delta > 0, 0.0)
        loss = -delta.where(delta < 0, 0.0)
        avg_gain = gain.rolling(window=period).mean()
        avg_loss = loss.rolling(window=period).mean()
        # avg_loss == 0 gives inf (RSI 100) or, with avg_gain == 0 too, NaN.
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    df[f'RSI_{window}'] = close_by_ticker.transform(lambda x: compute_rsi(x))

    # Bollinger Bands: the middle band is the same rolling mean as the SMA.
    df['BB_Middle'] = sma
    df['BB_Std'] = close_by_ticker.transform(lambda x: x.rolling(window=window).std())
    df['BB_Upper'] = df['BB_Middle'] + 2 * df['BB_Std']
    df['BB_Lower'] = df['BB_Middle'] - 2 * df['BB_Std']

    # MACD: EMA(12) - EMA(26), with an EMA(9) of that difference as the signal.
    ema12 = close_by_ticker.transform(lambda x: x.ewm(span=12, adjust=False).mean())
    ema26 = close_by_ticker.transform(lambda x: x.ewm(span=26, adjust=False).mean())
    macd = ema12 - ema26
    df['MACD'] = macd
    df['MACD_Signal'] = macd.groupby(ticker_key).transform(
        lambda x: x.ewm(span=9, adjust=False).mean()
    )

    # On-Balance Volume (OBV): running total per ticker that adds the row's
    # volume when close rose, subtracts it when close fell, and is unchanged
    # otherwise (including each ticker's first row). Computed as a grouped
    # cumulative sum, which gives a correctly indexed Series for any number of
    # tickers - groupby.apply returns a wide frame when there is only one group.
    price_change = close_by_ticker.diff()
    direction = price_change.gt(0).astype('int64') - price_change.lt(0).astype('int64')
    signed_volume = (direction * df['volume']).where(direction != 0, 0)
    df['OBV'] = signed_volume.groupby(ticker_key).cumsum()

    # ATR - Average True Range. The intermediates are kept as local Series, so
    # no temporary columns are added to (or dropped from) the frame.
    prev_close = close_by_ticker.shift(1)
    true_range = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)
    df[f'ATR_{window}'] = true_range.groupby(ticker_key).transform(
        lambda x: x.rolling(window=window).mean()
    )

    return df

# Add the technical-indicator columns to the cleaned Cassandra frame and display
# the resulting feature table.
df_cleaned_cass_features=add_technical_indicators(df_cassandracleaned)
display(df_cleaned_cass_features)

Unnamed: 0,ticker,date,adj_close,close,high,low,open,previous_close,volume,SMA_5,EMA_5,RSI_5,BB_Middle,BB_Std,BB_Upper,BB_Lower,MACD,MACD_Signal,OBV,ATR_5
30,AAPL,2025-03-28 20:00:00,217.9000,217.9000,223.81,217.68,221.67,223.85,0,,217.900000,,,,,,0.000000,0.000000,0,
31,AAPL,2025-04-07 19:23:59,179.3600,179.3600,194.15,174.62,177.20,188.38,0,,205.053333,,,,,,-3.074416,-0.614883,0,
32,AAPL,2025-04-07 19:24:08,179.2500,179.2500,194.15,174.62,177.20,188.38,0,,196.452222,,,,,,-5.456885,-1.583284,0,
33,AAPL,2025-04-07 19:24:21,179.2500,179.2500,194.15,174.62,177.20,188.38,0,,190.718148,,,,,,-7.261306,-2.718888,0,
34,AAPL,2025-04-07 19:24:29,179.2500,179.2500,194.15,174.62,177.20,188.38,0,187.00200,186.895432,0.0,187.00200,17.272573,221.547146,152.456854,-8.592275,-3.893565,0,21.60
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
25,TSLA,2025-04-07 19:25:31,229.7900,229.7900,252.00,214.25,223.78,239.43,0,229.96000,230.528524,0.0,229.96000,0.380132,230.720263,229.199737,-9.240855,-7.559781,0,37.75
26,TSLA,2025-04-07 19:25:44,229.7900,229.7900,252.00,214.25,223.78,239.43,0,229.79000,230.282350,0.0,229.79000,0.000000,229.790000,229.790000,-9.085016,-7.864828,0,37.75
27,TSLA,2025-04-07 19:25:52,229.7900,229.7900,252.00,214.25,223.78,239.43,0,229.79000,230.118233,,229.79000,0.000000,229.790000,229.790000,-8.859388,-8.063740,0,37.75
28,TSLA,2025-04-07 19:26:01,229.9338,229.9338,252.00,214.25,223.78,239.43,0,229.81876,230.056755,100.0,229.81876,0.064309,229.947379,229.690141,-8.570180,-8.165028,0,37.75


## Exploratory Data Analysis

## Machine Learning
### ARIMA for MySQL (historical) and LSTM for Cassandra (real time)