## Desafio Engenheiro de Dados Jr - Cidacs

Imports

In [32]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import expr, col, trim, regexp_replace, row_number, lit, when, monotonically_increasing_id
import pandas as pd
import random

Configuração do SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("DesafioCidacs") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.shuffle.partitions", "50") \
    .getOrCreate()

# Para otimização de leitura e escrita de dados
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

# Para melhorar a performance do Spark SQL
spark.conf.set("spark.sql.broadcastTimeout", "1000")

#### Extração do DataBase CNEFE - BAHIA

In [3]:
#Base bahia, código 29
df_principal = spark.read.text("./BASE_CNEFE_BA/base.txt")

In [8]:
df_principal.show(10)

+--------------------+
|               value|
+--------------------+
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
|2900108050000011R...|
+--------------------+
only showing top 10 rows



- Filtrando pela cidade de Salvador

In [20]:
#Sabemos que o código de salvador é 27408, vamos filtrar só o que nos interessa
df_salvador = df_principal.filter(expr("substr(value,3,5)") == '27408')

In [10]:
#Verificando se a filtragem deu certo
df_salvador.show()

+--------------------+
|               value|
+--------------------+
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011T...|
|2927408050600011R...|
|2927408050600011L...|
|2927408050600011L...|
|2927408050600011L...|
|2927408050600011L...|
|2927408050600011L...|
+--------------------+
only showing top 20 rows



In [5]:
#Tamanho da base df_salvador
tam_df_salvador = df_salvador.count()
print(tam_df_salvador)

1097829


#### Formatando DataSet

In [21]:
'''
Vamos filtrar se baseando no Layout do IBGE, baixado no mesmo local da base.
substr(coluna,posição inicial, tamanho)
'''
df_salvador = df_salvador.select( expr("substr(value,17,20)").alias("TP_Logradouro"),
                                    expr("substr(value,37,30)").alias("Titulo_Logradouro"),
                                    expr("substr(value,67,60)").alias("Nome_Logradouro"),
                                    expr("substr(value,127,8)").alias("Num_Logradouro"),
                                    expr("substr(value,12,4)").alias("Setor_Censitario"),
                                    expr("substr(value,142,180)").alias("Complemento")
                                   )

In [13]:
df_salvador.show()

+--------------------+--------------------+--------------------+--------------+----------------+--------------------+
|       TP_Logradouro|   Titulo_Logradouro|     Nome_Logradouro|Num_Logradouro|Setor_Censitario|         Complemento|
+--------------------+--------------------+--------------------+--------------+----------------+--------------------+
|TRAVESSA            |                 ...|BASILIO MAGALHAES...|      0       |            0001|                 ...|
|TRAVESSA            |                 ...|BASILIO MAGALHAES...|      0       |            0001|                 ...|
|TRAVESSA            |                 ...|BASILIO MAGALHAES...|      0       |            0001|                 ...|
|TRAVESSA            |                 ...|BASILIO MAGALHAES...|      0       |            0001|                 ...|
|TRAVESSA            |                 ...|BASILIO MAGALHAES...|      19      |            0001|                 ...|
|TRAVESSA            |                 ...|BASILIO MAGAL

In [14]:
#Informações do DataFrame
df_salvador.printSchema()

root
 |-- TP_Logradouro: string (nullable = true)
 |-- Titulo_Logradouro: string (nullable = true)
 |-- Nome_Logradouro: string (nullable = true)
 |-- Num_Logradouro: string (nullable = true)
 |-- Setor_Censitario: string (nullable = true)
 |-- Complemento: string (nullable = true)



#### Criando Base A

In [37]:
"""
Agora vamos tirar uma amostra no tamanho de 100 mil da base de salvador
Vamos fazer uma amostragem aleatória sem reposição usando o método "sample"
"""
tam_amostra_A  = 100000

fracao_amostra = tam_amostra_A/tam_df_salvador # = 0.0910888672097385

#arrendondar "fraction" pra cima e limitar por 100 mil, além de incluir a seed pra manter a Reprodutibilidade

df_A = df_salvador.sample(withReplacement = False, fraction = 0.092, seed = 42).limit(tam_amostra_A) 

In [16]:
#contagem base A
df_A.count()

100000

- Vamos formatar a base A, deixar ela mais apresentável

