In [None]:
import os
import csv
import numpy as np
import pandas as pd
from darts import TimeSeries
from darts.models import ARIMA
import matplotlib.pyplot as plt
from darts.metrics import mape, mse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import ParameterGrid
from darts.dataprocessing.transformers import Scaler
from darts.models import TiDEModel, BlockRNNModel, NBEATSModel, XGBModel, RegressionModel, LinearRegressionModel, RandomForest

In [None]:
def calculate_directional_accuracy(actual, forecast):  
    acc = 0
    for i in range(len(forecast)):
        actual_change = actual[i + 1] - actual[i]
        predicted_change = forecast[i] - actual[i]
        if (actual_change > 0 and predicted_change > 0) or (actual_change < 0 and predicted_change < 0) or (actual_change == 0 and predicted_change == 0):
            acc += 1

    da = round(acc / len(forecast) * 100, 2)
    return da

def calculate_directional_accuracy_with_thresholds(actual, forecast):
    thresholds = [0, 5, 10] 
    combinations = [(up, -down) for up in thresholds for down in thresholds if not (up > 0 and down > 0)]
    results = []
    for up, down in combinations:
        acc = 0
        for i in range(len(forecast)):
            actual_change = ((actual[i + 1] - actual[i]) / actual[i]) * 100
            predicted_change = ((forecast[i] - actual[i]) / actual[i]) * 100
            if (actual_change > up and predicted_change > up) or \
                (actual_change < down and predicted_change < down) or\
                (actual_change == predicted_change):
                acc += 1

        da = round(acc / len(forecast) * 100, 2)
        results.append((up, down, da))
    
    return results

def calculate_mape(actual, forecast):
    actual, forecast = np.array(actual), np.array(forecast)
    non_zero_actual = np.where(actual == 0, np.nan, actual)
    mape = np.mean(np.abs((actual - forecast) / non_zero_actual)) * 100
    return np.nanmean(mape)

def calculate_rmse(actual, forecast):
    actual, forecast = np.array(actual), np.array(forecast)
    if len(actual) != len(forecast):
        raise ValueError("The length of actual and forecast arrays must match.")
    mse = np.mean((forecast - actual) ** 2)
    return np.sqrt(mse)

def cal_err_and_acc(predicted_ts, val_ts, condition=True):
    val = val_ts.pd_dataframe()
    predicted = predicted_ts.pd_dataframe()

    val.reset_index(drop=True, inplace=True)
    predicted.reset_index(drop=True, inplace=True)

    actual = val['Close']
    forecast = predicted['Closing_Price']
    
    mape_error = calculate_mape(actual.iloc[1:], forecast)
    rmse_error = calculate_rmse(actual.iloc[1:], forecast)
    dir_acc = calculate_directional_accuracy_with_thresholds(actual, forecast)
    
    if condition:
        return mape_error, rmse_error, dir_acc
    else:
        print(f"MAPE = {mape_error:.2f} %")
        print(f"RMSE = {rmse_error:.2f} %\n")
        print(f"Directional Accuracy = {dir_acc:.2f} %")

In [None]:
stock = "CBG"
arima_predict = pd.read_csv(f"{os.getcwd()}/../backtest/{stock}/ARIMA.csv")
arima_predict_ts = TimeSeries.from_dataframe(arima_predict[['Closing_Price']], time_col=None)
ins_predict = pd.read_csv(f"/{os.getcwd()}/../backtest/{stock}/InsightWave.csv")
ins_predict_ts = TimeSeries.from_dataframe(ins_predict[['Closing_Price']], time_col=None)
val = pd.read_csv(f"{os.getcwd()}/../data/Fundamental+Technical Data/STOCK_DATA_WEEKLY/{stock}.csv")
val = val[['Close']].iloc[-35-1:]
val_ts = TimeSeries.from_dataframe(val, time_col=None)
arima_avg_mape, arima_avg_rmse, arima_avg_dir = cal_err_and_acc(arima_predict_ts, val_ts, True)
ins_avg_mape, ins_avg_rmse, ins_avg_dir = cal_err_and_acc(ins_predict_ts, val_ts, True)
print(f"ARIMA : MAPE {round(arima_avg_mape,2)}, RSME {round(arima_avg_rmse,2)}")
print(f"InsightWave : MAPE {round(ins_avg_mape,2)}, RSME {round(ins_avg_rmse,2)}")

In [25]:
def get_sim_fri_close(stock, mode, percentage):
    # Buy in Closing Price in Friday
    val = pd.read_csv(f"{os.getcwd()}/../data/Fundamental+Technical Data/STOCK_DATA_WEEKLY/{stock}.csv")
    val = val[['Date','Open','Close']].iloc[-35-1:].reset_index(drop=True)
    predict = pd.read_csv(f"{os.getcwd()}/../backtest/{stock}/{mode}.csv")
    predict = predict[['Date', 'Closing_Price']]

    # Backtest parameters
    starting_money = 1000000
    stock_count = 0
    money = starting_money

    for i in range(len(predict)):
        act = val.loc[i]['Close']
        pred = predict.loc[i]['Closing_Price']
        
        diff = ((pred - act) / act) * 100
        if diff > percentage or i == 0:
            # Assuming you buy as much stock as possible with the money you have
            bought_stocks = money // act
            bought_stocks = int(bought_stocks / 100) * 100
            if money > bought_stocks * act:
                money -= bought_stocks * act
                stock_count += bought_stocks
                # print(f"Buy {bought_stocks} stocks at {act} on {val.loc[i]['Date']}, Money left: {money}")
            # else:
                # print(f"Hold {stock_count} stocks at {act} on {val.loc[i]['Date']}, Money left: {money}")
        elif diff < -percentage and stock_count > 0:
            # Sell all stocks
            money += stock_count * act
            # print(f"Sell {stock_count} stocks at {act} on {val.loc[i]['Date']}, Money left: {money}")
            stock_count = 0
        # else:
        #     print(f"Hold {stock_count} stocks at {act} on {val.loc[i]['Date']}, Money left: {money}")

    # Sell any remaining stocks at the last available price
    final_money = money + stock_count * val.iloc[-1]['Close']
    profit_loss = final_money - starting_money

    return profit_loss

