**Inicializando Spark**

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

**Spark Session - Acessando Spark UI (Google Colab)**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master('local[*]') \
  .appName ('Iniciando com Spark') \
  .config('spark.ui.port', '4050') \
  .getOrCreate()

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
get_ipython().system_raw('./ngrok authtoken INSIRA SEU TOKEN')
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


In [None]:
spark

In [None]:
# exemplo criar DF
data = [('Matheus','30'), ('Dayhara', '28'), ('Jessica','33'),('Jeova', '60')] #tem que ser lista com tuplas
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)
df
df.show()

+-------+-----+
|   Nome|Idade|
+-------+-----+
|Matheus|   30|
|Dayhara|   28|
|Jessica|   33|
|  Jeova|   60|
+-------+-----+



In [None]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Matheus,30
1,Dayhara,28
2,Jessica,33
3,Jeova,60


**Montando um Drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**Os dados estão disponivel no site da Receita Federal**

**Carregando os dados das empresas**

In [None]:
import zipfile

In [None]:
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas.zip','r').extractall('/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK')

In [None]:
path = '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas'
empresas = spark.read.csv(path, sep = ';', inferSchema=True)

**Carregando para socios e estabelecimentos**

In [None]:
#socios
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios.zip').extractall('/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK')
path_socios = '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios'
socios = spark.read.csv(path_socios, sep = ';', inferSchema = True)

In [None]:
#estabelecimentos
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos.zip').extractall('/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK')
path_estabelecimentos = '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos'
estabelecimentos = spark.read.csv(path_estabelecimentos, sep = ';', inferSchema = True)

**Manipulando os Dados**

In [None]:
empresas.limit(5).toPandas() #visualisar 5 primeiros registros do df
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']


In [None]:
for index, colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [None]:
empresas.limit(5).toPandas() 

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
socios.limit(5).toPandas()
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [None]:
for index, colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)
socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [None]:
estabelecimentos.limit(5).toPandas()
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [None]:
for index, colName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)
estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

**Analisando os dados**

In [None]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
empresas.printSchema() #veriricar os schema da tabela (tipo do dado por exemplo) - insight: capital social esta como string; separado decimal por vigula

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [None]:
socios.printSchema() #insight: o data de entrada sociedade esta como inteiro; usual formato de data para analises

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [None]:
estabelecimentos.printSchema() #insight: datas como inteiros

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

**Modificando os datatypes - string pra double**







In [None]:

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
#trocando a ',' por '.' da coluna capital social

empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',','.'))
#withColumn: função de transformação do DF (alterar data types e values) além de criar novas colunas a partir de existentes
#no f.regexp_replace (função do spark para modificação)
#primeiro recebe a variavel onde sera feita a substituição, no caso 'capital_social_da_empresa' e no segundo qual operação ira realizar, troca de , por .

In [None]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [None]:
#alterando capital social de string para double
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



**Modificando os datatypes - string pra date**

In [None]:
estabelecimentos = estabelecimentos\
            .withColumn(
            "data_situacao_cadastral",
            f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
            )\
            .withColumn(
            "data_de_inicio_atividade",
            f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
            )\
            .withColumn(
            "data_da_situacao_especial",
            f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
            )

