In [None]:
# Below loads all the preliminaries for the analysis.
import numpy as np
import pandas as pd
import itertools
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
from warnings import simplefilter


# statistical modelling
import statsmodels.tsa.api as smt
import statsmodels.api as sm
from scipy.optimize import minimize
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import acf, pacf

from statsmodels.tsa.holtwinters import SimpleExpSmoothing   
from statsmodels.tsa.holtwinters import ExponentialSmoothing

from sklearn.metrics import r2_score, median_absolute_error, mean_absolute_error
from sklearn.metrics import median_absolute_error, mean_squared_error, mean_squared_log_error


In [None]:
"""
#ML modelling
from sklearn.metrics import r2_score, median_absolute_error, mean_absolute_error
from sklearn.metrics import median_absolute_error, mean_squared_error, mean_squared_log_error

from sktime.forecasting.naive import NaiveForecaster
from sktime.forecasting.theta import ThetaForecaster
from sktime.forecasting.trend import PolynomialTrendForecaster
from sktime.performance_metrics.forecasting import (
    MeanAbsolutePercentageError,
    mean_absolute_percentage_error,
)
#from sktime.transformations.series.detrend import Deseasonalizer, Detrender
#from sktime.utils.plotting import plot_series
"""

In [None]:
"""



from tqdm import tqdm_notebook

from itertools import product

def mean_absolute_percentage_error(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

import warnings
warnings.filterwarnings('ignore')
"""
#pd.set_option('display.max_rows', None)

In [None]:
data = pd.read_csv(r'D:\Documents\gamestop.csv', index_col=0, parse_dates=['date'])
data.sort_values(by='date', ascending='True')


In [None]:
ts = data
ts = ts.sort_index()
#ts.index = ts.index.to_period(freq='M')
ts = ts[725:4000]
#ts = ts.fillna(method='ffill')

In [None]:
ts = ts.asfreq('M')
#ts = ts.fillna(method='ffill')
ts['close_price'].interpolate(method='linear', inplace=True)

In [None]:
ts = ts['close_price']

In [None]:
ts.describe()

In [None]:
#line graph of closing price, shows seasonality/trend 
plt.figure(figsize=(10,6))
plt.grid(True)
plt.xlabel('Time')
plt.ylabel('Close Prices')
plt.plot(ts)
plt.title('Etherium-GBP Closing Prices')
plt.show()


In [None]:
#Density shows positive skewness 
sns.set_style('darkgrid')
sns.kdeplot(ts, shade=True)
plt.title('Etherium-GBP Closing Price (Density)', loc = 'right', fontsize=14)

plt.show()

In [None]:
def exponential_smoothing(series, alpha):

    result = [series[0]] # first value is same as series
    for n in range(1, len(series)):
        result.append(alpha * series[n] + (1 - alpha) * result[n-1])
    return result
  
def plot_exponential_smoothing(series, alphas):
 
    plt.figure(figsize=(17, 8))
    for alpha in alphas:
        plt.plot(exponential_smoothing(series, alpha), label="Alpha {}".format(alpha))
    plt.plot(series.values, "c", label = "Actual")
    plt.legend(loc="best")
    plt.axis('tight')
    plt.title('Single Exponential Smoothing')
    plt.grid(True);

plot_exponential_smoothing(ts, [0.1, 0.1])

In [None]:
def double_exponential_smoothing(series, alpha, beta):

    result = [series[0]]
    for n in range(1, len(series)+1):
        if n == 1:
            level, trend = series[0], series[1] - series[0]
        if n >= len(series): # forecasting
            value = result[-1]
        else:
            value = series[n]
        last_level, level = level, alpha * value + (1 - alpha) * (level + trend)
        trend = beta * (level - last_level) + (1 - beta) * trend
        result.append(level + trend)
    return result

def plot_double_exponential_smoothing(series, alphas, betas):
     
    plt.figure(figsize=(17, 8))
    for alpha in alphas:
        for beta in betas:
            plt.plot(double_exponential_smoothing(series, alpha, beta), label="Alpha {}, beta {}".format(alpha, beta))
    plt.plot(series.values, label = "Actual")
    plt.legend(loc="best")
    plt.axis('tight')
    plt.title("Double Exponential Smoothing")
    plt.grid(True)
    
plot_double_exponential_smoothing(ts, alphas=[0.1, 0.32], betas=[0.1, 0.02])

In [None]:
ts_add = ExponentialSmoothing(ts, trend='add',seasonal='add',seasonal_periods=65).fit().fittedvalues

ts_mul = ExponentialSmoothing(ts, trend='mul',seasonal='mul',seasonal_periods=65).fit().fittedvalues

plt.figure(figsize=(10,6))

