# Open Midterm 2

## FINM 36700 - 2025

### UChicago Financial Mathematics

* Charles Benello
* cmbenello@uchicago.edu
* Studendid - 12248247

i use chatgpt/codex

***

## Scoring

| Problem | Points |
|---------|--------|
| 1       | 30     |
| 2       | 20     |
| 2       | 20     |
| 2       | 30     |

**Numbered problems are worth 5pts unless specified otherwise.**

## Submission

You should submit a **single** Jupyter notebook (`.ipynb`) file containing all of your code and answers to Canvas. 

Note: If any other files are required to run your notebook, please include them **and only them** in a single `.zip` file.

## Data

**All data files are found in at the course web-book.**

https://markhendricks.github.io/finm-portfolio/.

The exam uses the data found in `commodity_factors.xlsx`
* sheet `factors`
* sheet `returns`

Both tabs contain **daily** returns for a set of commodity futures from `January 2010` to `October 2025`.
* approximate 252 observations per year for purposes of annualization

Factors are
* **LVL**: A level factor of commodity data
* **HMS**: Hard Minus Soft commodities
* **IMO**: Input Minus Output commodities

Returns are
* various commodity futures across energy, metals, livestock, and agriculture

In [3]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [9]:
factors = pd.read_excel('./data/commodity_factors.xlsx', sheet_name='factors', index_col=0, parse_dates=True)
factors = pd.read_excel('data/commodity_factors.xlsx', sheet_name='factors', index_col=0, parse_dates=True)
factors = factors[["LVL", "HMS", "IMO"]]
returns = pd.read_excel('./data/commodity_factors.xlsx', sheet_name='returns', index_col=0, parse_dates=True)
returns = pd.read_excel('data/commodity_factors.xlsx', sheet_name='returns', index_col=0, parse_dates=True)

In [8]:
##helper functions
import pandas as pd
import numpy as np
import math
import datetime
pd.options.display.float_format = "{:,.4f}".format
from typing import Union, List
from pandas import Timestamp

import matplotlib.pyplot as plt
import seaborn as sns

import statsmodels.api as sm
from sklearn.linear_model import LinearRegression

import warnings
warnings.filterwarnings("ignore")

from collections import defaultdict

from scipy.stats import norm

import re

def read_excel_default(excel_name: str, index_col : int = 0, parse_dates: bool =True, print_sheets: bool = False, sheet_name: str = None, **kwargs):
    """
    Reads an Excel file and returns a DataFrame with specified options.

    Parameters:
    excel_name (str): The path to the Excel file.
    index_col (int, default=0): Column to use as the row labels of the DataFrame.
    parse_dates (bool, default=True): Boolean to parse dates.
    print_sheets (bool, default=False): If True, prints the names and first few rows of all sheets.
    sheet_name (str or int, default=None): Name or index of the sheet to read. If None, reads the first sheet.
    **kwargs: Additional arguments passed to `pd.read_excel`.

    Returns:
    pd.DataFrame: DataFrame containing the data from the specified Excel sheet.

    Notes:
    - If `print_sheets` is True, the function will print the names and first few rows of all sheets and return None.
    - The function ensures that the index name is set to 'date' if the index column name is 'date' or 'dates', or if the index contains date-like values.
    """
    if print_sheets:
        n = 0
        while True:
            try:
                sheet = pd.read_excel(excel_name, sheet_name=n)
                print(f'Sheet {n}:')
                print(", ".join(list(sheet.columns)))
                print(sheet.head(3))
                n += 1
                print('\n' * 2)
            except:
                return
    sheet_name = 0 if sheet_name is None else sheet_name
    returns = pd.read_excel(excel_name, index_col=index_col, parse_dates=parse_dates,  sheet_name=sheet_name, **kwargs)
    if returns.index.name is not None:
        if returns.index.name.lower() in ['date', 'dates']:
            returns.index.name = 'date'
    elif isinstance(returns.index[0], (datetime.date, datetime.datetime)):
        returns.index.name = 'date'
    return returns


def calc_cummulative_returns(
    returns: Union[pd.DataFrame, pd.Series],
    return_plot: bool = True,
    fig_size: tuple = (7, 5),
    return_series: bool = False,
    name: str = None,
    timeframes: Union[None, dict] = None,
):
    """
    Calculates cumulative returns from a time series of returns.

    Parameters:
    returns (pd.DataFrame or pd.Series): Time series of returns.
    return_plot (bool, default=True): If True, plots the cumulative returns.
    fig_size (tuple, default=(7, 5)): Size of the plot for cumulative returns.
    return_series (bool, default=False): If True, returns the cumulative returns as a DataFrame.
    name (str, default=None): Name for the title of the plot or the cumulative return series.
    timeframes (dict or None, default=None): Dictionary of timeframes to calculate cumulative returns for each period.

    Returns:
    pd.DataFrame or None: Returns cumulative returns DataFrame if `return_series` is True.
    """
    if timeframes is not None:
        for name, timeframe in timeframes.items():
            if timeframe[0] and timeframe[1]:
                timeframe_returns = returns.loc[timeframe[0]:timeframe[1]]
            elif timeframe[0]:
                timeframe_returns = returns.loc[timeframe[0]:]
            elif timeframe[1]:
                timeframe_returns = returns.loc[:timeframe[1]]
            else:
                timeframe_returns = returns.copy()
            if len(timeframe_returns.index) == 0:
                raise Exception(f'No returns for {name} timeframe')
            calc_cummulative_returns(
                timeframe_returns,
                return_plot=True,
                fig_size=fig_size,
                return_series=False,
                name=name,
                timeframes=None
            )
        return
    returns = returns.copy()
    if isinstance(returns, pd.Series):
        returns = returns.to_frame()
    returns = returns.apply(lambda x: x.astype(float))
    returns = returns.apply(lambda x: x + 1)
    returns = returns.cumprod()
    returns = returns.apply(lambda x: x - 1)
    title = f'Cummulative Returns {name}' if name else 'Cummulative Returns'
    if return_plot:
        returns.plot(
            title=title,
            figsize=fig_size,
            grid=True,
            xlabel='Date',
            ylabel='Cummulative Returns'
        )
    if return_series:
        return returns


