In [None]:
 from google.colab import drive
 drive.mount('/content/drive')

In [None]:
from pandas import read_csv
from pandas import read_excel
from matplotlib import pyplot

from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error

In [None]:
# load the dataset
ita_complete = ita_complete = read_csv("/content/drive/My Drive/Colab Notebooks/dpc-covid19-ita-andamento-nazionale.csv")
data=ita_complete[["date","totale_positivi","deceduti"]]
data.rename(columns={'totale_positivi':'total_pos', 'deceduti':'deaths'}, inplace=True)
series=data.total_pos

In [None]:
# grid search sarima hyperparameters

# one-step sarima forecast
def sarima_forecast(history, config):
  order, sorder, trend = config
  # define model
  model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, 
                  enforce_stationarity=False, enforce_invertibility=False)
  # fit model
  model_fit = model.fit(disp=False)
  # make one step forecast
  yhat = model_fit.predict(len(history), len(history))
  return yhat[0]

# root mean squared error or rmse
def measure_rmse(actual, predicted):
  return sqrt(mean_squared_error(actual, predicted))

# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
  return data[:-n_test], data[-n_test:]

# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
  predictions = list()
  # split dataset
  train, test = train_test_split(data, n_test)
  # seed history with training dataset
  history = [x for x in train]
  # step over each time-step in the test set
  for i in range(len(test)):
    # fit model and make forecast for history
    yhat = sarima_forecast(history, cfg)
    # store forecast in list of predictions
    predictions.append(yhat)
    # add actual observation to history for the next loop
    history.append(test[i])
  # estimate prediction error
  error = measure_rmse(test, predictions)
  return error

# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
  result = None
  # convert config to a key
  key = str(cfg)
  # show all warnings and fail on exception if debugging
  if debug:
    result = walk_forward_validation(data, n_test, cfg)
  else:
    # one failure during model validation suggests an unstable config
    try:
      # never show warnings when grid searching, too noisy
      with catch_warnings():
        filterwarnings("ignore")
        result = walk_forward_validation(data, n_test, cfg)
    except:
      error = None
  # check for an interesting result
  if result is not None:
    print(' > Model[%s] %.3f' % (key, result))
  return (key, result)

# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
  scores = None
  if parallel:
    # execute configs in parallel
    executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
    tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
    scores = executor(tasks)
  else:
    scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
  # remove empty results
  scores = [r for r in scores if r[1] != None]
  # sort configs by error, asc
  scores.sort(key=lambda tup: tup[1])
  return scores

# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
  models = list()
  # define config lists
  p_params = [0, 1, 2]
  d_params = [0, 1, 2]
  q_params = [0, 1, 2]
  t_params = ['n','c','t','ct']
  P_params = [0, 1, 2]
  D_params = [0, 1, 2]
  Q_params = [0, 1, 2]
  m_params = seasonal
  # create config instances
  for p in p_params:
    for d in d_params:
      for q in q_params:
        for t in t_params:
          for P in P_params:
            for D in D_params:
              for Q in Q_params:
                for m in m_params:
                  cfg = [(p,d,q), (P,D,Q,m), t]
                  models.append(cfg)
  return models

if __name__ == '__main__':
  # define dataset
  series=data.total_pos.values
  # data split
  n_test = 44
  # model configs
  cfg_list = sarima_configs(seasonal=[0,7])
  # grid search
  scores = grid_search(series, cfg_list, n_test)
  print('done')
  # list top 10 configs
  for cfg, error in scores[:10]:
    print(cfg, error)

 > Model[[(0, 0, 0), (0, 0, 0, 7), 'n']] 32882.252
 > Model[[(0, 0, 0), (0, 0, 0, 0), 'n']] 32882.252
 > Model[[(0, 0, 0), (0, 0, 1, 0), 'n']] 16437.056
 > Model[[(0, 0, 0), (0, 0, 1, 7), 'n']] 14010.072
 > Model[[(0, 0, 0), (0, 0, 2, 0), 'n']] 9388.191
 > Model[[(0, 0, 0), (0, 1, 0, 7), 'n']] 8636.926
 > Model[[(0, 0, 0), (0, 1, 1, 7), 'n']] 5844.766
 > Model[[(0, 0, 0), (0, 0, 2, 7), 'n']] 34777.666
 > Model[[(0, 0, 0), (0, 2, 0, 7), 'n']] 3133.956
 > Model[[(0, 0, 0), (0, 2, 1, 7), 'n']] 4132.431
 > Model[[(0, 0, 0), (0, 1, 2, 7), 'n']] 6063.449
 > Model[[(0, 0, 0), (1, 0, 0, 0), 'n']] 1270.318
 > Model[[(0, 0, 0), (1, 0, 0, 7), 'n']] 8493.572
 > Model[[(0, 0, 0), (1, 0, 1, 0), 'n']] 883.630
 > Model[[(0, 0, 0), (1, 0, 1, 7), 'n']] 5859.474
 > Model[[(0, 0, 0), (0, 2, 2, 7), 'n']] 3699.636
 > Model[[(0, 0, 0), (1, 0, 2, 0), 'n']] 801.863
 > Model[[(0, 0, 0), (1, 1, 0, 7), 'n']] 2604.372
 > Model[[(0, 0, 0), (1, 0, 2, 7), 'n']] 5949.630
 > Model[[(0, 0, 0), (1, 1, 1, 7), 'n']] 3951.4

