# INTRODUÇÃO AO APACHE SPARK

### Tutorial para instalar java e spark usando o SO LINUX no WSL ou LINUX comum

Tutorial detalhado para instalar java (java-8-openjdk-amd64) e spark (spark-3.5.1-bin-hadoop3) Utilizando o SO linux, seja em uma camada WSL ou não.

    1 - Instale o JDK JAVA, se não possuir.
    2 - Instale arquivos do Spark, se não possuir.
    3 - Crie as variáveis de ambiente SPARK_HOME e JAVA_HOME

link com detalhando os passos acima - https://www.alura.com.br/artigos/iniciando-projeto-spark-no-colab

* Neste link possui um tutorial voltando para usuários do google colab tambem!

In [127]:
# Criar uma variável de ambiente apontando para a instalacao do spark e java no SO linux

import os 

os.environ["SPARK_HOME"] = "/home/leonardojdss/spark/spark-3.5.1-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

### Tutorial para instalar java e spark usando o SO Windows

Para usuários do SO Windows 10 em diante recomendo seguir o tutorial do link anexado.

link - https://romeritomorais.medium.com/instalacao-configuracao-e-teste-do-apache-spark-no-windows-11-a11beb19eb0a

# 0 - INSTANCIANDO O SPARK

In [128]:
# Importa a biblioteca findspark, que facilita a configuração do PySpark no Python
import findspark

# Inicializa o findspark para configurar o ambiente do Spark
findspark.init()

# Importa SparkSession da biblioteca pyspark.sql
from pyspark.sql import SparkSession

# Cria uma instância de SparkSession
spark = SparkSession\
    .builder\
    .master('local[*]')\
    .getOrCreate()

In [129]:
# Visualizar a sessão spark criada
spark

# 1 - CARREGAR DADOS COM SPARK

### 1.1 Criando um DataFrame

In [130]:
# Metodo simples criando seu dataset
data = [('Zeca', '35'), ('Eva', '29')] # Lista
colnames = ['Nome', 'Idade']

df = spark.createDataFrame(data, colnames)
df

DataFrame[Nome: string, Idade: string]

In [131]:
# Visualização do dataset criado
df.show()

                                                                                

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [132]:
# Visualização do dataset parecida com dataset pandas
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


### 1.2 Carregando dados CSV

Ao longo desta instrodução iremos utilizar os arquivos empresas, estabelecimentos e socios, são dados públicos disponível no site da receita federal, para facilitar os dados estão disponiveis nos links a seguir:

* Faça o Download de todas as partições de cada arquivo

