# Imports

In [None]:
# imports de bibliotecas
from functools import reduce
from pyspark.sql.functions import col, lit, when, concat, ceil
import pandas as pd
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX

# Métodos

Importação das origens

In [None]:
# cria tabelas de eleitorado a partir dos arquivos csv
def cria_tabelas_eleitorado(anos_eleitorado):
	for ano in anos_eleitorado:
		csv_eleitorado = spark.read.csv(f"/Volumes/workspace/perfil_eleitorado/tabelas_eleitorado/tb_eleitorado_{ano}.csv", header=True, inferSchema=True)
		csv_eleitorado.write.saveAsTable(f"workspace.perfil_eleitorado.tb_eleitorado_{ano}")

# cria tabela de locais de votação a partir do arquivo csv
def cria_tabela_locais_de_votacao():
    csv_locais_de_votacao = spark.read.csv("/Volumes/workspace/perfil_eleitorado/tabelas_eleitorado/locais_de_votacao.csv", header=True, inferSchema=True)
    csv_locais_de_votacao.write.saveAsTable("workspace.perfil_eleitorado.locais_de_votacao")

# lê tabelas origem e salva em dataframes
def ler_tabelas_origem():
    tb_eleitorado_2024 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2024")
    tb_eleitorado_2022 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2022")
    tb_eleitorado_2020 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2020")
    tb_eleitorado_2018 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2018")
    tb_eleitorado_2016 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2016")
    tb_eleitorado_2014 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2014")
    tb_eleitorado_2012 = spark.table("workspace.perfil_eleitorado.tb_eleitorado_2012")
    locais_de_votacao = spark.table("workspace.perfil_eleitorado.locais_de_votacao")
    return (
        tb_eleitorado_2024,
        tb_eleitorado_2022,
        tb_eleitorado_2020,
        tb_eleitorado_2018,
        tb_eleitorado_2016,
        tb_eleitorado_2014,
        tb_eleitorado_2012,
        locais_de_votacao
    )

Preparação das bases de origem (base de cálculo e base de validação)

In [None]:
# une as tabelas utilizadas para base de cáculo
def unificar_eleitorado(anos_eleitorado_modelo):
	dataframes = [globals()[f"tb_eleitorado_{ano}"] for ano in anos_eleitorado_modelo]
	return reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dataframes)

# adiciona bairro aos dataframes de cálculo e validação
def adicionar_bairro_eleitorado(df_eleitorado_unificado, locais_de_votacao):
	df_eleitorado_bairro = df_eleitorado_unificado.join(
		locais_de_votacao.select("ANO_ELEICAO", "ZONA", "NUMERO_LOCAL_VOTACAO", "BAIRRO"),
		on=["ANO_ELEICAO", "ZONA", "NUMERO_LOCAL_VOTACAO"],
		how="left"
	)
	return df_eleitorado_bairro

# ajusta coluna de faixa etária
def ajustar_faixa_etaria(df):
    df = df.withColumn(
        "FAIXA_ETARIA",
        when(col("FAIXA_ETARIA") == "16 anos                       ", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "17 anos                       ", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "18 anos                       ", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "19 anos                       ", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "20 anos                       ", "DE 20 A 29")
        .when(col("FAIXA_ETARIA") == "21 a 24 anos                  ", "DE 20 A 29")
        .when(col("FAIXA_ETARIA") == "25 a 29 anos                  ", "DE 20 A 29")
        .when(col("FAIXA_ETARIA") == "30 a 34 anos                  ", "DE 30 A 39")
        .when(col("FAIXA_ETARIA") == "35 a 39 anos                  ", "DE 30 A 39")
        .when(col("FAIXA_ETARIA") == "40 a 44 anos                  ", "DE 40 A 49")
        .when(col("FAIXA_ETARIA") == "45 a 49 anos                  ", "DE 40 A 49")
        .when(col("FAIXA_ETARIA") == "50 a 54 anos                  ", "DE 50 A 59")
        .when(col("FAIXA_ETARIA") == "55 a 59 anos                  ", "DE 50 A 59")
        .when(col("FAIXA_ETARIA") == "60 a 64 anos                  ", "DE 60 A 69")
        .when(col("FAIXA_ETARIA") == "65 a 69 anos                  ", "DE 60 A 69")
        .when(col("FAIXA_ETARIA") == "70 a 74 anos                  ", "DE 70 A 79")
        .when(col("FAIXA_ETARIA") == "75 a 79 anos                  ", "DE 70 A 79")
        .when(col("FAIXA_ETARIA") == "80 a 84 anos                  ", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "85 a 89 anos                  ", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "90 a 94 anos                  ", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "95 a 99 anos                  ", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "100 anos ou mais              ", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "16 anos", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "17 anos", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "18 anos", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "19 anos", "MENOS QUE 20")
        .when(col("FAIXA_ETARIA") == "20 anos", "DE 20 A 29")
        .when(col("FAIXA_ETARIA") == "21 a 24 anos", "DE 20 A 29")
        .when(col("FAIXA_ETARIA") == "25 a 29 anos", "DE 20 A 29")
        .when(col("FAIXA_ETARIA") == "30 a 34 anos", "DE 30 A 39")
        .when(col("FAIXA_ETARIA") == "35 a 39 anos", "DE 30 A 39")
        .when(col("FAIXA_ETARIA") == "40 a 44 anos", "DE 40 A 49")
        .when(col("FAIXA_ETARIA") == "45 a 49 anos", "DE 40 A 49")
        .when(col("FAIXA_ETARIA") == "50 a 54 anos", "DE 50 A 59")
        .when(col("FAIXA_ETARIA") == "55 a 59 anos", "DE 50 A 59")
        .when(col("FAIXA_ETARIA") == "60 a 64 anos", "DE 60 A 69")
        .when(col("FAIXA_ETARIA") == "65 a 69 anos", "DE 60 A 69")
        .when(col("FAIXA_ETARIA") == "70 a 74 anos", "DE 70 A 79")
        .when(col("FAIXA_ETARIA") == "75 a 79 anos", "DE 70 A 79")
        .when(col("FAIXA_ETARIA") == "80 a 84 anos", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "85 a 89 anos", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "90 a 94 anos", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "95 a 99 anos", "ACIMA DE 80")
        .when(col("FAIXA_ETARIA") == "100 anos ou mais", "ACIMA DE 80")
        .otherwise(col("FAIXA_ETARIA"))
    )
    return df

