In [33]:
import numpy as np
import pandas as pd
import json

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.stattools import jarque_bera
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.stats.diagnostic import het_arch

from sklearn.metrics import mean_squared_error, mean_absolute_error

from helpers.DataSaham import DataSaham


import warnings
warnings.filterwarnings('ignore')
pd.options.mode.chained_assignment = None

## Define Class

In [6]:
class ArimaModel():
    def __init__(self, data, p, q):
        self.data = data.copy()
        self.p = p
        self.q = q
        
    def adf_test(self, data):
        adf_test = adfuller(data)
        return adf_test[0], adf_test[1]

    def is_stasioner(self, data, alpha=0.05):
        _, pvalue = self.adf_test(data)
        return pvalue < alpha

    def get_n_diff(self, data):
        n_diff = 0
        data_tmp = data.copy()
        while not self.is_stasioner(data_tmp):
            n_diff += 1
            data_tmp = data_tmp.diff().dropna()            
        return n_diff

    def split_data(self, data, n_test):
        return data[:-n_test], data[-n_test:]

    def get_error_metrics(self, y_true, y_predict):
        return mean_absolute_error(y_true, y_predict), mean_squared_error(y_true, y_predict, squared=False)

    def jarque_bera_test(self, residuals):
        jq_bera = jarque_bera(residuals)
        return jq_bera[0], jq_bera[1]

    def ljungbox_test(self, residuals):
        return acorr_ljungbox(residuals).iloc[0, :].to_list()
    
    def het_arch_test(self, residuals):
        _, pvalue, _, _ = het_arch(residuals)
        return pvalue 
    
    def get_model(self, data, order):
        model = ARIMA(data, order=order)
        return model.fit()

    def get_prediction(self, n_test, model=None):
        if model is None: model = self.model
        return model.predict().to_numpy(), model.forecast(n_test).to_numpy()
    
    def __print_order(self):
        p, d, q = self.order
        return f"{p}, {d}, {q}"
    
    def __repr__(self):
        return f"<ArimaModel({self.__print_order()})>"
    
    def __str__(self):
        return f"<ArimaModel({self.__print_order()})>"
    
    def fit(self, n_test, d=None):
        self.n_test = n_test
        if d is None: d = self.get_n_diff(self.data)
        self.order = (self.p, d, self.q)
        
        self.data_train, self.data_test = self.split_data(self.data, n_test)
        self.model = self.get_model(self.data_train, self.order)
        
        self.aic = self.model.aic
        self.prediction_train, self.prediction_test =  self.get_prediction(n_test)    
        
        self.error_test = self.prediction_test - self.data_test
        self.residuals = self.model.resid
        
        self.mae_train, self.rmse_train = self.get_error_metrics(self.data_train, self.prediction_train)
        self.mae_test, self.rmse_test = self.get_error_metrics(self.data_test, self.prediction_test)
        
        self.jq_bera_statistic, self.jq_bera_pvalue = self.jarque_bera_test(self.residuals)
        self.lb_stat, self.lb_pvalue = self.ljungbox_test(self.residuals)
        self.het_pvalue = self.het_arch_test(self.residuals)

In [7]:
class GridSearchArima():
    def __init__(self, data, p_max, q_max):
        self.p_max = p_max
        self.q_max = q_max
        self.data = data
        
    def __append_summary(self, model):
        self.summary.append({
            'order' : model.order,
            'aic' : model.aic,
            'ljung_pvalue' : model.lb_pvalue,
            'jq_bera_pvalue' : model.jq_bera_pvalue,
            'het_pvalue' : model.het_pvalue,
            'rmse_train' : model.rmse_train,
            'mae_train' : model.mae_train,
            'rmse_test' : model.rmse_test,
            'mae_test' : model.rmse_test
            })        

    def fit(self, n_test, save_model=False):
        self.save_model = save_model
        self.models = []
        self.summary = []
        self.best_model = None

        for p in range(self.p_max + 1):
            for q in range(self.q_max + 1):
                if (p == q) and (p == 0):
                    continue
                
                try:
                    model = ArimaModel(self.data, p, q)
                    model.fit(n_test)
                    self.__append_summary(model)
                    
                    if self.best_model is None: self.best_model = model
                    elif self.best_model.aic > model.aic: self.best_model = model
                    
                    if save_model:
                        self.models.append(model)
                except:
                    print(f"Error to fit ARIMA {p}, {q}")
                
        self.summary = pd.DataFrame(self.summary).sort_values('aic').reset_index(drop=True)

