In [0]:
#importing features
#importando funções (PT_BR)

from pyspark.sql.functions import broadcast, sum, desc
from delta.tables import DeltaTable
from pyspark.sql.functions import col

In [0]:
# Define storage paths in the Data Lake
# Define os caminhos de armazenamento no Data Lake(PT-BR)

gold_path_tempo = "/mnt/lhdw/layer_gold/dim_tempo"
gold_path_localizacao = "/mnt/lhdw/layer_gold/dim_Localizacao"
gold_path_orgaos = "/mnt/lhdw/layer_gold/dim_orgaos"
gold_path_caracteristicas = "/mnt/lhdw/layer_gold/dim_caracteristicas"
gold_path_geo = "/mnt/lhdw/layer_gold/dim_geo" 
gold_path_fato = "/mnt/lhdw/layer_gold/fato_acidente"

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Leitura Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

### Number of Cores
### Número de Núcleos (PT-BR)

In [0]:
QTD_CORES = sc._jsc.sc().getExecutorMemoryStatus().keySet().size()
print(f"Número de núcleos no cluster: {QTD_CORES}")


Número de núcleos no cluster: 1


####Evidence Fact accident
####Evidência Fato acidente (PT-BR)

In [0]:
delta_path = "/mnt/lhdw/layer_gold/fato_acidentes"
delta_table = DeltaTable.forPath(spark, delta_path)

delta_table.toDF().show()


+-------------+--------+--------------+---------+------------------+------+------------+-----------+-------------+----------+------------------+-----------------+----------+-----------+
|ID_OCORRENCIA|SK_TEMPO|SK_LOCALIZACAO|SK_ORGAOS|SK_CARACTERISTICAS|SK_GEO|QTD_VEICULOS|QTD_FERIDOS|QTD_IGNORADOS|QTD_ILESOS|QTD_FERIDOS_GRAVES|QTD_FERIDOS_LEVES|QTD_MORTOS|QTD_PESSOAS|
+-------------+--------+--------------+---------+------------------+------+------------+-----------+-------------+----------+------------------+-----------------+----------+-----------+
|       653093|     573|          9989|     9218|              1523|  9263|           2|          0|            0|         2|                 0|                0|         0|          2|
|       655728|    2973|         11070|    11122|              5982|  7157|           2|          1|            0|         1|                 1|                0|         0|          2|
|       655802|    3013|          8261|     8332|             10640|  

###Evidence of Dim time
###Evidência de Dim tempo (PT-BR)


In [0]:
path_dim_tempo = "/mnt/lhdw/layer_gold/dim_tempo"
delta_dim_tempo = DeltaTable.forPath(spark, path_dim_tempo)
delta_dim_tempo.toDF().show()

+------+---------------+---+------------+-------------------+----+---+--------+
|    ID|DATA_OCORRENCIA|DIA|  DIA_SEMANA|            HORARIO| ANO|MES|SK_TEMPO|
+------+---------------+---+------------+-------------------+----+---+--------+
|652468|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 00:30:00|2025|  1|       1|
|652469|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 00:50:00|2025|  1|       2|
|652470|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 01:15:00|2025|  1|       3|
|652476|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 01:30:00|2025|  1|       4|
|652473|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 01:40:00|2025|  1|       5|
|652481|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 01:50:00|2025|  1|       6|
|652471|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 02:00:00|2025|  1|       7|
|652501|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 02:09:00|2025|  1|       8|
|655842|     2025-01-01| 01|QUARTA-FEIRA|2025-05-21 02:10:00|2025|  1|       9|
|652527|     2025-01-01| 01|QUARTA-FEIRA

### Evidence Dimension_location
### Evidência Dimensão_localizacao (PT-BR)  

In [0]:
delta_path_localizacao  = "dbfs:/mnt/lhdw/layer_gold/dim_Localizacao"
dim_localizacao = DeltaTable.forPath(spark, delta_path_localizacao)
dim_localizacao.toDF().show()



