In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import csv
from math import sqrt
from sklearn.metrics import mean_squared_error
from datetime import datetime, timedelta
from statsmodels.tsa.statespace.sarimax import SARIMAX
from warnings import catch_warnings, filterwarnings
from multiprocessing import cpu_count
from joblib import Parallel, delayed

In [2]:
get_ipython().run_line_magic('matplotlib', 'inline')

In [2]:
# read dataset daily-min-temperatures.csv
with open("daily-min-temperatures.csv") as f:
    temp_df = pd.DataFrame(csv.DictReader(f))
temp_df.head()

Unnamed: 0,Date,Temp
0,1981-01-01,20.7
1,1981-01-02,17.9
2,1981-01-03,18.8
3,1981-01-04,14.6
4,1981-01-05,15.8


In [3]:
temp_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3650 entries, 0 to 3649
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Date    3650 non-null   object
 1   Temp    3650 non-null   object
dtypes: object(2)
memory usage: 57.2+ KB


In [4]:
temp_df['Date'] = pd.to_datetime(temp_df.Date)

In [5]:
temp_df.set_index('Date')

Unnamed: 0_level_0,Temp
Date,Unnamed: 1_level_1
1981-01-01,20.7
1981-01-02,17.9
1981-01-03,18.8
1981-01-04,14.6
1981-01-05,15.8
...,...
1990-12-27,14.0
1990-12-28,13.6
1990-12-29,13.5
1990-12-30,15.7


In [6]:
temp_df['Temp'] = pd.to_numeric(temp_df.Temp)
temp_df.info()
type(temp_df.values)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3650 entries, 0 to 3649
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   Date    3650 non-null   datetime64[ns]
 1   Temp    3650 non-null   float64       
dtypes: datetime64[ns](1), float64(1)
memory usage: 57.2 KB


numpy.ndarray

## Stationarity test with ADF
The Augmented Dickey-Fuller (ADF) test is a statistical test used to determine whether a given time series is stationary or not. In statistics and econometrics, stationarity is an important concept when analyzing time series data. A stationary time series is one whose statistical properties, such as mean, variance, and autocorrelation, do not change over time.

The ADF test is used to check for the presence of a unit root in a time series, which is a root of the characteristic equation for the autoregressive (AR) process that makes the time series non-stationary. In simpler terms, a unit root indicates that the time series is non-stationary.

Here's how the ADF test works and when it is used:

1. **Null Hypothesis (H0)**: The null hypothesis of the ADF test is that the time series has a unit root, meaning it is non-stationary. In other words, the null hypothesis assumes that the time series has a unit root and is not stationary.

2. **Alternative Hypothesis (H1)**: The alternative hypothesis is that the time series is stationary; it does not have a unit root.

3. **Test Statistic**: The ADF test computes a test statistic, which is essentially a t-statistic, and its value is compared to critical values from a specific distribution.

4. **Critical Values**: The critical values are used to determine the significance of the test statistic. If the test statistic is less than the critical value, you can reject the null hypothesis in favor of the alternative, indicating that the time series is stationary.

5. **Interpretation**: If the ADF test statistic is less than the critical value, you can conclude that the time series is stationary. If the test statistic is greater than the critical value, you fail to reject the null hypothesis, indicating that the time series is non-stationary.

The ADF test is commonly used in time series analysis, especially in financial and economic forecasting, to assess whether a particular financial or economic time series is stationary. Stationarity is important because many time series forecasting methods, like ARIMA (AutoRegressive Integrated Moving Average), assume that the data is stationary. If the data is non-stationary, it might need differencing or other transformations to make it suitable for modeling.

In summary, the ADF test is a statistical test used to check for stationarity in time series data by assessing the presence of a unit root. It is a valuable tool in time series analysis and helps in choosing appropriate models for forecasting and analysis.


In [8]:
from statsmodels.tsa.stattools import adfuller
result = adfuller(temp_df.Temp.values)
print('ADF Statistic: %f' % result[0])
print('p-value: %f' % result[1])
print('Critical Values:')
for key, value in result[4].items():
	print('\t%s: %.3f' % (key, value))