# salva tabelas trabalhadas, completas
def salvar_tabelas_trabalhadas(df_eleitorado_bairro, df_eleitorado_bairro_2024):
    df_eleitorado_bairro.write.saveAsTable("workspace.perfil_eleitorado.eleitorado_bairro")
    df_eleitorado_bairro_2024.write.saveAsTable("workspace.perfil_eleitorado.eleitorado_bairro_2024")


Preparação das bases agrupadas (base de cálculo e base de validação)

In [None]:
# leitura das tabelas trabalhadas
def ler_tabelas_trabalhadas():
    df_eleitorado_bairro = spark.table("workspace.perfil_eleitorado.eleitorado_bairro")
    df_eleitorado_bairro_2024 = spark.table("workspace.perfil_eleitorado.eleitorado_bairro_2024")
    return df_eleitorado_bairro, df_eleitorado_bairro_2024

# separação da amostra de trabalho
def filtrar_bairro(df, bairro):
    return df.filter(col("BAIRRO") == bairro)

# agrupamento pela quantidade total
def agrupar_quantidade_total(df_base, tabela_destino):
    df = (
        df_base
        .groupBy(col("ANO_ELEICAO").alias("ano"))
        .agg({"QUANTIDADE_ELEITORES": "sum"})
        .withColumnRenamed("sum(QUANTIDADE_ELEITORES)", "quantidade")
        .withColumn("agrupamento", lit("QUANTIDADE TOTAL"))
        .orderBy("agrupamento", "ano")
        .select("ano", "agrupamento", "quantidade")
    )
    df.write.saveAsTable(tabela_destino)

# agrupamento pelas demais variáveis
def agrupar_e_salvar(df_base, agrupamento_nome, agrupamento_coluna, tabela_destino):
    df_qtd = (
        df_base
        .groupBy(
            col("ANO_ELEICAO").alias("ano"),
            col(agrupamento_coluna).alias("agrupamento")
        )
        .agg({"QUANTIDADE_ELEITORES": "sum"})
        .withColumnRenamed("sum(QUANTIDADE_ELEITORES)", "quantidade")
        .withColumn(
            "agrupamento",
            concat(lit(agrupamento_nome), col("agrupamento"))
        )
        .orderBy("agrupamento", "ano")
    )
    df_qtd.write.mode("append").saveAsTable(tabela_destino)



Projeção

