# Inicializando uma sessão Spark

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # instalação do java versao 8
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz # download do apache sparl 3.1.2 + Hadoop 2.7 (ambos compactados num arquivo unico .tgz)
!tar xf spark-3.1.2-bin-hadoop2.7.tgz # extração do arquivo compactado baixado
!pip install -q findspark # instalação do findspark

# os q's que ficam ao lado de cada comando são para não exibir nenhum log (quiet)

In [None]:
import os # permite definir variáveis ambiente
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark # FindSpark utilizado para encontrar o Spark (i.e., a conexão com o Cluster, que nesse caso é local)
findspark.init()

In [None]:
from pyspark.sql import SparkSession

#definindo a variável que representa a sessão Spark

spark = SparkSession.builder\
        .master('local[*]')\
        .appName('Iniciando com o Spark')\
        .getOrCreate()

# master('local[*]') serve para indicar que a conexão é local e que estamos interessados
# em utilizar todos os CPU's (Unidade Central de Processamento) disponíveis; poderíamos
# colocar valores numéricos inteiros também

# Ou seja, em outras palavras, nesse caso a nossa própria máquina faz o papel de cluster.

In [None]:
spark

# Inicializando uma sessão Spark com acesso ao Spark UI via google colab

- Para executar a próxima sequência de comandos (que envolve a configuração do proxy ngrok) é necessário resetar o notebook nas configurações do canto superior direito.
- [Site Ngrok](https://dashboard.ngrok.com/get-started/your-authtoken)

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # instalação do java versao 8
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz # download do apache sparl 3.1.2 + Hadoop 2.7 (ambos compactados num arquivo unico .tgz)
!tar xf spark-3.1.2-bin-hadoop2.7.tgz # extração do arquivo conpactado baixado
!pip install -q findspark # instalação do findspark

# os q's que ficam ao lado de cada comando são para não exibir nenhum log (quiet)

In [None]:
import os # permite definir variáveis ambiente
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark # FindSpark utilizado para encontrar o Spark (i.e., a conexão com o Cluster, que nesse caso é local)
findspark.init()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

# O .config está settando a "porta" a Spark Ui vai estar sendo executada / estar rodando

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip # link ftp do arquivo que queremos baixar
!unzip ngrok-stable-linux-amd64.zip

# o ngrok permite

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
get_ipython().system_raw('./ngrok authtoken 2KTaeQfiCf5raHW5tJq1PkhWwJb_5dJ4UynSrWoP1V6aqjngr') 
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


In [None]:
# Esse último comando nos permite acessar o Spark Ui da sessão spark de maneira
# segura através do proxy ngrok. É o public_url que nos interessa.

# A spark ui nos permite verificar quais processos estão sendo executados lá no nó 
# (máquina) do cluster do google colab.

# Data frames com Spark (básico)

- As sequências de códigos a seguir só podem ser executadas com a sessão spark inicalizada

In [None]:
data = [('Lucas','21'),('Joana','45')] # lista com tuplas (essa é uma das formas de criar um dataframe da maneira correta)
colNames = ['Nome','Idade']

df = spark.createDataFrame(data, colNames) # Criação de um dataframe dentro da sessão spark
df

DataFrame[Nome: string, Idade: string]

In [None]:
df.show() # apresentação do dataframe

+-----+-----+
| Nome|Idade|
+-----+-----+
|Lucas|   21|
|Joana|   45|
+-----+-----+



In [None]:
display(df.toPandas()) # conversão para um dataframe pandas

Unnamed: 0,Nome,Idade
0,Lucas,21
1,Joana,45


# Início do projeto didático de tratamento de dados

- Leitura, manipulação, tratamento e armazenamento de dados públicos de empresas / estabelecimentos obtidos do site da receita federal.

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
> 
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
> 
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)


### Importando os dados e criando dataframes

In [None]:
from google.colab import drive
drive.mount('/content/drive') # montando a pasta/diretorio no drive

Mounted at /content/drive


In [None]:
import zipfile 
# importando a biblioteca default do python responsável por descompactar arquivos

In [None]:
# A função ZipFile() serve para localizar o arquivo de interesse e o que queremos fazer com ele

# O método extractall() serve para extrair todas as informações armazenadas nesse arquivo zip e 
# colocar dentro do diretório fornecido

path_origem_compactado = '/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas.zip' # path do arquivo compactado
path_origem_descompactado = '/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/' # diretório no qual a pasta descompactada se encontra
zipfile.ZipFile(path_origem_compactado,'r').extractall(path_origem_descompactado)

# o parâmetro 'r' serve para indicar o fato de que queremos apenas ler os arquivos contidos
# no "arquivo maior" compactado

In [None]:
# A partir da extração dos arquivos, efetuo a definição do dataframe de trabalho

path_arquivo_empresas = path_origem_descompactado + 'empresas'
df_empresas = spark.read.csv(path_arquivo_empresas, sep=';', inferSchema=True)
df_empresas.show()

