#### INSTALAR LIBS

In [None]:
%pip install -r requirements.txt

#### DEFINIR CONSTANTES

In [8]:
#----------- Arquivos e drivers ----------------
GZIP_INPUT_FILE = './data/votacao_candidato_munzona_2022_BRASIL.parquet.gzip'
JDBC_DRIVER_PATH = './drivers/postgresql-42.7.0.jar'

#----------- Configurações do banco de dados ------------
hostname_or_ip = "localhost"
port = "5439"
db = "metabase"
user = "postgres"
password = "postgres"
db_url = "jdbc:postgresql://" + hostname_or_ip + ":" + port + "/" + db+"?driver=org.postgresql.Drive"
properties = {
    "user": user,
    "password": password,
    "driver": "org.postgresql.Driver"
}

#### CRIANDO SEÇÃO SPARK

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark')\
    .config("spark.driver.extraClassPath", JDBC_DRIVER_PATH)\
    .config("spark.executor.extraClassPath",JDBC_DRIVER_PATH)\
    .getOrCreate()


#### LER PARQUET FILE

In [10]:
df = spark.read.parquet(GZIP_INPUT_FILE)
df.show()

+--------+--------------------+--------------+-----+--------------------+-------+-----------------+------------+--------------------+--------------------+-----------------------+----------+-------------+--------------------+---------------+-----------------------+-------------------+-----------------+------------------------+-------------------------+----------------+
|NR_TURNO|          DS_ELEICAO|TP_ABRANGENCIA|SG_UF|        NM_MUNICIPIO|NR_ZONA|         DS_CARGO|NR_CANDIDATO|        NM_CANDIDATO|   NM_URNA_CANDIDATO|DS_SITUACAO_CANDIDATURA|NR_PARTIDO|   SG_PARTIDO|          NM_PARTIDO|   NM_COLIGACAO|DS_COMPOSICAO_COLIGACAO|ST_VOTO_EM_TRANSITO|QT_VOTOS_NOMINAIS|NM_TIPO_DESTINACAO_VOTOS|QT_VOTOS_NOMINAIS_VALIDOS|DS_SIT_TOT_TURNO|
+--------+--------------------+--------------+-----+--------------------+-------+-----------------+------------+--------------------+--------------------+-----------------------+----------+-------------+--------------------+---------------+------------------

#### TRANSFORMAR PARQUET PARA STAR SCHEMA

In [13]:
from utils.transform_spark_df_to_star_schema import transform_spark_dataframe_into_star_schema

star_schema = transform_spark_dataframe_into_star_schema(
    df,
    colunas_fato=["QT_VOTOS_NOMINAIS_VALIDOS", "QT_VOTOS_NOMINAIS"],
    tabela_fato_nome="tabela_fato",
    mapping_colunas_dimensao={
        'dim_municipio': ["SG_UF", "NM_MUNICIPIO","NM_UE"],
        'dim_cargo': ["DS_CARGO"],
        'dim_ds_eleicao':["DS_ELEICAO"],
        'dim_partido':["SG_PARTIDO","NM_PARTIDO", "NR_PARTIDO"],
        'dim_candidato':["NM_CANDIDATO", "NR_CANDIDATO","NM_URNA_CANDIDATO"],
        'dim_turno':["NR_TURNO"],
        'dim_tp_agrangencia':["TP_ABRANGENCIA"],
        'dim_zona':["NR_ZONA"],
        'dim_situacao_candidatura':["DS_SITUACAO_CANDIDATURA"],
        'dim_coligacao':["NM_COLIGACAO", "DS_COMPOSICAO_COLIGACAO"],
        'dim_voto_transito':["ST_VOTO_EM_TRANSITO"],
        'dim_destinacao_votos':["NM_TIPO_DESTINACAO_VOTOS"],
        'dim_totalizacao_candidato':["DS_SIT_TOT_TURNO"],
        
    },   
)

