In [1]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import mean_squared_log_error, mean_squared_error
from tqdm.notebook import tqdm
from scipy.special import expit
from matplotlib import pyplot as plt
import scipy.optimize as opt
import plotly.express as px
import plotly
from plotly.subplots import make_subplots
from sklearn.metrics import r2_score
import plotly.graph_objects as go


regional_url = "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv"
national_url = "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale.csv"


Il notebook si interfaccia con il portale github del Consiglio dei Ministri ed attinge agli ultimi dati disponibili.

In [3]:
if 'national_get' not in locals():
    national_get = pd.read_csv(national_url, sep = ",")
    
if 'regional_get' not in locals():
    regional_get = pd.read_csv(regional_url, sep =",")

# Stima Modello - Metodologia

Per la stima del modello è stata utilizzata una logistica, i cui parametri sono stati ottimizzati tramite least squared. L'ottimizzazione viene ripetuta su 100 set di parametri iniziali estratti casualmente, in modo tale da ottenere convergenza.

Il Modello usato è:

![image](https://wikimedia.org/api/rest_v1/media/math/render/svg/6f42e36c949c94189976ae00853af9a1b618e099)

I parametri stimati sono:

- L : Valore di Picco (Massimo valore che si raggiungerà)
- X0 : Punto di Flesso (giorno in cui si avrà un'inversione nella crescita)
- k : Growth Rate (Tasso di crescita)

Stimando i tre parametri giornalmente è possibile rappresentare queste informazioni come una serie temporale, con lo scopo di vedere come si modificano con il passare del tempo.

Verranno quindi rappresentate le serie di questi tre parametri e le variazioni percentuali giornaliere (assoluta nel caso di X0).

Nei grafici a seguire saranno rappresentate le evoluzioni temporali delle tre misure (nel caso del punto di flesso, viene rappresentato il numero di giorni che mancano per raggiungerlo).

In [26]:
def find_doubling_time(sequence_, function, *param):
    T_ = max(sequence_)
    new_T = list(range(T_, T_ + 1000))
    predicted_ = function(T_, *param)
    simulation_ = [function(x, *param) for x in new_T]
    double_T = new_T[np.argmax(simulation_//predicted_ == 2)]
    return(double_T - T_)

def sigmoid(t, *param):
    M, beta, alpha, const = param[0], param[1], param[2], param[3]
    return M / (1 + np.exp(-beta * (t-alpha))) + const

def exponential(t, *param):
    M, beta = param[0], param[1]
    return(M * np.exp(beta * t))

def predict(function, y, *param):
    sequence_ = list(range(sum(y == 0), len(y)))
    predicted_ = function(sequence_, *param)
    return(predicted_)

def inital_point_(function, y, number_iteration = 100, verbose = False, range_M = (1, 100000),
                  range_beta = (0, 5), range_alpha = (0, 1000), range_const = (0, 100)):
    
    if range_alpha is None:
        bounds = ([range_M[0], range_beta[0]], [range_M[1], range_beta[1]])
    else:
        bounds = ([range_M[0], range_beta[0], range_alpha[0], range_const[0]],
                  [range_M[1], range_beta[1], range_alpha[1], range_const[1]])

    score = +np.inf
    
    for i in range(number_iteration):
        M = np.random.uniform(low = range_M[0], high = range_M[1])
        beta = np.random.uniform(low = range_beta[0], high = range_beta[1])
        const = np.random.uniform(low = range_const[0], high = range_const[1])
        
        if range_alpha is not None:
            alpha = np.random.uniform(low = range_alpha[0], high = range_alpha[1])
            p0 = [M, beta, alpha, const]
        else:
            p0 = [M, beta, const]
            
        sequence_ = list(range(sum(y == 0), len(y)))
        
        popt, _ = opt.curve_fit(function, sequence_, 
                           y, p0 = p0, bounds = bounds, maxfev = 10000)
        
        fun = mean_squared_log_error(y, [function(x, *popt) for x in sequence_])
        
        if fun< score:
            score = fun
            model = popt
            time = find_doubling_time(sequence_, function, *popt)
            if verbose:
                print(fun)
    return(model, score, time)

def GET_stats(data, date_list, function, begin_time = 20, **kwargs):
    DATA_ = []
    MAX_num = []
    DAY_to_half = []
    GROWTH_param = []
    SCORE_ = []
    CONST_ = []
    R_SQUARED = []
    
    LEN_ = len(data)+1
    
    for i, step in tqdm(enumerate(range(begin_time, LEN_))):
        temp = data[:step]

        with np.errstate(over = 'ignore', invalid = 'ignore'):
            model, score, double_time = inital_point_(**kwargs, function = function, y = temp)
            if len(model) > 3:
                M, beta, alpha,const = model
                missing_time = alpha - len(temp) #alpha è il punto di flesso, togliendo il numero di giorni presenti si ottiene i giorni mancanti al flesso
                DAY_to_half += [missing_time]
            else:
                M, beta, const = model
                
            DATA_ += [date_list.loc[step - 1]]
            MAX_num += [M]
            GROWTH_param += [beta]
            SCORE_ += [score]
            CONST_ += [const]
            R_SQUARED += [r2_score(temp, predict(sigmoid, temp, *model))]
            
    if len(model) > 3:
        time_varying_info = pd.DataFrame.from_dict({'Data': DATA_, 'Picco_casi': MAX_num, 'Day_until_half': DAY_to_half,
                                         'Growth': GROWTH_param, 'score': SCORE_, 'const': CONST_, 'R_2': R_SQUARED})
    else:
        time_varying_info = pd.DataFrame.from_dict({'Data': DATA_, 'Picco_casi': MAX_num,
                                         'Growth': GROWTH_param, 'score': SCORE_, 'const': CONST_, 'R_2': R_SQUARED})
        
    return(time_varying_info)

def run_fit(data, label, function = sigmoid):
    data = data[label].values
    kwargs = {'range_M' : (1, max(data) * 10)}

    date_list = pd.to_datetime(national_get.data).dt.date.astype(str)

    df = GET_stats(data, date_list, sigmoid, **kwargs)
    return(df)

def get_change(df):
    temp = df[['Picco_casi', 'Growth']].pct_change()*100
    temp['Day_until_half'] = df['Day_until_half'].diff(1)
    df = pd.concat([df['Data'], temp], axis = 1).dropna()
    return(df)

def print_plot(df, title):
    
    #GRAFICI DATI PUNTUALI
    fig1 = go.Scatter(x = df['Data'], y = df['Day_until_half'], mode='lines',
                      name ='Numero di Giorni Mancanti a Metà del Picco')

    fig2 = go.Scatter(x = df['Data'], y = df['Growth'], mode='lines', name = 'Growth Rate')

    fig3 = go.Scatter(x = df['Data'], y = df['Picco_casi'], mode='lines', name = 'Picco dei Casi')

    
    #GRAFICI CAMBIAMENTI PERCENTUALI
    pct_df = get_change(df)
    
    fig4 = go.Scatter(x = pct_df['Data'], y = pct_df['Day_until_half'], mode='lines',
                  name ='Numero di Giorni Mancanti a Metà del Picco')

    fig5 = go.Scatter(x = pct_df['Data'], y = pct_df['Growth'], mode='lines', name = 'Growth Rate')

    fig6 = go.Scatter(x = pct_df['Data'], y = pct_df['Picco_casi'], mode='lines', name = 'Picco dei Casi')

    fig = make_subplots(rows=2, cols=3, subplot_titles=("Giorni Mancanti al punto di Flesso",
                                                        "Growth Rate", "Picco dei Casi",
                                                        "Cambiamento Assoluto",
                                                        "Cambiamento % Giornaliero", "Cambiamento % Giornaliero"))

    fig.add_trace(fig1, row=1, col=1)
    fig.add_trace(fig2, row=1, col=2)
    fig.add_trace(fig3, row=1, col=3)

    fig.add_trace(fig4, row=2, col=1)
    fig.add_trace(fig5, row=2, col=2)
    fig.add_trace(fig6, row=2, col=3)
    
    fig.update_layout(title_text = title, showlegend = False)

    fig.show()


# Analisi Dati Nazionali

## Totale Casi

In [27]:
# df = run_fit(national_get, 'totale_casi')
print_plot(df, 'Grafici: Totale dei Casi')

## Deceduti

In [28]:
df = run_fit(national_get, 'deceduti')
print_plot(df, title = 'Grafici: Deceduti')


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




## Ricoverati con Sintomi

In [29]:
df = run_fit(national_get, 'ricoverati_con_sintomi')
print_plot(df, 'Grafici: Ricoverati con Sintomi')


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




# Analisi Dati Regionali

In [30]:
# SELEZIONARE LA REGIONE
region = 'Lombardia'
region_temp = regional_get[regional_get['denominazione_regione'] == region]

## Totale Casi

In [31]:
df = run_fit(region_temp, 'totale_casi')
print_plot(df, 'Grafici: Totale dei Casi')


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




## Deceduti

In [32]:
df = run_fit(region_temp, 'deceduti')
print_plot(df,'Grafici: Deceduti')


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




## Ricoverati Con Sintomi

In [33]:
df = run_fit(region_temp, 'ricoverati_con_sintomi')
print_plot(df, 'Grafici: Ricoverati con Sintomi')


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


