# Evaluation of stock prediction model

## Part 0: Setup

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
import datetime, time
from termcolor import colored

from requests import Session
from requests_cache import CacheMixin, SQLiteCache
from requests_ratelimiter import LimiterMixin, MemoryQueueBucket
from pyrate_limiter import Duration, RequestRate, Limiter

# pd.set_option('display.max_rows', None)
# pd.set_option('display.max_columns', None)

# Dependencies for the LSTM model
from sklearn.preprocessing import MinMaxScaler
%pip install --upgrade tensorflow
import tensorflow as tf
from tensorflow.keras.losses import MeanSquaredError as mse
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, root_mean_squared_error
from tensorflow import keras
from keras.utils import pad_sequences
%pip install gputil
import GPUtil as GPU
from IPython.display import clear_output
clear_output()

## Part 1: Load the previously trained model.

In [None]:
# Path of the saved Keras model file (HDF5 format) to evaluate.
MODEL_PATH = "./data/stock_prediction.h5"
# The model is loaded with compile=False; the rest of this notebook only
# calls model.predict() on it.
model = tf.keras.models.load_model(MODEL_PATH, compile=False)

# Show model details
model.summary()

## Part 2: Create Test Cases

Here we list some variables for the test stocks. We've downloaded one year of historical price data for each stock from the NASDAQ site. The export has a few formatting quirks, which we fix in the following code.

In [None]:
# Directory with one CSV per ticker; files are read as f"{TESTDATADIR}{stock}.csv".
TESTDATADIR = "./data/test_stocks/"
# Ticker symbols to evaluate. Each entry is also used as the CSV file stem.
TESTSTOCKS = [
    "AAPL", # Apple
    "AMD",  # AMD
    "AMZN", # Amazon
    "META", # Meta Platforms
    "NFLX", # Netflix
    "QCOM", # Qualcomm
    "SBUX", # Starbucks
    "SCSO", # Cisco -- NOTE(review): Cisco's NASDAQ symbol is CSCO; confirm the CSV file name before renaming.
    "TSLA", # Tesla
]

In [None]:
# Create a dictionary to hold the test data for each stock
test_data = {}

# Price columns that are exported as text such as "$1,234.56"
monetary_columns = ['Close/Last', 'Open', 'High', 'Low']

for stock in TESTSTOCKS:
    # Read the CSV file into a DataFrame
    df = pd.read_csv(f"{TESTDATADIR}{stock}.csv")

    # The date column is given in the format "MM/DD/YYYY", change that to datetime
    df['Date'] = pd.to_datetime(df['Date'], format='%m/%d/%Y')

    # Strip the dollar sign and thousands separators, then convert to float.
    # The pattern is a raw string so that "\$" is a regex escape and not an
    # (invalid) Python string escape.
    for col in monetary_columns:
        df[col] = df[col].replace({r'\$': '', ',': ''}, regex=True).astype(float)

    # Keep only the date and the closing price, renamed to 'Close'.
    # rename() returns a new frame, so later column assignments do not
    # operate on a slice of the raw CSV frame.
    df = df[['Date', 'Close/Last']].rename(columns={'Close/Last': 'Close'})

    # The sliding windows built later assume chronological order (oldest
    # first). Sorting is a no-op for files that are already ascending and
    # fixes exports that list the newest row first.
    df = df.sort_values('Date').reset_index(drop=True)

    # Add the stock to the data dictionary
    test_data[stock] = df

In [None]:
# Normalise every ticker's closing prices to [0, 1] and keep the fitted
# scaler next to the frame, so scaled values can be mapped back to prices.
for stock, df in test_data.items():
    # One scaler per ticker: each stock is scaled to its own price range
    scaler = MinMaxScaler(feature_range=(0, 1))

    # The scaler expects a 2D column, hence the reshape
    closing_prices = df['Close'].values.reshape(-1, 1)
    df['Close'] = scaler.fit(closing_prices).transform(closing_prices)

    # Replace the bare frame with the (scaler, frame) pair
    test_data[stock] = (scaler, df)