ADF Statistic: -4.444805
p-value: 0.000247
Critical Values:
	1%: -3.432
	5%: -2.862
	10%: -2.567


Stationary. The more negative this statistic, the more likely we are to reject the null hypothesis.
As part of the output, we get a look-up table to help determine the ADF statistic. We can see that our statistic value of -4 is less than the value of -3.449 at 1%.

In [9]:
# one-step sarima forecast
def sarima_forecast2(history, config):
    order, sorder, trend, exog_raw = config

    try:
       # define model
        model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
        # fit model
        model_fit = model.fit(disp=False)
        # make one step forecast, start and end are the same
        #yhat = model_fit.predict(start=len(history), end=len(history), exog=exog_raw[len(history),:])
        pre = model_fit.get_prediction(start=len(history), end=len(history))
        yhat = pre.predicted_mean
        yhat_ci = pre.conf_int(alpha=0.5)
    except Exception as e: 
        print(e)
        print("No prediction")
        print("Length: "+str(len(history)))
        yhat = [0] # no prediction
        yhat_ci = [0]
        model = None
        model_fit = None
    return yhat[0], yhat_ci, model, model_fit

In [10]:
# split a univariate dataset into train/test sets
# n_test: number of time steps to use in the test set
def train_test_split(data, n_test):
    return data[:-n_test], data[-n_test:]

In [11]:
# getting predictions for univariate data
def walk_forward_validation2(data, n_test, cfg):
    predictions_df = pd.DataFrame()
    predictions = list()
    predictions_lci = list()
    predictions_hci = list()
    # split dataset
    train, test = train_test_split(data, n_test)

    # seed history with training dataset
    history = [x for x in train]
    # step over each time-step in the test set
    for i in range(len(test)):
        # fit model and make forecast for history
        yhat, yhat_ci, model, results = sarima_forecast2(history, cfg)
        # store forecast in list of predictions
        predictions.append(yhat)
        predictions_lci.append(yhat_ci[:,0])
        predictions_hci.append(yhat_ci[:,1])
        # add actual observation to history for the next loop
        history.append(test[i])
    # estimate prediction error
    #error = measure_rmse(test, predictions)
    predictions_df["PRED"] = predictions
    predictions_df["PRED_LCI"] = predictions_lci
    predictions_df["PRED_HCI"] = predictions_hci
    return [test, predictions_df, model, results]

In [12]:
# root mean squared error or rmse
def measure_rmse(actual, predicted):
    return sqrt(mean_squared_error(actual, predicted))

In [13]:
def score_model(data, n_test, cfg):
    result = None
    # convert config to a key
    order, sorder, trend, exog_data = cfg

    key = str(order)+', '+str(sorder)+', '+str(trend)
    # show all warnings and fail on exception if debugging
    # one failure during model validation suggests an unstable config
    try:
        # never show warnings when grid searching, too noisy
        with catch_warnings():
            filterwarnings("ignore")
            test, predictions_df, model, results = walk_forward_validation2(data, n_test, cfg)
            result = measure_rmse(test, predictions_df['PRED'])    
    except:
        result = None
    # check for an interesting result
    if result is not None:
        print(' > Model[%s] %.3f' % (key, result))
    return (key, result)

In [14]:
def grid_search(data, cfg_list, n_test, parallel=True):
    scores = None
    if parallel:
        try:
            #  Parallel object with the number of cores to use and set it to the number of scores detected in your hardware
            executor = Parallel(n_jobs=cpu_count()-1, backend='multiprocessing', verbose=10)
            tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
            scores = executor(tasks)
        except:
            scores = [None]
    else:
        scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
    # remove empty results
    scores = [r for r in scores if r[1] != None]
    # sort configs by error, asc
    scores.sort(key=lambda tup: tup[1])
    return scores