ts.plot(title='Holt Winters Triple Exponential Smoothing: Additive and Multiplicative Seasonality')
ts_add.plot(title='Holt Winters Triple Exponential Smoothing: Additive and Multiplicative Seasonality')
ts_mul.plot(title='Holt Winters Triple Exponential Smoothing: Additive and Multiplicative Seasonality')

In [None]:
from typing import *

def initial_trend(series: List, uppercase_m: int) -> float:
    return sum([
        float(series[i+uppercase_m] - series[i]) / uppercase_m
        for i in range(uppercase_m)
    ]) / uppercase_m


def initial_seasonality(series: List, uppercase_m: int) -> List:
    initial_season = []
    n_seasons = int(len(series)/uppercase_m)

    season_averages = [sum(
        series[uppercase_m * i:uppercase_m * i + uppercase_m]
    ) / uppercase_m for i in range(n_seasons)]

    initial_season.extend([
        sum([series[uppercase_m*j+i]-season_averages[j]
             for j in range(n_seasons)]) / n_seasons
        for i in range(uppercase_m)
    ])

    return initial_season
def winters_es(series: List,
               uppercase_m: int,
               alpha: float=0.2,
               beta: float=0.2,
               gamma: float=0.15,
               future_steps: int=1) -> List:
    
    i_l = [series[0]]
    i_t = [initial_trend(series, uppercase_m)]
    i_s = initial_seasonality(series, uppercase_m)

    forecasts = []
    for t in range(len(series) + future_steps):

        if t >= len(series):
            m = t - len(series) + 1
            forecasts.append(
                (i_l[-1] + m * i_t[-1]) + i_s[t % uppercase_m]
            )

        else:
            l_t = alpha * (series[t] - i_s[t % uppercase_m]) + (1 - alpha) * (i_l[-1] + i_t[-1])

            i_t[-1] = beta * (l_t - i_l[-1]) + (1 - beta) * i_t[-1]
            i_l[-1] = l_t

            i_s[t % uppercase_m] = gamma * (series[t] - l_t) + (1 - gamma) * i_s[t % uppercase_m]

            forecasts.append(
                (i_l[-1] + i_t[-1]) + i_s[t % uppercase_m]
            )

    return forecasts

In [None]:
#ts = ts.asfreq('D', method='pad')
forecast = winters_es(ts, 12)

In [None]:
plt.figure(
figsize=(10, 6)
    )


data = ts.values.tolist()

k = 10

last_months = list(range(34, 34 + k))

forecast = winters_es(data, 12, future_steps=8)
plt.plot(data, linewidth=3, label='Tesla Closing Price')
plt.plot(forecast, linewidth=3, label='Forecast Close Price')
plt.axvspan(*(last_months[0], last_months[-1]), 
facecolor='grey', 
alpha=0.1)
plt.legend()
plt.show()

In [None]:
decomp = seasonal_decompose(ts)

decomp.plot()
plt.show()

---------------------------------------------------

AR MODEL

In [None]:
#USE BOTH DATA_DIFF AND TS FOR PAPER

from statsmodels.tsa.ar_model import AutoReg
from sklearn.metrics import mean_squared_error
from math import sqrt

X = ts.values
ar_train, ar_test = X[0:120], X[121:157]
# train autoregression
window = 20
model = AutoReg(ar_train, lags=25)
model_fit = model.fit()
coef = model_fit.params

# walk forward over time steps in test
history = ar_train[len(ar_train)-window:]
history = [history[i] for i in range(len(history))]
predictions = list()
for t in range(len(ar_test)):
	length = len(history)
	lag = [history[i] for i in range(length-window,length)]
	yhat = coef[0]
	for d in range(window):
		yhat += coef[d+1] * lag[window-d-1]
	obs = ar_test[t]
	predictions.append(yhat)
	history.append(obs)
	print('predicted=%f, expected=%f' % (yhat, obs))
rmse = sqrt(mean_squared_error(ar_test, predictions))
print('Test RMSE: %.3f' % rmse)
# plot
plt.plot(ar_test)
plt.plot(predictions, color='red')
plt.show()

MA Model

In [None]:
def plot_moving_average(series, window, plot_intervals=False, scale=1.96):

    rolling_mean = series.rolling(window=window).mean()
    
    plt.figure(figsize=(17,8))
    plt.title('Simple Moving Average\n window size = {}'.format(window))
    plt.plot(rolling_mean, 'g', label='Rolling mean trend')
    
    #Plot confidence intervals for smoothed values
    if plot_intervals:
        mae = mean_absolute_error(series[window:], rolling_mean[window:])
        deviation = np.std(series[window:] - rolling_mean[window:])
        lower_bound = rolling_mean - (mae + scale * deviation)
        upper_bound = rolling_mean + (mae + scale * deviation)
        plt.plot(upper_bound, 'r--', label='Upper bound / Lower bound')
        plt.plot(lower_bound, 'r--')
            
    plt.plot(series[window:], label='Actual values')
    plt.legend(loc='best')
    plt.grid(True)
    