+-----+--------------------+----+---+-------+---+----+
|  _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+-----+--------------------+----+---+-------+---+----+
|  306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
| 1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
| 4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
| 5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
| 6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
| 8416|  ELETRICA RUBI LTDA|2062| 49|   0,00|  5|null|
| 8992|SHIROMA VEICULOS ...|2062| 49|   0,00|  5|null|
| 9091|CONTATOS BAR E LA...|2062| 49|   0,00|  5|null|
| 9614|ANTONIA APARECIDA...|2135| 50|   0,00|  5|null|
| 9896|DORACY CORAT DA C...|2135| 50|   0,00|  5|null|
|12112|LANCHONETE RIO VE...|2062| 49|   0,00|  5|null|
|12605|VALMAR JACAREI CO...|2062| 49|   0,00|  5|null|
|13407|ROSANA CRISTINA D...|2135| 50|   0,00|  5|null|
|13408|CELIO RODRIGUES D...|2135| 50|   0,00|  5|null|
|13721|MAQFRAN COMERCIO ...|2062| 49|   0,00|  1|null|
|21181|MOU

In [None]:
# Efetuando os mesmos procedimentos para a base de sócios e de estabelecimentos...
# Começo descompactando os arquivos .zip
zipfile.ZipFile('/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/estabelecimentos.zip','r').extractall('/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/')
zipfile.ZipFile('/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/socios.zip','r').extractall('/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/')


In [None]:
# Agora defino os dataframes a partir dos dados contidos nas pastas de arquivos particionados 
# (contendo informações sobre cada base de interesse)

path_pasta_curso = '/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/'
path_estabelecimentos = path_pasta_curso + 'estabelecimentos'
path_socios = path_pasta_curso + 'socios'
df_estabelecimentos = spark.read.csv(path_estabelecimentos, sep=';', inferSchema = True)
df_socios = spark.read.csv(path_socios, sep = ';', inferSchema = True)

In [None]:
df_estabelecimentos.show()

+-----+---+---+---+--------------------+---+--------+---+----+----+--------+-------+--------------------+-------+--------------------+------+-------------------+------------------+--------+----+----+----+--------+----+--------+----+--------+--------------------+----+----+
|  _c0|_c1|_c2|_c3|                 _c4|_c5|     _c6|_c7| _c8| _c9|    _c10|   _c11|                _c12|   _c13|                _c14|  _c15|               _c16|              _c17|    _c18|_c19|_c20|_c21|    _c22|_c23|    _c24|_c25|    _c26|                _c27|_c28|_c29|
+-----+---+---+---+--------------------+---+--------+---+----+----+--------+-------+--------------------+-------+--------------------+------+-------------------+------------------+--------+----+----+----+--------+----+--------+----+--------+--------------------+----+----+
| 1879|  1| 96|  1|      PIRAMIDE M. C.|  8|20011029|  1|null|null|19940509|1412602|                null|    RUA|     JOSE FIGLIOLINI|   608|               null|         VILA NILO| 

In [None]:
df_socios.show()

+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+
|  _c0|_c1|                 _c2|        _c3|_c4|     _c5| _c6|        _c7| _c8|_c9|_c10|
+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+
|  411|  2|LILIANA PATRICIA ...|***678188**| 22|19940725|null|***000000**|null|  0|   7|
|  411|  2|CRISTINA HUNDERTMARK|***637848**| 28|19940725|null|***000000**|null|  0|   7|
| 5813|  2|CELSO EDUARDO DE ...|***786068**| 49|19940516|null|***000000**|null|  0|   8|
| 5813|  2|EDUARDO BERRINGER...|***442348**| 49|19940516|null|***000000**|null|  0|   5|
|14798|  2| HANNE MAHFOUD FADEL|***760388**| 49|19940609|null|***000000**|null|  0|   8|
|14798|  2|    CLOD ASSAD FADEL|***205668**| 22|19940609|null|***000000**|null|  0|   6|
|17826|  2|   WALKYRIA ALGARVES|***689078**| 49|19970227|null|***000000**|null|  0|   7|
|17826|  2|SEBASTIAO JADIR T...|***904728**| 49|20090813|null|***000000**|null|  0|   5|
|19491|  2|     JOSE 

### Renomeando colunas

In [None]:
# Podemos usar o método .withColumnRenamed(<old_name>,<new_name>) de data frames spark para
# renomear as colunas dos data frames criados

# 1)  Renomeando as colunas do data frame de empresas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index_lista, nome_coluna in enumerate(empresasColNames):
  df_empresas = df_empresas.withColumnRenamed('_c{}'.format(index_lista), nome_coluna)

df_empresas.limit(10).show() # mostrando as 10 primeiras linhas / tuplas

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
# 2) Renomeando as colunas do data frame de estabelecimentos
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral'\
                  ,'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade'\
                  , 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro'\
                  , 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico'\
                  , 'situacao_especial', 'data_da_situacao_especial']

