# Incremental file ingestion from Azure Blob Storage to a Microsoft Fabric Lakehouse using PySpark notebooks

> <br> This project uses the database provided by the government of Brazil called **Cadastro Geral de Empregados e Desempregados (CAGED)** (General Register of Employed and Unemployed).<br><br>


**Notebook**: LandingToBronze  
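**Description**: This PySpark notebook performs incremental ingestion of files from the Landing layer into the Bronze layer (Delta) of the Lakehouse. For each candidate file, it compares the source modification timestamp recorded in the Landing control table with the latest timestamp already recorded in the Bronze control table. Only files that are new or have changed since their last load are ingested.  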

In [None]:
# Imports
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Resolve table and column names case-sensitively for the rest of the session.
_CASE_SENSITIVE_CONF = "spark.sql.caseSensitive"
spark.conf.set(_CASE_SENSITIVE_CONF, True)


## Parameters  

In [None]:
# Lakehouse paths and table names used by this notebook.
# NOTE(review): landing_root_path is not referenced by the cells below (file paths
# come from landing_meta.target_path); confirm it is unused before removing it.
landing_root_path = "Files/Landing/CAGED"
landing_meta_table = "CagedLandingMeta"  # Track what was copied from Blob to Landing
bronze_meta_table = "CagedBronzeMeta"    # Control table to track from Landing (Files) to Bronze (Delta)
bronze_table = "CagedBronze"             # Bronze layer of data (as-is)

## Prepare, create and load schemas and tables

In [None]:
# Control table recording which Landing files (and which source version) were loaded into Bronze.
bronze_meta_schema = StructType(
    [
        StructField("target_path", StringType(), False),
        StructField("source_modified_at", TimestampType(), False),
        StructField("source_size_mb", FloatType(), False),
        StructField("processed_at", TimestampType(), True),
    ]
)

# CAGED CSV columns, kept as nullable strings in Bronze (data as-is).
caged_csv_columns = [
    "competênciamov",
    "região",
    "uf",
    "município",
    "seção",
    "subclasse",
    "saldomovimentação",
    "categoria",
    "cbo2002ocupação",
    "graudeinstrução",
    "idade",
    "horascontratuais",
    "raçacor",
    "sexo",
    "tipoempregador",
    "tipoestabelecimento",
    "tipomovimentação",
    "tipodedeficiência",
    "indtrabintermitente",
    "salário",
    "tamestabjan",
    "indicadoraprendiz",
    "origemdainformação",
    "competênciadec",
    "indicadordeforadoprazo",
    "unidadesaláriocódigo",
    "valorsaláriofixo",
]

# Bronze schema = CSV columns followed by the control columns.
bronze_schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in caged_csv_columns]
    + [
        StructField("file_path", StringType(), False),              # Landing path
        StructField("source_modified_at", TimestampType(), False),  # Modification date
        StructField("is_active", BooleanType(), False),             # Active flag
    ]
)

# Create each Delta table only if it does not exist yet: mode("ignore") skips
# the write when the table is already there.
for table_name, table_schema in (
    (bronze_meta_table, bronze_meta_schema),
    (bronze_table, bronze_schema),
):
    (
        spark.createDataFrame([], table_schema)
        .write.format("delta")
        .mode("ignore")
        .saveAsTable(table_name)
    )

# Control tables: Landing (what was copied from Blob) and Bronze (what was loaded).
df_landing_meta = spark.table(landing_meta_table)
df_bronze_meta = spark.table(bronze_meta_table)

# Latest source version already processed into Bronze, per file.
df_bronze_meta_latest = df_bronze_meta.groupBy("target_path").agg(
    F.max("source_modified_at").alias("last_processed_source_mtime")
)


## Find candidates to load

In [None]:
# Identify Landing files to ingest into Bronze:
# - files never processed (no row in the Bronze control table), and
# - files whose source modification time is newer than the last processed version.
df_bronze_candidates = (
    df_landing_meta
    .join(
        df_bronze_meta_latest,
        df_landing_meta.target_path == df_bronze_meta_latest.target_path,
        "left"
    )
    .filter(
        # The null check already covers never-processed files, so the comparison
        # needs no fallback value. Comparing timestamp to timestamp also avoids the
        # implicit string coercion a string-literal fallback would introduce.
        F.col("last_processed_source_mtime").isNull()
        | (F.col("source_modified_at") > F.col("last_processed_source_mtime"))
    )
    .select(
        df_landing_meta.target_path,
        df_landing_meta.source_modified_at,
    )
    # Landing may list several versions of the same file; keep only the latest one.
    .groupBy("target_path")
    .agg(F.max("source_modified_at").alias("source_modified_at"))
    # Persist because the result is counted, displayed and reused by the load step.
    .persist()
)

# Count candidates
num_candidates = df_bronze_candidates.count()
print(f"Files to process from Landing to Bronze: {num_candidates}")
display(df_bronze_candidates.orderBy("target_path"))

## Process the load

In [None]:
# Load the candidate files into Bronze and record them in the Bronze control table.
# The emptiness check reuses the count computed when the candidates were built
# (no extra Spark job). The cached candidates are released on every path, including errors.
try:
    if num_candidates == 0:
        print("Nothing to load. Skipping...")
    else:
        # 1) Collect only the PATHS (small footprint) – the heavy work is not here
        candidate_paths = [
            row["target_path"]
            for row in df_bronze_candidates.select("target_path").collect()
        ]

        # 2) Read ALL candidates at once (single Spark job), with minimal parsing.
        #    If you have a known CSV schema for the data columns, set .schema(<schema_only_for_csv_cols>).
        df_all = (
            spark.read
                .format("csv")
                .option("header", "true")
                .option("sep", ";")
                .option("encoding", "UTF-8")
                .load(candidate_paths)
        )

        # -------------------------------------------------------
        # Normalize input_file_name() to match landing_meta paths:
        # - Remove query strings (?version=..., ?flength=...)
        # - Keep relative path from Files/...
        # -------------------------------------------------------
        df_all = (
            df_all
            .withColumn("file_path_raw", F.input_file_name())
            .withColumn("file_path_noqs", F.regexp_replace(F.col("file_path_raw"), r"\?.*$", ""))
            .withColumn("file_path_rel", F.regexp_extract(F.col("file_path_noqs"), r"(?:^|/)(Files/.*)", 1))
        )

        df_cand_norm = (
            df_bronze_candidates
            .withColumn("target_path_noqs", F.regexp_replace(F.col("target_path"), r"\?.*$", ""))
            .withColumn("target_path_rel", F.regexp_extract(F.col("target_path_noqs"), r"(?:^|/)(Files/.*)", 1))
            .select("target_path_rel", "source_modified_at")
        )

        # Attach the source version (source_modified_at) of each file to its rows.
        # NOTE(review): rows whose normalized path matches no candidate keep a null
        # source_modified_at (left join), although the Bronze schema declares that
        # column non-nullable.
        df_all = (
            df_all
            .join(
                df_cand_norm,
                df_all.file_path_rel == df_cand_norm.target_path_rel,
                "left"
            )
            .drop("target_path_rel")
            .withColumn("is_active", F.lit(True))
            # Store the relative path (Files/...), without a leading slash.
            .withColumn("file_path", F.col("file_path_rel"))
        )

        # Final selection in order of the schema
        df_all = df_all.select(
            "competênciamov","região","uf","município","seção","subclasse","saldomovimentação","categoria",
            "cbo2002ocupação","graudeinstrução","idade","horascontratuais","raçacor","sexo","tipoempregador",
            "tipoestabelecimento","tipomovimentação","tipodedeficiência","indtrabintermitente","salário",
            "tamestabjan","indicadoraprendiz","origemdainformação","competênciadec","indicadordeforadoprazo",
            "unidadesaláriocódigo","valorsaláriofixo",
            "file_path","source_modified_at","is_active"
        )

        df_all.createOrReplaceTempView("temp_files_batch")

        # 3a) Insert the rows of every (file, version) not yet present in Bronze.
        #     Keying on file_path AND source_modified_at means a modified file's new
        #     rows are inserted, and a re-run of the same version inserts nothing.
        #     An insert-only MERGE accepts many source rows per key.
        spark.sql(f"""
            MERGE INTO {bronze_table} AS target
            USING temp_files_batch AS source
            ON target.file_path = source.file_path
               AND target.source_modified_at = source.source_modified_at
            WHEN NOT MATCHED THEN INSERT *
        """)

        # 3b) Deactivate rows of older versions of the reprocessed files.
        #     The source holds exactly one row per file, so each target row matches
        #     at most one source row. Matching the per-row data instead would make
        #     Delta reject the update with a multiple-match error.
        df_file_versions = (
            df_cand_norm
            .withColumnRenamed("target_path_rel", "file_path")
            .groupBy("file_path")
            .agg(F.max("source_modified_at").alias("source_modified_at"))
        )
        df_file_versions.createOrReplaceTempView("temp_files_versions")

        spark.sql(f"""
            MERGE INTO {bronze_table} AS target
            USING temp_files_versions AS source
            ON target.file_path = source.file_path
               AND target.is_active = TRUE
               AND target.source_modified_at < source.source_modified_at
            WHEN MATCHED THEN UPDATE SET is_active = FALSE
        """)

        # ------------------------------------------
        # 4) Write bronze_meta in batch
        # ------------------------------------------
        # bronze_meta_schema declares source_size_mb non-nullable, so it must be
        # written. The candidate list only carries target_path/source_modified_at,
        # so the size of the exact processed version is looked up in landing_meta.
        df_landing_sizes = (
            df_landing_meta
            .groupBy("target_path", "source_modified_at")
            .agg(F.max("source_size_mb").cast("float").alias("source_size_mb"))
        )

        df_to_meta = (
            df_bronze_candidates
            .select("target_path", "source_modified_at")
            .join(df_landing_sizes, ["target_path", "source_modified_at"], "left")
            .withColumn("processed_at", F.current_timestamp())
            .select("target_path", "source_modified_at", "source_size_mb", "processed_at")
        )

        df_to_meta.write.format("delta").mode("append").saveAsTable(bronze_meta_table)

        print(f"Processed files to Bronze: {num_candidates}")
finally:
    # Cleanup
    df_bronze_candidates.unpersist()
