In [1]:
from datetime import datetime, timedelta

# grafico
import chart_studio
import chart_studio.plotly as py
import plotly.graph_objects as go

# manipulacion de dats
import numpy as np
import pandas as pd

# modelos
from scipy.optimize import curve_fit

# herramientas de ML
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score

In [2]:
df = pd.read_csv('c:\\Users\\juank\modelo_predictivo_COVID19\\Loja\\Dataset\\Enero-Marzo_2022_Serie temporal.csv')

In [3]:
casos = df.iloc[:,[0,-1]].groupby('provincia').sum()
fechaMasReciente = casos.columns[0]

casos = casos.sort_values(by = fechaMasReciente, ascending = False)
casos = casos[(casos[fechaMasReciente] >= 10)]
casos.reset_index(inplace=True)
casos = casos[casos['provincia'].isin(['Loja'])]
casos.set_index('provincia', inplace=True)
casos

Unnamed: 0_level_0,28/03/2022
provincia,Unnamed: 1_level_1
Loja,33266


In [6]:
def logistic(t, a, b, c, d):
    return c + (d - c)/(1 + a * np.exp(- b * t))

def exponential(t, a, b, c):
    return a * np.exp(b * t) + c

def linear_prediction(x_train, y_train, x_pred_len, x_range):
    x_samp, y_samp = np.reshape(x_train, (-1, 1)), np.reshape(y_train, (-1, 1))        # 1
    model = LinearRegression().fit(x_samp, y_samp)
    test_x = np.arange(x_pred_len)
    y_fit = model.predict(test_x.reshape (-1, 1))
    linear_plot = go.Scatter(
        x=x_range,
        y=np.round_(y_fit.reshape(y_fit.size)),
        mode='lines',
        name='Linear'
    )
    return y_fit, linear_plot

def logistic_prediction(x_train, y_train, x_pred_len, x_range):
    test_x = np.arange(x_pred_len)
    lpopt, lpcov = curve_fit(logistic, x_train, y_train, maxfev=10000)
    lerror = np.sqrt(np.diag(lpcov))

    # for logistic curve at half maximum, slope = growth rate/2. so doubling time = ln(2) / (growth rate/2)
    ldoubletime = np.log(2)/(lpopt[1]/2)
    # standard error
    ldoubletimeerror = 1.96 * ldoubletime * np.abs(lerror[1]/lpopt[1])

    # calculate R^2
    residuals = y_train - logistic(x_train, *lpopt)
    ss_res = np.sum(residuals**2)
    ss_tot = np.sum((y_train - np.mean(y_train))**2)
    logisticr2 = 1 - (ss_res / ss_tot)  
    
    y_fit = logistic(test_x, *lpopt)

    if logisticr2 > 0.95:
        logistic_plot = go.Scatter(
            x=x_range,
            y=np.round_(y_fit),
            mode='lines',
            name='Logistica'
        )
        print('\n** Con ajuste lineal**\n')
        print('\tR^2:', logisticr2)
        print('\tTiempo para duplicarse (ritmo actual): ', round(ldoubletime,2), '(±', round(ldoubletimeerror,2),') días')
        logisticworked = True
        return y_fit, logistic_plot
    return y_fit, None

def exponential_prediction(x_train, y_train, x_pred_len, x_range):
    epopt, epcov = curve_fit(exponential, x_train, y_train, bounds=([0,0,-100],[100,0.9,100]), maxfev=10000)
    eerror = np.sqrt(np.diag(epcov))

    # for exponential curve, slope = growth rate. so doubling time = ln(2) / growth rate
    edoubletime = np.log(2)/epopt[1]
    # standard error
    edoubletimeerror = 1.96 * edoubletime * np.abs(eerror[1]/epopt[1])

    # calculate R^2
    residuals = y_train - exponential(x_train, *epopt)
    ss_res = np.sum(residuals**2)
    ss_tot = np.sum((y_train - np.mean(y_train))**2)
    expr2 = 1 - (ss_res / ss_tot)
    
    test_x = np.arange(x_pred_len)
    y_fit = exponential(test_x, *epopt)

    if expr2 > 0.95:
        exponential_plot = go.Scatter(
            x=x_range,
            y=np.round_(y_fit),
            mode='lines',
            name='Exponencial'
        )
        print('\n** Con ajuste exponencial **\n')
        print('\tR^2:', expr2)
        print('\tTiempo para duplicarse (ritmo actual): ', round(edoubletime,2), '(±', round(edoubletimeerror,2),') días')
        exponentialworked = True
        return y_fit, exponential_plot
    return y_fit, None

