<a href="https://colab.research.google.com/github/hsantos1985/spark_projects/blob/main/SCRIPT_FUNDAMENTOS_SPARK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##PROJETO

O projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

##INICIALIZAÇÃO DO SPARK

In [9]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [10]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [11]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)


In [12]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [13]:
spark

## Acessando o [Spark UI](https://spark.apache.org/docs/3.1.2/web-ui.html)

---

(Google Colab)

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
     .master('local[*]')\
    .appName("Iniciando com Spark")\
    .config('spark.ui.port', '4050')\
    .getOrCreate()

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
>
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
>
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


In [15]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


##CARREGANDO OS DADOS

###EMPRESAS

In [16]:
import zipfile

In [17]:
zipfile.ZipFile('/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO/empresas.zip','r').extractall('/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO')

In [18]:
path='/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO/empresas'


In [19]:
empresas=spark.read.csv(path,sep=';',inferSchema=True)

In [20]:
empresas.count()

4585679

###ESTABELECIMENTO

In [21]:
zipfile.ZipFile('/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO/estabelecimentos.zip','r').extractall('/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO')

In [22]:
path2='/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO/estabelecimentos'

In [23]:
estabelecimentos=spark.read.csv(path2,sep=';',inferSchema=True)

In [24]:
estabelecimentos.count()

4836219

###SOCIOS

In [25]:
zipfile.ZipFile('/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO/socios.zip','r').extractall('/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO')

In [26]:
path3='/content/drive/MyDrive/After_2023/13_Alura_cursos/03_CURSO_SPARK_INTRODUCAO/socios'

In [27]:
socios=spark.read.csv(path3,sep=';',inferSchema=True)

In [28]:
socios.count()

2046430

# Manipulando os Dados
---

## Operações básicas

### Renomeando as colunas do DataFrame

###Empresas

In [29]:
#verificando o df de empresas
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [30]:
#colunas do df empresas
empresas.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6']

In [31]:
#novas as colunas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [32]:
#iterando sobre o nome das colunas noas para inserri no df empresas
for number,coluna in enumerate(empresasColNames):
  empresas=empresas.withColumnRenamed(f"_c{number}", coluna)

In [33]:
empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [34]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [35]:
#verificando o tipo das colunas
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



###Estabelecimento

In [36]:
#verificando o df estabelecimentos
estabelecimentos.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [37]:
#colunas do df estabelecimentos
estabelecimentos.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27',
 '_c28',
 '_c29']

In [38]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [39]:
#iterando sobre o nome das colunas noas para inserri no df empresas
for number,coluna in enumerate(estabsColNames):
  estabelecimentos=estabelecimentos.withColumnRenamed(f"_c{number}", coluna)

In [40]:
estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [41]:
#verificando o tipo das colunas
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

###Socios

In [42]:
#verificando o df socios
socios.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [43]:
#colunas do df socios
socios.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10']

In [44]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [45]:
#iterando sobre o nome das colunas noas para inserri no df socios
for number,coluna in enumerate(sociosColNames):
 socios=socios.withColumnRenamed(f"_c{number}", coluna)

In [46]:
socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [47]:
#verificando o tipo das colunas
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [48]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [49]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [50]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [51]:
#substituindo a virgula por ponto
empresas=empresas.withColumn('capital_social_da_empresa',f.regexp_replace('capital_social_da_empresa',',','.'))

In [52]:
#mudando o tipo da coluna
empresas=empresas.withColumn('capital_social_da_empresa',empresas['capital_social_da_empresa'].cast(DoubleType()))


In [53]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String ➔ Date
### Estabelecimentos

In [54]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [55]:
#modificando as colunas de data de string para date
estabelecimentos=estabelecimentos\
  .withColumn(
      "data_situacao_cadastral",
      f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()),'yyyyMMdd')
  )\
   .withColumn(
      "data_de_inicio_atividade",
      f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()),'yyyyMMdd')
  )\
   .withColumn(
      "data_da_situacao_especial",
      f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()),'yyyyMMdd')
  )


In [56]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

### Convertendo String ➔ Date
### Socios

