# Predict Volatility

Implementing the code from ```model-garch.ipynb``` on the list of stocks and ETFs in ```stocks_and_etfs/```.

In [3]:
import os
import sys
import mysql.connector

import pandas as pd
import numpy as np
import math
import timeit
import warnings
warnings.filterwarnings('ignore')

# Environment variables
from dotenv import load_dotenv
load_dotenv("../mysql.env")

# Visualization + diagnostics
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import scipy.stats as scs
from scipy.stats import probplot, shapiro
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch
import statsmodels.formula.api as smf
import statsmodels.tsa.api as smt
import statsmodels.api as sm

# Model
from arch import arch_model
from arch.__future__ import reindexing

# Performance
from sklearn.metrics import confusion_matrix, precision_score, f1_score

# Record the host platform and interpreter version next to the results.
print('Machine: {} {}\n'.format(os.uname().sysname,os.uname().machine))
print(sys.version)

# Number of worker processes used for parallel processing
# (passed as `processes=` to multiprocessing.Pool further down).
# If set to None, Pool falls back to os.cpu_count().
num_p = 10

Machine: Darwin x86_64

3.8.12 | packaged by conda-forge | (default, Sep 16 2021, 01:59:00) 
[Clang 11.1.0 ]


In [None]:
from os import path

# Local CSV file holding the price table; the next cell reads it when it
# exists and otherwise creates it from the database query result.
dpath="AllStock.csv"
print("Check data file {}..".format(dpath))

In [None]:
# Database connection settings are read from the environment
# (populated by load_dotenv("../mysql.env") at the top of the notebook).
HOST = os.environ.get("HOST")
PORT = os.environ.get("PORT")
USER = os.environ.get("DBUSER")
PASSWORD = os.environ.get("PASSWORD")

if path.isfile(dpath):
    # Prefer the local CSV cache over a database round trip.
    print("Load data from {}".format(dpath))
    df = pd.read_csv(dpath)
else:
    # `conn` stays None when connect() itself fails, so the cleanup below
    # only closes a connection that was actually opened.
    conn = None
    try:
        conn = mysql.connector.connect(
            host=HOST,
            port=PORT,
            user=USER,
            password=PASSWORD,
            database="GlobalMarketData"
        )
        query = "SELECT Date, Symbol, Close from histdailyprice3;"
        histdailyprice3 = pd.read_sql(query, conn)
        df = histdailyprice3.copy()
        # Cache the query result so later runs can skip the database.
        df.to_csv(dpath, index=False)
    except Exception as e:
        # Best effort: report the failure and let the notebook continue.
        # NOTE(review): `df` is not assigned on this path, so later cells
        # that use it will fail unless it exists from an earlier run.
        print(str(e))
    finally:
        # Close on both success and failure; previously a failing query or
        # CSV write left the connection open.
        if conn is not None:
            conn.close()

In [None]:
# Index the price rows by their "Date" column; drop=True removes "Date" from
# the regular columns and inplace=True mutates `df` instead of returning a copy.
df.set_index("Date", drop=True, inplace=True)

In [2]:
stock_list = pd.read_csv("../stocks_and_etfs/stock_list.csv")
etf_list = pd.read_csv("../stocks_and_etfs/etf_list.csv")
# Stack stocks and ETFs into one frame with a single "Symbol" column.
# pd.concat replaces DataFrame.append, which was deprecated in pandas 1.4
# and removed in pandas 2.0.
symbol_list = (
    pd.concat([stock_list, etf_list])
    .rename({"0": "Symbol"}, axis=1)
    .reset_index(drop=True)
)

NameError: name 'pd' is not defined

## Model Evaluation

1. Multithread is not working
2. Multiprocess