In [38]:
#Vamos tirar os espações em excesso dos campos nas laterais da string usando a função trim()

df_A = df_A.withColumn("TP_Logradouro", trim(col("TP_Logradouro")))\
           .withColumn("Titulo_Logradouro", trim(col("Titulo_Logradouro")))\
           .withColumn("Nome_Logradouro", trim(col("Nome_Logradouro")))\
           .withColumn("Num_Logradouro", trim(col("Num_Logradouro")))\
           .withColumn("Setor_Censitario", trim(col("Setor_Censitario")))\
           .withColumn("Complemento", trim(col("Complemento")))

In [18]:
df_A.show(10)

+-------------+-----------------+-----------------+--------------+----------------+--------------------+
|TP_Logradouro|Titulo_Logradouro|  Nome_Logradouro|Num_Logradouro|Setor_Censitario|         Complemento|
+-------------+-----------------+-----------------+--------------+----------------+--------------------+
|     TRAVESSA|                 |BASILIO MAGALHAES|             0|            0001|                    |
|        LARGO|                 |     DA MARIQUITA|            10|            0001|APARTAMENTO      ...|
|          RUA|                 |          DO MEIO|             0|            0001|                    |
|          RUA|                 |          DO MEIO|             0|            0001|                    |
|          RUA|                 |          DO MEIO|            74|            0001|             SOBRADO|
|          RUA|                 |          DO MEIO|            74|            0001|              TERREO|
|          RUA|                 |          DO MEIO|    

In [39]:
#Agora vamos tirar o espaço em excesso entre as palavra no campo COMPLEMENTO

df_A = df_A.withColumn("Complemento", regexp_replace("Complemento", '\\s+', ' '))

In [35]:
#Podemos verificar que os espaços foram retirados e o ID incluído

df_A.show(10)

+---+-------------+-----------------+-----------------+--------------+----------------+---------------+
| Id|TP_Logradouro|Titulo_Logradouro|  Nome_Logradouro|Num_Logradouro|Setor_Censitario|    Complemento|
+---+-------------+-----------------+-----------------+--------------+----------------+---------------+
|  0|     TRAVESSA|                 |BASILIO MAGALHAES|             0|            0001|               |
|  1|        LARGO|                 |     DA MARIQUITA|            10|            0001|APARTAMENTO 202|
|  2|          RUA|                 |          DO MEIO|             0|            0001|               |
|  3|          RUA|                 |          DO MEIO|             0|            0001|               |
|  4|          RUA|                 |          DO MEIO|            74|            0001|        SOBRADO|
|  5|          RUA|                 |          DO MEIO|            74|            0001|         TERREO|
|  6|          RUA|                 |          DO MEIO|         

In [40]:
#Transformando a base Spark em uma base Pandas
df_A_pd = df_A.toPandas()

In [42]:
#Exportando em fomato CSV
df_A_pd.to_csv("./Bases_Geradas/baseA.csv", na_rep = "NULL", index = True, encoding = 'utf-8', index_label="Id")

#### Descritiva Simples

 - A base "df_salvador" importada tem um pouco mais de 1M de linhas
 
 - A amostra A tem **100 mil linhas** e conseguimos notar que, todas as colunas são, por enquanto, do tipo **string**

 - Havia excesso de espaços no campo **COMPLEMENTO**,  **LOGRADOURO** e **TITULO**

 - Além disso, há valores ausentes(string vazia) em **COMPLEMENTO** e **TITULO**

In [43]:
df_A.select(col("Setor_Censitario")).distinct().count()

593

- Observamos também que há **593 setores censitários** na amostra **A**

- Outra observação é que vamos manter as **strings** vazias da amostra **A**, em seguida faremos o mesmo pra **B**, pois as colunas **COMPLEMENTO** e **TITULO** são opcionais e não necessariamente tem valores, portanto usaremos o algoritmo nessas condições.

#### Criando Base B

- Amostragem

In [44]:
'''
Vamos criar a base B a partir da A, já formatada e menor, isso vai nos trazer mais desempenho
e mais simplicidade para a aplicação do algoritmo
'''

#vamos usar 6 mil registros para a base B

tam_amostra_B = 6000

#fração em relação ao tamanho base A

