<a href="https://colab.research.google.com/github/Tien84/BitcoinPrediction/blob/master/GridSearch_SARIMA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
# Mount Google Drive under /content/driver; the dataset CSV is later read from
# '/content/driver/My Drive/...'. Only meaningful inside Google Colab.
drive.mount('/content/driver')

Mounted at /content/driver


In [2]:
# grid search sarima hyperparameters 
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
from pandas import read_csv

In [3]:
# one-step sarima forecast
def sarima_forecast(history, config):
	"""Fit a SARIMAX model on `history` and return a single next-step forecast.

	`config` is unpacked as (order, seasonal_order, trend). Stationarity and
	invertibility are deliberately not enforced on the fitted model.
	"""
	order, seasonal_order, trend = config
	fitted = SARIMAX(
		history,
		order=order,
		seasonal_order=seasonal_order,
		trend=trend,
		enforce_stationarity=False,
		enforce_invertibility=False,
	).fit(disp=False)
	# index len(history) is one position past the last observed value
	next_index = len(history)
	forecast = fitted.predict(next_index, next_index)
	return forecast[0]

In [4]:
# root mean squared error or rmse
def measure_rmse(actual, predicted):
	"""Return the square root of the mean squared error of `predicted` vs `actual`."""
	mse = mean_squared_error(actual, predicted)
	return sqrt(mse)

In [5]:
# mean absolute percentage error or mape
def measure_mape(actual, predicted):
	"""Return the mean absolute percentage error of `predicted` vs `actual`.

	The result is a fraction, not a percentage (0.1 means 10%).

	Args:
		actual: sequence of observed values (list or array).
		predicted: sequence of forecasts, same length as `actual`.

	Returns:
		float: mean of |predicted - actual| / |actual|. Zeros in `actual`
		produce inf/nan, since the ratio is undefined there.
	"""
	# Imported locally: the notebook's import cell never brings numpy into
	# scope, which made the original bare `mean`/`array` calls raise NameError.
	import numpy as np

	# Convert both sides so plain Python lists are accepted as well as arrays.
	actual_arr = np.asarray(actual, dtype=float)
	predicted_arr = np.asarray(predicted, dtype=float)
	ratios = np.abs(predicted_arr - actual_arr) / np.abs(actual_arr)
	return float(np.mean(ratios))

In [6]:
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
	"""Split `data` into (train, test) where test is the last `n_test` items.

	Args:
		data: sliceable sequence supporting len() (list, array, ...).
		n_test: number of trailing observations to hold out; must be >= 0.

	Returns:
		Tuple (train, test) of slices of `data`.

	Raises:
		ValueError: if `n_test` is negative.
	"""
	if n_test < 0:
		raise ValueError("n_test must be non-negative, got %r" % (n_test,))
	# Slice on an explicit index: with negative slicing, n_test == 0 would give
	# data[:-0] (empty) and data[-0:] (everything), i.e. the halves swapped.
	# Clamping at 0 keeps the old behaviour when n_test exceeds len(data).
	split = max(len(data) - n_test, 0)
	return data[:split], data[split:]

In [7]:
# walk-forward validation for data
def walk_forward_validation(data, n_test, cfg):
	"""Walk forward over the last `n_test` points of `data` and return the RMSE.

	For every test step a model is refit on all observations seen so far, one
	forecast is made, and the true value is then revealed to the history.
	"""
	train, test = train_test_split(data, n_test)
	# history starts as a copy of the training data and grows by one per step
	history = list(train)
	forecasts = []
	for step in range(len(test)):
		forecasts.append(sarima_forecast(history, cfg))
		# reveal the real observation before forecasting the next step
		history.append(test[step])
	return measure_rmse(test, forecasts)

In [8]:
# walk-forward validation for data, scored with mape instead of rmse
def walk_forward_validation_1(data, n_test, cfg):
	"""Walk forward over the last `n_test` points of `data` and return the MAPE.

	For every test step a model is refit on all observations seen so far, one
	forecast is made, and the true value is then revealed to the history.
	"""
	train, test = train_test_split(data, n_test)
	# history starts as a copy of the training data and grows by one per step
	history = list(train)
	forecasts = []
	for step in range(len(test)):
		forecasts.append(sarima_forecast(history, cfg))
		# reveal the real observation before forecasting the next step
		history.append(test[step])
	return measure_mape(test, forecasts)

