In [24]:
from prophet import Prophet
import pandas as pd
import numpy as np
from matplotlib import pyplot
from sklearn.metrics import mean_absolute_percentage_error
import matplotlib.pyplot as plt
import os
import statistics
def _four_decimals(value):
    """Format a number with exactly four digits after the decimal point."""
    return f"{value:.4f}"


# Applies to every float pandas renders in this notebook.
pd.set_option('display.float_format', _four_decimals)

In [25]:
# Folder holding one CSV of historical prices per instrument.
parentdir = "./Historical Data/"

# os.listdir() makes no ordering promise; sorting keeps the column order of
# every frame built below identical from run to run.
all_filenames = sorted(os.listdir(parentdir))

# Training rows satisfy min_split_date <= ds < split_date,
# testing rows satisfy split_date <= ds < max_test_date.
min_split_date = "2013-01-01" #YYYY-MM-DD
split_date = "2022-01-01" #YYYY-MM-DD
max_test_date = "2023-01-01" #YYYY-MM-DD

# Column-name -> list-of-values accumulators; turned into DataFrames after the loop.
real_training_data_dict = {}
real_testing_data_dict = {}
forecasted_testing_data_dict = {}

# Stays 0 until the shared 'dates' columns have been stored once.
flag = 0
for x in all_filenames:
    # splitext copes with names that contain no dot or several dots, where
    # indexing the result of split(".") fails or picks the wrong part.
    if os.path.splitext(x)[1] != ".csv":
        continue

    path = os.path.join(parentdir, x)

    # Keep only date and close price, renamed to the ds / y columns that are
    # passed to Prophet below, ordered chronologically.
    stock = (
        pd.read_csv(path)[["Date", "Close Price"]]
        .rename(columns={"Date": "ds", "Close Price": "y"})
        .assign(ds=lambda frame: pd.to_datetime(frame["ds"]))
        .sort_values(by="ds")
        .reset_index(drop=True)
    )

    # Build each window with a single mask on `stock` rather than filtering a
    # subset with a mask derived from the full frame.
    in_training = (stock["ds"] >= min_split_date) & (stock["ds"] < split_date)
    in_testing = (stock["ds"] >= split_date) & (stock["ds"] < max_test_date)
    training_stock = stock.loc[in_training]
    testing_stock = stock.loc[in_testing]

    # Re-seed before every fit so each model is fitted from the same RNG state.
    np.random.seed(42)
    model = Prophet()
    model.fit(training_stock)

    # Forecast exactly the dates present in the testing window.
    testing_stock_df = testing_stock[["ds"]].reset_index(drop=True)
    forecast = model.predict(testing_stock_df)

    actual_prices = testing_stock["y"].to_list()
    mape_yhat = mean_absolute_percentage_error(actual_prices, forecast["yhat"].to_list())
    mape_upper = mean_absolute_percentage_error(actual_prices, forecast["yhat_upper"].to_list())
    mape_lower = mean_absolute_percentage_error(actual_prices, forecast["yhat_lower"].to_list())

    if flag == 0:
        # The dates of the first processed file are shared by all columns.
        # DataFrame.from_dict below requires every list to have the same length.
        real_training_data_dict['dates'] = training_stock['ds'].to_list()
        real_testing_data_dict['dates'] = testing_stock['ds'].to_list()
        forecasted_testing_data_dict['dates'] = testing_stock['ds'].to_list()
        flag = 1

    # Instrument name is the part of the file name before the first "." and "-".
    stock_name = x.split(".")[0].split("-")[0]

    real_training_data_dict[stock_name + " training"] = training_stock['y'].to_list()
    real_testing_data_dict[stock_name + " testing"] = actual_prices

    # Yhat upper for Sensex; for every other instrument the forecast series
    # with the lowest MAPE is kept (ties fall through to yhat_upper).
    # NOTE(review): the MAPE is measured against the testing window itself, so
    # this choice uses the data being forecast -- confirm that is intended.
    # Alternative configurations: yhat_upper for every instrument, or the
    # MAPE-based choice for every instrument including Sensex.
    if "Sensex" in stock_name:
        chosen_series = "yhat_upper"
    elif mape_yhat < mape_upper and mape_yhat < mape_lower:
        chosen_series = "yhat"
    elif mape_lower < mape_upper and mape_lower < mape_yhat:
        chosen_series = "yhat_lower"
    else:
        chosen_series = "yhat_upper"

    forecasted_testing_data_dict[stock_name + " " + chosen_series] = forecast[chosen_series].to_list()