for index_lista, nome_coluna in enumerate(estabsColNames):
  df_estabelecimentos = df_estabelecimentos.withColumnRenamed('_c{}'.format(index_lista),nome_coluna)

# 3) Renomeando as colunas do data frame de socios
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social'\
                  ,'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal'\
                  ,'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

for index_lista, nome_coluna in enumerate(sociosColNames):
  df_socios = df_socios.withColumnRenamed('_c{}'.format(index_lista),nome_coluna)

df_estabelecimentos.limit(10).show()
df_socios.limit(10).show()

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+-------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|     fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+--------------

### Dando início ao processo de tratamento dos dados contidos nos data frames construídos

In [None]:
# Verificando as colunas (atributos) do data frame de empresas e quais tipos 
# de dados cada uma delas armazena

df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
# Verificando o schema do data frame de socios

df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
# Verificando o schema do data frame de estabelecimentos

df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

### Modificado tipos de dados

In [None]:
from pyspark.sql.types import DataType, StringType, DoubleType
from pyspark.sql import functions as f

In [None]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
df_empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
# Um dos problemas observados durante a aula foi que a coluna capital social, além de incluir vírgulas na notação monetária (padrão típico brasileiro
# que, entretanto, não é recomendável), também está definida com dados tipo string.

# A sequência de códigos a seguir tem como objetivo o seguinte tratamento:
  # 1) Trocar ',' por '.' (padrão americano)
  # 2) Alterar a definição da coluna de String para DoubleType (float com 2 casas decimais de precisão)

# 1) Trocando ',' por '.'
df_empresas = df_empresas.withColumn('capital_social_da_empresa',f.regexp_replace('capital_social_da_empresa',',','.'))
df_empresas.limit(5).show()

# Essencialmente, o método ".withColumn" serve para manipular colunas num DF spark, retornando como resultado um outro DF spark. 
# Nesse caso, como queríamos atualizar o df_empresas, fizemos a manipulação típica do Python.

# O primeiro argumento corresponde à coluna do DataFrame que queremos mexer, criar ou modificar, enquanto que o segundo argumento se refere
# ao tratamento que essa coluna receberá.

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0.00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0.00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0.00|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
# 2) Trocando Stringtype para DoubleType

df_empresas = df_empresas.withColumn('capital_social_da_empresa',df_empresas['capital_social_da_empresa'].cast(DoubleType()))
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
# Os data frames de estabelecimentos e socios possuem colunas de data cadastradas como string cujos dados vêm 
# representados no padrão yyyymmdd. Queremos agora fazer a manipulação dessas colunas e definí-las como DataTypes

# 1) Coluna 'data_de_entrada_sociedade' da tabela socios

df_socios = df_socios.withColumn('data_de_entrada_sociedade', f.to_date( df_socios['data_de_entrada_sociedade'].cast(StringType()), 'yyyyMMdd' ) )
df_socios.limit(5).show()

# o segundo argumento da funcao to_date não se refere ao formato que queremos que nosso resultado apareça, mas ao formato dos registros aos quais
# a função está sendo aplicada (ou seja, como esses dados estão chegando pra função)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [None]:
# 2) Coluna 'data_situacao_cadastral' da tabela de estabelecimentos

df_estabelecimentos = df_estabelecimentos\
                .withColumn('data_situacao_cadastral'
                , f.to_date(df_estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd' ) # outra maneira de selecionar colunas de um dataframe df: df.nome_coluna
                )

df_estabelecimentos.show()


+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|     fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+------

In [None]:
df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string

In [None]:
# É possível concatenar as aplicações da função with column. Façamos isso para tratar as demais colunas de data. A tabela socios nao possui mais nenhuma,
# mas a tabela estabelecimentos ainda contém as colunas 'data_de_inicio_atividade' e 'data_da_situacao_especial'

df_estabelecimentos = df_estabelecimentos\
                    .withColumn(
                      'data_de_inicio_atividade'
                    , f.to_date( df_estabelecimentos['data_de_inicio_atividade'].cast(StringType()), 'yyyyMMdd' )
                    ) \
                    .withColumn(
                        'data_da_situacao_especial'
                        , f.to_date( df_estabelecimentos['data_da_situacao_especial'].cast(StringType()), 'yyyyMMdd') 
                    )
df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



### Comando Select

In [None]:
# Seleção / busca de dados específicos
# DataFrame.select(cols)

# Selecionando todas as colunas:
df_socios\
  .select('*')\
  .limit(10)\
  .show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [None]:
# Verificando os mesmos dados, mas com uma sintaxe diferente
df_socios\
  .select('*')\
  .show(10, False) 
  
# Dentro de "show()", o argumento 10 indica que queremos ver só as primeiras 10 linhas,
# enquanto o argumento False indica que não queremos que os registros sejam truncados


+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social  |cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|411        |2                     |LILIANA PATRICIA GUASTAVINO    |***678188**         |22                   |1994-07-25               |null|***000000**        |null                 |0                                  |7           |
|411        |2                     |CRISTINA HUNDERTMARK        

In [None]:
# Selecionando, da tabela empresas, as colunas 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa'

df_empresas\
  .select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
  .show(10, False)


+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2240             |1               |0.0                      |
|2062             |5               |0.0                      |
|3034             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |4000.0                   |
|2062             |5               |0.0                      |
|2062             |5               |0.0                      |
|2062             |5               |0.0                      |
|2135             |5               |0.0                      |
|2135             |5               |0.0                      |
+-----------------+----------------+-------------------------+
only showing top 10 rows



In [None]:
df_socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [None]:
# Selecionando, da tabela socios, as colunas 'nome_do_socio_ou_razao_social', 'faixa_etaria' e o ano de entrada na sociedade

df_socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('Ano de entrada'))\
  .show(10,False)

