<a href="https://colab.research.google.com/github/dmbritton1/Stock-Market-Risk-Assessment-Tool/blob/main/Stock_Risk_Profile.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Stock Market Risk Assesment Tool**

Imports

In [None]:
import yfinance as yf
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from ipywidgets import interact, VBox, HBox, Text, Button
import ipywidgets as widgets
from IPython.display import display
from matplotlib.colors import LinearSegmentedColormap, Normalize
import matplotlib.cm as cm

Database Connection

In [None]:
engine = create_engine('sqlite:///stock_data.db')

Fetch and Process Stock Data

In [None]:
ticker = "META"  # Example ticker

# Download stock data and calculate metrics
stock_data = yf.download(ticker, start="2020-01-01")
stock_data['daily_return'] = stock_data['Close'].pct_change()
stock_data['volatility'] = stock_data['daily_return'].rolling(window=10).std()
stock_data['short_ma'] = stock_data['Close'].rolling(window=10).mean()
stock_data['long_ma'] = stock_data['Close'].rolling(window=50).mean()

[*********************100%***********************]  1 of 1 completed


Calculate VaR

In [None]:
def calculate_var(returns, confidence_level=0.05):
    return np.percentile(returns.dropna(), 100 * confidence_level)
stock_data['VaR'] = calculate_var(stock_data['daily_return'])

Calculate Beta

In [None]:
# Calculate beta using market data (S&P 500)
market_data = yf.download("^GSPC", start="2020-01-01", end="2023-12-31")
market_data['daily_return'] = market_data['Close'].pct_change()
covariance = stock_data['daily_return'].cov(market_data['daily_return'])
market_variance = market_data['daily_return'].var()
stock_data['beta'] = covariance / market_variance

[*********************100%***********************]  1 of 1 completed


Calculate Sharpe Ratio

In [None]:
risk_free_rate = 0.01
stock_data['sharpe_ratio'] = (stock_data['daily_return'].mean() - risk_free_rate) / stock_data['volatility']
stock_data = stock_data.dropna()

###Risk Labeling

In [None]:
# ---------------------
# Risk Labeling
# ---------------------
def categorize_risk_all_factors(stock_data):
    stock_data['risk_label'] = 'Low'  # Default

    # High Risk Conditions
    high_risk_condition = (
        (stock_data['volatility'] > 0.05) &
        (stock_data['VaR'] < -0.03) &
        (stock_data['beta'] > 1) &
        (stock_data['sharpe_ratio'] < 1)
    )
    stock_data.loc[high_risk_condition, 'risk_label'] = 'High'

    # Medium Risk Conditions
    medium_risk_condition = (
        (stock_data['volatility'] > 0.02) & (stock_data['volatility'] <= 0.05) &
        (stock_data['VaR'] >= -0.03) & (stock_data['VaR'] < -0.01) &
        (stock_data['beta'] >= 0.8) & (stock_data['beta'] <= 1) &
        (stock_data['sharpe_ratio'] >= 1) & (stock_data['sharpe_ratio'] <= 2)
    )
    stock_data.loc[medium_risk_condition, 'risk_label'] = 'Medium'

    return stock_data

# Apply risk categorization
stock_data = categorize_risk_all_factors(stock_data)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stock_data['risk_label'] = 'Low'  # Default


In [None]:
stock_data.head(1000)

Price,Close,High,Low,Open,Volume,daily_return,volatility,short_ma,long_ma,VaR,beta,sharpe_ratio,risk_label
Ticker,META,META,META,META,META,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2
2020-03-13,169.632034,170.140088,157.000293,162.907720,35028600,0.102350,0.060214,177.624484,204.720780,-0.040104,1.289687,-0.145416,High
2020-03-16,145.454361,158.524444,142.555446,151.740361,39120400,-0.142530,0.071814,172.600673,203.450233,-0.040104,1.289687,-0.121927,High
2020-03-17,148.851410,153.354214,139.467258,150.166394,34255600,0.023355,0.072743,168.967554,202.269743,-0.040104,1.289687,-0.120370,High
2020-03-18,146.400772,147.616116,136.578292,139.218202,37553100,-0.016464,0.070573,164.504604,200.961939,-0.040104,1.289687,-0.124070,High
2020-03-19,152.547287,159.321398,144.248984,146.062050,39862300,0.041984,0.073397,161.312799,199.767901,-0.040104,1.289687,-0.119297,High
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-02-27,485.711365,485.930761,478.600987,478.660819,10809600,0.011023,0.020337,475.387442,401.057666,-0.040104,1.289687,-0.430555,Low
2024-02-28,482.689697,489.700375,481.423199,483.667015,12715500,-0.006221,0.019126,477.819507,404.073417,-0.040104,1.289687,-0.457808,Low
2024-02-29,488.782928,490.348621,481.283577,487.097571,17732000,0.012623,0.017572,479.549899,407.176165,-0.040104,1.289687,-0.498305,Low
2024-03-01,500.919464,502.864117,490.498203,490.757468,16273600,0.024830,0.017836,481.423035,410.328382,-0.040104,1.289687,-0.490914,Low


