# Coisas temporariamente descartadas de AR_Notebook

#### 0.* Testa modelo para estacionariedade e unit-root

In [23]:
def test_model(ts, window=30, lags=30):
    # Rolling statistics
    rolling_stats = roll_stats(ts=ts, window=window)
    plotscatter(rolling_stats, title='Rolling Statistics')

    # Stationarity test
    test_stationarity_kpss(ts['Value'])

    # Unit-root test
    test_unitroot_adf(ts['Value'])
    test_unitroot_phillips_perron(ts['Value'])

    # Plot ACF, PACF & QQ
    tsplot(ts['Value'], lags=lags)

## 0. Functions V - Being-validated

In [193]:
def outliers_modified_z_score(ys):
    '''Modified Z score for outliers'''
    threshold = 3.5

    median_y = np.median(ys)
    median_absolute_deviation_y = np.median([np.abs(y - median_y) for y in ys])
    modified_z_scores = [0.6745 * (y - median_y) / median_absolute_deviation_y
                         for y in ys]
    return np.where(np.abs(modified_z_scores) > threshold)

553

In [197]:
def outliers_iqr(ys):
    '''Outlier Interquartile Range'''
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    return np.where((ys > upper_bound) | (ys < lower_bound))

996

## 0. Functions VI - Unused

In [15]:
def subtract_mean(ts, columns=None):
    '''Subtract the mean of one or more time series.'''
    if columns is None: columns = list(ts.columns)
    
    for col in columns:
        mu = ts[col].mean()
        ts[col] = ts[col] - mu
        
    return ts, mu

In [16]:
def best_model_order(ts, p_rng, q_rng):
    '''Selects ARMA parameters that best 
        
       Input:
       - ts: a time series
       - p_rng: values to be tested as number of AR terms
       - q_rng: values to be tested as number of MA terms 
       
       Outputs
       - AIC: a matrix whose entries (i,j) are the Akaike Information Criterion values for a model ARMA(i,j)
    '''
    ts = ts.dropna(axis=0,how='any')
    best_aic = np.inf 
    best_order = None
    
    # Some models raise an exception of dividing by NaN or 0
    np.seterr(divide='print', invalid='print')

    AIC = np.full(shape=(max(p_rng)+1,max(q_rng)+1), fill_value=np.nan)
    for i in p_rng:
        for j in q_rng:
            print(i,j, end=' / ')
            if i is 0 and j is 0: continue
            try:
                tmp_mdl = sm.tsa.ARIMA(ts, order=(i,j)).fit(method='mle', trend='n', maxiter=300)
                tmp_aic = tmp_mdl.aic
                AIC[i,j] = tmp_aic
                if tmp_aic < best_aic:
                    best_aic = tmp_aic
                    best_order = (i, 0, j)
            except: continue
    print('AIC: {:.2f} | order: {}'.format(best_aic, best_order))                    
    return AIC

In [17]:
def generate_arima_garch_series(n=1000, p=0, d=0, q=0, m=0, s=0, phi=None,theta=None, alpha=None,beta=None, seed=None):
    '''Generate a time series with ARIMA and GARCH effects.'''
    # Define seed
    if seed is not None: np.random.seed(seed)
        
    # Coeficientes da parte AR
    if phi is None:
        char_roots = np.array([10])
        while any(char_roots > 1): # Condicao de estacionariedade (se nao for respeitada, diverge a serie)
            phi = np.random.rand(p+1)*2 - 1 # inclui phi_0 (constante)
            char_roots = np.abs(1 / np.roots(np.r_[np.flip(-phi[1:],axis=-1),[1]]))
    
    # Coeficientes da parte MA
    if theta is None:
        theta_roots = np.array([0])
        while any(theta_roots < 1):
            theta = np.r_[[1],(np.random.rand(q)*2 - 1)] # inclui theta_0 = 1
            tmp_theta = -np.flip(theta, axis=-1)
            tmp_theta[-1] = 1  # ajusta elemento 
            theta_roots = np.abs(np.roots(tmp_theta))
        
    # Ruido branco
    if m>0 or s>0:
        a, alpha, beta = generate_garch_process(n=n, m=m, s=s, alpha=alpha, beta=beta) # GARCH process residuals
    else:
        a = np.random.randn(n) # white noise residuals

    # Calcula partes referentes ao MA
    r_ma = np.convolve(a, theta, 'full') 
    if q>0: r_ma = r_ma[:-q]
    
    # Calcula parte referente ao AR e soma com a parte do MA
    r = np.ones(n)
    for i in np.arange(n):
        past_values = r[max(0,i-p):i+1]
        coeff = phi[0:min(i+1,p+1)]
        r[i] = np.convolve(past_values, coeff, 'valid') + r_ma[i] # adds shock
    
    # Realiza d integracoes ('I' do 'ARIMA')
    for i in range(d):
        r = r.cumsum()
    
    return_dict = {'r':r, 'phi':phi, 'theta':theta, 'a':a, 'alpha':alpha, 'beta':beta}
    
    return return_dict