In [None]:
def evaluate_model(residuals, st_residuals, lags=50):
    """Run residual diagnostics and return them in a result dictionary.

    Parameters
    ----------
    residuals
        Model residuals, passed to ``het_arch`` with ``nlags=lags``.
    st_residuals
        Standardized residuals, passed to ``shapiro``.
    lags : int, default 50
        Lag count forwarded to ``het_arch``.

    Returns
    -------
    dict
        ``'LM_pvalue'``, ``'F_pvalue'`` and ``'SW_pvalue'`` each hold a
        two-item list ``[p_value, p_value < .05]``.  ``'AIC'`` and the
        ``'p'``/``'q'`` entries of ``'params'`` are left as ``None`` for the
        caller to fill in.
    """
    alpha = .05
    arch_test = het_arch(residuals, nlags=lags)
    shap_test = shapiro(st_residuals)

    # Elements 1 and 3 of the het_arch result are the LM and F p-values;
    # element 1 of the shapiro result is its p-value.
    lm_p, f_p, sw_p = arch_test[1], arch_test[3], shap_test[1]

    # A falsy flag (p-value not below alpha) is the desired outcome for
    # each of these hypothesis tests.
    return {
        'LM_pvalue': [lm_p, lm_p < alpha],
        'F_pvalue': [f_p, f_p < alpha],
        'SW_pvalue': [sw_p, sw_p < alpha],
        'AIC': None,
        'params': {'p': None, 'q': None},
    }

In [None]:
def calc_model(data, p, q, all_results, i):
    """Fit one GARCH(p, q) model and store its evaluation in ``all_results[i]``.

    The function returns nothing; the result dictionary produced by
    ``evaluate_model`` (with ``'AIC'`` and ``'params'`` filled in) is written
    into slot ``i`` of ``all_results``.  Any exception is printed and the
    slot is left untouched.
    """
    try:
        fitted = arch_model(data, vol='GARCH', p=p, q=q, dist='normal').fit(disp='off')
        raw_resid = fitted.resid
        # Standardize the residuals by the fitted conditional volatility.
        scaled_resid = np.divide(raw_resid, fitted.conditional_volatility)

        scores = evaluate_model(raw_resid, scaled_resid)
        scores['AIC'] = fitted.aic
        scores['params'].update(p=p, q=q)
        all_results[i] = scores
    except Exception as err:
        print("calc_model:{}".format(err))

In [None]:
from threading import Thread

def multi_gridsearch(data, p_rng, q_rng):
    """Grid-search GARCH(p, q) orders using one thread per (p, q) pair.

    Each pair is fitted by ``calc_model`` in its own thread, which writes its
    result into a shared list.  After all threads are joined the results are
    scanned for the lowest AIC.

    Parameters
    ----------
    data
        Series handed unchanged to ``calc_model``.
    p_rng, q_rng
        Sized iterables of candidate ``p`` and ``q`` orders.

    Returns
    -------
    list
        Collected result dictionaries with the lowest-AIC result appended
        last.  The last element is ``None`` when no fit succeeded.
    """
    n_sym = len(p_rng)*len(q_rng)
    print("multi_gridsearch: {} trials.".format(n_sym))
    top_score, top_results = float('inf'), None
    top_models = []

    threads = []
    # One slot per trial; calc_model leaves a slot as None when its fit fails.
    all_results = [None] * n_sym

    try:
        for p in p_rng:
            for q in q_rng:
                slot = len(threads)
                print("Start {} thread.".format(slot))
                worker = Thread(target=calc_model, args=(data, p, q, all_results, slot))
                worker.start()
                threads.append(worker)

        for slot, worker in enumerate(threads):
            worker.join()
            print("Join {} thread.".format(slot))

        print("All Grid threads Finish.")

        for results in all_results:
            # Skip trials whose fit failed.  Previously the first empty slot
            # raised TypeError here and the remaining results were discarded.
            if results is None:
                continue
            if results['AIC'] < top_score:
                top_score = results['AIC']
                top_results = results
            # NOTE(review): the flag comes from a `<` comparison in
            # evaluate_model; if the p-value is a NumPy scalar the flag is a
            # numpy.bool_ and `is False` never matches, so this branch may be
            # dead.  Left as-is because enabling it would add entries ahead
            # of the best model in the returned list - confirm intent first.
            elif results['LM_pvalue'][1] is False:
                top_models.append(results)
    except Exception as e:
        print("multi_gridsearch:{}".format(e))
    top_models.append(top_results)
    return top_models