In [8]:
class WFValidationArima():
    def __init__(self, data, n_split, test_size):
        self.data = data.copy()
        self.n_split = n_split
        self.test_size = test_size
        self.__init_property()
        
    def __init_property(self):
        self.iterasi = []
        self.__best_model = self.__errors = None
        self.rmse_total= self.rmse_eachday = None
        self.mae_total= self.mae_eachday = None
        
    def __grid_search(self, data):
        gridsearch = GridSearchArima(data, self.p_max, self.q_max)
        gridsearch.fit(self.test_size)
        return gridsearch
    
    def __get_rmse(self, errors : np.ndarray) -> tuple:
        squared = np.power(errors, 2)
        return np.sqrt(squared.mean(axis=0)), np.sqrt(squared.mean())

    def __get_mae(self, errors : np.ndarray) -> tuple:
        absolute = np.abs(errors)
        return absolute.mean(axis=0), absolute.mean()
    
    @property
    def best_model(self):
        if not self.grid_search:
            return self.iterasi
        
        if self.__best_model is None:
            self.__best_model = [gridsearch.best_model for gridsearch in self.iterasi] 
        return self.__best_model
    
    @property
    def errors(self):
        if self.__errors is None:
            self.__errors = [model.error_test.to_numpy() for model in self.best_model]
        return self.__errors
    
    def __fit_model(self, data, p_max=None, q_max=None, p=None, q=None):
        if self.grid_search: 
            if (p_max is None) or (q_max is None):
                raise Exception('Please specify p_max and q_max variabel')
            
            self.p_max, self.q_max = p_max, q_max
            self.iterasi.append(self.__grid_search(data))
        else:
            if (p is None) or (q is None):
                raise Exception('Please specify p and q variabel')
            
            self.p, self.q = p, q
            self.iterasi.append(ArimaModel(data, p, q))
            
    def __get_metrics(self):
        self.rmse_eachday, self.rmse_total = self.__get_rmse(self.errors)
        self.mae_eachday, self.mae_total = self.__get_mae(self.errors)
    
    def fit(self, grid_search=True, p_max=None, q_max=None, p=None, q=None):
        self.grid_search = grid_search
        n = len(self.data)
        self.n_test = self.test_size * self.n_split
        self.n_training = n - self.n_test

        self.__init_property()
        for i in range(self.n_split):
            end_train = self.n_training + (i * self.test_size)
            n_data = end_train + self.test_size
            data = self.data[:n_data]
            self.__fit_model(data, p_max, q_max, p, q)
            
        self.__get_metrics()

## ARIMA Walk Forward Validation

In [9]:
def category_sentiment(compound, threshold_neg=-0.05, threshold_pos=0.05):
    if compound is None : return  None
    if compound < threshold_neg : return "Negative"
    if compound > threshold_pos : return "Positive"
    return "Netral"

all_articles = pd.read_json('data/data_vader.json')
saham = pd.read_json('data/table_saham.json')
article_saham = pd.read_json("data/table_article_saham.json")

all_articles['sentiment_category'] = all_articles.mean_compound.apply(category_sentiment)

bri = DataSaham('BBRI', all_articles, saham, article_saham, end_date="2021-12-31", dropna=True)
bri.data