+------+---+-----------------+------+---+--------------+
|    ID| UF|        MUNICIPIO|NUM_BR| KM|SK_LOCALIZACAO|
+------+---+-----------------+------+---+--------------+
|654665| AC|       ACRELANDIA|   364| 16|             1|
|666662| AC|       ACRELANDIA|   364| 20|             2|
|662001| AC|       ACRELANDIA|   364|  5|             3|
|660148| AC|        BRASILEIA|   317|296|             4|
|655862| AC|        BRASILEIA|   317|343|             5|
|655996| AC|         CAPIXABA|   317|114|             6|
|670690| AC|         CAPIXABA|   317|145|             7|
|662377| AC|   EPITACIOLANDIA|   317|278|             8|
|664336| AC|   EPITACIOLANDIA|   317|288|             9|
|661490| AC|            FEIJO|   364|438|            10|
|665092| AC|            FEIJO|   364|510|            11|
|660160| AC|PLACIDO DE CASTRO|   364| 67|            12|
|667122| AC|       PORTO ACRE|   317| 37|            13|
|665315| AC|       RIO BRANCO|     0|  0|            14|
|663142| AC|       RIO BRANCO| 

### Evidence dim_orgãos
### Evidência dim_orgãos (PT-BR)

In [0]:
path_dim_orgaos = "/mnt/lhdw/layer_gold/dim_orgaos"
delta_dim_orgaos= DeltaTable.forPath(spark, path_dim_orgaos)
delta_dim_orgaos.toDF().show()


+------+--------+-----------+---------+------------+-----+------+---------+
|    ID|REGIONAL|UF_REGIONAL|DELEGACIA|UF_DELEGACIA|  UOP|UF_UOP|SK_ORGAOS|
+------+--------+-----------+---------+------------+-----+------+---------+
|657734|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        1|
|665092|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        2|
|657981|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        3|
|668453|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        4|
|666378|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        5|
|670786|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        6|
|665008|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        7|
|670690|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        8|
|659505|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|        9|
|664777|    SPRF|         AC|    DEL01|          AC|UOP01|    AC|       10|
|665315|    

####Evidence of Dim classification
####Evidência de Dim caracteristicas (PT-BR) 

In [0]:
path_dim_caracteristicas = "/mnt/lhdw/layer_gold/dim_caracteristicas"
delta_caracteristicas = DeltaTable.forPath(spark, path_dim_caracteristicas)
delta_caracteristicas.toDF().show()


+------+--------------------+--------------------+----------------------+-----------+-----------+----------------------+----------+-------------+---------+------------------+
|    ID|      CAUSA_ACIDENTE|       TIPO_ACIDENTE|CLASSIFICACAO_ACIDENTE|   FASE_DIA|SENTIDO_VIA|CONDICAO_METEREOLOGICA|TIPO_PISTA|  TRACADO_VIA|USOU_SOLO|SK_CARACTERISTICAS|
+------+--------------------+--------------------+----------------------+-----------+-----------+----------------------+----------+-------------+---------+------------------+
|670569|ACESSAR A VIA SEM...|ATROPELAMENTO DE ...|   COM VÍTIMAS FERIDAS|  ANOITECER|  CRESCENTE|             CÉU CLARO|   SIMPLES|         RETA|      NÃO|                 1|
|654635|ACESSAR A VIA SEM...|ATROPELAMENTO DE ...|   COM VÍTIMAS FERIDAS|  PLENO DIA|DECRESCENTE|             CÉU CLARO|     DUPLA|CURVA-DECLIVE|      NÃO|                 2|
|659384|ACESSAR A VIA SEM...|ATROPELAMENTO DE ...|   COM VÍTIMAS FERIDAS|  PLENO DIA|DECRESCENTE|               NUBLADO|   SI

####Evidence of Dim GEO
####Evidência de Dim GEO (PT-BR) 

In [0]:
path_dim_geo = "/mnt/lhdw/layer_gold/dim_geo"
delta_geo = DeltaTable.forPath(spark, path_dim_geo)
delta_geo.toDF().show()