In [None]:
# Projeta as informações a partir da base apontada
def projetar_base(tabela_origem, tabela_destino):
    base_predicao = spark.table(tabela_origem)
    data = base_predicao.toPandas()

    resultados = []

    for agrupamento in data['agrupamento'].unique():
        df_agrupamento = data[data['agrupamento'] == agrupamento].sort_values('ano')
        serie = df_agrupamento.set_index('ano')['quantidade']

        # SES
        ses = SimpleExpSmoothing(serie, initialization_method="estimated").fit()
        prev_ses = ses.forecast(1).iloc[0]

        # ARIMA
        arima = ARIMA(serie, order=(1, 1, 1)).fit()
        prev_arima = arima.forecast(steps=1).iloc[0]

        # SARIMA
        sarima = SARIMAX(serie, order=(1, 1, 1), seasonal_order=(1, 0, 1, 2)).fit(disp=False)
        prev_sarima = sarima.forecast(steps=1).iloc[0]

        # Holt-Winters
        hw = ExponentialSmoothing(serie, trend='add', seasonal=None).fit()
        prev_hw = hw.forecast(steps=1).iloc[0]

        resultados.append({
            'AGRUPAMENTO': agrupamento,
            'SES_2024': round(prev_ses, 2),
            'ARIMA_2024': round(prev_arima, 2),
            'SARIMA_2024': round(prev_sarima, 2),
            'HOLT_WINTERS_2024': round(prev_hw, 2),
        })

    df_resultados = pd.DataFrame(resultados)

    df_resultados_spark = spark.createDataFrame(df_resultados)

    df_resultados_spark = (
        df_resultados_spark
        .withColumn("SES_2024", ceil("SES_2024").cast("int"))
        .withColumn("ARIMA_2024", ceil("ARIMA_2024").cast("int"))
        .withColumn("SARIMA_2024", ceil("SARIMA_2024").cast("int"))
        .withColumn("HOLT_WINTERS_2024", ceil("HOLT_WINTERS_2024").cast("int"))
    )

    display(df_resultados_spark)

    df_resultados_spark.write.saveAsTable(tabela_destino)



# Pipeline

In [None]:
#PASSO 1 - importação das origens

#criação das tabelas de eleitorado
anos_eleitorado = [2012, 2014, 2016, 2018, 2020, 2022, 2024]
cria_tabelas_eleitorado(anos_eleitorado)

#criação da tabela de locais de votação
cria_tabela_locais_de_votacao()

#leitura das tabelas de origem
(
    tb_eleitorado_2024,
    tb_eleitorado_2022,
    tb_eleitorado_2020,
    tb_eleitorado_2018,
    tb_eleitorado_2016,
    tb_eleitorado_2014,
    tb_eleitorado_2012,
    locais_de_votacao
) = ler_tabelas_origem()

# -----------------------------------


#PASSO 2 - Preparação das bases de origem (base de cálculo e base de validação)

#unifica as tabelas utilizadas para base de cáculo
anos_eleitorado_modelo = [2022, 2020, 2018, 2016, 2014, 2012]
df_eleitorado_unificado = unificar_eleitorado(anos_eleitorado_modelo)

#adiciona bairro aos dataframes de cálculo e validação
df_eleitorado_bairro = adicionar_bairro_eleitorado(df_eleitorado_unificado, locais_de_votacao)
df_eleitorado_bairro_2024 = adicionar_bairro_eleitorado(tb_eleitorado_2024, locais_de_votacao)

#ajuste de coluna de faixa etária
df_eleitorado_bairro = ajustar_faixa_etaria(df_eleitorado_bairro)
df_eleitorado_bairro_2024 = ajustar_faixa_etaria(df_eleitorado_bairro_2024)

#salva tabelas trabalhadas
salvar_tabelas_trabalhadas(df_eleitorado_bairro, df_eleitorado_bairro_2024)

# -----------------------------------


#PASSO 3 - Preparação das bases agrupadas (base de cálculo e base de validação)

#leitura das tabelas trabalhadas
df_eleitorado_bairro, df_eleitorado_bairro_2024 = ler_tabelas_trabalhadas()

#separação da amostra de trabalho
bairro = "CENTRO"
df_eleitorado_bairro_amostra = filtrar_bairro(df_eleitorado_bairro, bairro)
df_eleitorado_bairro_2024_amostra = filtrar_bairro(df_eleitorado_bairro_2024, bairro)

#definição das bases agrupadas
base_predicao = "workspace.perfil_eleitorado.base_predicao"
base_validacao = "workspace.perfil_eleitorado.base_validacao"

#agrupamento pela variável de quantidade total
agrupar_quantidade_total(df_eleitorado_bairro_amostra, base_predicao)
agrupar_quantidade_total(df_eleitorado_bairro_2024_amostra, base_validacao)

#agrupamento pelas demais variáveis
agrupamentos = [
    {"nome": "ESCOLARIDADE - ", "coluna": "ESCOLARIDADE"},
    {"nome": "GENERO - ", "coluna": "GENERO"},
    {"nome": "ESTADO CIVIL - ", "coluna": "ESTADO_CIVIL"},
    {"nome": "FAIXA ETARIA - ", "coluna": "FAIXA_ETARIA"},
]
for agrupamento in agrupamentos:
    agrupar_e_salvar(
        df_eleitorado_bairro_amostra,
        agrupamento["nome"],
        agrupamento["coluna"],
        base_predicao
    )
    agrupar_e_salvar(
        df_eleitorado_bairro_2024_amostra,
        agrupamento["nome"],
        agrupamento["coluna"],
        base_validacao
    )

# -----------------------------------


#PASSO 4 - Projeção

base_final = "workspace.perfil_eleitorado.base_projecao_final"
projetar_base(base_predicao, base_final)
