Bloco 1 - Importa√ß√£o das bibliotecas necessarias

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from scipy import stats
from scipy.optimize import minimize
import os

Bloco 2 - Fun√ß√µes auxiliares

In [None]:
# --- Fun√ß√µes auxiliares (copiadas e mantidas) ---
def substituir_outliers_media_local(x, window=3):
    x_corrigido = x.copy()
    q1, q3 = np.percentile(x, [25, 75])
    iqr = q3 - q1
    lim_inf = q1 - 1.5 * iqr
    lim_sup = q3 + 1.5 * iqr
    for i in range(len(x)):
        if x[i] < lim_inf or x[i] > lim_sup:
            inicio = max(0, i - window)
            fim = min(len(x), i + window + 1)
            vizinhos = x[inicio:fim]
            vizinhos = vizinhos[(vizinhos >= lim_inf) & (vizinhos <= lim_sup)]
            if len(vizinhos) > 0:
                x_corrigido[i] = np.mean(vizinhos)
    return x_corrigido

def classificar_origem(Cin, Cout, Cin_lag, Cout_lag, ratio_th=1.1, diff_in=3, diff_ext=2):
    eps = 1e-6
    if (Cin / (Cout + eps)) > ratio_th: #and (Cin - Cin_lag) > diff_in and (Cout - Cout_lag) < diff_ext:
        return "interno"
    elif (Cin / (Cout + eps)) < ratio_th or Cin <= Cout:
        return "externo"
    else:
        return "desconhecido"

def gerar_blocos_externos(n_total, tamanho_bloco=6):
    return [list(range(i, i + tamanho_bloco)) for i in range(n_total - tamanho_bloco + 1)]

def simular_bloco(Finf, k_prime, Cin_obs, Cout, bloco, dt=1):
    Cin_mod = np.full_like(Cin_obs, np.nan)
    Cin_mod[bloco[0]] = Cin_obs[bloco[0]]
    for t in bloco[1:]:
        Cin_mod[t] = Finf * Cout[t] + (Cin_mod[t - 1] - Finf * Cout[t]) * np.exp(-k_prime * dt)
        #Cin_mod[t] = Finf * Cout[t] - (Cin[t] - Cin[t-1])
    return Cin_mod

Bloco 3 - Modelo_MBB_Bootstrap

In [None]:
def bootstrap_mbb_modelo(Cin, Cout, blocos_possiveis, n_iter=1000, dt=1):
    Finf_vals, k_vals, r2_vals, rmse_vals = [], [], [], []
    simulacoes = []
    n_blocos = len(Cin) // 6
    for _ in range(n_iter):
        blocos_amostrados = [blocos_possiveis[i] for i in np.random.randint(0, len(blocos_possiveis), n_blocos)]
        def erro_total(params):
            Finf, k_prime = params
            Cin_mod = np.full_like(Cin, np.nan)
            for bloco in blocos_amostrados:
                Cin_sim = simular_bloco(Finf, k_prime, Cin, Cout, bloco, dt)
                Cin_mod = np.where(np.isnan(Cin_sim), Cin_mod, Cin_sim)
            mask = ~np.isnan(Cin_mod)
            if np.sum(mask) < 5:
                return np.inf
            return np.sqrt(np.mean((Cin[mask] - Cin_mod[mask]) ** 2))

        res = minimize(erro_total, [0.5, 0.5], bounds=[(0.01, 1.0), (0.01, 1.0)])
        if res.success:
            Finf_opt, k_opt = res.x
            Finf_vals.append(Finf_opt)
            k_vals.append(k_opt)
            Cin_mod = np.full_like(Cin, np.nan)
            for bloco in blocos_amostrados:
                Cin_sim = simular_bloco(Finf_opt, k_opt, Cin, Cout, bloco, dt)
                Cin_mod = np.where(np.isnan(Cin_sim), Cin_mod, Cin_sim)
            mask = ~np.isnan(Cin_mod)
            rmse_vals.append(np.sqrt(np.mean((Cin[mask] - Cin_mod[mask]) ** 2)))
            if np.sum(mask) > 5:
                r2_vals.append(r2_score(Cin[mask], Cin_mod[mask]))
            simulacoes.append(Cin_mod)
    return (np.array(Finf_vals), np.array(k_vals), np.array(simulacoes), np.array(r2_vals), np.array(rmse_vals))

