## Practical example with Delta Lake (on-premise)

* The data used here comes from the current collection run
* We will store the data in a Medallion architecture (Bronze, Silver and Gold layers)
* The goal is to replicate the article's RJ vs CWB speed analysis; in this notebook the comparison uses Rio de Janeiro (RJ) and Brasília (BSB) data


### Step by step

First, we create a Spark session with Delta Lake enabled and define the paths for our Medallion architecture.

Then we read the input files with Spark. Here we get one DataFrame for Rio de Janeiro (RJ) and one for Brasília (BSB).

In [5]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Spark session with the Delta Lake SQL extension and catalog enabled
builder = (
    SparkSession.builder.appName("velocidade_teste")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# Adds the Delta Lake JARs matching the pip-installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Input folder and one folder per Medallion layer
INPUT_PATH = "/home/felipe/code/deltalake/dados/"
BRONZE_PATH = "/home/felipe/code/deltalake/lake/bronze/raw"
SILVER_PATH = "/home/felipe/code/deltalake/lake/silver/processed"
GOLD_PATH = "/home/felipe/code/deltalake/lake/gold/final"

print(spark.version)

3.5.0


In [14]:
# Parquet files carry their own schema, so CSV-only options such as header/inferSchema are not needed
rj_df = spark.read.format("parquet").load("/home/felipe/code/deltalake/teste/rj/1707850357.3031645.parquet")

rj_df.show()

+---------+---------+-------------+---------+----------+-----+
| latitude|longitude|tempo_captura|id_onibus|velocidade|linha|
+---------+---------+-------------+---------+----------+-----+
|-22,94427|-43,18129|1707252599000|   A71588|        53|  100|
|-23,01032|-43,30469|1707251542000|   A41063|        43|  309|
| -22,9595|-43,19084|1707252590000|   A41218|         0|  108|
|-22,96402|-43,20373|1707252596000|   A41018|         0|  463|
|-22,95296|-43,17615|1707252587000|   A41019|        10|  100|
|-23,00897|-43,30439|1707251572000|   A41063|         0|  309|
|-22,91231|-43,20969|1707252590000|   A41073|         0|  112|
|-22,93514|-43,20948|1707252593000|   A41245|        42|  112|
|-22,94494|-43,18229|1707252597000|   C41389|        23|  309|
|-22,88665|-43,22967|1707252591000|   C41386|         6|  315|
|-23,00897|-43,30439|1707251602000|   A41063|         0|  309|
|-22,90711|-43,21092|1707252594000|   A41085|        29|  463|
|-23,00895|-43,30438|1707251632000|   A41063|         0

In [10]:
# INPUT_PATH already ends with "/", so no extra separator is needed
bsb_df = spark.read.format("parquet").load(f"{INPUT_PATH}df_micro/")

bsb_df.show()

+---------+---------+-------------+---------+----------+-------+----------+-----+
| latitude|longitude|tempo_captura|id_onibus|velocidade|sentido|   direcao|linha|
+---------+---------+-------------+---------+----------+-------+----------+-----+
|-48.10842|-15.92126|1701355514000|   340260|      0.28|  VOLTA|80.9000015|     |
|-48.12325|-15.89951|1706128074000|   339784|       0.0|  VOLTA|       0.0|     |
|-48.12391|-15.90059|1706315254000|   340073|       0.0|  VOLTA|       0.0|372.9|
| -47.9946|-15.87532|1706551160000|   337846|       0.0|    IDA|333.434948|     |
|-48.02361|-15.87496|1706636961000|   339105|       0.0|    IDA|      90.0|     |
|-47.93532|-15.78702|1706644931000|   335771|      8.61|  VOLTA|35.2276684|0.844|
|-48.10898|-15.92114|1706654234000|   337838|       0.0|    IDA|150.945395|     |
|-48.12296|-15.89952|1706656423000|   336343|       0.0|    IDA|321.340191|     |
|-47.90662|-15.73606|1706702948000|   337480|       0.0|  VOLTA|196.699244|     |
|-48.10886|-15.9

Now we save these DataFrames as Delta tables, each in its own Bronze table:

In [15]:

bsb_df.write.format("delta").saveAsTable("bronze_df")


In [16]:
rj_df.write.format("delta").saveAsTable("bronze_rj")

We can already read them back as Delta tables using the names we saved them under:

In [17]:
rj_delta = spark.read.format("delta").table("bronze_rj")
rj_delta.show()

+---------+---------+-------------+---------+----------+-----+
| latitude|longitude|tempo_captura|id_onibus|velocidade|linha|
+---------+---------+-------------+---------+----------+-----+
|-22,94427|-43,18129|1707252599000|   A71588|        53|  100|
|-23,01032|-43,30469|1707251542000|   A41063|        43|  309|
| -22,9595|-43,19084|1707252590000|   A41218|         0|  108|
|-22,96402|-43,20373|1707252596000|   A41018|         0|  463|
|-22,95296|-43,17615|1707252587000|   A41019|        10|  100|
|-23,00897|-43,30439|1707251572000|   A41063|         0|  309|
|-22,91231|-43,20969|1707252590000|   A41073|         0|  112|
|-22,93514|-43,20948|1707252593000|   A41245|        42|  112|
|-22,94494|-43,18229|1707252597000|   C41389|        23|  309|
|-22,88665|-43,22967|1707252591000|   C41386|         6|  315|
|-23,00897|-43,30439|1707251602000|   A41063|         0|  309|
|-22,90711|-43,21092|1707252594000|   A41085|        29|  463|
|-23,00895|-43,30438|1707251632000|   A41063|         0

In [18]:
bsb_delta = spark.read.format("delta").table("bronze_df")
bsb_delta.show()

+---------+---------+-------------+---------+----------+-------+----------+-----+
| latitude|longitude|tempo_captura|id_onibus|velocidade|sentido|   direcao|linha|
+---------+---------+-------------+---------+----------+-------+----------+-----+
|-48.10842|-15.92126|1701355514000|   340260|      0.28|  VOLTA|80.9000015|     |
|-48.12325|-15.89951|1706128074000|   339784|       0.0|  VOLTA|       0.0|     |
|-48.12391|-15.90059|1706315254000|   340073|       0.0|  VOLTA|       0.0|372.9|
| -47.9946|-15.87532|1706551160000|   337846|       0.0|    IDA|333.434948|     |
|-48.02361|-15.87496|1706636961000|   339105|       0.0|    IDA|      90.0|     |
|-47.93532|-15.78702|1706644931000|   335771|      8.61|  VOLTA|35.2276684|0.844|
|-48.10898|-15.92114|1706654234000|   337838|       0.0|    IDA|150.945395|     |
|-48.12296|-15.89952|1706656423000|   336343|       0.0|    IDA|321.340191|     |
|-47.90362|-15.78452|1706700515000|   337773|       0.0|  VOLTA|       0.0|     |
|-47.90636|-15.7

To move to the next layer, we apply a few simple transformations, purely for demonstration:

* Add a `bus_id` column following the BRBus ID pattern: a city prefix (`RIO_` or `BSB_`) followed by the original `id_onibus`


In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Prefix each bus ID with its city code (BRBus pattern)
def changeOnibusIdRio(onibus_id):
    return f"RIO_{onibus_id}"

def changeOnibusIdBSB(onibus_id):
    return f"BSB_{onibus_id}"

# Wrap the Python functions as Spark UDFs that return strings
udf_transformBusIdRio = udf(changeOnibusIdRio, StringType())
udf_transformBusIdBSB = udf(changeOnibusIdBSB, StringType())

# Add the new bus_id column; the original id_onibus column is kept
rj_df_busid = rj_delta.withColumn("bus_id", udf_transformBusIdRio(rj_delta["id_onibus"]))
bsb_df_busid = bsb_delta.withColumn("bus_id", udf_transformBusIdBSB(bsb_delta["id_onibus"]))


Preview of the transformed DataFrame:

In [20]:
rj_df_busid.show()

+---------+---------+-------------+---------+----------+-----+----------+
| latitude|longitude|tempo_captura|id_onibus|velocidade|linha|    bus_id|
+---------+---------+-------------+---------+----------+-----+----------+
|-22,94427|-43,18129|1707252599000|   A71588|        53|  100|RIO_A71588|
|-23,01032|-43,30469|1707251542000|   A41063|        43|  309|RIO_A41063|
| -22,9595|-43,19084|1707252590000|   A41218|         0|  108|RIO_A41218|
|-22,96402|-43,20373|1707252596000|   A41018|         0|  463|RIO_A41018|
|-22,95296|-43,17615|1707252587000|   A41019|        10|  100|RIO_A41019|
|-23,00897|-43,30439|1707251572000|   A41063|         0|  309|RIO_A41063|
|-22,91231|-43,20969|1707252590000|   A41073|         0|  112|RIO_A41073|
|-22,93514|-43,20948|1707252593000|   A41245|        42|  112|RIO_A41245|
|-22,94494|-43,18229|1707252597000|   C41389|        23|  309|RIO_C41389|
|-22,88665|-43,22967|1707252591000|   C41386|         6|  315|RIO_C41386|
|-23,00897|-43,30439|1707251602000|   

Saving to the Silver layer:

In [27]:
rj_df_busid.write.format("delta").saveAsTable("silver_rj")
bsb_df_busid.write.format("delta").saveAsTable("silver_bsb")



Finally, we move to the Gold layer, where we will have:

* A table with only the speed values
* Speeds less than or equal to 0, or greater than or equal to 90, removed

This cleaning could have been done in Silver, but to keep those speeds available for other kinds of analysis (outliers, for example), I chose to do this last cleaning step in Gold.

In [38]:
def cleanVelocity(df):
    # Keep only speeds strictly between 0 and 90 (drops values <= 0 and >= 90)
    return df.filter((df.velocidade > 0) & (df.velocidade < 90))

# Read the Silver tables and apply the Gold cleaning rule
rio = spark.read.format("delta").table("silver_rj")
brasilia = spark.read.format("delta").table("silver_bsb")

rio = cleanVelocity(rio)
brasilia = cleanVelocity(brasilia)

# Gold tables keep only the speed column
velocidade_rio = rio.select(rio['velocidade'])
velocidade_bsb = brasilia.select(brasilia['velocidade'])

velocidade_rio.write.format("delta").saveAsTable("gold_velocidade_rio")
velocidade_bsb.write.format("delta").saveAsTable("gold_velocidade_bsb")


velocidade_rio.show()




+----------+
|velocidade|
+----------+
|        53|
|        43|
|        10|
|        42|
|        23|
|         6|
|        29|
|        50|
|        40|
|        49|
|        63|
|        16|
|        29|
|         1|
|         8|
|        68|
|         3|
|        18|
|        40|
|        55|
+----------+
only showing top 20 rows



Now we run the speed analysis:

In [39]:
from pyspark.sql.functions import avg

vel_rj = spark.read.format("delta").table("gold_velocidade_rio")
vel_bsb = spark.read.format("delta").table("gold_velocidade_bsb")

# avg() yields a one-row DataFrame; collect()[0][0] extracts the numeric value
media_rj = vel_rj.select(avg(vel_rj['velocidade'])).collect()[0][0]
media_bsb = vel_bsb.select(avg(vel_bsb['velocidade'])).collect()[0][0]


print(f"Média RJ: {media_rj} | Média BSB: {media_bsb}")

Média RJ: 22.08558642550811 | Média BSB: 16.112476538433995
