In [1]:
import re

import numpy as np
import pandas as pd
import statsmodels.api as sm

In [2]:
def read_data(data_filepath):
    """Load the zipped, semicolon-separated dataset and shorten its headers.

    The file is parsed with "," as the decimal mark and "." as the thousands
    separator. A header shaped like ``"CODE - description"`` (CODE made of
    digits, upper-case A-Z and underscores) is renamed to just ``CODE``; any
    other header is kept unchanged.

    Args:
        data_filepath: Path of the zip-compressed CSV file.

    Returns:
        The parsed DataFrame with shortened column names.
    """
    header_pattern = re.compile(r"^([\dA-Z_]+) - (.*)")

    def to_code(header):
        # Keep only the leading code when the header follows the pattern.
        found = header_pattern.match(header)
        return found.group(1) if found else header

    raw = pd.read_csv(
        data_filepath, compression="zip", sep=";", decimal=",", thousands="."
    )
    return raw.rename(columns=to_code)


def calc_tarifa_media(x, tipo_servico="Água e Esgoto"):
    """Compute the average tariff of every row with the formula of its year.

    The tariff is ``revenue / (volume * 1000)``. Missing values in any of
    the columns involved are treated as 0. The revenue columns depend on the
    row's ``ano_referencia`` and on ``tipo_servico``:

    * 1995:       FN001 for all three services
    * 1996-1997:  "Água e Esgoto": FN002 + FN003; "Água": FN002;
                  "Esgoto": FN003
    * 1998-2006:  "Água e Esgoto": FN002 + FN003 + FN007;
                  "Água": FN002 + FN007; "Esgoto": FN003
    * 2007-2021:  "Água e Esgoto": FN002 + FN003 + FN007 + FN038;
                  "Água": FN002 + FN007; "Esgoto": FN003 + FN038

    The volume is AG011 + ES007 for "Água e Esgoto", AG011 for "Água" and
    ES007 for "Esgoto".

    The formula is chosen per row. Earlier, the year of the first row alone
    decided the formula for the whole frame, which gave wrong results for
    frames that span several periods (e.g. when called through
    ``DataFrame.assign`` on a multi-year aggregate).

    Args:
        x: DataFrame with ``ano_referencia`` plus the columns required by
            the periods actually present in it.
        tipo_servico: One of "Água e Esgoto", "Água" or "Esgoto".

    Returns:
        A float Series aligned with ``x.index``; rows whose year is outside
        1995-2021 are NaN. A zero volume yields inf (or NaN for 0 / 0).
        The scalar ``np.nan`` is returned when ``tipo_servico`` is unknown
        or when no row falls inside a known period.
    """
    revenue_by_period = (
        (
            range(1995, 1995 + 1),
            {
                "Água e Esgoto": ("FN001",),
                "Água": ("FN001",),
                "Esgoto": ("FN001",),
            },
        ),
        (
            range(1996, 1997 + 1),
            {
                "Água e Esgoto": ("FN002", "FN003"),
                "Água": ("FN002",),
                "Esgoto": ("FN003",),
            },
        ),
        (
            range(1998, 2006 + 1),
            {
                "Água e Esgoto": ("FN002", "FN003", "FN007"),
                "Água": ("FN002", "FN007"),
                "Esgoto": ("FN003",),
            },
        ),
        (
            range(2007, 2021 + 1),
            {
                "Água e Esgoto": ("FN002", "FN003", "FN007", "FN038"),
                "Água": ("FN002", "FN007"),
                "Esgoto": ("FN003", "FN038"),
            },
        ),
    )
    volume_by_service = {
        "Água e Esgoto": ("AG011", "ES007"),
        "Água": ("AG011",),
        "Esgoto": ("ES007",),
    }

    if tipo_servico not in volume_by_service:
        return np.nan

    years = x["ano_referencia"]
    values = np.full(len(x), np.nan)
    matched_any = False
    for period, revenue_by_service in revenue_by_period:
        rows = years.isin(list(period)).to_numpy()
        if not rows.any():
            # Skip periods without rows so their columns are never required.
            continue
        matched_any = True
        part = x.loc[rows]
        revenue = sum(
            part[column].fillna(0) for column in revenue_by_service[tipo_servico]
        )
        volume = sum(
            part[column].fillna(0) for column in volume_by_service[tipo_servico]
        )
        # Positional assignment keeps this correct even with a repeated index.
        values[rows] = (revenue / (volume * 1000)).to_numpy(dtype=float)

    if not matched_any:
        return np.nan
    return pd.Series(values, index=x.index)


