In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import joblib
from scipy import stats
import seaborn as sns
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import warnings
import os

In [4]:
# ------------------ FUNÇÕES GERAIS---------------------------------------

# ---------------função para agrupar dados por RGI--------------------------
def filtraMun(df):
    dadosAGP = df.groupby(['co_ibge','nome_rgi','UF','ano','epiweek'])['atend_arbov'].sum()
    dataAGP = pd.DataFrame(dadosAGP)
    dataAGP.reset_index(inplace=True)
    return dataAGP

# ---------------funções para calcular estatísticas EARS C1, C2 e C3 com limites --------------------------

def calculate_c1(observations, baseline, alpha):
    c1_stats = np.full(len(observations), np.nan)  # Inicializa com NaN
    c1_alarms = np.full(len(observations), 'Não')
    c1_upper_bounds = np.full(len(observations), np.nan)  # Inicializa com NaN

    # Cálculo do quantil da distribuição normal padrão para um teste de uma cauda
    z_alpha = stats.norm.ppf(1 - alpha)

    for t in range(baseline, len(observations)):
        mean_baseline = np.mean(observations[t-baseline:t]) # Calcula a Média Móvel
        # Ajuste para usar o denominador específico da descrição
        std_baseline = np.sqrt(np.sum((observations[t-baseline:t] - mean_baseline)**2) / (baseline - 1) )  # Calcula o Desvio Padrão

        # Prevenindo divisão por zero
        if std_baseline > 0:
            c1_stats[t] = (observations[t] - mean_baseline) / std_baseline # Calcula o C1
            c1_upper_bounds[t] = mean_baseline + z_alpha * std_baseline # Calcula o Limite crítico
            c1_upper_bounds[t] = np.rint(c1_upper_bounds[t]).astype(int)

            # Condição para acionar o alarme
            if ((c1_stats[t] >= z_alpha) and (observations[t] > c1_upper_bounds[t])) :
                c1_alarms[t] = 'Sim'
        else:
            c1_stats[t] = 0

    return c1_stats, c1_alarms, c1_upper_bounds


def calculate_c2(observations, baseline, alpha):
    c2_stats = np.full(len(observations), np.nan)
    c2_alarms = np.full(len(observations), 'Não')
    c2_upper_bounds = np.full(len(observations), np.nan)

    z_alpha = stats.norm.ppf(1 - alpha)

    # Ajuste: O loop deve começar em 'baseline + 2'
    for t in range(baseline + 2, len(observations)):
        baseline_data = observations[t - baseline - 2:t - 2]
        mean_adjusted_baseline = np.mean(baseline_data)

        # Denominador atualizado para 'baseline - 1'
        std_adjusted_baseline = np.sqrt(np.sum((baseline_data - mean_adjusted_baseline) ** 2) / (baseline - 1))

        if std_adjusted_baseline > 0:
            c2_stats[t] = (observations[t] - mean_adjusted_baseline) / std_adjusted_baseline
            c2_upper_bounds[t] = mean_adjusted_baseline + z_alpha * std_adjusted_baseline
            c2_upper_bounds[t] = np.rint(c2_upper_bounds[t]).astype(int)

            if ((c2_stats[t] >= z_alpha) and (observations[t] > c2_upper_bounds[t])):
                c2_alarms[t] = 'Sim'
        else:
            c2_stats[t] = 0

    return c2_stats, c2_alarms, c2_upper_bounds


def calculate_c3(observations, baseline, alpha):

    c3_stats = np.full(len(observations), np.nan)
    c3_alarms = np.full(len(observations), 'Não')
    c3_upper_bounds = np.full(len(observations), np.nan)


    # Calcula o quantil da distribuição normal padrão para um teste de duas caudas
    z_alpha = stats.norm.ppf(1 - alpha)

    # Precisamos primeiro calcular C2 para usar no cálculo de C3
    c2_stats, _, _ = calculate_c2(observations, baseline=baseline, alpha=alpha)  # Assume que calculate_c2 retorna c2_stats, c2_alarms, c2_upper_bounds

    for t in range(baseline + 4, len(observations)):
        # C3 é baseado nos valores de C2 do ponto atual (t) e dos dois pontos anteriores

        c3_stats[t] = sum(max(0, c2_stats[i] - 1) for i in range(t-2, t))


        baseline_data = observations[t-baseline:t-4]
        mean_adjusted_baseline = np.mean(baseline_data)

        std_adjusted_baseline = np.sqrt(np.sum((baseline_data - mean_adjusted_baseline) ** 2) / (baseline - 1))

        fator = (z_alpha - sum(max(0, c2_stats[i] - 1) for i in range(t-2, t)))

        # Define o limite superior para C3
        if std_adjusted_baseline > 0 and fator > 0:
            c3_upper_bounds[t] = mean_adjusted_baseline + std_adjusted_baseline * fator
            c3_upper_bounds[t] = np.rint(c3_upper_bounds[t]).astype(int)
        else:
            c3_upper_bounds[t] = 0

        # Se a estatística C3 for maior que o limite superior, um alarme é disparado
        if ((c3_stats[t] >= z_alpha) and (observations[t] >= c3_upper_bounds[t])):
            c3_alarms[t] = 'Sim'

    return c3_stats, c3_alarms, c3_upper_bounds






