In [None]:
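# Install the required libraries (uncomment on a fresh environment)

# !pip install -q hmmlearn
# !pip install -q plotly
# !pip install -q scikit-learn  # the PyPI package is "scikit-learn"; "sklearn" is a deprecated alias
# !pip install -q yfinance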

In [None]:
# 필요한 라이브러리 호출

import numpy as np
from sklearn.cluster import KMeans
import pandas as pd
from hmmlearn.hmm import GaussianHMM
import plotly.graph_objects as go
from plotly.graph_objs.scatter.marker import Line
from plotly.subplots import make_subplots
import plotly.express as px
from sklearn.cluster import AgglomerativeClustering
from sklearn.mixture import GaussianMixture
import warnings
import math
import yfinance as yf

# 날짜 관리
import datetime
from datetime import datetime, timedelta

warnings.filterwarnings('ignore')

In [None]:
# Load price data with yfinance

# Parameters
ticker = '^KS11' # KOSPI(^KS11), KOSPI 200(^KS200), S&P 500(^SPX), S&P 500 futures(ES=F)
start_dt = (datetime.now() - timedelta(days=365*30)).strftime('%Y-%m-%d')  # roughly 30 years back
end_dt = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')  # yesterday

# Download the history
prices = yf.download(ticker, start=start_dt, end=end_dt)
if prices.empty:
    # Stop here with a clear message instead of letting later cells fail on an empty frame.
    raise ValueError(f'No price data returned for {ticker} between {start_dt} and {end_dt}')

# Keep only the close price, as an explicit copy because later cells add columns to this frame.
prices = prices[['Close']].copy()
prices.columns = [ticker]
prices

In [None]:
# Inspect the downloaded data

# The first (and only) column is the close price. Its label is also stored as the
# name of the columns index, because prepare_data_for_model_input and
# plot_hidden_states read `columns.name` to find the instrument column.
trading_instrument = prices.columns[0]
prices.columns.name = trading_instrument
px.line(prices[trading_instrument])

In [None]:
# Prepare the data fed to the models

def prepare_data_for_model_input(prices, ma):
    '''
        Add a moving average of the close price and the log return of that
        moving average, then drop every row containing NaN.

        The input frame is left untouched: the work is done on a copy, and the
        moving average is computed from the price column only, so the function
        can be called again on a frame that already carries the extra columns.

        Input:
        prices (df) - Dataframe of close prices. The instrument name is read from
                      prices.columns.name; if no column carries that name, the
                      first column is used as the price column.
        ma (int) - length of the moving average

        Output:
        prices(df) - An enhanced copy of the prices dataframe, with moving average and log return columns
        prices_array(nd.array) - a column vector (n, 1) of the log returns
    '''

    instrument = prices.columns.name
    # Price column to smooth: the one named after the instrument when present,
    # otherwise the first column (covers frames whose columns index has no name).
    price_col = instrument if instrument in prices.columns else prices.columns[0]
    ma_col = f'{instrument}_ma'
    return_col = f'{instrument}_log_return'

    enriched = prices.copy()
    enriched[ma_col] = enriched[price_col].rolling(ma).mean()
    # Log return of the smoothed series. The first `ma` rows are NaN here and are
    # removed by the dropna below (a dropna on the assigned value would be undone
    # by index alignment).
    enriched[return_col] = np.log(enriched[ma_col] / enriched[ma_col].shift(1))

    enriched = enriched.dropna()
    # The models take a 2-D array with one feature column.
    prices_array = enriched[return_col].to_numpy().reshape(-1, 1)

    return enriched, prices_array

In [None]:
# Build and inspect the model input

# NOTE: this rebinds `prices` to the enriched, NaN-free frame that the later cells rely on.
prices, prices_array = prepare_data_for_model_input(prices, 7)  # 7-observation moving average
# DataFrame.info() prints its report itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
prices.info()
prices

In [None]:
# Models used to detect regimes

class RegimeDetection:
    """Small factory that configures regime-detection estimators from a parameter dict."""

    def initialise_model(self, model, params):
        """Set every key of ``params`` as an attribute on ``model`` and return that same object."""
        for name in params:
            setattr(model, name, params[name])
        return model

    def get_regimes_hmm(self, input_data, params):
        """Configure a GaussianHMM with ``params`` and return the result of ``fit(input_data)``."""
        configured = self.initialise_model(GaussianHMM(), params)
        return configured.fit(input_data)

    def get_regimes_clustering(self, params):
        """Return an AgglomerativeClustering configured with ``params``; it is not fitted here."""
        return self.initialise_model(AgglomerativeClustering(), params)

    def get_regimes_gmm(self, input_data, params):
        """Configure a GaussianMixture with ``params`` and return the result of ``fit(input_data)``."""
        configured = self.initialise_model(GaussianMixture(), params)
        return configured.fit(input_data)


In [None]:
# Plotting helper

def plot_hidden_states(hidden_states, prices_df):

    '''
    Scatter-plot the price series with every observation coloured by its state.

    Input:
    hidden_states(numpy.ndarray) - array of predicted integer state labels, one per row of prices_df
    prices_df(df) - dataframe of close prices; the plotted column is the one named by prices_df.columns.name

    Output:
    None. Prints the number of observations per state and shows a graph of
    hidden states and prices.

    '''

    # The first two entries keep the original blue/green look; the others cover
    # models with more than two states (the palette is cycled if it runs out).
    colors = ['blue', 'green', 'red', 'orange', 'purple', 'brown', 'magenta', 'gray']
    hidden_states = np.asarray(hidden_states)
    price_series = prices_df[f"{prices_df.columns.name}"]
    fig = go.Figure()

    # Iterate over the labels that actually occur. Looping over range(number of
    # unique labels) would skip or mislabel states whenever the labels are not
    # exactly 0..k-1 (e.g. an out-of-sample window that only contains state 1).
    for state in np.unique(hidden_states):
        mask = hidden_states == state
        print('Number of observations for State ', state,":", len(prices_df.index[mask]))

        # Colour is tied to the label itself so a given state looks the same in every plot.
        state_color = colors[int(state) % len(colors)]
        fig.add_trace(go.Scatter(x=prices_df.index[mask], y=price_series[mask],
                    mode='markers',  name='Hidden State ' + str(state), marker=dict(size=4,color=state_color)))

    fig.update_layout(height=400, width=900, legend=dict(
            yanchor="top", y=0.99, xanchor="left",x=0.01), margin=dict(l=20, r=20, t=20, b=20)).show()


In [None]:
# Create the regime-detection helper whose get_regimes_* methods build the models used below
regime_detection = RegimeDetection()

In [None]:
# Regimes from agglomerative clustering

# AgglomerativeClustering has no `random_state` hyperparameter, so the former
# 'random_state': 100 entry was only stored as an unused attribute and is dropped.
# NOTE(review): 'metric' is the hyperparameter name in recent scikit-learn releases
# (older ones call it 'affinity') - confirm against the installed version.
params = {'n_clusters': 2, 'linkage': 'complete', 'metric': 'manhattan'}

clustering = regime_detection.get_regimes_clustering(params)
clustering_states = clustering.fit_predict(prices_array)

plot_hidden_states(np.array(clustering_states), prices[[f'{trading_instrument}']])

In [None]:
# Regimes from a Gaussian mixture model (in-sample fit and prediction)
params = dict(
    n_components=2,
    covariance_type='full',
    max_iter=100000,
    n_init=30,
    init_params='kmeans',
    random_state=100,
)

gmm_model = regime_detection.get_regimes_gmm(prices_array, params)
gmm_states = gmm_model.predict(prices_array)

plot_hidden_states(np.array(gmm_states), prices[[trading_instrument]])

In [None]:
# Regimes from a Gaussian hidden Markov model (in-sample fit and prediction)
params = dict(
    n_components=2,
    covariance_type="full",
    random_state=100,
)

hmm_model = regime_detection.get_regimes_hmm(prices_array, params)
hmm_states = hmm_model.predict(prices_array)

plot_hidden_states(np.array(hmm_states), prices[[trading_instrument]])

In [None]:
# Walk-forward (out-of-sample) regime prediction

def feed_forward_training(model, params, prices, split_index, retrain_step):
    '''
    Walk forward through the out-of-sample part of `prices`: predict the state of
    each new observation, and refit the model on everything seen so far after
    every `retrain_step` new observations.

    Input:
    model (callable) - called as model(data, params); must return a fitted object
                       exposing predict(data), e.g. the gmm or hmm factory method
    params (dict) - dictionary of parameters for a model
    prices (array-like) - observations, sliceable by integer position
    split_index (int) - position splitting the initial training set ([:split_index])
                        from the out-of-sample set ([split_index:])
    retrain_step (int) - number of new observations after which we retrain the model (>= 1)

    Output:
    states_pred (list) - predicted state for every out-of-sample observation

    Raises:
    ValueError - if retrain_step is smaller than 1
    '''
    if retrain_step < 1:
        raise ValueError('retrain_step must be a positive integer')

    # train/test split and initial model training
    rd_model = model(prices[:split_index], params)
    n_test = len(prices[split_index:])

    # NOTE(review): mixture/HMM state labels are arbitrary, so a refit may swap
    # them relative to the previous fit - confirm that downstream use of the raw
    # labels is acceptable.
    states_pred = []
    for i in range(n_test):
        # extend the window by one observation and keep the state predicted for it
        split_index += 1
        latest_state = np.asarray(rd_model.predict(prices[:split_index]))[-1]
        states_pred.append(latest_state.item())

        # Refit after every `retrain_step` new observations. Counting from i + 1
        # avoids refitting right after the initial fit, and the final check skips
        # a refit whose model would never be used.
        seen = i + 1
        if seen % retrain_step == 0 and seen < n_test:
            rd_model = model(prices[:split_index], params)

    return  states_pred

In [None]:
# GMM walk-forward test: setup

model_gmm = regime_detection.get_regimes_gmm
# 'random_state' used to appear twice in this literal; the duplicate key is removed
# (the resulting dict is unchanged).
params = {'n_components':2, 'covariance_type':"full", 'random_state':100, 'max_iter': 100000, 'n_init': 30,'init_params': 'kmeans'}

# Position of the first observation dated after 2006-01-01: rows before it form the
# initial training set, rows from it onwards are predicted out of sample.
oos_positions = np.where(prices.index > '2006-01-01')[0]
if len(oos_positions) == 0:
    raise ValueError('No observations after 2006-01-01; cannot build an out-of-sample set')
split_index = oos_positions[0]

In [None]:
# Walk-forward GMM prediction with retrain_step=20.
# `params` is rebound by several cells, so this relies on the GMM setup cell
# having been the last one to assign it.
states_pred_gmm = feed_forward_training(model_gmm, params, prices_array, split_index, 20)
# Plot against the price rows from split_index onwards, i.e. the rows the predictions were made for.
plot_hidden_states(np.array(states_pred_gmm), prices[[f'{trading_instrument}']][split_index:])

In [None]:
# HMM walk-forward test: setup
params = dict(n_components=2, covariance_type='full', random_state=100)
model_hmm = regime_detection.get_regimes_hmm

In [None]:
# Walk-forward HMM prediction with retrain_step=20.
# Reuses `split_index` from the GMM setup cell and expects `params` to hold the
# HMM settings assigned in the HMM setup cell.
states_pred_hmm = feed_forward_training(model_hmm, params, prices_array, split_index, 20)
# Plot against the price rows from split_index onwards, i.e. the rows the predictions were made for.
plot_hidden_states(np.array(states_pred_hmm), prices[[f'{trading_instrument}']][split_index:])

In [None]:
# Out-of-sample close prices paired with the walk-forward HMM state predictions
prices_with_states = prices[trading_instrument].iloc[split_index:].to_frame()
prices_with_states['State'] = states_pred_hmm
prices_with_states.head()

In [None]:
# Daily close-to-close log return; the first row is NaN because it has no previous close.
# The former trailing .dropna() is removed: column assignment realigns on the index,
# so it never removed that NaN row.
prices_with_states['P&L_daily'] = np.log(prices_with_states[trading_instrument] / prices_with_states[trading_instrument].shift(1))
prices_with_states.head()

In [None]:
# Lag the state by one row so that each row carries the state predicted for the
# previous observation, then drop the rows that contain NaN.
# WARNING: this cell is not idempotent - every re-run shifts 'State' by one more
# row and drops another row. Re-create prices_with_states before running it again.
prices_with_states['State'] = prices_with_states['State'].shift(1)
prices_with_states.dropna(inplace = True)

In [None]:
# Position rule: +1 when the lagged state equals 1, otherwise -1.
# NOTE(review): this treats label 1 as the regime to be long in; the labels come
# straight from the HMM predictions - confirm that label 1 is the intended regime.
prices_with_states['Position'] = np.where(prices_with_states['State'] == 1,1,-1)
prices_with_states.head()

In [None]:
# Last 20 rows whose lagged state is 0
prices_with_states.loc[prices_with_states['State'].eq(0)].tail(20)

In [None]:
# Strategy outcome per day: the +1/-1 position times that day's log return.
prices_with_states['Daily_Outcome_hmm'] = prices_with_states['Position'].mul(prices_with_states['P&L_daily'])

# Running sums of the daily figures: buy-and-hold first, then the HMM strategy.
for daily_col, cumulative_col in (
    ('P&L_daily', 'Cumulative_Outcome_BaH'),
    ('Daily_Outcome_hmm', 'Cumulative_Outcome_hmm'),
):
    prices_with_states[cumulative_col] = prices_with_states[daily_col].cumsum()

prices_with_states

In [None]:
# Compare cumulative outcomes: buy-and-hold, HMM strategy, and their difference
fig = go.Figure()

# go.Line is a deprecated alias in plotly; go.Scatter with mode='lines' is the
# supported way to draw a line trace.
fig.add_trace(go.Scatter(x=prices_with_states.index, y=prices_with_states["Cumulative_Outcome_BaH"],
                         mode='lines', name = 'Cumulative_Outcome_BaH', line_color = 'navy'))

fig.add_trace(go.Scatter(x= prices_with_states.index, y=prices_with_states['Cumulative_Outcome_hmm'],
                         mode='lines', name = 'Cumulative_Outcome_hmm', line_color = 'olive'))

# Difference between the strategy and buy-and-hold
fig.add_trace(go.Scatter(x= prices_with_states.index, y=prices_with_states['Cumulative_Outcome_hmm']-prices_with_states["Cumulative_Outcome_BaH"],
                         mode='lines', name = 'Cumulative_Outcome_hmm-Cumulative_Outcome_BaH', line_color = 'red'))

fig.update_layout(height=400, width=900, legend=dict(
    yanchor="top", y=0.99, xanchor="left",x=0.01),
    margin=dict(l=20, r=20, t=20, b=20))

fig.show()