In [None]:
# pip install yfinance pandas numpy scikit-learn tensorflow matplotlib

import os
from collections import defaultdict

import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import TimeSeriesSplit
from tensorflow.keras.layers import Bidirectional, Dropout, Dense, LSTM, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

In [None]:

def get_data(ticker):
    """Download price history for ``ticker`` since 2014-01-01 and plot its close.

    Args:
        ticker: Symbol passed to ``yf.download`` (e.g. ``"AAPL"``).

    Returns:
        The downloaded DataFrame, with single-level columns so that
        ``data['Close']`` is a Series.

    Raises:
        ValueError: If the download returned no rows.
    """
    data = yf.download(ticker, start="2014-01-01")

    # Fail here with a clear message instead of much later inside the scaler.
    if data.empty:
        raise ValueError(f"No price data returned for ticker {ticker!r}")

    # Recent yfinance releases return MultiIndex columns even for a single
    # ticker, which makes data['Close'] a DataFrame and breaks the later
    # `data['RSI'] = ...` assignment. Keep only the first level.
    # NOTE(review): assumes the default column layout where level 0 holds the
    # field name (Close, Open, ...) -- confirm if `group_by` is ever changed.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)

    plt.figure(figsize=(14, 5))
    plt.plot(data['Close'], label=f'{ticker} Closing Price')
    plt.title(f'{ticker} Closing Price History')
    plt.xlabel('Date')
    plt.ylabel('Closing Price USD ($)')
    plt.legend()
    plt.show()

    return data

def calculate_RSI(data, window=50):
    """Compute a rolling-mean Relative Strength Index from the 'Close' column.

    Args:
        data: Object whose ``data['Close']`` is a pandas Series of prices.
        window: Number of rows in each rolling average.

    Returns:
        A Series of RSI values; entries before a full window is available
        are NaN.
    """
    price_change = data['Close'].diff(1)

    def _rolling_average(moves):
        # The first diff is NaN; treat it as "no move" before averaging.
        return moves.fillna(0).rolling(window=window).mean()

    # Upward moves keep their size, everything else counts as zero.
    mean_up = _rolling_average(price_change.where(price_change > 0, 0))
    # Downward moves are flipped to positive magnitudes.
    mean_down = _rolling_average(-price_change.where(price_change < 0, 0))

    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))

def create_sequences(data, seq_length):
    """Slice a 2-D array into overlapping windows and next-step targets.

    Args:
        data: 2-D array indexed as ``data[row, column]``.
        seq_length: Number of consecutive rows in each window.

    Returns:
        ``(sequences, targets)`` as NumPy arrays: window ``k`` holds rows
        ``k .. k + seq_length - 1`` and its target is column 0 of the row
        immediately after the window.
    """
    n_windows = len(data) - seq_length
    windows = [data[start:start + seq_length] for start in range(n_windows)]
    next_values = [data[start + seq_length, 0] for start in range(n_windows)]
    return np.array(windows), np.array(next_values)

def train_test(ticker):
    """Build scaled, chronologically split LSTM inputs for one ticker.

    Downloads the price history, adds an RSI column, scales the
    ``['Close', 'RSI']`` features to [0, 1] and cuts them into windows of
    5 rows whose target is the next row's scaled close.

    The train/test boundary is the last fold of a 2-split
    ``TimeSeriesSplit``. The scaler is fitted only on rows that belong to
    training windows or training targets, so test-period prices do not leak
    into the scaling; scaled test values may therefore fall outside [0, 1].

    Args:
        ticker: Symbol forwarded to ``get_data``.

    Returns:
        ``(X_train, X_test, y_train, y_test, scaler)``.

    Raises:
        ValueError: If there are too few rows to form the split.
    """
    data = get_data(ticker)
    data['RSI'] = calculate_RSI(data)
    # RSI is NaN until a full rolling window exists; back-fill those rows.
    # bfill() replaces the deprecated fillna(method='backfill').
    data = data.bfill()

    features = ['Close', 'RSI']
    seq_length = 5

    tss = TimeSeriesSplit(n_splits=2)
    n_sequences = len(data) - seq_length
    if n_sequences <= tss.n_splits:
        raise ValueError(
            f"Not enough rows for {ticker!r}: {len(data)} rows give "
            f"{max(n_sequences, 0)} sequences, need more than {tss.n_splits}"
        )

    # Only the final fold (largest training span) is used; the split depends
    # solely on the number of sequences, so a placeholder array suffices.
    train_index, test_index = list(tss.split(np.zeros((n_sequences, 1))))[-1]

    # Training window k uses rows k..k+seq_length-1 plus target row
    # k+seq_length, so the training data ends at this row (exclusive).
    n_fit_rows = train_index[-1] + seq_length + 1

    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(data[features].iloc[:n_fit_rows])
    scaled_data = scaler.transform(data[features])

    X, y = create_sequences(scaled_data, seq_length)

    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

    return X_train, X_test, y_train, y_test, scaler

In [None]:
def evaluate(y_test_rescaled, predictions):
    """Print MSE, RMSE and MAE for a set of predictions and return the MAE.

    Args:
        y_test_rescaled: Actual values.
        predictions: Predicted values, aligned with ``y_test_rescaled``.

    Returns:
        The mean absolute error.
    """
    squared_error = mean_squared_error(y_test_rescaled, predictions)
    absolute_error = mean_absolute_error(y_test_rescaled, predictions)
    root_squared_error = np.sqrt(squared_error)

    print(f'Mean Squared Error (MSE): {squared_error}')
    print(f'Root Mean Squared Error (RMSE): {root_squared_error}')
    print(f'Mean Absolute Error (MAE): {absolute_error}')

    return absolute_error

