# Tasks Test 

In [138]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
from sklearn.linear_model import LinearRegression
import task1 as t1 
from scipy.stats import spearmanr

In [14]:

data_root_min = '/Users/huayuzhu/Desktop/exam/raw_data/minute'
data_root = '/Users/huayuzhu/Desktop/exam/raw_data/daily'
output_dir = '/Users/huayuzhu/Desktop/exam/'
def get_min_data_from_csv(file_path):
    """
    Reads a CSV file into a DataFrame, with the first column as row indices.

    Parameters
    ----------
    file_path : str
        The file path of the CSV file to be read.

    Returns
    -------
    pd.DataFrame
        Transposed DataFrame with datetime as index.
    """
    try:
        data = pd.read_csv(file_path, index_col=0) 
        data.index = pd.to_datetime(data.index)
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
    except Exception as e:
        print(f"An error occurred: {e}")
    return data
def get_data_from_csv(file_path):
    """
    Reads a CSV file into a DataFrame, with the first column as row indices, 
    transposes the result, and converts the row indices to datetime.

    Parameters
    ----------
    file_path : str
        The file path of the CSV file to be read.

    Returns
    -------
    pd.DataFrame
        Transposed DataFrame with datetime as index.
    """
    try:
        data = pd.read_csv(file_path, index_col=0).T
        data.index = pd.to_datetime(data.index)
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
    except Exception as e:
        print(f"An error occurred: {e}")
    return data
amount = get_min_data_from_csv(f'{data_root_min}/amount.csv')
volume = get_min_data_from_csv(f'{data_root_min}/volume.csv')
close  = get_min_data_from_csv(f'{data_root_min}/close.csv')
open  = get_min_data_from_csv(f'{data_root_min}/open.csv')

S_DQ_RET = get_data_from_csv(f'{data_root}/S_DQ_RET.csv')
S_905_DQ_RET =  get_data_from_csv(f'{data_root}/905S_DQ_RET.csv')
S_DQ_MV = get_data_from_csv(f'{data_root}/S_DQ_MV.csv')
S_RESTRICT = get_data_from_csv(f'{data_root}/S_RESTRICT.csv')
S_DQ_OPEN = get_data_from_csv(f'{data_root}/S_DQ_OPEN.csv')
S_DQ_ADJ_FACTOR = get_data_from_csv(f'{data_root}/S_DQ_ADJFACTOR.csv')
S_DQ_CLOSE = get_data_from_csv(f'{data_root}/S_DQ_CLOSE.csv')
S_DQ_VOLUME = get_data_from_csv(f'{data_root}/S_DQ_VOLUME.csv')


In [12]:
S_DQ_ADJ_FACTOR = S_DQ_ADJ_FACTOR/100

In [13]:
S_ADJ_CLOSE = S_DQ_CLOSE * S_DQ_ADJ_FACTOR

In [15]:
S_ADJ_OPEN = S_DQ_OPEN * S_DQ_ADJ_FACTOR

In [91]:
def calculate_factor(df_adj_close, df_adj_open, df_volume, df_market_value, N):
    """
    Calculates the mean overnight return for the days where the past N days' turnover rates
    are in the top and bottom 20%.

    Parameters
    ----------
    df_adj_close : DataFrame
        Adjusted closing prices with dates as index and tickers as columns.
    df_adj_open : DataFrame
        Adjusted opening prices with dates as index and tickers as columns.
    df_volume : DataFrame
        Trading volume with dates as index and tickers as columns.
    df_market_value : DataFrame
        Market value with dates as index and tickers as columns.
    N : int
        The number of days to look back for turnover rates.

    Returns
    -------
    pd.DataFrame
        A DataFrame with the calculated mean overnight returns.
    """
    overnight_returns = df_adj_open / df_adj_close.shift(1) - 1
    turnover_rates = df_volume / df_market_value
    
    df = pd.concat([overnight_returns,turnover_rates], keys = ['OverNight','TurnOver'],axis = 1)
    def calc_within_window(window_df):
        turnover_window = window_df.loc[:, 'TurnOver']
        overnight_window = window_df.loc[:, 'OverNight']
        ranked_turnover = turnover_window.rank(pct=True)
        is_top_20 = ranked_turnover >= 0.8
        is_bottom_20 = ranked_turnover <= 0.2
        mean_over_night = overnight_window[is_top_20 | is_bottom_20].mean()
        
        return mean_over_night
    df_factor = pd.DataFrame(index = df.index[20:], columns = overnight_returns.columns)
    # this is not applied for the large dataset, but due to time limit, I used for loop here for simplicity 
    for i in range(len(df_factor)-1): 
        df_factor.iloc[i] = calc_within_window(df[i:20+i])
    return df_factor 


