<a href="https://colab.research.google.com/github/hugopoggi/pyspark_senai/blob/main/spark_24_n2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [67]:
# 31/07/2024
# Criando ambiente Spark

# Instalando biblioteca pyspark
!pip install pyspark



In [68]:
# Importando funções e criando o objeto spark:
from pyspark.sql import SparkSession as ss
from pyspark.sql import functions as f
# Importando função para converter dados
from pyspark.sql.types import DoubleType

In [69]:
spark = ss.builder.appName("SENAI").getOrCreate()

In [70]:
spark

In [71]:
# Criando um data frame manualmente
dados = [
    ("joao", "analista", 5000),
    ("isabela", "analista", 5280),
    ("julia", "assistente", 2800),
    ("kleber", "auxiliar", 2200)
]

In [72]:
colunas = ["func", "cargo", "salario"]

In [73]:
# Criando Data Frame (estrutura de linhas x colunas):
df = spark.createDataFrame(dados, colunas)

In [74]:
# Explorando Data Frame:
# Verificando nome das colunas e seus tipos:
df.printSchema()

root
 |-- func: string (nullable = true)
 |-- cargo: string (nullable = true)
 |-- salario: long (nullable = true)



In [75]:
# Exibindo dados do data frame.
df.show()

+-------+----------+-------+
|   func|     cargo|salario|
+-------+----------+-------+
|   joao|  analista|   5000|
|isabela|  analista|   5280|
|  julia|assistente|   2800|
| kleber|  auxiliar|   2200|
+-------+----------+-------+



In [76]:
df.toPandas()

Unnamed: 0,func,cargo,salario
0,joao,analista,5000
1,isabela,analista,5280
2,julia,assistente,2800
3,kleber,auxiliar,2200


In [77]:
# Unindo dois DataFrames
dados2 = [
    ("ana", "auxiliar", 2490),
    ("rogerio", "auxiliar", 9800),
    ("roberto", "diretor", 32000)
]

In [78]:
df2 = spark.createDataFrame(dados2, colunas)

In [79]:
# Unindo data frames:
df_total = df.union(df2)

In [80]:
df_total.show()

+-------+----------+-------+
|   func|     cargo|salario|
+-------+----------+-------+
|   joao|  analista|   5000|
|isabela|  analista|   5280|
|  julia|assistente|   2800|
| kleber|  auxiliar|   2200|
|    ana|  auxiliar|   2490|
|rogerio|  auxiliar|   9800|
|roberto|   diretor|  32000|
+-------+----------+-------+



In [81]:
df = df_total

In [82]:
df.show()

+-------+----------+-------+
|   func|     cargo|salario|
+-------+----------+-------+
|   joao|  analista|   5000|
|isabela|  analista|   5280|
|  julia|assistente|   2800|
| kleber|  auxiliar|   2200|
|    ana|  auxiliar|   2490|
|rogerio|  auxiliar|   9800|
|roberto|   diretor|  32000|
+-------+----------+-------+



In [83]:
df.toPandas()

Unnamed: 0,func,cargo,salario
0,joao,analista,5000
1,isabela,analista,5280
2,julia,assistente,2800
3,kleber,auxiliar,2200
4,ana,auxiliar,2490
5,rogerio,auxiliar,9800
6,roberto,diretor,32000


In [84]:
# Filtrando valores:
# Motrar somente 'analistas' do DataFrame:
df.where('cargo =="analista"').show()

+-------+--------+-------+
|   func|   cargo|salario|
+-------+--------+-------+
|   joao|analista|   5000|
|isabela|analista|   5280|
+-------+--------+-------+



In [85]:
 # Filtrando valores:
# Motrar somente 'analistas' que ganham mais de 5000 do DataFrame:
df.where('cargo =="analista" and salario > 5000').show()

+-------+--------+-------+
|   func|   cargo|salario|
+-------+--------+-------+
|isabela|analista|   5280|
+-------+--------+-------+



In [86]:
df.show()