In [None]:
# Build the model inputs (30-day windows) and targets (the following day)
# for every ticker.
for stock, (scaler, df) in test_data.items():
    # Scaled closing prices as a flat array
    data = df['Close'].values

    # Each sample is the 30 values preceding position `end`; the target is
    # the value at `end` itself.
    window_ends = range(30, len(data))
    X_test = np.array([data[end - 30:end] for end in window_ends])
    y_test = np.array([data[end] for end in window_ends])

    # Add the trailing feature axis: (samples, 30) -> (samples, 30, 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    # Store everything as (df, X_test, y_test, scaler)
    test_data[stock] = (df, X_test, y_test, scaler)

    # Report the resulting shapes
    print(f"Final data for the stock '{stock}'...")
    print(f"   Shape of X_test: {X_test.shape}")
    print(f"   Shape of y_test: {y_test.shape}")
    print()

## Part 3: Predicting with the model

### Part 3.1: Make predictions with the test data.

In [None]:
def predict_stock_price(model, ticker_data):
    """
    Predicts the stock price using the trained model.

    Parameters:
        model: object exposing ``predict(X)``.
        ticker_data: tuple ``(X_test, y_test, scaler)``. Only ``X_test`` is
            used here; it must be a 3D array of shape (n_samples, 30, 1) and
            should be preprocessed in the same way as the training data
            (normalized, reshaped).

    Returns:
        Whatever ``model.predict(X_test)`` returns.

    Raises:
        ValueError: if ``X_test`` is not 3D, does not have 30 time steps, or
            does not have exactly 1 feature.
    """
    # Only the inputs are needed for prediction.
    X_test, _y_test, _scaler = ticker_data

    # Validate the input shape with explicit exceptions: unlike `assert`,
    # these checks are not removed when Python runs with -O.
    if len(X_test.shape) != 3:
        raise ValueError(f"Expected 3D array, got {len(X_test.shape)}D array")
    if X_test.shape[1] != 30:
        raise ValueError(f"Expected 30 time steps, got {X_test.shape[1]} time steps")
    if X_test.shape[2] != 1:
        raise ValueError(f"Expected 1 feature, got {X_test.shape[2]} features")

    # Make predictions
    return model.predict(X_test)

In [None]:
# Every entry of test_data is (dataframe, X_test, y_test, scaler); run the
# model on each ticker and attach a results frame to the entry.
for ticker, (dataframe, X_test, y_test, scaler) in test_data.items():
    # Model output in scaled units
    predictions = predict_stock_price(model, (X_test, y_test, scaler))

    # Map both predictions and targets back to price units
    predictions_unscaled = scaler.inverse_transform(predictions)
    y_test_unscaled = scaler.inverse_transform(y_test.reshape(-1, 1))

    # The first 30 rows only serve as history for the first window, so the
    # results start at row 30.
    result_columns = {
        'Date': dataframe['Date'][30:],
        'Actual_Scaled': y_test,
        'Prediction_Scaled': predictions.flatten(),
        'Actual': y_test_unscaled.flatten(),
        'Prediction': predictions_unscaled.flatten()
    }
    results = pd.DataFrame(result_columns)

    # Extend the entry to (dataframe, X_test, y_test, scaler, results)
    test_data[ticker] = (dataframe, X_test, y_test, scaler, results)

    # Preview the results
    print(f"Results for {ticker}:")
    print(results.head())

In [None]:
# Create a 5x2 subplot grid (10 slots) and draw one panel per stock:
# actual price in blue, predicted price in red.
plt.figure(figsize=(20, 20))
for i, (ticker, (dataframe, X_test, y_test, scaler, results)) in enumerate(test_data.items()):
    # Add the subplot for each stock (subplot indices are 1-based)
    plt.subplot(5, 2, i + 1)
    plt.plot(results['Date'], results['Actual'], label='Actual Price', color='blue')
    plt.plot(results['Date'], results['Prediction'], label='Predicted Price', color='red')
    plt.title(f'{ticker} Stock Price Prediction')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
plt.tight_layout()
plt.show()

### Part 3.2: Use the predictive capabilities of the model to simulate trading.

In [None]:
# Organize the trading results into a DataFrame

