# Capstone Project EDA - Auxiliary Functions

To export this notebook as a Python script, run the following command in a terminal:

jupyter nbconvert --to script EDA_auxiliary_functions.ipynb

In [11]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import plotly.express as px

from scipy.optimize import minimize
from sklearn.neighbors import KernelDensity

import seaborn as sns
import plotly.graph_objects as go
import random
plt.rcParams['figure.figsize'] = (8.0, 6.0)  # default matplotlib figure size (width, height) for every plot

## 0.a - Loading all the relevant auxiliary functions

Function to set a random seed for the notebook.

In [12]:
#Set random seed, define a function so we don't need to recall later in the notebook
def set_global_seed(seed=10):
    '''
    Seed the random number generators used throughout the notebook.

    Both Python's built-in `random` module and NumPy's global generator are seeded with the same value,
    so calling set_global_seed() with no argument uses seed=10 for both.

    Input:
    ------

    seed: Seed value handed to both generators. Default is 10.

    Output:
    ------
    None
    '''
    # Apply the same seed to every generator, in a fixed order
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


Here are some auxiliary functions for the plotly animation. The first extracts the values, columns, and index of a DataFrame so that a correlation matrix can be drawn as a heatmap. The second builds the arguments that control the frame and transition duration of the animation.

In [13]:
#Define a dictionary with the data from the correlation matrices
def df_to_plotly(df):
    '''
    Split a DataFrame into the three pieces a plotly heatmap needs.

    Input:
    ------

    df: The Pandas DataFrame we want to plot as a heatmap


    Output:
    ------
    Dictionary with the keys
    z: values of df (nested list, one inner list per row)
    x: column labels of df
    y: index labels of df

    '''
    cell_values = df.values.tolist()
    column_labels = df.columns.tolist()
    row_labels = df.index.tolist()
    return dict(z=cell_values, x=column_labels, y=row_labels)
# Define frame arguments for the play button
def frame_args(duration):
    '''
    Build the animation arguments plotly expects for a button or slider step.

    Input:
    ------

    duration: Value used for both the frame duration and the transition duration
              (presumably milliseconds, as in plotly's animation options - confirm)


    Output:
    ------
    Dictionary with the formatted instructions: frame settings (with redraw enabled),
    "immediate" mode, "fromcurrent" enabled and a linear transition

    '''
    frame_settings = {"duration": duration, "redraw": True}
    transition_settings = {"duration": duration, "easing": "linear"}
    return dict(
        frame=frame_settings,
        mode="immediate",
        fromcurrent=True,
        transition=transition_settings,
    )

This is an auxiliary function to print the correct date ranges used to calculate the correlation matrices for the plotly animation.