estabelecimentos.printSchema()            

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
socios = socios.withColumn('data_de_entrada_sociedade', f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd'))
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



**Selecionado Informações**

In [None]:
empresas.select('*').show(10, False) #false pra nao ficar informações truncadas na visualização

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [None]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [None]:
estabelecimentos\
  .select('nome_fantasia', 'municipio', f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),\
          f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
          .show(10, False)

+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|EMBROIDERY & GIFT|7075     |1995                   |5                      |
|null             |7075     |1999                   |7                      |
|EMBROIDERY & GIFT|7075     |1995                   |4                      |
|null             |7075     |1994                   |5                      |
|KOLVOX           |7107     |1994                   |5                      |
|PROLAV           |7107     |1994                   |5          

**Verificando dados null e nan**

Dentro deste método, faremos uma contagem dos valores null ou NaN. Para isso, criaremos uma lista [] através do recurso List comprehension, do Python, e usaremos um for c in socios.columns para percorrê-la. Basicamente, o for irá percorrer as colunas do dataframe e retornar uma lista com os nomes dessas colunas. No entanto, não são os nomes que queremos visualizar, mas sim a contagem de valores nulos.

Dentro desta lista, chamaremos a função contadora f.count() e o método .alias(), que receberá o parâmetro c, informando que este será o nome usado para nos referirmos a cada coluna.
A função contadora f.count() receberá f.when(), que assemelha-se a uma condicional no sentido de que, ao encontrar determinado valor, outro seja atribuído. Sendo assim, ao identificar um dado nulo na coluna, será atribuído o valor +1 para a contagem. Para que isso funcione, esta função recebe dois argumentos: f.isnull(c) - que testará se o dado da coluna é nulo - e 1 - valor que deve ser atribuído caso o argumento de teste seja verdadeiro.

In [None]:
socios.select(\
              [f.count(f.when(f.isnull(c), 1)).alias(c)\
               for c in socios.columns])\
              .show()           
              

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [None]:
empresas.select(\
              [f.count(f.when(f.isnull(c), 1)).alias(c)\
               for c in empresas.columns])\
              .show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|          0|                            0|                0|                          0|                        0|            5985|                    4579678|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+



In [None]:
estabelecimentos.select(\
              [f.count(f.when(f.isnull(c), 1)).alias(c)\
               for c in estabelecimentos.columns])\
              .show()

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+-------+------------------------+---------------------+----------------------+------------------+----------+------+-----------+------+-----+---+---------+-------+----------+-------+----------+----------+-------+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|   pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|logradouro|numero|complemento|bairro|  cep| uf|municipio|  ddd_1|telefone_1|  ddd_2|telefone_2|ddd_do_fax|    fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+-------------+------------------+-----------------

**Ordenando os dados**

In [None]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
            .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
            .show(5, False)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|EDGAR FRANCISCO DA SILVA     |9           |2021          |
|WILLIAM WHITING BEACH VEALE  |9           |2021          |
|ANTONIO TAVARES DE ANDRADE   |9           |2021          |
|AURA MARIA DE ANDRADE        |9           |2021          |
|ANTONIA DE SOUSA VIEIRA      |9           |2021          |
+-----------------------------+------------+--------------+
only showing top 5 rows



In [None]:
estabelecimentos.select('complemento', 'bairro', f.year('data_da_situacao_especial').alias('data_situ_especi'))\
            .orderBy(['data_situ_especi', 'bairro'], ascending=[False, False])\
            .show(5, False)

+-----------+-------------------+----------------+
|complemento|bairro             |data_situ_especi|
+-----------+-------------------+----------------+
|null       |VILA SAO JORGE     |2021            |
|QUIOSQUE   |VILA BEATRIZ       |2021            |
|CASA  06   |NOVO RIO DAS OSTRAS|2021            |
|null       |JARDIM ALGARVE     |2021            |
|null       |CENTRO             |2021            |
+-----------+-------------------+----------------+
only showing top 5 rows



In [None]:
empresas.orderBy(['capital_social_da_empresa', 'porte_da_empresa'], ascending=[False, False])\
              .show(5, False)

+-----------+-------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                    |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|11463784   |CONSTRUTORA SCHNEIDER & SANTOS LTDA              |2062             |49                         |3.22014670262E11         |1               |null                       |
|331465     |INCOMPLAST INDUSTRIA DE EMBALAGENS PLASTICAS LTDA|2062             |49                         |2.52006125741E11         |3               |null                       |
|7954317    |MAISON AGENCIA DE VIAGENS E TURISMO LTDA         |2062             |49            

**Filtrando os dados**

In [None]:
empresas\
  .where("capital_social_da_empresa = 50")\
  .show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |null                 

In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
socios.select("nome_do_socio_ou_razao_social")\
  .filter(socios.nome_do_socio_ou_razao_social.startswith("MATHEUS"))\
  .filter(socios.nome_do_socio_ou_razao_social.endswith("SILVA"))\
  .limit(10)\
  .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,MATHEUS ALMEIDA RIBEIRO DA SILVA
1,MATHEUS MASSON VIEIRA DA SILVA
2,MATHEUS MENDES DA CONCEICAO SILVA
3,MATHEUS ELIAS SILVA
4,MATHEUS URBANO DA SILVA
5,MATHEUS APARECIDO DA SILVA
6,MATHEUS ROQUETTE FERRATO DA SILVA
7,MATHEUS CAVALCANTE DIAS DA SILVA
8,MATHEUS RIBEIRO DA SILVA
9,MATHEUS APARECIDO DA SILVA


In [None]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%MERCEARIA%'))\
    .show(15, False)

+-------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                    |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------+-----------------+----------------+-------------------------+
|MOURA & SILVA MERCEARIA LIMITADA                 |2062             |5               |0.0                      |
|BAR E MERCEARIA KIT LTDA                         |2062             |5               |0.0                      |
|MERCEARIA E T S LTDA                             |2062             |1               |0.0                      |
|ISAIAS PEREIRA DOS SANTOS MERCEARIA              |4014             |5               |0.0                      |
|SEPETIBAO MERCEARIA LTDA                         |2062             |5               |0.0                      |
|BAR E MERCEARIA PARATI DE TUPA LTDA              |2062             |1               |0.0       

**Agregação e Junções**

**Sumarizando os dados**

In [None]:
socios\
  .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .where('ano_de_entrada >= 2010')\
  .groupBy('ano_de_entrada')\
  .count()\
  .orderBy('ano_de_entrada', ascending = True)\
  .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [None]:
empresas\
  .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
  .groupBy('porte_da_empresa')\
  .agg(
      f.avg('capital_social_da_empresa').alias('capital_social_medio'),
      f.count('cnpj_basico').alias('frequencia')
  )\
  .orderBy('porte_da_empresa', ascending = True)\
  .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+



In [None]:
empresas\
  .select('capital_social_da_empresa')\
  .summary()\
  .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



**Jutando DF**

In [None]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how = 'inner')
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
freq = empresas_join\
    .select(
        'cnpj_basico', 
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [None]:
freq.toPandas()

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [None]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')   
    )
).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