def calculo_agrupado(
    df: pd.DataFrame,
    colunas_agrupamento: list[str],
    func,
) -> pd.DataFrame:
    """Sum the numeric columns per group and derive one value per group.

    Args:
        df: Input rows.
        colunas_agrupamento: Columns to group by; groups with a missing key
            are discarded.
        func: Passed to ``DataFrame.assign`` as the ``calculo`` column, so a
            callable receives the whole aggregated frame (one row per group).

    Returns:
        A DataFrame indexed by ``colunas_agrupamento`` with the single column
        ``calculo``; groups whose result is infinite or missing are dropped.
    """
    totals = (
        df.groupby(colunas_agrupamento, dropna=True).sum(numeric_only=True).reset_index()
    )
    with_result = totals.assign(calculo=func)
    selected = with_result[colunas_agrupamento + ["calculo"]]
    # Infinite results are turned into NaN so dropna() discards them as well.
    finite_only = selected.replace([np.inf, -np.inf], np.nan).dropna()
    return finite_only.set_index(colunas_agrupamento)


def calc_tarifa_media_agrupado(
    df: pd.DataFrame, colunas_agrupamento: list[str]
) -> pd.DataFrame:
    """Compute the three average tariffs per group, side by side.

    Runs ``calculo_agrupado`` once per service type with ``calc_tarifa_media``
    as the calculation and joins the results column-wise.

    Args:
        df: Input rows.
        colunas_agrupamento: Columns to group by.

    Returns:
        A DataFrame indexed by ``colunas_agrupamento`` with the columns
        ``tarifa_media``, ``tarifa_media_agua`` and ``tarifa_media_esgoto``.
    """
    service_by_column = {
        "tarifa_media": "Água e Esgoto",
        "tarifa_media_agua": "Água",
        "tarifa_media_esgoto": "Esgoto",
    }
    tables = []
    for column, service in service_by_column.items():
        table = calculo_agrupado(
            df,
            colunas_agrupamento=colunas_agrupamento,
            # Bind the service as a default so each lambda keeps its own value.
            func=lambda x, service=service: calc_tarifa_media(
                x, tipo_servico=service
            ),
        )
        tables.append(table.rename(columns={"calculo": column}))
    return pd.concat(tables, axis=1)


def deflate_tarifa_media(df, ipca):
    """Divide the three tariff columns by the deflator of each row's year.

    Args:
        df: DataFrame with ``ano_referencia``, ``tarifa_media``,
            ``tarifa_media_agua`` and ``tarifa_media_esgoto``.
        ipca: DataFrame with the columns ``data`` (matched against
            ``ano_referencia``) and ``deflator``.

    Returns:
        ``df`` with the tariff columns divided by ``deflator``. The join is a
        left join, so rows without a matching ``data`` end up with NaN
        tariffs. The helper columns ``data`` and ``deflator`` are removed.
    """
    merged = df.merge(ipca, how="left", left_on="ano_referencia", right_on="data")
    for column in ("tarifa_media", "tarifa_media_agua", "tarifa_media_esgoto"):
        merged[column] = merged[column] / merged["deflator"]
    return merged.drop(columns=["data", "deflator"])