# Buy signal: spend as much of the cash as whole shares allow
def buy_shares(cash, actual_price_today):
    """
    Buy the largest whole number of shares the cash can pay for.

    Returns a tuple (remaining cash, number of shares bought).
    """
    affordable_shares = cash // actual_price_today
    remaining_cash = cash - affordable_shares * actual_price_today
    return remaining_cash, affordable_shares

# Sell signal: liquidate the whole position at today's price
def sell_shares(cash, shares, actual_price_today):
    """
    Sell every share held at today's price.

    Returns a tuple (cash after the sale, 0 shares left).
    """
    proceeds = shares * actual_price_today
    return cash + proceeds, 0

# Simulate paper trading for each stock
def simulate_paper_trading(test_data, initial_investment=10000):
    """
    Simulates paper trading for each stock in the test_data dictionary.

    Strategy, applied day by day on every day except the last:
      * if the prediction for the next day is above today's actual price,
        buy as many whole shares as the cash allows;
      * if it is below today's actual price and shares are held, sell all.

    Parameters:
        test_data: dict mapping ticker -> tuple whose last element is a
            results DataFrame with 'Actual' and 'Prediction' columns.
        initial_investment: starting cash for every ticker.

    Returns:
        DataFrame with one row per ticker and the columns 'Ticker',
        'Final Portfolio Value' and 'Yield (%)'. A per-ticker summary is also
        printed.
    """
    trading_results = []

    # Only the results frame (last tuple element) is needed here.
    for ticker, (*_, results) in test_data.items():
        cash = initial_investment
        shares = 0

        # Pull the columns out once instead of doing .iloc lookups per day.
        actual_prices = results['Actual'].to_numpy()
        predicted_prices = results['Prediction'].to_numpy()

        # Pair today's actual price with the prediction for the next day.
        # The last day is excluded because there is no prediction beyond it.
        for actual_price_today, predicted_price_tomorrow in zip(actual_prices[:-1], predicted_prices[1:]):
            # Buy signal: Predicted price tomorrow > Actual price today
            if predicted_price_tomorrow > actual_price_today:
                cash, shares_bought = buy_shares(cash, actual_price_today)
                shares += shares_bought

            # Sell signal: Predicted price tomorrow < Actual price today
            elif predicted_price_tomorrow < actual_price_today and shares > 0:
                cash, shares = sell_shares(cash, shares, actual_price_today)

        # Value any open position at the last actual price. With no rows at
        # all nothing was traded, so the portfolio is just the cash.
        if len(actual_prices) > 0:
            final_portfolio_value = cash + shares * actual_prices[-1]
        else:
            final_portfolio_value = cash
        yield_percentage = ((final_portfolio_value - initial_investment) / initial_investment) * 100

        # Store the results
        trading_results.append({
            'Ticker': ticker,
            'Final Portfolio Value': final_portfolio_value,
            'Yield (%)': yield_percentage
        })

        # Print results for the stock
        print(f"Results for {ticker}:")
        print(f"   Final Portfolio Value: ${final_portfolio_value:.2f}")
        print(f"   Yield: {yield_percentage:.2f}%")
        print()

    # Convert trading results to a DataFrame for display
    trading_results_df = pd.DataFrame(trading_results)
    return trading_results_df

# Run the simulation with 10,000 of starting cash per ticker. The returned
# frame has one summary row per ticker.
trading_results_df = simulate_paper_trading(test_data, initial_investment=10000)

# Display the trading results
display(trading_results_df)

### Part 3.3: Visualize the results of the trading simulation.

In [None]:
# Plot actual vs. predicted prices for each stock and shade the gap between
# the two curves: green where the actual price is above the prediction,
# red where it is below.
plt.figure(figsize=(20, 10))
for i, (ticker, (dataframe, X_test, y_test, scaler, results)) in enumerate(test_data.items()):
    # Add the subplot for each stock
    plt.subplot(5, 2, i + 1)
    plt.plot(results['Date'], results['Actual'], label='Actual Price', color='blue')
    plt.plot(results['Date'], results['Prediction'], label='Predicted Price', color='red')

    # Highlight gains and losses
    gains = results['Actual'] > results['Prediction']
    losses = results['Actual'] < results['Prediction']

    # Pass the prediction as the second curve. Without it fill_between
    # shades down to y=0, which hides the difference between the curves and
    # stretches the y-axis to include zero.
    plt.fill_between(results['Date'], results['Actual'], results['Prediction'],
                     where=gains, color='green', alpha=0.3, interpolate=True)
    plt.fill_between(results['Date'], results['Actual'], results['Prediction'],
                     where=losses, color='red', alpha=0.3, interpolate=True)

    plt.title(f'{ticker} Stock Price Prediction')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