def get_sim_mon_open(stock, mode, percentage):
    # Buy in Closing Price in Friday
    val = pd.read_csv(f"{os.getcwd()}/../data/Fundamental+Technical Data/STOCK_DATA_WEEKLY/{stock}.csv")
    val = val[['Date','Open','Close']].iloc[-35-1:].reset_index(drop=True)
    predict = pd.read_csv(f"{os.getcwd()}/../backtest/{stock}/{mode}.csv")
    predict = predict[['Date', 'Closing_Price']]

    open_data = pd.read_csv(f"{os.getcwd()}/../data/Fundamental+Technical Data/STOCK_DATA/{stock}.csv")

    # Backtest parameters
    starting_money = 1000000
    stock_count = 0
    money = starting_money

    for i in range(len(predict)):
        specific_date_from_val = val.loc[i]['Date']
        today_row = open_data.index[open_data['Date'] == specific_date_from_val].to_list()[0]
        next_open_value = open_data.loc[today_row+1]['Open']
        next_date = open_data.loc[today_row+1]['Date']

        act = val.loc[i]['Close']
        pred = predict.loc[i]['Closing_Price']
        
        diff = ((pred - act) / act) * 100

        if diff > percentage or i == 0:
            # Assuming you buy as much stock as possible with the money you have
            bought_stocks = money // next_open_value
            bought_stocks = int(bought_stocks / 100) * 100
            if money > bought_stocks * next_open_value:
                money -= bought_stocks * next_open_value
                stock_count += bought_stocks
                # print(f"Buy {bought_stocks} stocks at {next_open_value} on {next_date}, Money left: {money}")
            # else:
                # print(f"Hold {stock_count} stocks at {next_open_value} on {next_date}, Money left: {money}")
        elif diff < -percentage and stock_count > 0:
            # Sell all stocks
            money += stock_count * act
            # print(f"Sell {stock_count} stocks at {next_open_value} on {next_date}, Money left: {money}")
            stock_count = 0
        # else:
        #     print(f"Hold {stock_count} stocks at {next_open_value} on {next_date}, Money left: {money}")

    # Sell any remaining stocks at the last available price
    final_money = money + stock_count * val.iloc[-1]['Close']
    profit_loss = final_money - starting_money

    return profit_loss

def DCA(stock):
    # Buy in Closing Price in Friday
    val = pd.read_csv(f"{os.getcwd()}/../data/Fundamental+Technical Data/STOCK_DATA_WEEKLY/{stock}.csv")
    val = val[['Date','Open','Close']].iloc[-35:].reset_index(drop=True)

    # DCA

    # Backtest parameters
    starting_money = 1000000
    stock_count = 0
    money = starting_money
    dca_interval = 4  # Buy every 4 data points
    allocated_money_per_buy = starting_money // ((35 // dca_interval)+1)

    # print(f"Initial Money: {starting_money}, Allocating {allocated_money_per_buy} per purchase")

    # Perform DCA strategy
    for i in range(len(val)):
        if i % dca_interval == 0 and money > 0:
            # Buy stocks every 4 weeks
            act = val.loc[i]['Close']
            bought_stocks = allocated_money_per_buy // act
            money -= bought_stocks * act
            stock_count += bought_stocks
            # print(f"Buy {bought_stocks} stocks at {act} on {val.loc[i]['Date']}, Money left: {money}")

    # Assuming selling all remaining stocks at the last available price
    final_money = money + stock_count * val.iloc[-1]['Close']
    profit_loss = final_money - starting_money
    return profit_loss

In [27]:
STOCK_LIST = ["ADVANC", "BANPU", "BH", "BTS", "CBG", "CPALL", "CPF", "INTUCH", "IVL", "KBANK", "LH", "PTT", "PTTEP", "PTTGC", "SCB", "SCC", "TISCO", "TU", "WHA"]

for stock in STOCK_LIST:
    percentage = 2
    arima = get_sim_mon_open(stock, "ARIMA", percentage)
    ins = get_sim_mon_open(stock, "InsightWave", percentage)
    dca = DCA(stock)

    # if max(arima, ins, dca) == arima:
    #     best = "arima"
    # elif max(arima, ins, dca) == ins:
    #     best = "ins"
    # else:
    #     best = "dca"

    print(f"{stock},{arima}, {ins}")

    # print(arima)
    # print("---")
    # print(ins)
    # print("---")
    # print(dca)

ADVANC,117300.0, 142800.0
BANPU,-408075.0, -466195.0
BH,23000.0, 200000.0
BTS,-141959.99999999988, -63784.99999999988
CBG,-271750.0, 218525.0
CPALL,-116550.0, -11600.0
CPF,-198940.0, -114730.0
INTUCH,-70950.0, 58625.0
IVL,-315625.0, -74325.0
KBANK,-73550.0, 97950.0
LH,-180899.99999999988, 29210.000000000233
PTT,91500.0, 38125.0
PTTEP,-136800.0, -159050.0
PTTGC,-184625.0, -93850.0
SCB,4700.0, 33200.0
SCC,-104400, -104400
TISCO,-77500.0, 67950.0
TU,-194010.0, 114870.0
WHA,-9800.0, 138467.99999999977


In [None]:
dt = get_sim_mon_open("SCB", "ARIMA", 2)
dt