def remove_outliers_municipios(df_tmr_mun):
    """Drop rows whose ``tarifa_media`` is an outlier within its year and UF.

    For every (``ano_referencia``, ``sigla_uf``) pair, rows are kept only
    when ``tarifa_media`` lies inside the fences
    ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` (both ends inclusive), where Q1 and
    Q3 are the 25% and 75% quantiles of that pair. Rows with a missing
    ``tarifa_media`` or ``sigla_uf`` never pass the comparison and are
    therefore dropped too.

    Args:
        df_tmr_mun: DataFrame with ``ano_referencia``, ``sigla_uf`` and
            ``tarifa_media``.

    Returns:
        A new DataFrame with a fresh 0..n-1 index. Rows are ordered by year
        and then by UF, each in order of first appearance. An input without
        rows yields an empty DataFrame that keeps the input columns.
    """
    kept_parts = []
    for year in df_tmr_mun["ano_referencia"].unique():
        in_year = df_tmr_mun[df_tmr_mun["ano_referencia"] == year]
        for sigla_uf in in_year["sigla_uf"].unique():
            group = in_year[in_year["sigla_uf"] == sigla_uf]
            tariff = group["tarifa_media"]
            q25, q75 = tariff.quantile(q=[0.25, 0.75]).values
            iqr = q75 - q25
            lower_fence = q25 - 1.5 * iqr
            upper_fence = q75 + 1.5 * iqr
            kept_parts.append(group[tariff.between(lower_fence, upper_fence)])

    if not kept_parts:
        # Nothing to filter: keep the schema instead of a column-less frame.
        return df_tmr_mun.iloc[0:0].reset_index(drop=True)
    # One concat at the end avoids re-copying the accumulated rows per group.
    return pd.concat(kept_parts, ignore_index=True)


def calc_hp_filter(df_tmr_mun, λ=1 / 7):
    """Apply the Hodrick-Prescott filter to each municipality's tariff series.

    For every ``id_municipio`` the rows are indexed by ``ano_referencia`` and
    ``sm.tsa.hp_filter.hpfilter`` is run on ``tarifa_media``,
    ``tarifa_media_agua`` and ``tarifa_media_esgoto``. Only the second
    element of each result (the trend; the first is the cycle) is kept.

    Args:
        df_tmr_mun: DataFrame with ``id_municipio``, ``ano_referencia`` and
            the three tariff columns.
        λ: Smoothing parameter handed to ``hpfilter``.

    Returns:
        A DataFrame with one row per municipality and year holding the three
        trend columns, ``id_municipio`` and the former index as a column.
        NOTE(review): statsmodels is expected to name the trend columns
        ``<column>_trend`` -- confirm against the installed version.

    Warns:
        RuntimeWarning: Once, when at least one municipality could not be
            filtered. Such municipalities are left out of the result
            (best effort), but no longer silently.
    """
    import warnings

    tariff_columns = ("tarifa_media", "tarifa_media_agua", "tarifa_media_esgoto")
    trend_parts = []
    failures = []
    # sort=False keeps municipalities in order of first appearance.
    for id_municipio, rows in df_tmr_mun.groupby("id_municipio", sort=False):
        df_mun = rows.set_index("ano_referencia")
        try:
            trends = [
                pd.DataFrame(sm.tsa.hp_filter.hpfilter(df_mun[column], λ)[1])
                for column in tariff_columns
            ]
            tendency = pd.concat(trends, axis=1).assign(id_municipio=id_municipio)
        except Exception as exc:
            # Best effort: skip this municipality, but remember why.
            failures.append((id_municipio, exc))
            continue
        trend_parts.append(tendency)

    if failures:
        first_id, first_exc = failures[0]
        warnings.warn(
            f"calc_hp_filter skipped {len(failures)} municipality(ies); "
            f"first failure: id_municipio={first_id!r}: {first_exc!r}",
            RuntimeWarning,
            stacklevel=2,
        )

    # One concat at the end avoids re-copying the accumulated rows per group.
    df_tmr_mun_hp = pd.concat(trend_parts) if trend_parts else pd.DataFrame()
    return df_tmr_mun_hp.reset_index()

In [3]:
# Input files
data_filepath = "data/agua-esgoto-desagregado.zip"
ipca_filepath = "data/processed_ipca.csv"
municipio_filepath = "data/processed_municipio.csv"