In [18]:
def generate_garch_process(n=1000, m=1, s=1, alpha=None, beta=None, seed=None):
    '''Generate a GARCH process.'''
    # Define seed
    if seed is not None: np.random.seed(seed)
        
    # Coeficientes da parte AR *do GARCH*
    if alpha is None:
        alpha = np.random.rand(m+1) # constante (alpha_0) + coeficientes alpha_i de a_{t-i}, i=1,...,m
        
    # Coeficientes da parte MA *do GARCH*
    if beta is None:
        beta = np.random.rand(s)    # coeficientes beta_j de sigma2_{t-j}, j=1,...,s
        
    # Condicao de validade:
    if alpha[0] > 1: alpha[0] = np.random.rand()
    for i in range(max(m,s)):
        c1 = c2 = 0
        if i<m: c1 = alpha[i+1] # se ainda houver coeficiente alpha
        if i<s: c2 = beta[i]    # se ainda houver coeficiente beta
        if c1+c2 >= 1:
            delta = np.random.rand()*0.2
            c1 = c1/(c1+c2) - delta
            c2 = c2/(c1+c2) - delta
        if i<m: alpha[i+1] = c1 # se ainda houver coeficiente alpha
        if i<s: beta[i] = c2    # se ainda houver coeficiente beta
        
    # Ruido branco
    eps = np.random.randn(n)
        
    # Shocks 
    a = np.ones(n) # importante ser inicializado como 1
    sigma2 = np.ones(n)
    for t in np.arange(n):
        coef_shocks = alpha[0:min(t+1,m+1)]                   # [alpha_0, alpha_1, alpha_2, ...]
        past_shocks = np.flip(a[max(0,t-m):t+1], axis=-1)     # flip([..., a_{t-2}, a_{t-1}, 1])
        coef_sigmas = beta[0:min(t,s)]                        # [beta_1, beta_2, ...]
        past_sigmas = np.flip(sigma2[max(0,t-s):t], axis=-1)  # flip([..., sigma_{t-2}, sigma_{t-1}])
        
        # Sigma^2_t
        sigma2[t] = np.sqrt(np.inner(coef_shocks,past_shocks**2) + np.inner(coef_sigmas,past_sigmas**2))
        
        # a_t
        a[t] = sigma2[t] * eps[t] # multiplica pelo ruido
        
    return a, alpha, beta

In [67]:
def get_confusion_matrix(ts_true, ts_pred):
    df = pd.DataFrame(data={'Real': ts_true, 'Pred': ts_pred})
    df = df.dropna(how='any', axis=0)

    conf_dict = {'TP': np.sum(df['Pred'][df['Real'] ==  1] ==  1), 
                 'TN': np.sum(df['Pred'][df['Real'] == -1] == -1), 
                 'FP': np.sum(df['Pred'][df['Real'] == -1] ==  1), 
                 'FN': np.sum(df['Pred'][df['Real'] ==  1] == -1)
                }

    print('{:20s}{:20s}{:20s}'.format('','Real Positive', 'Real Negative'))
    print('{:20s}{:10d}{:20d}'.format('Predicted Positive',conf_dict['TP'], conf_dict['FP']))
    print('{:20s}{:10d}{:20d}'.format('Predicted Negative',conf_dict['FN'], conf_dict['TN']))
    return conf_dict

In [45]:
def quality_metrics(conf_dict):
    TP, TN, FP, FN = conf_dict['TP'], conf_dict['TN'], conf_dict['FP'], conf_dict['FN'] 
    metrics = {'Accuracy':    float((TP + TN)/(TP + TN + FP + FN)),
               'Precision':   float((TP)/(TP + FP)),
               'Recall':      float((TP)/(TP + FN)), # similar to sensitivity
               'Specificity': float((TN)/(TN + FP)),
               'Sensitivity': float((TP)/(TP + FN)) # similar to recall
              }
    return metrics