In [86]:
factor_values = calculate_factor(S_ADJ_CLOSE, S_ADJ_OPEN,S_DQ_VOLUME , S_DQ_MV, N=20)

In [87]:
factor_values

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,873593,873665,873679,873690,873693,873703,873726,873749,873806,873833
2015-02-02,97.433688,97.559973,,98.910844,,98.280437,98.671057,99.5095,99.534661,,...,,,,,,,,,,
2015-02-03,97.220458,97.694433,,98.774616,,98.245672,98.671057,99.5095,99.441482,,...,,,,,,,,,,
2015-02-04,97.650309,97.694433,,98.796041,,98.371221,98.686205,99.693501,99.580154,,...,,,,,,,,,,
2015-02-05,97.666331,97.694433,,98.949448,,98.405889,98.686205,99.693501,98.740599,,...,,,,,,,,,,
2015-02-06,98.29922,98.232129,,98.844627,,98.405889,98.678285,99.693501,98.740599,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-12-26,99.759484,99.477524,,99.252325,99.258936,101.014385,99.014257,99.228328,98.758149,99.142896,...,,,,,,,,,,
2022-12-27,99.84363,99.769678,,99.263864,99.258936,101.108637,99.014257,99.275409,98.842166,99.371442,...,,,,,,,,,,
2022-12-28,99.54827,99.110717,,99.056008,99.258936,101.108637,99.026579,99.089453,98.842166,99.224857,...,,,,,,,,,,
2022-12-29,99.561745,98.951162,,99.029306,99.258936,100.384743,99.073105,99.089453,98.805486,99.371317,...,,,,,,,,,,


In [203]:
factor_values

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,873593,873665,873679,873690,873693,873703,873726,873749,873806,873833
2015-02-02,97.433688,97.559973,,98.910844,,98.280437,98.671057,99.5095,99.534661,,...,,,,,,,,,,
2015-02-03,97.220458,97.694433,,98.774616,,98.245672,98.671057,99.5095,99.441482,,...,,,,,,,,,,
2015-02-04,97.650309,97.694433,,98.796041,,98.371221,98.686205,99.693501,99.580154,,...,,,,,,,,,,
2015-02-05,97.666331,97.694433,,98.949448,,98.405889,98.686205,99.693501,98.740599,,...,,,,,,,,,,
2015-02-06,98.29922,98.232129,,98.844627,,98.405889,98.678285,99.693501,98.740599,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-12-26,99.759484,99.477524,,99.252325,99.258936,101.014385,99.014257,99.228328,98.758149,99.142896,...,,,,,,,,,,
2022-12-27,99.84363,99.769678,,99.263864,99.258936,101.108637,99.014257,99.275409,98.842166,99.371442,...,,,,,,,,,,
2022-12-28,99.54827,99.110717,,99.056008,99.258936,101.108637,99.026579,99.089453,98.842166,99.224857,...,,,,,,,,,,
2022-12-29,99.561745,98.951162,,99.029306,99.258936,100.384743,99.073105,99.089453,98.805486,99.371317,...,,,,,,,,,,


In [204]:
test1 = t1.TestInfo()
factor1 = t1.OneFactorTest(factor_values)
factor1.compare_with_benchmark(test1,'Factor1',False)



ValueError: Lengths must match to compare

In [109]:
class TestInfo: 
    def __init__(self, group_number = 10, trading_frequency = 'W', initial_capital = None, trading_cost =0.0):
        """
        Parameters:
        - group_number(int): Number of groups to divide into (default 10 for deciles).
        - trading_frequency(str): "M" (monthly), "W" (weekly) (default to weekly)
        - initial_capital(float): initial capital plan to invest, default to the capital needed at first period trading 
        - trading cost(float): trading cost percentage, default to 0 

        """
        self.group_number = group_number 
        self.trading_frequency = trading_frequency
        self.initial_capital = initial_capital 
        self.trading_cost = trading_cost 