fracao_amostra_B = tam_amostra_B/tam_amostra_A # 6000/100000 = 0.06

#Amostragem aleatória, arrendodando pra cima e limitando pelo tamanho da amostra

df_B = df_A.sample(fraction = 0.065, seed = 32, withReplacement = False).limit(tam_amostra_B)

In [45]:
#Contagem

df_B.count()

6000

In [46]:
#Vamos reformatar o Id, que veio aleatoriamente de acordo com a amostragem
windowSpec = Window.orderBy(lit(1))
df_B = df_B.withColumn('Id', row_number().over(windowSpec).cast('int') - 1)

In [47]:
#df resultante

df_B.show(10)

+-------------+-----------------+---------------+--------------+----------------+--------------------+---+
|TP_Logradouro|Titulo_Logradouro|Nome_Logradouro|Num_Logradouro|Setor_Censitario|         Complemento| Id|
+-------------+-----------------+---------------+--------------+----------------+--------------------+---+
|          RUA|                 |   FONTE DO BOI|           176|            0001|APARTAMENTO 303 B...|  0|
|      AVENIDA|                 |   OSWALDO CRUZ|           222|            0001|              LOJA 4|  1|
|          RUA|                 |   FONTE DO BOI|            55|            0002|APARTAMENTO 05 AN...|  2|
|          RUA|                 | BARRO VERMELHO|             0|            0003|APARTAMENTO 204 B...|  3|
|          RUA|                 | BARRO VERMELHO|            93|            0003|     APARTAMENTO 105|  4|
|        PRACA|                 | AUGUSTO SEVERO|             6|            0004|                    |  5|
|          RUA|                 |    

#### Inserindo Mil Registros Com Valores Ausentes Em Uma Das Variáveis


- Para fazermos isso vamos usar a seguinte abordagem:
    
    - Vamos criar uma **lista** de valores random baseados **tamanho** da amostra **B**.

    - Inserir o valor None(vazio em python), em uma das colunas, no caso vamos usar a coluna **NUMERO** (Num_Logradouro)

    - Usaremos **.withColumn** com **.when().otherwise()** para comparar os valores da **lista** com o **Id** do **DataFrame** e em seguida inserir os valores ausentes.
  

In [48]:
#variável com a quantidade de valores ausentes
qtd_ausentes = 1000

#seed para preservar a reprodutibilidade dos dados
random.seed(32)

#Criando lista de valores random
lista_random = random.sample(range(tam_amostra_B), qtd_ausentes)

In [49]:
#Tamanho lista
print(len(lista_random))

#verificar se os valores são distintos entre si
print(len(set(lista_random)))

1000
1000


In [78]:
#visualizando
print(lista_random[:10])

[634, 1749, 1184, 2483, 5720, 1948, 4065, 199, 5900, 315]


In [51]:
# Verificar se há string vazia na coluna NUMERO(Num_Logradouro)
df_B.filter(df_B['Num_Logradouro'] == "").count()

0

In [52]:
# Verificar se há valores ausentes na coluna NUMERO(Num_Logradouro)
df_B.filter(col("Num_Logradouro").isNull()).count()

0

In [53]:
#Inserindo valores ausentes na coluna NUMERO(Num_Logradouro)
df_B = df_B.withColumn("Num_Logradouro", when(col('Id').isin(lista_random), None).otherwise(col("Num_Logradouro")))

In [54]:
# Verificar se os valores ausentes foram inseridos na coluna NUMERO(Num_Logradouro)
df_B.filter(col("Num_Logradouro").isNull()).count()

1000

In [55]:
#Visualizando
df_B.show(10)

+-------------+-----------------+---------------+--------------+----------------+--------------------+---+
|TP_Logradouro|Titulo_Logradouro|Nome_Logradouro|Num_Logradouro|Setor_Censitario|         Complemento| Id|
+-------------+-----------------+---------------+--------------+----------------+--------------------+---+
|          RUA|                 |   FONTE DO BOI|           176|            0001|APARTAMENTO 303 B...|  0|
|      AVENIDA|                 |   OSWALDO CRUZ|           222|            0001|              LOJA 4|  1|
|          RUA|                 |   FONTE DO BOI|            55|            0002|APARTAMENTO 05 AN...|  2|
|          RUA|                 | BARRO VERMELHO|             0|            0003|APARTAMENTO 204 B...|  3|
|          RUA|                 | BARRO VERMELHO|            93|            0003|     APARTAMENTO 105|  4|
|        PRACA|                 | AUGUSTO SEVERO|             6|            0004|                    |  5|
|          RUA|                 |    