In [None]:
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
	"""Score one config with RMSE walk-forward validation.

	Args:
		data: univariate series, passed through to walk_forward_validation.
		n_test: number of trailing observations held out for testing.
		cfg: model configuration; its str() form is used as the result key.
		debug: when True, warnings are shown and any exception propagates.

	Returns:
		Tuple (key, result): key is str(cfg); result is the validation error,
		or None when validation raised (only possible when debug is False).
	"""
	key = str(cfg)
	result = None
	if debug:
		# show all warnings and fail on exception if debugging
		result = walk_forward_validation(data, n_test, cfg)
	else:
		# one failure during model validation suggests an unstable config
		try:
			# never show warnings when grid searching, too noisy
			with catch_warnings():
				filterwarnings("ignore")
				result = walk_forward_validation(data, n_test, cfg)
		except Exception:
			# Best-effort on purpose, but limited to Exception so that
			# KeyboardInterrupt / SystemExit can still stop a long search.
			result = None
	# check for an interesting result
	if result is not None:
		print(' > Model[%s] %.3f' % (key, result))
	return (key, result)

In [13]:
# score a model with mape, return None on failure
def score_model_1(data, n_test, cfg, debug=False):
	"""Score one config with MAPE walk-forward validation.

	Args:
		data: univariate series, passed through to walk_forward_validation_1.
		n_test: number of trailing observations held out for testing.
		cfg: model configuration; its str() form is used as the result key.
		debug: when True, warnings are shown and any exception propagates.

	Returns:
		Tuple (key, result): key is str(cfg); result is the validation error,
		or None when validation raised (only possible when debug is False).
	"""
	key = str(cfg)
	result = None
	if debug:
		# show all warnings and fail on exception if debugging
		result = walk_forward_validation_1(data, n_test, cfg)
	else:
		# one failure during model validation suggests an unstable config
		try:
			# never show warnings when grid searching, too noisy
			with catch_warnings():
				filterwarnings("ignore")
				result = walk_forward_validation_1(data, n_test, cfg)
		except Exception:
			# Best-effort on purpose, but limited to Exception so that
			# KeyboardInterrupt / SystemExit can still stop a long search.
			result = None
	# check for an interesting result
	if result is not None:
		print(' > Model[%s] %.3f' % (key, result))
	return (key, result)

In [None]:
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
	"""Score every config in `cfg_list` by RMSE and return them best-first.

	Args:
		data: univariate series handed to score_model.
		cfg_list: iterable of model configurations to evaluate.
		n_test: number of trailing observations held out for testing.
		parallel: when True, configs are scored through joblib using one job
			per CPU core; otherwise they are scored sequentially.

	Returns:
		List of (key, error) tuples sorted by ascending error. Configs whose
		scoring failed (error is None) are dropped.
	"""
	if parallel:
		# execute configs in parallel
		executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
		tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
		scores = executor(tasks)
	else:
		scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
	# remove empty results (identity test: None is a singleton)
	scores = [entry for entry in scores if entry[1] is not None]
	# sort configs by error, asc
	scores.sort(key=lambda entry: entry[1])
	return scores

In [15]:
# grid search configs, ranked by mape
def grid_search_1(data, cfg_list, n_test, parallel=True):
	"""Score every config in `cfg_list` by MAPE and return them best-first.

	Args:
		data: univariate series handed to score_model_1.
		cfg_list: iterable of model configurations to evaluate.
		n_test: number of trailing observations held out for testing.
		parallel: when True, configs are scored through joblib using one job
			per CPU core; otherwise they are scored sequentially.

	Returns:
		List of (key, error) tuples sorted by ascending error. Configs whose
		scoring failed (error is None) are dropped.
	"""
	if parallel:
		# execute configs in parallel
		executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
		tasks = (delayed(score_model_1)(data, n_test, cfg) for cfg in cfg_list)
		scores = executor(tasks)
	else:
		scores = [score_model_1(data, n_test, cfg) for cfg in cfg_list]
	# remove empty results (identity test: None is a singleton)
	scores = [entry for entry in scores if entry[1] is not None]
	# sort configs by error, asc
	scores.sort(key=lambda entry: entry[1])
	return scores