plt.tight_layout()
plt.show()


In [None]:
# Table header
print("\n{:<8} {:<22} {:<10}".format('Ticker', 'Final Portfolio Value', 'Yield (%)'))
print("-" * 45)

# One row per ticker, in test_data order, with the yield coloured by sign
for ticker in test_data:
    # Summary row of this ticker in the simulation results
    ticker_rows = trading_results_df.loc[trading_results_df['Ticker'] == ticker]
    final_value = ticker_rows['Final Portfolio Value'].values[0]
    yield_pct = ticker_rows['Yield (%)'].values[0]

    # Format the values
    final_value_str = "${:.2f}".format(final_value)
    yield_str = "{:.2f}%".format(yield_pct)

    # Green for a non-negative yield, red for a loss
    colored_yield = colored(yield_str, 'green' if yield_pct >= 0 else 'red')

    # Print the row
    print("{:<8} {:<22} {}".format(ticker, final_value_str, colored_yield))

### Part 3.4: Store the results of the simulated trading.

In [None]:
  # Create a dataframe from the results.
dataframe_columns = ['Ticker', 'Actual', 'Prediction', 'Date']

# Collect one long-format frame per ticker and concatenate once at the end.
# This avoids growing a DataFrame inside the loop and avoids concatenating
# onto an empty frame, whose dtype handling is deprecated in pandas.
per_ticker_frames = []
for ticker, (dataframe, X_test, y_test, scaler, results) in test_data.items():
    # Create a DataFrame for the results.
    # Note: 'Actual' and 'Prediction' hold the SCALED values here.
    results_df = pd.DataFrame({
        'Ticker': ticker,
        'Actual': results['Actual_Scaled'],
        'Prediction': results['Prediction_Scaled'],
        'Date': results['Date']
    })
    per_ticker_frames.append(results_df)

# NOTE: this rebinds trading_results_df, which previously held the per-ticker
# simulation summary; from here on it is the per-day results table.
if per_ticker_frames:
    trading_results_df = pd.concat(per_ticker_frames, ignore_index=True)
else:
    # No tickers: keep an empty frame with the expected columns
    trading_results_df = pd.DataFrame(columns=dataframe_columns)

# Display the trading results DataFrame
display(trading_results_df)

### Part 3.5: Explore the accuracy of the model prediction.

In [None]:
# Define a function to calculate the accuracy for a stock in a given week with the results DataFrame
def calculate_weekly_accuracy(results, week: int, year: int, tolerance: float = 0.05):
    """
    Calculate the accuracy of predictions for a given week.

    A prediction counts as correct when its absolute error is smaller than
    `tolerance` times the actual value.

    Parameters:
        results: DataFrame with 'Date', 'Actual' and 'Prediction' columns.
            It is not modified.
        week: week number, interpreted with the '%W' convention (weeks start
            on Monday; week 1 starts on the first Monday of the year).
        year: calendar year of the week.
        tolerance: relative error below which a prediction is correct
            (default 0.05, i.e. 5%).

    Returns:
        Fraction of correct predictions in the week, or 0.0 when the week
        contains no rows.
    """
    # Parse the dates into a separate Series so the caller's frame is left
    # untouched.
    dates = pd.to_datetime(results['Date'])

    # Monday of the requested week through the following Sunday (inclusive)
    week_start = pd.to_datetime(f'{year}-W{week}-1', format='%Y-W%W-%w')
    week_end = week_start + pd.DateOffset(days=6)
    weekly_results = results[(dates >= week_start) & (dates <= week_end)]

    total_predictions = len(weekly_results)
    if total_predictions == 0:
        return 0.0

    # Count predictions within the relative tolerance
    absolute_errors = (weekly_results['Actual'] - weekly_results['Prediction']).abs()
    correct_predictions = (absolute_errors < tolerance * weekly_results['Actual']).sum()

    return correct_predictions / total_predictions