# READ
df = read_data(data_filepath)
ipca = pd.read_csv(ipca_filepath)
municipio = pd.read_csv(municipio_filepath)

# PROCESS
# Attach the municipality attributes to every record (left join keeps all
# records, matched on the 6-digit municipality code and the state).
df = df.merge(
    municipio,
    how="left",
    left_on=["codigo_municipio", "sigla_uf"],
    right_on=["id_municipio_6", "sigla_uf"],
)

# Keep only the grouping keys and the columns used by the tariff formulas.
key_columns = [
    "ano_referencia",
    "id_municipio",
    "sigla_uf",
    "nome_regiao",
    "natureza_juridica",
]
formula_columns = ["FN001", "FN002", "FN003", "FN007", "FN038", "AG011", "ES007"]
df_tm = df[key_columns + formula_columns]

# Average tariff for Brazil
df_tm_br = calc_tarifa_media_agrupado(df_tm, ["ano_referencia"]).reset_index()

# Average tariff per region
df_tm_regiao = calc_tarifa_media_agrupado(
    df_tm, ["ano_referencia", "nome_regiao"]
).reset_index()

# Average tariff per state
df_tm_uf = calc_tarifa_media_agrupado(
    df_tm, ["ano_referencia", "sigla_uf"]
).reset_index()

# Average tariff per municipality
df_tm_mun = calc_tarifa_media_agrupado(
    df_tm, ["ano_referencia", "id_municipio", "sigla_uf", "nome_regiao"]
).reset_index()

# Average tariff per legal nature of the provider
df_tm_natjur = calc_tarifa_media_agrupado(
    df_tm, ["ano_referencia", "natureza_juridica"]
).reset_index()

# Deflate
df_tmr_br, df_tmr_regiao, df_tmr_uf, df_tmr_mun, df_tmr_natjur = (
    deflate_tarifa_media(table, ipca)
    for table in (df_tm_br, df_tm_regiao, df_tm_uf, df_tm_mun, df_tm_natjur)
)

# Handle municipal outliers
# Remove outliers
df_tmr_mun2 = remove_outliers_municipios(df_tmr_mun)

# Hodrick–Prescott filter
hp_trend_long = calc_hp_filter(df_tmr_mun2, λ=2)
# One column per municipality so gaps between years can be interpolated.
hp_trend_wide = hp_trend_long.pivot(
    index=["ano_referencia"],
    columns=["id_municipio"],
    values="tarifa_media_trend",
).interpolate(method="linear")
df_tmr_mun3 = (
    hp_trend_wide.reset_index()
    .melt(
        id_vars=["ano_referencia"],
        var_name="id_municipio",
        value_name="tarifa_media_trend",
    )
    .merge(municipio)
    .dropna()
)

# Year-over-year relative change of the municipal tariff
tarifa_media_wide = df_tmr_mun.pivot_table(
    index=["ano_referencia"],
    columns=["id_municipio"],
    values="tarifa_media",
)
df_tmr_mun_pct_change = (
    tarifa_media_wide.pct_change()
    .reset_index()
    .melt(
        id_vars=["ano_referencia"],
        var_name="id_municipio",
        value_name="tarifa_media_pct_change",
    )
    .dropna()
)

# SAVE PROCESSED DATA
outputs = {
    "data/processed_tmr_br.csv": df_tmr_br,
    "data/processed_tmr_regiao.csv": df_tmr_regiao,
    "data/processed_tmr_uf.csv": df_tmr_uf,
    "data/processed_tmr_natjur.csv": df_tmr_natjur,
    "data/processed_tmr_mun.csv": df_tmr_mun,
    "data/processed_tmr_mun2.csv": df_tmr_mun2,
    "data/processed_tmr_mun3.csv": df_tmr_mun3,
    "data/processed_tmr_mun_pct_change.csv": df_tmr_mun_pct_change,
}
for output_path, table in outputs.items():
    table.to_csv(output_path, index=False)

  df = pd.read_csv(
