In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


In [6]:
import findspark
findspark.init()

# **Identificando valores Nulos**

In [7]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.master('local[*]').appName('oi').getOrCreate()


In [8]:
# definidos números inteiros
df_1 = spark.createDataFrame([(1,),(2,),(5,),(8,),(6,),(None,)],['Data'])
df_1.show()

+----+
|Data|
+----+
|   1|
|   2|
|   5|
|   8|
|   6|
|null|
+----+



In [9]:
#definindo números ponto flutuantes
df = spark.createDataFrame([(1.,),(2.,),(5.,),(8.,),(6.,),(float('nan'),)],['Data'])
df.show()

+----+
|Data|
+----+
| 1.0|
| 2.0|
| 5.0|
| 8.0|
| 6.0|
| NaN|
+----+



In [10]:
#definindo números ponto flutuantes
df = spark.createDataFrame([('1',),('2',),('5',),('8',),('6',),(None,)],['Data'])
df.show()

+----+
|Data|
+----+
|   1|
|   2|
|   5|
|   8|
|   6|
|null|
+----+



In [11]:
# Conectando no dados e enviando para o google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
part='/content/drive/MyDrive/curso_spark/socios/K3241.K03200Y8.D30708.SOCIOCSV'
socios = spark.read.csv(part,sep=';',inferSchema=True)

In [13]:
part='/content/drive/MyDrive/curso_spark/empresas/K3241.K03200Y8.D30708.EMPRECSV'
empresas = spark.read.csv(part,sep=';',inferSchema=True)

In [14]:
part='/content/drive/MyDrive/curso_spark/estabelicimentos/K3241.K03200Y8.D30708.ESTABELE'
estabelecimentos = spark.read.csv(part,sep=';',inferSchema=True)

In [15]:
socios.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,4611849,2,ROSANA ZAIATZ SCHIMANDEIRO,***005229**,22,20190930,,***000000**,,0,5
1,6768438,2,GILBERTO MACHADO DE PINHO,***224810**,16,20050912,,***000000**,,0,9
2,6768440,2,ORLANDO ALEXANDRE DA SILVA,***264158**,16,20050912,,***000000**,,0,7
3,2853754,2,PAULO GONZAGA DOS SANTOS,***955741**,16,20050912,,***000000**,,0,6
4,1235378,2,LUIZ YUJI TAMAI,***396528**,49,19960508,,***000000**,,0,8