In [None]:
# Show the weekly accuracy for each stock, using a colored table.
# NOTE(review): weeks are looked up in the calendar year in which the
# notebook is run; if the data comes from another year every week is empty
# and is reported as 0.00%.
current_year = datetime.datetime.now().year  # same for every ticker, so computed once
week_numbers = range(1, 10)  # weeks 1..9

for ticker, (dataframe, X_test, y_test, scaler, results) in test_data.items():
    # Calculate the weekly accuracy for the current year
    weekly_accuracy = [
        calculate_weekly_accuracy(results, week, current_year)
        for week in week_numbers
    ]

    # Create a DataFrame to hold the weekly accuracy
    weekly_accuracy_df = pd.DataFrame({
        'Week': week_numbers,
        'Accuracy': weekly_accuracy
    })

    # Print the results
    print(f"Weekly Accuracy for {ticker}:")

    # Iterate the two columns directly. iterrows() would return each row as
    # a single-dtype Series and turn the integer week into a float, which
    # printed "Week 1.0" instead of "Week 1".
    for week, accuracy in zip(weekly_accuracy_df['Week'], weekly_accuracy_df['Accuracy']):
        week_str = f"Week {week}"
        accuracy_str = "{:.2f}%".format(accuracy * 100)

        # Color based on accuracy (green for high accuracy, red for low accuracy)
        if accuracy >= 0.8:
            colored_accuracy = colored(accuracy_str, 'green')
        elif accuracy >= 0.5:
            colored_accuracy = colored(accuracy_str, 'yellow')
        else:
            colored_accuracy = colored(accuracy_str, 'red')

        # Print the row
        print(f"{week_str}: {colored_accuracy}")
    print()

## Part 4: Create prediction evaluation metrics

### Part 4.1: Evaluate and store some metrics for the model prediction.

In [None]:
# Compute MSE, RMSE, MAE and R2 for each stock on the scaled values
# (note the small sample size and the specificity of the data).
# Rows are collected in a list and turned into a DataFrame once, which
# avoids concatenating onto an empty frame inside the loop.
metric_rows = []

for ticker, (dataframe, X_test, y_test, scaler, results) in test_data.items():
    predicted_scaled = results['Prediction_Scaled'].values

    # Calculate the Mean Squared Error (MSE)
    mse_value = mean_squared_error(y_test, predicted_scaled)

    # Calculate the Root Mean Squared Error (RMSE)
    rmse_value = np.sqrt(mse_value)

    # Calculate the Mean Absolute Error (MAE)
    mae_value = mean_absolute_error(y_test, predicted_scaled)

    # Calculate the R-squared value
    r2_value = r2_score(y_test, predicted_scaled)

    # Store the metrics for this ticker
    metric_rows.append({
        'Ticker': ticker,
        'MSE': mse_value,
        'RMSE': rmse_value,
        'MAE': mae_value,
        'R2': r2_value
    })

    # Print the results
    print(f"Results for {ticker}:")
    print(f"   MSE: {mse_value:.4f}")
    print(f"   RMSE: {rmse_value:.4f}")
    print(f"   MAE: {mae_value:.4f}")
    print(f"   R-squared: {r2_value:.4f}")
    print()

# DataFrame to hold the evaluation metrics (empty but with the right columns
# when there are no tickers)
evaluation_metrics = pd.DataFrame(metric_rows, columns=['Ticker', 'MSE', 'RMSE', 'MAE', 'R2'])

In [None]:
# Display the evaluation metrics (columns: Ticker, MSE, RMSE, MAE, R2)
display(evaluation_metrics)

### Part 4.2: Create evaluation metrics per week.

In [None]:
def get_weeek_of_dataframe(df, ticker, start_date, end_date):
    """
    Return a copy of the rows of `df` that belong to `ticker` and whose
    'Date' lies between `start_date` and `end_date`, both ends included.
    """
    is_ticker = df['Ticker'] == ticker
    in_range = (df['Date'] >= start_date) & (df['Date'] <= end_date)

    # Copy so the caller can modify the result without touching `df`
    return df[is_ticker & in_range].copy()