# A coluna "Ano de entrada" poderia ser obtida da mesma maneira usando a função "substring": f.substring('data_de_entrada_sociedade', 0, 4)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|Ano de entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
|CLOD ASSAD FADEL               |6           |1994          |
|WALKYRIA ALGARVES              |7           |1997          |
|SEBASTIAO JADIR TEIXEIRA NUNES |5           |2009          |
|JOSE JOAO ADAMO                |7           |1994          |
|ROSEMARY CANTUARIA AFONSO ADAMO|6           |1994          |
+-------------------------------+------------+--------------+
only showing top 10 rows



In [None]:
# Selecionando, da tabela de estabelecimentos, as colunas 'nome_fantasia' e 'municipio', adicionando as colunas 'ano_de_inicio_atividade' e
# 'mes_de_inicio_atividade', ambas extraídas da coluna 'data_de_inicio_atividade'

df_estabelecimentos\
  .select('nome_fantasia'
      ,'municipio'
      ,f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade')
      ,f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade')
    )\
  .show(5,truncate=False)


+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|EMBROIDERY & GIFT|7075     |1995                   |5                      |
+-----------------+---------+-----------------------+-----------------------+
only showing top 5 rows



### Desafio! 

- No dataframe definido abaixo, faça uma consulta incluindo as colunas "Identidade" e "Idade", sendo que a coluna "Identidade" contém o último e o primeiro nome de cada indivíduo separados por uma vírgula.


In [None]:
# ! Desafio !

data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [None]:
df\
  .select(
      f.concat_ws(
          ', ',
          f.substring_index('nome', ' ', -1),
          f.substring_index('nome', ' ', 1),
      ).alias('Ident'),
      'Idade'
  )\
  .show(10, truncate=False)

+-----------------+-----+
|Ident            |Idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



- Solução do problema!
  - A função concat_ws só permite concatenar colunas ou registros de colunas do próprio dataframe (i.e., não é possível concatenar os dados de uma coluna com o valor default 'xurupita', por exemplo). O primeiro argumento recebe o separador dos campos concatenados, enquanto os outros dois campos recebem os registros / strings que serão concatenados.
  - Para obter o primeiro e o último nome utilizamos a função substring_index(str, delim, count), que retorna como resultado a cadeia de caracteres do campo 'str' imediatamente antes de que o padrão 'delim' tenha atingido a contagem máxima de registros indicada em 'count'.
  - Quanto o campo 'count' recbe um valor positivo, a contagem começa no sentido usual. Quanto recebe um valor negativo, a contagem começa de trás para frente.

- Exemplo de utilização da substring_index:

      input: substring_index('Lucas de Paula Oliveira', ' ', 2)
      output: 'Lucas de'

  

### Ordenamento de dados

In [None]:
df_socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy('ano_de_entrada', ascending = False)\
  .show(10,False)

# Ordenação do select pela coluna ano_de_entrada, do maior ano para o menor

+------------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social             |faixa_etaria|ano_de_entrada|
+------------------------------------------+------------+--------------+
|MARIA SUELY DE MOURA                      |5           |2021          |
|MAURICIO ALVARES DA SILVA VELLOSO FERREIRA|6           |2021          |
|JOAO CESAR DIAS DE FARIAS                 |6           |2021          |
|ANTONOALDO GRANGEON TRANCOSO NEVES        |5           |2021          |
|EDUARDO LOPES BARBOSA DE OLIVEIRA         |5           |2021          |
|ANTONIO SOARES NEIVA NETO                 |7           |2021          |
|LEONARDO MENNA BARRETO LARANJA GONCALVES  |5           |2021          |
|MANOEL ADRIANO COSTA BARBOSA              |6           |2021          |
|RAIMUNDA ADRIANA OLIVEIRA LEITAO          |4           |2021          |
|CLAUDIO PINHEIRO DE FREITAS               |5           |2021          |
+------------------------------------------+-------

In [None]:
# É possível ordenar também por mais de uma coluna, definindo as configurações do order by para cada uma
df_socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy(['ano_de_entrada','faixa_etaria'], ascending = [False,False])\
  .show(10,False)

# O código acima inclui o ordenamento pelas colunas ano_de_entrada e faixa_etaria, ambas do maior valor para o menor

