<a href="https://colab.research.google.com/github/fraanpsilva/conhecendo-apachespark/blob/main/Conhecendo_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Apache Spark - Introdução

#### Apache Spark
Apache Spark é uma plataforma de computação em cluster que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo MapReduce, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

### [PySpark](https://spark.apache.org/docs/3.1.2/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

## Projeto
Este projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

#### Preparando o ambiente

In [None]:
!pip install pyspark
!pip install findspark

# imports
import pyspark
import findspark

findspark.init()

In [3]:
# Acessando o Spark UI (Google colab)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master('local[*]') \
  .appName("Iniciando com Spark") \
  .config('spark.ui.port', '4050') \
  .getOrCreate()

In [None]:
# ngrok
# !pip install pyngrok
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

get_ipython().system_raw('./ngrok authtoken 2LeJdoEHQYMMY2AMgZlN6nJSYLP_4GQCaPyW1ybDAbiYX1uE8')
get_ipython().system_raw('./ngrok http 4050 &')

In [46]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://3908-35-184-68-153.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://3908-35-184-68-153.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


In [None]:
# montando o drive
from google.colab import drive
drive.mount('/content/drive')


#### Carregando os dados

In [7]:
# carregando os dados
import zipfile

In [9]:
# extraindo arquivos empresas.zip
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/spark/empresas.zip', 'r').extractall('/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto')
# lendo os dados
path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas'
empresas = spark.read.csv(path, sep=';', inferSchema=True)
# verificando quantos registros há no arquivo
empresas.count()

In [11]:
# extraindo e lendo arquivos estabelecimento.zip
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/spark/estabelecimentos (1).zip', 'r').extractall('/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto')
path_estab = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/estabelecimentos'
estab = spark.read.csv(path_estab, sep=';', inferSchema=True)
estab.count()

4836219

In [12]:
# extraindo e lendo arquivos socios.zip
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/spark/socios.zip', 'r').extractall('/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto')
path_socios = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/socios'
socios = spark.read.csv(path_socios, sep=';', inferSchema=True)
socios.count()

2046430

#### Manipulando e analisando os dados 
DataTypes Spark (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html

In [13]:
# renomeando as colunas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situação_especial']
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [None]:
# retornando as tuplas com os indices das colunas
for index, colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)
empresas.columns
empresas.limit(3).toPandas()

In [None]:
# retornando as tuplas com os indices das colunas (estabelecimento)
for index, colName in enumerate(estabsColNames):
  estab = estab.withColumnRenamed(f"_c{index}", colName)

estab.columns
estab.limit(3).toPandas()

In [None]:
# retornando as tuplas com os indices das colunas (socios)
for index, colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

socios.columns
socios.limit(3).toPandas()


In [None]:
# verificando os tipos de coluna do datafram
#empresas.printSchema()
#estab.printSchema()
#socios.printSchema()

#### Convertendo String ➝ Double e String ➝ Date
`StringType ➝ DoubleType`

`StringType ➝ Date`


In [20]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
empresas.limit(3).toPandas()

In [None]:
# substituindo a virgula pelo ponto
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(3).toPandas()

In [None]:
# alterando o tipo de dados de uma columa
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.printSchema()

In [None]:
#estab.printSchema()

In [None]:
# convertendo String para date (estabelecimentos)
estab = estab \
  .withColumn(
      "data_situacao_cadastral", 
      f.to_date(estab.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
      "data_de_inicio_atividade",
      f.to_date(estab.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
        
    )\
    .withColumn(
      "data_da_situação_especial",
      f.to_date(estab.data_da_situação_especial.cast(StringType()), 'yyyyMMdd')
        
    ) 
estab.printSchema()

In [None]:
estab.limit(3).toPandas()

In [None]:
socios.printSchema()
socios.limit(3).toPandas()

In [None]:
# convertendo String para date (socios)
socios = socios \
  .withColumn(
      "data_de_entrada_sociedade", 
      f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )
socios.printSchema()

## Seleções e consultas

#### Selecionando informações

Saprk sql (https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions

In [None]:
# select * from
empresas\
  .select('*') \
  .show(5, False)

In [None]:
# selecionando colunas especificas
empresas \
  .select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa') \
  .show(5)

In [None]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria',
          f.year('data_de_entrada_sociedade'). alias('ano_de_entrada')) \
  .show(5, False)

In [None]:
estab\
  .select('nome_fantasia', 'municipio',
          f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),
          f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
  .show(5, False)

### Identificando valores nulos

In [34]:
socios.limit(3).toPandas() # ou
socios.limit(3).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [35]:
# contagem dos valores nulos
socios \
  .select([f.count(f.when(f.isnull(c), 1)) \
  .alias(c) \
  for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [36]:
#substituindo os valores
socios.na.fill(0).limit(3).toPandas() # ao encontrar um valor nulo, substitui por 0 se for do tipo inteiro

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8


In [37]:
socios.na.fill("-").limit(3).toPandas()  # ao encontrar um valor nulo, substitui por - se for do tipo string

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8


### Ordenando dados

In [None]:
socios \
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada')) \
  .show(5, False)

In [None]:
# ordenando pela data de entrada e faixa etaria (do maior para o menor) - duas colunas
socios \
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada')) \
  .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False]) \
  .show(10, False)

### Filtrando dados

In [40]:
empresas \
  .where("capital_social_da_empresa == 50") \
  .show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |null                 

In [41]:
# aninhando filtros (select não é obrigatório nos filtros)
socios \
  .select("nome_do_socio_ou_razao_social") \
  .filter(socios.nome_do_socio_ou_razao_social.startswith("RODRIGO")) \
  .filter(socios.nome_do_socio_ou_razao_social.endswith("DIAS")) \
  .limit(10) \
  .toPandas()
  #.count() \




Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BENASSI DIAS
1,RODRIGO RUDIBERTO DIAS
2,RODRIGO AURELIANO DIAS
3,RODRIGO SIMOES LEMOS DIAS
4,RODRIGO GEORGE DIAS
5,RODRIGO AUGUSTO FELICIO DIAS
6,RODRIGO FERNANDES DIAS
7,RODRIGO GARRIDO DIAS
8,RODRIGO OLIVEIRA DIAS
9,RODRIGO GONCALVES DIAS


### Comando LIKE

In [None]:
# consulta com o comando LIKE
empresas \
  .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa') \
  .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTE')) \
  .show(15, False)

### Agrupamento e sumarização dos dados

In [None]:
# contagem e agrupamento por ano de entrada na sociedade de empresa
socios \
  .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada')) \
  .where('ano_de_entrada >= 2010') \
  .groupBy('ano_de_entrada') \
  .count() \
  .orderBy('ano_de_entrada', ascending=True) \
  .show()

In [None]:
empresas \
  .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa') \
  .groupBy('porte_da_empresa') \
  .agg(
      f.avg("capital_social_da_empresa").alias("Capital_social_medio"),
      f.count("cnpj_basico").alias("Frequencia")) \
  .orderBy('porte_da_empresa', ascendig=True) \
  .show()

In [None]:
empresas \
  .select("capital_social_da_empresa") \
  .summary()\
  .show()

# .summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max")

### Juntando DataFrames

In [None]:
empresas.printSchema()

In [None]:
estab.printSchema()

In [None]:
socios.printSchema()

In [None]:
# junção de estabelecimentos com empresas
empresas_join = estab.join(empresas, 'cnpj_basico', how='inner')
empresas_join.printSchema()

In [58]:
# distribuição de frequencia segundo o ano de inicio de atividade
freq = empresas_join \
  .select(
      'cnpj_basico',
      f.year('data_de_inicio_atividade').alias('data_de_inicio'))\
  .where('data_de_inicio >= 2010') \
  .groupBy('data_de_inicio') \
  .agg(f.count('cnpj_basico').alias('frequencia')) \
  .orderBy('data_de_inicio', ascending=True)

In [None]:
freq.toPandas()

In [None]:
#fazendo uma union
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
). show()