In [14]:
def dates_legend(df_og, row_step=15):
    '''
    Auxiliary function for heatmap animation plotting. It generates one "start to end" date label
    for every `row_step` rows of a single stock.

    Input:
    ------

    df_og: The Pandas DataFrame we want to analyze. Notice this is in the original orientation, where there is a `Symbol` column.
           The first column (position 0) must hold date-like values exposing `.year`, `.month` and `.day`.

    row_step: What is the interval we are adding to the correlations at each frame. Default is 15.

    Output:
    ------
    list_range_dates = List of strings 'Y-M-D to Y-M-D' (no zero padding). Entry i uses the date found at
                       row i*row_step of the first symbol, for i = 0 .. n_rows // row_step. When
                       i*row_step falls past the last row (n_rows is an exact multiple of row_step)
                       the last available date is used instead. An empty DataFrame gives an empty list.
    '''

    # Nothing to label when there are no rows at all
    if len(df_og) == 0:
        return []

    # Use just one stock for simplicity: the symbol found in the first row
    first_symbol = df_og['Symbol'].iloc[0]
    dates = df_og[df_og['Symbol'] == first_symbol].iloc[:, 0]
    n_rows = dates.shape[0]
    last_row = n_rows - 1

    def _fmt(date):
        # Same un-padded year-month-day layout for every label
        return f'{date.year}-{date.month}-{date.day}'

    start_date = _fmt(dates.iloc[0])

    # Clamp the row index: when n_rows is a multiple of row_step the final
    # i*row_step equals n_rows, which is one past the end of the data.
    return [
        start_date + ' to ' + _fmt(dates.iloc[min(i * row_step, last_row)])
        for i in range(n_rows // row_step + 1)
    ]

This function creates an animation where each frame is a correlation matrix calculated over a specific range of rows - in the default case, 15 more data points are added at each frame.

In [15]:
# Define function as a plotly object in order to plot the animation of Heatmap

def plot_heatmap_animation(df_og, title_plot, row_step=15):
    '''
    Build an animated Plotly heatmap of correlation matrices computed on a growing window of rows.

    Frame k (k = 0, 1, ...) shows the correlation between the `Return` series of every symbol,
    computed on the first (k+1)*row_step rows. Only complete windows are used.

    NOTE(review): slider label k is entry k of dates_legend(df_og, row_step), which is built from
    row k*row_step, while frame k covers rows up to (k+1)*row_step - confirm the labels are as intended.

    Input:
    ------

    df_og: The Pandas DataFrame we want to analyze. Notice this is in the original orientation, where there is a
           `Symbol` column and a `Return` column.

    title_plot: A string used as the title of the figure.

    row_step: How many rows are added to the correlation window at each frame. Default is 15.

    Output:
    ------
    fig: A Plotly figure with play/pause buttons and a slider. Call fig.show() to display it; it can also be saved to html.
    '''

    # Reshape to wide format: one column per symbol, one row per observation.
    returns_per_symbol = df_og.groupby('Symbol')['Return'].apply(list).reset_index()
    df_value = pd.DataFrame(
        returns_per_symbol['Return'].tolist(),
        index=returns_per_symbol['Symbol'],
    ).T

    # Correlation matrix for each growing window: rows [0, w*row_step)
    n_windows = df_value.shape[0] // row_step
    frames = [df_value[:w * row_step].corr() for w in range(1, n_windows + 1)]
    n_frames = len(frames)

    # Date-range strings used as slider labels
    list_range_dates = dates_legend(df_og, row_step)

    # Appearance shared by every heatmap (correlations live in [-1, 1])
    heatmap_style = {"colorscale": 'RdBu', "zmin": -1, "zmax": 1}

    # One trace per correlation matrix
    fig = go.Figure()
    for step, corr_matrix in enumerate(frames):
        heatmap = go.Heatmap(df_to_plotly(corr_matrix), name="t = " + str(step), **heatmap_style)
        fig.add_trace(heatmap)

    # Hide everything, then reveal only the first trace
    for trace in fig.data:
        trace.visible = False
    fig.data[0].visible = True

    # Animation frames, named "0", "1", ... so the slider steps can address them
    fig.frames = [
        go.Frame(data=[go.Heatmap(df_to_plotly(corr_matrix), **heatmap_style)], name=str(k))
        for k, corr_matrix in enumerate(frames)
    ]

    # Slider: one step per frame, jumping to it without transition time
    slider_steps = [
        {
            "args": [[str(k)], frame_args(0)],
            "label": list_range_dates[k],
            "method": "animate",
        }
        for k in range(n_frames)
    ]
    sliders = [
        {
            "pad": {"b": 10, "t": 60},
            "len": 0.9,
            "x": 0.1,
            "y": 0,
            "steps": slider_steps,
        }
    ]

    # Play runs through all frames; pause animates to "no frame" with zero duration
    play_button = {
        "args": [None, frame_args(100)],
        "label": "&#9654;",
        "method": "animate",
    }
    pause_button = {
        "args": [[None], frame_args(0)],
        "label": "&#9724;",
        "method": "animate",
    }
    button_menu = {
        "buttons": [play_button, pause_button],
        "direction": "left",
        "pad": {"r": 10, "t": 70},
        "type": "buttons",
        "x": 0.1,
        "y": 0,
    }

    fig.update_layout(
        title=title_plot,
        width=600,
        height=600,
        updatemenus=[button_menu],
        sliders=sliders,
    )
    return fig
    

Apply a kernel density estimator (KDE) to the data underlying the histogram. Adapted from López de Prado's book.

In [16]:
# Create the KDE from the data
def KDE_pdf(data, x_range, bandwidth=0.2):
    """
    Perform Kernel Density Estimator for the data.

    Input:
    ------

    data: Observations to fit, in our case the eigenvalues calculated from a correlation matrix.
          Any array-like is accepted (list, NumPy array, pandas Series); it is flattened to one column.

    x_range: Points at which the KDE is evaluated. Any array-like is accepted; it is reshaped
             to a single column, so both a 1-D array and an (n, 1) array work.

    bandwidth: Parameter that controls the smoothing of the KDE.


    Output:
    ------

    pdf: pandas Series with the estimated density, indexed by the flattened x_range

    """
    # sklearn expects 2-D (n_samples, n_features) input; coerce first so that
    # plain lists or Series do not fail on a missing .reshape
    samples = np.asarray(data).reshape(-1, 1)
    grid = np.asarray(x_range).reshape(-1, 1)

    # Fit a Gaussian kernel density model to the observations
    kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth).fit(samples)

    # score_samples returns log-densities, so exponentiate to get the pdf
    log_prob = kde.score_samples(grid)
    pdf = pd.Series(np.exp(log_prob), index=grid.flatten())

    return pdf

