#### Informacoes

Temos na célula abaixo tanto algumas informações sobre o tamanho geral.
Como também o mesmo select que vai ser feito posteriormente mas antes dos processamento

In [15]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.functions import col,udf
from pyspark.sql.functions import lpad, to_date
import pandas as pd


spark = SparkSession.builder.master("local").appName("hdfs_test").getOrCreate()

dados = spark.read.load("hdfs://localhost:9000/dadostse/",format='csv',sep=';',inferSchema="true",header="true",encoding='latin1')
print(f"O tamanho de linhas é: {dados.count()}")
dados.select("NM_CANDIDATO","DT_NASCIMENTO","DT_ELEICAO","NM_EMAIL","NR_TITULO_ELEITORAL_CANDIDATO","DS_GENERO",).show(20)



O tamanho de linhas é: 884010
+--------------------+-------------+----------+---------------+-----------------------------+---------+
|        NM_CANDIDATO|DT_NASCIMENTO|DT_ELEICAO|       NM_EMAIL|NR_TITULO_ELEITORAL_CANDIDATO|DS_GENERO|
+--------------------+-------------+----------+---------------+-----------------------------+---------+
|JOSÉ MARIA RICARD...|   01/05/1966|03/10/2004|         #NULO#|                 027990051325|MASCULINO|
|JOSÉ ARNALDO DE A...|   24/06/1958|03/10/2004|         #NULO#|                 002685981317|MASCULINO|
|LEONAM MENDES DA ...|   24/10/1974|03/10/2004|         #NULO#|                 024965891384|MASCULINO|
|RUBENS CANDIDO DE...|   02/07/1962|03/10/2004|         #NULO#|                 035048940281|MASCULINO|
|JOAQUIM CUSTODIO ...|   30/01/1945|03/10/2004|         #NULO#|                 013790440272|MASCULINO|
|     WILSON LOURENÇO|   12/10/1950|03/10/2004|         #NULO#|                 050118770230|MASCULINO|
|DIVINO JOSÉ DE ME...|   14/03/196

                                                                                

### Spark + HDFS

Aqui foram feitos os processamentos com Spark. 

Os arquivos estavam salvos no HDFS.


In [43]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.functions import col,udf
from pyspark.sql.functions import lpad, to_date
import pandas as pd


spark = SparkSession.builder.master("local").appName("hdfs_test").getOrCreate()

dados = spark.read.load("hdfs://localhost:9000/dadostse/",format='csv',sep=';',inferSchema="true",header="true",encoding='latin1')


# Parte de Genero e Profissao 
genero = spark.read.load("hdfs://localhost:9000/utils/grupos.csv",format='csv',sep=',',inferSchema="true",header="true",encoding='latin1').toPandas()
prof = spark.read.load("hdfs://localhost:9000/utils/lista_canonicos.csv",format='csv',sep=',',inferSchema="true",header="true",encoding='utf8').toPandas()

# Valores nulos


"""
Vai substituir todos os valores dentro dessa lista por null
Vai substituir todos os -3 por -1
"""
dados = dados.replace(["#NE","#NE#","#NULO","#NULO#","NAO INFORMADO"],"null")
dados = dados.replace([-3],-1)
    

def gerarGenero(nome):
    """
    Função que vai retornar o genero de acordo com o nome
    Vai receber logo o primeiro nome, colocar um regex dado o contains
    Vai procurar dentro do dataset de nomes o genero e retornar como uma string
    """
    nome = f"[|]{nome}?[|]" # Regex para achar o nome exato por causa do contains
    gen = genero.loc[genero['names'].str.contains(nome,regex=True)]
    gender = gen['classification'].to_string()[-1]
    if(gender=="M" or gender=="F"):
        return gender

    return "null" 

def generoFuncao(dados):
    """
    Vai gerar a coluna DS_GEN_NOVO, com os generos de acordo com a base de dados
    Vai utilizar da gerarGenero
    """
    funcaoGenero = udf(lambda x: gerarGenero(x.split(" ")[0]),StringType())
    return dados.withColumn("DS_GEN_NOVO",funcaoGenero(col("NM_CANDIDATO")))



def tituloEleitoral(dados):
    """
    Funcao que vai completar os numeros de titulo de eleitor para 12 digitos.
    """

    return dados.withColumn("NR_TITULO_ELEITORAL_CANDIDATO",lpad(col("NR_TITULO_ELEITORAL_CANDIDATO"),12,"0"))


def arrumarDatas(dados):
    """
    Funcao que vai padronizar todas as datas para yyyy-MM-dd
    """

    return dados.withColumn("DT_NASCIMENTO",to_date("DT_NASCIMENTO","dd/MM/yyyy")).withColumn("DT_ELEICAO",to_date("DT_ELEICAO","dd/MM/yyyy")).withColumn("DT_GERACAO",to_date("DT_GERACAO","dd/MM/yyyy")) #DT_NASCIMENTO


def gerarOcupacao(nome):
    """
    Vai retornar o codigo cbo se existir.
    Faz o check igualmente do TSE para o CBO, ou seja, vai chegar se as strings sao ambas iguais
    """
    codigo = prof.loc[prof['termo'].str.contains(nome.capitalize())]
    codigoString = codigo['codigo'].to_string()
    if(codigoString[-1].isnumeric()):
        return codigoString[-7:]
    else:
        return "null"