In [57]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [58]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [59]:
socios=socios\
  .withColumn(
      "data_de_entrada_sociedade",
      f.to_date(socios.data_de_entrada_sociedade.cast(StringType()),'yyyyMMdd')
  )

In [60]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [61]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



# Seleções e consultas
---

## Selecionando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [62]:
#selecionando algumas colunas
empresas\
  .select('*')\
  .show(10, False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [63]:
empresas\
  .select('natureza_juridica', 'porte_da_empresa')\
  .show(10, False)

+-----------------+----------------+
|natureza_juridica|porte_da_empresa|
+-----------------+----------------+
|2240             |1               |
|2062             |5               |
|3034             |5               |
|2135             |5               |
|2062             |1               |
|2062             |5               |
|2062             |5               |
|2062             |5               |
|2135             |5               |
|2135             |5               |
+-----------------+----------------+
only showing top 10 rows



In [64]:
socios\
  .select('cnpj_basico','qualificacao_do_socio','pais',f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .show(5,False)

+-----------+---------------------+----+--------------+
|cnpj_basico|qualificacao_do_socio|pais|ano_de_entrada|
+-----------+---------------------+----+--------------+
|411        |22                   |null|1994          |
|411        |28                   |null|1994          |
|5813       |49                   |null|1994          |
|5813       |49                   |null|1994          |
|14798      |49                   |null|1994          |
+-----------+---------------------+----+--------------+
only showing top 5 rows



In [65]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [66]:
estabelecimentos\
  .select('nome_fantasia','municipio',f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade')\
          ,f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
  .show(5,False)

+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|EMBROIDERY & GIFT|7075     |1995                   |5                      |
+-----------------+---------+-----------------------+-----------------------+
only showing top 5 rows



## Identificando valores nulos

In [67]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [68]:
socios.limit(6).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [69]:
socios.select()

DataFrame[]

In [70]:
#verificando os valores nulos
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [71]:
#ESTABELECIMENTOS
estabelecimentos.limit(10).show()

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+-------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|     fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+--------------

In [72]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [73]:
#verificando os valores nulos
estabelecimentos.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in estabelecimentos.columns]).show()

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+-------+------------------------+---------------------+----------------------+------------------+----------+------+-----------+------+-----+---+---------+-------+----------+-------+----------+----------+-------+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|   pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|logradouro|numero|complemento|bairro|  cep| uf|municipio|  ddd_1|telefone_1|  ddd_2|telefone_2|ddd_do_fax|    fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+-------------+------------------+-----------------

# Seleções e consultas
---

In [74]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [75]:
socios\
  .select('nome_do_socio_ou_razao_social','faixa_etaria',f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy('ano_de_entrada', ascending=True)\
  .show(6,)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|         JOAO FRANCISCO DE...|           3|          null|
|         MARIA SILENE BEZE...|           8|          1900|
|         NAIR YOKO HIRAI T...|           7|          1900|
|         JOSE NELSON VIEIR...|           6|          1901|
|         VALMAR CARDOSO DE...|           5|          1901|
|         ANTERO DA SILVA R...|           9|          1918|
+-----------------------------+------------+--------------+
only showing top 6 rows



In [76]:
socios\
  .select('nome_do_socio_ou_razao_social','faixa_etaria',f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy(['ano_de_entrada','faixa_etaria'], ascending=[False,False])\
  .show(6,)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|         MARIA RAIMUNDA DO...|           9|          2021|
|         ZELIA MARIA CAMAR...|           9|          2021|
|         DORIS PEREIRA GOM...|           9|          2021|
|         NADIR BICHARA CHUAHY|           9|          2021|
|         RAIMUNDA TORRES M...|           9|          2021|
|                JOSE DA SILVA|           9|          2021|
+-----------------------------+------------+--------------+
only showing top 6 rows



## Filtrando os dados

In [77]:

socios\
    .select("nome_do_socio_ou_razao_social")\
        .filter(socios.nome_do_socio_ou_razao_social.startswith("HEITOR"))\
        .filter(socios.nome_do_socio_ou_razao_social.endswith("SANTOS"))\
        .limit(10)\
        .toPandas()


Unnamed: 0,nome_do_socio_ou_razao_social
0,HEITOR MORAIS DOS SANTOS
1,HEITOR DE OLIVEIRA SANTOS
2,HEITOR SILOS DOS SANTOS
3,HEITOR AUGUSTO LOBO SANTOS
4,HEITOR GABRIEL TRAJANO SANTOS
5,HEITOR SAVALA DOS SANTOS
6,HEITOR CORDIBELI DOS SANTOS
7,HEITOR VIEIRA MELLO DOS SANTOS
8,HEITOR ESPINATO OLIVEIRA DOS SANTOS
9,HEITOR AMARAL SANTOS


## O comando LIKE

In [78]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [79]:
empresas\
  .select('razao_social_nome_empresarial','natureza_juridica','porte_da_empresa','capital_social_da_empresa')\
  .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%OFICINA%'))\
  .show(10,False)

+-------------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                                |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------------+-----------------+----------------+-------------------------+
|MOBI DICK OFICINA DE INFLAVEIS LTDA                          |2062             |5               |0.0                      |
|OFICINA MECANICA BAGE LTDA                                   |2062             |5               |0.0                      |
|OFICINA DO CABELO EIRELI                                     |2305             |1               |100000.0                 |
|ANTONIO PRETI OFICINA MECANICA                               |2135             |1               |3000.0                   |
|OFICINA H2 - CENTRO DE MANUTENCAO AUTOMOTIVA DE CAMACARI LTDA|2062             |1               |100000.0                 |


# Agregações e Junções
---

In [80]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [81]:
#agregando o df socios
socios\
  .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .where('ano_de_entrada>=2010')\
  .groupBy('ano_de_entrada')\
  .count()\
  .orderBy('ano_de_entrada',ascending=True)\
  .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [82]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [83]:
#agregando o df empresas
empresas\
  .select('cnpj_basico','porte_da_empresa','capital_social_da_empresa')\
  .groupBy('porte_da_empresa')\
  .agg(
      f.avg('capital_social_da_empresa').alias('capital_social_medio'),
      f.count('cnpj_basico').alias('frequencia')
  )\
  .orderBy('porte_da_empresa',ascending=True)\
  .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+



In [84]:
#utilizando o describe
empresas\
  .select('capital_social_da_empresa')\
  .summary()\
  .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



In [85]:
empresas\
  .select('capital_social_da_empresa')\
  .summary('count')\
  .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
+-------+-------------------------+



In [86]:
#utilizando o describe
socios\
  .select('faixa_etaria')\
  .summary()\
  .show()

+-------+-----------------+
|summary|     faixa_etaria|
+-------+-----------------+
|  count|          2046430|
|   mean|5.396815918453111|
| stddev|1.687799526189411|
|    min|                0|
|    25%|                4|
|    50%|                5|
|    75%|                7|
|    max|                9|
+-------+-----------------+



In [87]:
#utilizando o describe
estabelecimentos\
  .select('municipio')\
  .summary()\
  .show()

+-------+-----------------+
|summary|        municipio|
+-------+-----------------+
|  count|          4836219|
|   mean|5797.744195620587|
| stddev| 2584.18571231171|
|    min|                1|
|    25%|             4123|
|    50%|             6389|
|    75%|             7535|
|    max|             9997|
+-------+-----------------+



# Consultas em SQL
---

In [88]:
empresas.createOrReplaceTempView("empresasView")

In [89]:
spark.sql("SELECT * FROM empresasView").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [90]:
spark\
    .sql("""
        SELECT *
            FROM empresasView
            WHERE capital_social_da_empresa = 50
    """)\
    .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       null|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       null|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       null|
|   22242856|         JOAO CESAR M

In [91]:
spark\
.sql("""
SELECT * FROM empresasView
WHERE capital_social_da_empresa=50

"""



)\
.show(10)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       null|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       null|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       null|
|   22242856|         JOAO CESAR M

In [92]:
spark\
    .sql("""
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media
            FROM empresasView
            GROUP BY porte_da_empresa
    """)\
    .show(5)

+----------------+------------------+
|porte_da_empresa|             Media|
+----------------+------------------+
|            null|  8.35421888053467|
|               1|339994.53313506936|
|               3|2601001.7677092673|
|               5| 708660.4208249798|
+----------------+------------------+



In [108]:
spark.stop()