Shifted Marchenko-Pastur distribution. "Shifted" means that the support is moved to run from $0$ to $\lambda_+ - \lambda_-$ instead of from $\lambda_-$ to $\lambda_+$.

In [17]:
def f_MP_pdf_shift(q, sigma, pts=1000):
    """
    Evaluate the theoretical Marchenko-Pastur PDF and index it by eigenvalues shifted to start near zero.

    Input:
    ------
    q: Aspect ratio (T/N)
    sigma: Scale parameter (to be optimized). It enters the formulas squared, i.e. sigma**2 plays the role of the variance.
    pts: Number of points for the PDF.

    Output:
    -------
    f: pandas Series with the MP pdf evaluated on `pts` points between lambda_min and lambda_max,
       indexed by the shifted grid running from 10**-14 to lambda_max - lambda_min


    """
    variance = sigma**2
    inv_sqrt_q = np.sqrt(1/q)

    # Edges of the Marchenko-Pastur support
    lambda_max = variance * (1 + inv_sqrt_q)**2
    lambda_min = variance * (1 - inv_sqrt_q)**2

    # Small offset so the grid does not start exactly on the lower edge
    # (for the shifted grid: not exactly at zero)
    offset = 10.**-14
    support = np.linspace(lambda_min + offset, lambda_max, pts)
    shifted_support = np.linspace(offset, lambda_max - lambda_min, pts)

    # The Marchenko-Pastur PDF formula, evaluated on the unshifted grid
    numerator = q * np.sqrt((lambda_max - support) * (support - lambda_min))
    denominator = support * 2 * np.pi * variance

    # Values come from the unshifted grid, the index is the shifted one
    return pd.Series(numerator / denominator, index=shifted_support)

def f_MP_pdf_plot(q, sigma, pts=1000):
    """
    Evaluate the theoretical Marchenko-Pastur PDF and also return the shifted eigenvalue grid, for plotting.

    Input:
    ------
    q: Aspect ratio (T/N)
    sigma: Scale parameter (to be optimized). It enters the formulas squared, i.e. sigma**2 plays the role of the variance.
    pts: Number of points for the PDF.

    Output:
    -------
    f: pandas Series with the MP pdf, indexed by the unshifted grid from lambda_min + 10**-14 to lambda_max
    lambda_vals_shift: NumPy array with the shifted x values, from 10**-14 to lambda_max - lambda_min

    """
    variance = sigma**2
    inv_sqrt_q = np.sqrt(1/q)

    # Edges of the Marchenko-Pastur support
    lambda_max = variance * (1 + inv_sqrt_q)**2
    lambda_min = variance * (1 - inv_sqrt_q)**2

    # Small offset so the grid does not start exactly on the lower edge
    # (for the shifted grid: not exactly at zero)
    offset = 10.**-14
    support = np.linspace(lambda_min + offset, lambda_max, pts)
    shifted_support = np.linspace(offset, lambda_max - lambda_min, pts)

    # The Marchenko-Pastur PDF formula
    numerator = q * np.sqrt((lambda_max - support) * (support - lambda_min))
    denominator = support * 2 * np.pi * variance

    # The Series keeps the unshifted grid as index; the shifted grid is returned next to it
    density = pd.Series(numerator / denominator, index=support)
    return density, shifted_support