def ocupacaoFuncao(dados):
    """
    Vai gerar a coluna CD_CBO com a funcao gerarOcupacao
    """

    funcaoOcupacao = udf(lambda x: gerarOcupacao(x),StringType())
    return dados.withColumn("CD_CBO",funcaoOcupacao(col("DS_OCUPACAO")))


dados = arrumarDatas(dados)
dados = tituloEleitoral(dados)
dados = generoFuncao(dados)
dados = ocupacaoFuncao(dados)
print(f"O tamanho de linhas é: {dados.count()}")
dados.select("NM_CANDIDATO","DT_NASCIMENTO","DT_ELEICAO","NM_EMAIL","NR_TITULO_ELEITORAL_CANDIDATO","DS_GENERO","CD_CBO","DS_GEN_NOVO").show(20)


                                                                                

O tamanho de linhas é: 442005


[Stage 317:>                                                        (0 + 1) / 1]

+--------------------+-------------+----------+--------------------+-----------------------------+---------+-------+-----------+
|        NM_CANDIDATO|DT_NASCIMENTO|DT_ELEICAO|            NM_EMAIL|NR_TITULO_ELEITORAL_CANDIDATO|DS_GENERO| CD_CBO|DS_GEN_NOVO|
+--------------------+-------------+----------+--------------------+-----------------------------+---------+-------+-----------+
|ASSIR RODRIGUES P...|   1955-08-07|2004-10-03|                null|                 023237020132|MASCULINO|1414-10|          M|
|VILMA DE SANT`ANN...|   1968-02-05|2004-10-03|                null|                 147826820191| FEMININO|   null|          F|
|       TARCIO FOGAÇA|   1961-09-23|2004-10-03|                null|                 049874290124|MASCULINO|   null|          M|
|LUIS VANDERLEI CO...|   1967-05-04|2004-10-03|                null|                 028090260183|MASCULINO|   null|          M|
|ODETE MOREIRA DA ...|   1949-12-29|2004-10-03|                null|                 062796890159

                                                                                

### Queries SQL pelo SparkSQL em cima de dados

- Participação Feminina
- Idade media dos candidatos

In [44]:
# Idade Media dos Candidatos
from pyspark.sql.functions import avg,lit


dados.filter(dados["DT_ELEICAO"] < lit('1995')).select(avg("NR_IDADE_DATA_POSSE").alias(f"idade media 1994")).show()

dados.filter((dados["DT_ELEICAO"] > lit('1995')) & (dados["DT_ELEICAO"] < lit('1999'))).select(avg("NR_IDADE_DATA_POSSE").alias(f"idade media 1998")).show()

dados.filter((dados["DT_ELEICAO"] > lit('1999')) & (dados["DT_ELEICAO"] < lit('2003'))).select(avg("NR_IDADE_DATA_POSSE").alias(f"idade media 2002")).show()

dados.filter(dados["DT_ELEICAO"] > lit('2003')).select(avg("NR_IDADE_DATA_POSSE").alias(f"idade media 2004")).show()




                                                                                

+------------------+
|  idade media 1994|
+------------------+
|46.073717466526254|
+------------------+



                                                                                

+-----------------+
| idade media 1998|
+-----------------+
|45.71773124834349|
+-----------------+



                                                                                

+-----------------+
| idade media 2002|
+-----------------+
|45.46976641449003|
+-----------------+





+-----------------+
| idade media 2004|
+-----------------+
|43.75791295439343|
+-----------------+



                                                                                

In [45]:
# Participação Feminina 
from pyspark.sql.functions import count, lit

dados.filter((dados["DT_ELEICAO"] < lit('1995')) & (dados["DS_GENERO"]=="FEMININO")).select(count("DS_GENERO").alias(f"Qnt mulheres 1994")).show()

dados.filter((dados["DT_ELEICAO"] > lit('1995')) & (dados["DT_ELEICAO"] < lit('1999')) & (dados["DS_GENERO"]=="FEMININO")).select(count("DS_GENERO").alias(f"Qnt mulheres 1998")).show()

dados.filter((dados["DT_ELEICAO"] > lit('1999')) & (dados["DT_ELEICAO"] < lit('2003')) & (dados["DS_GENERO"]=="FEMININO")).select(count("DS_GENERO").alias(f"Qnt mulheres 2002")).show()

dados.filter((dados["DT_ELEICAO"] > lit('2003')) & (dados["DS_GENERO"]=="FEMININO")).select(count("DS_GENERO").alias(f"Qnt mulheres 2004")).show()


                                                                                

+-----------------+
|Qnt mulheres 1994|
+-----------------+
|              452|
+-----------------+



                                                                                

+-----------------+
|Qnt mulheres 1998|
+-----------------+
|             1886|
+-----------------+



                                                                                

+-----------------+
|Qnt mulheres 2002|
+-----------------+
|             2602|
+-----------------+





+-----------------+
|Qnt mulheres 2004|
+-----------------+
|            85397|
+-----------------+



                                                                                