In [0]:
# Session context: select the catalog, then the bronze schema inside it.
spark.sql("USE CATALOG finance_catalog")
spark.sql("USE finance_catalog.bronze")

# Storage locations, all rooted in the `landing` container of the account.
storage_account = "definproject"
landing = "abfss://landing@" + storage_account + ".dfs.core.windows.net"
daily_dir = landing + "/daily_dir"  # source folder with the daily CSV files
schema_location = landing + "/_autoloader_schema/transactions_daily"  # Auto Loader schema tracking
checkpoint_loc = landing + "/_autoloader_ckpt/transactions_daily"  # streaming checkpoint

# Sanity check: list the source folder (expected: transactions_day1..5.csv).
source_files = dbutils.fs.ls(daily_dir)
display(source_files)

path,name,size,modificationTime
abfss://landing@definproject.dfs.core.windows.net/daily_dir/transactions_day1.csv,transactions_day1.csv,153940,1758841968000
abfss://landing@definproject.dfs.core.windows.net/daily_dir/transactions_day2.csv,transactions_day2.csv,153305,1758841968000
abfss://landing@definproject.dfs.core.windows.net/daily_dir/transactions_day3.csv,transactions_day3.csv,152774,1758841968000
abfss://landing@definproject.dfs.core.windows.net/daily_dir/transactions_day4.csv,transactions_day4.csv,152969,1758841968000
abfss://landing@definproject.dfs.core.windows.net/daily_dir/transactions_day5.csv,transactions_day5.csv,153206,1758841968000


In [0]:
from pyspark.sql.functions import col, to_timestamp, trim, upper
# Auto Loader (cloudFiles) source that reads the CSV files under `daily_dir`.
autoloader_options = {
    "cloudFiles.format": "csv",
    "header": "true",
    # Infer column types on the first run.
    "cloudFiles.inferColumnTypes": "true",
    # Location where the inferred schema is stored and managed.
    "cloudFiles.schemaLocation": schema_location,
}
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(daily_dir)
)

# Minimum normalization for our model, applied column by column in a fixed order:
#   timestamp -> parsed with the "yyyy-MM-dd HH:mm:ss" pattern
#   amount    -> cast to double
#   country / currency columns -> trimmed and upper-cased
#   merchant  -> trimmed only (case preserved)
parsed_timestamp = to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
amount_as_double = col("amount").cast("double")

df_stream_clean = df_stream.withColumn("timestamp", parsed_timestamp)
df_stream_clean = df_stream_clean.withColumn("amount", amount_as_double)
for code_column in ("origin_country", "destination_country", "currency"):
    df_stream_clean = df_stream_clean.withColumn(
        code_column, upper(trim(col(code_column)))
    )
df_stream_clean = df_stream_clean.withColumn("merchant", trim(col("merchant")))

In [0]:
# One-shot incremental load: configure the Delta sink first, then start the query.
bronze_writer = df_stream_clean.writeStream.format("delta")
# The checkpoint tracks stream state / progress, which guards against duplicates on re-runs.
bronze_writer = bronze_writer.option("checkpointLocation", checkpoint_loc)
bronze_writer = bronze_writer.option("mergeSchema", "true")
# availableNow: process everything currently available, then stop.
bronze_writer = bronze_writer.trigger(availableNow=True)

# Same Bronze destination table as before.
q_once = bronze_writer.toTable("finance_catalog.bronze.transactions")

# Block the cell until the run has finished.
q_once.awaitTermination()


In [0]:
%sql
-- List the tables registered in the bronze schema of finance_catalog.
SHOW TABLES IN finance_catalog.bronze;


database,tableName,isTemporary
bronze,customers,False
bronze,transactions,False
,_sqldf,True


In [0]:
%sql
-- Show the commit history of the Bronze transactions table (one row per table version).
DESCRIBE HISTORY finance_catalog.bronze.transactions;


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-09-26T01:32:37.000Z,141588458824919,chan_rojo@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bd95f77d-06ff-4dc4-b45b-472e8cc02620, epochId -> 0, statsOnLoad -> true)",,List(3904856608010676),0926-012030-ud8vnseg-v2n,1.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 10000, numOutputBytes -> 364269, numAddedFiles -> 1)",,Databricks-Runtime/17.1.x-photon-scala2.13
1,2025-09-26T01:32:30.000Z,141588458824919,chan_rojo@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bd95f77d-06ff-4dc4-b45b-472e8cc02620, epochId -> -1, statsOnLoad -> false)",,List(3904856608010676),0926-012030-ud8vnseg-v2n,0.0,WriteSerializable,True,Map(),,Databricks-Runtime/17.1.x-photon-scala2.13
0,2025-09-24T21:54:55.000Z,141588458824919,chan_rojo@hotmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""collation"":""UTF8_BINARY"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(382243453667326),0924-214619-4805eqdo-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numOutputRows -> 10000, numOutputBytes -> 363917)",,Databricks-Runtime/17.1.x-photon-scala2.13


In [0]:
%sql
-- Total number of rows currently in the Bronze transactions table.
SELECT COUNT(*) FROM finance_catalog.bronze.transactions;


COUNT(*)
20000
