<a href="https://colab.research.google.com/github/AlexSan1910/Tratamento_De_Dados_PySpark/blob/main/Tratamento_estabelecimentos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Instalando o pyspark para utilização

!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=cb560b8a4676e002cf18feb3f0d1915add60363e30ab4c935e29b00366363bef
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
# importando bibliotecas PySpark

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("Estudos").getOrCreate() # inicializando sessão spark

In [None]:
# Buscando base de dados no Google Drive

url = "/content/drive/MyDrive/BasesSpark/estabelecimentos/part*" # caminho da pasta com os dados que desejo

df_estabelecimentos = spark.read.csv(
    url,
    sep=";", # definindo o separador desse arquivo csv
    header=True, # irá assumoir que esse arquivo tem um cabeçalho "Titulos"
    inferSchema=True, # Tentará identificar cada um dos tipos de dados da coluna
)

In [None]:
df_estabelecimentos.show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|     fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+------

In [None]:
# Definindo tudo que for valor nulo que seja "Sem registro"

# "Sem registro" para dados em string
df_estabelecimentos = df_estabelecimentos.na.fill("Sem registro")

# "0" para dados numéricos
df_estabelecimentos = df_estabelecimentos.na.fill(0)

In [None]:
# Verificando o tipo de cada coluna

df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = false)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = false)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = false)
 |-- tipo_de_logradouro: string (nullable = false)
 |-- logradouro: string (nullable = false)
 |-- numero: string (nullable = false)
 |-- complemento: string (nullable = false)
 |-- bairro: string (nullable = false)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = false)
 |-- municipio: integer (nullable = true)
 |-- d

In [None]:
# Alterando tipo de dados de determinadas coluna para o tipo date

df_estabelecimentos = df_estabelecimentos.withColumn("data_situacao_cadastral", f.to_date(f.col("data_situacao_cadastral").cast("string"), 'yyyyMMdd'))


In [None]:
# Verificando as alterações

df_estabelecimentos.show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+---+---------+------------+------------+------------+------------+----------+------------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep| uf|municipio|       ddd_1|  telefone_1|       ddd_2|  telefone_2|ddd_do_fax|         fax|  correio_eletronico|situacao_especial|data_da_situacao_espe

In [None]:
# Alterando nome das colunas

df_estabelecimentos = df_estabelecimentos.withColumnsRenamed({
   "uf":"Estado",
   "municipio":"id_municipio"
})

In [None]:
df_estabelecimentos.show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+------+------------+------------+------------+------------+------------+----------+------------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep|Estado|id_municipio|       ddd_1|  telefone_1|       ddd_2|  telefone_2|ddd_do_fax|         fax|  correio_eletronico|situacao_especial|data_da_s

In [None]:
# excluindo colunas que não serão utilizados

df_estabelecimentos = df_estabelecimentos.drop("correio_eletronico","ddd_1", "telefone_1", "ddd_2",
                         "telefone_2", "ddd_do_fax", "fax", "situacao_especial", "data_da_situacao_especial")


In [None]:
# Visualizando as alterações

df_estabelecimentos.show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+------+------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep|Estado|id_municipio|
+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------

In [None]:
# Visualizando valores distintos da coluna Estado, para alteração no proximo bloco de codigo

df_estabelecimentos.select("Estado").distinct().show()

+------+
|Estado|
+------+
|    SC|
|    RO|
|    PI|
|    AM|
|    EX|
|    RR|
|    GO|
|    TO|
|    MT|
|    SP|
|    ES|
|    PB|
|    RS|
|    MS|
|    AL|
|    MG|
|    PA|
|    BA|
|    SE|
|    PE|
+------+
only showing top 20 rows



In [None]:
# Foi identificado um valor "EX" na coluna estado, que não corresponde a nenhum estado..
# Sendo assim selecionaremos os valores EX para visualizarmos esses dados e identificarmos
# do que se trata.

df_estabelecimentos.where("Estado like '%EX%'").show(truncate=False) # Comando "where" sql para pesquisas com condições

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+---------------------------------+------+------------+------------------+---+------+------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|logradouro                       |numero|complemento |bairro            |cep|Estado|id_municipio|
+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+------------

In [None]:
# Foi identificado que sigla EX tem o signficado de Exterior.. ou seja
# são empresas que provavelmente são de fora do pais.. sendo assim iremos excluir
# esses dados da nossa lista pois a nossa lista é estabelecimentos apenas do Brasil

df_estabelecimentos = df_estabelecimentos.filter(f.col("Estado") != "EX")

In [None]:
# Verificando se saiu todos os dados de EX.

df_estabelecimentos.select("Estado").distinct().show()

+------+
|Estado|
+------+
|    SC|
|    RO|
|    PI|
|    AM|
|    RR|
|    GO|
|    TO|
|    MT|
|    SP|
|    ES|
|    PB|
|    RS|
|    MS|
|    AL|
|    MG|
|    PA|
|    BA|
|    SE|
|    PE|
|    CE|
+------+
only showing top 20 rows



In [None]:
# Substituindo as siglas dos estados pelo respectivo nome de cada estado

df_estabelecimentos = df_estabelecimentos.withColumn(
    "Estado", # definindo a coluna
    f.when(df_estabelecimentos["Estado"] == "SP", "Sao Paulo") # se o valor na coluna Estado for igual a SP substitua por Sao Paulo
    f.when(df_estabelecimentos["Estado"] == "SC", "Santa Catarina")
    f.when(df_estabelecimentos["Estado"] == "RO", "Rondonia")
    f.when(df_estabelecimentos["Estado"] == "PI", "Piaui")
    f.when(df_estabelecimentos["Estado"] == "AM", "Amazonas")
    f.when(df_estabelecimentos["Estado"] == "RR", "Roraima")
    f.when(df_estabelecimentos["Estado"] == "GO", "Goias")
    f.when(df_estabelecimentos["Estado"] == "TO", "Tocantins")
    f.when(df_estabelecimentos["Estado"] == "MT", "Mato Grosso")
    f.when(df_estabelecimentos["Estado"] == "ES", "Espirito Santo")
    f.when(df_estabelecimentos["Estado"] == "PB", "Paraiba")
    f.when(df_estabelecimentos["Estado"] == "RS", "Rio Grande do Sul")
    f.when(df_estabelecimentos["Estado"] == "MS", "Mato Grosso do Sul")
    f.when(df_estabelecimentos["Estado"] == "AL", "Alagoas")
    f.when(df_estabelecimentos["Estado"] == "MG", "Minas Gerais")
    f.when(df_estabelecimentos["Estado"] == "PA", "Para")
    f.when(df_estabelecimentos["Estado"] == "BA", "Bahia")
    f.when(df_estabelecimentos["Estado"] == "SE", "Sergipe")
    f.when(df_estabelecimentos["Estado"] == "PE", "Pernambuco")
    .otherwise(df_estabelecimentos["Estado"]))



SyntaxError: invalid syntax. Perhaps you forgot a comma? (<ipython-input-34-77cf428a0df4>, line 5)

In [None]:
# Slavando em uma pasta no google drive de arquivos tratados

df_estabelecimentos.write.csv("/content/drive/MyDrive/Bases Tratadas/estabelecimentos.csv", header=True, mode="overwrite")