# üìä An√°lise de Resultados ‚Äî PySpark + Delta Lake

Este notebook tem como objetivo realizar an√°lises explorat√≥rias e agrega√ß√µes a partir dos dados estruturados na camada **Trusted** do Data Warehouse, utilizando **PySpark** em conjunto com o **Delta Lake**. Ser√£o exploradas informa√ß√µes provenientes de tabelas fato e dimens√µes, permitindo insights sobre o perfil das empresas, distribui√ß√£o geogr√°fica, porte, natureza jur√≠dica, entre outros aspectos relevantes.

## Configura√ß√µes e importes

In [158]:
# Para iniciar a se√ß√£o spark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DecimalType
from pyspark.sql import functions as f
from datetime import datetime
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable

In [159]:
builder = SparkSession.builder \
    .appName("App analise de dados") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g")

spark: SparkSession = configure_spark_with_delta_pip(builder).getOrCreate()

spark

## MERGE

Tabelas para merge: 

- cnpj_base & cnpj_basico & empresa_cnpj_basico
- codigo_municipio & id_municipio
- cod_natureza_juridica & codigo_natureza_juridica

Iniciar deltaTable

In [None]:
try:    
    deltaTable: DeltaTable = DeltaTable.forPath(spark, "../TRS/CONSOLIDADO/01_TABELA_FATO")
    df: DataFrame = deltaTable.toDF()
    df.show(5, truncate=False)
    df.printSchema()
except Exception as e:
    print(f"Erro na leitura: {e}")

                                                                                

+--------------+---------+----------+-------+-------------+------------------------------------------+------------------+-----------------------+-------------------------+--------------------+-----------+---------------------+-----------+---------------------------------------------------------------------------------------------------------------+---------------+-----------------------------+------+---------------+--------+--------+---+----------------+----+---------+----+---------+-------+--------+-------------------------+-----------------+----------------------+-----------+---------------------------------------------------------------------------------------+------------+----------------+-------------------+------------+---------------------+-------------------------------+--------------+--------------------+---------------------------+------------------------+-----------------+-----------+-------------+------------------+---------------------+---------+--------------+------------

In [123]:
# Realizando um UPINSERT
try:
    deltaTable.alias("target") \
        .merge(
            df.alias("source"),
            condition="target.cnpj_completo = source.cnpj_completo"
    ) \
        .whenMatchedUpdate(
            set={
                "cnpj_completo": "source.cnpj_completo",
                "cod_natureza_juridica": "source.codigo_natureza_juridica",
            }
    ).whenNotMatchedInsert(
            values={
                "cnpj_completo": "source.cnpj_completo",
                "cod_natureza_juridica": "source.codigo_natureza_juridica",
            }
    ).execute()

    print("Merge efetuado!")
except Exception as e:
    print(f"Erro ao efetuar merge: {e}")

                                                                                

Merge efetuado!


In [None]:
def atualizar_delta_table():
    try:
        df.write.format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .partitionBy("uf") \
            .save("../CONSOLIDADO/01_TABELA_FATO")
        print("Dados gravados!")
    except Exception as e:
        print(f"Erro ao gravar novos dados: {e}")

# atualizar_delta_table()

## üìä An√°lise de Resultados

### Iniciar DeltaSpark

In [152]:
try:    
    deltaTable: DeltaTable = DeltaTable.forPath(spark, "../TRS/CONSOLIDADO/01_TABELA_FATO")
    df: DataFrame = deltaTable.toDF()
    df.show(5, truncate=False)
    df.printSchema()
except Exception as e:
    print(f"Erro na leitura: {e}")

+--------------+---------+----------+-------+-------------+---------------+------------------+-----------------------+-------------------------+--------------------+-----------+---------------------+-----------+-----------------------------------------------+---------------+-------------------------------+------+--------------------------+-----------------------+--------+---+----------------+----+---------+----+---------+-------+----+----------------------------+-----------------+----------------------+-----------+--------------------------------------------------------------------------------------------+------------+-----------+-------------------+------------+---------------------+-------------------------------+--------------+--------------------+---------------------------+------------------------+-----------------+-----------+-------------+------------------+---------------------+---------+--------------+-----------------+
|cnpj_completo |cnpj_base|cnpj_ordem|cnpj_dv|matriz_filia

### Vis√£o Geral