#### Supressão De Palavras (aleatória) Nas Variáveis

- Para fazermos isso vamos usar a seguinte abordagem:

    - Primeiro vamos transforamr o **DF Spark** em um **DF pandas**, pra melhorar o fluxo de trabalho e pra facilitar as manipulações.

    - Vamos criar as **funções** que vão retirar **uma** e **duas palavras aleátorias** nas variáveis **Nome_Logradouro** e **Complemento**.

    - Vamos criar **duas listas**, com **valores aleatórios**, baseadas em índices do tamanho da base B. Isso vai permitir modificar mais facilmente os valores de determinada variável.

    - Além disso, vamos exportar a base, somente com os campos **Nome_Logradouro** e **Complemento**, usando o Pandas e vamos ler essa base usando o Spark. Isso vai evitar conflitos e má formatação na mudança de frameworks.

    - Ao final vamos retirar as antigas colunas da **base B** e vamos inserir as novas usando o **FULL JOIN**

In [56]:
df_B_pd = df_B.toPandas()

In [57]:
df_B_pd.head()

Unnamed: 0,TP_Logradouro,Titulo_Logradouro,Nome_Logradouro,Num_Logradouro,Setor_Censitario,Complemento,Id
0,RUA,,FONTE DO BOI,176,1,APARTAMENTO 303 BLOCO B,0
1,AVENIDA,,OSWALDO CRUZ,222,1,LOJA 4,1
2,RUA,,FONTE DO BOI,55,2,APARTAMENTO 05 ANDAR 1,2
3,RUA,,BARRO VERMELHO,0,3,APARTAMENTO 204 BLOCO B,3
4,RUA,,BARRO VERMELHO,93,3,APARTAMENTO 105,4


- Funções Usadas

In [58]:
#Função para supressão de uma palavra
def supressao_palavra(p):
    random.seed(42)
    if p is not None:
        palavras = p.split()
        if palavras:
            indice = random.randint(0,len(palavras) - 1)
            del palavras[indice]
            return " ".join(palavras)
        else:
            return p
    else:
        return None

    

In [59]:
#Função para supressão de duas palavras
def supressao_palavra_2(p):
    if p is not None:
        palavras = p.split()
        for i in range(2):
            if palavras:
                indice = random.randint(0,len(palavras) - 1)
                del palavras[indice]
            else:
                return p
        return " ".join(palavras)
    else:
        return None

#### Supressão De **Uma Palavra** em 2 Mil Registros Na Coluna **Nome_Logradouro**

In [60]:
random.seed(33)

#Criando lista de valores random
indices_random_1 = random.sample(range(tam_amostra_B), 2000)

In [79]:
print(indices_random_1[:10])

[4672, 1370, 5179, 1910, 2271, 3923, 5340, 4363, 4320, 1527]


In [62]:
#Aplicando a função(supressao_palavra) nos indices contidos na lista
df_B_pd.loc[indices_random_1,'Nome_Logradouro'] = df_B_pd.loc[indices_random_1,'Nome_Logradouro'].apply(supressao_palavra)

#### Supressão De **Duas Palavras** em 3 Mil Registros Na Coluna **Complemento**

In [63]:
random.seed(31)

#Criando lista, com tamanho 3 mil, de valores random
indices_random_2 = random.sample(range(tam_amostra_B), 3000)

In [77]:
print(indices_random_2[:10])

[100, 3846, 920, 3218, 1153, 5602, 354, 1136, 921, 4384]


In [65]:
#Aplicando a função(supressao_palavra_2) nos indices contidos na lista 
df_B_pd.loc[indices_random_2,'Complemento'] = df_B_pd.loc[indices_random_2,'Complemento'].apply(supressao_palavra_2)

 - Visualizando

In [66]:
df_B_pd.head()