**Spark SQL**
  

In [None]:
# antes de usar as query de fato, temos que criar uma view da tabela que estamos trabalhando
empresas.createOrReplaceTempView("empresasView")

In [None]:
spark.sql("SELECT * FROM empresasView").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
spark.sql("""
            SELECT * 
            FROM empresasView 
            WHERE capital_social_da_empresa = 50 """)\
    .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       null|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       null|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       null|
|   22242856|         JOAO CESAR M

In [None]:
spark.sql("""
            SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media 
            FROM empresasView 
            GROUP BY porte_da_empresa """)\
    .show(5)

+----------------+------------------+
|porte_da_empresa|             Media|
+----------------+------------------+
|            null|  8.35421888053467|
|               1|339994.53313506936|
|               3|2601001.7677092673|
|               5| 708660.4208249798|
+----------------+------------------+



In [None]:
empresas_join.createOrReplaceTempView("empresasJoinView")

In [None]:
freq = spark.sql("""
                  SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
                  FROM empresasJoinView 
                  WHERE YEAR(data_de_inicio_atividade) >= 2010
                  GROUP BY data_de_inicio
                  ORDER BY data_de_inicio """)

freq.show()

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [None]:
freq.createOrReplaceTempView("freqView")

In [None]:
spark.sql("""
            SELECT * FROM freqView
            UNION ALL
            SELECT 'Total' AS data_de_inicio, SUM(count) AS count
            FROM freqView """)\
    .show()

+--------------+-------+
|data_de_inicio|  count|
+--------------+-------+
|          2010| 154159|
|          2011| 172677|
|          2012| 232480|
|          2013| 198424|
|          2014| 202276|
|          2015| 212523|
|          2016| 265417|
|          2017| 237292|
|          2018| 275435|
|          2019| 325922|
|          2020| 400654|
|          2021| 153275|
|         Total|2830534|
+--------------+-------+



 **Formas de Armazenamento**

  Arquivos CSV

In [None]:
empresas.write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas/csv',
    mode='overwrite', #sobreescrever
    sep=',',
    header=True #salvar alteraçoes na tabela
)

In [None]:
# teste se é possivel ler o arquivo salvo
empresas_2 = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas/csv',
    sep = ',',
    inferSchema=True,
    header=True    
)

empresas_2.limit(10).toPandas()

In [None]:
empresas_2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
socios.write.csv(
    path = '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios/csv',
    mode='overwrite',
    sep = ',',
    header = True
                 )

In [None]:
socios_2= spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios/csv',
    sep = ',',
    inferSchema = True,
    header = True
)

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
spark.conf.set("spark.sql.legacy.csv.datetimeRebaseModeInWrite", "CORRECTED")

In [None]:
estabelecimentos.write.csv(
    path = '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos/csv',
    mode = 'overwrite',
    sep = ',',
    header = True,
    
)

In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
estabelecimentos_2 = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos/csv',
    sep = ',',
    inferSchema = True,
    header = True
)

In [None]:
estabelecimentos_2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: strin

**Arquivos PARQUET**

In [None]:
empresas.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas/parquet',
    mode='overwrite'
)

In [None]:
empresas_parquet = spark.read.parquet(
    '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas/parquet'
)

In [None]:
empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
socios.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios/parquet',
    mode='overwrite'
)

In [None]:
socios_parquet = spark.read.parquet(
    '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios/parquet'
)

In [None]:
socios_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
estabelecimentos.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos/parquet',
    mode='overwrite'
)

In [None]:
estabelecimentos_parquet = spark.read.parquet(
    '/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos/parquet'
)

In [None]:
estabelecimentos_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

**PARTICIONAMENTO DOS DADOS**

In [None]:
#partição unica, mais performatico em coalesce
empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header = True
)

In [None]:
socios.coalesce(1).write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios/csv-unico',
    mode='overwrite',
    sep=';',
    header = True
)

In [None]:
estabelecimentos.coalesce(1).write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos/csv-unico',
    mode='overwrite',
    sep=';',
    header = True
)

In [None]:
# partiçoes baseadas pela coluna 'porte_da_empresa'
empresas.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/empresas/parquet-partitionBy',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

In [None]:
socios.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/socios/parquet-partitionBy',
    mode='overwrite',
    partitionBy='identificador_de_socio'
)

In [None]:
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")

In [None]:
estabelecimentos.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/CURSO_SPARK/estabelecimentos/parquet-partitionBy',
    mode='overwrite',
    partitionBy='uf'
)

In [None]:
spark.stop()