# Cria um autoloader para tempo de voltas

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, col
import logging

In [0]:
# Locations used by the bronze ingestion of lap times.
# Target Delta table, written as <schema>.<table>.
table_name = "f1_bronze.lap_times"
# Folder that the streaming reader loads files from.
source_path = "dbfs:/Volumes/workspace/default/arquivos/"
# Active checkpoint directory. The sibling directory
# "/Volumes/workspace/default/checkpoints/lap_times" appears in this cell only
# as a disabled alternative and is not used.
checkpoint_path = "/Volumes/workspace/default/checkpoints/lap_times2"

In [0]:
# Explicit schema handed to the streaming reader. Every field is nullable.
# The field order below is the order of the resulting StructType.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("raceId", IntegerType()),
            ("driverId", IntegerType()),
            ("lap", IntegerType()),
            ("position", IntegerType()),
            ("time", StringType()),
            ("milliseconds", IntegerType()),
            # Overwritten with current_timestamp() right after the load.
            ("bronze_ingestion", TimestampType()),
        )
    ]
)

In [0]:
# Make sure the volume workspace.default.checkpoints exists before any stream
# writes its checkpoint under it. IF NOT EXISTS keeps the cell safe to re-run.
_create_volume_sql = """
CREATE VOLUME IF NOT EXISTS workspace.default.checkpoints
COMMENT 'Volume for checkpoint storage'
"""
spark.sql(_create_volume_sql)

In [0]:
# Read stream: Auto Loader ("cloudFiles") incrementally picks up CSV files
# from source_path and parses them with the explicit `schema`.
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # Auto Loader schema-tracking location; this notebook reuses the
    # checkpoint directory for it.
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .schema(schema)
    .load(source_path)
    # Stamp every row with the time it was ingested into bronze. This
    # replaces the bronze_ingestion column declared in `schema`.
    .withColumn("bronze_ingestion", current_timestamp())
)

# Write stream: append the parsed rows to the bronze Delta table.
# NOTE: "cloudFiles.useNotifications" used to be set on this writer, where it
# has no effect: it is an Auto Loader *source* option. It was removed so the
# code states what actually runs (directory-listing mode). If file
# notification mode is wanted, set it on spark.readStream above instead.
query = (
    df_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    # availableNow: process everything currently pending, then stop.
    .trigger(availableNow=True)
    # toTable() is the documented DataStreamWriter method that starts the
    # query and writes into a catalog table.
    .toTable(table_name)
)
# Block the cell until the pending files have been processed.
query.awaitTermination()

In [0]:
# Streaming view over the bronze table named by `table_name`.
df_bronze_lap_times = (
    spark.readStream
    .format("delta")
    .table(table_name)
)

Incremental com autoloader

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, col
import logging

def insert_update_delta(df, batchId):
    """Upsert one micro-batch of lap times into ``f1_silver.lap_times``.

    Meant to be passed to ``writeStream.foreachBatch``. The batch is cleaned
    (``lap`` renamed to ``lap_number``, rows with a null merge key dropped,
    one row kept per merge key, ``ingestion_date`` added) and then merged
    into the silver table on ``(raceId, driverId, lap_number)``. When the
    table does not exist yet it is created from the cleaned batch.

    Args:
        df: Micro-batch DataFrame. Must contain ``raceId``, ``driverId`` and
            ``lap``; ``bronze_ingestion`` is used to pick the newest row per
            key when present.
        batchId: Micro-batch id supplied by Structured Streaming; only used
            in the error log message.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised so
            the streaming query fails instead of silently skipping a batch.
    """
    # Function-scope imports: these names are not guaranteed to be imported
    # by the notebook cell that defines this function.
    from pyspark.sql.functions import row_number
    from pyspark.sql.window import Window

    table_name = "f1_silver.lap_times"
    merge_keys = ["raceId", "driverId", "lap_number"]

    try:
        # Use the session bound to the micro-batch rather than the notebook
        # global, which is not always reachable from foreachBatch.
        session = df.sparkSession

        # A NULL key never satisfies the MERGE condition, so such rows would
        # be inserted again on every reprocess; drop them for all three keys.
        keyed = (
            df.withColumnRenamed("lap", "lap_number")
            .filter(
                col("raceId").isNotNull()
                & col("driverId").isNotNull()
                & col("lap_number").isNotNull()
            )
        )

        # MERGE fails when several source rows match the same target row, so
        # the batch must hold exactly one row per merge key. Deduplicating on
        # every column is not enough (e.g. differing bronze_ingestion).
        if "bronze_ingestion" in keyed.columns:
            newest_first = Window.partitionBy(*merge_keys).orderBy(
                col("bronze_ingestion").desc()
            )
            deduplicated = (
                keyed.withColumn("_row_rank", row_number().over(newest_first))
                .filter(col("_row_rank") == 1)
                .drop("_row_rank")
            )
        else:
            deduplicated = keyed.dropDuplicates(merge_keys)

        df_silver = deduplicated.withColumn("ingestion_date", current_timestamp())

        # Check whether the table exists in the catalog/metastore.
        if session.catalog.tableExists(table_name):
            delta_table = DeltaTable.forName(session, table_name)
            condition = " AND ".join(f"b.{key} = s.{key}" for key in merge_keys)

            (
                delta_table.alias("b")
                .merge(df_silver.alias("s"), condition)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # First run: create the table from the DataFrame's schema.
            (
                df_silver.write.format("delta")
                .mode("overwrite")
                .saveAsTable(table_name)
            )

    except Exception as e:
        # Same message as before, now with the traceback attached.
        logging.exception("Error in batch %s: %s", batchId, e)
        raise


In [0]:
# Drive the silver upsert: each micro-batch of the bronze stream is handed to
# insert_update_delta. availableNow processes what is pending and then stops;
# awaitTermination blocks the cell until that happens.
(
    df_bronze_lap_times.writeStream
    .foreachBatch(insert_update_delta)
    .option("checkpointLocation", '/Volumes/workspace/default/checkpoints/lap_timesS-silver')
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)