def predict(model, X_test, y_test, scaler):
    """Run the model on ``X_test`` and undo the scaling on both outputs.

    The scaler was fitted on several feature columns, so each single column
    is padded with zero columns up to ``X_test.shape[2]`` before
    ``inverse_transform`` and only column 0 of the result is kept.

    Args:
        model: Object with a ``predict`` method.
        X_test: 3-D input array; its last axis gives the feature count.
        y_test: Scaled target values.
        scaler: Object with an ``inverse_transform`` method.

    Returns:
        ``(y_test_rescaled, predictions)`` as 1-D arrays.
    """
    n_padding_columns = X_test.shape[2] - 1

    def _unscale_first_column(column, n_rows):
        # Zero columns stand in for the other features; they are discarded.
        padding = np.zeros((n_rows, n_padding_columns))
        padded = np.concatenate((column, padding), axis=1)
        return scaler.inverse_transform(padded)[:, 0]

    raw_predictions = model.predict(X_test)
    predictions = _unscale_first_column(raw_predictions, raw_predictions.shape[0])
    y_test_rescaled = _unscale_first_column(y_test.reshape(-1, 1), y_test.shape[0])

    return (y_test_rescaled, predictions)
    
def build_model(X_train, y_train, X_test, y_test, seq_length=5):
    """Build, compile and fit the stacked LSTM regressor.

    Architecture: bidirectional LSTM(100) -> dropout -> LSTM(100) -> dropout
    -> Dense(50) -> Dense(25) -> Dense(1), trained with Adam on mean squared
    error for up to 75 epochs.

    Args:
        X_train: Training inputs; ``X_train.shape[2]`` is the feature count.
        y_train: Training targets.
        X_test: Inputs passed to ``fit`` as validation data.
        y_test: Targets passed to ``fit`` as validation data.
        seq_length: Number of time steps declared in the input layer.

    Returns:
        The fitted model.
    """
    n_features = X_train.shape[2]

    layers = [
        Input(shape=(seq_length, n_features)),
        Bidirectional(LSTM(100, return_sequences=True, kernel_regularizer=l2(0.05))),
        Dropout(0.3),
        LSTM(100, kernel_regularizer=l2(0.05)),
        Dropout(0.3),
        Dense(50),
        Dense(25),
        Dense(1),
    ]
    model = Sequential(layers)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')

    # NOTE(review): both callbacks monitor val_loss on (X_test, y_test), so
    # the test set drives early stopping and the restored weights.
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1),
    ]

    model.fit(
        X_train,
        y_train,
        epochs=75,
        batch_size=16,
        validation_data=(X_test, y_test),
        callbacks=callbacks,
    )

    return model

In [None]:
import math

# Tickers to model, grouped by the industry label used for the output folder.
industries = {
            "Finance": ["BAC", "BRK-B", "JPM", "MA", "V", "WFC"],
            "Med": ["UNH", "JNJ", "ABBV", "MRK", "TMO"],
            "Tech": ["AAPL", "AMZN", "GOOG", "META", "MSFT", "TSLA"]
            }

# stock -> [industry, avg_pct_diff, mae, predictions, actual prices, model]
res = defaultdict(list)

for industry, tickers in industries.items():
    for stock in tickers:
        X_train, X_test, y_train, y_test, scaler = train_test(stock)

        best_model, best_mae, best_avg_pct_diff = None, math.inf, math.inf
        best_predictions = None

        # Train several independently initialised models and keep the best.
        for i in range(10):
            model = build_model(X_train, y_train, X_test, y_test)
            y_test_rescaled, predictions = predict(model, X_test, y_test, scaler)
            mae = evaluate(y_test_rescaled, predictions)
            # Express the error relative to the actual price level. Dividing
            # by the mean prediction (as before) made the ratio depend on the
            # model's own bias, so it was not comparable between runs.
            avg_pct_diff = mae / np.mean(y_test_rescaled)

            # Select on MAE alone. The previous `or` of two metrics let a run
            # with a worse MAE replace the stored best one.
            if mae < best_mae:
                best_avg_pct_diff = avg_pct_diff
                best_mae = mae
                best_model = model
                best_predictions = predictions

        # y_test_rescaled is identical for every run of the same stock.
        res[stock] = [industry, best_avg_pct_diff, best_mae, best_predictions, y_test_rescaled, best_model]

In [None]:
# Persist each stock's best model and plot its predictions against actuals.
for stock in res:
    industry, avg_pct_diff, mae, predictions, actual, model = res[stock]

    # Save the model stored for THIS stock. The global `best_model` left over
    # from the training cell is the last stock's model, so using it wrote the
    # same network to every file.
    os.makedirs(f"Models/{industry}", exist_ok=True)
    model.save(f"Models/{industry}/{stock}.keras")

    plt.figure(figsize=(14, 5))
    plt.plot(actual, color='blue', label='Actual Stock Price')
    plt.plot(predictions, color='red', label='Predicted Stock Price')
    plt.title(f'{stock} Stock Price Prediction')
    plt.xlabel('Days')
    plt.ylabel('Stock Price USD ($)')
    plt.legend()
    plt.show()

    print(f"Mae: {mae}")
    print(f"Avg Percent Diff: {avg_pct_diff}")

    print("*" * 100)