+------+-----------+------------+------+
|    ID|   LATITUDE|   LONGITUDE|SK_GEO|
+------+-----------+------------+------+
|666701|-0,90125038|-60,51950645|     1|
|656445|  -1,085483|  -46,902207|     2|
|661700|-1,20786421|-47,09654994|     3|
|666523|  -1,210947|   -47,11915|     4|
|664672|  -1,211037|   -47,18363|     5|
|667555|  -1,211381|  -47,144838|     6|
|653562|  -1,212873|  -47,181547|     7|
|663444|  -1,231285|  -47,216423|     8|
|666700|  -1,262413|  -47,268885|     9|
|655110|-1,27592647|-48,08710906|    10|
|668920|  -1,275973|  -48,096307|    11|
|664471|  -1,279143|   -47,12152|    12|
|655220| -1,2795637|-48,12243407|    13|
|659141|  -1,279735|  -47,310113|    14|
|654110|-1,28522594|-48,14868959|    15|
|665819|-1,28924465|-48,15428317|    16|
|661286| -1,2948794|-47,89280703|    17|
|670185|  -1,295019|  -47,899224|    18|
|656058|-1,29629557|-47,91983997|    19|
|667587|  -1,296548|  -47,928528|    20|
+------+-----------+------------+------+
only showing top

%md
**Tips for Optimizing Performance**
> **Partitioning**: We define appropriate partitions to avoid unnecessary reads and improve query performance.

> **Compression codec**: We use Snappy, as it offers good compression and decompression performance.

> **Shuffle partitions**: We define a fixed value for spark.sql.shuffle.partitions to improve parallelism during operations such as joins and aggregations.
> In addition, we can explore techniques such as caching for small tables (dimensions) that are frequently accessed, and broadcast joins to optimize joins between the Fact table and dimension tables.


**Dicas para Otimizar a Performance**
> **Particionamento**: Definimos partições adequadas para evitar leituras desnecessárias e melhorar a performance de consultas.

> **Codec de compressão**: Usamos Snappy, pois oferece boa performance de compressão e descompressão.

> **Shuffle partitions**: Definimos um valor fixo para spark.sql.shuffle.partitions para melhorar o paralelismo durante operações como joins e agregações.
> Além disso, podemos explorar técnicas como cache para tabelas pequenas (dimensões) que são frequentemente acessadas, e broadcast join para otimizar joins entre a tabela Fato e as tabelas de dimensões (PT-BR)

### Read Optimization with predicate pushdown:
- Make sure that queries are taking advantage of predicate pushdown, which means that filters are applied directly when reading the data, improving query efficiency and performance

### Otimização de Leitura com predicate pushdown:
- Tenha a certeza  que as consultas estão aproveitando o predicate pushdown, o que significa que os filtros são aplicados diretamente ao ler os dados, melhorando a eficiência e performace na consulta(PT-BR)


In [0]:
# Using predicate pushdown to optimize the query
# Utilizando predicate pushdown para otimizar a consulta (PT-BR)

gold_path = "dbfs:/mnt/lhdw/layer_gold/dim_tempo"
df_filtrado = spark.read.format("delta").load(gold_path).filter((col("Ano") == 2025) & (col("Mes") == 1))

display(df_filtrado)


ID,DATA_OCORRENCIA,DIA,DIA_SEMANA,HORARIO,ANO,MES,SK_TEMPO
652468,2025-01-01,1,QUARTA-FEIRA,2025-05-21T00:30:00.000+0000,2025,1,1
652469,2025-01-01,1,QUARTA-FEIRA,2025-05-21T00:50:00.000+0000,2025,1,2
652470,2025-01-01,1,QUARTA-FEIRA,2025-05-21T01:15:00.000+0000,2025,1,3
652476,2025-01-01,1,QUARTA-FEIRA,2025-05-21T01:30:00.000+0000,2025,1,4
652473,2025-01-01,1,QUARTA-FEIRA,2025-05-21T01:40:00.000+0000,2025,1,5
652481,2025-01-01,1,QUARTA-FEIRA,2025-05-21T01:50:00.000+0000,2025,1,6
652471,2025-01-01,1,QUARTA-FEIRA,2025-05-21T02:00:00.000+0000,2025,1,7
652501,2025-01-01,1,QUARTA-FEIRA,2025-05-21T02:09:00.000+0000,2025,1,8
655842,2025-01-01,1,QUARTA-FEIRA,2025-05-21T02:10:00.000+0000,2025,1,9
652527,2025-01-01,1,QUARTA-FEIRA,2025-05-21T02:15:00.000+0000,2025,1,10