Bloco 4 - Loop para rodar todas as casas e fra√ß√µes

Sa√≠das:

Resultados para Finf e k' (media +/- DP) e IC(95%)

Graficos das s√©ries temporais Cin (origem externa e origem interna)

Metricas de avalia√ß√£o do modelo

In [None]:
casas = ['casa1PM.xlsx', 'casa2PM.xlsx', 'casa3PM.xlsx', 'casa4PM.xlsx']
fracoes = ['PM1', 'PM25', 'PM10']


fig, axs = plt.subplots(len(casas), len(fracoes), figsize=(20, 16), sharex=False)
fig.subplots_adjust(hspace=0.4, wspace=0.3)
 # Lista para guardar os resultados
resultados_modelo = []
for i, arquivo in enumerate(casas):
    df = pd.read_excel(arquivo)
    for j, fracao in enumerate(fracoes):
        coluna_cin = f'cin{fracao}'
        coluna_cout = f'cout{fracao}'
        if coluna_cin not in df.columns or coluna_cout not in df.columns:
            continue  # Pula se a coluna n√£o existir

        Cin = df[coluna_cin].values
        Cout = df[coluna_cout].values
        Cout_corrigido = substituir_outliers_media_local(Cout, window=3)

        # Lag
        Cin_lag = np.roll(Cin, 1); Cin_lag[0] = Cin[0]
        Cout_lag = np.roll(Cout_corrigido, 1); Cout_lag[0] = Cout_corrigido[0]
        classes = [classificar_origem(Cin[k], Cout_corrigido[k], Cin_lag[k], Cout_lag[k])
                   for k in range(len(Cin))]

        df_filtrado = pd.DataFrame({'Cin': Cin, 'Cout': Cout_corrigido, 'Classe': classes})
        df_ext = df_filtrado[df_filtrado['Classe'] == 'externo'].reset_index(drop=True)
        Cin_ext = df_ext['Cin'].values
        Cout_ext = df_ext['Cout'].values

        if len(Cin_ext) < 20:
            continue  # Evita rodar se poucos dados externos

        blocos = gerar_blocos_externos(len(Cin_ext), tamanho_bloco=6)
        Finf_vals, k_vals, simulacoes, r2_vals, rmse_vals = bootstrap_mbb_modelo(Cin_ext, Cout_ext, blocos, n_iter=1000)

        simulacoes_validas = simulacoes[~np.isnan(simulacoes).all(axis=1)]
        Cin_mod_mean = np.nanmean(simulacoes_validas, axis=0)
        Cin_mod_std = np.nanstd(simulacoes_validas, axis=0)
        tempo = np.arange(len(Cin_mod_mean))
        mask_val = ~np.isnan(Cin_mod_mean)
        
       

        # ...dentro do seu loop por casa e fra√ß√£o (logo ap√≥s a simula√ß√£o):
        # -----------------------------------------
        # Estat√≠sticas de Finf, k', RMSE, R¬≤
        Finf_mean = np.mean(Finf_vals)
        Finf_std = np.std(Finf_vals)
        Finf_ic_min = np.percentile(Finf_vals, 2.5)
        Finf_ic_max = np.percentile(Finf_vals, 97.5)

        k_mean = np.mean(k_vals)
        k_std = np.std(k_vals)
        k_ic_min = np.percentile(k_vals, 2.5)
        k_ic_max = np.percentile(k_vals, 97.5)

        rmse_mean = np.mean(rmse_vals)
        r2_mean = np.mean(r2_vals)
        rmse_min = np.min(rmse_vals)

        # --- Valor-p da regress√£o simples entre Cin_mod_mean e Cin_ext ---
        slope, intercept, r_val, p_val, stderr = stats.linregress(Cin_mod_mean[mask_val], Cin_ext[mask_val])

        # Armazenar no DataFrame
        resultados_modelo.append({
            "Casa": arquivo[:-5],
            "PM": fracao,
            "Finf_m√©dio": Finf_mean,
            "Finf_std": Finf_std,
            "Finf_IC95_min": Finf_ic_min,
            "Finf_IC95_max": Finf_ic_max,
            "k'_m√©dio": k_mean,
            "k'_std": k_std,
            "k'_IC95_min": k_ic_min,
            "k'_IC95_max": k_ic_max,
            "RMSE_m√©dio": rmse_mean,
            "RMSE_m√≠nimo": rmse_min,
            "R¬≤_m√©dio": r2_mean,
            "p_valor": p_val
        })

      
        CinCin_mod_mean = np.full_like(Cin_mod_mean, np.nan)
        if j == 0:
            CinCin_mod_mean=Cin_mod_mean[mask_val]
        elif j== 1:
            CinCin_mod_mean=Cin_mod_mean[mask_val]
        else:
            CinCin_mod_mean=Cin_mod_mean[mask_val]
            
        Cin_int=Cin_ext[mask_val] - CinCin_mod_mean[mask_val]
        erro=(CinCin_mod_mean[mask_val]-Cin_ext[mask_val])/Cin_ext[mask_val]
        resultados_modelo[-1]["Erro_m√©dio"] = np.mean(erro)
        resultados_modelo[-1]["Erro_m√°ximo"] = np.max(erro)
        resultados_modelo[-1]["Erro_m√≠nimo"] = np.min(erro)
        resultados_modelo[-1]["Erro_mediano"] = np.median(erro)
        #print(f"\nüìä Resultados para {arquivo[:-5]} - {fracao}:")
        #print(f"  Finf m√©dio: {Finf_mean:.4f} ¬± {Finf_std:.4f} (IC95%: {Finf_ic_min:.4f} ‚Äì {Finf_ic_max:.4f})")
        #print(f"  k' m√©dio: {k_mean:.4f} ¬± {k_std:.4f} (IC95%: {k_ic_min:.4f} ‚Äì {k_ic_max:.4f})")
        #print(f"  RMSE m√©dio: {rmse_mean:.4f} ¬µg/m¬≥, M√≠nimo: {rmse_min:.4f} ¬µg/m¬≥")
        #print(f"  R¬≤ m√©dio: {r2_mean:.4f}, p-valor: {p_val:.4f}")
        print(f"  Erro m√©dio: {resultados_modelo[-1]['Erro_m√©dio']:.4f}, M√°ximo: {resultados_modelo[-1]['Erro_m√°ximo']:.4f}, M√≠nimo: {resultados_modelo[-1]['Erro_m√≠nimo']:.4f}, Mediano: {resultados_modelo[-1]['Erro_mediano']:.4f}")
        
        ax = axs[i, j]
        ax.plot(tempo[mask_val], Cin_int, label='Cin interno', color='blue', lw=1.5, linestyle='--')
        ax.plot(tempo[mask_val], Cin_ext[mask_val], label='Cin observado', color='blue', lw=1.5)
        ax.plot(tempo[mask_val], CinCin_mod_mean, label='Cin modelado', color='black', lw=1.5)
        ax.fill_between(tempo[mask_val],
                        CinCin_mod_mean - Cin_mod_std[mask_val],
                        CinCin_mod_mean + Cin_mod_std[mask_val],
                        color='black', alpha=0.3, label='¬±1œÉ')
        ax.set_title(f"{arquivo[:-5]} - {fracao}")
        ax.set_xlabel("Tempo (h)")
        ax.set_ylabel("Concentra√ß√£o (¬µg/m¬≥)")
        
        if i == 0 and j == 0:
            ax.legend()
        