#Smooth by the previous 5 days (by week)
plot_moving_average(ts, 7, plot_intervals=True)

#Smooth by the previous month (30 days)
plot_moving_average(ts, 30, plot_intervals=True)

#Smooth by previous quarter (90 days)
plot_moving_average(ts, 90, plot_intervals=True)

In [None]:
# Take the first difference to remove to make the process stationary
# data_diff = ts - ts.shift(1)

# Use pandas to make difference 
data_diff = ts.diff()

# Remove nans
data_diff = data_diff.dropna()

In [None]:
#Dickey-Fuller TEST


def tsplot(y, lags=None, figsize=(12, 7), syle='bmh'):
    
    if not isinstance(y, pd.Series):
        y = pd.Series(y)
        
    with plt.style.context(style='bmh'):
        fig = plt.figure(figsize=figsize)
        layout = (2,2)
        ts_ax = plt.subplot2grid(layout, (0,0), colspan=2)
        acf_ax = plt.subplot2grid(layout, (1,0))
        pacf_ax = plt.subplot2grid(layout, (1,1))
        
        y.plot(ax=ts_ax)
        p_value = sm.tsa.stattools.adfuller(y)[1]
        ts_ax.set_title('Time Series Analysis Plots\n Dickey-Fuller: p={0:.5f}'.format(p_value))
        smt.graphics.plot_acf(y, lags=lags, ax=acf_ax)
        smt.graphics.plot_pacf(y, lags=lags, ax=pacf_ax)
        plt.tight_layout()

#plots normal time series for Dickey-Fuller Test
tsplot(ts.values, lags=65)

tsplot(data_diff.values[1:], lags=65)

In [None]:
adf_result=adfuller(ts.values)
#to help you, we added the names of every value
dict(zip(['adf', 'pvalue', 'usedlag', 'nobs', 'critical' 'values', 'icbest'],adf_result))

In [None]:
import pmdarima as pm
from pmdarima.arima import auto_arima

In [None]:
# Train Test Split Index
train_size = 0.8
split_idx = round(len(ts)* train_size)
split_idx

# Split
arima_train = ts.iloc[:split_idx]
arima_test = ts.iloc[split_idx:]

# Visualize split
fig,ax= plt.subplots(figsize=(15,10))
kws = dict(marker='o')
plt.plot(arima_train, label='Train', **kws)
plt.plot(arima_test, label='Test', **kws)
ax.legend(bbox_to_anchor=[1,1]);

In [None]:
arima_model = auto_arima(arima_train, start_p=0, start_q=0,
                             max_p=5, max_q=5, m=12,
                             start_P=0, seasonal=True,
                             d=1, D=1, trace=True,
                             error_action='ignore',  # don't want to know if an order does not work
                             suppress_warnings=True,  # don't want convergence warnings
                             stepwise=True)  # set to stepwise
arima_model.summary()

In [None]:
arima_model.plot_diagnostics()

In [None]:
prediction = pd.DataFrame(arima_model.predict(n_periods = 31), index=arima_test.index)

prediction.columns = ['predicted_close_price']

print(prediction)

In [None]:
#forecasting with ARIMA

plt.figure(figsize=(15, 10))
plt.plot(arima_train, label = "Training")
plt.plot(arima_test, label = "Test")
plt.plot(prediction, label = "Predicted")
plt.legend(loc = "upper right")
plt.show()

MACHINE LEARNING BELOW

In [None]:
from sktime.forecasting.base import ForecastingHorizon
from sktime.transformers.single_series.detrend import Deseasonalizer, Detrender
from sktime.forecasting.trend import PolynomialTrendForecaster
from sktime.forecasting.model_selection import (
    temporal_train_test_split,
)
from sktime.utils.plotting import plot_series
from sktime.forecasting.compose import (
    TransformedTargetForecaster,
    ReducedRegressionForecaster
)

In [None]:
data = pd.read_csv(r'D:\Documents\tesla.csv', parse_dates=['Date'])
data.sort_values(by='Date', ascending='True')


In [None]:
y_train, y_test = temporal_train_test_split(ts, test_size=0.2)

print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)



In [None]:
steps = [
    (
        "extract",
        RandomIntervalFeatureExtractor(
            n_intervals="sqrt", features=[np.mean, np.std, _slope]
        ),
    ),
    ("clf", DecisionTreeClassifier()),
]
time_series_tree = Pipeline(steps)

In [None]:
time_series_tree.fit(x_train, y_train)
time_series_tree.score(x_test, y_test)