# Iniciando com Spark

## Importando Dados e ZipFile

In [0]:
path_socios = '/FileStore/tables/socios.csv'
path_empresas = '/FileStore/tables/empresas.csv'
path_estabelecimentos = '/FileStore/tables/estabelecimentos.csv'

## Carregando Dados

In [0]:
empresas = spark.read.csv(path_empresas, sep=';', inferSchema=True)
print('Em estabelecimentos temos: ', empresas.count(), " Totais de registros")


Em estabelecimentos temos:  917901  Totais de registros


In [0]:
socios = spark.read.csv(path_socios, sep=';', inferSchema=True)
print('Em socios temos: ', socios.count(), " Totais de registros")


Em socios temos:  204373  Totais de registros


In [0]:
estabelecimentos = spark.read.csv(path_estabelecimentos, sep=';', inferSchema=True)
print('Em socios temos: ', estabelecimentos.count(), " Totais de registros")

Em socios temos:  484305  Totais de registros


### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
> 
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
> 
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)

# Manipulando Dados

## Operações Básicas

### Visualizando Dados

In [0]:
empresas.limit(5).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                        0|               5|                       null|
|       8638|         JOAO DOS SANTOS F...|             2135|                         50|                        0|               5|                       null|
|      11748|         PANIFICADORA E CO...|             2062|                         49|                        0|               1|                       null|
|      12027|          L G SORVETE

In [0]:
socios.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


In [0]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


### Renomeando Colunas

In [0]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index, colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

empresas.columns

Out[18]: ['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [0]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

for index, colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

socios.columns

Out[19]: ['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [0]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']


for index, colName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

estabelecimentos.columns

Out[25]: ['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

## Analisando Dados

In [0]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [0]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [0]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: int

## Convertendo String -> Double

In [0]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [0]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [0]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))

In [0]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

In [0]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [0]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,


## Convertendo String -> Date

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [0]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [0]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: int