# USE THIS ONE

In [None]:
def p_calc_model(data, p, q):
    """Fit one GARCH(p, q) model and return its evaluation dictionary.

    Returns the dictionary produced by ``evaluate_model`` with ``'AIC'`` and
    ``'params'`` filled in.  If anything raises, the error is printed and
    whatever has been collected so far is returned - an empty dict when the
    failure happens before evaluation completes.
    """
    res = {}
    try:
        fitted = arch_model(data, vol='GARCH', p=p, q=q, dist='normal').fit(disp='off')
        raw_resid = fitted.resid
        # Standardize the residuals by the fitted conditional volatility.
        scaled_resid = np.divide(raw_resid, fitted.conditional_volatility)

        res = evaluate_model(raw_resid, scaled_resid)
        res['AIC'] = fitted.aic
        res['params'].update(p=p, q=q)
    except Exception as err:
        print("calc_model:{}".format(err))
    return res

# USE THIS ONE

In [None]:
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def multip_gridsearch(data, p_rng, q_rng):
    """Grid-search GARCH(p, q) orders using a multiprocessing pool.

    Every (p, q) pair is fitted by ``p_calc_model`` in a pool of ``num_p``
    worker processes; the results are then scanned for the lowest AIC.

    Parameters
    ----------
    data
        Series handed unchanged to ``p_calc_model``.
    p_rng, q_rng
        Sized iterables of candidate ``p`` and ``q`` orders.

    Returns
    -------
    list
        Collected result dictionaries with the lowest-AIC result appended
        last.  The last element is ``None`` when no fit succeeded.
    """
    n_sym = len(p_rng)*len(q_rng)
    print("multi_gridsearch: {} trials.".format(n_sym))
    top_score, top_results = float('inf'), None
    top_models = []

    try:
        # One argument tuple per trial, in p-major order.
        ll = [(data, p, q) for p in p_rng for q in q_rng]
        print("Starting {} threads".format(n_sym))
        with Pool(processes=num_p) as pool:
            all_results = pool.starmap(p_calc_model, ll)

        print("All Grid threads Finish.")

        for results in all_results:
            # p_calc_model returns {} (or a dict without a score) when a fit
            # fails.  Previously the first such entry raised KeyError here
            # and every remaining result was discarded.
            if not results or results.get('AIC') is None:
                continue
            if results['AIC'] < top_score:
                top_score = results['AIC']
                top_results = results
            # NOTE(review): the flag comes from a `<` comparison in
            # evaluate_model; if the p-value is a NumPy scalar the flag is a
            # numpy.bool_ and `is False` never matches, so this branch may be
            # dead.  Left as-is because enabling it would add entries ahead
            # of the best model in the returned list - confirm intent first.
            elif results['LM_pvalue'][1] is False:
                top_models.append(results)
    except Exception as e:
        print("multi_gridsearch:{}".format(e))
    top_models.append(top_results)
    return top_models

In [None]:
def gridsearch(data, p_rng, q_rng):
    """Sequentially grid-search GARCH(p, q) orders.

    Fits a model for every (p, q) combination, evaluates its residuals with
    ``evaluate_model`` and tracks the lowest AIC.  A combination that raises
    is reported and skipped.

    Returns a list of collected result dictionaries with the lowest-AIC
    result appended last (``None`` when no fit succeeded).
    """
    top_models = []
    top_results = None
    top_score = float('inf')

    # Same visiting order as a nested p/q loop.
    grid = ((p_val, q_val) for p_val in p_rng for q_val in q_rng)
    for p, q in grid:
        try:
            print("gridsearch({},{}) is started.".format(p,q))
            model = arch_model(data, vol='GARCH', p=p, q=q, dist='normal')
            print("model_fit")
            fitted = model.fit(disp='off')
            raw_resid = fitted.resid
            scaled_resid = np.divide(raw_resid, fitted.conditional_volatility)
            print("evaluate_model")
            results = evaluate_model(raw_resid, scaled_resid)
            results['AIC'] = fitted.aic
            results['params']['p'] = p
            results['params']['q'] = q

            if results['AIC'] < top_score:
                top_score, top_results = results['AIC'], results
            # NOTE(review): if the flag is a numpy.bool_ (result of comparing
            # a NumPy scalar), `is False` never matches and this branch may
            # never run - confirm intent.
            elif results['LM_pvalue'][1] is False:
                top_models.append(results)
            print("gridsearch({},{}) is done.".format(p,q))
        except Exception as e:
            # Report the failed combination and move on to the next one.
            print("gridsearch:{}".format(e))

    top_models.append(top_results)
    return top_models