#### Broadcast join
**Explicação:**
**1. Broadcast Join:**

- O broadcast() é aplicado às tabelas de <b>dimensões</b> (fato_acidente e dim_tempo). Isso replica as tabelas de dimensão para todos os nós, permitindo que as junções sejam realizadas localmente em cada nó, sem necessidade de comunicação entre nós, o que melhora a performance em clusters distribuídos.

**2. Junção com Broadcast:**

- As junções são feitas entre as colunas de SK e as tabelas de dimensão e Fato para obter as chaves substitutas (SK_tempo, SK_tempo).



**3. Partitioning:**

- We added Year and Month columns to optimize the storage of the fact table and improve performance in temporal queries. The table is partitioned by these columns.

**Advantages of Broadcast Join:**

- Reduces data movement during the join operation, since small dimensions are replicated to all nodes.

- Improves performance when dimension tables are significantly smaller than the fact table, which is common in data warehouse architectures.

**Disadvantages of Broadcast Join:**
- Memory Limitation: The smaller DataFrame must fit in the memory of all nodes. If the DataFrame is too large, it may cause out-of-memory errors


**3. Particionamento:**

- Adicionamos colunas de Ano e Mês para otimizar o armazenamento da tabela de fatos e melhorar o desempenho em consultas temporais. A tabela é particionada por essas colunas.

**Vantagens do Broadcast Join:**

- Reduz a movimentação de dados durante a operação de junção, pois as dimensões pequenas são replicadas para todos os nós.
- Aumenta a performance quando as tabelas de dimensão são significativamente menores que a tabela de fatos, o que é o caso comum em arquiteturas de data warehouse.

**Desvantagens do Broadcast Join:**
- Limitação de Memória: O DataFrame menor deve caber na memória de todos os nós. Se o DataFrame for muito grande, pode causar erros de falta de memória (PT-BR)

In [0]:
df_fato = spark.read.format("delta").load("/mnt/lhdw/layer_gold/fato_acidentes")
df_tempo = spark.read.format("delta").load("/mnt/lhdw/layer_gold/dim_tempo")
df_caracteristicas = spark.read.format("delta").load("/mnt/lhdw/layer_gold/dim_caracteristicas")

# Apply broadcast
# Aplicar broadcast (PT-BR)
df_tempo = broadcast(df_tempo)
df_caracteristicas = broadcast(df_caracteristicas)


df_fato_tempo = df_fato.join(df_tempo, "SK_TEMPO", "inner")

join_df = df_fato_tempo.join(df_caracteristicas, "SK_CARACTERISTICAS", "inner")

result_df = join_df.groupBy("classificacao_acidente", "Ano").agg(
    sum("QTD_PESSOAS").alias("TOTAL_PESSOAS"),
    sum("QTD_MORTOS").alias("TOTAL_MORTOS"),
    sum("QTD_FERIDOS_LEVES").alias("TOTAL_FERIDOS_LEVES"),
    sum("QTD_FERIDOS_GRAVES").alias("TOTAL_FERIDOS_GRAVES"),
    sum("QTD_ILESOS").alias("TOTAL_ILESOS"),
    sum("QTD_IGNORADOS").alias("TOTAL_IGNORADOS"),
    sum("QTD_FERIDOS").alias("TOTAL_FERIDOS"),
    sum("QTD_VEICULOS").alias("TOTAL_VEICULOS")
).orderBy("Ano", desc("TOTAL_MORTOS"))


**Performance Improvements**

Partition Filters: If you know which specific partitions you want to read from, applying filters to the partitions can significantly reduce read time.
Repartitioning: If your data is unevenly distributed, you can use repartition() to redistribute the DataFrame based on a key column.

**Melhorias de Performance**

Filtros de Partição: Se você souber quais partições específicas deseja ler, aplicar filtros nas partições pode reduzir significativamente o tempo de leitura.
Reparticionamento: Se os dados estiverem distribuídos de forma desigual, você pode usar repartition() para redistribuir o DataFrame com base em uma coluna-chave. (PT-BR)