In [69]:
import pandas as pd
import hvplot.pandas
import holoviews as hv
from holoviews import opts
import panel as pn
import numpy as np
from datetime import timedelta  
from scipy.optimize import curve_fit
from bokeh.models.formatters import DatetimeTickFormatter
pn.extension()
pd.options.plotting.backend = 'holoviews'
# Ignore warnings
import warnings
warnings.filterwarnings('ignore')

# Graphing options
hv.renderer('bokeh').theme = 'light_minimal' # Graph style
def_opts = {'width': 500, 'height': 400, 'padding': 0.1, 'shared_axes': False, 
            'yformatter': '%d', 'xlabel': 'Days since Feb 24'}
opts.defaults(opts.Scatter(**def_opts), opts.Curve(**def_opts))

## Data import

For this part we take data from Protezione Civile, which are more reliable.
Source: https://github.com/pcm-dpc/COVID-19/tree/master/dati-andamento-nazionale

Ideas for analysis:

[1] https://www.researchgate.net/publication/339240777_Estimation_of_the_final_size_of_coronavirus_epidemic_by_the_logistic_model

[2] https://www.researchgate.net/publication/339311383_Estimation_of_the_final_size_of_the_coronavirus_epidemic_by_the_SIR_model

In [53]:
dati_italia = 'https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale.csv'
columns = ['totale_casi', 'deceduti', 'dimessi_guariti']
Italia = pd.read_csv(dati_italia, usecols = columns)
Italia.columns = ['guar', 'dec', 'tot']
Italia = Italia[['tot', 'guar', 'dec']]

opts = {'kind': 'scatter'}
Italia.tot.plot(label = 'Total cases', ylabel= 'Number of cases', **opts)+ \
Italia.tot.diff().plot(label = 'New cases', **opts)

### Logistic model

In [10]:
def sigmoid(t, K, A, r):
    return K/(1+A*np.exp(-r*(t)))

extra_days = 30    # How much we extend the fit beyond the current date for prediction
m = 10
n_values = [2, 1, 0]
K_values = []
for n in n_values:
    # Last 3 number of cases, starting from n before the last dates
    N = Italia.index.size
    C2m, Cm, C = Italia.tot[N-n-2*m], Italia.tot[N-n-m], Italia.tot[N-n-1]
    K_guess = Cm*(C2m*Cm-2*C2m*C+Cm*C)/(Cm**2-C*C2m)
    r_guess = 1/m*np.log(C*(Cm-C2m)/(C2m*(C-Cm)))
    A_guess = (C-Cm)*(Cm-C2m)/(Cm**2-C*C2m)*(C/C2m*(Cm-C2m)/(C-Cm))**((Italia.index.size-m)/m)
    init_guesses = [K_guess, A_guess, r_guess]   
    
    # The index is in datetime format, with which I can't fit
    # -> I consider days from beginning (0,1,2,3,...) as index
    popt, pcov = curve_fit(sigmoid, Italia.index.values[0:N-n],
                           Italia.tot[0:N-n].values, 
                           p0 = init_guesses)
    K_values.append(popt[0])

fit = pd.Series(index = [i for i in range(0, N-1+extra_days)], 
                data = sigmoid(np.arange(0, Italia.index.size+extra_days-1, 1), *popt))

Italia.tot.plot(kind = 'scatter', width = 600)*fit.plot(color = 'red', label = 'Fit', width = 600)

Calculate the expected K with Shank's transformation

In [11]:
K_min1, K_0, K_plus1 = K_values[0], K_values[1], K_values[2]
K_pred = (K_plus1*K_min1 - K_0**2)/(K_plus1-2*K_0+K_min1)
K_pred

119064.54154491905

Now fit again with the newly predicted K value

In [116]:
def sigmoid(t, K, A, r):                # Function used for fitting
    return K/(1+A*np.exp(-r*(t)))