class OneFactorTest: 
    """
    A class to conduct single factor testing for stock trading strategies. It ranks stocks, handles trade intervals,
    executes trades, and evaluates performance metrics against a benchmark.

    Attributes
    ----------
    factor : DataFrame
        Factor data used to rank stocks.
    stock_price_open : DataFrame
        Stock opening prices.
    restricted_stock_df : DataFrame
        Information on stocks that are restricted from trading.
    benchmark : DataFrame
        Benchmark data for comparison.

    Methods
    -------
    rank_stock(test_info):
        Ranks stocks based on the provided factor values.

    trade_interval(df, test_info):
        Resamples the given dataframe to the specified trading frequency and forward fills missing data.

    trade(test_info):
        Executes trades based on stock rankings and calculates performance.

    evaluation_metrics(performance, adj1=48, var=0.05):
        Calculates various performance metrics like attribution analysis and risk analysis metrics.

    eval_combined(test_info):
        Evaluates and combines performance metrics for top and bottom ranked stocks.

    compare_with_benchmark(test_info, name, output=False):
        Compares the strategy's performance with a benchmark, returns a dataframe of excess returns.

    plot_comparison_with_benchmark(test_info, name):
        Generates a plot comparing the daily and cumulative excess returns against a benchmark.
    """

    def __init__(self,test_factor):
        min_date = max(test_factor.index.min(), S_DQ_OPEN.index.min())
        max_date = min(test_factor.index.max(), S_DQ_OPEN.index.max())

        self.factor = test_factor.loc[min_date:max_date]
        self.stock_price_open = S_DQ_OPEN.loc[min_date:max_date]
        self.restricted_stock_df = S_RESTRICT
        self.benchmark = S_905_DQ_RET.loc[min_date:max_date]
    
    def rank_stock(self,test_info): 
        factor_value = self.factor.copy()
        percentile = factor_value.rank(axis=1, pct=True)
        group_number = test_info.group_number - ((1 - percentile) * test_info.group_number) // 1
        return group_number
    
    def trade_interval(self,df,test_info):
        df_factor = df.resample(test_info.trading_frequency).first()   
        df_factor = df_factor.ffill() 
        return df_factor

    def trade(self,test_info): 
        df = self.rank_stock(test_info)
        df_price = self.stock_price_open

        # decide initial capital to trade 
        if test_info.initial_capital is not None: 
            initial_k = test_info.initial_capital
        else:
            p_1_top = df.iloc[0] == 1.0
            p_1_bottom = df.iloc[0] == 10.0
            p_1_top_capital = df_price.iloc[0][p_1_top].sum()
            p_1_bottom_capital = df_price.iloc[0][p_1_bottom].sum()
            initial_k = 1000 * (p_1_bottom_capital + p_1_top_capital)
        side_k = initial_k / 2
        top_pnl_tracker = {'total_pnl': [0], 'cumulative_pnl': [0]}
        top_return_tracker = {'return': [0], 'net_return': [0]}
        bottom_pnl_tracker = {'total_pnl': [0], 'cumulative_pnl': [0]}
        bottom_return_tracker = {'net_return': [0]}
        
        factor_resample = self.trade_interval(df,test_info)
        price_resample = self.trade_interval(df_price, test_info)
        #min = np.max(factor_resample.index.min(),price_resample.index.min())
        #max = np.min(factor_resample.index.max(),price_resample.index.max())
        #factor_resample = factor_resample.loc[min:max]
        #price_resample = price_resample.loc[min:max]


        top_r = factor_resample.copy() 
        bottom_r = factor_resample.copy() 
        
        if (factor_resample.index != price_resample.index).any():
            print('please debug')
        for i, (date, signals) in enumerate(factor_resample.iterrows()):
            # here equal weighted portfolio strategy is used, Markovitz, or CAPM optimization may give better result  
            top = signals == 1.0
            bottom = signals == 10.0
            top_k = price_resample.loc[date][top].sum()
            bottom_k = price_resample.loc[date][bottom].sum()
            top_ratio = side_k / top_k if top_k else 0
            bottom_ratio = side_k / bottom_k if bottom_k else 0
            top_r.loc[date] = signals.apply(lambda x: top_ratio if x == 1 else 0)
            bottom_r.loc[date] = signals.apply(lambda x: bottom_ratio if x == 1 else 0)

            # profit calculation 
            if i > 0: 
                price_change = price_resample.iloc[i] - price_resample.iloc[i-1]
                
                top_pnl = (top_r.iloc[i-1] * price_change).sum()
                bottom_pnl = (bottom_r.iloc[i-1] * price_change).sum()
        
                top_cumulative_pnl = top_pnl_tracker['cumulative_pnl'][i-1] + top_pnl
                bottom_cumulative_pnl = bottom_pnl_tracker['cumulative_pnl'][i-1] + bottom_pnl

                top_pnl_tracker['total_pnl'].append(top_pnl)
                top_pnl_tracker['cumulative_pnl'].append(top_cumulative_pnl)

                bottom_pnl_tracker['total_pnl'].append(bottom_pnl)
                bottom_pnl_tracker['cumulative_pnl'].append(bottom_cumulative_pnl)

                top_return_tracker['net_return'].append(top_pnl/side_k - test_info.trading_cost)
                bottom_return_tracker['net_return'].append(bottom_pnl/side_k - test_info.trading_cost)
        df_top_pnl_tracker = pd.DataFrame(top_pnl_tracker, index=factor_resample.index)
        df_top_return_tracker = pd.DataFrame(top_return_tracker, index=factor_resample.index)
        df_top_performance = pd.concat([df_top_pnl_tracker, df_top_return_tracker], axis=1)
        df_top_performance['cumulative_return'] = ((df_top_performance['net_return'] + 1).cumprod() -1)/100

        df_bottom_pnl_tracker = pd.DataFrame(bottom_pnl_tracker, index=factor_resample.index)
        df_bottom_return_tracker = pd.DataFrame(bottom_return_tracker, index=factor_resample.index)
        df_bottom_performance = pd.concat([df_bottom_pnl_tracker, df_bottom_return_tracker], axis=1)
        df_bottom_performance['cumulative_return'] = ((df_bottom_performance['net_return'] + 1).cumprod() -1)/100

        return df_top_performance, df_bottom_performance
    
    def evaluation_mectrics(self,performance,adj1 = 48,var=0.05): 
        summary = dict()
        data = performance.loc[:,['net_return']].dropna()
        summary["Annualized Return"] = data.mean() * adj1
        summary["Annualized Volatility"] = data.std() * np.sqrt(adj1)
        summary["Annualized Sharpe Ratio"] = (
            summary["Annualized Return"] / summary["Annualized Volatility"]
        )
        summary["Annualized Sortino Ratio"] = summary["Annualized Return"] / (
            data[data < 0].std() * np.sqrt(adj1)
        )

        summary["Skewness"] = data.skew()
        summary["Excess Kurtosis"] = data.kurtosis()
        summary[f"VaR ({var})"] = data.quantile(var, axis=0)
        summary[f"CVaR ({var})"] = data[data <= data.quantile(var, axis=0)].mean()
        summary["Min"] = data.min()
        summary["Max"] = data.max()

        wealth_index = 1000 * (1 + data).cumprod()
        previous_peaks = wealth_index.cummax()
        drawdowns = (wealth_index - previous_peaks) / previous_peaks

        summary["Max Drawdown"] = drawdowns.min()

        summary["Bottom"] = drawdowns.idxmin()
        summary["Peak"] = previous_peaks.idxmax()
        return pd.DataFrame(summary) 
    
    def eval_combined(self,test_info): 
        per_top, per_bottom = self.trade(test_info)
        top = self.evaluation_mectrics('top',per_top,adj1 = 48,var=0.05)
        bottom = self.evaluation_mectrics('bottom',per_bottom,adj1 = 48,var=0.05)
        return pd.concat([top,bottom], axis=0,keys=['top', 'bottom'])
    
    def compare_with_benchmark(self,test_info,name,output = False):
        per_top, per_bottom = self.trade(test_info)
        ret_top = per_top.loc[:,['net_return']].dropna()
        ret_bottom = per_bottom.loc[:,['net_return']].dropna()
 

        daily_resampled_top = ret_top.resample('D').ffill().div(5)
        daily_resampled_bottom = ret_bottom.resample('D').ffill().div(5)
        daily_resampled_top = daily_resampled_top.reindex(self.benchmark.index, method='ffill')  
        daily_resampled_bottom = daily_resampled_bottom.reindex(self.benchmark.index, method='ffill') 

        daily_resampled_bottom.fillna(0, inplace= True)
        daily_resampled_top.fillna(0, inplace = True)

        if (self.benchmark.index != daily_resampled_bottom.index).any(): 
            print('please debug')
        summary = pd.DataFrame(index = self.benchmark.index) 
        summary["Daily excess_return TOP"] = (daily_resampled_top['net_return'] - self.benchmark[905])
        summary["Cumulative excess_return TOP"] = (1 + summary['Daily excess_return TOP']).cumprod() - 1
        summary["Daily excess_return BOTTOM"] = (daily_resampled_top['net_return'] - self.benchmark[905])
        summary["Cumulative excess_return BOTTOM"] = (1 + summary['Daily excess_return BOTTOM']).cumprod() - 1

        if output:
            summary.to_csv(f'{output_dir}/{name}_compairson_ret.csv')

        return summary
    
    def plot_comparison_with_benchmark(self, test_info,name):
        df = self.compare_with_benchmark(test_info,name)
        fig = make_subplots(specs=[[{"secondary_y": True}]])

        fig.add_trace(
            go.Scatter(x=df.index, y=df['Daily excess_return TOP'], name='Top Daily'),
            secondary_y=False,
        )

        fig.add_trace(
            go.Scatter(x=df.index, y=df['Daily excess_return BOTTOM'], name='Bottom Daily'),
            secondary_y=False,
        )


        fig.add_trace(
            go.Scatter(x=df.index, y=df['Cumulative excess_return TOP'], name='Top Cumulative'),
            secondary_y=True,
        )

        fig.add_trace(
            go.Scatter(x=df.index, y=df['Cumulative excess_return BOTTOM'], name='Bottom Cumulative'),
            secondary_y=True,
        )
       
        fig.update_layout(
            title_text=name+"Comparison of Daily and Cumulative Excess Returns",
            plot_bgcolor='white',  
            xaxis_showgrid=False,  
            yaxis_showgrid=False,  
            yaxis2_showgrid=False,  
            autosize=True,  
            template='plotly_white',
            colorway=px.colors.qualitative.Vivid
        )