In [157]:
df.select(
    df["cnpj_completo"].alias("CNPJ"),
    df["nome_fantasia"].alias("NOME FANTASIA"),
    df["situacao_cadastral"].alias("SITUA√á√ÉO CADASTRAL"),
    f.date_format(df["data_situacao_cadastral"], "dd/MM/yyyy").alias("DATA SITUA√á√ÉO CADASTRAL"),
    df["motivo_situacao_cadastral"].alias("SITUA√á√ÉO CADASTRAL MOTIVO"),
    df["nome_cidade_exterior"].alias("CIDADE EXTERIOR"),
    df["codigo_pais"].alias("C√ìDIGO DO PA√çS"),
    f.date_format(df["data_inicio_atividade"], "dd/MM/yyyy").alias("DATA IN√çCIO ATIVIDADE"),
    df["cnae_fiscal"].alias("CNAE PRINCIPAL"),
    df["cnae_fiscal_secundaria"].alias("CNAES SECUND√ÅRIOS"),
    f.concat(
        df["tipo_logradouro"],
        f.lit(" "),
        df["logradouro"],
        f.lit(", "),
        df["numero"]
    ).alias("ENDERE√áO"),
    df["complemento"].alias("COMPLEMENTO"),
    df["bairro"].alias("BAIRRO"),
    df["cep"].alias("CEP"),
    df["municipio"].alias("MUNIC√çPIO"),
    df["uf"].alias("UF"),
    df["telefone1"].alias("TELEFONE"),
    df["email"].alias("EMAIL"),
    df["descricao_cnae"].alias("DESCRI√á√ÉO CNAE"),
    df["razao_social"].alias("RAZ√ÉO SOCIAL"),
    df["natureza_juridica"].alias("NATUREZA JUR√çDICA"),
    df["codigo_porte_empresa"].alias("C√ìDIGO PORTE"),
    df["opcao_simples"].alias("OP√á√ÉO PELO SIMPLES"),
    f.date_format(df["data_opcao_simples"], "dd/MM/yyyy").alias("DATA OP√á√ÉO SIMPLES"),
    f.date_format(df["data_exclusao_simples"], "dd/MM/yyyy").alias("DATA EXCLUS√ÉO SIMPLES"),
    df["opcao_mei"].alias("OP√á√ÉO MEI"),
    f.date_format(df["data_opcao_mei"], "dd/MM/yyyy").alias("DATA OP√á√ÉO MEI"),
    f.date_format(df["data_exclusao_mei"], "dd/MM/yyyy").alias("DATA EXCLUS√ÉO MEI")
).filter(df["nome_fantasia"].like("%BANCO DO BRASIL%")).show(10, truncate=False)



+--------------+----------------------------------------------+------------------+-----------------------+-------------------------+---------------+--------------+---------------------+--------------+-----------------+----------------------------------+-------------------+--------------+--------+--------------+---+--------+------------------------------+------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------------------+------------+------------------+------------------+---------------------+---------+--------------+-----------------+
|CNPJ          |NOME FANTASIA                                 |SITUA√á√ÉO CADASTRAL|DATA SITUA√á√ÉO CADASTRAL|SITUA√á√ÉO CADASTRAL MOTIVO|CIDADE EXTERIOR|C√ìDIGO DO PA√çS|DATA IN√çCIO ATIVIDADE|CNAE PRINCIPAL|CNAES SECUND√ÅRIOS|ENDERE√áO                          |COMPLEMENTO        |BAIRRO        |CEP     |MUNIC√çPIO     |UF |TELEFONE|EMAI

                                                                                

### üìä Estat√≠sticas Gerais

#### Listar UFs distintas

In [111]:
df.select("uf").distinct().show()

+---+
| uf|
+---+
| SP|
| MG|
| RJ|
| PR|
| RS|
| SC|
| BA|
| GO|
| PE|
| ES|
| PA|
| CE|
| MT|
| MS|
| MA|
| DF|
| PI|
| AM|
| PB|
| AL|
+---+
only showing top 20 rows


#### Contagem de empresas por UF

In [None]:
df_contagem_uf_empresas = \
    df.groupBy("uf") \
    .agg(
        f.count("*").alias("Qtd Empresas")
    ).orderBy("Qtd Empresas", ascending=False)

df_contagem_uf_empresas.show()

+---+------------+
| uf|Qtd Empresas|
+---+------------+
| SP|     1379898|
| MG|      513667|
| RJ|      382888|
| RS|      344456|
| PR|      326165|
| BA|      237630|
| SC|      231308|
| GO|      169298|
| PE|      141706|
| CE|      134480|
| ES|       96212|
| PA|       93181|
| MT|       91885|
| DF|       79580|
| MA|       72871|
| MS|       63846|
| PB|       59851|
| RN|       53391|
| AM|       51203|
| AL|       45296|
+---+------------+
only showing top 20 rows


#### Decri√ß√£o estat√≠stica

In [116]:
# Para mais de uma coluna -> df.describe(["col1", "col2"]).show()
df.describe("capital_social").show()

+-------+-------------------+
|summary|     capital_social|
+-------+-------------------+
|  count|             486655|
|   mean|    11290841.310325|
| stddev|7.893831372139556E8|
|    min|               0.00|
|    max|    500000352182.73|
+-------+-------------------+



### üîç 3. Consultas Relevantes para Insights

#### Top 10 munic√≠pios com mais empresas

In [122]:
df_municipio = df
df_municipio_por_uf = df_municipio.groupBy("municipio", "uf").agg(
    f.count("*").alias("Qtd Municipios"),
).orderBy("Qtd Municipios", ascending=False)
df_municipio_por_uf.show(10)

+--------------+---+--------------+
|     municipio| uf|Qtd Municipios|
+--------------+---+--------------+
|     SAO PAULO| SP|        430363|
|RIO DE JANEIRO| RJ|        168441|
|BELO HORIZONTE| MG|         91174|
|      BRASILIA| DF|         79580|
|      CURITIBA| PR|         75285|
|      SALVADOR| BA|         66713|
|  PORTO ALEGRE| RS|         62352|
|     FORTALEZA| CE|         62262|
|       GOIANIA| GO|         51450|
|        RECIFE| PE|         42108|
+--------------+---+--------------+
only showing top 10 rows


                                                                                

#### Distribui√ß√£o por Natureza Jur√≠dica

In [None]:
df_natureza_juridica = df


## Delta Table - Controle de vers√µes

In [None]:
deltaTable.history().show(vertical=True, truncate=False)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 5                                                                                                                                                                                                                                                      

## Finalizar spark

In [160]:
spark.stop()