+-------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social        |faixa_etaria|ano_de_entrada|
+-------------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA      |9           |2021          |
|RENILDE DAS GRACAS MAIA              |9           |2021          |
|DORIS PEREIRA GOMES JAZRA            |9           |2021          |
|MARIA JOSE DOMINGUES BONATO          |9           |2021          |
|ZELIA MARIA CAMARA RODRIGUES DA SILVA|9           |2021          |
|JOSE DA SILVA                        |9           |2021          |
|DEMOSTENES JACOB HUHN PINTO          |9           |2021          |
|NADIR BICHARA CHUAHY                 |9           |2021          |
|DEIA DA CUNHA BECK PINTO             |9           |2021          |
|REYNALDO FIORIO                      |9           |2021          |
+-------------------------------------+------------+--------------+
only showing top 10 rows



In [None]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [None]:
df\
  .select('*')\
  .orderBy(['ano','mes'], ascending=[False,False])\
  .show(5,False)

+----------------+---+----+
|nome            |mes|ano |
+----------------+---+----+
|IRANI DOS SANTOS|12 |2010|
|ELIO SILVA      |7  |2010|
|DENIS FONSECA   |6  |2010|
|CARMINA RABELO  |4  |2010|
|CARLITO SOUZA   |1  |2010|
+----------------+---+----+
only showing top 5 rows



### Filtro de dados

