# Delta Lake

The idea of this section is to build a basic Delta Lake for this specific dataset, using open-source tools such as Apache Airflow and PySpark.

```python
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip  # provided by the delta-spark pip package
from datetime import datetime
```


Create a function that starts a Spark session configured for Delta Lake. With it, every step of the pipeline can read and write Delta tables.

```python
# Create a Spark session with the Delta Lake SQL extension and catalog enabled
def create_spark_session():
    builder = (
        SparkSession.builder.appName("DeltaLake Pipeline")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )

    # configure_spark_with_delta_pip adds the Delta Lake jars matching the installed delta-spark version
    return configure_spark_with_delta_pip(builder).getOrCreate()
```

A Delta Lake pipeline is commonly organized in three states, or layers, known as the medallion architecture: Bronze, Silver, and Gold. The Bronze state stores all of your data as it arrives, without any transformation. Here we create one Bronze table for each table (sheet) of the original Excel file:

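```python
# Bronze state: store the raw data in Delta format without any transformation
def bronze():
    spark = create_spark_session()
    # Spark's CSV reader cannot parse .xlsx files, so read every sheet with pandas
    # (requires openpyxl) and convert each sheet to a Spark DataFrame
    sheets = pd.read_excel("Data/Transacciones_Prueba_Especialista_Prevencion_Fraude.xlsx", sheet_name=None)
    for sheet_name, pdf in sheets.items():
        df = spark.createDataFrame(pdf)
        # One Bronze table per sheet, e.g. Data/bronze_table/<sheet name>
        df.write.format("delta").mode("overwrite").save(f"Data/bronze_table/{sheet_name}")
```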
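To make the three states more concrete, here is a toy, in-memory sketch of what each layer typically does, written in plain Python rather than Spark or Delta. The records, field names, and cleaning rules are hypothetical and not taken from the original dataset; Silver and Gold appear only to illustrate the pattern, since the notebook implements Bronze so far.

```python
# Toy illustration of the medallion layers (NOT Spark or Delta Lake code).
# The records and field names below are hypothetical.
raw_rows = [
    {"id": "1", "amount": "120.50", "country": "CO"},
    {"id": "2", "amount": "80", "country": "co"},
    {"id": "2", "amount": "80", "country": "co"},  # duplicate record
    {"id": "3", "amount": "", "country": "MX"},    # missing amount
]


def to_bronze(rows):
    """Bronze: keep every record exactly as it arrived."""
    return [dict(r) for r in rows]


def to_silver(bronze_rows):
    """Silver: drop duplicates and unusable rows, cast types, normalize values."""
    seen = set()
    cleaned = []
    for r in bronze_rows:
        if r["id"] in seen or not r["amount"]:
            continue
        seen.add(r["id"])
        cleaned.append({
            "id": int(r["id"]),
            "amount": float(r["amount"]),
            "country": r["country"].upper(),
        })
    return cleaned


def to_gold(silver_rows):
    """Gold: a business-level aggregate, here the total amount per country."""
    totals = {}
    for r in silver_rows:
        totals[r["country"]] = totals.get(r["country"], 0.0) + r["amount"]
    return totals


bronze_rows = to_bronze(raw_rows)
silver_rows = to_silver(bronze_rows)
gold = to_gold(silver_rows)
print(len(bronze_rows), len(silver_rows), gold)  # 4 2 {'CO': 200.5}
```

In the real pipeline each function would read from and write to a Delta table instead of a Python list, so every layer stays queryable on its own.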

```python
# Gold state (placeholder)
```

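```python
# Airflow DAG: run the pipeline once a day without backfilling past runs
default_args = {"owner": "airflow", "start_date": datetime(2025, 1, 1)}
dag = DAG(
    "delta_medallion_architecture",
    default_args=default_args,
    schedule="@daily",  # Airflow >= 2.4; older 2.x releases use schedule_interval
    catchup=False,
)
```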

```python
# DAG tasks
task_bronze = PythonOperator(task_id="load_bronze", python_callable=bronze, dag=dag)
```

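```python
# Order: only the Bronze task exists so far, so there is nothing to chain yet.
# Once Silver and Gold tasks are defined, chain them, e.g. task_bronze >> task_silver >> task_gold
task_bronze
```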
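Task order is declared with Airflow's `>>` operator: a chain such as `task_bronze >> task_silver >> task_gold` makes each task run only after the previous one succeeds. The sketch below is not Airflow code. It uses Python's standard `graphlib` module (Python 3.9+) to show the execution order that such a chain implies. `load_silver` and `load_gold` are hypothetical task ids, because only `load_bronze` exists in this DAG.

```python
from graphlib import TopologicalSorter

# Map each (hypothetical) task id to the set of tasks it depends on,
# which is what task_bronze >> task_silver >> task_gold would declare in Airflow.
dependencies = {
    "load_silver": {"load_bronze"},
    "load_gold": {"load_silver"},
}

# static_order() yields a task only after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['load_bronze', 'load_silver', 'load_gold']
```

Airflow's scheduler also handles retries, parallel branches, and trigger rules. The sketch only shows the dependency ordering.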