# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [1]:
import os
os.environ['SPARK_HOME'] = r'C:\Spark\spark-3.1.2-bin-hadoop2.7'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jre1.8.0_421'

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()


In [4]:
spark

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [5]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)

In [6]:
df.printSchema()

root
 |-- Nome: string (nullable = true)
 |-- Idade: string (nullable = true)



In [7]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [8]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
>
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
>
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Carregando os dados das empresas, estabelecimentos e socios

In [9]:
import zipfile

pathDefault = r'H:\Projetos\PySpark-Data-Science-Intro\data'
empresasZip, estabelecimentosZip, sociosZip = 'empresas.zip', 'estabelecimentos.zip', 'socios.zip'

In [10]:
try:
  zipfile.ZipFile(os.path.join(pathDefault, empresasZip), 'r').extractall(pathDefault)
  zipfile.ZipFile(os.path.join(pathDefault, estabelecimentosZip), 'r').extractall(pathDefault)
  zipfile.ZipFile(os.path.join(pathDefault, sociosZip), 'r').extractall(pathDefault)
except Exception as e:
  print('Erro ao descompactar as pastas, verifique o caminho e tente novamente.')
  pass

Erro ao descompactar as pastas, verifique o caminho e tente novamente.


In [11]:
empresas_files = [os.path.join(pathDefault, 'empresas', f) for f in os.listdir(os.path.join(pathDefault, 'empresas')) if f.endswith('.csv')]
socios_files = [os.path.join(pathDefault, 'socios', f) for f in os.listdir(os.path.join(pathDefault, 'socios')) if f.endswith('.csv')]
estabelecimentos_files = [os.path.join(pathDefault, 'estabelecimentos', f) for f in os.listdir(os.path.join(pathDefault, 'estabelecimentos')) if f.endswith('.csv')]

In [12]:
empresas = spark.read.csv(empresas_files, sep=';', inferSchema=True, header=False)
socios = spark.read.csv(socios_files, sep=';', inferSchema=True, header=False)
estabelecimentos = spark.read.csv(estabelecimentos_files, sep=';', inferSchema=True, header=False)

# Manipulando os Dados
---

## Operações básicas

In [13]:
print('DataFrame de Empresas:')
empresas.limit(5).show(truncate=False)

print('DataFrame de Estabelecimentos:')
estabelecimentos.limit(5).show(truncate=False)

print('DataFrame de Sócios:')
socios.limit(5).show(truncate=False)

DataFrame de Empresas:
+----+--------------------------------------------------------------------------------------------+----+---+-------+---+----+
|_c0 |_c1                                                                                         |_c2 |_c3|_c4    |_c5|_c6 |
+----+--------------------------------------------------------------------------------------------+----+---+-------+---+----+
|306 |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240|49 |0,00   |1  |null|
|1355|BRASILEIRO & OLIVEIRA LTDA                                                                  |2062|49 |0,00   |5  |null|
|4820|REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E TABELIONATO E REGISTRO DE CONSTRATOS MARITIMOS|3034|32 |0,00   |5  |null|
|5347|ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS                                                |2135|50 |0,00   |5  |null|
|6846|BADU E FILHOS TECIDOS LTDA                                                               

### Renomeando as colunas do DataFrame

In [14]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa',
                    'ente_federativo_responsavel']

estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral',
                  'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria',
                  'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax',
                  'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais',
                  'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

dataframes = [empresas, estabelecimentos, socios]
colunas = [empresasColNames, estabsColNames, sociosColNames]

In [15]:
for i in range(len(dataframes)):
    df = dataframes[i]
    colNames = colunas[i]
    for index, colName in enumerate(colNames):
        df = df.withColumnRenamed(f"_c{index}", colName)
    dataframes[i] = df

empresas, estabelecimentos, socios = dataframes

In [16]:
print('DataFrame de Empresas:')
empresas.limit(5).show(truncate=False)
print('Esquema de Empresas:')
empresas.printSchema()
print('----' * 20)

print('DataFrame de Estabelecimentos:')
estabelecimentos.limit(5).show(truncate=False)
print('Esquema de Estabelecimentos:')
estabelecimentos.printSchema()
print('----' * 20)