real_training_data = pd.DataFrame.from_dict(real_training_data_dict)
real_testing_data = pd.DataFrame.from_dict(real_testing_data_dict)
forecasted_testing_data = pd.DataFrame.from_dict(forecasted_testing_data_dict)

16:31:05 - cmdstanpy - INFO - Chain [1] start processing
16:31:06 - cmdstanpy - INFO - Chain [1] done processing
16:31:06 - cmdstanpy - INFO - Chain [1] start processing
16:31:07 - cmdstanpy - INFO - Chain [1] done processing
16:31:07 - cmdstanpy - INFO - Chain [1] start processing
16:31:08 - cmdstanpy - INFO - Chain [1] done processing
16:31:08 - cmdstanpy - INFO - Chain [1] start processing
16:31:09 - cmdstanpy - INFO - Chain [1] done processing
16:31:09 - cmdstanpy - INFO - Chain [1] start processing
16:31:10 - cmdstanpy - INFO - Chain [1] done processing
16:31:10 - cmdstanpy - INFO - Chain [1] start processing
16:31:11 - cmdstanpy - INFO - Chain [1] done processing
16:31:11 - cmdstanpy - INFO - Chain [1] start processing
16:31:12 - cmdstanpy - INFO - Chain [1] done processing
16:31:12 - cmdstanpy - INFO - Chain [1] start processing
16:31:13 - cmdstanpy - INFO - Chain [1] done processing
16:31:13 - cmdstanpy - INFO - Chain [1] start processing
16:31:14 - cmdstanpy - INFO - Chain [1]

In [26]:
def _percent_diff_dict(frame):
    """Build the period-over-period percentage change of every column of *frame*.

    The "dates" column is copied without its first entry (the first row has no
    predecessor).  Every other column ``c`` becomes ``c + " % Diff"`` holding
    ``(value[i + 1] / value[i] - 1) * 100`` for each consecutive pair.

    Returns a dict mapping column name to list, in the column order of *frame*.
    """
    result = {}
    for column in frame.columns:
        # Materialise the column once instead of once per element.
        values = frame[column].to_list()
        if column == "dates":
            result[column] = values[1:]
        else:
            result[column + " % Diff"] = [
                (following / current - 1) * 100
                for current, following in zip(values, values[1:])
            ]
    return result


real_training_data_percent_diff_dict = _percent_diff_dict(real_training_data)
real_testing_data_percent_diff_dict = _percent_diff_dict(real_testing_data)
forecasted_testing_data_percent_diff_dict = _percent_diff_dict(forecasted_testing_data)

real_training_data_percent_diff_data = pd.DataFrame.from_dict(real_training_data_percent_diff_dict)
real_testing_data_percent_diff_data = pd.DataFrame.from_dict(real_testing_data_percent_diff_dict)
forecasted_testing_data_percent_diff_data = pd.DataFrame.from_dict(forecasted_testing_data_percent_diff_dict)
        