# Calculate the weekly metric for each stock
def calculate_weekly_metric(data, start_date='2025-01-01', weeks=10):
    """
    Compute MSE, RMSE, MAE and R2 per ticker and per week.

    Parameters:
        data: DataFrame with 'Ticker', 'Date', 'Actual' and 'Prediction'
            columns.
        start_date: first day of week 1, as a 'YYYY-MM-DD' string.
        weeks: exclusive upper bound of the week numbers; weeks
            1 .. weeks - 1 are evaluated.

    Returns:
        Long-format DataFrame with the columns 'Ticker', 'Week', 'Metric'
        and 'Value' (four rows per ticker and week). Weeks without data are
        reported on stdout and skipped.
    """
    columns = ['Ticker', 'Week', 'Metric', 'Value']
    first_day = datetime.datetime.strptime(start_date, '%Y-%m-%d')

    # Collect the per-week frames and concatenate once at the end.
    weekly_frames = []

    for ticker in data['Ticker'].unique():
        # Slice the partial data for the specific ticker.
        current_data = data[data['Ticker'] == ticker]

        for week in range(1, weeks):
            # Window of exactly seven calendar days. The helper includes
            # both ends, so the last day is begin_date + 6 days.
            # Assumes 'Date' holds date-only (midnight) timestamps.
            begin_date = first_day + datetime.timedelta(weeks=week - 1)
            end_date = begin_date + datetime.timedelta(days=6)

            # Filter the data for this week only: the window must start at
            # begin_date, not at the overall start_date, otherwise every
            # "week" is cumulative from the first day.
            results_filtered = get_weeek_of_dataframe(current_data,
                                                      ticker,
                                                      start_date=begin_date,
                                                      end_date=end_date)

            # Check if the filtered DataFrame is empty
            if results_filtered.empty:
                print(f"No data available for {ticker} from {begin_date:%Y-%m-%d} to {end_date:%Y-%m-%d}.")
                continue

            # Calculate the metrics for the week.
            actual_prices = results_filtered['Actual'].values
            predicted_prices = results_filtered['Prediction'].values

            mse_value = mean_squared_error(actual_prices, predicted_prices)
            rmse_value = np.sqrt(mse_value)
            mae_value = mean_absolute_error(actual_prices, predicted_prices)
            r2_value = r2_score(actual_prices, predicted_prices)

            # One row per metric for this ticker and week
            weekly_frames.append(pd.DataFrame({
                'Ticker': [ticker] * 4,
                'Week': [week] * 4,
                'Metric': ['MSE', 'RMSE', 'MAE', 'R2'],
                'Value': [mse_value, rmse_value, mae_value, r2_value]
            }))

    # Nothing collected: return an empty frame with the expected columns
    if not weekly_frames:
        return pd.DataFrame(columns=columns)

    return pd.concat(weekly_frames, ignore_index=True)

# Calculate the weekly metrics for each stock (default start date and
# number of weeks of calculate_weekly_metric)
weekly_metrics = calculate_weekly_metric(trading_results_df)

# Pivot the long-format table so that each metric becomes its own column,
# with one row per (Ticker, Week)
weekly_metrics_pivot = weekly_metrics.pivot_table(index=['Ticker', 'Week'], columns='Metric', values='Value').reset_index()

# Display the weekly metrics DataFrame
display(weekly_metrics_pivot)

## Part 5: Plot and explore the evaluation metrics.

### Part 5.1: Show the metrics per week.

In [None]:
# Plot the weekly metrics: one figure per metric, with all stocks
# superimposed as separate lines.
# Each figure is created inside the loop; creating another one before the
# loop would only leave an empty figure behind.
for metric in ['MSE', 'RMSE', 'MAE', 'R2']:
    plt.figure(figsize=(20, 10))
    for ticker in weekly_metrics_pivot['Ticker'].unique():
        # Filter the data for the specific ticker
        ticker_data = weekly_metrics_pivot[weekly_metrics_pivot['Ticker'] == ticker]

        # Plot the metric
        plt.plot(ticker_data['Week'], ticker_data[metric], label=ticker)

    # Add labels and title
    plt.title(f'Weekly {metric} for Each Stock')
    plt.xlabel('Week')
    plt.ylabel(metric)
    plt.legend()
    plt.show()