+-------+----------+-------+
|   func|     cargo|salario|
+-------+----------+-------+
|   joao|  analista|   5000|
|isabela|  analista|   5280|
|  julia|assistente|   2800|
| kleber|  auxiliar|   2200|
|    ana|  auxiliar|   2490|
|rogerio|  auxiliar|   9800|
|roberto|   diretor|  32000|
+-------+----------+-------+



In [87]:
# Criando ou alterando(mesmo nome altera valor da msm colula nome
# diferente cria coluna nova) novas colunas no pyspark:
# Visualizando alteração:
df.withColumn("novo_salario", df.salario * 1.07).show()

+-------+----------+-------+------------+
|   func|     cargo|salario|novo_salario|
+-------+----------+-------+------------+
|   joao|  analista|   5000|      5350.0|
|isabela|  analista|   5280|      5649.6|
|  julia|assistente|   2800|      2996.0|
| kleber|  auxiliar|   2200|      2354.0|
|    ana|  auxiliar|   2490|      2664.3|
|rogerio|  auxiliar|   9800|     10486.0|
|roberto|   diretor|  32000|     34240.0|
+-------+----------+-------+------------+



In [88]:
# Efetivando alteração no data frame
df = df.withColumn("novo_salario", df.salario * 1.07)

In [89]:
df.show()

+-------+----------+-------+------------+
|   func|     cargo|salario|novo_salario|
+-------+----------+-------+------------+
|   joao|  analista|   5000|      5350.0|
|isabela|  analista|   5280|      5649.6|
|  julia|assistente|   2800|      2996.0|
| kleber|  auxiliar|   2200|      2354.0|
|    ana|  auxiliar|   2490|      2664.3|
|rogerio|  auxiliar|   9800|     10486.0|
|roberto|   diretor|  32000|     34240.0|
+-------+----------+-------+------------+



In [90]:
# Criando DataFrame via importação de dados / arquivos:
url = "/content/drive/MyDrive/BasesSpark/empresas/part*"

empresas = spark.read.csv(
    url,
    sep= ";",
    header= True,
    # InferSchema "tenta" entender o tipo de dado e trazer em seu formato
    inferSchema= True
)

In [91]:
# Numero de linhas e trunca os dados falsos
empresas.show(20, False)

+-----+--------------------------------------------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------+
|cnpj |razao_social                                                                                |natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------------------------------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------+
|306  |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                      |0,00          |1    |NULL           |
|1355 |BRASILEIRO & OLIVEIRA LTDA                                                                  |2062             |49                      |0,00          |5    |NULL           |
|4820 |REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E TABELIONATO E REGISTRO DE CONSTRATOS MARIT

In [92]:
# Verificando quantos registros tem no dataframe:
empresas.count()

4585679

