In [25]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, input_file_name,split
from urllib.request import urlopen
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql import functions as F

In [26]:
# Spark configuration for a local run: app name "appEleicao", master local[2]
# (local mode with 2 threads).
conf = SparkConf()\
        .setAppName('appEleicao')\
        .setMaster('local[2]')

# The builder is the documented entry point for a SparkSession: getOrCreate()
# returns the active session if there is one, so re-running this cell is safe.
# The SparkContext is taken from the session rather than created separately.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [27]:

# Explicit read schema for the election-result JSON files. Every field is
# declared as a string; "abr" is an array of structs, each of which carries a
# nested "cand" array of structs.
# NOTE(review): Spark file sources generally relax user-supplied schemas to
# nullable, so the nullable=False flags are likely advisory — confirm for the
# Spark version in use.
eleicao_schema = (
    StructType()
    .add("ele", StringType(), nullable=False)
    .add("dg", StringType(), nullable=True)
    .add("hg", StringType(), nullable=True)
    .add("nadf", StringType(), nullable=True)
    .add(
        "abr",
        ArrayType(
            StructType()
            .add("dt", StringType(), nullable=False)
            .add("ht", StringType(), nullable=True)
            .add("tpabr", StringType(), nullable=True)
            .add("cdabr", StringType(), nullable=True)
            .add(
                "cand",
                ArrayType(
                    StructType()
                    .add("seq", StringType(), nullable=False)
                    .add("n", StringType(), nullable=False)
                    .add("vap", StringType(), nullable=False)
                    .add("e", StringType(), nullable=False)
                    .add("st", StringType(), nullable=False)
                ),
                nullable=True,
            ),
        ),
        nullable=True,
    )
)

In [28]:
# Reading the election data: every presidential-result JSON file under the
# election folders, each parsed as one (possibly multi-line) JSON document
# with the explicit schema. The source path of each row is kept in "filepath".
# Forward slashes keep the glob portable: Spark/Hadoop accept "/" on Windows
# too, while on POSIX a backslash is not a separator (and escapes glob chars).
eleicoes_bruto = spark.read.option("multiline", "true")\
                .json("../eleicoes_2022/*/data/resultado/presidente/*.json", schema=eleicao_schema)\
                .withColumn('filepath', input_file_name())

In [29]:
# Municipality lookup table from the CSV files (first line is the header).
# No schema inference is requested, so columns (including the join key
# "abr_cd") are read as strings. Forward slashes keep the glob portable across
# Windows and POSIX. The variable name (sic) is kept because later cells use it.
df_muncipios = spark.read.option("header", "true").csv("../eleicoes_2022/*/data/municipios/*.csv")

In [30]:
# Flatten the top-level "abr" array: one output row per array element, with the
# file-level fields (ele, dg, hg, nadf, filepath) followed by the element's own
# struct fields. Rows whose "abr" is null or empty produce no output rows.
eleicoes_explode_abr = (
    eleicoes_bruto
    .select("ele", "dg", "hg", "nadf", "filepath", F.explode(col("abr")).alias("abrExplode"))
    .select("ele", "dg", "hg", "nadf", "filepath", "abrExplode.*")
)

#eleicoes_explode_abr.show()

In [31]:
# Flatten the nested "cand" array: one output row per candidate entry, carrying
# the file-level and "abr"-level fields alongside the candidate struct fields.
eleicoes_explode_cand = (
    eleicoes_explode_abr
    .select("ele", "dg", "hg", "nadf", "dt", "ht", "tpabr", "cdabr", "filepath",
            F.explode(col("cand")).alias("candExplode"))
    .select("ele", "dg", "hg", "nadf", "dt", "ht", "tpabr", "cdabr", "filepath",
            "candExplode.*")
)

#eleicoes_explode_cand.show()

In [32]:
# Keep only the rows whose "tpabr" is 'ZONA' (presumably zone-level results).
df_eleicoes = eleicoes_explode_cand.where(col("tpabr") == "ZONA")

In [33]:
# Derive the file name from the path produced by input_file_name(): the last
# "/"-separated segment. element_at(..., -1) selects that last element directly;
# passing a Column as the key to Column.getItem is deprecated since Spark 3.0.
# NOTE: this cell consumes and drops "filepath", so it can only run once per
# execution of the filter cell above.
df_eleicoes = (
    df_eleicoes
    .withColumn("filename", F.element_at(split(col("filepath"), "/"), -1))
    .drop("filepath")
    # 1-based characters 3..7 of the file name are used as the municipality code.
    # NOTE(review): assumes names shaped like "<2-char prefix><5-char code>-...json" — confirm.
    .withColumn("cod_mun", col("filename").substr(3, 5))
)




In [34]:
# Attach the municipality attributes by matching cod_mun against the lookup's
# "abr_cd", then drop the lookup columns abr_cd, abr_cdi, abr_c, abr_z and url.
# Inner join: result rows with no matching municipality are discarded.
# NOTE: df_eleicoes is overwritten in place, so re-running this cell joins twice.
df_eleicoes = (
    df_eleicoes
    .join(df_muncipios, df_eleicoes.cod_mun == df_muncipios.abr_cd, "inner")
    .drop("abr_cd", "abr_cdi", "abr_c", "abr_z", "url")
)


In [37]:
# Persist the joined result as Parquet in the relative directory "resultado",
# replacing any previous output. coalesce(1) collapses the data to a single
# partition so the directory holds one part file.
(
    df_eleicoes
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("resultado")
)