print('DataFrame de Sócios:')
socios.limit(5).show(truncate=False)
print('Esquema de Sócios:')
socios.printSchema()

DataFrame de Empresas:
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0,00                     |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                          

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [17]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [18]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.').cast(DoubleType()))

### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [19]:
socios = socios\
      .withColumn(
          'data_de_entrada_sociedade',
          f.to_date(f.col('data_de_entrada_sociedade').cast(StringType()), 'yyyyMMdd'))

In [20]:
estabelecimentos = estabelecimentos\
    .withColumn(
        'data_situacao_cadastral',
        f.to_date(f.col('data_situacao_cadastral').cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        'data_de_inicio_atividade',
        f.to_date(f.col('data_de_inicio_atividade').cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        'data_da_situacao_especial',
        f.to_date(f.col('data_da_situacao_especial').cast(StringType()), 'yyyyMMdd')
    )

# Seleções e consultas
---

## Selecionando e Ordenando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [21]:
socios\
      .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('AnoDeEntrada'))\
      .orderBy('AnoDeEntrada')\
      .show(5, False)

+-------------------------------+------------+------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|AnoDeEntrada|
+-------------------------------+------------+------------+
|JOAO FRANCISCO DE AMORIM JUNCAL|3           |null        |
|MARIA SILENE BEZERRA DE AGUIAR |8           |1900        |
|NAIR YOKO HIRAI TAKAKI         |7           |1900        |
|JOSE NELSON VIEIRA CAMPOS      |6           |1901        |
|VALMAR CARDOSO DE SANTANA      |5           |1901        |
+-------------------------------+------------+------------+
only showing top 5 rows



In [22]:
estabelecimentos\
      .select('nome_fantasia', 'municipio', f.month('data_de_inicio_atividade').alias('MesDeInicioAtividade'), f.year('data_de_inicio_atividade').alias('AnoDeIncioAtividade'))\
      .na.fill('Nome Fantasia não encontrado')\
      .orderBy(['AnoDeIncioAtividade', 'MesDeInicioAtividade'], ascending=[False, False])\
      .show(15, False)

+--------------------------------------------+---------+--------------------+-------------------+
|nome_fantasia                               |municipio|MesDeInicioAtividade|AnoDeIncioAtividade|
+--------------------------------------------+---------+--------------------+-------------------+
|NEW DISTRIBUIDORA                           |301      |5                   |2021               |
|AM SUL REPRESENTECAO                        |8589     |5                   |2021               |
|ANDERSON BARROSO                            |2631     |5                   |2021               |
|DANKEL DISTRIBUIDORA DE BEBIDAS             |8985     |5                   |2021               |
|BIANCA TERAPEUTA                            |1389     |5                   |2021               |
|OTTIMO PUNTO                                |7535     |5                   |2021               |
|ARILSON ALESSANDRO LF DANCE E FITNESS       |6001     |5                   |2021               |
|TZ TIOZAOANDABAIXO 

## Identificando valores nulos

In [23]:
print('Quantidade de valores nulos por coluna:')
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show(5)
print('----' * 20)
print('Substituindo valores nulos não string por 0:')
socios.na.fill(0).limit(5).show(5)
print('----' * 20)
print('Substituindo valores nulos do tipo string por -:')
socios.na.fill('-').limit(5).show(5)

Quantidade de valores nulos por coluna:
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+-------

## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [24]:
empresas\
      .where(f.col('capital_social_da_empresa') == 50)\
      .limit(5).show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |null                 

In [25]:
socios\
    .select("nome_do_socio_ou_razao_social", "data_de_entrada_sociedade")\
    .filter(f.year('data_de_entrada_sociedade') >= 2002)\
    .filter(f.col('nome_do_socio_ou_razao_social').startswith("GABRIEL"))\
    .orderBy(f.year('data_de_entrada_sociedade').asc())\
    .limit(10).show(truncate=False)

+---------------------------------------+-------------------------+
|nome_do_socio_ou_razao_social          |data_de_entrada_sociedade|
+---------------------------------------+-------------------------+
|GABRIEL LAMBORGHINI ZANDOMENICO        |2002-01-16               |
|GABRIEL CHAGAS NETO                    |2002-10-03               |
|GABRIELLA PACHECO DE ABREU GRANDE POUSA|2002-01-07               |
|GABRIELA LEITE BRANDAO                 |2002-06-21               |
|GABRIELA LARA DE CASTRO SIMAO          |2002-07-17               |
|GABRIEL MOREIRA FRANK                  |2002-03-27               |
|GABRIELA INES CANALE MARTINS           |2002-04-12               |
|GABRIELA MAURELL FERREIRA              |2002-07-31               |
|GABRIELLE FERREIRA ARIOSA              |2002-06-07               |
|GABRIELA CAXAMBU DOS SANTOS            |2002-09-09               |
+---------------------------------------+-------------------------+



## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [26]:
empresas\
    .where(f.upper('razao_social_nome_empresarial').like('%BANCO DO BRASIL%'))\
    .limit(5).show(5, False)

+-----------+---------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                        |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+---------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|21422233   |ASSOCIACAO ATLETICA BANCO DO BRASIL                                  |3999             |16                         |0.0                      |5               |null                       |
|18174037   |ASSOCIACAO ATLETICA BANCO DO BRASIL                                  |3999             |16                         |0.0                      |5               |null                    

In [27]:
empresas\
    .where(f.upper('razao_social_nome_empresarial').like('BRADESCO%'))\
    .limit(5).show(5, False)

+-----------+----------------------------------------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+----------------------------------------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|13401188   |BRADESCO FUNDO DE INVESTIMENTO EM COTAS DE FUNDO DE INVESTIMENTO RENDA FIXA TITULOS DO TESOURO                              |2224             |43                         |0.0                      |5               |null 

In [28]:
empresas\
    .where(f.upper('razao_social_nome_empresarial').like('%ITAU'))\
    .limit(5).show(5, False)

+-----------+------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                   |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|60865763   |ITAU GRAFICA LTDA GRUPO ITAU                    |2062             |49                         |0.0                      |5               |null                       |
|916136     |ASSOCIACAO DOS PRODUTORES DE CAJU DE ITAU       |3999             |16                         |0.0                      |5               |null                       |
|45593704   |ASSOCIACAO DOS EX FUNCIONARIOS DA CIMENTO ITAU  |3999             |16                  

# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) |
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) |
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) |
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) |
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) |
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) |
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) |
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) |
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) |
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) |
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) |
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) |
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) |
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) |
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) |
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) |
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) |
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) |
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) |
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