In [93]:
empresas.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social: string (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo: string (nullable = true)



In [94]:
# Convertendo uma coluna de string para decimal:
# Antes de converter precisamos substituir a virgula pelo ponto.

empresas.withColumn("capital_social", f.regexp_replace("capital_social", ",", ".")).show()

+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
| cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
|  306|FRANCAMAR REFRIGE...|             2240|                      49|          0.00|    1|           NULL|
| 1355|BRASILEIRO & OLIV...|             2062|                      49|          0.00|    5|           NULL|
| 4820|REGISTRO DE IMOVE...|             3034|                      32|          0.00|    5|           NULL|
| 5347|ROSELY APARECIDA ...|             2135|                      50|          0.00|    5|           NULL|
| 6846|BADU E FILHOS TEC...|             2062|                      49|       4000.00|    1|           NULL|
| 8416|  ELETRICA RUBI LTDA|             2062|                      49|          0.00|    5|           NULL|
| 8992|SHIROMA VEIC

In [95]:
empresas = empresas.withColumn("capital_social", f.regexp_replace("capital_social", ",", "."))

In [96]:
empresas.show()

+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
| cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
|  306|FRANCAMAR REFRIGE...|             2240|                      49|          0.00|    1|           NULL|
| 1355|BRASILEIRO & OLIV...|             2062|                      49|          0.00|    5|           NULL|
| 4820|REGISTRO DE IMOVE...|             3034|                      32|          0.00|    5|           NULL|
| 5347|ROSELY APARECIDA ...|             2135|                      50|          0.00|    5|           NULL|
| 6846|BADU E FILHOS TEC...|             2062|                      49|       4000.00|    1|           NULL|
| 8416|  ELETRICA RUBI LTDA|             2062|                      49|          0.00|    5|           NULL|
| 8992|SHIROMA VEIC

In [97]:
# Convertendo a coluna "capital_social" para float (DoubleType):
empresas.withColumn("capital_social",
                    empresas.capital_social.cast(DoubleType())
                    ).show()

+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
| cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
|  306|FRANCAMAR REFRIGE...|             2240|                      49|           0.0|    1|           NULL|
| 1355|BRASILEIRO & OLIV...|             2062|                      49|           0.0|    5|           NULL|
| 4820|REGISTRO DE IMOVE...|             3034|                      32|           0.0|    5|           NULL|
| 5347|ROSELY APARECIDA ...|             2135|                      50|           0.0|    5|           NULL|
| 6846|BADU E FILHOS TEC...|             2062|                      49|        4000.0|    1|           NULL|
| 8416|  ELETRICA RUBI LTDA|             2062|                      49|           0.0|    5|           NULL|
| 8992|SHIROMA VEIC

In [98]:
empresas = empresas.withColumn("capital_social",
                    empresas.capital_social.cast(DoubleType())
                    )

In [99]:
empresas.show()

+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
| cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
|  306|FRANCAMAR REFRIGE...|             2240|                      49|           0.0|    1|           NULL|
| 1355|BRASILEIRO & OLIV...|             2062|                      49|           0.0|    5|           NULL|
| 4820|REGISTRO DE IMOVE...|             3034|                      32|           0.0|    5|           NULL|
| 5347|ROSELY APARECIDA ...|             2135|                      50|           0.0|    5|           NULL|
| 6846|BADU E FILHOS TEC...|             2062|                      49|        4000.0|    1|           NULL|
| 8416|  ELETRICA RUBI LTDA|             2062|                      49|           0.0|    5|           NULL|
| 8992|SHIROMA VEIC

In [100]:
empresas.printSchema()

root
 |-- cnpj: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo: string (nullable = true)



In [101]:
empresas.limit(30).toPandas()

Unnamed: 0,cnpj,razao_social,natureza_juridica,qualificacao_responsavel,capital_social,porte,ente_federativo
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,
5,8416,ELETRICA RUBI LTDA,2062,49,0.0,5,
6,8992,SHIROMA VEICULOS LTDA.,2062,49,0.0,5,
7,9091,CONTATOS BAR E LANCHONETE LTDA,2062,49,0.0,5,
8,9614,ANTONIA APARECIDA DE SOUZA ULIANA,2135,50,0.0,5,
9,9896,DORACY CORAT DA COSTA,2135,50,0.0,5,


In [102]:
# Entendendo coluna "ente_federativo":
empresas\
        .where("ente_federativo is not null")\
        .show(20, False)

+--------+----------------------------------------------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------------------+
|cnpj    |razao_social                                                                                  |natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo            |
+--------+----------------------------------------------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------------------+
|784274  |SECRETARIA MUNICIPAL DE SAUDE                                                                 |1031             |5                       |0.0           |5    |PARATINGA - BA             |
|2030715 |AGENCIA NACIONAL DE TELECOMUNICACOES                                                          |1104             |16                      |0.0           |5    |UNI�O                      |
|3104818 |

In [103]:
# Padronizando dados nulos da coluna "ente_federativo"
empresas.na.fill({"ente_federativo":"N/A"}).show()

+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
| cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
|  306|FRANCAMAR REFRIGE...|             2240|                      49|           0.0|    1|            N/A|
| 1355|BRASILEIRO & OLIV...|             2062|                      49|           0.0|    5|            N/A|
| 4820|REGISTRO DE IMOVE...|             3034|                      32|           0.0|    5|            N/A|
| 5347|ROSELY APARECIDA ...|             2135|                      50|           0.0|    5|            N/A|
| 6846|BADU E FILHOS TEC...|             2062|                      49|        4000.0|    1|            N/A|
| 8416|  ELETRICA RUBI LTDA|             2062|                      49|           0.0|    5|            N/A|
| 8992|SHIROMA VEIC

In [104]:
empresas = empresas.na.fill({"ente_federativo":"N/A"})

In [105]:
# Verifique quantas empresas tem capital_social igual ou menor a zero:

empresas\
        .where("capital_social < 0").count()

0

In [106]:
empresas\
        .where("capital_social == 0")\
        .count()

1691023

In [107]:
# Criando um novo Data Frame somente com capital social maior que zero:
capitalizadas = empresas\
        .where("capital_social > 0")


In [108]:
capitalizadas.show(30, False)

+------+-------------------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------+
|cnpj  |razao_social                                                       |natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+------+-------------------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------+
|6846  |BADU E FILHOS TECIDOS LTDA                                         |2062             |49                      |4000.0        |1    |N/A            |
|24205 |SUELY LEME MARI ANI 25086572800                                    |2135             |50                      |1000.0        |1    |N/A            |
|58970 |TOTAL CAR VEICULOS LTDA                                            |2062             |49                      |20000.0       |1    |N/A            |
|70273 |VANDA DA SILVA LANCHONETE                         

In [109]:
# Pesquisando strings:
# Mostre em tela os 20 primeiros registros de empresas que contenham a palavra
# restaurante
capitalizadas\
              .where("razao_social like '%RESTAURANTE%'")\
              .show(20, False)

+--------+------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------+
|cnpj    |razao_social                                          |natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+--------+------------------------------------------------------+-----------------+------------------------+--------------+-----+---------------+
|2591875 |BAR E RESTAURANTE CASA DA QUINTA LTDA                 |2062             |49                      |5000.0        |1    |N/A            |
|3059084 |DON MUGO RESTAURANTE LTDA                             |2062             |49                      |10000.0       |1    |N/A            |
|3396661 |GERACAO DE OURO - BAR E RESTAURANTE LTDA              |2062             |5                       |40000.0       |5    |N/A            |
|5930238 |PSTJ RESTAURANTE LTDA                                 |2062             |49                      |30000.0       |1

In [110]:
# Trabalhando com dados estatisticos basico:
# Crie uma visão estatistica geral contendo: contagem, média. mediana, soma e
# Desvio padrão da coluna capital_social.

capitalizadas.agg(
          f.count("*").alias("quantidade"),
          f.mean("capital_social").alias("media_investimento"),
          f.median("capital_social").alias("mediana_investimento"),
          f.sum("capital_social").alias("total_investido"),
          f.stddev("capital_social").alias("desvio_padrao")
          ).show()



+----------+------------------+--------------------+--------------------+-------------------+
|quantidade|media_investimento|mediana_investimento|     total_investido|      desvio_padrao|
+----------+------------------+--------------------+--------------------+-------------------+
|   2894656| 797946.8062905608|              5000.0|2.309781510509809...|2.658089851944688E8|
+----------+------------------+--------------------+--------------------+-------------------+



In [111]:
capitalizadas.agg(
          f.format_number(f.count("*"),2).alias("quantidade"),
          f.format_number(f.mean("capital_social"),2).alias("media_investimento"),
          f.format_number(f.median("capital_social"),2).alias("mediana_investimento"),
          f.format_number(f.sum("capital_social"),2).alias("total_investido"),
          f.format_number(f.stddev("capital_social"),2).alias("desvio_padrao")
          ).toPandas()

Unnamed: 0,quantidade,media_investimento,mediana_investimento,total_investido,desvio_padrao
0,2894656.0,797946.81,5000.0,2309781510509.81,265808985.19


In [112]:
# Mostre as mesmas informações acima mas agrupando por porte de empresa
empresas.show()

+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
| cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|
+-----+--------------------+-----------------+------------------------+--------------+-----+---------------+
|  306|FRANCAMAR REFRIGE...|             2240|                      49|           0.0|    1|            N/A|
| 1355|BRASILEIRO & OLIV...|             2062|                      49|           0.0|    5|            N/A|
| 4820|REGISTRO DE IMOVE...|             3034|                      32|           0.0|    5|            N/A|
| 5347|ROSELY APARECIDA ...|             2135|                      50|           0.0|    5|            N/A|
| 6846|BADU E FILHOS TEC...|             2062|                      49|        4000.0|    1|            N/A|
| 8416|  ELETRICA RUBI LTDA|             2062|                      49|           0.0|    5|            N/A|
| 8992|SHIROMA VEIC

In [113]:
capitalizadas.groupby("porte").agg(
          f.format_number(f.count("*"),2).alias("quantidade"),
          f.format_number(f.mean("capital_social"),2).alias("media_investimento"),
          f.format_number(f.median("capital_social"),2).alias("mediana_investimento"),
          f.format_number(f.sum("capital_social"),2).alias("total_investido"),
          f.format_number(f.stddev("capital_social"),2).alias("desvio_padrao")
          ).toPandas()

Unnamed: 0,porte,quantidade,media_investimento,mediana_investimento,total_investido,desvio_padrao
0,,1.0,50000.0,50000.0,50000.0,
1,1.0,2671059.0,398290.53,5000.0,1063857513944.56,222606539.8
2,3.0,102894.0,2910839.84,67800.0,299507954553.49,790882852.95
3,5.0,120702.0,7840930.49,50000.0,946415992011.76,254102444.49


In [114]:
# Altere a coluna "porte" para seu descritivo
# Consultar dicionário de dados:
capitalizadas.withColumn("porte",
                         f.when(capitalizadas.porte == 0, "NAO INFORMADO")\
                         .when(capitalizadas.porte == 1, "MICRO EMPRESA")\
                         .when(capitalizadas.porte == 3, "PEQUENO PORTE")\
                         .otherwise("OUTRAS")
                         ).show()

+------+--------------------+-----------------+------------------------+--------------+-------------+---------------+
|  cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|        porte|ente_federativo|
+------+--------------------+-----------------+------------------------+--------------+-------------+---------------+
|  6846|BADU E FILHOS TEC...|             2062|                      49|        4000.0|MICRO EMPRESA|            N/A|
| 24205|SUELY LEME MARI A...|             2135|                      50|        1000.0|MICRO EMPRESA|            N/A|
| 58970|TOTAL CAR VEICULO...|             2062|                      49|       20000.0|MICRO EMPRESA|            N/A|
| 70273|VANDA DA SILVA LA...|             2135|                      50|       15000.0|MICRO EMPRESA|            N/A|
| 74218|   V G SILVA TECIDOS|             2135|                      50|       16000.0|MICRO EMPRESA|            N/A|
|116791| AGNALDO ROCHA VIANA|             2135|         

In [115]:
capitalizadas = capitalizadas.withColumn("porte",
                         f.when(capitalizadas.porte == 0, "NAO INFORMADO")\
                         .when(capitalizadas.porte == 1, "MICRO EMPRESA")\
                         .when(capitalizadas.porte == 3, "PEQUENO PORTE")\
                         .otherwise("OUTRAS")
                         )

In [116]:
capitalizadas.groupby("porte").agg(
          f.format_number(f.count("*"),2).alias("quantidade"),
          f.format_number(f.mean("capital_social"),2).alias("media_investimento"),
          f.format_number(f.median("capital_social"),2).alias("mediana_investimento"),
          f.format_number(f.sum("capital_social"),2).alias("total_investido"),
          f.format_number(f.stddev("capital_social"),2).alias("desvio_padrao")
          ).orderBy("porte").toPandas()

Unnamed: 0,porte,quantidade,media_investimento,mediana_investimento,total_investido,desvio_padrao
0,MICRO EMPRESA,2671059.0,398290.53,5000.0,1063857513944.56,222606539.8
1,OUTRAS,120703.0,7840865.94,50000.0,946416042011.76,254101392.88
2,PEQUENO PORTE,102894.0,2910839.84,67800.0,299507954553.49,790882852.95


In [117]:
# Altere a coluna "qualificação_responsavel" para "responsavel" e a coluna:
# "ente_federativo" para "UF":
capitalizadas.withColumnsRenamed({
    "qualificacao_responsavel":"responsavel",
    "ente_federativo":"UF"
}).show()



+------+--------------------+-----------------+-----------+--------------+-------------+---+
|  cnpj|        razao_social|natureza_juridica|responsavel|capital_social|        porte| UF|
+------+--------------------+-----------------+-----------+--------------+-------------+---+
|  6846|BADU E FILHOS TEC...|             2062|         49|        4000.0|MICRO EMPRESA|N/A|
| 24205|SUELY LEME MARI A...|             2135|         50|        1000.0|MICRO EMPRESA|N/A|
| 58970|TOTAL CAR VEICULO...|             2062|         49|       20000.0|MICRO EMPRESA|N/A|
| 70273|VANDA DA SILVA LA...|             2135|         50|       15000.0|MICRO EMPRESA|N/A|
| 74218|   V G SILVA TECIDOS|             2135|         50|       16000.0|MICRO EMPRESA|N/A|
|116791| AGNALDO ROCHA VIANA|             2135|         50|       60000.0|MICRO EMPRESA|N/A|
|122983|      ALOISIO OCZUST|             2135|         50|       10000.0|MICRO EMPRESA|N/A|
|164903|SUPERMERCADO DALT...|             2062|         49|       1000

In [118]:
capitalizadas = capitalizadas.withColumnsRenamed({
    "qualificacao_responsavel":"responsavel",
    "ente_federativo":"UF"
})

In [119]:
capitalizadas.show()

+------+--------------------+-----------------+-----------+--------------+-------------+---+
|  cnpj|        razao_social|natureza_juridica|responsavel|capital_social|        porte| UF|
+------+--------------------+-----------------+-----------+--------------+-------------+---+
|  6846|BADU E FILHOS TEC...|             2062|         49|        4000.0|MICRO EMPRESA|N/A|
| 24205|SUELY LEME MARI A...|             2135|         50|        1000.0|MICRO EMPRESA|N/A|
| 58970|TOTAL CAR VEICULO...|             2062|         49|       20000.0|MICRO EMPRESA|N/A|
| 70273|VANDA DA SILVA LA...|             2135|         50|       15000.0|MICRO EMPRESA|N/A|
| 74218|   V G SILVA TECIDOS|             2135|         50|       16000.0|MICRO EMPRESA|N/A|
|116791| AGNALDO ROCHA VIANA|             2135|         50|       60000.0|MICRO EMPRESA|N/A|
|122983|      ALOISIO OCZUST|             2135|         50|       10000.0|MICRO EMPRESA|N/A|
|164903|SUPERMERCADO DALT...|             2062|         49|       1000

In [121]:
# 05/08/2024
# Importando dataset de estabelecimentos
url = "/content/drive/MyDrive/BasesSpark/estabelecimentos/part*"

estabs = spark.read.csv(
    url,
    header=True,
    inferSchema=True,
    sep=";"
)

In [124]:
estabs.show(15, False)

+-----------+----------+-------+---------------------------+-----------------------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+---------------------------------------+------------------+----------------------+------+-------------------+------------------+--------+---+---------+-----+----------+-----+----------+----------+--------+----------------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia                      |situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria                 |tipo_de_logradouro|logradouro            |numero|complemento        |bairro            |cep     |uf |municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|fax     |correio_eletronico          

In [125]:
estabs.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [129]:
# Convertendo coluna data_de_inicio_atividade para o formato date

estabs.withColumn("data_de_inicio_atividade",
                  f.to_date("data_de_inicio_atividade", "yyyyMMdd")).show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|     fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+------

In [130]:
# Aplicando atualização
estabs = estabs.withColumn("data_de_inicio_atividade",
                  f.to_date("data_de_inicio_atividade", "yyyyMMdd"))

In [131]:
# Renomeando colunas
estabs.withColumnsRenamed({
    "data_de_inicio_atividade": "data_inicio"
}).show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+-----------+---------------------+----------------------+------------------+--------------------+------+-------------------+------------------+--------+---+---------+-----+----------+-----+----------+----------+--------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_inicio|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|        complemento|            bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|     fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+----

In [132]:
estabs = estabs.withColumnsRenamed({
    "data_de_inicio_atividade": "data_inicio"
})

In [133]:
# Criando views dos data frames:
empresas.createOrReplaceTempView("v_empresas")
estabs.createOrReplaceTempView("v_estabs")

In [141]:
# Usando comandos SQL nativos
# Mostre todas as empresas que tiveram seu inicio de atividade antes de 1940
spark.sql(
"""
SELECT A.CNPJ_BASICO AS CNPJ, A.NOME_FANTASIA AS NOME,
       A.DATA_INICIO AS FUNDACAO, A.UF AS ESTADO
FROM V_ESTABS A
WHERE
  A.DATA_INICIO < '1940-01-01' AND
  A.NOME_FANTASIA IS NOT NULL
ORDER BY NOME
"""
).show(30, False)

22

In [142]:
# Criando um data frame final com a junção das duas views:
spark.sql(
"""
SELECT *
FROM V_EMPRESAS A
INNER JOIN V_ESTABS B ON B.CNPJ_BASICO = A.CNPJ
"""
).show(10, False)

+----+--------------------+-----------------+------------------------+--------------+-----+---------------+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+-----------+---------------------+----------------------+------------------+--------------------+------+--------------------+------------------+--------+---+---------+-----+----------+-----+----------+----------+-------+--------------------+-----------------+-------------------------+
|cnpj|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte|ente_federativo|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_inicio|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|         complemento|            b

In [143]:
# Criando um data frame final com a junção das duas views:
emp_estabs = spark.sql(
"""
SELECT *
FROM V_EMPRESAS A
INNER JOIN V_ESTABS B ON B.CNPJ_BASICO = A.CNPJ
"""
)

In [144]:
emp_estabs.count()

4836221

In [145]:
emp_estabs.createOrReplaceTempView("v_emp_estabs")

In [152]:
# Mostre a média de investimento por estado:
spark.sql(
"""
SELECT A.UF AS ESTADO, AVG(A.CAPITAL_SOCIAL) AS MEDIA_INVESTIMENTO
FROM V_EMP_ESTABS A
WHERE
  A.CAPITAL_SOCIAL > 0
GROUP BY A.UF
ORDER BY MEDIA_INVESTIMENTO DESC
"""
).toPandas()

Unnamed: 0,ESTADO,MEDIA_INVESTIMENTO
0,EX,588596800.0
1,PB,31601040.0
2,RS,23087900.0
3,PE,14466430.0
4,AL,12860690.0
5,RO,12498840.0
6,ES,11188650.0
7,MA,8671443.0
8,RR,8632789.0
9,SP,8475511.0


In [153]:
# Salvando dataframes / views em um arquivo.orc (arquivo colunar)
url = "/content/drive/MyDrive/BasesSpark/ORC"
emp_estabs.write.orc(
    url,
    mode = "overwrite"
)

In [154]:
# Importando arquivo orc:
url = "/content/drive/MyDrive/BasesSpark/ORC/part*"
orc = spark.read.orc(url)

In [155]:
# Criando a view
orc.createOrReplaceTempView("v_orc")


In [156]:
# Mostre a média de investimento por estado.
# SP | 1000000
# RJ | 5000000
spark.sql(
    """
SELECT A.UF AS ESTADO, AVG(A.CAPITAL_SOCIAL) AS MEDIA_INVESTIMENTO
FROM V_ORC A
WHERE
  A.CAPITAL_SOCIAL > 0
GROUP BY A.UF
ORDER BY MEDIA_INVESTIMENTO DESC
"""
).toPandas()


Unnamed: 0,ESTADO,MEDIA_INVESTIMENTO
0,EX,588596800.0
1,PB,31601040.0
2,RS,23087900.0
3,PE,14466430.0
4,AL,12860690.0
5,RO,12498840.0
6,ES,11188650.0
7,MA,8671443.0
8,RR,8632789.0
9,SP,8475511.0