def calc_summary_statistics(
    returns: Union[pd.DataFrame, List],
    annual_factor: int = None,
    provided_excess_returns: bool = None,
    rf: Union[pd.Series, pd.DataFrame] = None,
    var_quantile: Union[float, List] = .05,
    timeframes: Union[None, dict] = None,
    return_tangency_weights: bool = True,
    correlations: Union[bool, List] = True,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False,
    _timeframe_name: str = None,
):
    """
    Calculates summary statistics for a time series of returns.

    Parameters:
    returns (pd.DataFrame or List): Time series of returns.
    annual_factor (int, default=None): Factor for annualizing returns.
    provided_excess_returns (bool, default=None): Whether excess returns are already provided.
    rf (pd.Series or pd.DataFrame, default=None): Risk-free rate data.
    var_quantile (float or list, default=0.05): Quantile for Value at Risk (VaR) calculation.
    timeframes (dict or None, default=None): Dictionary of timeframes to calculate statistics for each period.
    return_tangency_weights (bool, default=True): If True, returns tangency portfolio weights.
    correlations (bool or list, default=True): If True, returns correlations, or specify columns for correlations.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): Whether to drop specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary statistics of the returns.
    """
    returns = returns.copy()
    if isinstance(rf, (pd.Series, pd.DataFrame)):
        rf = rf.copy()
        if provided_excess_returns is True:
            raise Exception(
                'rf is provided but excess returns were provided as well.'
                'Remove "rf" or set "provided_excess_returns" to None or False'
            )
        
    if isinstance(returns, list):
        returns_list = returns[:]
        returns = pd.DataFrame({})
        for series in returns_list:
            returns = returns.merge(series, right_index=True, left_index=True, how='outer')
    """
    This functions returns the summary statistics for the input total/excess returns passed
    into the function
    """
    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    try:
        returns.index = pd.to_datetime(returns.index.map(lambda x: x.date()))
    except AttributeError:
        print('Could not convert "date" index to datetime.date')
        pass

    returns = returns.apply(lambda x: x.astype(float))

    if annual_factor is None:
        print('Assuming monthly returns with annualization term of 12')
        annual_factor = 12

    if provided_excess_returns is None:
        print(
            'Assuming excess returns were provided to calculate Sharpe.'
            ' If returns were provided (steady of excess returns), the column "Sharpe" is actually "Mean/Volatility"'
        )
        provided_excess_returns = True
    elif provided_excess_returns is False:
        if rf is not None:
            if len(rf.index) != len(returns.index):
                raise Exception('"rf" index must be the same lenght as "returns"')
            print('"rf" is used to subtract returns to calculate Sharpe, but nothing else')

    if isinstance(timeframes, dict):
        all_timeframes_summary_statistics = pd.DataFrame({})
        for name, timeframe in timeframes.items():
            if timeframe[0] and timeframe[1]:
                timeframe_returns = returns.loc[timeframe[0]:timeframe[1]]
            elif timeframe[0]:
                timeframe_returns = returns.loc[timeframe[0]:]
            elif timeframe[1]:
                timeframe_returns = returns.loc[:timeframe[1]]
            else:
                timeframe_returns = returns.copy()
            if len(timeframe_returns.index) == 0:
                raise Exception(f'No returns for {name} timeframe')
            timeframe_returns = timeframe_returns.rename(columns=lambda c: c + f' {name}')
            timeframe_summary_statistics = calc_summary_statistics(
                returns=timeframe_returns,
                annual_factor=annual_factor,
                provided_excess_returns=provided_excess_returns,
                rf=rf,
                var_quantile=var_quantile,
                timeframes=None,
                correlations=correlations,
                _timeframe_name=name,
                keep_columns=keep_columns,
                drop_columns=drop_columns,
                keep_indexes=keep_indexes,
                drop_indexes=drop_indexes,
                drop_before_keep=drop_before_keep
            )
            all_timeframes_summary_statistics = pd.concat(
                [all_timeframes_summary_statistics, timeframe_summary_statistics],
                axis=0
            )
        return all_timeframes_summary_statistics

    summary_statistics = pd.DataFrame(index=returns.columns)
    summary_statistics['Mean'] = returns.mean()
    summary_statistics['Annualized Mean'] = returns.mean() * annual_factor
    summary_statistics['Vol'] = returns.std()
    summary_statistics['Annualized Vol'] = returns.std() * np.sqrt(annual_factor)
    try:
        if not provided_excess_returns:
            if type(rf) == pd.DataFrame:
                rf = rf.iloc[:, 0].to_list()
            elif type(rf) == pd.Series:
                rf = rf.to_list()
            else:
                raise Exception('"rf" must be either a pd.DataFrame or pd.Series')
            excess_returns = returns.apply(lambda x: x - rf)
            summary_statistics['Sharpe'] = excess_returns.mean() / returns.std()
        else:
            summary_statistics['Sharpe'] = returns.mean() / returns.std()
    except Exception as e:
        print(f'Could not calculate Sharpe: {e}')
    summary_statistics['Annualized Sharpe'] = summary_statistics['Sharpe'] * np.sqrt(annual_factor)
    summary_statistics['Min'] = returns.min()
    summary_statistics['Max'] = returns.max()
    summary_statistics['Skewness'] = returns.skew()
    summary_statistics['Excess Kurtosis'] = returns.kurtosis()
    var_quantile = [var_quantile] if isinstance(var_quantile, (float, int)) else var_quantile
    for var_q in var_quantile:
        summary_statistics[f'Historical VaR ({var_q:.2%})'] = returns.quantile(var_q, axis = 0)
        summary_statistics[f'Annualized Historical VaR ({var_q:.2%})'] = returns.quantile(var_q, axis = 0) * np.sqrt(annual_factor)
        summary_statistics[f'Historical CVaR ({var_q:.2%})'] = returns[returns <= returns.quantile(var_q, axis = 0)].mean()
        summary_statistics[f'Annualized Historical CVaR ({var_q:.2%})'] = returns[returns <= returns.quantile(var_q, axis = 0)].mean() * np.sqrt(annual_factor)
    
    wealth_index = 1000 * (1 + returns).cumprod()
    previous_peaks = wealth_index.cummax()
    drawdowns = (wealth_index - previous_peaks) / previous_peaks

    summary_statistics['Max Drawdown'] = drawdowns.min()
    summary_statistics['Peak'] = [previous_peaks[col][:drawdowns[col].idxmin()].idxmax() for col in previous_peaks.columns]
    summary_statistics['Bottom'] = drawdowns.idxmin()

    if return_tangency_weights:
        tangency_weights = calc_tangency_weights(returns)
        summary_statistics = summary_statistics.join(tangency_weights)
    
    recovery_date = []
    for col in wealth_index.columns:
        prev_max = previous_peaks[col][:drawdowns[col].idxmin()].max()
        recovery_wealth = pd.DataFrame([wealth_index[col][drawdowns[col].idxmin():]]).T
        recovery_date.append(recovery_wealth[recovery_wealth[col] >= prev_max].index.min())
    summary_statistics['Recovery'] = recovery_date
    try:
        summary_statistics["Duration (days)"] = [
            (i - j).days if i != "-" else "-" for i, j in
            zip(summary_statistics["Recovery"], summary_statistics["Bottom"])
        ]
    except (AttributeError, TypeError) as e:
        print(f'Cannot calculate "Drawdown Duration" calculation because there was no recovery or because index are not dates: {str(e)}')

    if correlations is True or isinstance(correlations, list):
        returns_corr = returns.corr()
        if _timeframe_name:
            returns_corr = returns_corr.rename(columns=lambda c: c.replace(f' {_timeframe_name}', ''))
        returns_corr = returns_corr.rename(columns=lambda c: c + ' Correlation')
        if isinstance(correlations, list):
            correlation_names = [c + ' Correlation' for c  in correlations]
            not_in_returns_corr = [c for c in correlation_names if c not in returns_corr.columns]
            if len(not_in_returns_corr) > 0:
                not_in_returns_corr = ", ".join([c.replace(' Correlation', '') for c in not_in_returns_corr])
                raise Exception(f'{not_in_returns_corr} not in returns columns')
            returns_corr = returns_corr[[c + ' Correlation' for c  in correlations]]
        summary_statistics = summary_statistics.join(returns_corr)
    
    return filter_columns_and_indexes(
        summary_statistics,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_negative_pct(
    returns: Union[pd.DataFrame, pd.Series, list],
    calc_positive: bool = False,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False,
):
    """
    Calculates the percentage of negative or positive returns in the provided data.

    Parameters:
    returns (pd.DataFrame, pd.Series, or list): Time series of returns.
    calc_positive (bool, default=False): If True, calculates the percentage of positive returns.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): Whether to drop specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: A DataFrame with the percentage of negative or positive returns, number of returns, and the count of negative/positive returns.
    """
    returns = returns.copy()
    if isinstance(returns, list):
        returns_list = returns[:]
        returns = pd.DataFrame({})
        for series in returns_list:
            returns = returns.merge(series, right_index=True, left_index=True, how='outer')

    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
        
    returns.index.name = 'date'

    if isinstance(returns, pd.Series):
        returns = returns.to_frame()
    returns = returns.apply(lambda x: x.astype(float))
    prev_len_index = returns.apply(lambda x: len(x))
    returns  =returns.dropna(axis=0)
    new_len_index = returns.apply(lambda x: len(x))
    if not (prev_len_index == new_len_index).all():
        print('Some columns had NaN values and were dropped')
    if calc_positive:
        returns = returns.applymap(lambda x: 1 if x > 0 else 0)
    else:
        returns = returns.applymap(lambda x: 1 if x < 0 else 0)

    negative_statistics = (
        returns
        .agg(['mean', 'count', 'sum'])
        .set_axis(['% Negative Returns', 'Nº Returns', 'Nº Negative Returns'], axis=0)
    )

    if calc_positive:
        negative_statistics = negative_statistics.rename(lambda i: i.replace('Negative', 'Positive'), axis=0)

    return filter_columns_and_indexes(
        negative_statistics,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def filter_columns_and_indexes(
    df: pd.DataFrame,
    keep_columns: Union[list, str],
    drop_columns: Union[list, str],
    keep_indexes: Union[list, str],
    drop_indexes: Union[list, str],
    drop_before_keep: bool = False
):
    """
    Filters a DataFrame based on specified columns and indexes.

    Parameters:
    df (pd.DataFrame): DataFrame to be filtered.
    keep_columns (list or str): Columns to keep in the DataFrame.
    drop_columns (list or str): Columns to drop from the DataFrame.
    keep_indexes (list or str): Indexes to keep in the DataFrame.
    drop_indexes (list or str): Indexes to drop from the DataFrame.
    drop_before_keep (bool, default=False): Whether to drop specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: The filtered DataFrame.
    """
    if not isinstance(df, (pd.DataFrame, pd.Series)):
        return df
    df = df.copy()
    # Columns
    if keep_columns is not None:
        keep_columns = "(?i)" + "|".join(keep_columns) if isinstance(keep_columns, list) else "(?i)" + keep_columns
    else:
        keep_columns = None
    if drop_columns is not None:
        drop_columns = "(?i)" + "|".join(drop_columns) if isinstance(drop_columns, list) else "(?i)" + drop_columns
    else:
        drop_columns = None
    if not drop_before_keep:
        if keep_columns is not None:
            df = df.filter(regex=keep_columns)
    if drop_columns is not None:
        df = df.drop(columns=df.filter(regex=drop_columns).columns)
    if drop_before_keep:
        if keep_columns is not None:
            df = df.filter(regex=keep_columns)
    # Indexes
    if keep_indexes is not None:
        keep_indexes = "(?i)" + "|".join(keep_indexes) if isinstance(keep_indexes, list) else "(?i)" + keep_indexes
    else:
        keep_indexes = None
    if drop_indexes is not None:
        drop_indexes = "(?i)" + "|".join(drop_indexes) if isinstance(drop_indexes, list) else "(?i)" + drop_indexes
    else:
        drop_indexes = None
    if not drop_before_keep:
        if keep_indexes is not None:
            df = df.filter(regex=keep_indexes, axis=0)
    if drop_indexes is not None:
        df = df.drop(index=df.filter(regex=drop_indexes, axis=0).index)
    if drop_before_keep:
        if keep_indexes is not None:
            df = df.filter(regex=keep_indexes, axis=0)
    return df


def calc_cross_section_regression(
    returns: Union[pd.DataFrame, List],
    factors: Union[pd.DataFrame, List],
    annual_factor: int = None,
    provided_excess_returns: bool = None,
    rf: pd.Series = None,
    return_model: bool = False,
    name: str = None,
    return_mae: bool = True,
    intercept_cross_section: bool = True,
    return_historical_premium: bool = True,
    return_annualized_premium: bool = True,
    compare_premiums: bool = False,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Performs a cross-sectional regression on the provided returns and factors.

    Parameters:
    returns (pd.DataFrame or list): Time series of returns.
    factors (pd.DataFrame or list): Time series of factor data.
    annual_factor (int, default=None): Factor for annualizing returns.
    provided_excess_returns (bool, default=None): Whether excess returns are already provided.
    rf (pd.Series, default=None): Risk-free rate data for subtracting from returns.
    return_model (bool, default=False): If True, returns the regression model.
    name (str, default=None): Name for labeling the regression.
    return_mae (bool, default=True): If True, returns the mean absolute error of the regression.
    intercept_cross_section (bool, default=True): If True, includes an intercept in the cross-sectional regression.
    return_historical_premium (bool, default=True): If True, returns the historical premium of factors.
    return_annualized_premium (bool, default=True): If True, returns the annualized premium of factors.
    compare_premiums (bool, default=False): If True, compares the historical and estimated premiums.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): Whether to drop specified columns/indexes before keeping.

    Returns:
    pd.DataFrame or model: Cross-sectional regression output or the model if `return_model` is True.
    """
    returns = returns.copy()
    factors = factors.copy()
    if isinstance(rf, (pd.Series, pd.DataFrame)):
        rf = rf.copy()

    if compare_premiums:
        return_historical_premium = True
        return_annualized_premium = True

    if isinstance(returns, list):
        returns_list = returns[:]
        returns = pd.DataFrame({})
        for series in returns_list:
            returns = returns.merge(series, right_index=True, left_index=True, how='outer')

    if annual_factor is None:
        print('Assuming monthly returns with annualization term of 12')
        annual_factor = 12

    if isinstance(factors, list):
        factors_list = returns[:]
        factors = pd.DataFrame({})
        for series in factors_list:
            factors = factors.merge(series, right_index=True, left_index=True, how='outer')

    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    if provided_excess_returns is None:
        print('Assuming excess returns were provided')
        provided_excess_returns = True
    elif provided_excess_returns is False:
        if rf is not None:
            if len(rf.index) != len(returns.index):
                raise Exception('"rf" index must be the same lenght as "returns"')
            print('"rf" is used to subtract returns')
            returns = returns.sub(rf, axis=0)
    
    time_series_regressions = calc_iterative_regression(returns, factors, annual_factor=annual_factor, warnings=False)
    time_series_betas = time_series_regressions.filter(regex='Beta$', axis=1)
    time_series_historical_returns = time_series_regressions[['Fitted Mean']]
    cross_section_regression = calc_regression(
        time_series_historical_returns, time_series_betas,
        annual_factor=annual_factor, intercept=intercept_cross_section,
        return_model=return_model, warnings=False
    )

    if return_model:
        return cross_section_regression
    cross_section_regression = cross_section_regression.rename(columns=lambda c: c.replace(' Beta Beta', ' Lambda').replace('Alpha', 'Eta'))
    if name is None:
        name = " + ".join([c.replace(' Lambda', '') for c in cross_section_regression.filter(regex=' Lambda$', axis=1).columns])
    cross_section_regression.index = [f'{name} Cross-Section Regression']
    cross_section_regression.drop([
        'Information Ratio', 'Annualized Information Ratio', 'Tracking Error', 'Annualized Tracking Error', 'Fitted Mean', 'Annualized Fitted Mean'
    ], axis=1, inplace=True)
    if return_annualized_premium:
        factors_annualized_premium = (
            cross_section_regression
            .filter(regex=' Lambda$', axis=1)
            .apply(lambda x: x * annual_factor)
            .rename(columns=lambda c: c.replace(' Lambda', ' Annualized Lambda'))
        )
        cross_section_regression = cross_section_regression.join(factors_annualized_premium)

    if return_historical_premium:
        print('Lambda represents the premium calculated by the cross-section regression and the historical premium is the average of the factor excess returns')
        factors_historical_premium = factors.mean().to_frame(f'{name} Cross-Section Regression').transpose().rename(columns=lambda c: c + ' Historical Premium')
        cross_section_regression = cross_section_regression.join(factors_historical_premium)
        if return_annualized_premium:
            factors_annualized_historical_premium = (
                factors_historical_premium
                .apply(lambda x: x * annual_factor)
                .rename(columns=lambda c: c.replace(' Historical Premium', ' Annualized Historical Premium'))
            )
            cross_section_regression = cross_section_regression.join(factors_annualized_historical_premium)

    if compare_premiums:
        cross_section_regression = cross_section_regression.filter(regex='Lambda$|Historical Premium$', axis=1)
        cross_section_regression = cross_section_regression.transpose()
        cross_section_regression['Factor'] = cross_section_regression.index.str.extract(f'({"|".join(list(factors.columns))})').values
        cross_section_regression['Premium Type'] = cross_section_regression.index.str.replace(f'({"|".join(list(factors.columns))})', '')
        premiums_comparison = cross_section_regression.pivot(index='Factor', columns='Premium Type', values=f'{name} Cross-Section Regression')
        premiums_comparison.columns.name = None
        premiums_comparison.index.name = None
        premiums_comparison.join(calc_tangency_weights(factors))
        premiums_comparison = premiums_comparison.join(factors.corr().rename(columns=lambda c: c + ' Correlation'))
        return filter_columns_and_indexes(
            premiums_comparison,
            keep_columns=keep_columns,
            drop_columns=drop_columns,
            keep_indexes=keep_indexes,
            drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )
    
    if return_mae:
        cross_section_regression['TS MAE'] = time_series_regressions['Alpha'].abs().mean()
        cross_section_regression['TS Annualized MAE'] = time_series_regressions['Annualized Alpha'].abs().mean()
        cross_section_regression_model = calc_regression(
            time_series_historical_returns, time_series_betas,
            annual_factor=annual_factor, intercept=intercept_cross_section,
            return_model=True, warnings=False
        )
        cross_section_regression['CS MAE'] = cross_section_regression_model.resid.abs().mean()
        cross_section_regression['CS Annualized MAE'] = cross_section_regression['CS MAE'] * annual_factor

    return filter_columns_and_indexes(
        cross_section_regression,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )



def get_best_and_worst(
    summary_statistics: pd.DataFrame,
    stat: str = 'Annualized Sharpe',
    return_df: bool = True
):
    """
    Identifies the best and worst assets based on a specified statistic.

    Parameters:
    summary_statistics (pd.DataFrame): DataFrame containing summary statistics.
    stat (str, default='Annualized Sharpe'): The statistic to compare assets by.
    return_df (bool, default=True): If True, returns a DataFrame with the best and worst assets.

    Returns:
    pd.DataFrame or None: DataFrame with the best and worst assets if `return_df` is True.
    """
    summary_statistics = summary_statistics.copy()

    if len(summary_statistics.index) < 2:
        raise Exception('"summary_statistics" must have at least two lines in order to do comparison')

    if stat not in summary_statistics.columns:
        raise Exception(f'{stat} not in "summary_statistics"')
    summary_statistics.rename(columns=lambda c: c.replace(' ', '').lower())
    best_stat = summary_statistics[stat].max()
    worst_stat = summary_statistics[stat].min()
    asset_best_stat = summary_statistics.loc[lambda df: df[stat] == df[stat].max()].index[0]
    asset_worst_stat = summary_statistics.loc[lambda df: df[stat] == df[stat].min()].index[0]
    print(f'The asset with the highest {stat} is {asset_best_stat}: {best_stat:.5f}')
    print(f'The asset with the lowest {stat} is {asset_worst_stat}: {worst_stat:.5f}')
    if return_df:
        return pd.concat([
            summary_statistics.loc[lambda df: df.index == asset_best_stat],
            summary_statistics.loc[lambda df: df.index == asset_worst_stat]
        ])
    

def calc_correlations(
    returns: pd.DataFrame,
    print_highest_lowest: bool = True,
    matrix_size: Union[int, float] = 7,
    return_heatmap: bool = True,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Calculates the correlation matrix of the provided returns and optionally prints or visualizes it.

    Parameters:
    returns (pd.DataFrame): Time series of returns.
    print_highest_lowest (bool, default=True): If True, prints the highest and lowest correlations.
    matrix_size (int or float, default=7): Size of the heatmap for correlation matrix visualization.
    return_heatmap (bool, default=True): If True, returns a heatmap of the correlation matrix.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): Whether to drop specified columns/indexes before keeping.

    Returns:
    sns.heatmap or pd.DataFrame: Heatmap of the correlation matrix or the correlation matrix itself.
    """
    returns = returns.copy()

    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    correlation_matrix = returns.corr()
    if return_heatmap:
        fig, ax = plt.subplots(figsize=(matrix_size * 1.5, matrix_size))
        heatmap = sns.heatmap(
            correlation_matrix, 
            xticklabels=correlation_matrix.columns,
            yticklabels=correlation_matrix.columns,
            annot=True,
        )

    if print_highest_lowest:
        highest_lowest_corr = (
            correlation_matrix
            .unstack()
            .sort_values()
            .reset_index()
            .set_axis(['asset_1', 'asset_2', 'corr'], axis=1)
            .loc[lambda df: df.asset_1 != df.asset_2]
        )
        highest_corr = highest_lowest_corr.iloc[lambda df: len(df)-1, :]
        lowest_corr = highest_lowest_corr.iloc[0, :]
        print(f'The highest correlation ({highest_corr["corr"]:.2%}) is between {highest_corr.asset_1} and {highest_corr.asset_2}')
        print(f'The lowest correlation ({lowest_corr["corr"]:.2%}) is between {lowest_corr.asset_1} and {lowest_corr.asset_2}')
    
    if return_heatmap:
        return heatmap
    else:
        return filter_columns_and_indexes(
            correlation_matrix,
            keep_columns=keep_columns,
            drop_columns=drop_columns,
            keep_indexes=keep_indexes,
            drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )
    

def calc_tangency_weights(
    returns: pd.DataFrame,
    cov_mat: str = 1,
    return_graphic: bool = False,
    return_port_ret: bool = False,
    target_ret_rescale_weights: Union[None, float] = None,
    annual_factor: int = 12,
    name: str = 'Tangency',
    expected_returns: Union[pd.Series, pd.DataFrame] = None,
    expected_returns_already_annualized: bool = False
):
    """
    Calculates tangency portfolio weights based on the covariance matrix of returns.

    Parameters:
    returns (pd.DataFrame): Time series of returns.
    cov_mat (str, default=1): Covariance matrix for calculating tangency weights.
    return_graphic (bool, default=False): If True, plots the tangency weights.
    return_port_ret (bool, default=False): If True, returns the portfolio returns.
    target_ret_rescale_weights (float or None, default=None): Target return for rescaling weights.
    annual_factor (int, default=12): Factor for annualizing returns.
    name (str, default='Tangency'): Name for labeling the weights and portfolio.

    Returns:
    pd.DataFrame or pd.Series: Tangency portfolio weights or portfolio returns if `return_port_ret` is True.
    """
    returns = returns.copy()
    
    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    if cov_mat == 1:
        cov_inv = np.linalg.inv((returns.cov() * annual_factor))
    else:
        cov = returns.cov()
        covmat_diag = np.diag(np.diag((cov)))
        covmat = cov_mat * cov + (1 - cov_mat) * covmat_diag
        cov_inv = np.linalg.pinv((covmat * annual_factor))  
        
    ones = np.ones(returns.columns.shape) 
    if expected_returns is not None:
        mu = expected_returns
        if not expected_returns_already_annualized:
            mu *= annual_factor
    else:
        mu = returns.mean() * annual_factor
    scaling = 1 / (np.transpose(ones) @ cov_inv @ mu)
    tangent_return = scaling * (cov_inv @ mu)
    tangency_wts = pd.DataFrame(
        index=returns.columns,
        data=tangent_return,
        columns=[f'{name} Weights']
    )
    port_returns = returns @ tangency_wts.rename({f'{name} Weights': f'{name} Portfolio'}, axis=1)

    if return_graphic:
        tangency_wts.plot(kind='bar', title=f'{name} Weights')

    if isinstance(target_ret_rescale_weights, (float, int)):
        scaler = target_ret_rescale_weights / port_returns[f'{name} Portfolio'].mean()
        tangency_wts[[f'{name} Weights']] *= scaler
        port_returns *= scaler
        tangency_wts = tangency_wts.rename(
            {f'{name} Weights': f'{name} Weights Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )
        port_returns = port_returns.rename(
            {f'{name} Portfolio': f'{name} Portfolio Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )

    if cov_mat != 1:
        port_returns = port_returns.rename(columns=lambda c: c.replace('Tangency', f'Tangency Regularized {cov_mat:.2f}'))
        tangency_wts = tangency_wts.rename(columns=lambda c: c.replace('Tangency', f'Tangency Regularized {cov_mat:.2f}'))
        
    if return_port_ret:
        return port_returns
    return tangency_wts


def calc_equal_weights(
    returns: pd.DataFrame,
    return_graphic: bool = False,
    return_port_ret: bool = False,
    target_ret_rescale_weights: Union[float, None] = None,
    name: str = 'Equal Weights'
):
    """
    Calculates equal weights for the portfolio based on the provided returns.

    Parameters:
    returns (pd.DataFrame): Time series of returns.
    return_graphic (bool, default=False): If True, plots the equal weights.
    return_port_ret (bool, default=False): If True, returns the portfolio returns.
    target_ret_rescale_weights (float or None, default=None): Target return for rescaling weights.
    name (str, default='Equal Weights'): Name for labeling the portfolio.

    Returns:
    pd.DataFrame or pd.Series: Equal portfolio weights or portfolio returns if `return_port_ret` is True.
    """
    returns = returns.copy()

    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    equal_wts = pd.DataFrame(
        index=returns.columns,
        data=[1 / len(returns.columns)] * len(returns.columns),
        columns=[f'{name}']
    )
    port_returns = returns @ equal_wts.rename({f'{name}': f'{name} Portfolio'}, axis=1)

    if return_graphic:
        equal_wts.plot(kind='bar', title=f'{name}')

    if isinstance(target_ret_rescale_weights, (float, int)):
        scaler = target_ret_rescale_weights / port_returns[f'{name} Portfolio'].mean()
        equal_wts[[f'{name}']] *= scaler
        port_returns *= scaler
        equal_wts = equal_wts.rename(
            {f'{name}': f'{name} Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )
        port_returns = port_returns.rename(
            {f'{name} Portfolio': f'{name} Portfolio Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )
        
    if return_port_ret:
        return port_returns
    return equal_wts


def calc_risk_parity_weights(
    returns: pd.DataFrame,
    return_graphic: bool = False,
    return_port_ret: bool = False,
    target_ret_rescale_weights: Union[None, float] = None,
    name: str = 'Risk Parity'
):
    """
    Calculates risk parity portfolio weights based on the variance of each asset.

    Parameters:
    returns (pd.DataFrame): Time series of returns.
    return_graphic (bool, default=False): If True, plots the risk parity weights.
    return_port_ret (bool, default=False): If True, returns the portfolio returns.
    target_ret_rescale_weights (float or None, default=None): Target return for rescaling weights.
    name (str, default='Risk Parity'): Name for labeling the portfolio.

    Returns:
    pd.DataFrame or pd.Series: Risk parity portfolio weights or portfolio returns if `return_port_ret` is True.
    """
    returns = returns.copy()

    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    risk_parity_wts = pd.DataFrame(
        index=returns.columns,
        data=[1 / returns[asset].var() for asset in returns.columns],
        columns=[f'{name} Weights']
    )
    port_returns = returns @ risk_parity_wts.rename({f'{name} Weights': f'{name} Portfolio'}, axis=1)

    if return_graphic:
        risk_parity_wts.plot(kind='bar', title=f'{name} Weights')

    if isinstance(target_ret_rescale_weights, (float, int)):
        scaler = target_ret_rescale_weights / port_returns[f'{name} Portfolio'].mean()
        risk_parity_wts[[f'{name} Weights']] *= scaler
        port_returns *= scaler
        risk_parity_wts = risk_parity_wts.rename(
            {f'{name} Weights': f'{name} Weights Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )
        port_returns = port_returns.rename(
            {f'{name} Portfolio': f'{name} Portfolio Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )
        
    if return_port_ret:
        return port_returns
    return risk_parity_wts


def calc_gmv_weights(
    returns: pd.DataFrame,
    return_graphic: bool = False,
    return_port_ret: bool = False,
    target_ret_rescale_weights: Union[float, None] = None,
    name: str = 'GMV'
):
    """
    Calculates Global Minimum Variance (GMV) portfolio weights.

    Parameters:
    returns (pd.DataFrame): Time series of returns.
    return_graphic (bool, default=False): If True, plots the GMV weights.
    return_port_ret (bool, default=False): If True, returns the portfolio returns.
    target_ret_rescale_weights (float or None, default=None): Target return for rescaling weights.
    name (str, default='GMV'): Name for labeling the portfolio.

    Returns:
    pd.DataFrame or pd.Series: GMV portfolio weights or portfolio returns if `return_port_ret` is True.
    """
    returns = returns.copy()

    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    ones = np.ones(returns.columns.shape)
    cov = returns.cov()
    cov_inv = np.linalg.inv(cov)
    scaling = 1 / (np.transpose(ones) @ cov_inv @ ones)
    gmv_tot = scaling * cov_inv @ ones
    gmv_wts = pd.DataFrame(
        index=returns.columns,
        data=gmv_tot,
        columns=[f'{name} Weights']
    )
    port_returns = returns @ gmv_wts.rename({f'{name} Weights': f'{name} Portfolio'}, axis=1)

    if isinstance(target_ret_rescale_weights, (float, int)):
        scaler = target_ret_rescale_weights / port_returns[f'{name} Portfolio'].mean()
        gmv_wts[[f'{name} Weights']] *= scaler
        port_returns *= scaler
        gmv_wts = gmv_wts.rename(
            {f'{name} Weights': f'{name} Weights Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )
        port_returns = port_returns.rename(
            {f'{name} Portfolio': f'{name} Portfolio Rescaled Target {target_ret_rescale_weights:.2%}'},
            axis=1
        )

    if return_graphic:
        gmv_wts.plot(kind='bar', title=f'{name} Weights')

    if return_port_ret:
        return port_returns

    return gmv_wts


def calc_target_ret_weights(
    target_ret: float,
    returns: pd.DataFrame,
    return_graphic: bool = False,
    return_port_ret: bool = False
):
    """
    Calculates the portfolio weights to achieve a target return by combining Tangency and GMV portfolios.

    Parameters:
    target_ret (float): Target return for the portfolio.
    returns (pd.DataFrame): Time series of asset returns.
    return_graphic (bool, default=False): If True, plots the portfolio weights.
    return_port_ret (bool, default=False): If True, returns the portfolio returns.

    Returns:
    pd.DataFrame: Weights of the Tangency and GMV portfolios, along with the combined target return portfolio.
    """
    returns = returns.copy()
    
    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'
    
    mu_tan = returns.mean() @ calc_tangency_weights(returns, cov_mat = 1)
    mu_gmv = returns.mean() @ calc_gmv_weights(returns)
    
    delta = (target_ret - mu_gmv[0]) / (mu_tan[0] - mu_gmv[0])
    mv_weights = (delta * calc_tangency_weights(returns, cov_mat=1)).values + ((1 - delta) * calc_gmv_weights(returns)).values
    
    mv_weights = pd.DataFrame(
        index=returns.columns,
        data=mv_weights,
        columns=[f'Target {target_ret:.2%} Weights']
    )
    port_returns = returns @ mv_weights.rename({f'Target {target_ret:.2%} Weights': f'Target {target_ret:.2%} Portfolio'}, axis=1)

    if return_graphic:
        mv_weights.plot(kind='bar', title=f'Target Return of {target_ret:.2%} Weights')

    if return_port_ret:
        return port_returns

    mv_weights['Tangency Weights'] = calc_tangency_weights(returns, cov_mat=1).values
    mv_weights['GMV Weights'] = calc_gmv_weights(returns).values

    return mv_weights


def calc_regression(
    y: Union[pd.DataFrame, pd.Series],
    X: Union[pd.DataFrame, pd.Series],
    intercept: bool = True,
    annual_factor: Union[None, int] = None,
    warnings: bool = True,
    return_model: bool = False,
    return_fitted_values: bool = False,
    name_fitted_values: str = None,
    calc_treynor_info_ratios: bool = True,
    timeframes: Union[None, dict] = None,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False,
    calc_sortino_ratio: bool = False
):
    """
    Performs an OLS regression on the provided data with optional intercept, timeframes, and statistical ratios.

    Parameters:
    y (pd.DataFrame or pd.Series): Dependent variable for the regression.
    X (pd.DataFrame or pd.Series): Independent variable(s) for the regression.
    intercept (bool, default=True): If True, includes an intercept in the regression.
    annual_factor (int or None, default=None): Factor for annualizing regression statistics.
    warnings (bool, default=True): If True, prints warnings about assumptions.
    return_model (bool, default=False): If True, returns the regression model object.
    return_fitted_values (bool, default=False): If True, returns the fitted values of the regression.
    name_fitted_values (str, default=None): Name for the fitted values column.
    calc_treynor_info_ratios (bool, default=True): If True, calculates Treynor and Information ratios.
    timeframes (dict or None, default=None): Dictionary of timeframes to run separate regressions for each period.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.
    calc_sortino_ratio (bool, default=False): If True, calculates the Sortino ratio.

    Returns:
    pd.DataFrame or model: Regression summary statistics or the model if `return_model` is True.
    """
    y = y.copy()
    X = X.copy()

    y_name = y.name if isinstance(y, pd.Series) else y.columns[0]
    X_names = " + ".join(list(X.columns))
    X_names = "Intercept + " + X_names if intercept else X_names

    return_model = return_model if not return_fitted_values else True

    if annual_factor is None:
        print("Regression assumes 'annual_factor' equals to 12 since it was not provided")
        annual_factor = 12
    
    if 'date' in X.columns.str.lower():
        X = X.rename({'Date': 'date'}, axis=1)
        X = X.set_index('date')
    X.index.name = 'date'
    
    if warnings:
        print('"calc_regression" assumes excess returns to calculate Information and Treynor Ratios')
    if intercept:
        X = sm.add_constant(X)
    
    y_name = y.name if isinstance(y, pd.Series) else y.columns[0]

    if len(X.index) != len(y.index):
        print(f'y has lenght {len(y.index)} and X has lenght {len(X.index)}. Joining y and X by index...')
        df = y.join(X, how='left')
        df = df.dropna()
        y = df[y_name]
        X = df.drop(y_name, axis=1)
        if len(X.index) < 4:
            raise Exception('Indexes of y and X do not match and there are less than 4 observations. Cannot calculate regression')

    if isinstance(timeframes, dict):
        all_timeframes_regressions = pd.DataFrame({})
        for name, timeframe in timeframes.items():
            if timeframe[0] and timeframe[1]:
                timeframe_y = y.loc[timeframe[0]:timeframe[1]]
                timeframe_X = X.loc[timeframe[0]:timeframe[1]]
            elif timeframe[0]:
                timeframe_y = y.loc[timeframe[0]:]
                timeframe_X = X.loc[timeframe[0]:]
            elif timeframe[1]:
                timeframe_y = y.loc[:timeframe[1]]
                timeframe_X = X.loc[:timeframe[1]]
            else:
                timeframe_y = y.copy()
                timeframe_X = X.copy()
            if len(timeframe_y.index) == 0 or len(timeframe_X.index) == 0:
                raise Exception(f'No returns for {name} timeframe')
            timeframe_regression = calc_regression(
                y=timeframe_y,
                X=timeframe_X,
                intercept=intercept,
                annual_factor=annual_factor,
                warnings=False,
                return_model=False,
                calc_treynor_info_ratios=calc_treynor_info_ratios,
                timeframes=None,
                keep_columns=keep_columns,
                drop_columns=drop_columns,
                keep_indexes=keep_indexes,
                drop_indexes=drop_indexes,
                drop_before_keep=drop_before_keep
            )
            timeframe_regression.index = [timeframe_regression.index + " " + name]
            all_timeframes_regressions = pd.concat(
                [all_timeframes_regressions, timeframe_regression],
                axis=0
            )
        return all_timeframes_regressions

    try:
        model = sm.OLS(y, X, missing="drop", hasconst=intercept)
    except ValueError:
        y = y.reset_index(drop=True)
        X = X.reset_index(drop=True)
        model = sm.OLS(y, X, missing="drop", hasconst=intercept)
        if warnings:
            print(f'"{y_name}" Required to reset indexes to make regression work. Try passing "y" and "X" as pd.DataFrame')
    results = model.fit()
    summary = dict()

    if return_model:
        if not return_fitted_values:
            return results
        else:
            fitted_values = results.fittedvalues
            if name_fitted_values is None:
                name_fitted_values = f'{y_name} ~ {X_names}'
            fitted_values = fitted_values.to_frame(name_fitted_values)
            return fitted_values

    inter = results.params[0] if intercept else None
    betas = results.params[1:] if intercept else results.params

    summary["Alpha"] = inter if inter is not None else '-'
    summary["Annualized Alpha"] = inter * annual_factor if inter is not None else '-'
    summary["R-Squared"] = results.rsquared

    if isinstance(X, pd.Series):
        X = pd.DataFrame(X)

    X_assets = X.columns[1:] if intercept else X.columns
    for i, asset_name in enumerate(X_assets):
        summary[f"{asset_name} Beta"] = betas[i]

    if calc_treynor_info_ratios:
        if len([c for c in X.columns if c != 'const']) == 1:
            summary["Treynor Ratio"] = (y.mean() / betas[0])
            summary["Annualized Treynor Ratio"] = summary["Treynor Ratio"] * annual_factor
        summary["Information Ratio"] = (inter / results.resid.std()) if intercept else "-"
        summary["Annualized Information Ratio"] = summary["Information Ratio"] * np.sqrt(annual_factor) if intercept else "-"
    summary["Tracking Error"] = results.resid.std()
    summary["Annualized Tracking Error"] = results.resid.std() * np.sqrt(annual_factor)
    summary['Fitted Mean'] = results.fittedvalues.mean()
    summary['Annualized Fitted Mean'] = summary['Fitted Mean'] * annual_factor
    if calc_sortino_ratio:
        try:
            summary['Sortino Ratio'] = summary['Fitted Mean'] / y[y < 0].std()
            summary['Annualized Sortino Ratio'] = summary['Sortino Ratio'] * np.sqrt(annual_factor)
        except Exception as e:
            print(f'Cannot calculate Sortino Ratio: {str(e)}. Set "calc_sortino_ratio" to False or review function')
    y_name = f"{y_name} no Intercept" if not intercept else y_name
    return filter_columns_and_indexes(
        pd.DataFrame(summary, index=[y_name]),
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_strategy_oos(
    y: Union[pd.Series, pd.DataFrame],
    X: Union[pd.Series, pd.DataFrame],
    intercept: bool = True,
    rolling_size: Union[None, int] = 60,
    expanding: bool = False,
    lag_number: int = 1,
    weight_multiplier: float = 100,
    weight_min: Union[None, float] = None,
    weight_max: Union[None, float] = None,
    name: str = None,
):
    """
    Calculates an out-of-sample strategy based on rolling or expanding window regression.

    Parameters:
    y (pd.Series or pd.DataFrame): Dependent variable (strategy returns).
    X (pd.Series or pd.DataFrame): Independent variable(s) (predictors).
    intercept (bool, default=True): If True, includes an intercept in the regression.
    rolling_size (int or None, default=60): Size of the rolling window for in-sample fitting.
    expanding (bool, default=False): If True, uses an expanding window instead of rolling.
    lag_number (int, default=1): Number of lags to apply to the predictors.
    weight_multiplier (float, default=100): Multiplier to adjust strategy weights.
    weight_min (float or None, default=None): Minimum allowable weight.
    weight_max (float or None, default=None): Maximum allowable weight.
    name (str, default=None): Name for labeling the strategy returns.

    Returns:
    pd.DataFrame: Time series of strategy returns.
    """
    raise Exception("Function not available - needs testing prior to use")
    try:
        y = y.copy()
        X = X.copy()
    except:
        pass
    replication_oos = calc_replication_oos(
        y=y,
        X=X,
        intercept=intercept,
        rolling_size=rolling_size,
        lag_number=lag_number,
        expanding=expanding
    )
    actual_returns = replication_oos['Actual']
    predicted_returns = replication_oos['Prediction']
    strategy_weights = predicted_returns * weight_multiplier
    weight_min = weight_min if weight_min is not None else strategy_weights.min()
    weight_max = weight_max if weight_max is not None else strategy_weights.max()
    strategy_weights = strategy_weights.clip(lower=weight_min, upper=weight_max)
    strategy_returns = (actual_returns * strategy_weights).to_frame()
    if name:
        strategy_returns.columns = [name]
    else:
        strategy_returns.columns = [f'{y.columns[0]} Strategy']
    return strategy_returns
    

def calc_iterative_regression(
    multiple_y: Union[pd.DataFrame, pd.Series],
    X: Union[pd.DataFrame, pd.Series],
    annual_factor: Union[None, int] = 12,
    intercept: bool = True,
    warnings: bool = True,
    calc_treynor_info_ratios: bool = True,
    calc_sortino_ratio: bool = False,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Performs iterative regression across multiple dependent variables (assets).

    Parameters:
    multiple_y (pd.DataFrame or pd.Series): Dependent variables for multiple assets.
    X (pd.DataFrame or pd.Series): Independent variable(s) (predictors).
    annual_factor (int or None, default=12): Factor for annualizing regression statistics.
    intercept (bool, default=True): If True, includes an intercept in the regression.
    warnings (bool, default=True): If True, prints warnings about assumptions.
    calc_treynor_info_ratios (bool, default=True): If True, calculates Treynor and Information ratios.
    calc_sortino_ratio (bool, default=False): If True, calculates the Sortino ratio.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary statistics for each asset regression.
    """
    multiple_y = multiple_y.copy()
    X = X.copy()

    if 'date' in multiple_y.columns.str.lower():
        multiple_y = multiple_y.rename({'Date': 'date'}, axis=1)
        multiple_y = multiple_y.set_index('date')
    multiple_y.index.name = 'date'

    if 'date' in X.columns.str.lower():
        X = X.rename({'Date': 'date'}, axis=1)
        X = X.set_index('date')
    X.index.name = 'date'

    regressions = pd.DataFrame({})
    for asset in multiple_y.columns:
        y = multiple_y[[asset]]
        new_regression = calc_regression(
            y, X, annual_factor=annual_factor, intercept=intercept, warnings=warnings,
            calc_treynor_info_ratios=calc_treynor_info_ratios,
            calc_sortino_ratio=calc_sortino_ratio,
        )
        warnings = False
        regressions = pd.concat([regressions, new_regression], axis=0)
    
    return filter_columns_and_indexes(
        regressions,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_replication_oos(
    y: Union[pd.Series, pd.DataFrame],
    X: Union[pd.Series, pd.DataFrame],
    intercept: bool = True,
    rolling_size: Union[None, int] = 60,
    expanding: bool = False,
    return_r_squared_oos: float = False,
    lag_number: int = 1,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Performs out-of-sample replication of a time series regression with rolling or expanding windows.

    Parameters:
    y (pd.Series or pd.DataFrame): Dependent variable (actual returns).
    X (pd.Series or pd.DataFrame): Independent variable(s) (predictors).
    intercept (bool, default=True): If True, includes an intercept in the regression.
    rolling_size (int or None, default=60): Size of the rolling window for in-sample fitting.
    expanding (bool, default=False): If True, uses an expanding window instead of rolling.
    return_r_squared_oos (float, default=False): If True, returns the out-of-sample R-squared.
    lag_number (int, default=1): Number of lags to apply to the predictors.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary statistics for the out-of-sample replication.
    """
    raise Exception("Function not available - needs testing prior to use")
    try:
        y = y.copy()
        X = X.copy()
    except:
        pass
    if isinstance(X, pd.Series):
        X = pd.DataFrame(X)
    if 'date' in X.columns.str.lower():
        X = X.rename({'Date': 'date'}, axis=1)
        X = X.set_index('date')
    X.index.name = 'date'

    X = X.shift(lag_number)

    df = y.join(X, how='inner')
    y = df.iloc[:, [0]].copy()
    X = df.iloc[:, 1:].copy()

    if intercept:
        X = sm.add_constant(X)

    summary_pred = pd.DataFrame({})

    for i, last_is_date in enumerate(y.index):
        if i < (rolling_size):
            continue
        y_full = y.iloc[:i].copy()
        if expanding:
            y_rolling = y_full.copy()
        else:
            y_rolling = y_full.iloc[-rolling_size:]
        X_full = X.iloc[:i].copy()
        if expanding:
            X_rolling = X_full.copy()
        else:
            X_rolling = X_full.iloc[-rolling_size:]

        reg = sm.OLS(y_rolling, X_rolling, hasconst=intercept, missing='drop').fit()
        y_pred = reg.predict(X.iloc[i, :])
        naive_y_pred = y_full.mean()
        y_actual = y.iloc[i]
        summary_line = (
            reg.params
            .to_frame()
            .transpose()
            .rename(columns=lambda c: c.replace('const', 'Alpha') if c == 'const' else c + ' Lag Beta')
        )
        summary_line['Prediction'] = y_pred[0]
        summary_line['Naive Prediction'] = naive_y_pred.squeeze()
        summary_line['Actual'] = y_actual.squeeze()
        summary_line.index = [y.index[i]]
        summary_pred = pd.concat([summary_pred, summary_line], axis=0)

    summary_pred['Prediction Error'] = summary_pred['Prediction'] - summary_pred['Actual']
    summary_pred['Naive Prediction Error'] = summary_pred['Naive Prediction'] - summary_pred['Actual']

    rss = (np.array(summary_pred['Prediction Error']) ** 2).sum()
    tss = (np.array(summary_pred['Naive Prediction Error']) ** 2).sum()

    oos_rsquared = 1 - rss / tss

    if return_r_squared_oos:
        return pd.DataFrame(
            {'R^Squared OOS': oos_rsquared},
            index=[
                y.columns[0] + " ~ " + 
                " + ".join([
                    c.replace('const', 'Alpha') if c == 'const' else c + ' Lag Beta' for c in X.columns
                ])]
        )

    print("OOS R^Squared: {:.4%}".format(oos_rsquared))

    return filter_columns_and_indexes(
        summary_pred,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_replication_oos_not_lagged_features(
    y: Union[pd.Series, pd.DataFrame],
    X: Union[pd.Series, pd.DataFrame],
    intercept: bool = True,
    rolling_size: Union[None, int] = 60,
    return_r_squared_oos: float = False,
    r_squared_time_series: bool = False,
    return_parameters: bool = True,
    oos: int = 1,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Performs out-of-sample replication without lagged features.

    Parameters:
    y (pd.Series or pd.DataFrame): Dependent variable (actual returns).
    X (pd.Series or pd.DataFrame): Independent variable(s) (predictors).
    intercept (bool, default=True): If True, includes an intercept in the regression.
    rolling_size (int or None, default=60): Size of the rolling window for in-sample fitting.
    return_r_squared_oos (float, default=False): If True, returns the out-of-sample R-squared.
    r_squared_time_series (bool, default=False): If True, calculates time-series R-squared.
    return_parameters (bool, default=True): If True, returns regression parameters.
    oos (int, default=1): Number of periods for out-of-sample evaluation.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary statistics for the out-of-sample replication.
    """
    raise Exception("Function not available - needs testing prior to use")
    try:
        y = y.copy()
        X = X.copy()
    except:
        pass
    if isinstance(X, pd.Series):
        X = pd.DataFrame(X)
    if 'date' in X.columns.str.lower():
        X = X.rename({'Date': 'date'}, axis=1)
        X = X.set_index('date')
    X.index.name = 'date'

    oos_print = "In-Sample" if oos == 0 else f"{oos}OS"

    summary = defaultdict(list)

    if isinstance(y, pd.Series):
        y = pd.DataFrame(y)
    y_name = y.columns[0]
    
    for idx in range(rolling_size, len(y.index)+1-oos, 1):
        X_rolling = X.iloc[idx-rolling_size:idx].copy()
        y_rolling = y.iloc[idx-rolling_size:idx, 0].copy()

        y_oos = y.iloc[idx-1+oos, 0].copy()
        X_oos = X.iloc[idx-1+oos, :].copy()

        if intercept:
            X_rolling = sm.add_constant(X_rolling)

        try:
            regr = sm.OLS(y_rolling, X_rolling, missing="drop", hasconst=intercept).fit()
        except ValueError:
            y_rolling = y_rolling.reset_index(drop=True)
            X_rolling = X_rolling.reset_index(drop=True)
            regr = sm.OLS(y_rolling, X_rolling, missing="drop", hasconst=intercept).fit()

        for jdx, coeff in enumerate(regr.params.index):
            if coeff != 'const':
                summary[f"{coeff} Beta {oos_print}"].append(regr.params[jdx])
            else:
                summary[f"{coeff} {oos_print}"].append(regr.params[jdx])

        if intercept:
            y_pred = regr.params[0] + (regr.params[1:] @ X_oos)
        else:
            y_pred = regr.params @ X_oos

        summary[f"{y_name} Replicated"].append(y_pred)
        summary[f"{y_name} Actual"].append(y_oos)

    summary = pd.DataFrame(summary, index=X.index[rolling_size-1+oos:])

    if r_squared_time_series:
        time_series_error = pd.DataFrame({})
        for idx in range(rolling_size, len(y.index)+1-oos, 1):
            y_rolling = y.iloc[idx-rolling_size:idx, 0].copy()
            y_oos = y.iloc[idx-1+oos, 0].copy()
            time_series_error.loc[y.index[idx-1+oos], 'Naive Error'] = y_oos - y_rolling.mean()
        time_series_error['Model Error'] = summary[f"{y_name} Actual"] - summary[f"{y_name} Replicated"]
        oos_rsquared = (
            1 - time_series_error['Model Error'].apply(lambda x: x ** 2).sum()
            / time_series_error['Naive Error'].apply(lambda x: x ** 2).sum()
        )
    else:
        oos_rsquared = (
            1 - (summary[f"{y_name} Actual"] - summary[f"{y_name} Replicated"]).var()
            / summary[f"{y_name} Actual"].var()
        )

    if return_r_squared_oos:
        return oos_rsquared
    
    if not return_parameters:
        summary = summary[[f"{y_name} Actual", f"{y_name} Replicated"]]

    if not intercept:
        summary = summary.rename(columns=lambda c: c.replace(' Replicated', f' Replicated no Intercept'))

    if not intercept:
        print(f"R^Squared {oos_print} without Intercept: {oos_rsquared:.2%}")

    else:
        print(f"R^Squared {oos_print}: {oos_rsquared:.2%}")

    summary = summary.rename(columns=lambda c: (
        c.replace(' Replicated', f' Replicated {oos_print}').replace(' Actual', f' Actual {oos_print}')
    ))

    return filter_columns_and_indexes(
        summary,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def create_portfolio(
    returns: pd.DataFrame,
    weights: Union[dict, list],
    port_name: Union[None, str] = None
):
    """
    Creates a portfolio by applying the specified weights to the asset returns.

    Parameters:
    returns (pd.DataFrame): Time series of asset returns.
    weights (dict or list): Weights to apply to the returns. If a list is provided, it will be converted into a dictionary.
    port_name (str or None, default=None): Name for the portfolio. If None, a name will be generated based on asset weights.

    Returns:
    pd.DataFrame: The portfolio returns based on the provided weights.
    """
    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    if isinstance(weights, list):
        returns = returns.iloc[:, :len(weights)]
        weights = dict(zip(returns.columns, weights))

    returns = returns[list(weights.keys())]
    port_returns = pd.DataFrame(returns @ list(weights.values()))

    if port_name is None:
        port_name = " + ".join([f"{n} ({w:.2%})" for n, w in weights.items()])
    port_returns.columns = [port_name]
    return port_returns


def calc_ewma_volatility(
        excess_returns: pd.Series,
        theta : float = 0.94,
        initial_vol : float = .2 / np.sqrt(252)
    ) -> pd.Series:
    var_t0 = initial_vol ** 2
    ewma_var = [var_t0]
    for i in range(len(excess_returns.index)):
        new_ewma_var = ewma_var[-1] * theta + (excess_returns.iloc[i] ** 2) * (1 - theta)
        ewma_var.append(new_ewma_var)
    ewma_var.pop(0) # Remove var_t0
    ewma_vol = [np.sqrt(v) for v in ewma_var]
    return pd.Series(ewma_vol, index=excess_returns.index)


def calc_garch_volatility(
        excess_returs: pd.Series,
        p: int = 1,
        q: int = 1
    ) -> pd.Series:
    model = arch_model(excess_returs, vol='Garch', p=p, q=q)
    fitted_model = model.fit(disp='off')
    fitted_values = fitted_model.conditional_volatility
    return fitted_values


def calc_garch_volatility(
        excess_returs: pd.Series,
        p: int = 1,
        q: int = 1
    ):
    model = arch_model(excess_returs, vol='Garch', p=p, q=q)
    fitted_model = model.fit(disp='off')
    fitted_values = fitted_model.conditional_volatility
    return pd.Series(fitted_values, index=excess_returs.index)


def calc_var_cvar_summary(
    returns: Union[pd.Series, pd.DataFrame],
    quantile: Union[None, float] = .05,
    window: Union[None, str] = None,
    return_hit_ratio: bool = False,
    filter_first_hit_ratio_date: Union[None, str, datetime.date] = None,
    return_stats: Union[str, list] = ['Returns', 'VaR', 'CVaR', 'Vol'],
    full_time_sample: bool = False,
    z_score: float = None,
    shift: int = 1,
    normal_vol_formula: bool = False,
    ewma_theta : float = .94,
    ewma_initial_vol : float = .2 / np.sqrt(252),
    garch_p: int = 1,
    garch_q: int = 1,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False,
):
    """
    Calculates a summary of VaR (Value at Risk) and CVaR (Conditional VaR) for the provided returns.

    Parameters:
    returns (pd.Series or pd.DataFrame): Time series of returns.
    quantile (float or None, default=0.05): Quantile to calculate the VaR and CVaR.
    window (str or None, default=None): Window size for rolling calculations.
    return_hit_ratio (bool, default=False): If True, returns the hit ratio for the VaR.
    return_stats (str or list, default=['Returns', 'VaR', 'CVaR', 'Vol']): Statistics to return in the summary.
    full_time_sample (bool, default=False): If True, calculates using the full time sample.
    z_score (float, default=None): Z-score for parametric VaR calculation.
    shift (int, default=1): Period shift for VaR/CVaR calculations.
    normal_vol_formula (bool, default=False): If True, uses the normal volatility formula.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary of VaR and CVaR statistics.
    """
    if window is None:
        print('Using "window" of 60 periods, since none was specified')
        window = 60
    if isinstance(returns, pd.DataFrame):
        returns_series = returns.iloc[:, 0]
        returns_series.index = returns.index
        returns = returns_series.copy()

    summary = pd.DataFrame({})

    # Returns
    summary[f'Returns'] = returns

    # VaR
    summary[f'Expanding {window:.0f} Historical VaR ({quantile:.2%})'] = returns.expanding(min_periods=window).quantile(quantile)
    summary[f'Rolling {window:.0f} Historical VaR ({quantile:.2%})'] = returns.rolling(window=window).quantile(quantile)
    if normal_vol_formula:
        summary[f'Expanding {window:.0f} Volatility'] = returns.expanding(window).std()
        summary[f'Rolling {window:.0f} Volatility'] = returns.rolling(window).std()
    else:
        summary[f'Expanding {window:.0f} Volatility'] = np.sqrt((returns ** 2).expanding(window).mean())
        summary[f'Rolling {window:.0f} Volatility'] = np.sqrt((returns ** 2).rolling(window).mean())
    summary[f'EWMA {ewma_theta:.2f} Volatility'] = calc_ewma_volatility(returns, theta=ewma_theta, initial_vol=ewma_initial_vol)
    summary[f'GARCH({garch_p:.0f}, {garch_q:.0f}) Volatility'] = calc_garch_volatility(returns, p=garch_p, q=garch_q)

    z_score = norm.ppf(quantile) if z_score is None else z_score
    summary[f'Expanding {window:.0f} Parametric VaR ({quantile:.2%})'] = summary[f'Expanding {window:.0f} Volatility'] * z_score
    summary[f'Rolling {window:.0f} Parametric VaR ({quantile:.2%})'] = summary[f'Rolling {window:.0f} Volatility'] * z_score
    summary[f'EWMA {ewma_theta:.2f} Parametric VaR ({quantile:.2%})'] = summary[f'EWMA {ewma_theta:.2f} Volatility'] * z_score
    summary[f'GARCH({garch_p:.0f}, {garch_q:.0f}) Parametric VaR ({quantile:.2%})'] = summary[f'GARCH({garch_p:.0f}, {garch_q:.0f}) Volatility'] * z_score

    if return_hit_ratio:
        shift_stats = [
            f'Expanding {window:.0f} Historical VaR ({quantile:.2%})',
            f'Rolling {window:.0f} Historical VaR ({quantile:.2%})',
            f'Expanding {window:.0f} Parametric VaR ({quantile:.2%})',
            f'Rolling {window:.0f} Parametric VaR ({quantile:.2%})',
            f'EWMA {ewma_theta:.2f} Parametric VaR ({quantile:.2%})',
            f'GARCH({garch_p:.0f}, {garch_q:.0f}) Parametric VaR ({quantile:.2%})'
        ]
        summary_shift = summary.copy()
        summary_shift[shift_stats] = summary_shift[shift_stats].shift()
        if filter_first_hit_ratio_date:
            if isinstance(filter_first_hit_ratio_date, (datetime.date, datetime.datetime)):
                filter_first_hit_ratio_date = filter_first_hit_ratio_date.strftime("%Y-%m-%d")
            summary_shift = summary_shift.loc[filter_first_hit_ratio_date:]
        summary_shift = summary_shift.dropna(axis=0)
        summary_shift[shift_stats] = summary_shift[shift_stats].apply(lambda x: (x - summary_shift['Returns']) > 0)
        hit_ratio = pd.DataFrame(summary_shift[shift_stats].mean(), columns=['Hit Ratio'])
        hit_ratio['Hit Ratio Error'] = (hit_ratio['Hit Ratio'] - quantile) / quantile
        hit_ratio['Hit Ratio Absolute Error'] = abs(hit_ratio['Hit Ratio Error'])
        hit_ratio = hit_ratio.sort_values('Hit Ratio Absolute Error')
        return filter_columns_and_indexes(
            hit_ratio,
            keep_columns=keep_columns,
            drop_columns=drop_columns,
            keep_indexes=keep_indexes,
            drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )

    # CVaR
    summary[f'Expanding {window:.0f} Historical CVaR ({quantile:.2%})'] = returns.expanding(window).apply(lambda x: x[x < x.quantile(quantile)].mean())
    summary[f'Rolling {window:.0f} Historical CVaR ({quantile:.2%})'] = returns.rolling(window).apply(lambda x: x[x < x.quantile(quantile)].mean())
    summary[f'Expanding {window:.0f} Parametrical CVaR ({quantile:.2%})'] = - norm.pdf(z_score) / quantile * summary[f'Expanding {window:.0f} Volatility']
    summary[f'Rolling {window:.0f} Parametrical CVaR ({quantile:.2%})'] = - norm.pdf(z_score) / quantile * summary[f'Rolling {window:.0f} Volatility']
    summary[f'EWMA {ewma_theta:.2f} Parametrical CVaR ({quantile:.2%})'] = - norm.pdf(z_score) / quantile * summary[f'EWMA {ewma_theta:.2f} Volatility']
    summary[f'GARCH({garch_p:.0f}, {garch_q:.0f}) Parametrical CVaR ({quantile:.2%})'] = - norm.pdf(z_score) / quantile * summary[f'GARCH({garch_p:.0f}, {garch_q:.0f}) Volatility']

    if shift > 0:
        shift_columns = [c for c in summary.columns if not bool(re.search("returns", c))]
        summary[shift_columns] = summary[shift_columns].shift(shift)
        print(f'VaR and CVaR are given shifted by {shift:0f} period(s).')
    else:
        print('VaR and CVaR are given in-sample.')

    if full_time_sample:
        summary = summary.loc[:, lambda df: [c for c in df.columns if bool(re.search('expanding', c.lower()))]]
    return_stats = [return_stats.lower()] if isinstance(return_stats, str) else [s.lower() for s in return_stats]
    return_stats = list(map(lambda x: 'volatility' if x == 'vol' else x, return_stats))
    if return_stats == ['all'] or set(return_stats) == set(['returns', 'var', 'cvar', 'volatility']):
        return filter_columns_and_indexes(
            summary,
            keep_columns=keep_columns,
            drop_columns=drop_columns,
            keep_indexes=keep_indexes,
            drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )
    return filter_columns_and_indexes(
        summary.loc[:, lambda df: df.columns.map(lambda c: bool(re.search(r"\b" + r"\b|\b".join(return_stats) + r"\b", c.lower())))],
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_rolling_oos_port(
    returns: pd.DataFrame,
    weights_func,
    window: Union[None, int] = None,
    weights_func_params: dict = {},
    port_name: str = 'Portfolio OOS',
    expanding: bool = False,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Calculates a rolling out-of-sample portfolio based on a rolling or expanding window optimization.

    Parameters:
    returns (pd.DataFrame): Time series of asset returns.
    weights_func (function): Function to calculate the portfolio weights.
    window (int or None, default=None): Rolling window size for in-sample optimization.
    weights_func_params (dict, default={}): Additional parameters for the weights function.
    port_name (str, default='Portfolio OOS'): Name for the portfolio.
    expanding (bool, default=False): If True, uses an expanding window instead of a rolling one.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Out-of-sample portfolio returns.
    """
    raise Exception("Function not available - needs testing prior to use")
    if window is None:
        print('Using "window" of 60 periods for in-sample optimization, since none were provided.')
        window = 60
    returns = returns.copy()
    if 'date' in returns.columns.str.lower():
        returns = returns.rename({'Date': 'date'}, axis=1)
        returns = returns.set_index('date')
    returns.index.name = 'date'

    port_returns_oos = pd.DataFrame({})

    for idx in range(0, len(returns.index)-window):
        modified_idx = 0 if expanding else idx
        weights_func_all_params = {'returns': returns.iloc[modified_idx:(window+idx), :]}
        weights_func_all_params.update(weights_func_params)
        wts = weights_func(**weights_func_all_params).iloc[:, 0]
        idx_port_return_oos = sum(returns.iloc[window, :].loc[wts.index] * wts)
        idx_port_return_oos = pd.DataFrame(
            {port_name: idx_port_return_oos},
            index=[returns.index[idx+window]]
        )
        port_returns_oos = pd.concat([port_returns_oos, idx_port_return_oos])

    return filter_columns_and_indexes(
        port_returns_oos,
        keep_columns=keep_columns,
        drop_columns=drop_columns,
        keep_indexes=keep_indexes,
        drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_fx_exc_ret(
    fx_rates: pd.DataFrame,
    rf_rates: pd.DataFrame,
    transform_to_log_fx_rates: bool = True,
    transform_to_log_rf_rates: bool = True,
    rf_to_fx: dict = None,
    base_rf: str = None,
    base_rf_series: Union[pd.Series, pd.DataFrame] = None,
    annual_factor: Union[int, None] = None,
    return_exc_ret: bool = False,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Calculates foreign exchange excess returns by subtracting risk-free rates from FX rates.

    Parameters:
    fx_rates (pd.DataFrame): Time series of FX rates.
    rf_rates (pd.DataFrame): Time series of risk-free rates.
    transform_to_log_fx_rates (bool, default=True): If True, converts FX rates to log returns.
    transform_to_log_rf_rates (bool, default=True): If True, converts risk-free rates to log returns.
    rf_to_fx (dict, default=None): Mapping of risk-free rates to FX pairs.
    base_rf (str, default=None): Base risk-free rate to use for calculations.
    base_rf_series (pd.Series or pd.DataFrame, default=None): Time series of the base risk-free rate.
    annual_factor (int or None, default=None): Factor for annualizing the returns.
    return_exc_ret (bool, default=False): If True, returns the excess returns instead of summary statistics.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary statistics or excess returns based on FX rates and risk-free rates.
    """
    raise Exception("Function not available - needs testing prior to use")
    fx_rates = fx_rates.copy()
    rf_rates = rf_rates.copy()
    if isinstance(base_rf_series, (pd.Series, pd.DataFrame)):
        base_rf_series = base_rf_series.copy()

    if rf_to_fx is None:
        rf_to_fx = {
            'GBP1M': 'USUK',
            'EUR1M': 'USEU',
            'CHF1M': 'USSZ',
            'JPY1M': 'USJP'
        }

    if transform_to_log_fx_rates:
        fx_rates = fx_rates.applymap(lambda x: math.log(x))

    if transform_to_log_rf_rates:
        rf_rates = rf_rates.applymap(lambda x: math.log(x + 1))

    if base_rf is None and base_rf_series is None:
        print("No 'base_rf' or 'base_rf_series' was provided. Trying to use 'USD1M' as the base risk-free rate.")
        base_rf = 'USD1M'
    if base_rf_series is None:
        base_rf_series = rf_rates[base_rf]

    all_fx_holdings_exc_ret = pd.DataFrame({})
    for rf, fx in rf_to_fx.items():
        fx_holdings_exc_ret = fx_rates[fx] - fx_rates[fx].shift(1) + rf_rates[rf].shift(1) - base_rf_series.shift(1)
        try:
            rf_name = re.sub('[0-9]+M', '', rf)
        except:
            rf_name = rf
        fx_holdings_exc_ret = fx_holdings_exc_ret.dropna(axis=0).to_frame(rf_name)
        all_fx_holdings_exc_ret = all_fx_holdings_exc_ret.join(fx_holdings_exc_ret, how='outer')

    if not return_exc_ret:
        return filter_columns_and_indexes(
            calc_summary_statistics(all_fx_holdings_exc_ret, annual_factor=annual_factor),
            keep_columns=keep_columns, drop_columns=drop_columns,
            keep_indexes=keep_indexes, drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )
    else:
        return filter_columns_and_indexes(
            all_fx_holdings_exc_ret,
            keep_columns=keep_columns, drop_columns=drop_columns,
            keep_indexes=keep_indexes, drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )
    

def calc_fx_regression(
    fx_rates: pd.DataFrame,
    rf_rates: pd.DataFrame,
    transform_to_log_fx_rates: bool = True,
    transform_to_log_rf_rates: bool = True,
    rf_to_fx: dict = None,
    base_rf: str = None,
    base_rf_series: Union[pd.Series, pd.DataFrame] = None,
    annual_factor: Union[int, None] = None,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False,
    print_analysis: bool = True
):
    """
    Calculates FX regression and provides an analysis of how the risk-free rate differentials affect FX rates.

    Parameters:
    fx_rates (pd.DataFrame): Time series of FX rates.
    rf_rates (pd.DataFrame): Time series of risk-free rates.
    transform_to_log_fx_rates (bool, default=True): If True, converts FX rates to log returns.
    transform_to_log_rf_rates (bool, default=True): If True, converts risk-free rates to log returns.
    rf_to_fx (dict, default=None): Mapping of risk-free rates to FX pairs.
    base_rf (str, default=None): Base risk-free rate to use for calculations.
    base_rf_series (pd.Series or pd.DataFrame, default=None): Time series of the base risk-free rate.
    annual_factor (int or None, default=None): Factor for annualizing returns.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.
    print_analysis (bool, default=True): If True, prints an analysis of the regression results.

    Returns:
    pd.DataFrame: Summary of regression statistics for the FX rates and risk-free rate differentials.
    """
    raise Exception("Function not available - needs testing prior to use")
    fx_rates = fx_rates.copy()
    rf_rates = rf_rates.copy()
    if isinstance(base_rf_series, (pd.Series, pd.DataFrame)):
        base_rf_series = base_rf_series.copy()

    if rf_to_fx is None:
        rf_to_fx = {
            'GBP1M': 'USUK',
            'EUR1M': 'USEU',
            'CHF1M': 'USSZ',
            'JPY1M': 'USJP'
        }

    if transform_to_log_fx_rates:
        fx_rates = fx_rates.applymap(lambda x: math.log(x))

    if transform_to_log_rf_rates:
        rf_rates = rf_rates.applymap(lambda x: math.log(x + 1))

    if base_rf is None and base_rf_series is None:
        print("No 'base_rf' or 'base_rf_series' was provided. Trying to use 'USD1M' as the base risk-free rate.")
        base_rf = 'USD1M'
    if base_rf_series is None:
        base_rf_series = rf_rates[base_rf]

    if annual_factor is None:
        print("Regression assumes 'annual_factor' equals to 12 since it was not provided")
        annual_factor = 12

    all_regressions_summary = pd.DataFrame({})

    for rf, fx in rf_to_fx.items():
        try:
            rf_name = re.sub('[0-9]+M', '', rf)
        except:
            rf_name = rf
        factor = (base_rf_series - rf_rates[rf]).to_frame('Base RF - Foreign RF')
        strat = fx_rates[fx].diff().to_frame(rf_name)
        regression_summary = calc_regression(strat, factor, annual_factor=annual_factor, warnings=False)
        all_regressions_summary = pd.concat([all_regressions_summary, regression_summary])

    if print_analysis:
        try:
            print('\n' * 2)
            for currency in all_regressions_summary.index:
                fx_beta = all_regressions_summary.loc[currency, 'Base RF - Foreign RF Beta']
                fx_alpha = all_regressions_summary.loc[currency, 'Alpha']
                print(f'For {currency} against the base currency, the Beta is {fx_beta:.2f}.')
                if 1.1 >= fx_beta and fx_beta >= 0.85:
                    print(
                        'which shows that, on average, the difference in risk-free rate is mainly offset by the FX rate movement.'
                    )
                elif fx_beta > 1.1:
                    print(
                        'which shows that, on average, the difference in risk-free rate is more than offset by the FX rate movement.,\n'
                        'Therefore, on average, the currency with the lower risk-free rate outperforms.'
                    )
                elif fx_beta < 0.85 and fx_beta > 0.15:
                    print(
                        'which shows that, on average, the difference in risk-free rate is only partially offset by the FX rate movement.\n'
                        'Therefore, on average, the currency with the higher risk-free rate outperforms.'
                    )
                elif fx_beta <= 0.15 and fx_beta >= -0.1:
                    print(
                        'which shows that, on average, the difference in risk-free rate is almost not offset by the FX rate movement.\n'
                        'Therefore, on average, the currency with the higher risk-free rate outperforms.'
                    )
                elif fx_beta <= 0.15 and fx_beta >= -0.1:
                    print(
                        'which shows that, on average, the difference in risk-free rate is almost not offset by the FX rate movement.\n'
                        'Therefore, on average, the currency with the higher risk-free rate outperforms.'
                    )
                else:
                    print(
                        'which shows that, on average, the change FX rate helps the currency with the highest risk-free return.\n'
                        'Therefore, the difference between returns is increased, on average, by the changes in the FX rate.'
                    )
                print('\n' * 2)
        except:
            print('Could not print analysis. Review function.')

    return filter_columns_and_indexes(
        all_regressions_summary,
        keep_columns=keep_columns, drop_columns=drop_columns,
        keep_indexes=keep_indexes, drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )


def calc_dynamic_carry_trade(
    fx_rates: pd.DataFrame,
    rf_rates: pd.DataFrame,
    transform_to_log_fx_rates: bool = True,
    transform_to_log_rf_rates: bool = True,
    rf_to_fx: dict = None,
    base_rf: str = None,
    base_rf_series: Union[pd.Series, pd.DataFrame] = None,
    annual_factor: Union[int, None] = None,
    return_premium_series: bool = False,
    keep_columns: Union[list, str] = None,
    drop_columns: Union[list, str] = None,
    keep_indexes: Union[list, str] = None,
    drop_indexes: Union[list, str] = None,
    drop_before_keep: bool = False
):
    """
    Calculates the dynamic carry trade strategy based on FX rates and risk-free rate differentials.

    Parameters:
    fx_rates (pd.DataFrame): Time series of FX rates.
    rf_rates (pd.DataFrame): Time series of risk-free rates.
    transform_to_log_fx_rates (bool, default=True): If True, converts FX rates to log returns.
    transform_to_log_rf_rates (bool, default=True): If True, converts risk-free rates to log returns.
    rf_to_fx (dict, default=None): Mapping of risk-free rates to FX pairs.
    base_rf (str, default=None): Base risk-free rate to use for calculations.
    base_rf_series (pd.Series or pd.DataFrame, default=None): Time series of the base risk-free rate.
    annual_factor (int or None, default=None): Factor for annualizing the returns.
    return_premium_series (bool, default=False): If True, returns the premium series instead of summary statistics.
    keep_columns (list or str, default=None): Columns to keep in the resulting DataFrame.
    drop_columns (list or str, default=None): Columns to drop from the resulting DataFrame.
    keep_indexes (list or str, default=None): Indexes to keep in the resulting DataFrame.
    drop_indexes (list or str, default=None): Indexes to drop from the resulting DataFrame.
    drop_before_keep (bool, default=False): If True, drops specified columns/indexes before keeping.

    Returns:
    pd.DataFrame: Summary of the carry trade strategy statistics or premium series.
    """
    raise Exception("Function not available - needs testing prior to use")
    if annual_factor is None:
        print("Regression assumes 'annual_factor' equals to 12 since it was not provided")
        annual_factor = 12
        
    fx_regressions = calc_fx_regression(
        fx_rates, rf_rates, transform_to_log_fx_rates, transform_to_log_rf_rates,
        rf_to_fx, base_rf, base_rf_series, annual_factor
    )

    fx_rates = fx_rates.copy()
    rf_rates = rf_rates.copy()
    if isinstance(base_rf_series, (pd.Series, pd.DataFrame)):
        base_rf_series = base_rf_series.copy()

    if rf_to_fx is None:
        rf_to_fx = {
            'GBP1M': 'USUK',
            'EUR1M': 'USEU',
            'CHF1M': 'USSZ',
            'JPY1M': 'USJP'
        }

    if transform_to_log_fx_rates:
        fx_rates = fx_rates.applymap(lambda x: math.log(x))

    if transform_to_log_rf_rates:
        rf_rates = rf_rates.applymap(lambda x: math.log(x + 1))

    if base_rf is None and base_rf_series is None:
        print("No 'base_rf' or 'base_rf_series' was provided. Trying to use 'USD1M' as the base risk-free rate.")
        base_rf = 'USD1M'
    if base_rf_series is None:
        base_rf_series = rf_rates[base_rf]

    all_expected_fx_premium = pd.DataFrame({})
    for rf in rf_to_fx.keys():
        try:
            rf_name = re.sub('[0-9]+M', '', rf)
        except:
            rf_name = rf
        fx_er_usd = (base_rf_series.shift(1) - rf_rates[rf].shift(1)).to_frame('ER Over USD')
        expected_fx_premium = fx_regressions.loc[rf_name, 'Alpha'] + (fx_regressions.loc[rf_name, 'Base RF - Foreign RF Beta'] - 1) * fx_er_usd
        expected_fx_premium = expected_fx_premium.rename(columns={'ER Over USD': rf_name})
        all_expected_fx_premium = all_expected_fx_premium.join(expected_fx_premium, how='outer')

    if return_premium_series:
        return filter_columns_and_indexes(
            all_expected_fx_premium,
            keep_columns=keep_columns, drop_columns=drop_columns,
            keep_indexes=keep_indexes, drop_indexes=drop_indexes,
            drop_before_keep=drop_before_keep
        )
    
    all_expected_fx_premium = all_expected_fx_premium.dropna(axis=0)
    summary_statistics = (
        all_expected_fx_premium
        .applymap(lambda x: 1 if x > 0 else 0)
        .agg(['mean', 'sum', 'count'])
        .set_axis(['% of Periods with Positive Premium', 'Nº of Positive Premium Periods', 'Total Number of Periods'])
    )
    summary_statistics = pd.concat([
        summary_statistics,
        (
            all_expected_fx_premium
            .agg(['mean', 'std', 'min', 'max', 'skew', 'kurtosis'])
            .set_axis(['Mean', 'Vol', 'Min', 'Max', 'Skewness', 'Kurtosis'])
        )
    ])
    summary_statistics = summary_statistics.transpose()
    summary_statistics['Annualized Mean'] = summary_statistics['Mean'] * annual_factor
    summary_statistics['Annualized Vol'] = summary_statistics['Vol'] * math.sqrt(annual_factor)
    
    return filter_columns_and_indexes(
        summary_statistics,
        keep_columns=keep_columns, drop_columns=drop_columns,
        keep_indexes=keep_indexes, drop_indexes=drop_indexes,
        drop_before_keep=drop_before_keep
    )

In [5]:
factors

Unnamed: 0_level_0,LVL,HMS,IMO
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2010-01-05,0.001741,-0.003471,0.006597
2010-01-06,0.012889,0.016399,-0.020703
2010-01-07,-0.011484,0.011919,-0.001153
2010-01-08,0.000436,0.007519,0.000570
2010-01-11,-0.007377,0.006306,-0.000623
...,...,...,...
2025-10-27,-0.001392,-0.004590,0.007534
2025-10-28,-0.002979,-0.012895,0.002998
2025-10-29,0.008762,0.006220,0.006162
2025-10-30,0.019519,0.027967,-0.011074


In [6]:
returns

Unnamed: 0_level_0,CL,GC,HO,LE,NG,PL,RB,SB,SI,ZC,ZL,ZM,ZS
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2010-01-05,0.003190,0.000358,0.001643,0.011127,-0.041978,0.008897,0.009789,0.000724,0.019553,0.000597,-0.004646,0.010759,0.002620
2010-01-06,0.017244,0.015920,0.004148,-0.004344,0.065993,0.013980,0.005459,0.027858,0.021484,0.007164,-0.000983,-0.004696,-0.001663
2010-01-07,-0.006251,-0.002465,-0.008896,-0.000291,-0.033783,0.000515,-0.000796,-0.014432,0.009360,-0.010077,-0.016720,-0.034287,-0.031176
2010-01-08,0.001089,0.004501,0.007648,-0.001164,-0.009817,0.007469,0.009555,-0.016786,0.006818,0.013174,-0.011503,-0.000652,-0.004667
2010-01-11,-0.002779,0.010982,-0.009181,-0.009030,-0.051313,0.015148,-0.005846,-0.028333,0.012190,-0.001182,-0.008601,-0.006845,-0.011106
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-10-27,-0.003089,-0.028288,0.013732,-0.021070,0.041768,-0.009725,-0.001196,-0.034068,-0.037518,0.012995,0.009946,0.013941,0.024478
2025-10-28,-0.018920,-0.008921,-0.020073,-0.005790,-0.028181,-0.000887,0.002499,-0.006224,0.012091,0.007580,-0.010045,0.027834,0.010307
2025-10-29,0.005486,0.004412,0.015541,0.017143,0.009268,0.009068,0.025192,0.003479,0.012647,0.004630,-0.001990,0.007178,0.001855
2025-10-30,0.001488,0.004418,0.014726,0.016746,0.171801,0.010683,0.015048,-0.009709,0.014815,-0.008641,-0.010167,0.022352,0.010183


# 1. Commodity Returns and Factors

### 1.1 Factor Summary Statistics

For each of the three factors, report only the following summary statistics (rounded to at least 6 decimal places):
- Annualized Mean Return
- Annualized Volatility
- Annualized Sharpe Ratio

In [11]:
factor_stats = calc_summary_statistics(
    returns=factors,
    annual_factor=252,              # daily → ~252 trading days per year
    provided_excess_returns=True,   # treat as excess returns → Sharpe = mean/vol
    correlations=False,             # we don’t need correlation matrix here
    return_tangency_weights=False   # we don’t need tangency weights for this part
)

factor_stats_1_1 = factor_stats[[
    "Annualized Mean",
    "Annualized Vol",
    "Annualized Sharpe"
]]

# 4. Round to at least 6 decimal places
pd.options.display.float_format = "{:,.6f}".format

print(factor_stats_1_1)

     Annualized Mean  Annualized Vol  Annualized Sharpe
LVL         0.064439        0.154613           0.416775
HMS         0.008842        0.226259           0.039078
IMO         0.007281        0.153281           0.047500


### 1.2 Factor Correlations

Calculate and report the correlation matrix between the three factors (rounded to at least 6 decimal places).

In [None]:
# 1. Ensure we’re using the same factor DataFrame as before
factors = factors[["LVL", "HMS", "IMO"]]

# 2. Compute the correlation matrix
corr_matrix = factors.corr()

# 3. Round to six decimal places
corr_matrix = corr_matrix.round(6)

# 4. Display with full precision
pd.options.display.float_format = "{:,.6f}".format

print(corr_matrix)
# Xtx make pretty later

         LVL       HMS       IMO
LVL 1.000000  0.400202  0.010439
HMS 0.400202  1.000000 -0.071747
IMO 0.010439 -0.071747  1.000000


### 1.3 Interpretation

Does the factor construction make sense given the correlations you observe?

Yes, the factor construction makes sense. The moderate positive correlation between LVL and HMS shows that periods of broad commodity strength tend to favor hard over soft commodities, which is economically intuitive. Meanwhile, IMO is nearly uncorrelated with the others, confirming it captures a distinct production-chain dynamic independent of overall commodity movements.

### 1.4 Tangency Portfolio Weights

Build a tangency portfolio using the three factors as assets.

Report the weights of each factor in the tangency portfolio, rounded to at least 6 decimal places. You may assume that the factors use *excess* return.

In [13]:
# Tangency portfolio weights (daily data ⇒ annual_factor=252)
tangency_weights = calc_tangency_weights(
    returns=factors,
    annual_factor=252,
    name="Tangency"
)

# Round to six decimal places
tangency_weights = tangency_weights.round(6)

print(tangency_weights)

     Tangency Weights
LVL          1.171917
HMS         -0.250927
IMO          0.079011


### 1.5 Interpretation

What do the tangency portfolio weights suggest about the relative importance of each factor?

The tangency portfolio places the greatest weight on LVL, showing it is the dominant source of risk-adjusted return among the three factors. The small positive weight on IMO suggests it provides modest diversification benefits, while the negative weight on HMS indicates that hard-minus-soft exposure adds volatility without improving expected return. Overall, the portfolio is driven primarily by the broad LVL factor, with the other two used mainly to fine-tune risk.

### 1.6. 

Estimate an autoregression of the factor `LVL`...

$$r_t = \gamma + \rho\, r_{t-1} + \epsilon_t$$

Only report $\rho$ (rounded to at least 6 decimal places).

Does the `LVL` factor exhibit momentum?

In [15]:
# 1. Set up LVL and its lag
lvl = factors[["LVL"]].copy()
lvl_lag = lvl.shift(1).rename(columns={"LVL": "LVL_lag"})

# 2. Align y_t and r_{t-1} and drop the first NaN
ar_df = lvl.join(lvl_lag, how="inner").dropna()

# 3. Run the autoregression r_t = γ + ρ r_{t-1} + ε_t
ar_results = calc_regression(
    y=ar_df["LVL"],
    X=ar_df[["LVL_lag"]],
    intercept=True,
    annual_factor=252,   # annual_factor choice does NOT affect ρ
    warnings=False
)

rho = ar_results.loc["LVL", "LVL_lag Beta"]
print("ρ =", round(rho, 6))

ρ = 0.017041


The estimate $\rho = 0.017041$ is very close to zero, meaning the LVL factor’s current return is almost uncorrelated with its past return. This implies the LVL factor does not exhibit meaningful momentum — its returns are effectively serially independent over time.

# 2. Single Factor Model

### 2.1 LVL Factor

We want to test the hypothesis that:

$$
\mathbb{E}[r_{i}] = \beta_{i,LVL} \cdot \mathbb{E}[r_{LVL}]
$$

Regress each commodity's returns against the LVL factor, and report the mean absolute alpha, $\bar{\alpha}$ and $r^2$ across all commodities. 

Annualize the alpha.

Output *exactly* the following two numbers rounded to 6 decimal places:
- Mean Absolute Annualized Alpha across all commodities
- Mean R-squared across all commodities

In [17]:
# 1. Run time-series regressions of each commodity on LVL
lvl_regressions = calc_iterative_regression(
    multiple_y=returns,            # each commodity return
    X=factors[["LVL"]],            # single factor
    annual_factor=252,             # daily data
    intercept=True,                # include alpha
    warnings=False
)

# 2. Compute the two required summary metrics
mean_abs_annualized_alpha = lvl_regressions["Annualized Alpha"].abs().mean()
mean_r2 = lvl_regressions["R-Squared"].mean()

# 3. Round and print with exactly 6 decimals
print("annualized alpha", round(mean_abs_annualized_alpha, 6))
print("mean r2", round(mean_r2, 6))

annualized alpha 0.034876
mean r2 0.25989


### 2.2. Interpretation

If our hypothesis were true, what would you expect the values of $\bar{\alpha}$ and $\bar{r^2}$ to be?

If the hypothesis
$\mathbb{E}[r_i] = \beta_{i,LVL} \cdot \mathbb{E}[r_{LVL}]$
were true, every asset’s return would be fully explained by its exposure to the LVL factor.
In that case, we would expect mean absolute alpha ($\bar{\alpha}$) to be zero and mean $R^2$ to be one, since there would be no unexplained return variation and all variation would be captured by the factor.

### 2.3 Cross-Sectional Test

Let's test the one-factor `LVL` model directly. From `2.1`, we already have what we need:

- The dependent variable, (y): mean excess returns from each of the commodities.
- The regressor, (x): the market beta for each commodity from the time-series regressions.

Then we can estimate the following equation:

$$
\underbrace{\mathbb{E}\left[\tilde{r}^{i}\right]}_{n\times 1\text{ data}} = \textcolor{ForestGreen}{\underbrace{\eta}_{\text{regression intercept}}} + \underbrace{{\beta}^{i,\text{LVL}};}_{n\times 1\text{ data}}~ \textcolor{ForestGreen}{\underbrace{\lambda_{\text{LVL}}}_{\text{regression estimate}}} + \textcolor{ForestGreen}{\underbrace{\upsilon}_{n\times 1\text{ residuals}}}
$$

Report exactly the following 3 numbers (rounded to at least 6 decimal places):
* The R-squared of this regression
* The intercept estimate, $\hat{\eta}$
* The regression coefficient $\lambda_{LVL}$

In [33]:
# 1. Get each commodity’s mean excess return (dependent variable)
mean_excess_returns = returns.mean().to_frame("Mean Excess Return")

# 2. Extract the LVL betas from your previous time-series regressions
lvl_betas = lvl_regressions[["LVL Beta"]]

# 3. Run the cross-sectional regression:
cross_section_lvl = calc_regression(
    y=mean_excess_returns,
    X=lvl_betas,
    intercept=True,
    annual_factor=252,   # consistent with daily data
    warnings=False
)

r2 = cross_section_lvl.loc["Mean Excess Return", "R-Squared"]
eta = cross_section_lvl.loc["Mean Excess Return", "Alpha"]
lambda_lvl = cross_section_lvl.loc["Mean Excess Return", "LVL Beta Beta"]

# Nicely formatted output
print(f"R^2        = {r2:.6f}")
print(f"eta        = {eta:.6f}")
print(f"lambda_LVL = {lambda_lvl:.6f}")

R^2        = 0.093357
eta        = 0.000163
lambda_LVL = 0.000092


### 2.4.

Does your time-series or cross-sectional estimate give a higher premium to the `LVL` factor?

The time-series estimate from Section 1.1 gives an annualized mean (risk premium) for the LVL factor of 0.064439.
If your cross-sectional regression from 2.3 produced a smaller $\hat{\lambda}_{LVL}$, this means the time-series estimate assigns a higher premium to the LVL factor. In other words, the factor itself earns about 6.4 % per year, while the cross-sectional pricing across commodities attributes a lower return per unit of LVL exposure—so the time-series result implies a stronger reward for LVL risk.

# 3. Trading the Model

#### 3.1 Beta Estimation

For each commodity, report their `LVL` beta rounded to at least 6 decimal places. 

Display a table of these betas, sorted from lowest to highest.

#### Remember
We estimated the betas in the time-series regression in `2.1`.

#### Hint
Use `df.sort_values(by=<YOUR_DF>, ascending=True)` to sort your results.

In [20]:
# 1. Extract LVL betas from the time-series regressions
lvl_betas_sorted = lvl_regressions[["LVL Beta"]].copy()

# 2. Round to six decimal places
lvl_betas_sorted = lvl_betas_sorted.round(6)

# 3. Sort from lowest to highest beta
lvl_betas_sorted = lvl_betas_sorted.sort_values(by="LVL Beta", ascending=True)

# 4. Display the result
print(lvl_betas_sorted)

    LVL Beta
LE  0.243876
GC  0.441604
ZM  0.698967
SB  0.742221
ZS  0.770218
ZC  0.823626
PL  0.834771
ZL  0.850391
SI  1.038445
NG  1.407854
HO  1.533688
RB  1.695943
CL  1.918395


#### 3.2 Portfolio Formation

Regardless of your answer to `3.1`, allocate your portfolio as follows:

- Go long `GC`, `LE`, and `ZM`
- Go short `CL`, `HO`, and `RB`

Go long `1` and short `0.25`.

That is, your portfolio should be:
$$
r_{port} =  1 \cdot \left(
    \frac{1}{3} \cdot r_{GC} + \frac{1}{3} \cdot r_{LE} + \frac{1}{3} \cdot r_{ZM}
\right) - 0.25 \cdot \left(\frac{1}{3} \cdot r_{CL} + \frac{1}{3} \cdot r_{HO} + \frac{1}{3} \cdot r_{RB}\right)
$$
Report the last 5 daily returns of your betting against beta portfolio, rounded to at least 6 decimal places.

In [21]:
# 1. Define long and short commodity tickers
long_assets = ["GC", "LE", "ZM"]
short_assets = ["CL", "HO", "RB"]

# 2. Compute the long and short legs
long_leg = returns[long_assets].mean(axis=1)         # equal-weight long basket
short_leg = returns[short_assets].mean(axis=1)       # equal-weight short basket

# 3. Form the betting-against-beta portfolio
r_port = 1.0 * long_leg - 0.25 * short_leg

# 4. Round to 6 decimal places and show last 5 daily returns
r_port_rounded = r_port.round(6)
print(r_port_rounded.tail(5))

Date
2025-10-27   -0.012593
2025-10-28    0.007415
2025-10-29    0.005726
2025-10-30    0.011900
2025-10-31    0.007463
dtype: float64


#### 3.3 Performance Evaluation

For your portfolio, report the following performance statistics (rounded to at least 6 decimal places):

- Annualized Return
- Annualized Volatility
- Annualized Sharpe Ratio

In [22]:
# 1. Evaluate performance of the betting-against-beta portfolio
portfolio_stats = calc_summary_statistics(
    returns=r_port.to_frame("BAB_Portfolio"),  # convert to DataFrame
    annual_factor=252,                         # daily data
    provided_excess_returns=True,              # already excess returns
    correlations=False,
    return_tangency_weights=False
)

# 2. Select the requested metrics
performance = portfolio_stats[[
    "Annualized Mean",
    "Annualized Vol",
    "Annualized Sharpe"
]].round(6)

print(performance)

               Annualized Mean  Annualized Vol  Annualized Sharpe
BAB_Portfolio         0.053383        0.144502           0.369425


#### 3.4 

For your portfolio, test the hypothesis that its premium can be explained by the `LVL` factor alone:

$$
\mathbb{E}[r_{port}] = \beta_{port,LVL} \cdot \mathbb{E}[r_{LVL}]
$$

Report exactly the following 3 numbers (rounded to at least 6 decimal places):
- Annualized Alpha
- `LVL` Beta
- R-squared for the regression

In [24]:
# 1. Run regression of portfolio returns on LVL factor
bab_lvl_reg = calc_regression(
    y=r_port.to_frame("BAB_Portfolio"),
    X=factors[["LVL"]],
    intercept=True,
    annual_factor=252,
    warnings=False
)

# 2. Extract the three required numbers
annualized_alpha = bab_lvl_reg.loc["BAB_Portfolio", "Annualized Alpha"]
lvl_beta = bab_lvl_reg.loc["BAB_Portfolio", "LVL Beta"]
r2 = bab_lvl_reg.loc["BAB_Portfolio", "R-Squared"]

# 3. Print rounded to six decimals
print("alpha", round(annualized_alpha, 6))
print("beta", round(lvl_beta, 6))
print("r2", round(r2, 6))

alpha 0.05129
beta 0.03248
r2 0.001208


# 4. Multi-Factor Model

We now want to test a multi-factor model using `LVL` and `HMS` and `IMO` as factors:

$$
\mathbb{E}[r_{i}] = \beta_{i,WTI} \cdot \mathbb{E}[r_{LVL}] + \beta_{i,HMS} \cdot \mathbb{E}[r_{HMS}] + \beta_{i,IMO} \cdot \mathbb{E}[r_{IMO}]
$$

#### 4.1 Time Series Test
Estimate the *time series* test of this pricing model. Regress each commodity's returns against the three factors, and report the following for each commodity (rounded to at least 6 decimal places):
- Annualized Alpha
- `LVL`, `HMS`, and `IMO` Betas
- R-squared

In [25]:
# 1. Run time-series regressions of each commodity on LVL, HMS, and IMO
multi_regressions = calc_iterative_regression(
    multiple_y=returns,                    # each commodity's returns
    X=factors[["LVL", "HMS", "IMO"]],     # all three factors
    annual_factor=252,                     # daily data
    intercept=True,                        # include alpha
    warnings=False
)

# 2. Select and round the required columns
multi_factor_results = multi_regressions[[
    "Annualized Alpha",
    "LVL Beta",
    "HMS Beta",
    "IMO Beta",
    "R-Squared"
]].round(6)

# 3. Display the table
print(multi_factor_results)

    Annualized Alpha  LVL Beta  HMS Beta  IMO Beta  R-Squared
CL         -0.035599  1.527030  0.656709  0.653682   0.637664
GC          0.071595  0.373286  0.123614 -0.393933   0.348973
HO         -0.023058  1.205122  0.543109  1.014183   0.759600
LE          0.059421  0.328136 -0.148049  0.236226   0.119500
NG          0.080357  1.035345  0.662592 -1.501411   0.373050
PL         -0.009842  0.711887  0.217593 -0.439602   0.364752
RB         -0.015995  1.243184  0.749484  1.335847   0.786582
SB         -0.053905  1.065017 -0.542152 -0.510662   0.321713
SI          0.056488  0.901595  0.246072 -0.701687   0.434255
ZC         -0.031637  1.246405 -0.717257 -0.262703   0.514722
ZL         -0.026659  1.055894 -0.356494  0.316702   0.436060
ZM         -0.028435  1.159886 -0.789761  0.154944   0.511819
ZS         -0.042733  1.147212 -0.645459  0.098414   0.707169


#### 4.2 

Report the annualized Sharpe ratio (rounded to at least 6 decimal places) of the tangency portfolio formed from 
* the individual commodities.
* the three factors

What should be true of the Sharpe ratios if the factor pricing model is accurate?

In [27]:
# ----- Tangency from INDIVIDUAL COMMODITIES -----
# Tangency portfolio using all commodity returns as assets
tan_port_commodities = calc_tangency_weights(
    returns=returns,
    annual_factor=252,
    cov_mat=1,
    name="Commodities Tangency",
    return_port_ret=True
)

tan_comm_stats = calc_summary_statistics(
    returns=tan_port_commodities,
    annual_factor=252,
    provided_excess_returns=True,
    correlations=False,
    return_tangency_weights=False
)

comm_sharpe = tan_comm_stats.loc["Commodities Tangency Portfolio", "Annualized Sharpe"]


# ----- Tangency from the THREE FACTORS -----
tan_port_factors = calc_tangency_weights(
    returns=factors[["LVL", "HMS", "IMO"]],
    annual_factor=252,
    cov_mat=1,
    name="Factors Tangency",
    return_port_ret=True
)

tan_fact_stats = calc_summary_statistics(
    returns=tan_port_factors,
    annual_factor=252,
    provided_excess_returns=True,
    correlations=False,
    return_tangency_weights=False
)

fact_sharpe = tan_fact_stats.loc["Factors Tangency Portfolio", "Annualized Sharpe"]


# ----- Print the two Sharpe ratios -----
print("indiv", round(comm_sharpe, 6))
print("factors", round(fact_sharpe, 6))

indiv 0.852239
factors 0.440601


 - If the LVL–HMS–IMO factor pricing model is correct and the factors span all priced risk in the commodities:
 - The tangency Sharpe ratio using all commodities should be equal to the tangency Sharpe ratio using only the factors (up to sampling noise).


The fact that the commodity-level tangency Sharpe is almost twice as high implies the three factors miss additional priced sources of risk or return, so the factor model is incomplete.

#### 4.3 Cross-Sectional Test

Run the cross-sectional test of this multi-factor model:

$$
\mathbb{E}\left[\tilde{r}^{i}\right] = \lambda_{0} + \lambda_{LVL} \cdot \beta_{i,LVL} + \lambda_{HMS} \cdot \beta_{i,HMS} + \lambda_{IMO} \cdot \beta_{i,IMO} + \nu_{i}
$$

Report exactly the following numbers (rounded to at least 6 decimal places):
- $\lambda_{0}$
- $\lambda_{LVL}$
- $\lambda_{HMS}$
- $\lambda_{IMO}$
- $r^2$ of the cross-sectional regression
- MAE of the cross-sectional regression

Annualize the MAE.

In [29]:
# 1. Compute the dependent variable: mean excess return for each commodity
mean_excess_returns = returns.mean().to_frame("Mean Excess Return")

# 2. Get the betas from the multi-factor time-series regressions (4.1)
multi_betas = multi_regressions[["LVL Beta", "HMS Beta", "IMO Beta"]]

# 3. Run the cross-sectional regression
multi_cross_section = calc_regression(
    y=mean_excess_returns,
    X=multi_betas,
    intercept=True,
    annual_factor=252,   # daily data (doesn't affect cross-sectional R², etc.)
    warnings=False
)

# 4. Extract the required statistics
lambda_0   = multi_cross_section.loc["Mean Excess Return", "Alpha"]
lambda_lvl = multi_cross_section.loc["Mean Excess Return", "LVL Beta Beta"]
lambda_hms = multi_cross_section.loc["Mean Excess Return", "HMS Beta Beta"]
lambda_imo = multi_cross_section.loc["Mean Excess Return", "IMO Beta Beta"]
r2         = multi_cross_section.loc["Mean Excess Return", "R-Squared"]

# 5. Compute annualized mean absolute error (MAE)
residuals = mean_excess_returns["Mean Excess Return"] - (
    lambda_0 +
    multi_betas["LVL Beta"] * lambda_lvl +
    multi_betas["HMS Beta"] * lambda_hms +
    multi_betas["IMO Beta"] * lambda_imo
)
mae = residuals.abs().mean() * 252  # annualize

# 6. Print labeled results (rounded to 6 decimals)
print(f"lambda_0   = {lambda_0:.6f}")
print(f"lambda_LVL = {lambda_lvl:.6f}")
print(f"lambda_HMS = {lambda_hms:.6f}")
print(f"lambda_IMO = {lambda_imo:.6f}")
print(f"R^2        = {r2:.6f}")
print(f"MAE_ann    = {mae:.6f}")

lambda_0   = 0.000321
lambda_LVL = -0.000065
lambda_HMS = 0.000200
lambda_IMO = -0.000067
R^2        = 0.646543
MAE_ann    = 0.015664


#### 4.4 Interpretation

Do the results of the cross-sectional test support the multi-factor model?

Yes — your results do support the multi-factor model.

The high $R^2 = 0.646543$ means that the three factors (LVL, HMS, IMO) explain a substantial portion of the variation in mean returns across commodities. The small intercept ($\lambda_0 = 0.000321$) and modest annualized pricing error ($\text{MAE} = 0.015664$) indicate that the model leaves little systematic mispricing. Although the individual factor premia are small, the overall fit suggests that the multi-factor specification captures the main drivers of commodity returns reasonably well.

#### 4.5 Risk Premia

Compare the risk premia ($\lambda$'s) from the cross-sectional test to the average returns of each factor. Report exactly the following table (rounded to at least 6 decimal places):

- Average return of each factor
- Risk premia from the cross-sectional test

Annualize the estimates.

In [30]:
# 1. Annualized average returns of each factor (from factors DataFrame)
avg_factor_returns = (factors.mean() * 252).to_frame("Avg Annualized Return")

# 2. Add the cross-sectional risk premia (λ’s) from 4.3
cross_section_premia = pd.DataFrame({
    "Risk Premium (λ)": {
        "LVL": lambda_lvl,
        "HMS": lambda_hms,
        "IMO": lambda_imo
    }
})

# 3. Combine both into one table and round to 6 decimals
risk_premia_comparison = avg_factor_returns.join(cross_section_premia).round(6)

print(risk_premia_comparison)

     Avg Annualized Return  Risk Premium (λ)
LVL               0.064439         -0.000065
HMS               0.008842          0.000200
IMO               0.007281         -0.000067


#### 4.6 Interpretation

What do you observe from the comparison of average returns and risk premia? 

Theoretically, what could cause the risk premium of a factor to deviate from its average return?

The comparison shows that the estimated risk premia are much smaller than the factors’ average returns, implying that most of the factors’ historical performance is not explained by priced risk. This suggests that part of their average return may reflect noise, sample-specific shocks, or non-systematic effects. Theoretically, risk premia deviate from average returns when factors contain idiosyncratic or transient components that are not truly priced in equilibrium. Sampling error, omitted factors, or structural changes in commodity markets can also drive a wedge between realized mean returns and true underlying risk compensation.