extra_days = 30            # How much we extend the fit beyond the current date for prediction
N = Italia.index.size      
J = 7                      # N-J will be the number of K values measured with Shanks Transformation: i.e. I will fit the curve from index 0 to N-J, then to N-J+1, N-J+2, etc.
m = 10                     # From Equations (11), (12), (13) of [1]. I find the best initial guesses based on the results at k-2m, k-m and k
K_values = []
fits = pd.DataFrame(index = [i for i in range(N+1+extra_days)])
layout = pn.Column()       # In each cycle I append to this layout the plot fit

# In each cycle I perform the fit of the curve stopping at index n, with N-J<n<N-1
for n in range(N-J, N):
    # Find best initial guesses, based on (11), (12), (13) of [1]
    C2m, Cm, C = Italia.tot[n-2*m], Italia.tot[n-m], Italia.tot[n]
    K_guess = Cm*(C2m*Cm-2*C2m*C+Cm*C)/(Cm**2-C*C2m)
    r_guess = 1/m*np.log(C*(Cm-C2m)/(C2m*(C-Cm)))
    A_guess = (C-Cm)*(Cm-C2m)/(Cm**2-C*C2m)*(C/C2m*(Cm-C2m)/(C-Cm))**((n-2-m)/m)  # t_k = n-2. E.g. when n=N-1
    init_guesses = [K_guess, A_guess, r_guess]   
    # Fit
    popt, pcov = curve_fit(sigmoid, Italia.index.values[0:n+1],    # Have to put n+1 because in this cycle N-J<=n<=N, then when I use [] in a df, the last element is not considered
                           Italia.tot[0:n+1].values, 
                           p0 = init_guesses)
    K_values.append(popt[0])
    # Create fit curves that extend by 'extra_days' beyond the current date
    fit = pd.Series(index = [i for i in range(N+extra_days)], 
                             data = sigmoid(np.arange(0, N+extra_days), *popt))
    fit.name = 'n = '+str(n)
    fits = fits.join(fit)        # I put all of them in a single df

plt_pred = Italia.tot.plot(kind = 'scatter')*fit.plot(color = 'red', label = 'Fit')
plt_pred.opts(padding = 0.1, width = 600, height = 400, yformatter = '%d', 
              xlabel = ' ')

data_list = [Italia.tot.iloc[:i+1].plot(kind = 'scatter') for i in range(N-J, N+1)]
fit_list = [fits.iloc[:, i].plot(c = 'r', label = 'Fit') for i in range(J)]

fits.plot(color = hv.Cycle('PuRd'), width = 700)
hv.Layout(data_list).cols(2)



In [66]:
K = pd.DataFrame(data =K_values, columns=['K'], index = [n for n in range(N-J, N)])
K.plot.bar(label = 'Estimated final size of epidemic', ylabel = '', 
           xlabel = 'Days since Feb 24', yformatter = '%d', height = 400)

In [67]:
def ShankTransform (S): # S is a pd.Series
    S_shank = S.dropna().copy()
    S_shank.drop([S_shank.index[0], S_shank.index[-1]], inplace = True)  # Delete first and last rows, which will contain NaNs
    for n in S_shank.index.values:
        S_shank[n] = (S[n+1]*S[n-1]-S[n]**2) / (S[n+1]-2*S[n]+S[n-1])
    S_shank.name = 'R('+S.name+')'
    return S_shank

K_shank = pd.DataFrame(data = K_values, index = [n for n in range(N-J, N)], columns = ['K'])
for i in range(len(K_values)//2):
    K_shank = K_shank.join(ShankTransform(K_shank.iloc[:,i]))
K_shank

Unnamed: 0,K,R(K),R(R(K)),R(R(R(K)))
22,64710.259571,,,
23,67488.05639,63702.798305,,
24,77924.862474,48977.19714,73628.726044,
25,94246.156133,12405.443616,40040.070078,63641.00893
26,114633.185284,125489.518697,119409.940744,
27,121717.198258,119064.52285,,
28,117476.585412,,,