###Model Training

In [None]:
# Prepare features and labels
le = LabelEncoder()
labels = le.fit_transform(stock_data['risk_label'])
features = stock_data[['volatility', 'VaR', 'beta', 'sharpe_ratio', 'daily_return']]

# Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(features.dropna(), labels, test_size=0.2, random_state=42)

# Train RandomForest Model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

###Prediction Function

In [None]:
# ---------------------
# Prediction Function
# ---------------------
def predict_risk(ticker, model, le):
    # Fetch stock data and calculate metrics
    stock_data = yf.download(ticker, start="2020-01-01", end="2023-12-31")
    stock_data['daily_return'] = stock_data['Close'].pct_change()
    stock_data['volatility'] = stock_data['daily_return'].rolling(window=10).std()
    stock_data['VaR'] = calculate_var(stock_data['daily_return'])

    # Calculate beta
    market_data = yf.download("^GSPC", start="2020-01-01", end="2023-12-31")
    market_data['daily_return'] = market_data['Close'].pct_change()
    covariance = stock_data['daily_return'].cov(market_data['daily_return'])
    market_variance = market_data['daily_return'].var()
    stock_data['beta'] = covariance / market_variance

    # Calculate Sharpe Ratio
    stock_data['sharpe_ratio'] = (stock_data['daily_return'].mean() - risk_free_rate) / stock_data['volatility']

    # Prepare latest data for prediction
    latest_data = stock_data[['volatility', 'VaR', 'beta', 'sharpe_ratio', 'daily_return']].iloc[-1:].values
    prediction = model.predict(latest_data)
    risk_level = le.inverse_transform(prediction)[0]

    # Add risk level and ticker to data
    stock_data['risk_level'] = risk_level
    stock_data['Ticker'] = ticker

    # Save data to database
    table_name = f"{ticker.lower()}_data"
    stock_data.reset_index(inplace=True)
    stock_data.rename(columns={'Date': 'date'}, inplace=True)
    stock_data.to_sql(table_name, con=engine, if_exists="replace", index=False)

    print(f"\nData for {ticker} stored in table {table_name}.\n")

    # Return latest risk metrics
    return {
        "ticker": ticker,
        "volatility": stock_data['volatility'].iloc[-1],
        "VaR": stock_data['VaR'].iloc[-1],
        "beta": stock_data['beta'].iloc[-1],
        "sharpe_ratio": stock_data['sharpe_ratio'].iloc[-1],
        "risk_level": risk_level
    }


###Correlation Matrix Functions

In [None]:
def create_correlation_matrix(tickers=None):
    # Dynamically fetch table names if tickers not provided
    if tickers is None:
        query = "SELECT name FROM sqlite_master WHERE type='table';"
        tables = pd.read_sql(query, con=engine)['name'].tolist()
    else:
        tables = [f"{ticker.lower()}_data" for ticker in tickers]

    # Initialize DataFrame for daily returns
    all_returns = pd.DataFrame()

    for table in tables:
        query = f"""
            SELECT "('date', '')" AS date, "('daily_return', '')" AS daily_return
            FROM {table}
        """
        try:
            data = pd.read_sql(query, con=engine)
            data.set_index('date', inplace=True)
            all_returns[table.replace("_data", "")] = data['daily_return']
        except Exception as e:
            print(f"Error processing table {table}: {e}")

    all_returns.dropna(inplace=True)
    return all_returns.corr()

def plot_correlation_matrix(correlation_matrix):
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        correlation_matrix,
        annot=True,
        cmap="YlOrRd",
        fmt=".2f",
        linewidths=0.5
    )
    plt.title("Stock Correlation Matrix", fontsize=16)
    plt.show()

Database contents

In [None]:
def list_tickers_in_database():
    query = "SELECT name FROM sqlite_master WHERE type='table';"
    tables = pd.read_sql(query, con=engine)['name'].tolist()
    tickers = [table.replace("_data", "").upper() for table in tables if table.endswith("_data")]

    print("Tickers currently in the database:")
    if tickers:
        print(", ".join(tickers))
    else:
        print("No tickers in the database.")

###UI Elements

In [None]:
ticker_input = widgets.Text(
    description="Ticker:",
    placeholder="Enter stock ticker (e.g., AAPL)"
)

add_button = widgets.Button(description="Add Ticker")
remove_button = widgets.Button(description="Remove Ticker")
risk_button = widgets.Button(description="Show Risk Levels")
correlation_button = widgets.Button(description="Show Correlation")
output_area = widgets.Output()

# Handlers for UI buttons
def on_add_click(b):
    with output_area:
        output_area.clear_output()
        ticker = ticker_input.value.strip().upper()
        if ticker:
            result = predict_risk(ticker, model, le)
            if result:
                print(f"Ticker {ticker} added successfully!\n")
            else:
                print(f"Failed to add ticker {ticker}.")
        else:
            print("Please enter a valid ticker.")
        list_tickers_in_database()
    ticker_input.value = ""