- [Cláusula where](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html)
- [Cláusula filter](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [None]:
# Utilizando a cláusula where
df_empresas\
  .where('capital_social_da_empresa = 50')\
  .show(5, truncate = False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |null                 

In [None]:
# É possível também aninhar diferentes filtros. Façamos um teste com o uso da cláusula "filter", a fim de exemplo.
df_socios\
  .select('nome_do_socio_ou_razao_social')\
  .filter(df_socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
  .where(df_socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\
  .toPandas()\
  #.count()

# Estamos procurando todos os sócios que têm o primeiro nome "RODRIGO" e último nome "DIAS"

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BENASSI DIAS
1,RODRIGO RUDIBERTO DIAS
2,RODRIGO AURELIANO DIAS
3,RODRIGO SIMOES LEMOS DIAS
4,RODRIGO GEORGE DIAS
...,...
62,RODRIGO CRUANES DE SOUZA DIAS
63,RODRIGO GUEDES DIAS
64,RODRIGO DE OLIVEIRA DIAS
65,RODRIGO THADEU BORALLI DIAS


### Exercício!
  - No dataframe spark definido abaixo, faça uma seleção de todos os alunos nascidos no primeiro semestre de 2019

In [None]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [None]:
# Existem diversas maneiras de fazer esse exercício. Aqui vão algumas delas:
# 1)
df\
  .filter('mes <= 6')\
  .filter('ano = 2009')\
  .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [None]:
# 2)
df\
  .select('*')\
  .where('ano = 2009 and mes <= 6')\
  .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [None]:
# 3)
df\
  .filter( (df.mes <= 6) & (df.ano == 2009) )\
  .orderBy(df.mes, ascending=False)\
  .show(truncate=False)

# Lembrando que, utilizando a notação de colunas do dataframe tal como feito aqui, deve-se empregar os operadores lógicos do python corretamente.
# (==, >, <, >=, <=, etc).
# Além disso, é importante lembrar também que, dentro da linguagem de dataframes, os operadores AND e OR são representados por & e |, respectivamente.

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
+---------------------+---+----+



### Comando LIKE

In [None]:
# Definindo um dataframe de exemplo
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [None]:
# Seleção de todos os registros que têm a palavra 'RESTAURANTE' no nome:

df\
  .select(f.upper('data').alias('data'))\
  .where(f.upper('data').like('%RESTAURANTE%'))\
  .show(truncate=False)

# a princípio não precisaria do select, mas o upper aplicado na cláusula where não interfere nos 
# resultados da pesquisa (i.e., dados escritos em letra minúscula continuariam com letra minúscula)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|JUCA RESTAURANTES LTDA|
|JOCA RESTAURANTE      |
+----------------------+



In [None]:
# Seleção de todos os registros que começam com a palavra 'RESTAURANTE'
df\
  .where(f.upper('data').like('RESTAURANTE%'))\
  .show(truncate=False)

# Também seria possível obter o mesmo resultado, NESSE CASO, por exemplo, utilizando o endswith
df\
  .where(f.upper('data').startswith('RESTAURANTE'))\
  .show(truncate=False)


+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+

+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+



In [None]:
# Seleção de todos os registros que terminam com a palavra 'RESTAURANTE'
df\
  .filter(f.upper('data').like('%RESTAURANTE'))\
  .show(truncate=False)

# Também seria possível obter o mesmo resultado, NESSE CASO, por exemplo, utilizando o endswith
df\
  .filter(f.upper('data').endswith('RESTAURANTE'))\
  .show()

+----------------+
|data            |
+----------------+
|Joca Restaurante|
+----------------+

+----------------+
|            data|
+----------------+
|Joca Restaurante|
+----------------+



### Sumarização de dados
- Funções / funcionalidades importantes:
  - [df.GroupBy()](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.groupby.html)
  - [df.Agg()](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.agg.html)
  - [df.Summary()](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

In [None]:
# Comecemos utilizando a função GroupBy. O select a seguir faz os seguintes procedimentos:
  # Seleciona, da tabela de socios, a contagem de todos os registros agrupados e ordenados pelo ano de entrada, tais que 
  # o ano de entrada é maior ou igual a 2010

df_socios\
  .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .where('ano_de_entrada >= 2010')\
  .groupBy('ano_de_entrada')\
  .count()\
  .orderBy('ano_de_entrada',ascending = True)\
  .show()

# Perceba que, ao utilizar a cláusula groupBy, precisamos indicar explicitamente qual função de agrupamento estamos interessados em obter
# (nesse caso era a contagem dos dados para cada ano)

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [None]:
# A cláusula .agg() é um complemento. Com ela é possível aplicar explicitamente as funções de agrupamento de interesse, e ainda definir
# apelidos (nomes) para as colunas tratadas

# Como exemplo, faremos a seguinte seleção no dataframe de empresas: selecionaremos as colunas 'cnpj_basico', 'porte_da_empresa' e 'capital_social_da_empresa',
# agrupadas pelo porte da empresa; aplicaremos a função de cálculo da média ao capital social para cada porde e, ainda, faremos uma contagem 
# de quantas empresas se enquadram em cada um desses portes.

df_empresas\
  .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
  .groupBy('porte_da_empresa')\
  .agg(
      f.avg('capital_social_da_empresa').alias('capital_social_medio'),
      f.count('cnpj_basico').alias('total_empresas')
  )\
  .orderBy('porte_da_empresa', ascending = False)\
  .show(10, truncate = False)

+----------------+--------------------+--------------+
|porte_da_empresa|capital_social_medio|total_empresas|
+----------------+--------------------+--------------+
|5               |708660.4208249798   |1335500       |
|3               |2601001.7677092673  |115151        |
|1               |339994.53313506936  |3129043       |
|null            |8.35421888053467    |5985          |
+----------------+--------------------+--------------+



In [None]:
df_empresas.select([f.count(f.when(f.isnull(c),1)).alias(c) for c in df_empresas.columns]).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|          0|                            0|                0|                          0|                        0|            5985|                    4579678|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+



Atenção! 
  - As colunas capital_social_da_empresa e cnpj_basico não possuem nenhum registro null (None / NaN / null). Caso tivessem, seria necessário fazer um tratamento antes de aplicar as funções de agregação a fim de evitar erros de interpretação / análise de dados

In [None]:
# A função summary é importante para fornecer um overiew estatístico sobre o dataframe em estudo ou sobre uma determinada coluna de interesse

df_empresas\
  .select('capital_social_da_empresa', 'cnpj_basico')\
  .summary()\
  .show(15)

+-------+-------------------------+--------------------+
|summary|capital_social_da_empresa|         cnpj_basico|
+-------+-------------------------+--------------------+
|  count|                  4585679|             4585679|
|   mean|        503694.5478542675|2.6019821870497912E7|
| stddev|     2.1118691490537405E8|1.9400506431147214E7|
|    min|                      0.0|                  18|
|    25%|                      0.0|            11785327|
|    50%|                   1000.0|            23400108|
|    75%|                   7000.0|            35001698|
|    max|         3.22014670262E11|            99017782|
+-------+-------------------------+--------------------+



### Método CASE / OTHERWISE

In [None]:
del df

In [None]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7), 
    ('IVO', 'MATEMÁTICA', 9), 
    ('MÁRCIA', 'MATEMÁTICA', 8), 
    ('LEILA', 'MATEMÁTICA', 9), 
    ('BRENO', 'MATEMÁTICA', 7), 
    ('LETÍCIA', 'MATEMÁTICA', 8), 
    ('CARLOS', 'FÍSICA', 2), 
    ('IVO', 'FÍSICA', 8), 
    ('MÁRCIA', 'FÍSICA', 10), 
    ('LEILA', 'FÍSICA', 9), 
    ('BRENO', 'FÍSICA', 1), 
    ('LETÍCIA', 'FÍSICA', 6), 
    ('CARLOS', 'QUÍMICA', 10), 
    ('IVO', 'QUÍMICA', 8), 
    ('MÁRCIA', 'QUÍMICA', 1), 
    ('LEILA', 'QUÍMICA', 10), 
    ('BRENO', 'QUÍMICA', 7), 
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [None]:
# Seria interessante criar uma nova coluna contendo o "status" de desempenho desses alunos, com o valor "APROVADO" caso a nota tenha sido
# maior ou igual a 6 e com o valor "REPROVADO" caso a nota tenha sido menor que 6.

# Nesse caso o que precisamos fazer é adicionar uma coluna aplicando a cláusula .withColumn ao dataframe e indicando corretamente quais
# registros essa coluna receberá. É aqui que entra o método when/otherwise

df = df.withColumn('status', f.when(df.nota >= 7, 'APROVADO').otherwise('REPROVADO'))
df.orderBy(df.nota, ascending = False).show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|   QUÍMICA|  10| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
| CARLOS|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
+-------+----------+----+---------+



In [None]:
df\
  .select('nota')\
  .summary('min', '25%', '50%', '75%', 'max')\
  .show()

+-------+----+
|summary|nota|
+-------+----+
|    min|   1|
|    25%|   7|
|    50%|   8|
|    75%|   9|
|    max|  10|
+-------+----+



In [None]:
df\
  .select('status')\
  .groupBy('status')\
  .agg(
      f.count('status').alias('Contagem de Resultados')
  )\
  .show()

+---------+----------------------+
|   status|Contagem de Resultados|
+---------+----------------------+
|REPROVADO|                     4|
| APROVADO|                    14|
+---------+----------------------+



### Junção de DataFrames - Joins

- [df.join()](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [None]:
# Existem diversos tipos de joins. A seguir veremos alguns exemplos envolvendo cada um deles
# Consideremos inicialmente o seguinte dataframe

produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'), 
        ('2', 'Limpeza', 'Sabão em pó'), 
        ('3', 'Frios', 'Queijo'), 
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15), 
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [None]:
produtos.toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [None]:
impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [None]:
# o Spark possui uma cláusula específica para juntar dataframes: .join()
# que carrega a seguinte sintaxe: df1.join(df2, 'coluna_de_conexao', how='tipo_de_join')

# 1) Inner Join
produtos.join(impostos, 'cat', how='inner').sort('id').show()

# O comando acima fez um inner join entre a tabela de produtos e a tabela de impostos, utilizando a coluna
# 'cat' como meio de conexão. Ou seja, o resultado desse código será um novo dataframe contendo
# todas as tuplas cujo registro 'cat' da tabela de produtos é igual ao registro 'cat' da tabela de produtos, além de incluir
# todas as colunas de ambos os dataframes envolvidos.

# (ordenamento pela coluna 'Id', da tabela produtos)

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [None]:
# 2) Left Join

produtos.join(impostos, 'cat', how='left').sort('id').show()

# O comando acima faz um left join entre a tabela de produtos e a tabela de impostos, utilizando a coluna 'cat' como meio de conexão.
# Ou seja, o resultado desse código traz todas as tuplas do dataframe de produtos (bem como suas colunas). Para os casos em que
# o dataframe impostos possui um registro correspondente na coluna 'cat', o vínculo é feito, mas para aqueles registros da tabela
# produtos que não possuem correspondente na tabela de impostos, o valor das colunas desse dataframe recebem 'null'


+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+



In [None]:
# 3) Right Join

produtos.join(impostos, 'cat', how='right').sort('id').show()

# O comando acima faz um left join entre a tabela de produtos e a tabela de impostos, utilizando a coluna 'cat' como meio de conexão.
# Ou seja, o resultado desse código traz todas as tuplas do dataframe de impostos (bem como suas colunas). Para os casos em que
# o dataframe impostos possui um registro correspondente na coluna 'cat', o vínculo é feito, mas para aqueles registros da tabela
# produtos que não possuem correspondente na tabela de impostos, o valor das colunas desse dataframe recebem 'null'

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [None]:
# 4) Outer Join

produtos.join(impostos, 'cat', how='outer').sort('id').show() 

# Junta todas as tuplas / colunas de ambas as tabelas. Quando há correspondência na coluna 'cat', o vínculo é feito corretamente,
# mas quando não há as colunas dos respectivos dataframes recebem o valor 'null'

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+



In [None]:
# Podemos aplicar esses conceitos aos dataframes que viemos trabalhando até então

empresas_join = df_estabelecimentos.join(df_empresas, 'cnpj_basico', how = 'inner')
empresas_join.printSchema()

# Agora temos as informações de ambos os dataframes, todas relacionadas pela coluna 'cnpj_basico'

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
# Existe também uma outra forma de unir dataframes: o .union() (que funciona como uma espécie de append)
# Para isso vejamos o exemplo:

freq = empresas_join\
        .select('cnpj_basico', f.year('data_de_inicio_atividade').alias('ano_de_inicio'))\
        .where(f.year('data_de_inicio_atividade') >= 2010)\
        .groupBy('ano_de_inicio')\
        .count().alias('frequencia')\
        .orderBy('ano_de_inicio', ascending= False)
    
freq.show()

+-------------+------+
|ano_de_inicio| count|
+-------------+------+
|         2021|153275|
|         2020|400654|
|         2019|325922|
|         2018|275435|
|         2017|237292|
|         2016|265417|
|         2015|212523|
|         2014|202276|
|         2013|198424|
|         2012|232480|
|         2011|172677|
|         2010|154159|
+-------------+------+



In [None]:
# Uma vez obtido o resultado anterior, façamos a inclusão da linha "Total", contendo a contagem total de empresas
# (o mesmo poderia ser aplicado para dois dataframes distintos)

freq.union(
    freq.select(
        f.lit('Total').alias('ano_de_inicio'),
        f.sum('count')
    )
).show()

+-------------+-------+
|ano_de_inicio|  count|
+-------------+-------+
|         2021| 153275|
|         2020| 400654|
|         2019| 325922|
|         2018| 275435|
|         2017| 237292|
|         2016| 265417|
|         2015| 212523|
|         2014| 202276|
|         2013| 198424|
|         2012| 232480|
|         2011| 172677|
|         2010| 154159|
|        Total|2830534|
+-------------+-------+



### SparkSQL

In [None]:
# Para utilizar o método usual de consultas de SQL é necessário criar uma view temporária a partir
# dos dataframes de trabalho

df_empresas.createOrReplaceTempView('empresasView')

In [None]:
spark.sql(
    'select * from empresasView'
).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
# A partir daqui é possível fazer as mesmas manipulações que nós já fizemos anteriormente, mas agora com o 
# "conforto" da linguagem sql

spark.sql(
    'Select * \
      from empresasView \
      where capital_social_da_empresa = 50'
).show()

# Analogamente:

spark.sql(
    '''Select * 
      from empresasView
      where capital_social_da_empresa = 50
      '''
).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       null|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       null|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       null|
|   22242856|         JOAO CESAR M

In [None]:
spark\
  .sql('''
      select porte_da_empresa, 
             mean(capital_social_da_empresa) as capital_social_medio
          from empresasView
          group by porte_da_empresa
          order by 1 desc
  ''').show()

+----------------+--------------------+
|porte_da_empresa|capital_social_medio|
+----------------+--------------------+
|               5|   708660.4208249798|
|               3|  2601001.7677092673|
|               1|  339994.53313506936|
|            null|    8.35421888053467|
+----------------+--------------------+



In [None]:
empresas_join.createOrReplaceTempView('empresasJoinView')

spark\
  .sql("""
      select * from empresasJoinView
  """).show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+------------------+------+-------------------+-----------------+--------+---+---------+-----+----------+-----+----------+----------+-------+--------------------+-----------------+-------------------------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|        logradouro|numero|        complemento|           bairro|     cep| uf|municipio|ddd_1|telefone_1|d

### Armazenamento de dados

- Gerando arquivos csv

In [None]:
df_empresas.write.csv(
    path='/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas/csv',
    mode='overwrite',
    header=True,
    sep=';'
)

In [None]:
# Uma vez gerado esse arquivo csv (com todas as modificações já efetuadas), podemos
# reaproveitá-lo em estudos / necessidades futuras (sem a necessidade de precisar
# ficar refazendo todos os passos)

df2_empresas = spark.read.csv(
    '/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas/csv',
    header=True,
    inferSchema=True,
    sep=';'
)

- Gerando arquivos .parquet

In [None]:
df_empresas.write.parquet(
    path='/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas/parquet',
    mode='overwrite'
)

In [None]:
# no caso de leituras dos arquivos parquet não é necessário passar nada além do path; 
# o próprio spark (comandado por nós com o auxílio da biblioteca PySpark) já interpreta todas essas informações
# automaticamente.
df2_empresas = spark.read.parquet('/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas/parquet')

### Particionamento de dados

- Por default o Spark já cria arquivos particionados quando executamos os comandos de armazenamento dos dados (isso independentemente de solicitarmos o armazenamento em formato csv ou parquet). Em alguns casos, entretanto, pode ser mais conveniente gerar um arquivo csv único (não particionado) ou, ainda, gerar um particionamento específico ao transcrever uma base de dados para o formato parquet. Essas funcionalidades serão exploradas nos comandos a seguir

In [None]:
# Geração de um arquivo csv de partição unitária (i.e., aglomeração de todos os dados da base
# num csv único)
df_empresas.coalesce(1).write.csv(
    path= '/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas/csv-unico'
    ,mode='overwrite'
    ,sep=';'
    ,header=True
)

In [None]:
# Geração de arquivos parquet com um particionamento feito por uma coluna de interesse:
# porte_da_empresa

df_empresas.write.parquet(
    path='/content/drive/MyDrive/Colab_Notebooks/Alura-CursoSpark/empresas/parquet-particionado'
    ,partitionBy = 'porte_da_empresa'
    ,mode = 'overwrite'
)

# o que a sequência de comandos acima faz é particionar a base de dados em arquivos parquet,
# mas ela faz isso tomando a coluna porte_da_empresa como referência

# (ou seja, para cada valor armazenado nessa coluna uma nova pasta é criada no drive, e
# dentro dessa pasta, por sua vez, são alocadas as partições de dados referentes
# àquele "valor de porte" específico)

# é possível fazer essa particionamento por uma ou mais colunas

In [None]:
spark.stop()