In [27]:
def _betas_against_sensex(percent_diff_frame):
    """Return one beta per non-"dates" column of *percent_diff_frame*.

    Beta is the sample covariance of the column with the Sensex column divided
    by the sample variance of the Sensex column.  The Sensex column is looked
    up by name in the frame being processed, so the result does not depend on
    the column position found in some other frame.

    Raises ValueError when no column name contains "Sensex".
    """
    value_columns = [c for c in percent_diff_frame.columns if c != "dates"]
    sensex_columns = [c for c in value_columns if "Sensex" in c]
    if not sensex_columns:
        raise ValueError("no column containing 'Sensex' found; beta cannot be computed")

    # When several names match, the last one is used.
    market_changes = percent_diff_frame[sensex_columns[-1]].to_list()
    # The market variance is the same for every column, so compute it once.
    market_variance = statistics.variance(market_changes)

    return [
        np.cov(percent_diff_frame[c].to_list(), market_changes)[0][1] / market_variance
        for c in value_columns
    ]


final_results_dict = {}

# Instrument name = text before the first space of each training column name.
final_results_dict["Stocks"] = [
    column.split(" ")[0]
    for column in real_training_data_percent_diff_data.columns
    if column != "dates"
]

final_results_dict["Ex-Post Beta (Real Training Data)"] = _betas_against_sensex(real_training_data_percent_diff_data)
final_results_dict["Ex-Ante Beta (Real Testing Data)"] = _betas_against_sensex(real_testing_data_percent_diff_data)
final_results_dict["Ex-Ante Beta (Forecasted Testing Data)"] = _betas_against_sensex(forecasted_testing_data_percent_diff_data)

In [28]:
def _growth_rates(price_frame, exponent):
    """Map each non-"dates" column to ``(last / first) ** exponent - 1``."""
    rates = {}
    for column in price_frame.columns:
        if column == "dates":
            continue
        prices = price_frame[column].to_list()
        rates[column] = pow(prices[-1] / prices[0], exponent) - 1
    return rates


# Exponent 1/9 for the training window (min_split_date 2013-01-01 up to
# split_date 2022-01-01), 1/1 for the testing window.
training_rates = _growth_rates(real_training_data, 1/9)
final_results_dict["Ex-Post Returns (Real Training Data)"] = list(training_rates.values())

# The Sensex rate over the training window is read by the CAPM cell under this name.
for column, rate in training_rates.items():
    if "Sensex" in column:
        ex_ante_market_return = rate

testing_rates = _growth_rates(real_testing_data, 1/1)
final_results_dict["Ex-Ante Returns (Real Testing Data)"] = list(testing_rates.values())

forecast_rates = _growth_rates(forecasted_testing_data, 1/1)
final_results_dict["Ex-Ante Returns (Forecasted Testing Data)"] = list(forecast_rates.values())

# The Sensex rate over the forecasted window is read by the CAPM cell under this name.
for column, rate in forecast_rates.items():
    if "Sensex" in column:
        ex_post_market_return = rate

In [29]:
# Risk-free rate used in both CAPM columns.
rf = 0.07


def _capm_expected_returns(stocks, betas, market_return, risk_free):
    """Return ``risk_free + beta * (market_return - risk_free)`` per stock.

    The entry for the stock named "Sensex" is fixed at 0.
    """
    return [
        0 if stock == "Sensex" else risk_free + (beta * (market_return - risk_free))
        for stock, beta in zip(stocks, betas)
    ]


def _absolute_gaps(stocks, realised, expected):
    """Return ``abs(realised - expected)`` per stock, with 0 for "Sensex".

    abs() is symmetric in its operands, so no case split on the signs of the
    two values is needed.
    """
    return [
        0 if stock == "Sensex" else abs(actual - predicted)
        for stock, actual, predicted in zip(stocks, realised, expected)
    ]


stocks = final_results_dict["Stocks"]

# NOTE(review): `ex_ante_market_return` is assigned from the real training
# data and `ex_post_market_return` from the forecasted testing data in the
# returns cell, so each name reads opposite to the column it feeds here.
final_results_dict["Ex-Post CAPM (Real Training Data)"] = _capm_expected_returns(
    stocks,
    final_results_dict["Ex-Post Beta (Real Training Data)"],
    ex_ante_market_return,
    rf,
)