### SparkSQL

A SparkSession permite que façamos consultas SQL utilizando escritas da maneira padrão. para que isso seja possível, é necessário que criemos uma view temporária do dataframe, que será usada dentro das instruções SQL em nosso método.

In [62]:
# criando uma view temporária
empresas.createOrReplaceTempView('empresasView')

In [None]:
# escrevendo instruções SQL
spark.sql("SELECT * FROM empresasView").show(5)

In [None]:
spark \
  .sql("""
        SELECT razao_social_nome_empresarial, capital_social_da_empresa FROM empresasView
        WHERE capital_social_da_empresa = 50""")\
  .show(5)

In [None]:
spark \
  .sql("""
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media
        FROM empresasView
        GROUP BY porte_da_empresa""")\
  .show(5)

In [67]:
# criando uma view temporária de empresas_join
empresas_join.createOrReplaceTempView('empresasJoinView')

In [None]:
freq = spark \
  .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count_cnpj
        FROM empresasJoinView
        WHERE YEAR(data_de_inicio_atividade) >= 2010
        GROUP BY data_de_inicio
        ORDER BY data_de_inicio""")
  
freq.show(5)

In [71]:
freq.createOrReplaceTempView('freqView')

In [None]:
spark\
  .sql("""
        SELECT * FROM freqView
        UNION ALL 
        SELECT 'total' as data_de_inicio, SUM(count_cnpj) AS count
        FROM freqView
      """)\
  .show()

### Arquivos CSV

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html

In [75]:
# salvando os dados em arquivo .csv
empresas.write.csv(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [77]:
# lendo os arquivos salvos
empresas2 = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas/csv',
    sep=';',
    inferSchema=True,
    header=True)

empresas2.printSchema()

In [83]:
# salvando os dados em arquivo .csv (estabelecimentos)
estab.write.csv(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/estabelecimentos/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
# lendo os arquivos salvos
estab2 = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/estabelecimentos/csv',
    sep=';',
    inferSchema=True,
    header=True)

estab2.printSchema()


In [None]:
# salvando os dados em arquivo .csv (socios)
socios.write.csv(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/socios/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
# lendo os arquivos salvos
socios2 = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/socios/csv',
    sep=';',
    inferSchema=True,
    header=True)

socios2.printSchema()


### Arquivos Parquet

[Apache Parquet](https://parquet.apache.org/docs/)


In [80]:
# salvando os dados em arquivo .parquet
empresas.write.parquet(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas/parquet',
    mode='overwrite')

In [81]:
# lendo os arquivos .parquet
empresas_parquet = spark.read.parquet(
    '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas/parquet')

empresas_parquet.printSchema()

In [None]:
estab.write.parquet(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/estabelecimentos/parquet',
    mode='overwrite')

estabs_parquet = spark.read.parquet('/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/estabelecimentos/parquet')
estabs_parquet.printSchema()

In [None]:
socios.write.parquet(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/socios/parquet',
    mode='overwrite')

socios_parquet = spark.read.parquet('/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/socios/parquet')
socios_parquet.printSchema()

### Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://)

In [84]:
empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [85]:
empresas.write.parquet(
    path = '/content/drive/MyDrive/Colab Notebooks/spark/arquivos-projeto/empresas/parquet_partitionBy',
    mode='overwrite',
    partitionBy='porte_da_empresa')

In [86]:
# encerrando a sparkSession
spark.stop()