In [29]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('AnoDeEntrada'))\
    .where(f.year('data_de_entrada_sociedade') >= 2010)\
    .groupBy('AnoDeEntrada')\
    .count()\
    .orderBy('AnoDeEntrada', ascending=True)\
    .limit(15).show(truncate=False)

+------------+------+
|AnoDeEntrada|count |
+------------+------+
|2010        |79337 |
|2011        |83906 |
|2012        |80101 |
|2013        |83919 |
|2014        |80590 |
|2015        |80906 |
|2016        |81587 |
|2017        |90221 |
|2018        |99935 |
|2019        |118248|
|2020        |125927|
|2021        |56316 |
+------------+------+



In [30]:
empresas\
    .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg('capital_social_da_empresa').alias('MédiaCapitalSocial'),
        f.count('cnpj_basico').alias('QuantidadeEmpresas')
    )\
    .orderBy('MédiaCapitalSocial', ascending=False)\
    .limit(15).show(truncate=False)

+----------------+------------------+------------------+
|porte_da_empresa|MédiaCapitalSocial|QuantidadeEmpresas|
+----------------+------------------+------------------+
|3               |2601001.7677092687|115151            |
|5               |708660.4208249793 |1335500           |
|1               |339994.53313507047|3129043           |
|null            |8.35421888053467  |5985              |
+----------------+------------------+------------------+



In [31]:
empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .show()

    # .summary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542674|
| stddev|     2.1118691490537727E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [32]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [33]:
produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [34]:
produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+



In [35]:
produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [36]:
produtos.join(impostos, 'cat', how='outer')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+



In [37]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