In [15]:
# create a set of sarima configs to try
# no seasonal component by default, we need to have more than one years data for that
def sarima_configs(seasonal=[0], exog_data=None):
    models = list()
    # define config lists
    p_params = [0, 1, 2]
    d_params = [1]
    q_params = [0, 1, 2]
    t_params = ['n','c','t','ct']
    P_params = [0]
    D_params = [0]
    Q_params = [0]
    m_params = seasonal
    # create config instances
    for p in p_params:
        for d in d_params:
            for q in q_params:
                for t in t_params:
                    for P in P_params:
                        for D in D_params:
                            for Q in Q_params:
                                for m in m_params:
                                    cfg = [(p,d,q), (P,D,Q,m), t, exog_data]
                                    models.append(cfg)
    return models

In [None]:
# main program
if __name__ == '__main__':
    # define dataset
    data = temp_df['Temp'].values
    # data split (we have more than 3000 days)
    n_test = 265
    # model configs
    # To add seasonality of one year, we add 365, because we have daily data
    #cfg_list = sarima_configs(seasonal=[0, 12]) <- if we have monthly data
    cfg_list = sarima_configs(seasonal=[0],exog_data=None)
    print("Testing " + str(len(cfg_list)) + "models")
    # grid search
    scores = grid_search(data, cfg_list, n_test, parallel=False)
    #scores = grid_search(data, cfg_list, n_test)
    print('done')
    # list top 3 configs
    for cfg, error in scores[:3]:
        print(cfg, error)

Testing 72models
 > Model[(0, 0, 0), (0, 0, 0, 0), n] 10.891
 > Model[(0, 0, 0), (0, 0, 0, 0), c] 3.533
 > Model[(0, 0, 0), (0, 0, 0, 0), t] 7.348
 > Model[(0, 0, 0), (0, 0, 0, 0), ct] 3.584
 > Model[(0, 0, 1), (0, 0, 0, 0), n] 6.345
 > Model[(0, 0, 1), (0, 0, 0, 0), c] 2.757
 > Model[(0, 0, 1), (0, 0, 0, 0), t] 4.638
 > Model[(0, 0, 1), (0, 0, 0, 0), ct] 2.787
 > Model[(0, 0, 2), (0, 0, 0, 0), n] 4.732
 > Model[(0, 0, 2), (0, 0, 0, 0), c] 2.585
 > Model[(0, 0, 2), (0, 0, 0, 0), t] 3.736
 > Model[(0, 0, 2), (0, 0, 0, 0), ct] 2.606
 > Model[(0, 1, 0), (0, 0, 0, 0), n] 2.673
 > Model[(0, 1, 0), (0, 0, 0, 0), c] 2.673
 > Model[(0, 1, 0), (0, 0, 0, 0), t] 2.674
 > Model[(0, 1, 0), (0, 0, 0, 0), ct] 2.674
 > Model[(0, 1, 1), (0, 0, 0, 0), n] 2.558
 > Model[(0, 1, 1), (0, 0, 0, 0), c] 2.559
 > Model[(0, 1, 1), (0, 0, 0, 0), t] 2.598
 > Model[(0, 1, 1), (0, 0, 0, 0), ct] 2.599
 > Model[(0, 1, 2), (0, 0, 0, 0), n] 2.320
 > Model[(0, 1, 2), (0, 0, 0, 0), c] 2.321
 > Model[(0, 1, 2), (0, 0, 0, 0

Generation of the final model (this can take some minutes depending on the hardware)

In [13]:
data =  temp_df['Temp'].values
# data split
n_test = 265
# winning parameters
cfg = [(3,1,1), (0,0,0,0), 'n', None]
test, preds_df, md, results = walk_forward_validation2(data=data, n_test=n_test, cfg=cfg)
print(results.summary())

                               SARIMAX Results                                
Dep. Variable:                      y   No. Observations:                 3649
Model:               SARIMAX(3, 1, 1)   Log Likelihood               -8376.942
Date:                Sat, 16 May 2020   AIC                          16763.884
Time:                        18:19:04   BIC                          16794.890
Sample:                             0   HQIC                         16774.927
                               - 3649                                         
Covariance Type:                  opg                                         
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
ar.L1          0.4964      0.019     26.212      0.000       0.459       0.534
ar.L2         -0.1304      0.018     -7.096      0.000      -0.166      -0.094
ar.L3       5.042e-05      0.018      0.003      0.9

In [14]:
# saving the model
results.save('AUSTemp_SARIMA.pkl')