# NOTE(review): the key says "Forecasted Training Data" although the betas are
# the forecasted *testing* ones; kept unchanged because it is an output header.
final_results_dict["Ex-Ante CAPM (Forecasted Training Data)"] = _capm_expected_returns(
    stocks,
    final_results_dict["Ex-Ante Beta (Forecasted Testing Data)"],
    ex_post_market_return,
    rf,
)

final_results_dict["Ex-Ante Real - Ex-Ante CAPM"] = _absolute_gaps(
    stocks,
    final_results_dict["Ex-Ante Returns (Real Testing Data)"],
    final_results_dict["Ex-Ante CAPM (Forecasted Training Data)"],
)

final_results_dict["Ex-Ante Real - Ex-Post CAPM"] = _absolute_gaps(
    stocks,
    final_results_dict["Ex-Ante Returns (Real Testing Data)"],
    final_results_dict["Ex-Post CAPM (Real Training Data)"],
)

final_results_data = pd.DataFrame.from_dict(final_results_dict)

final_results_data

Unnamed: 0,Stocks,Ex-Ante Beta (Real Training Data),Ex-Post Beta (Real Testing Data),Ex-Post Beta (Forecasted Testing Data),Ex-Ante Returns (Real Training Data),Ex-Post Returns (Real Testing Data),Ex-Post Returns (Forecasted Testing Data),Ex-Ante CAPM (Real Training Data),Ex-Post CAPM (Forecasted Training Data),Ex-Post Real - Ex-Post CAPM,Ex-Post Real - Ex-Ante CAPM
0,Asian_Paints,0.809,0.7962,0.2497,-0.0293,-0.0965,0.2529,0.1176,0.1351,0.2316,0.214
1,Axis_Bank,1.4634,1.0335,-0.103,-0.0745,0.3412,0.1739,0.156,0.0431,0.298,0.1851
2,Bajaj_Fin,1.2442,1.4023,-0.0189,0.199,-0.0897,0.2194,0.1431,0.0651,0.1548,0.2329
3,Bajaj_Finserv,1.1008,1.1209,0.1725,0.3788,-0.9088,0.3328,0.1347,0.115,1.0238,1.0435
4,Bharti_Airtel,0.8572,0.7225,1.2075,0.0878,0.1671,0.1833,0.1204,0.3849,0.2178,0.0467
5,HCL_Tech,0.6273,0.9176,0.0398,0.087,-0.2166,0.1464,0.1069,0.0804,0.297,0.3235
6,HDFC_Bank,1.0485,1.1106,-0.0974,0.0895,0.0708,0.0764,0.1316,0.0446,0.0262,0.0608
7,HUL,0.5835,0.7336,0.078,0.1804,0.0835,0.0626,0.1043,0.0903,0.0069,0.0208
8,ICICI_Bank,1.469,1.0217,0.4458,-0.0486,0.165,0.2358,0.1564,0.1863,0.0212,0.0087
9,Indusind,1.4625,1.3529,-0.5438,0.0855,0.3388,-0.0555,0.156,-0.0718,0.4106,0.1828


In [30]:
# Sheet name -> frame, in the order the sheets are written to the workbook.
# NOTE(review): Excel limits sheet names to 31 characters and the three
# "*_percent_diff_data" names are longer; confirm the writer engine in use
# accepts them and that the file opens cleanly.
sheets_to_export = {
    "Final Results": final_results_data,
    "real_training_data": real_training_data,
    "real_testing_data": real_testing_data,
    "forecasted_testing_data": forecasted_testing_data,
    "real_training_data_percent_diff_data": real_training_data_percent_diff_data,
    "real_testing_data_percent_diff_data": real_testing_data_percent_diff_data,
    "forecasted_testing_data_percent_diff_data": forecasted_testing_data_percent_diff_data,
}

# One workbook, one sheet per frame, without the index column.
with pd.ExcelWriter("final_results_excel_normal_yhat_upper_only_sensex.xlsx") as writer:
    for sheet_name, frame in sheets_to_export.items():
        frame.to_excel(writer, sheet_name=sheet_name, index=False)