In [None]:
def apply_Model(data, test_size, p, q):
#     print("apply_Model({},{},{},{})".format(data, test_size, p, q))
    train = symbol_df['pct_change'][:-(test_size-i)]
    model = arch_model(symbol_df['pct_change'], p = p, q = q, mean = 'constant', vol = 'GARCH', dist = 'normal')
    model_fit = model.fit(disp='off')
    pred = model_fit.forecast(horizon=1)
    return np.sqrt(pred.variance.values[-1,:][0])       

In [None]:
columns = ["Symbol", "Precision Macro", "Precision Micro", "F1 Macro", "F1 Micro", "Proc TIme"]
garch_performance = pd.DataFrame(columns=columns)
garch_performance

# Loop through stocks and etfs

In [None]:
symbol_list = symbol_list[0:1]
print(symbol_list)

In [None]:
import time

p_rng = range(1,30)
q_rng = range(1,40)

for symbol in symbol_list.Symbol:
    try:
        print("Start Process {}".format(symbol))
        start = time.time()
        symbol_df = df[df.Symbol == symbol]
        symbol_df['pct_change'] = 100 * symbol_df['Close'].pct_change()
        symbol_df.dropna(inplace=True)

        top_models = multip_gridsearch(symbol_df['pct_change'], p_rng, q_rng)
        print("{}'s top model={}".format(symbol, top_models))

        p = top_models[0]['params']['p']
        q = top_models[0]['params']['q']

        rolling_predictions = []
        test_size = round(len(symbol_df) * 0.2)
        print("{}'s test size: {}/{}".format(symbol, test_size, len(symbol_df)))
        ll=[]
        for i in range(test_size):
            ll.append((symbol_df['pct_change'], i, p, q))
        # print(ll)
        print("Starting {} apply_model threads".format(len(ll)))
        with Pool(processes=num_p) as pool:
            rolling_predictions = pool.starmap(apply_Model, ll)
        print("Apply_model threads Finish.")
        # print("* rolling_predictions : {}".format(rolling_predictions))

        rolling_predictions = pd.Series(rolling_predictions, index=symbol_df['pct_change'].index[-test_size:])

        y_pred = np.array(rolling_predictions >= 2)
        y_true = np.array(abs(symbol_df['pct_change'][-test_size:]) >= 2)

        precision_macro = precision_score(y_true, y_pred, average='macro')
        precision_micro = precision_score(y_true, y_pred, average='micro')
        f1_macro = f1_score(y_true, y_pred, average='macro')
        f1_micro = f1_score(y_true, y_pred, average='micro')

        end = time.time()
        total_time = end-start
        garch_performance.loc[len(garch_performance.index)] = [symbol, precision_macro, precision_micro, f1_macro, f1_micro, total_time]
        garch_performance.to_csv("../reports/GARCH_performance.csv")
    
    except Exception as e:
        print("calc_model:{}".format(e)) 

In [None]:
garch_performance

In [None]:
print("Total # of symbols is {}".format(len(symbol_list.Symbol)))

In [None]:
# Symbols that were requested but have no row in the performance table.
requested_symbols = set(symbol_list.Symbol.tolist())
processed_symbols = set(garch_performance.Symbol.tolist())
miss = requested_symbols.difference(processed_symbols)
print("Symbols cannot be processed {}".format(miss))