Unnamed: 0_level_0,close,sma,wma,macd,cci,%k,%d,rsi,williams_r,mfi,momentum,sentiment_score,sentiment_category_score,kurs
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2003-12-15,110.0,104.75,105.545455,1.651442,162.202381,100.000000,91.666667,68.310546,-0.000000,73.007657,2.5,0.000000,0.000000,8478.5
2003-12-16,110.0,105.25,106.500000,1.904385,209.302326,83.333333,94.444444,68.310546,-16.666667,74.382588,5.0,0.000000,0.000000,8481.5
2003-12-17,115.0,106.50,108.272727,2.479718,262.271062,100.000000,94.444444,75.726371,-0.000000,76.093629,12.5,0.000000,0.000000,8492.5
2003-12-18,127.5,108.75,112.090909,3.899370,373.080397,100.000000,94.444444,85.108585,-0.000000,80.712587,22.5,0.000000,0.000000,8495.5
2003-12-19,125.0,111.00,115.045455,4.767765,282.464455,78.571429,92.857143,78.567800,-21.428571,80.531851,22.5,0.000000,0.000000,8492.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-12-24,4070.0,4116.00,4098.363636,-27.238870,-118.979266,10.000000,11.666667,43.294691,-90.000000,39.019719,-60.0,0.172154,0.700000,14220.0
2021-12-27,4060.0,4106.00,4088.181818,-29.568117,-109.347443,5.000000,13.333333,42.508237,-95.000000,29.559493,-100.0,0.333912,0.933333,14225.0
2021-12-28,4090.0,4102.00,4085.272727,-28.662904,-77.146412,22.222222,12.407407,45.695240,-77.777778,24.742846,-40.0,0.245945,0.800000,14225.0
2021-12-29,4080.0,4090.00,4081.272727,-28.424771,-82.966520,16.666667,14.629630,44.803673,-83.333333,25.965313,-120.0,0.117452,0.800000,14250.0


In [10]:
close =  bri.data.close.copy()

walk_validation = WFValidationArima(close, n_split=10, test_size=10)
walk_validation.fit(p_max=10, q_max=10)

Error to fit ARIMA 10, 8
Error to fit ARIMA 7, 8
Error to fit ARIMA 8, 5
Error to fit ARIMA 5, 6
Error to fit ARIMA 8, 6
Error to fit ARIMA 8, 6
Error to fit ARIMA 8, 6


In [11]:
walk_validation.rmse_total, walk_validation.mae_total

(128.50082715799888, 89.59264715388456)

In [12]:
walk_validation.rmse_eachday, walk_validation.mae_eachday

(array([ 65.37116696,  92.89988298,  58.57915464,  66.8573118 ,
        104.57592002, 107.28436589, 133.47424752, 170.26122747,
        153.79000072, 226.7548185 ]),
 array([ 44.4068609 ,  76.28541664,  42.60154725,  55.93046934,
         80.09994106,  82.07557159, 102.11348744, 136.76662793,
        107.5755259 , 168.07102349]))

In [13]:
walk_validation.best_model

[<ArimaModel(7, 1, 10)>,
 <ArimaModel(7, 1, 10)>,
 <ArimaModel(7, 1, 10)>,
 <ArimaModel(6, 1, 2)>,
 <ArimaModel(7, 1, 3)>,
 <ArimaModel(2, 1, 9)>,
 <ArimaModel(2, 1, 9)>,
 <ArimaModel(3, 1, 8)>,
 <ArimaModel(9, 1, 4)>,
 <ArimaModel(9, 1, 9)>]

In [43]:
summary = []

for index, model in enumerate(walk_validation.best_model):
    summary.append({
        'fold' : index + 1,
        'order' : str(model.order),
        'aic' : model.aic,
        'rmse_train' : model.rmse_train,
        'rmse_test' : model.rmse_test,
        'mae_train' : model.mae_train,
        'mae_test' : model.mae_test,
        'jq_bera_pvalue' : model.jq_bera_pvalue,
        'lb_pvalue' : model.lb_pvalue,
        'het_pvalue' : model.het_pvalue,
        'error_test' : model.error_test.to_list()
    })

summary = {
    'rmse_total' : walk_validation.rmse_total,
    'mae_total' : walk_validation.mae_total,
    'rmse_eachday' : list(walk_validation.rmse_eachday),
    'mae_eachday' : list(walk_validation.mae_eachday),
    'summary_fold' : summary
}


with open('data/pemodelan/hasil_arima.json', 'w') as file:
    file.write(json.dumps(summary))