[Empresas](https://1drv.ms/f/s!Av2lzwYwawWTgWEIG4wnPE1Ea__J?e=8mBZyZ)
 
[Estabelecimentos](https://1drv.ms/f/s!Av2lzwYwawWTgWNAbPa7xPgzhMKl?e=vaincb)
 
[Sócios](https://1drv.ms/f/s!Av2lzwYwawWTgWIif8d4cAXhdN5l?e=pBvIZz)

In [133]:
# Carregando os dados das empresas com spark

# Precisamos passar o diretorios de todos os arquivos ou um arquivo, neste exemplo estamos passando o diretorio de todos os arquivos
path_empresas = "/home/leonardojdss/Documentos/trilha_spark_apache_python/01_Spark_apresentando_a_ferramenta/empresas/empresas"

# Realizando a leitura de todos os arquivos do diretorio
empresas = spark.read.csv(path_empresas, sep=';', inferSchema=True)

#Contando a quantidade de registros, a soma de todos os registros de todos os arquivos no diretorio
print("Os arquivos de empresas possuem:", empresas.count(), "registros")

                                                                                

Os arquivos de empresas possuem: 4585679 registros


In [134]:
#Leitura do arquivo estabelecimentos

Path_estabelecimentos = "/home/leonardojdss/Documentos/trilha_spark_apache_python/01_Spark_apresentando_a_ferramenta/estabelecimentos/estabelecimentos"
estabelecimentos = spark.read.csv(Path_estabelecimentos, sep=';', inferSchema=True)
print("Os arquivos de estabelecimentos possuem:", empresas.count(), "registros")

Path_socios = "/home/leonardojdss/Documentos/trilha_spark_apache_python/01_Spark_apresentando_a_ferramenta/socios/socios"
socios = spark.read.csv(Path_socios, sep=';', inferSchema=True)
print("Os arquivos de socios possuem:", socios.count(), "registros")

                                                                                

Os arquivos de estabelecimentos possuem: 4585679 registros


                                                                                

Os arquivos de socios possuem: 2046430 registros


# 2 Manipulação

### 2.1 Tratamento básico

### 2.1.1 Renomear Colunas

In [135]:
# Analisando o conjunto de dados, em uma visualização semelhante ao pandas

empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [136]:
# Renomear colunas do dataframe empresas

empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", colName)

empresas.limit(5).toPandas()


Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [137]:
# Renomear colunas do dataframe estabelecimento

estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [138]:
# Renomear colunas do dataframe socios

sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


### 2.1.2 Analisar schemas dos dados

In [139]:
# Analisando o conjunto de dados, em uma visualização semelhante ao pandas

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [140]:
# Analisando o conjunto de dados, em uma visualização semelhante ao pandas

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [141]:
# Analisando o conjunto de dados, em uma visualização semelhante ao pandas

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



### 2.1.3    Conversão de string para double

In [142]:
# Importanto as classes para conversão de dados

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [143]:
# Analisando os tipos de dados

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [144]:
# Antes de realizar a conversão de string para double precisamos alterar as virgula dicimais para pontos decimais
# o spark entender o ponto como decimal ao inves do padrão brasileiro que é a virgula

empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [145]:
# Alterar o tipo de coluna string para double

empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [146]:
# Analisando o schema após alteração

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### 2.1.4 Conversão de String para Date

In [147]:
# Criando uma DataFrame para exemplo de conversão String > Date

df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [148]:
# Analisando o schema

df.printSchema()

root
 |-- data: long (nullable = true)



In [149]:
# Exemplo de transformação

df = df.withColumn("data", f.to_date(df["data"].cast(StringType()), 'yyyyMMdd'))
df.printSchema()

root
 |-- data: date (nullable = true)



In [150]:
# Analisando o schema de Estabelecimento para Transformar as colunas string em date

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [151]:
# Transformação string para date no dataset estabelecimentos

#coluna data_situacao_cadastral
estabelecimentos = estabelecimentos\
    .withColumn('data_situacao_cadastral', 
        f.to_date(estabelecimentos['data_situacao_cadastral'].cast(StringType()), 'yyyyMMdd'))

#coluna data_de_inicio_atividade
estabelecimentos = estabelecimentos\
    .withColumn('data_de_inicio_atividade', 
        f.to_date(estabelecimentos['data_de_inicio_atividade'].cast(StringType()), 'yyyyMMdd'))

#coluna data_da_situacao_especial
estabelecimentos = estabelecimentos\
    .withColumn('data_da_situacao_especial', 
        f.to_date(estabelecimentos['data_da_situacao_especial'].cast(StringType()), 'yyyyMMdd'))

# Aferindo se realmente as mudanças ocorrem
estabelecimentos[['data_da_situacao_especial', "data_de_inicio_atividade", "data_da_situacao_especial"]].printSchema()



root
 |-- data_da_situacao_especial: date (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- data_da_situacao_especial: date (nullable = true)



In [152]:
# Analisando o schema após alteração

estabelecimentos[['data_da_situacao_especial', "data_de_inicio_atividade", "data_da_situacao_especial"]].limit(5).toPandas()

Unnamed: 0,data_da_situacao_especial,data_de_inicio_atividade,data_da_situacao_especial.1
0,,1994-05-09,
1,,1994-05-12,
2,,1994-05-12,
3,,1994-05-13,
4,,1995-05-09,


In [153]:
# Analisando o schema de socios para Transformar as colunas string em date

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [154]:
# Transformação string para date no dataset socios

#coluna data_de_entradas_sociedade
socios = socios\
    .withColumn('data_de_entrada_sociedade', 
        f.to_date(socios['data_de_entrada_sociedade'].cast(StringType()), 'yyyyMMdd'))

# Aferindo se realmente as mudanças ocorrem
socios[["data_de_entrada_sociedade"]].printSchema()

root
 |-- data_de_entrada_sociedade: date (nullable = true)



In [155]:
# Analisando o schema após alteração
socios[["data_de_entrada_sociedade"]].limit(5).toPandas()

Unnamed: 0,data_de_entrada_sociedade
0,1994-07-25
1,1994-07-25
2,1994-05-16
3,1994-05-16
4,1994-06-09


# 2.2 Manipulação
    
### 2.2.1 SELECT

In [156]:
# Realizando consultas

empresas\
    .select("*")\
    .show(5, truncate=False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |NULL                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [157]:
# selecionando colunas desejadas

empresas\
    .select("natureza_juridica", "porte_da_empresa", "capital_social_da_empresa")\
    .show(5, truncate=False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2240             |1               |0.0                      |
|2062             |5               |0.0                      |
|3034             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |4000.0                   |
+-----------------+----------------+-------------------------+
only showing top 5 rows



### 2.2.2 Extraindo informações dos dados

In [158]:
# Extraindo ano de entrada na sociedade da coluna data_de_entrada_sociedade

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .show(5, truncate=False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [159]:
# Extraindo ano e mês de inicio das atividades da coluna ano_de_inicio_atividade

estabelecimentos\
    .select('nome_fantasia', 'municipio', f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade')\
             ,f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
    .show(5, truncate=False)

+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |1994                   |5                      |
|NULL             |7107     |1994                   |5                      |
|NULL             |7107     |1994                   |5                      |
|NULL             |7107     |1994                   |5                      |
|EMBROIDERY & GIFT|7075     |1995                   |5                      |
+-----------------+---------+-----------------------+-----------------------+
only showing top 5 rows



### 2.2.3 Concater informações e imprimir de acordo com uma lógica de ordem que as strings precisam aparecer

In [160]:
# Criando DataFrame para exemplo

data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']

df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [161]:
# Concatenando o ultimo nome com o primeiro nome

df \
    .select(
        f.concat_ws(
            ', ', 
            f.substring_index('nome', ' ', -1), 
            f.substring_index('nome', ' ', 1)
        ).alias('ident'), 
        'idade') \
    .show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



### 2.2.4 Identificando valores nulos e contagem de nulos

No contexto do Spark DataFrame, NaN, null e None são valores que representam dados ausentes, mas têm significados e comportamentos diferentes. Vamos detalhar cada um deles:

* 1. NaN (Not a Number)
Tipo: Utilizado principalmente com colunas de tipo FloatType e DoubleType.
Significado: Representa um valor numérico indefinido ou não representável. Por exemplo, o resultado de 0.0/0.0 ou sqrt(-1.0).
Comportamento: O NaN é tratado como um valor válido do ponto de vista do tipo de dado, mas não é igual a nenhum valor, incluindo ele mesmo (NaN != NaN). Operações matemáticas com NaN geralmente resultam em NaN.
* 2. Null
Tipo: Pode ser usado em qualquer tipo de dado.
Significado: Representa a ausência de valor ou dado desconhecido.
Comportamento: O Spark trata null de forma especial em operações de filtragem e agregação. Comparações com null geralmente resultam em null, e operações como somar null a um número resultam em null.
* 3. None
Tipo: Este é um valor do Python e não um tipo de dado Spark por si só.
Significado: Em Python, None é o equivalente a null. Quando usado no contexto de Spark, ele é automaticamente convertido para null em um DataFrame.
Comportamento: Quando você define um valor como None em uma coluna de um DataFrame Spark, ele será interpretado como null pelo Spark.

In [162]:
# Criando DataFrame para exemplo com nulo

df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

# None = null
# NaN = Not a number

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [163]:
# Visualizando o DataFrame

df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [164]:
# Criando DataFrame para exemplo com NaN (Not a number)

df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [165]:
# Visualizando o DataFrame

df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [166]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [167]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [168]:
# Avaliando valores nulos na tabela Socios

socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|NULL|        ***000000**|                 NULL|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [169]:
# contagem de nulos por colunas

socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()



+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

                                                                                

### 2.2.5 Substituindo os valores de nulos em campos integers e string

In [170]:
#Substituindo os valores de nulos em socios, campo integer

# Método para inteiros (integer)
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [171]:
#Substituindo os valores de nulos em socios, campo string

# Método para texto (string)
socios.na.fill("-").limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


### 2.2.5 Ordenando os dados

In [172]:
# Ordenando a coluna ano_de_entrada de forma descrecente

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, truncate=False)



+----------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social     |faixa_etaria|ano_de_entrada|
+----------------------------------+------------+--------------+
|RANIERE DE FREITAS ANDRADE        |4           |2021          |
|ANA LUCIA ALVES DE CARVALHO       |5           |2021          |
|ANTONIO JOSE HENRIQUE MORAIS LOPES|3           |2021          |
|JORGE RODRIGUES SANTOS            |6           |2021          |
|DOUGLAS CAPPELLETTI               |3           |2021          |
+----------------------------------+------------+--------------+
only showing top 5 rows



                                                                                

In [173]:
# Ordenando duas colunas, ano_de_entrada e faixa_etaria

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
    .show(10, truncate=False)



+-------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social        |faixa_etaria|ano_de_entrada|
+-------------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA      |9           |2021          |
|LUZINETE DANTAS DE OLIVEIRA RODRIGUES|9           |2021          |
|ANDRE PAUL GABAY                     |9           |2021          |
|JOAO KOJIN                           |9           |2021          |
|ALDO SANTI                           |9           |2021          |
|FERNANDO MARCONDES DE MATTOS         |9           |2021          |
|FLORIDA ECHECHIPIA                   |9           |2021          |
|YARA CABRAL PINTO                    |9           |2021          |
|APPARECIDA ALBANI DE LIMA            |9           |2021          |
|JOSE CURADO ADORNO                   |9           |2021          |
+-------------------------------------+------------+--------------+
only showing top 10 rows



                                                                                

In [174]:
# Para fixação, será criado DataFrame de alunos para ordenação 

data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

# Ordenando dos alunos mais novos para os mais velhos.

df\
    .select("*")\
    .orderBy(['ano', 'mes'], ascending=[True, True])\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|BRENO VENTUROSO      |1  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|HERONDINA PEREIRA    |6  |2009|
|WALTER DIAS          |9  |2009|
|CARLITO SOUZA        |1  |2010|
|CARMINA RABELO       |4  |2010|
|DENIS FONSECA        |6  |2010|
|ELIO SILVA           |7  |2010|
|IRANI DOS SANTOS     |12 |2010|
+---------------------+---+----+



### 2.2.6    Criando Filtros

### DataFrame.where(condition) ou DataFrame.filter(condition), são a mesma coisa em spark.

In [175]:
# Realizando apenas um filtro

empresas\
    .where("capital_social_da_empresa==50")\
    .show(5, truncate=False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |NULL                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |NULL                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |NULL                 

In [176]:
# Realizando dois filtros

socios\
    .select('nome_do_socio_ou_razao_social')\
    .filter(socios['nome_do_socio_ou_razao_social'].startswith("LEONARDO"))\
    .filter(socios['nome_do_socio_ou_razao_social'].endswith("SILVA"))\
    .limit(10)\
    .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,LEONARDO FERREIRA DA SILVA
1,LEONARDO MARTINS SILVA
2,LEONARDO CANDIDO DA SILVA
3,LEONARDO HENRIQUE DE OLIVEIRA SILVA
4,LEONARDO MARTINS SILVA
5,LEONARDO DE SANTIS VIEIRA DA SILVA
6,LEONARDO HENRIQUE DO NASCIMENTO E SILVA
7,LEONARDO SILVA
8,LEONARDO DO VALLE DA SILVA
9,LEONARDO GOMES DA SILVA


In [177]:
# Para fixação do comando de Filtros

data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

# seleção de apenas alunos nascidos no primeiro semestre de 2009.
df\
    .select('*')\
    .filter("mes >= 1" and "mes <= 6")\
    .filter("ano == 2009")\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



### 2.2.7 Método LIKE

In [178]:
# Criando um dataframe

df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['nome'])
df.toPandas()

Unnamed: 0,nome
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [179]:
# Utilizando o comando Like para buscar no DataFrame qualquer linha que tenha no nome a palavra restaurante
# Comando upper faz todos os caracteres de uma coluna ficar com letra maiuscula

df\
    .where(f.upper(df['nome']).like('%RESTAURANTE'))\
    .show(truncate=False)

# RESTAURANTE% = Começa com restaurante
# %RESTAURANTE$ = tem restaurante no meio do nome
# %RESTAURANTE = termina com restaurante

+----------------+
|nome            |
+----------------+
|Joca Restaurante|
+----------------+



In [180]:
# Buscando as empresas que tem a palavra RESTAURANTE na razão social

empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTE%'))\
    .show(15, truncate=False)

+-------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                          |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------+-----------------+----------------+-------------------------+
|RESTAURANTE IMIGRANTE PORTUGUES LTDA.                  |2062             |5               |0.0                      |
|MORAIS & CARVALHO RESTAURANTE E PIZZARIA LTDA          |2062             |1               |0.0                      |
|BAR E RESTAURANTE PAGANOTTO LTDA                       |2062             |5               |0.0                      |
|RODRIGUES & RODRIGUES RESTAURANTE LTDA                 |2062             |5               |0.0                      |
|TEXAS RANCH BAR RESTAURANTE PRODUCOES ARTISTICAS E CULT|2062             |1               |0.0                      |
|V V SANTOS RESTAURANTE BAR E ATIV DESPORTIVAS L

### 2.2.8 Comando WHEN

In [181]:
# Criando um DataFrame para utilizar o comando When

data = [
    ('CARLOS', 'MATEMÁTICA', 7), 
    ('IVO', 'MATEMÁTICA', 9), 
    ('MÁRCIA', 'MATEMÁTICA', 8), 
    ('LEILA', 'MATEMÁTICA', 9), 
    ('BRENO', 'MATEMÁTICA', 7), 
    ('LETÍCIA', 'MATEMÁTICA', 8), 
    ('CARLOS', 'FÍSICA', 2), 
    ('IVO', 'FÍSICA', 8), 
    ('MÁRCIA', 'FÍSICA', 10), 
    ('LEILA', 'FÍSICA', 9), 
    ('BRENO', 'FÍSICA', 1), 
    ('LETÍCIA', 'FÍSICA', 6), 
    ('CARLOS', 'QUÍMICA', 10), 
    ('IVO', 'QUÍMICA', 8), 
    ('MÁRCIA', 'QUÍMICA', 1), 
    ('LEILA', 'QUÍMICA', 10), 
    ('BRENO', 'QUÍMICA', 7), 
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)

# Criar um indicador para os alunos APROVADOS ou REPROVADOS. Com a função when podemos criar esta nova coluna de forma bastante simples, exemplo:
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



# 2.3 Sumarizando dados

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

### 2.3.1 Agrupamento e agragações
### 3.3.1.1 Contagem

In [182]:
# Metodo para agrupamento utilizando o comando groupBy
# Realizando a contagem de entrada de sociedade por ano, superior a 2010

socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada')\
    .show(truncate=False)



+--------------+------+
|ano_de_entrada|count |
+--------------+------+
|2010          |79337 |
|2011          |83906 |
|2012          |80101 |
|2013          |83919 |
|2014          |80590 |
|2015          |80906 |
|2016          |81587 |
|2017          |90221 |
|2018          |99935 |
|2019          |118248|
|2020          |125927|
|2021          |56316 |
+--------------+------+



                                                                                

### 3.3.1.2 Contagem e média

In [183]:
# Utilizando o comando AVG para agregar mais uma estatistica descritiva

empresas\
    .select("cnpj_basico", "porte_da_empresa", "capital_social_da_empresa")\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg('capital_social_da_empresa').alias('capital_social_medio'),
        f.count('cnpj_basico').alias('frequencia')\
    )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show(truncate=False)



+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|NULL            |8.35421888053467    |5985      |
|1               |339994.53313507047  |3129043   |
|3               |2601001.7677092687  |115151    |
|5               |708660.4208249793   |1335500   |
+----------------+--------------------+----------+



                                                                                

### 3.3.1.3 Estatística descritivas

In [184]:
# Comando summary para trazer mais estastiticas descritivas

empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .show()

# Se eu quiser retornar apenas algumas estastiticas apenas como média e contagem, eu poderia colocar dentro do summary o nome da estastiticas, exemplo:

# summary("mean", "count")



+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542674|
| stddev|     2.1118691490537727E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



                                                                                

# 2.3.2 joins
### 2.3.2.1 Inner Join

In [185]:
# Criando dois DataFrames# para ser o exemplo

produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'), 
        ('2', 'Limpeza', 'Sabão em pó'), 
        ('3', 'Frios', 'Queijo'), 
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15), 
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [186]:
# Analisando o DataFrame

produtos.toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [187]:
# Analisando o DataFrame

impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [188]:
# Realizando os joins

produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .show(truncate=False)

+-------+---+------------+-----+
|cat    |id |prod        |tax  |
+-------+---+------------+-----+
|Bebidas|1  |Água mineral|0.15 |
|Limpeza|2  |Sabão em pó |0.05 |
|Frios  |3  |Queijo      |0.065|
|Bebidas|4  |Refrigerante|0.15 |
+-------+---+------------+-----+



                                                                                

### 2.3.2.2 Left Join

In [189]:
# Realizando os Left join

produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .show(truncate=False)

+-------+---+---------------+-----+
|cat    |id |prod           |tax  |
+-------+---+---------------+-----+
|Bebidas|1  |Água mineral   |0.15 |
|Limpeza|2  |Sabão em pó    |0.05 |
|Frios  |3  |Queijo         |0.065|
|Bebidas|4  |Refrigerante   |0.15 |
|Pet    |5  |Ração para cães|NULL |
+-------+---+---------------+-----+



### 2.3.2.3 Right Join

In [190]:
# Realizando os Right join

produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .show(truncate=False)

+-------+----+------------+-----+
|cat    |id  |prod        |tax  |
+-------+----+------------+-----+
|Carnes |NULL|NULL        |0.08 |
|Bebidas|1   |Água mineral|0.15 |
|Limpeza|2   |Sabão em pó |0.05 |
|Frios  |3   |Queijo      |0.065|
|Bebidas|4   |Refrigerante|0.15 |
+-------+----+------------+-----+



### 2.3.2.4 Outer Join

In [191]:
# Realizando os outer join

produtos.join(impostos, 'cat', how='outer')\
    .sort('id')\
    .show(truncate=False)

+-------+----+---------------+-----+
|cat    |id  |prod           |tax  |
+-------+----+---------------+-----+
|Carnes |NULL|NULL           |0.08 |
|Bebidas|1   |Água mineral   |0.15 |
|Limpeza|2   |Sabão em pó    |0.05 |
|Frios  |3   |Queijo         |0.065|
|Bebidas|4   |Refrigerante   |0.15 |
|Pet    |5   |Ração para cães|NULL |
+-------+----+---------------+-----+



### 2.3.2.5 Join com nossos dados

In [192]:
# Realizando os joins com nossos dados

empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

In [193]:
# Verificando que as informações de empresas e estabelecimentos estão unidas, aquelas que tem match, ou seja que tem os mesmo cnpj basico

empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [194]:
freq = empresas_join\
    .select('cnpj_basico',
            f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count('cnpj_basico').alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [195]:
freq.show()

24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:23 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
+--------------+----------+



                                                                                

In [196]:
# Criar um total

freq.union(
    freq.select(
        f.lit('total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()

24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:33 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         total|   2830534|
+--------------+----------+



# 2.4 Utilizar SQL no SparkSQL

### Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [197]:
# Exemplo 1 - trazer a tabela empresas

# Sempre que usar o spark SQL é necessario criar uma VIEW, spark.sql trabalha diretamente com SQL, 
# e o SQL precisa de uma tabela ou view conhecida para consultar.

# Criando a VIEW empresas_view
empresas.createOrReplaceTempView("empresas_view")

# Comando SQL 
spark.sql(

    "SELECT * FROM empresas_view"

).show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       NULL|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       NULL|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       NULL|
|       5347|         ROSELY APARE

In [198]:
# Exemplo 2 - filtrar empresas com capital social IGUAL a 50

# Comando SQL
spark.sql("""
        SELECT *
            FROM empresas_view
            WHERE capital_social_da_empresa = 50
"""
).show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       NULL|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       NULL|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       NULL|
|   22242856|         JOAO CESAR M

In [199]:
# Exemplo 3 - calcular a média do capital social por porte da empresa

# Comando SQL
spark.sql("""
    SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media
          FROM empresas_view
          Group by porte_da_empresa       
""").show(5)



+----------------+------------------+
|porte_da_empresa|             Media|
+----------------+------------------+
|            NULL|  8.35421888053467|
|               1|339994.53313507047|
|               3|2601001.7677092687|
|               5| 708660.4208249793|
+----------------+------------------+



                                                                                

In [200]:
# Exemplo 4 - Calcular a frequencia de CNPJ abertos por ano

# Criando a VIEW empresas_Join_View
empresas_join.createOrReplaceTempView('empresas_Join_View')

# Comando SQL 
freq = spark.sql("""
    SELECT COUNT(cnpj_basico) AS frequencia, YEAR(data_de_inicio_atividade) AS data_de_inicio
        FROM empresas_Join_View
        WHERE YEAR(data_de_inicio_atividade) >= 2010
        GROUP BY data_de_inicio
        ORDER BY data_de_inicio       
""")

freq.show()

24/07/15 21:46:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:48 WARN RowBasedKeyValueBatch: Calling spill() on

+----------+--------------+
|frequencia|data_de_inicio|
+----------+--------------+
|    154159|          2010|
|    172677|          2011|
|    232480|          2012|
|    198424|          2013|
|    202276|          2014|
|    212523|          2015|
|    265417|          2016|
|    237292|          2017|
|    275435|          2018|
|    325922|          2019|
|    400654|          2020|
|    153275|          2021|
+----------+--------------+



                                                                                

In [201]:
# Exercicio 5 - Calcular a frequencia de CNPJ abertos por ano E unir com um total

# Criando a VIEW freq_view
freq.createOrReplaceTempView('freq_view')

# Comando SQL
freq_view = spark.sql("""
    SELECT *
        FROM freq_view
    UNION ALL                  
    SELECT SUM(frequencia) AS frequencia, 'total' AS data_de_inicio
        FROM freq_view    
""")

freq_view.show()

24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:46:55 WARN RowBasedKeyValueBatch: Calling spill() on

+----------+--------------+
|frequencia|data_de_inicio|
+----------+--------------+
|    154159|          2010|
|    172677|          2011|
|    232480|          2012|
|    198424|          2013|
|    202276|          2014|
|    212523|          2015|
|    265417|          2016|
|    237292|          2017|
|    275435|          2018|
|    325922|          2019|
|    400654|          2020|
|    153275|          2021|
|   2830534|         total|
+----------+--------------+



                                                                                

In [202]:
# Exemplo 6 - Replicar o Exemplo 4 com comando PySpark

# Comando PySpark
empresas_join\
    .select(f.year(empresas_join.data_de_inicio_atividade).alias('data_de_inicio'))\
    .where("data_de_inicio >= 2010")\
    .groupBy('data_de_inicio')\
    .count()\
    .orderBy('data_de_inicio')\
    .show()

24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/15 21:47:08 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



                                                                                

### 2.5 Armazenamento

### 2.5.1 Sáida CSV

In [203]:
# Criar a pasta de output se não existir
Path_out_empresas = 'out_CSV_empresas'

if not os.path.exists(Path_out_empresas):
    os.makedirs(Path_out_empresas)

# Realizando o output de empresas
empresas.write.csv(
    path=Path_out_empresas,
    mode='overwrite',
    sep=';',
    header=True
)

                                                                                

In [204]:
# Criar a pasta de output se não existir
Path_out_estabelecimentos = 'out_CSV_estabelecimentos'

if not os.path.exists(Path_out_estabelecimentos):
    os.makedirs(Path_out_estabelecimentos)

# Realizando o output de empresas
estabelecimentos.write.csv(
    path=Path_out_estabelecimentos,
    mode='overwrite',
    sep=';',
    header=True
)

                                                                                

In [205]:
# Criar a pasta de output se não existir
Path_out_socios = 'out_CSV_socios'

if not os.path.exists(Path_out_socios):
    os.makedirs(Path_out_socios)

# Realizando o output de empresas
socios.write.csv(
    path=Path_out_socios,
    mode='overwrite',
    sep=';',
    header=True
)

                                                                                

In [206]:
# Dica - Header (cabeçalho)

# Quando o cabeçalho é disponivel devemos usar o argumento header=True para leitura do arquivo
empresas_dica = spark.read.csv(Path_out_empresas, sep=';', inferSchema=True, header=True)
empresas_dica.show(5)



+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       NULL|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       NULL|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       NULL|
|       5347|         ROSELY APARE

                                                                                

### 2.5.2 Saída PARQUET

In [207]:
# Salvando em APACHE PARQUET 

# Criar a pasta de output se não existir
Path_out_empresas = 'out_PARQUET_empresas'

if not os.path.exists(Path_out_empresas):
    os.makedirs(Path_out_empresas)

# Realizando o output de empresas
empresas.write.parquet(
    path=Path_out_empresas,
    mode='overwrite'
)

                                                                                

In [208]:
# Configurando o modo de rebaseamento de datetime para "LEGACY" ou "CORRECTED"
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

# ou
# spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")

# Salvando em APACHE PARQUET 

# Criar a pasta de output se não existir
Path_out_estabelecimentos = 'out_PARQUET_estabelecimentos'

if not os.path.exists(Path_out_estabelecimentos):
    os.makedirs(Path_out_estabelecimentos)

# Realizando o output de empresas
estabelecimentos.write.parquet(
    path=Path_out_estabelecimentos,
    mode='overwrite'
)

                                                                                

In [209]:
# Salvando em APACHE PARQUET 

# Criar a pasta de output se não existir
Path_out_socios = 'out_PARQUET_socios'

if not os.path.exists(Path_out_socios):
    os.makedirs(Path_out_socios)

# Realizando o output de empresas
socios.write.parquet(
    path=Path_out_socios,
    mode='overwrite',
)

                                                                                

# 2.6 PARTICIONAMENTO

In [210]:
# Particionamento para salvar o arquivo CSV

# Criar a pasta de output se não existir
Path_out_empresas = 'out_CSV_empresa_particionamento'

if not os.path.exists(Path_out_empresas):
    os.makedirs(Path_out_empresas)

# Realizando o output de empresas
empresas.coalesce(1).write.csv(
    path=Path_out_empresas,
    mode='overwrite',
    sep=';',
    header=True
)

# coalesce(X) X = Quantidade de partições

                                                                                

In [211]:
# Salvando em APACHE PARQUET 

# Criar a pasta de output se não existir
Path_out_empresas = 'out_PARQUET_empresas_particionamento'

if not os.path.exists(Path_out_empresas):
    os.makedirs(Path_out_empresas)

# Realizando o output de empresas particionamento pela quantidade de portes que tem na coluna "porte_da_empresa"
empresas.write.parquet(
    path=Path_out_empresas,
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

# Outro exemplo desse tipo de particionamento é quando queremos particionar o arquivo pelos tipos de grupos de dados, exemplo:
# - Grupos A+B+C+D......, Categorias etc

                                                                                

In [212]:
# Encerrar o servidor Apache Spark
 
spark.stop()

# 3 - CONCLUSÃO

Após este grande estudo sobre Spark é possivel concluir que o spark é um grande aliado para processar grandes massas de dados, em ambiente corporativos o Spark é largamente utilizado para processar dados de sites de compras, app, redes sociais, IoT e etc, o Spark tem uma arquitetura robusta para lidar com grandes quantidades de dados e tem uma interface homem > Máquina simplificada para programar rotinas de modelagem, junções, agragações, cálculos etc com facilidade.