In [None]:
# analisi ripetuta su dati owid (test su ultimi 80)

In [None]:
ita_owid=read_excel("/content/drive/My Drive/Colab Notebooks/ita_owid.xlsx", index_col=0)

In [None]:
if __name__ == '__main__':
  # define dataset
  series=ita_owid.cases.values
  # data split
  n_test = 80
  # model configs
  cfg_list = sarima_configs()
  # grid search
  scores = grid_search(series, cfg_list, n_test)
  print('done')
  # list top 3 configs
  for cfg, error in scores[:3]:
    print(cfg, error)

 > Model[[(0, 0, 0), (0, 0, 0, 0), 'n']] 379.613
 > Model[[(0, 0, 0), (0, 0, 1, 0), 'n']] 302.432
 > Model[[(0, 0, 0), (1, 0, 0, 0), 'n']] 139.513
 > Model[[(0, 0, 0), (1, 0, 1, 0), 'n']] 135.516
 > Model[[(0, 0, 0), (1, 0, 2, 0), 'n']] 135.579
 > Model[[(0, 0, 0), (2, 0, 0, 0), 'n']] 135.412
 > Model[[(0, 0, 0), (0, 0, 2, 0), 'n']] 214.720
 > Model[[(0, 0, 0), (2, 0, 1, 0), 'n']] 135.309
 > Model[[(0, 0, 0), (0, 0, 0, 0), 'c']] 974.194
 > Model[[(0, 0, 0), (0, 0, 1, 0), 'c']] 559.859
 > Model[[(0, 0, 0), (2, 0, 2, 0), 'n']] 136.842
 > Model[[(0, 0, 0), (0, 0, 2, 0), 'c']] 381.106
 > Model[[(0, 0, 0), (1, 0, 0, 0), 'c']] 139.273
 > Model[[(0, 0, 0), (1, 0, 1, 0), 'c']] 137.665
 > Model[[(0, 0, 0), (1, 0, 2, 0), 'c']] 269.101
 > Model[[(0, 0, 0), (2, 0, 0, 0), 'c']] 135.092
 > Model[[(0, 0, 0), (2, 0, 1, 0), 'c']] 483.395
 > Model[[(0, 0, 0), (0, 0, 0, 0), 't']] 1743.274
 > Model[[(0, 0, 0), (0, 0, 1, 0), 't']] 952.452
 > Model[[(0, 0, 0), (2, 0, 2, 0), 'c']] 195.242
 > Model[[(0, 0, 0)

In [None]:
  for cfg, error in scores[:10]:
    print(cfg, error)

[(1, 1, 1), (2, 0, 2, 0), 'n'] 127.87694093765667
[(1, 1, 1), (2, 0, 2, 0), 'c'] 128.2573038397775
[(2, 1, 2), (1, 0, 0, 0), 'n'] 128.50307613839433
[(0, 1, 1), (2, 0, 2, 0), 'n'] 128.64753323327082
[(1, 1, 0), (2, 0, 2, 0), 'c'] 128.65016756914105
[(2, 1, 2), (1, 0, 0, 0), 'c'] 128.71031016581213
[(2, 1, 2), (1, 0, 1, 0), 'c'] 129.07193258750402
[(1, 1, 0), (2, 0, 2, 0), 'n'] 129.10963222354457
[(0, 1, 1), (2, 0, 2, 0), 'c'] 129.45310337112764
[(2, 1, 2), (0, 0, 1, 0), 'c'] 130.19731014552855