In [16]:
colunas_socios = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']
colunas_empresas = ['cnpj','razao_social','natureza_juritica','qualificacao_responsavel','capital_social','porte_empresa','ente_federativo']
colunas_estabelecimentos = ['cnpj', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [17]:
# estabelecimentos
for index, colunas in enumerate(colunas_estabelecimentos):
  estabelecimentos = estabelecimentos.withColumnRenamed(f'_c{index}',colunas)

In [18]:
for index, colunas in enumerate(colunas_empresas):
  empresas = empresas.withColumnRenamed(f'_c{index}',colunas)

In [19]:
# socios
for index, colunas in enumerate(colunas_socios):
  socios = socios.withColumnRenamed(f'_c{index}',colunas)

In [20]:
from pyspark.sql import functions as f

empresas  = empresas.withColumn('capital_social',f.regexp_replace('capital_social',',','.'))

In [21]:
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

empresas  = empresas.withColumn('capital_social',empresas['capital_social'].cast(DoubleType()))

In [22]:
socios.show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|    4611849|                     2|         ROSANA ZAIATZ SCH...|         ***005229**|                   22|                 20190930|null|        ***000000**|                 null|                                  0|           5|
|    6768438|                     2|         GILBERTO MACHADO ...|      

In [23]:
from pyspark.sql import functions as f

# contagem de valores nulos.

socios.select([f.count(f.when(f.isnull(c),1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          190|                 919|                    0|                        0|2011583|                  0|              1966609|                                  0|           0|
+-----------+----------------------+------------------------

In [24]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [25]:
# subistituindo valores nulos por zero quando a coluna for do tipo numerico
socios = socios.na.fill(0)

In [26]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,4611849,2,ROSANA ZAIATZ SCHIMANDEIRO,***005229**,22,20190930,0,***000000**,,0,5
1,6768438,2,GILBERTO MACHADO DE PINHO,***224810**,16,20050912,0,***000000**,,0,9
2,6768440,2,ORLANDO ALEXANDRE DA SILVA,***264158**,16,20050912,0,***000000**,,0,7
3,2853754,2,PAULO GONZAGA DOS SANTOS,***955741**,16,20050912,0,***000000**,,0,6
4,1235378,2,LUIZ YUJI TAMAI,***396528**,49,19960508,0,***000000**,,0,8


In [27]:
# subistituindo valores nulos por zero quando a coluna for do tipo text
socios = socios.na.fill('0')

In [28]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,4611849,2,ROSANA ZAIATZ SCHIMANDEIRO,***005229**,22,20190930,0,***000000**,0,0,5
1,6768438,2,GILBERTO MACHADO DE PINHO,***224810**,16,20050912,0,***000000**,0,0,9
2,6768440,2,ORLANDO ALEXANDRE DA SILVA,***264158**,16,20050912,0,***000000**,0,0,7
3,2853754,2,PAULO GONZAGA DOS SANTOS,***955741**,16,20050912,0,***000000**,0,0,6
4,1235378,2,LUIZ YUJI TAMAI,***396528**,49,19960508,0,***000000**,0,0,8


In [29]:
# transformando coluna data_de_entrada_sociedade para tipo de data.
from pyspark.sql.types import StringType

socios = socios\
    .withColumn('data_de_entrada_sociedade',
                f.to_date(socios.data_de_entrada_sociedade.cast(StringType()),'yyyyMMdd'))

# **Ordenando os Dados**

In [30]:
# ordenando decrescente
socios\
    .select('nome_do_socio_ou_razao_social','faixa_etaria',f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada',ascending=False)\
    .show(5,False)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|ROGERIO PEREIRA JORGE        |5           |2023          |
|BOUTROS ALBERT EL KHOURY     |6           |2023          |
|SERGIO LUIZ DA SILVA         |6           |2023          |
|EDUARDO CAVALCANTE WOELLNER  |2           |2023          |
|LUCAS VARONE                 |3           |2023          |
+-----------------------------+------------+--------------+
only showing top 5 rows



In [31]:
# ordenando decrescente
socios\
    .select('nome_do_socio_ou_razao_social','faixa_etaria',f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada','faixa_etaria'],ascending=[False,False])\
    .show(5,False)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|ORACI JOSE DUARTE            |9           |2023          |
|MARIA ALICE ALVARENGA        |9           |2023          |
|ROBERTO KNEIF                |9           |2023          |
|FLAMINIO DE ALMEIDA MATOSO   |9           |2023          |
|ARNOLDA C ODORIZZI           |9           |2023          |
+-----------------------------+------------+--------------+
only showing top 5 rows



# **Filtrando os Dados**

In [32]:
empresas\
  .where('capital_social==50')\
  .show(5,False)

+--------+-----------------------------------------+-----------------+------------------------+--------------+-------------+---------------+
|cnpj    |razao_social                             |natureza_juritica|qualificacao_responsavel|capital_social|porte_empresa|ente_federativo|
+--------+-----------------------------------------+-----------------+------------------------+--------------+-------------+---------------+
|32066895|GIOVANNA LIMA DA SILVA 47315409840       |2135             |50                      |50.0          |1            |null           |
|32067442|CATHERINE VICTORIA MAGALHAES 44963897890 |2135             |50                      |50.0          |1            |null           |
|32067748|ALEF DOS SANTOS 12133358684              |2135             |50                      |50.0          |1            |null           |
|32068669|ELEONE DE JESUS 42625114053              |2135             |50                      |50.0          |1            |null           |
|32068923|FER

In [33]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = false)
 |-- cnpj_ou_cpf_do_socio: string (nullable = false)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = false)
 |-- nome_do_representante: string (nullable = false)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [34]:
socios\
    .select('nome_do_socio_ou_razao_social')\
    .filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
    .filter(socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\
    .limit(10)\
    .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BORGES DIAS
1,RODRIGO CARDOSO DIAS
2,RODRIGO NASCIMENTO DIAS
3,RODRIGO PIRES PIMENTEL DIAS
4,RODRIGO PEREIRA DIAS
5,RODRIGO FERREIRA DIAS
6,RODRIGO PIAN FERREIRA GARCEZ DIAS
7,RODRIGO SILVEIRA DIAS
8,RODRIGO DA RESSURREICAO DIAS
9,RODRIGO BOSCARATO DIAS


# **Comando LIKE**

In [35]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',),('Juca restaurantes ltda',),('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [36]:
# traz a coluna correspondente a a palavra indicada.
# O uso do caractere ‘%’ apenas no lado direito da letra indica que estamos procurando strings que começam com a letra especificada.
df\
    .where(f.upper(df.data).like('%RESTAURANTE%'))\
    .show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [37]:
empresas.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juritica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte_empresa: integer (nullable = true)
 |-- ente_federativo: string (nullable = true)



In [38]:
empresas\
    .select('razao_social','natureza_juritica','porte_empresa','capital_social')\
 .filter(f.upper(empresas['razao_social']).like('%RESTAURANTE%'))\
 .show(15,False)

+---------------------------------------------+-----------------+-------------+--------------+
|razao_social                                 |natureza_juritica|porte_empresa|capital_social|
+---------------------------------------------+-----------------+-------------+--------------+
|RESTAURANTE COZINHA GOURMET DA BARREIRA LTDA.|2062             |1            |20000.0       |
|BAR E RESTAURANTE BIER PLATZ LTDA            |2062             |1            |190000.0      |
|RESTAURANTE VILLAA COUNTRY LTDA              |2062             |1            |100000.0      |
|LESSA E BARRETO RESTAURANTE LTDA             |2062             |5            |0.0           |
|RESTAURANTE  SABOR DA SEDE LTDA              |2062             |1            |95400.0       |
|RESTAURANTE MIRANTE DO PONTAL EIRELI         |2305             |3            |120000.0      |
|BRANDAO RESTAURANTE LTDA                     |2062             |1            |50000.0       |
|LAGOA GRILL BAR E RESTAURANTE LTDA           |206

In [39]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = false)
 |-- cnpj_ou_cpf_do_socio: string (nullable = false)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = false)
 |-- nome_do_representante: string (nullable = false)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [40]:
socios\
    .select('nome_do_socio_ou_razao_social','cnpj_basico','identificador_de_socio','faixa_etaria')\
 .filter(f.upper(socios['nome_do_socio_ou_razao_social']).like('%OLIVEIRA%'))\
 .show(15,False)

+-------------------------------------+-----------+----------------------+------------+
|nome_do_socio_ou_razao_social        |cnpj_basico|identificador_de_socio|faixa_etaria|
+-------------------------------------+-----------+----------------------+------------+
|LAIZA OLIVEIRA DE SOUZA              |4611875    |2                     |5           |
|SILVANA OLIVEIRA DOS SANTOS          |6768488    |2                     |4           |
|ANTONIO LEOPOLDO DE OLIVEIRA         |4611942    |2                     |6           |
|MARCOS REIS DE OLIVEIRA              |74489642   |2                     |6           |
|LEANDRO DE OLIVEIRA REZENDE JUNQUEIRA|6289445    |2                     |6           |
|JARDEL VIVEIROS OLIVEIRA             |6289485    |2                     |4           |
|SILVIA GONCALVES DE OLIVEIRA CHIEIA  |6289520    |2                     |6           |
|LAURISTOM JOSE DE OLIVEIRA           |36924629   |2                     |7           |
|BICAIL DA CONCEICAO DE OLIVEIRA

In [41]:
socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|    4611849|                     2|         ROSANA ZAIATZ SCH...|         ***005229**|                   22|               2019-09-30|   0|        ***000000**|                    0|                                  0|           5|
|    6768438|                     2|         GILBERTO MACHADO ...|      

# **Sumarizando os Dados**

In [42]:
# Funçoes:
# approx_disctinct, avg, collect_list, collect_set, countDistinct, count, grouping, first, last, kurtosis, max, min, mean, skewness, stddev ou stddev_samp, stddev_pop, sum sumDistinct, variance ou car_samp, var_pop

In [43]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada',ascending=True)\
    .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 72588|
|          2011| 76047|
|          2012| 73867|
|          2013| 76021|
|          2014| 73218|
|          2015| 72236|
|          2016| 71708|
|          2017| 77648|
|          2018| 85055|
|          2019| 98482|
|          2020|106138|
|          2021|147646|
|          2022|164260|
|          2023| 95331|
+--------------+------+



In [44]:
empresas\
    .select('cnpj','porte_empresa','capital_social')\
    .groupBy('porte_empresa')\
    .agg(
        f.avg('capital_social').alias('capital_social_medio'),
        f.count('cnpj').alias('frequencia')
    )\
    .orderBy('porte_empresa',ascending=True)\
    .show()

+-------------+--------------------+----------+
|porte_empresa|capital_social_medio|frequencia|
+-------------+--------------------+----------+
|         null|  0.5530973451327433|      5424|
|            1|   62552.97445749658|   3938094|
|            3|  190671.42579851445|    105953|
|            5|   4210742.862500831|    445389|
+-------------+--------------------+----------+



In [45]:
empresas\
  .select('capital_social')\
  .summary()\
  .show()

  # .summary('count','mean','stddev','min','25%','50%','75%','max')

+-------+--------------------+
|summary|      capital_social|
+-------+--------------------+
|  count|             4494860|
|   mean|   476535.4780253961|
| stddev|2.2475126461613375E8|
|    min|                 0.0|
|    25%|              1000.0|
|    50%|              2200.0|
|    75%|             10000.0|
|    max|        3.3202134E11|
+-------+--------------------+



# **Juções de DataFrame, JOIN**

In [46]:
produtos = spark.createDataFrame(
    [
      ('1','Bebidas','Água mineral'),
      ('2','Limpeza','Sabão em pó'),
      ('3','Frios','Queijo'),
      ('4','Bebidas','Refrigerante'),
      ('5','Pet','Ração para cães')
    ],
      ['id','cat','prod']
)


impostos = spark.createDataFrame(

      [
          ('Bebidas',0.15),
          ('Limpeza',0.05),
          ('Frios',0.065),
          ('Carnes',0.08)
       ],
      ['cat','tax']
)


In [47]:
produtos.toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [48]:
impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [49]:
# A cláusula INNER JOIN permite usar um operador de comparação para comparar os valores de colunas provenientes de tabelas associadas.
produtos.join(impostos,'cat',how='inner')\
    .sort('id')\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [50]:
# A cláusula left permite obter não apenas os dados relacionados de duas tabelas, mas também os dados não relacionados encontrados na tabela à esquerda da cláusula JOIN.
produtos.join(impostos,'cat',how='left')\
    .sort('id')\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+



In [51]:
# Ao contrário do LEFT JOIN, a cláusula RIGHT JOIN ou RIGHT OUTER JOIN retorna todos os dados encontrados na tabela à direita de JOIN. Caso não existam dados associados entre as tabelas à esquerda e à direita de JOIN, serão retornados valores nulos.
produtos.join(impostos,'cat',how='right')\
    .sort('id')\
    .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [52]:
# Todas as linhas de dados da tabela à esquerda de JOIN e da tabela à direita serão retornadas pela cláusula FULL JOIN ou FULL OUTER JOIN.
# Caso uma linha de dados não esteja associada a qualquer linha da outra tabela, os valores das colunas
# a lista de seleção serão nulos. Caso contrário, os valores obtidos serão baseados nas tabelas usadas como referência.
produtos.join(impostos,'cat',how='outer')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+



In [53]:
empresas.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juritica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte_empresa: integer (nullable = true)
 |-- ente_federativo: string (nullable = true)



In [54]:
estabelecimentos.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- ddd_1: string (null

In [None]:
empresas = empresas.withColumn('data_de_inicio_atividade',f.to_date(empresas.data_de_inicio_atividade.cast(StringType()),'yyyyMMdd'))

In [55]:
empresas_join = estabelecimentos.join(empresas,'cnpj',how='inner')

In [56]:
empresas_join.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- ddd_1: string (null

In [118]:
freq = empresas_join\
    .select(
        'cnpj',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >=2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count('cnpj').alias('frequencia'))\
    .orderBy('data_de_inicio',ascending=True)

In [119]:
freq.toPandas()

Unnamed: 0,data_de_inicio,frequencia
0,2010,210
1,2011,135
2,2012,163
3,2013,138
4,2014,114
5,2015,95
6,2016,78
7,2017,77
8,2018,1610
9,2019,22218


In [120]:
# lit --> soma total
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|       210|
|          2011|       135|
|          2012|       163|
|          2013|       138|
|          2014|       114|
|          2015|        95|
|          2016|        78|
|          2017|        77|
|          2018|      1610|
|          2019|     22218|
|          2020|      5017|
|          2021|      1069|
|          2022|      1082|
|          2023|       491|
|         Total|     32497|
+--------------+----------+



## Spark SQL

 #spark RDDs vs #DataFrames vs #SparkSQL

In [122]:
empresas.createOrReplaceTempView('empresasView')


In [123]:
spark.sql('SELECT * FROM empresasView').show(5)


+--------+--------------------+-----------------+------------------------+--------------+-------------+---------------+
|    cnpj|        razao_social|natureza_juritica|qualificacao_responsavel|capital_social|porte_empresa|ente_federativo|
+--------+--------------------+-----------------+------------------------+--------------+-------------+---------------+
|32066829|KARINA CRISTINA B...|             2135|                      50|        1000.0|            1|           null|
|32066830|32.066.830 KAROLI...|             2135|                      50|        5000.0|            1|           null|
|32066831|PARTIDO REPUBLICA...|             3271|                      16|           0.0|            5|           null|
|32066832|JOSE ALIPIO GARCI...|             2135|                      50|       10000.0|            1|           null|
|32066833|M&M COMERCIO DE P...|             2062|                      49|       10000.0|            3|           null|
+--------+--------------------+---------

In [63]:
spark\
    .sql("""
        SELECT *
           FROM empresasView
           WHERE  capital_social = 50
        """
    )\
    .show(5)

+--------+--------------------+-----------------+------------------------+--------------+-------------+---------------+
|    cnpj|        razao_social|natureza_juritica|qualificacao_responsavel|capital_social|porte_empresa|ente_federativo|
+--------+--------------------+-----------------+------------------------+--------------+-------------+---------------+
|32066895|GIOVANNA LIMA DA ...|             2135|                      50|          50.0|            1|           null|
|32067442|CATHERINE VICTORI...|             2135|                      50|          50.0|            1|           null|
|32067748|ALEF DOS SANTOS 1...|             2135|                      50|          50.0|            1|           null|
|32068669|ELEONE DE JESUS 4...|             2135|                      50|          50.0|            1|           null|
|32068923|FERNANDO PINHEIRO...|             2135|                      50|          50.0|            1|           null|
+--------+--------------------+---------

In [64]:
spark\
  .sql("""
      SELECT porte_empresa, MEAN(capital_social) AS Media
      FROM empresasVIew
      GROUP BY porte_empresa
  """)\
  .show()




+-------------+------------------+
|porte_empresa|             Media|
+-------------+------------------+
|         null|0.5530973451327433|
|            1| 62552.97445749658|
|            3|190671.42579851445|
|            5| 4210742.862500831|
+-------------+------------------+



In [99]:

empresas_join.createOrReplaceTempView("empresasJoinView")

In [114]:
# a data ta tipo data
empresas_join.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- ddd_1: string (nullabl

In [125]:
freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj) AS count_cnpj
            FROM empresasJoinView
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
    """)

freq\
    .show()



+--------------+----------+
|data_de_inicio|count_cnpj|
+--------------+----------+
|          2010|       210|
|          2011|       135|
|          2012|       163|
|          2013|       138|
|          2014|       114|
|          2015|        95|
|          2016|        78|
|          2017|        77|
|          2018|      1610|
|          2019|     22218|
|          2020|      5017|
|          2021|      1069|
|          2022|      1082|
|          2023|       491|
+--------------+----------+



In [126]:
freq.limit(5).toPandas()

Unnamed: 0,data_de_inicio,count_cnpj
0,2010,210
1,2011,135
2,2012,163
3,2013,138
4,2014,114


In [127]:
freq.createOrReplaceTempView("freqView")

In [128]:
spark.sql("""
      SELECT *
        FROM freqView
      UNION ALL
      SELECT 'Total' AS data_de_inicio, SUM(count_cnpj) AS count
        FROM freqView
  """).show()



+--------------+----------+
|data_de_inicio|count_cnpj|
+--------------+----------+
|          2010|       210|
|          2011|       135|
|          2012|       163|
|          2013|       138|
|          2014|       114|
|          2015|        95|
|          2016|        78|
|          2017|        77|
|          2018|      1610|
|          2019|     22218|
|          2020|      5017|
|          2021|      1069|
|          2022|      1082|
|          2023|       491|
|         Total|     32497|
+--------------+----------+



_____________________________________________________________________________________________________________________

# **Formas de Armazenamento**

Uma forma bastante simples de se ter uma medida de performance é utilizando a magic word %time. Colocando %%time no início de cada célula do notebook você tem um print do tempo gasto para a execução do código daquela célula.

# Salvando Arquivos CSV

In [150]:
%%time
# salvando arquivo em csv

# caminho onde vai ser salvo o arquivo o ultimo crio uma pasta para salvar o arquivo
# dentro dessa pasta o arquivo modificado

empresas.write.csv(path='/content/drive/MyDrive/curso_spark/empresas/salvando_csv',
                   mode='overwrite',
                   sep=';'
                   ,header=True)

# Salvando os Arquivos PARQUET



Apache Parquet é um formato de armazenamento colunar disponível para qualquer projeto no ecossistema Hadoop, independentemente da escolha da estrutura de processamento de dados, modelo de dados ou linguagem de programação.

Tanto o formato Apache Parquet quanto o formato Apache ORC são formatos de armazenamento colunar projetados para serem usados no ecossistema Hadoop. Ambos têm como objetivo otimizar o armazenamento e processamento de dados em cenários de big data.

Ambos os formatos têm características semelhantes, como:

Armazenamento colunar: Ambos os formatos armazenam os dados em colunas, o que permite uma melhor compressão e leitura eficiente de conjuntos específicos de colunas, reduzindo a sobrecarga de leitura de dados desnecessários.

Compactação: Ambos os formatos oferecem técnicas de compactação avançadas para reduzir o espaço de armazenamento necessário.

Suporte ao esquema: Ambos os formatos suportam esquemas ricos e flexíveis, o que significa que você pode armazenar dados com diferentes tipos de dados e estruturas complexas.

Suporte a consultas e leitura eficiente: Ambos os formatos foram projetados para suportar consultas de leitura eficiente, aproveitando o armazenamento colunar e técnicas de compactação.

Embora o Parquet e o ORC tenham semelhanças, eles também têm algumas diferenças em termos de recursos específicos e desempenho em cenários específicos. Em geral, ambos os formatos são muito populares e amplamente utilizados em ecossistemas de big data, como o Hadoop, independentemente da escolha da estrutura de processamento de dados, modelo de dados ou linguagem de programação. A escolha entre Parquet e ORC dependerá das necessidades específicas do projeto e das características de desempenho desejadas.

In [151]:
%%time
# salvando o arquivo no formato parquet
empresas.write.parquet('/content/drive/MyDrive/curso_spark/empresas/salvando_parquet')

# Supondo que você tenha o DataFrame chamado 'df'
# e queira salvar o arquivo no formato Parquet.

# Escreva o DataFrame no formato Parquet
# df.write.format("parquet").save("/caminho/para/seu/arquivo_parquet")

In [136]:
%%time
# lendo arquivos parquet
# Importe a classe necessária
from pyspark.sql import DataFrameWriter

empresas_parquet = spark.read.parquet('/content/drive/MyDrive/curso_spark/empresas/salvando_parquet')

# Salvando os Arquivos ORC


parquet é rápido orc é mais rápido que o parquet



Assim como o PARQUET, ORC é um formato de arquivo colunar. Ele é otimizado para grandes leituras de streaming, mas com suporte integrado para localizar as linhas necessárias rapidamente. O armazenamento de dados em formato colunar permite ler, descompactar e processar apenas os valores necessários para a consulta.

In [152]:
%%time
# Supondo que você tenha o DataFrame chamado 'empresas'
# e queira salvar o arquivo no formato ORC.

# Importe a classe necessária
from pyspark.sql import DataFrameWriter

# Escreva o DataFrame no formato ORC
empresas.write.format("orc").save("/content/drive/MyDrive/curso_spark/empresas/salvando_orc")


CPU times: user 143 ms, sys: 11 ms, total: 154 ms
Wall time: 22.4 s


In [140]:
%%time
# lendo os arquivos ORC

from pyspark.sql import SparkSession

# Inicialize o SparkSession
spark = SparkSession.builder.appName("Exemplo ORC").getOrCreate()

# Leia o arquivo ORC
empresas_orc = spark.read.format("orc").load("/content/drive/MyDrive/curso_spark/empresas/salvando_orc")


# **Particionamento dos Dados**

## Salvando em CSV

In [153]:
%%time
# salvando em partição única em csv
# pois automatico ele salva em varias particões.

empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_unica_csv',
    mode='overwrite',
    sep=';',
    header=True
)

CPU times: user 176 ms, sys: 19.5 ms, total: 195 ms
Wall time: 29.3 s


In [None]:
%%time
# Supondo que você tenha o DataFrame chamado 'empresas'
# salvando em partição personalizada
# e queira salvar o arquivo em formato CSV em partições por 'porte_empresa'

empresas.write.csv(
    path='/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_csv',
    mode='overwrite',
    header=True,
    partitionBy='porte_empresa'
)


# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col

# # Inicialize o SparkSession
# spark = SparkSession.builder.appName("Exemplo CSV com particionamento").getOrCreate()

# # Supondo que você tenha o DataFrame chamado 'empresas'
# # e queira salvar o arquivo em formato CSV em partições por 'porte_empresa'

# # Primeiro, crie um DataFrame temporário com os dados particionados
# empresas_temp = empresas.withColumn("porte_empresa", col("porte_empresa").cast("string"))

# # Salve o DataFrame em CSV particionado
# empresas_temp.write.partitionBy("porte_empresa")\
#     .csv("/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_csv", mode="overwrite", header=True)



## Salvando em Parquet

In [155]:
%%time
# Salvando o DataFrame em uma partição única no formato Parquet
empresas.coalesce(1).write.parquet('/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_unica_parquet', mode='overwrite')


CPU times: user 186 ms, sys: 21.5 ms, total: 208 ms
Wall time: 29.1 s


In [156]:
%%time
# salvando em particao em parquet escolhendo uma coluna como argumento para parametro partitionBy
# salvando em partição personalizada

empresas.write.parquet(
    path='/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_parquet',
    mode='overwrite',
    partitionBy='porte_empresa'
)

# Se a coluna 'porte_da_empresa' existir no DataFrame, você pode salvá-lo em partições por essa coluna:
# empresas.write.partitionBy('porte_da_empresa').parquet('/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_parquet', mode='overwrite')

CPU times: user 193 ms, sys: 21.4 ms, total: 214 ms
Wall time: 30.3 s


# Salvando em orc

In [159]:
%%time
# Salvando o DataFrame em uma partição única no formato orc
empresas.coalesce(1).write.orc('/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_unica_orc', mode='overwrite')


CPU times: user 179 ms, sys: 20.9 ms, total: 200 ms
Wall time: 26.8 s


In [158]:
%%time
# salvando em particao em ORC escolhendo uma coluna como argumento  como parametro partitionBy
# salvando em partição personalizada

empresas.write.orc(
    path='/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_orc',
    mode='overwrite',
    partitionBy='porte_empresa'
)


# Salvando o DataFrame em partições usando o formato ORC
# empresas.write.partitionBy('porte_empresa').orc('/content/drive/MyDrive/curso_spark/empresas/salvando_em_particao_orc', mode='overwrite')


CPU times: user 173 ms, sys: 11.7 ms, total: 185 ms
Wall time: 25.5 s


In [160]:
spark.stop()