def drop_table_by_ticker(ticker):
    table_name = f"{ticker.lower()}_data"
    query = "SELECT name FROM sqlite_master WHERE type='table';"
    tables = pd.read_sql(query, con=engine)['name'].tolist()

    if table_name not in tables:
        print(f"Table for ticker '{ticker}' does not exist.")
        return

    with engine.connect() as connection:
        try:
            connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
            print(f"Table {table_name} has been dropped.\n")
        except Exception as e:
            print(f"Failed to drop table {table_name}: {e}")

def on_remove_click(b):
    with output_area:
        output_area.clear_output()
        ticker = ticker_input.value.strip().upper()
        if not ticker:
            print("Please enter a valid ticker.")
            return
        drop_table_by_ticker(ticker)
        list_tickers_in_database()
    ticker_input.value = ""

def visualize_metrics_with_spectrum(metrics):
    thresholds = {
        "volatility": {"min": 0.0, "max": 0.1},
        "VaR": {"min": -0.05, "max": 0.0},
        "beta": {"min": 0.5, "max": 1.5},
        "sharpe_ratio": {"min": 0.0, "max": 2.0},
    }
    colormap = LinearSegmentedColormap.from_list("risk_spectrum", ["green", "yellow", "red"])

    def get_spectrum_color(value, min_val, max_val):
        norm = Normalize(vmin=min_val, vmax=max_val)
        return cm.ScalarMappable(norm=norm, cmap=colormap).to_rgba(value)

    plt.figure(figsize=(10, 6))
    plt.axis("off")
    metric_values = [
        ("Volatility", metrics["volatility"], thresholds["volatility"]),
        ("VaR", metrics["VaR"], thresholds["VaR"]),
        ("Beta", metrics["beta"], thresholds["beta"]),
        ("Sharpe Ratio", metrics["sharpe_ratio"], thresholds["sharpe_ratio"]),
    ]

    for i, (metric_name, value, threshold) in enumerate(metric_values):
        color = get_spectrum_color(value, threshold["min"], threshold["max"])
        plt.gca().add_patch(
            plt.Rectangle((0.1, 0.8 - i * 0.15), 0.8, 0.1, color=color, ec="black", lw=2)
        )
        plt.text(
            0.5,
            0.85 - i * 0.15,
            f"{metric_name}: {value:.2f}",
            ha="center",
            va="center",
            fontsize=12,
            color="black" if cm.colors.rgb_to_hsv(color[:3])[2] > 0.5 else "white",
        )
    overall_color = {"High": "red", "Medium": "yellow", "Low": "green"}[metrics["risk_level"]]
    plt.gca().add_patch(
        plt.Rectangle((0.1, 0.05), 0.8, 0.1, color=overall_color, ec="black", lw=2)
    )
    plt.text(
        0.5,
        0.1,
        f"Overall Risk Level: {metrics['risk_level']}",
        ha="center",
        va="center",
        fontsize=14,
        color="white" if overall_color != "yellow" else "black",
    )
    plt.text(0.5, 1, f"Risk Metrics for {metrics['ticker']}", ha="center", va="center", fontsize=16)
    plt.show()

def on_risk_click(b):
    with output_area:
        output_area.clear_output()
        ticker = ticker_input.value.strip().upper()
        if not ticker:
            print("Please enter a valid ticker.")
            return

        try:
            metrics = predict_risk(ticker, model, le)
            visualize_metrics_with_spectrum(metrics)
        except Exception as e:
            print(f"Failed to fetch and predict risk for {ticker}: {e}")
        list_tickers_in_database()
    ticker_input.value = ""

def on_correlation_click(b):
    with output_area:
        output_area.clear_output()
        ticker = ticker_input.value.strip().upper()
        if ticker:
            try:
                result = predict_risk(ticker, model, le)
                if result:
                    print(f"Ticker {ticker} added successfully!")
                else:
                    print(f"Failed to add ticker {ticker}.")
            except Exception as e:
                print(f"Error while adding ticker {ticker}: {e}")

        try:
            correlation_matrix = create_correlation_matrix()
            plot_correlation_matrix(correlation_matrix)
        except Exception as e:
            print(f"Failed to generate the correlation matrix: {e}")
        list_tickers_in_database()
    ticker_input.value = ""

# Attach event handlers to buttons
add_button.on_click(on_add_click)
remove_button.on_click(on_remove_click)
risk_button.on_click(on_risk_click)
correlation_button.on_click(on_correlation_click)

# Assemble and display the UI
ui = VBox([
    ticker_input,
    HBox([add_button, remove_button]),
    HBox([risk_button, correlation_button]),
    output_area
])
display(ui)

# Ensure matplotlib inline for notebooks
%matplotlib inline


VBox(children=(Text(value='', description='Ticker:', placeholder='Enter stock ticker (e.g., AAPL)'), HBox(chil…