> dict_values([['SG_UF', 'NM_MUNICIPIO', 'NM_UE'], ['DS_CARGO'], ['DS_ELEICAO'], ['SG_PARTIDO', 'NM_PARTIDO', 'NR_PARTIDO'], ['NM_CANDIDATO', 'NR_CANDIDATO', 'NM_URNA_CANDIDATO'], ['NR_TURNO'], ['TP_ABRANGENCIA'], ['NR_ZONA'], ['DS_SITUACAO_CANDIDATURA'], ['NM_COLIGACAO', 'DS_COMPOSICAO_COLIGACAO'], ['ST_VOTO_EM_TRANSITO'], ['NM_TIPO_DESTINACAO_VOTOS'], ['DS_SIT_TOT_TURNO']])


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `NM_UE` cannot be resolved. Did you mean one of the following? [`SG_UF`, `NR_TURNO`, `NR_ZONA`, `DS_CARGO`, `NM_PARTIDO`].;
'Project [QT_VOTOS_NOMINAIS_VALIDOS#1430L, QT_VOTOS_NOMINAIS#1428L, SG_UF#1414, NM_MUNICIPIO#1415, 'NM_UE, DS_CARGO#1417, DS_ELEICAO#1412, SG_PARTIDO#1423, NM_PARTIDO#1424, NR_PARTIDO#1422L, NM_CANDIDATO#1419, NR_CANDIDATO#1418L, NM_URNA_CANDIDATO#1420, NR_TURNO#1411L, TP_ABRANGENCIA#1413, NR_ZONA#1416L, DS_SITUACAO_CANDIDATURA#1421, NM_COLIGACAO#1425, DS_COMPOSICAO_COLIGACAO#1426, ST_VOTO_EM_TRANSITO#1427, NM_TIPO_DESTINACAO_VOTOS#1429, DS_SIT_TOT_TURNO#1431]
+- Relation [NR_TURNO#1411L,DS_ELEICAO#1412,TP_ABRANGENCIA#1413,SG_UF#1414,NM_MUNICIPIO#1415,NR_ZONA#1416L,DS_CARGO#1417,NR_CANDIDATO#1418L,NM_CANDIDATO#1419,NM_URNA_CANDIDATO#1420,DS_SITUACAO_CANDIDATURA#1421,NR_PARTIDO#1422L,SG_PARTIDO#1423,NM_PARTIDO#1424,NM_COLIGACAO#1425,DS_COMPOSICAO_COLIGACAO#1426,ST_VOTO_EM_TRANSITO#1427,QT_VOTOS_NOMINAIS#1428L,NM_TIPO_DESTINACAO_VOTOS#1429,QT_VOTOS_NOMINAIS_VALIDOS#1430L,DS_SIT_TOT_TURNO#1431] parquet


#### VISUALIZANDO DENTRO DAS TABELAS (OPCIONAL)

In [65]:
# Exemplo de como acessar as tabelas:
tabela_fato_nome, df_fato = star_schema[0]
# Visualize o conteúdo da tabela fato
print(f"Conteúdo da Tabela Fato ({tabela_fato_nome}):")
df_fato.show(truncate=False)
# Exiba o esquema da tabela fato
print("Esquema da Tabela Fato:")
df_fato.printSchema()
# Exemplo de como acessar as tabelas de dimensão:
df_dimensao1_nome, df_dimensao1 = star_schema[1]
# Visualize o conteúdo da dimensão 1
print(f"Conteúdo da Dimensão 1 ({df_dimensao1_nome}):")
df_dimensao1.show(truncate=False)
# Exiba o esquema da dimensão 1
print("Esquema da Dimensão 1:")
df_dimensao1.printSchema()
# Exemplo de como acessar as tabelas de dimensão:
df_dimensao1_nome, df_dimensao1 = star_schema[4]
# Visualize o conteúdo da dimensão 1
print(f"Conteúdo da Dimensão 1 ({df_dimensao1_nome}):")
df_dimensao1.show(truncate=False)
# Exiba o esquema da dimensão 1
print("Esquema da Dimensão 1:")
df_dimensao1.printSchema()

Conteúdo da Tabela Fato (tabela_fato):
+-------------------------+-----------------+----------------+------------+-----------------+--------------+----------------+------------+---------------------+-----------+---------------------------+----------------+--------------------+-----------------------+----------------------------+
|QT_VOTOS_NOMINAIS_VALIDOS|QT_VOTOS_NOMINAIS|sk_dim_municipio|sk_dim_cargo|sk_dim_ds_eleicao|sk_dim_partido|sk_dim_candidato|sk_dim_turno|sk_dim_tp_agrangencia|sk_dim_zona|sk_dim_situacao_candidatura|sk_dim_coligacao|sk_dim_voto_transito|sk_dim_destinacao_votos|sk_dim_totalizacao_candidato|
+-------------------------+-----------------+----------------+------------+-----------------+--------------+----------------+------------+---------------------+-----------+---------------------------+----------------+--------------------+-----------------------+----------------------------+
|0                        |0                |4726            |6           |1         

#### INSERIR MODELO ESTRELA NO BANCO DE DADOS

In [5]:

for item in star_schema:
    table_name,dataframe = item
    print(f"Writing {table_name} to DW")
    dataframe.write.jdbc(url=db_url, table=f"votacao_candidato.{table_name}", mode="overwrite", properties=properties)

Writing tabela_fato to DW
Writing dim_municipio to DW
Writing dim_cargo to DW
Writing dim_ds_eleicao to DW
Writing dim_partido to DW
Writing dim_candidato to DW
Writing dim_turno to DW
Writing dim_tp_agrangencia to DW
Writing dim_zona to DW
Writing dim_situacao_candidatura to DW
Writing dim_coligacao to DW
Writing dim_voto_transito to DW
Writing dim_destinacao_votos to DW
Writing dim_totalizacao_candidato to DW


#### FINALIZA SPARK SESSION

In [6]:
spark.stop()