In [0]:
estabelecimentos = estabelecimentos\
            .withColumn(
            "data_situacao_cadastral",
            f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
            )\
            .withColumn(
            "data_de_inicio_atividade",
            f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
            )\
            .withColumn(
            "data_da_situacao_especial",
            f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
            )
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: integer (

In [0]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,1995-03-31,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,2015-02-09,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,2018-12-19,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,2004-01-23,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


# Spark SQL

## Selecionando Informações

In [0]:
empresas\
    .select('*')\
    .show(5, False)

+-----------+---------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                      |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+---------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|4519       |DANIELA DA SILVA CRUZ                              |2135             |50                         |0.0                      |5               |null                       |
|8638       |JOAO DOS SANTOS FAGUNDES                           |2135             |50                         |0.0                      |5               |null                       |
|11748      |PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO LTDA|2062             |49

In [0]:
socios\
  .select('nome_do_socio_ou_razao_social','faixa_etaria')\
  .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social,faixa_etaria
0,MARIO KATUMI HOSI,7
1,ROBERTO YUKIO HOSI,7
2,ANDREIA CRISTINA DELSIN,3
3,MARCIA DO CANTO ARRUDA DAIER,7
4,ALMIR CARLOS CAPELLINI,7
...,...,...
204368,MARIA JERUSA RODRIGUES MARINHO,8
204369,JOSE PEDRO MACCARI,4
204370,JORGE LUIS DOS SANTOS MATTOSO,7
204371,ROSA HELENA MAURO BAPTISTA,6


In [0]:
#socios.select('nome_do_socio_ou_razao_social','faixa_etaria', f.year('data_entrada_sociedade').alias("Data Entrada")).show(6, False)

[Functions em Spark SQL](https://spark.apache.org/docs/3.1.2/api/python/)

In [0]:
data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [0]:
df \
    .select(
        f.concat_ws(
            ', ', 
            f.substring_index('nome', ' ', -1), 
            f.substring_index('nome', ' ', 1)
        ).alias('ident'), 
        'idade') \
    .show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



## Identificando Valores Nulos

In [0]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [0]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [0]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|      11748|                     2|            MARIO KATUMI HOSI|         ***504158**|                   49|                 19940530|null|        ***000000**|                 null|                                  0|           7|
|      11748|                     2|           ROBERTO YUKIO HOSI|      

In [0]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|  pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                           18|                 109|                    0|                        0|203586|                  0|               199229|                                  0|           0|
+-----------+----------------------+----------------------------

In [0]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [0]:
socios.na.fill('-')

Out[54]: DataFrame[cnpj_basico: int, identificador_de_socio: int, nome_do_socio_ou_razao_social: string, cnpj_ou_cpf_do_socio: string, qualificacao_do_socio: int, data_de_entrada_sociedade: int, pais: int, representante_legal: string, nome_do_representante: string, qualificacao_do_representante_legal: int, faixa_etaria: int]

In [0]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


## Ordenando Dados

In [0]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria')\
  .orderBy('faixa_etaria', ascending=False)\
  .show(5, False)

+-----------------------------+------------+
|nome_do_socio_ou_razao_social|faixa_etaria|
+-----------------------------+------------+
|PERCIVAL IGNACIO DE SOUZA    |9           |
|LEIZE PELACANI MARQUESI      |9           |
|JARDINO BINOTTO              |9           |
|YOUNG KI KIM                 |9           |
|JOAO DA MATA NOGUEIRA        |9           |
+-----------------------------+------------+
only showing top 5 rows



In [0]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria')\
  .orderBy('faixa_etaria', descending=False)\
  .show(5, False)

+------------------------------------------+------------+
|nome_do_socio_ou_razao_social             |faixa_etaria|
+------------------------------------------+------------+
|G. P. SERVICE REMOCAO DE VEICULOS LTDA.   |0           |
|E & A EMPREENDIMENTOS E PARTICIPACOES LTDA|0           |
|PHOENIX CORPORACAO LTDA                   |0           |
|NINAS DOCES & FESTAS LTDA                 |0           |
|FIRENZE PARTICIPACOES SOCIETARIAS LTDA.   |0           |
+------------------------------------------+------------+
only showing top 5 rows



## Filtrando Dados

In [0]:
empresas\
  .where("capital_social_da_empresa==50")\
  .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   14715041|         LIDIANE MARIA DO ...|             2135|                         50|                     50.0|               1|                       null|
|   20601885|         CRISTIANO AKIHITO...|             2135|                         50|                     50.0|               1|                       null|
|   23661983|         VITOR ALOISIO DO ...|             2135|                         50|                     50.0|               1|                       null|
|   23714726|         JOSELINA PAN

In [0]:
empresas\
  .where("capital_social_da_empresa>=50")\
  .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|      13289|         ANDREIA CRISTINA ...|             2305|                         65|                 100000.0|               1|                       null|
|      28664|         M ROCHA COML IMPO...|             2062|                         49|                 100000.0|               5|                       null|
|      65867|         PADADRIA POLITEAM...|             2062|                         49|                  30000.0|               1|                       null|
|      73091|         ATELIE INFAN

In [0]:
socios\
  .select("nome_do_socio_ou_razao_social")\
  .filter(socios.nome_do_socio_ou_razao_social.startswith("RODRIGO"))\
  .filter(socios.nome_do_socio_ou_razao_social.endswith("DIAS"))\
  .limit(10)\
  .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO ALVES DIAS
1,RODRIGO PEREIRA DIAS
2,RODRIGO SANTOS DIAS
3,RODRIGO OLIVEIRA DIAS
4,RODRIGO ROSENBLIT COLACO DIAS
5,RODRIGO PEDRO DIAS
6,RODRIGO FERNANDO DE MEDEIROS DIAS
7,RODRIGO DA RESSURREICAO DIAS


## Usando o comando Like

In [0]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltds',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltds
2,Joca Restaurante


In [0]:
df\
  .where(f.upper(df.data).like('%RESTAURANTE'))\
  .show(truncate=False)

+----------------+
|data            |
+----------------+
|Joca Restaurante|
+----------------+



In [0]:
df\
  .where(f.upper(df.data).like('%RESTAURANTE%'))\
  .show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltds|
|Joca Restaurante      |
+----------------------+



In [0]:
df\
  .where(f.upper(df.data).like('RESTAURANTE%'))\
  .show(truncate=False)

+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+



# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando Dados

In [0]:
socios\
   .select(f.year('data_de_entrada_sociedade'))\
   .where('ano_de_entrada >= 2010')\
   .groupBy('ano_de_entrada')\
   .cout()\
   .orderBy('ano_de_entrada', ascending=True)\
   .show()

In [0]:
empresas\
   .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
   .groupBy('porte_da_empresa')\
   .agg(
        f.avg("capital_social_da_empresa").alias("capital_social_medio"),
        f.count("cnpj_basico").alias("frequencia")
    )\
   .orderBy('porte_da_empresa', ascending=True)\
   .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|  42.444821731748725|      1259|
|               1|  160602.71555333387|    626150|
|               3|  149052.41008177216|     23113|
|               5|   701385.7030501274|    267379|
+----------------+--------------------+----------+



In [0]:
empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .show()


+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                   917820|
|   mean|        317646.4847070014|
| stddev|      6.639108296478236E7|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|          5.2202625985E10|
+-------+-------------------------+



## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [0]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água Mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para Cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [0]:
produtos.show()

+---+-------+---------------+
| id|    cat|           prod|
+---+-------+---------------+
|  1|Bebidas|   Água Mineral|
|  2|Limpeza|    Sabão em pó|
|  3|  Frios|         Queijo|
|  4|Bebidas|   Refrigerante|
|  5|    Pet|Ração para Cães|
+---+-------+---------------+



In [0]:
impostos.show()

+-------+-----+
|    cat|  tax|
+-------+-----+
|Bebidas| 0.15|
|Limpeza| 0.05|
|  Frios|0.065|
| Carnes| 0.08|
+-------+-----+



In [0]:
produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água Mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [0]:
produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água Mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para Cães| null|
+-------+---+---------------+-----+



In [0]:
produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água Mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [0]:
produtos.join(impostos, 'cat', how='outer')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água Mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para Cães| null|
+-------+----+---------------+-----+



In [0]:
produtos.join(impostos, 'cat', how='full')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água Mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para Cães| null|
+-------+----+---------------+-----+



## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [0]:
# Criando uma view temporária

empresas.createOrReplaceTempView("empresasView")

In [0]:
spark.sql("SELECT * FROM empresasView").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                      0.0|               5|                       null|
|       8638|         JOAO DOS SANTOS F...|             2135|                         50|                      0.0|               5|                       null|
|      11748|         PANIFICADORA E CO...|             2062|                         49|                      0.0|               1|                       null|
|      12027|          L G SORVETE

In [0]:
spark.sql("SELECT * FROM empresasView where razao_social_nome_empresarial like 'DANIELA%'").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                      0.0|               5|                       null|
|    1404514|         DANIELA SOUZA ROD...|             2135|                         50|                      0.0|               5|                       null|
|   10480038|         DANIELA APARECIDA...|             2135|                         50|                  10000.0|               1|                       null|
|   12084021|         DANIELA APAR

In [0]:
spark\
    .sql("""
        SELECT 
            * 
         FROM
             empresasView 
          WHERE 
              capital_social_da_empresa = 50
    """)\
    .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   14715041|         LIDIANE MARIA DO ...|             2135|                         50|                     50.0|               1|                       null|
|   20601885|         CRISTIANO AKIHITO...|             2135|                         50|                     50.0|               1|                       null|
|   23661983|         VITOR ALOISIO DO ...|             2135|                         50|                     50.0|               1|                       null|
|   23714726|         JOSELINA PAN

# Formas de Armazenamento

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [0]:
empresas.write.csv(
    path='/FileStore/tables/receita/csv',
    mode='overwrite,
    sep=';'
    header=True
)

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [0]:
empresas.write.parquet(
    path='/FileStore/tables/receita/csv',
    mode='overwrite,
)

In [0]:
empresas_parquet = spark.read.parquet(
    '/FileStore/tables/receita/csv'
)