### Part 5.2: Reframe the data tables and calculate metrics cumulatively.

In [None]:
# Calculate weekly metrics per all stocks
# Placeholder frame with the expected columns.
weekly_metrics_all = pd.DataFrame(columns=['Week', 'Metric', 'Value'])
# NOTE(review): `weeks` is not read anywhere in the code visible here.
weeks = 10

# Week label of a date, counted from the start date
def get_week_label(date):
    # Week 1 is the seven days starting on 2025-01-01; dates before that
    # get a label of 0 or below.
    origin = datetime.datetime(2025, 1, 1)
    elapsed_days = (date - origin).days
    return elapsed_days // 7 + 1

# Add the week label to the dataframe (one label per row, from its 'Date')
trading_results_df['Week'] = trading_results_df['Date'].apply(get_week_label)

# Note: week labels are zero or negative for rows dated before the
# 2025-01-01 start date used by get_week_label.

# Display the updated dataframe
display(trading_results_df)

In [None]:
# Function to calculate weekly metrics across all stocks
def calculate_weekly_metric_all(data):
    """
    Compute MSE, RMSE, MAE and R2 per week over all tickers together.

    Parameters:
        data: DataFrame with 'Week', 'Actual' and 'Prediction' columns.
            Only rows with a positive week label are evaluated.

    Returns:
        Long-format DataFrame with the columns 'Week', 'Metric' and 'Value'
        (four rows per week). Weeks without rows are reported on stdout and
        skipped. The frame is empty when no row has a positive week label.
    """
    columns = ['Week', 'Metric', 'Value']

    # Without any positive week there is nothing to evaluate (and max()
    # would fail on an empty list).
    positive_weeks = [week for week in data['Week'].unique() if week > 0]
    if not positive_weeks:
        return pd.DataFrame(columns=columns)
    max_weeks = int(max(positive_weeks))

    # Collect the per-week frames and concatenate once at the end.
    weekly_frames = []

    for week in range(1, max_weeks + 1):
        # Get the data for the specific week
        results_filtered = data[data['Week'] == week]

        # Check if the filtered DataFrame is empty
        if results_filtered.empty:
            print(f"No data available for week {week}.")
            continue

        # Calculate the metrics for the week.
        actual_prices = results_filtered['Actual'].values
        predicted_prices = results_filtered['Prediction'].values

        mse_value = mean_squared_error(actual_prices, predicted_prices)
        rmse_value = np.sqrt(mse_value)
        mae_value = mean_absolute_error(actual_prices, predicted_prices)
        r2_value = r2_score(actual_prices, predicted_prices)

        # One row per metric for this week
        weekly_frames.append(pd.DataFrame({
            'Week': [week] * 4,
            'Metric': ['MSE', 'RMSE', 'MAE', 'R2'],
            'Value': [mse_value, rmse_value, mae_value, r2_value]
        }))

    if not weekly_frames:
        return pd.DataFrame(columns=columns)

    return pd.concat(weekly_frames, ignore_index=True)

In [None]:
# Calculate the weekly metric for all stocks (uses the 'Week' column of
# trading_results_df)
weekly_metrics_all = calculate_weekly_metric_all(trading_results_df)

# Pivot the long-format table so that each metric becomes its own column,
# with one row per week
weekly_metrics_all_pivot = weekly_metrics_all.pivot_table(index='Week', columns='Metric', values='Value').reset_index()

# Display the weekly metrics DataFrame
display(weekly_metrics_all_pivot)

### Part 5.3: View the cumulative metrics.

In [None]:
# Plot the weekly metrics over all stocks: one figure per metric.
# Each figure is created inside the loop; creating another one before the
# loop would only leave an empty figure behind.
for metric in ['MSE', 'RMSE', 'MAE', 'R2']:
    plt.figure(figsize=(20, 10))
    plt.plot(weekly_metrics_all_pivot['Week'], weekly_metrics_all_pivot[metric], label=metric)

    # Add labels and title
    plt.title(f'Weekly {metric} for All Stocks')
    plt.xlabel('Week')
    plt.ylabel(metric)
    plt.legend()
    plt.show()