#plt.suptitle("Modelo MBM", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.94])
plt.show()

# Criar DataFrame com os resultados finais
df_resultados = pd.DataFrame(resultados_modelo)

# Exibir
print("\nüìã RESULTADOS FINAIS POR MODELO (CASA √ó FRA√á√ÉO):")
print(df_resultados)

# (Opcional) Salvar em Excel
df_resultados.to_excel("resultados_modelo_mbm_notebook.xlsx", index=False)

Bloco 5 - Estilo do grafico

In [None]:
# Lista para guardar os resultados
arquivos = {
    "casa1PM.xlsx": "R1",
    "casa2PM.xlsx": "R2",
    "casa3PM.xlsx": "R3",
    "casa4PM.xlsx": "R4",
}
fracoes = {
    "PM1": "MP1",
    "PM25": "MP2.5",
    "PM10": "MP10",
}

fig, axs = plt.subplots(len(casas), len(fracoes), figsize=(20, 16), sharex=False)
fig.subplots_adjust(hspace=0.4, wspace=0.3)

for i, (arquivo, casa_label) in enumerate(arquivos.items()):
    df = pd.read_excel(arquivo)
    for j, (fracao, fracao_label) in enumerate(fracoes.items()):
        coluna_cin = f'cin{fracao}'
        coluna_cout = f'cout{fracao}'
        if coluna_cin not in df.columns or coluna_cout not in df.columns:
            continue  # Pula se a coluna n√£o existir
        
        
        tempo = df.index.values
        Finf_mean = np.mean(Finf_vals)
        Cin = df[coluna_cin].values
        Cout = df[coluna_cout].values
        Cout_corrigido = substituir_outliers_media_local(Cout, window=3)

        Cin_ext = Finf_mean*Cout_corrigido
        Cin_ext_std = np.nanstd(Cin_ext)
        Cin_int = Cin - Cin_ext
        
        
        
        
        
        ax = axs[i, j]
        
        ax.plot(tempo, Cin_int, label='Cin interno', color='darkblue', lw=1.5, linestyle='--')
        ax.plot(tempo, Cin_ext, label='Cin externo', color='blue', lw=1.8)
        ax.plot(tempo, Cin, label='Cin observado', color='black', lw=1, linestyle='-')
        ax.fill_between(tempo,
                        Cin_ext - Cin_ext_std,
                        Cin_ext + Cin_ext_std,
                        color='black', alpha=0.3, label='¬±1œÉ')
        #ax.set_title(f"{arquivo[:-5]} - {fracao}")
        #ax.set_xlabel("Tempo (h)")
        #ax.set_ylabel("Concentra√ß√£o (¬µg/m¬≥)")
        #ax.text(0, 360, 45, f"Erro Mediano: {resultados_modelo[-1]['Erro_mediano']:.4f}", ha='right', va='top', fontsize=10)
        
        # Aqui: ajustar os ticks (tamanho e cor)
        ax.tick_params(axis='both', which='major', labelsize=16, colors='black')
        ax.tick_params(axis='both', which='minor', labelsize=16, colors='black')
        
        for spine in ['bottom', 'left', 'right', 'top']:
            ax.spines[spine].set_color('black')

        ax.axhline(0, color='black', linestyle=':', linewidth=2)
        # T√≠tulos e r√≥tulos
        if i == 0:
            ax.set_title(f"{fracao_label}", fontsize=16, color='black')
        if j == 0:
            ax.set_ylabel(f"{casa_label}\nConcentra√ß√£o (¬µg/m¬≥)", fontsize=16, color='black')
        if i == 3 and j == 1:
            ax.set_xlabel("Tempo (√≠ndice)", fontsize=16, color='black')
        if i == 0 and j == 2:
            ax.legend(fontsize=16, ncol=2)
        
        
        #print(f"  Finf m√©dio: {Finf_mean:.4f} ¬± {Finf_std:.4f} (IC95%: {Finf_ic_min:.4f} ‚Äì {Finf_ic_max:.4f})")
        #print(f"  k' m√©dio: {k_mean:.4f} ¬± {k_std:.4f} (IC95%: {k_ic_min:.4f} ‚Äì {k_ic_max:.4f})")
        #print(f"  RMSE m√©dio: {rmse_mean:.4f} ¬µg/m¬≥, M√≠nimo: {rmse_min:.4f} ¬µg/m¬≥")
        #print(f"  R¬≤ m√©dio: {r2_mean:.4f}, p-valor: {p_val:.4f}")
        
# Legenda fora do gr√°fico
#handles, labels = axs[0, 0].get_legend_handles_labels()
#fig.legend(handles, labels, loc='upper right', ncol=3, fontsize=16)
#fig.suptitle("MBM Cin com IC 95%\nResid√™ncias (linhas) √ó Fra√ß√µes (colunas)", fontsize=16, color='black')

plt.tight_layout(rect=[0.05, 0.05, 1, 0.94])
plt.show()