In [11]:
# create a set of sarima configs to try
def sarima_configs(seasonal=None):
	"""Build the full grid of SARIMA configurations to try.

	Args:
		seasonal: iterable of seasonal period values (m) to include. Defaults
			to [0] when omitted.

	Returns:
		List of configs, each of the form [(p, d, q), (P, D, Q, m), t], in the
		order p, d, q, t, P, D, Q, m with the rightmost varying fastest.
	"""
	# Imported locally so this cell does not depend on the notebook's import cell.
	from itertools import product

	# None sentinel instead of a mutable list default shared across calls.
	m_params = [0] if seasonal is None else seasonal
	# define config lists
	p_params = [0, 1, 2, 3]
	d_params = [0, 1]
	q_params = [0, 1, 2, 3]
	t_params = ['n', 'c', 't', 'ct']
	P_params = [0, 1, 2, 3]
	D_params = [0, 1]
	Q_params = [0, 1, 2, 3]
	# product() iterates like the equivalent nested for-loops, outermost first
	grid = product(
		p_params, d_params, q_params, t_params,
		P_params, D_params, Q_params, m_params,
	)
	return [[(p, d, q), (P, D, Q, m), t] for p, d, q, t, P, D, Q, m in grid]

In [None]:
# Entry point: grid-search SARIMA configs on the 'Adj Close' series, ranked by RMSE.
if __name__ == '__main__':
	# load dataset: first CSV row is the header, first column becomes the index
	series = read_csv('/content/driver/My Drive/DataSet/DataBitcoin.csv', header=0, index_col=0)
	# keep only the 'Adj Close' column, as a plain array of values
	data = series['Adj Close'].values
	print(data.shape)
	# data split: number of trailing observations held out for walk-forward testing
	#n_test = 165
	n_test = 768
	# model configs: called with its default seasonal argument
	cfg_list = sarima_configs()
	# grid search: parallel=True by default, so one job per CPU core is used
	scores = grid_search(data, cfg_list, n_test)
	print('done')
	# list top 3 configs; scores come back sorted by ascending error
	for cfg, error in scores[:3]:
		print(cfg, error)

(2558,)
 > Model[[(0, 0, 0), (0, 0, 0, 0), 'n']] 39460.016
 > Model[[(0, 0, 0), (0, 0, 0, 0), 'c']] 29936.472
 > Model[[(0, 0, 0), (0, 0, 0, 0), 't']] 20538.582
 > Model[[(0, 0, 0), (0, 0, 0, 0), 'ct']] 19437.937
 > Model[[(0, 0, 1), (0, 0, 0, 0), 'n']] 20343.405
 > Model[[(0, 0, 1), (0, 0, 0, 0), 'c']] 15535.828
 > Model[[(0, 0, 1), (0, 0, 0, 0), 't']] 10860.146
 > Model[[(0, 0, 1), (0, 0, 0, 0), 'ct']] 10303.705
 > Model[[(0, 0, 2), (0, 0, 0, 0), 'n']] 11815.698
 > Model[[(0, 0, 2), (0, 0, 0, 0), 'c']] 9383.221
 > Model[[(0, 0, 2), (0, 0, 0, 0), 't']] 6802.453
 > Model[[(0, 0, 3), (0, 0, 0, 0), 'n']] 8193.694
 > Model[[(0, 0, 2), (0, 0, 0, 0), 'ct']] 6533.651
 > Model[[(0, 0, 3), (0, 0, 0, 0), 'c']] 6519.974
 > Model[[(0, 0, 3), (0, 0, 0, 0), 't']] 5703.801
 > Model[[(0, 1, 0), (0, 0, 0, 0), 'n']] 1494.268
 > Model[[(0, 1, 0), (0, 0, 0, 0), 'c']] 1494.583
 > Model[[(0, 1, 0), (0, 0, 0, 0), 't']] 1495.342
 > Model[[(0, 1, 0), (0, 0, 0, 0), 'ct']] 1495.611
 > Model[[(0, 1, 1), (0, 0, 0

In [None]:
# Entry point: grid-search SARIMA configs on the 'Adj Close' series, ranked by MAPE.
if __name__ == '__main__':
	# load dataset: first CSV row is the header, first column becomes the index
	series = read_csv('/content/driver/My Drive/DataSet/DataBitcoin.csv', header=0, index_col=0)
	# keep only the 'Adj Close' column, as a plain array of values
	data = series['Adj Close'].values
	print(data.shape)
	# data split: number of trailing observations held out for walk-forward testing
	#n_test = 165
	n_test = 768
	# model configs: called with its default seasonal argument
	cfg_list = sarima_configs()
	# grid search: parallel=True by default, so one job per CPU core is used
	scores_mape = grid_search_1(data, cfg_list, n_test)
	print('done')
	# list top 3 configs; scores come back sorted by ascending error
	for cfg, error in scores_mape[:3]:
		print(cfg, error)

(2558,)