Cost function for the minimization. We compare the MP pdf with the KDE of the histogram of the eigenvalues. Adapted from López de Prado's book.

In [18]:
def cost_function(params, eVal, bWidth,flag=False):
    """
    Sum of squared errors between the shifted Marchenko-Pastur pdf and the KDE of the data.

    Input:
    ------

    params: array with the two parameters being optimized, in the order [sigma, q].

    eVal: The eigenvalues used to calculate the KDE (the data pdf).

    bWidth: Parameter determining the "smoothing" of the KDE procedure.

    flag: Turn to True if we want to print the sum of squared errors at every evaluation.

    Output:
    ------

    sse: Sum of squared errors between the theoretical MP pdf and the pdf estimated from the data.

    """

    # Unpack the parameters being optimized: sigma first, q second
    sigma, q = params[0], params[1]

    # Theoretical curve, indexed by the shifted eigenvalue grid
    MP_pdf = f_MP_pdf_shift(q, sigma)

    # Empirical curve evaluated on exactly that grid (as a column vector)
    evaluation_grid = MP_pdf.index.values.reshape(-1, 1)
    data_pdf = KDE_pdf(eVal, x_range=evaluation_grid, bandwidth=bWidth)

    # Point-by-point squared difference, then the total
    squared_errors = (MP_pdf - data_pdf) ** 2
    sse = np.sum(squared_errors)

    # Optional trace of the convergence
    if flag:
        print(f"SSE: {sse}")

    return sse

Minimization procedure to find the best q and $\sigma$ to fit the KDE of the correlation matrix eigenvalue histogram. Adapted from López de Prado's book.

In [19]:
def findMaxEval(eVal, bWidth,flag=False):
    """
    Fit shifted Marcenko-Pastur pdf to the data.

    Input:
    ------

    eVal: Data, in our case the eigenvalues calculated from a correlation matrix.

    bWidth: Parameter determining the "smoothing" of the KDE procedure.

    flag: Turn to True if we want to print the convergence of the sum of squared errors during the minimization.

    Output:
    ------

    eMax: Upper edge of the fitted Marchenko-Pastur distribution,
          sigma_opt**2 * (1 + sqrt(1/q_opt))**2, i.e. the same lambda_max used by the MP pdf functions.

    sigma_opt = Resulting sigma from optimization (1 if the optimization did not succeed).

    q_opt = Resulting q from optimization (10 if the optimization did not succeed).
    """

    # Initial guess for the parameters being optimized, in the order [sigma, q]
    initial_params = np.array([0.99, 40])

    # Keep both parameters inside bounds: sigma in (0, 1), q in (4, 50]
    bounds = [(1E-5, 1 - 1E-5), (4 + 1E-5, 50)]

    # Minimize cost_function defined previously
    result = minimize(cost_function, initial_params, args=(eVal, bWidth, flag),
                      bounds=bounds)

    # Print the optimization result
    print(f"Optimization result: {result}")

    # If optimization succeeds, use the optimized parameters
    if result.success:
        sigma_opt, q_opt = result.x[0], result.x[1]
    else:
        # Fallback in case of failure
        sigma_opt, q_opt = 1, 10

    # Maximum eigenvalue of the fitted distribution. The MP pdf being fitted uses
    # sigma**2 as its variance, so the edge must be built from sigma_opt**2 as well
    # (Lopez de Prado's eMax = var * (1 + sqrt(1/q))**2 with var = sigma**2).
    eMax = sigma_opt**2 * (1 + np.sqrt(1/q_opt)) ** 2

    return eMax, sigma_opt, q_opt