Unnamed: 0,TP_Logradouro,Titulo_Logradouro,Nome_Logradouro,Num_Logradouro,Setor_Censitario,Complemento,Id
0,RUA,,FONTE DO BOI,176,1,APARTAMENTO 303 BLOCO B,0
1,AVENIDA,,OSWALDO CRUZ,222,1,,1
2,RUA,,FONTE DO BOI,55,2,APARTAMENTO ANDAR,2
3,RUA,,BARRO VERMELHO,0,3,APARTAMENTO 204 BLOCO B,3
4,RUA,,BARRO VERMELHO,93,3,APARTAMENTO 105,4


 #### Exportando Base Com Pandas e Lendo Com Spark

In [67]:
#Exportando as colunas de interesse
df_B_pd[['Nome_Logradouro','Complemento']].to_csv("./Bases_Geradas/Log_Comp.csv", header = True, index = False, na_rep= None)

In [68]:
#Lendo com spark
df_Log_Comp = spark.read.csv('./Bases_Geradas/Log_Comp.csv', inferSchema = False, header = True, nullValue= None)

 - Recolocando Strings Vazias Que Foram Perdidas

In [69]:
#Nas colunas "Nome_Logradouro" e "Complemento" onde tiver nulo substitui por ""
df_Log_Comp = df_Log_Comp.withColumn("Nome_Logradouro" , when(col("Nome_Logradouro").isNull(),"").otherwise(col("Nome_Logradouro")))\
                         .withColumn("Complemento", when(col("Complemento").isNull(),"").otherwise(col("Complemento")))

In [70]:
#Colocando indices a base de dados
df_Log_Comp = df_Log_Comp.withColumn('Id', row_number().over(windowSpec).cast('int') - 1)

In [71]:
df_Log_Comp.show(10)

+---------------+--------------------+---+
|Nome_Logradouro|         Complemento| Id|
+---------------+--------------------+---+
|   FONTE DO BOI|APARTAMENTO 303 B...|  0|
|   OSWALDO CRUZ|                    |  1|
|   FONTE DO BOI|   APARTAMENTO ANDAR|  2|
| BARRO VERMELHO|APARTAMENTO 204 B...|  3|
| BARRO VERMELHO|     APARTAMENTO 105|  4|
| AUGUSTO SEVERO|                    |  5|
|        TAMOIOS|           ANDAR 201|  6|
|         ARGOLO|                    |  7|
|         MAMUTE|                    |  8|
|        VINHAIS|     APARTAMENTO 201|  9|
+---------------+--------------------+---+
only showing top 10 rows



 #### Fazendo As Alterações Na Base **B**

In [72]:
#retirando as antigas colunas
df_B = df_B.drop("Nome_Logradouro", "Complemento")

In [73]:
#Fazendo o join(FULL) entre as duas tabelas baseadas no "Id"
df_B = df_B.join(df_Log_Comp, on = "Id", how = "full")

In [74]:
#Reorganizando a tabela
df_B = df_B.select("Id",
                    "TP_Logradouro",
                    "Titulo_Logradouro",
                    "Nome_Logradouro",
                    "Num_Logradouro",
                    "Setor_Censitario",
                    "Complemento"
                    )

In [75]:
#Verificando alterações
df_B.show(10)

+---+-------------+-----------------+---------------+--------------+----------------+--------------------+
| Id|TP_Logradouro|Titulo_Logradouro|Nome_Logradouro|Num_Logradouro|Setor_Censitario|         Complemento|
+---+-------------+-----------------+---------------+--------------+----------------+--------------------+
|  0|          RUA|                 |   FONTE DO BOI|           176|            0001|APARTAMENTO 303 B...|
|  1|      AVENIDA|                 |   OSWALDO CRUZ|           222|            0001|                    |
|  2|          RUA|                 |   FONTE DO BOI|            55|            0002|   APARTAMENTO ANDAR|
|  3|          RUA|                 | BARRO VERMELHO|             0|            0003|APARTAMENTO 204 B...|
|  4|          RUA|                 | BARRO VERMELHO|            93|            0003|     APARTAMENTO 105|
|  5|        PRACA|                 | AUGUSTO SEVERO|             6|            0004|                    |
|  6|          RUA|                 |

In [76]:
#Exportando base B
df_B_pd = df_B.toPandas()
df_B_pd.to_csv("./Bases_Geradas/baseB.csv", na_rep = "NULL", index = False, encoding = 'utf-8')