In [38]:
freq = empresas_join\
    .select(
        'cnpj_basico',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [39]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [40]:
empresas.createOrReplaceTempView('empresasView')
estabelecimentos.createOrReplaceTempView('estabelecimentosView')
socios.createOrReplaceTempView('sociosView')
empresas_join.createOrReplaceTempView('empresasJoinView')

In [41]:
spark.sql("SELECT * FROM empresasView").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [42]:
spark\
    .sql(
        """
        SELECT *
        FROM empresasView e
        JOIN estabelecimentosView est ON e.cnpj_basico = est.cnpj_basico
        WHERE e.capital_social_da_empresa >= 10000
        """
    )\
    .show(5)


+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+-------------+--------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fi

In [43]:
spark\
    .sql(
        """
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS media_capital_social
        FROM empresasView e
        GROUP BY porte_da_empresa
        """
    )\
    .show(5)


+----------------+--------------------+
|porte_da_empresa|media_capital_social|
+----------------+--------------------+
|            null|    8.35421888053467|
|               1|  339994.53313507047|
|               3|  2601001.7677092687|
|               5|   708660.4208249793|
+----------------+--------------------+



In [44]:
freq = spark\
    .sql(
        """
          WITH freq AS (
          SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS frequencia
          FROM estabelecimentosView
          WHERE YEAR(data_de_inicio_atividade) >= 2010
          GROUP BY data_de_inicio
          ORDER BY data_de_inicio
        )
        SELECT * FROM freq
        UNION ALL
        SELECT 'Total', SUM(frequencia) FROM freq
        """
    ).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265415|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830532|
+--------------+----------+



# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [51]:
# Para ambientes integrados com o hadoop, localmente não funcionará
empresas.write.csv(
    path=os.path.join(pathDefault, 'empresas', 'resultado'),
    mode='overwrite',
    header=True,
    sep=';'
)

estabelecimentos.write.csv(
    path=os.path.join(pathDefault, 'estabelecimentos', 'resultado'),
    mode='overwrite',
    header=True,
    sep=';'
)

socios.write.csv(
    path=os.path.join(pathDefault, 'socios', 'resultado'),
    mode='overwrite',
    header=True,
    sep=';'
)

Ocorreu um erro: An error occurred while calling o494.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apach

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [None]:
# Para ambientes integrados com o hadoop, localmente não funcionará
# Defina o modo de rebase de data e hora como 'LEGACY' para garantir compatibilidade com sistemas mais antigos
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")

empresas.write.parquet(
    path='/content/drive/MyDrive/curso-spark/spark-apresentacao-ferramenta/empresas/resultadoParquet',
    mode='overwrite'
)

estabelecimentos.write.parquet(
    path='/content/drive/MyDrive/curso-spark/spark-apresentacao-ferramenta/estabelecimentos/resultadoParquet',
    mode='overwrite'
)

socios.write.parquet(
    path='/content/drive/MyDrive/curso-spark/spark-apresentacao-ferramenta/socios/resultadoParquet',
    mode='overwrite'
)

Py4JJavaError: An error occurred while calling o452.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 111.0 failed 1 times, most recent failure: Lost task 7.0 in stage 111.0 (TID 3225) (192.168.0.7 executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 H:\content\drive\MyDrive\curso-spark\spark-apresentacao-ferramenta\empresas\resultadoParquet\_temporary\0\_temporary\attempt_202407210030568438880548593270256_0111_m_000007_3225\part-00007-6b459992-e942-4838-a8e0-0c32b26944d8-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 33 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 H:\content\drive\MyDrive\curso-spark\spark-apresentacao-ferramenta\empresas\resultadoParquet\_temporary\0\_temporary\attempt_202407210030568438880548593270256_0111_m_000007_3225\part-00007-6b459992-e942-4838-a8e0-0c32b26944d8-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

```
# Isto está formatado como código
```



In [None]:
# Para ambientes integrados com o hadoop, localmente não funcionará
empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/spark-apresentacao-ferramenta/empresas/resultado/csvUnico',
    mode='overwrite',
    header=True,
    sep=';'
)

estabelecimentos.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/spark-apresentacao-ferramenta/estabelecimentos/resultado/csvUnico',
    mode='overwrite',
    header=True,
    sep=';'
)

socios.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/spark-apresentacao-ferramenta/socios/resultado/csvUnico',
    mode='overwrite',
    header=True,
    sep=';'
)

Py4JJavaError: An error occurred while calling o531.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


## Finalizando sessão spark


In [None]:
spark.stop()