In [2]:
# --------- LEITURA DO DADOS APS ---------------------
df = pd.read_csv('dados/xxx.csv')
display(df)


Unnamed: 0,municipio,co_ibge,ano,epiweek,atend_totais,atend_ivas,atend_arbov,sinan,cod_rgi,nome_rgi,co_ibge_7,uf_code,UF,pop_2022
0,Alta Floresta D´oeste,110001,2017,8,821,21,2,0,110005,Cacoal,1100015,11,RO,22516.0
1,Alta Floresta D´oeste,110001,2017,25,754,23,0,1,110005,Cacoal,1100015,11,RO,22516.0
2,Alta Floresta D´oeste,110001,2017,26,777,16,0,0,110005,Cacoal,1100015,11,RO,22516.0
3,Alta Floresta D´oeste,110001,2018,15,547,16,0,0,110005,Cacoal,1100015,11,RO,22516.0
4,Alta Floresta D´oeste,110001,2018,25,371,11,0,0,110005,Cacoal,1100015,11,RO,22516.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2598322,Brasília,530010,2022,46,62665,7804,246,323,530001,Distrito Federal,5300108,53,DF,2817381.0
2598323,Brasília,530010,2022,48,47702,9562,184,471,530001,Distrito Federal,5300108,53,DF,2817381.0
2598324,Brasília,530010,2023,21,95201,10764,579,737,530001,Distrito Federal,5300108,53,DF,2817381.0
2598325,Brasília,530010,2023,33,92706,7672,248,364,530001,Distrito Federal,5300108,53,DF,2817381.0


In [5]:
# ------------------ PROCESSAMENTO PRINCIPAL -----------------------

warnings.filterwarnings("ignore")


dadosAGP = filtraMun(df)

# Obter todas as regiões únicas no dataframe
regioes_unicas = dadosAGP['co_ibge'].unique() # para todas as 510 RGI

# DataFrame resultado final
result_final = pd.DataFrame()

# Obter todas as regiões únicas no dataframe
mun_t = dadosAGP['co_ibge'].unique()

for regiao_desejada in mun_t:
    df_rgi = dadosAGP[dadosAGP['co_ibge'] == regiao_desejada]
    dados = df_rgi[df_rgi['ano'] >= 2019].copy()
    serie = dados['atend_arbov'].to_numpy()

    baseline_value = 8
    alpha = 0.10

    c1_stats, c1_alarms, c1_upperbounds = calculate_c1(serie, baseline_value, alpha)
    c2_stats, c2_alarms, c2_upperbounds = calculate_c2(serie, baseline_value, alpha)
    c3_stats, c3_alarms, c3_upperbounds = calculate_c3(serie, baseline_value, alpha)


# Adiciona os resultados dos alarmes, estatísticas e limites superiores ao DataFrame
    dados['C1_stats'] = c1_stats
    dados['C2_stats'] = c2_stats
    dados['C3_stats'] = c3_stats

    dados['C1_upperbound'] = c1_upperbounds
    dados['C2_upperbound'] = c2_upperbounds
    dados['C3_upperbound'] = c3_upperbounds

    dados['C1_alarms'] = c1_alarms
    dados['C2_alarms'] = c2_alarms
    dados['C3_alarms'] = c3_alarms

    dados['C2_excesso'] = dados['atend_arbov'] - dados['C2_upperbound']

    # O DataFrame final é atualizado aqui
    result_final = pd.concat([result_final, dados])

# Salvando o DataFrame final como um arquivo CSV
os.makedirs('resultados', exist_ok=True)
result_final.to_csv('resultados/xxx.csv', index=False)

print("Resultado salvo com sucesso.")



Resultado salvo com sucesso.