def plotCases(dataframe, column, provincia):
    co = dataframe[dataframe[column] == provincia].iloc[:,4:].T.sum(axis = 1)
    co = pd.DataFrame(co)
    co.columns = ['Cases']
    co = co.loc[co['Cases'] > 0]
    
    # Define base training data
    y = np.array(co['Cases'])
    x = np.arange(y.size)
    # Define test size
    x_test_len = round(x.size * 3)
    test_x = np.arange(x_test_len)
    
    start_date = datetime.strptime(co.index[0], '%d/%m/%y').date()
    end_date = start_date + timedelta(days=x_test_len)
    x_range = pd.Series(np.arange(np.datetime64(str(start_date)), np.datetime64(str(end_date))))
    
    recentdbltime = float('NaN')
    
    # get linear model prediction and plot
    linear_y, linear_plot = linear_prediction(x, y, x_test_len, x_range)
    
#     difference = np.diff(a) / a[:-1] * 100.
#     print(f'Difference: {difference}')
#     print(f'Rate: {difference.mean()}')
    
    if len(y) >= 7:
        
        current = y[-1]
        lastweek = y[-8]
        
        if current > lastweek:
            print('\n** Basado en los datos de la última semana **\n')
            print('\tCasos confirmados en',co.index[-1],'\t',current)
            print('\tCasos confirmados en',co.index[-8],'\t',lastweek)
            ratio = current/lastweek
            print('\tProporción:',round(ratio,2))
            print('\tIncremento semanal:',round( 100 * (ratio - 1), 1),'%')
            dailypercentchange = round( 100 * (pow(ratio, 1/7) - 1), 1)
            print('\tIncremento diario:', dailypercentchange, '% por día')
            recentdbltime = round( 7 * np.log(2) / np.log(ratio), 1)
            print('\tTiempo que tarda en duplicarse (al ritmo actual):',recentdbltime,'días')

    original_data = go.Scatter(
        x=pd.date_range(start=start_date, end=datetime.strptime(co.index[-1], '%d/%m/%y').date()),
        y=y,
        mode='markers',
        name='Casos confirmados'
    )
    plot_data = []
    plot_data.append(original_data)
    plot_data.append(linear_plot)
    
    logisticworked = False
    exponentialworked = False
    
    test_x = np.arange(round(x.size * 2.1))
    
    
    logistic_y, logistic_plot = logistic_prediction(x, y, x_test_len, x_range)
    if logistic_plot:
        plot_data.append(logistic_plot)
    
    
    exponential_y, exponential_plot = exponential_prediction(x, y, x_test_len, x_range)
    if exponential_plot:
        plot_data.append(exponential_plot)
    
#     plt.title(country + ' Casos acumulados de COVID-19. (Actualizado a '+mostrecentdate+')', fontsize="x-large")
#     plt.xlabel('Días', fontsize="x-large")
#     plt.ylabel('Casos totales', fontsize="x-large")
#     plt.legend(fontsize="x-large")
#     plt.show()
    
    


    layout = dict(
        title = f'{country}',
        xaxis_type='date'
    )
    fig = go.Figure(data=plot_data, layout=layout)
    fig.show()
    # fig.write_html("result.html")
    # create online chart
    # py.plot(plot_data, layout=layout, auto_open=False)

    if logisticworked and exponentialworked:
        if round(logisticr2,2) > round(expr2,2):
            return [ldoubletime, ldoubletimeerror, recentdbltime]
        else:
            return [edoubletime, edoubletimeerror, recentdbltime]
            
    if logisticworked:
        return [ldoubletime, ldoubletimeerror, recentdbltime]
    
    if exponentialworked:
        return [edoubletime, edoubletimeerror, recentdbltime]
    
    else:
        return [float('NaN'), float('NaN'), recentdbltime]

In [5]:
topcountries = casos.index
inferreddoublingtime = []
recentdoublingtime = []
errors = []
countries = []
print('\n')

for c in topcountries:
    print(c)
    a = plotCases(df, 'provincia', c)
    if a:
        countries.append(c)
        inferreddoublingtime.append(a[0])
        errors.append(a[1])
        recentdoublingtime.append(a[2])
    print('\n')



Loja


ValueError: unconverted data remains: 22