In [112]:
test1 = TestInfo()
factor1 = OneFactorTest(factor_values)
factor1.compare_with_benchmark(test1,'Factor1',False)

Unnamed: 0,Daily excess_return TOP,Cumulative excess_return TOP,Daily excess_return BOTTOM,Cumulative excess_return BOTTOM
2015-02-02,-0.010589,-0.010589,-0.010589,-0.010589
2015-02-03,-0.016629,-0.027042,-0.016629,-0.027042
2015-02-04,-0.003534,-0.030480,-0.003534,-0.030480
2015-02-05,0.017621,-0.013396,0.017621,-0.013396
2015-02-06,0.020192,0.006525,0.020192,0.006525
...,...,...,...,...
2022-12-26,-0.022332,-0.848989,-0.022332,-0.848989
2022-12-27,-0.008957,-0.850342,-0.008957,-0.850342
2022-12-28,-0.002613,-0.850733,-0.002613,-0.850733
2022-12-29,-0.012321,-0.852572,-0.012321,-0.852572


In [200]:
def calc_rankIC(factor, price_df, N): 
    ret = (price_df.shift(-N)/price_df - 1)[:-5]
    factor = factor[:-5]
    rankIC = pd.DataFrame(index=factor.resample('M').mean().index)

    for ticker in factor.columns:
        combined = pd.concat([factor[ticker], ret[ticker]], axis=1, keys=['factor', 'returns']) 
        combined.dropna(inplace=True)
        if combined.empty:
             rankIC[ticker] = pd.Series(pd.NA, index=rankIC.index) 
        else:
            monthly_corr = combined.resample('M').apply(lambda x: x['factor'].corr(x['returns'], method='spearman'))
            rankIC[ticker] = monthly_corr 
    return rankIC

def calc_ICIR(factor, price_df, N):
    rankIC = calc_rankIC(factor, price_df, N)
    ICIR = rankIC.mean(axis = 0)/ rankIC.std(axis =0)
    ICIR = ICIR.to_frame()
    return ICIR.T 




In [201]:
calc_ICIR(factor_values.loc['2016':],S_DQ_OPEN.loc['2016':],5)

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,873593,873665,873679,873690,873693,873703,873726,873749,873806,873833
0,-0.300946,-0.057467,,-0.225103,-0.047986,-0.061878,-0.150308,-0.068883,-0.15833,-0.169956,...,,,,,,,,,,
