In [1]:
from pyspark.sql.types import DecimalType, LongType
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
import os

# Reuse the active Spark session if one exists, otherwise create one named
# "bronze_pipeline".
spark_session = (
    SparkSession.builder
    .appName("bronze_pipeline")
    .getOrCreate()
)

# Directory that is scanned for input files.
folder_lakehouse = "/lakehouse/default/Files"
# Accumulator list; files_list() appends the discovered file names to it.
file_names = []

def files_list(folder_lakehouse, file_names):
    """Append the names of the regular files found directly in a folder.

    Sub-directories are skipped and the scan is not recursive.

    Args:
        folder_lakehouse: Path of the directory to scan.
        file_names: List that is extended in place with the bare file names
            (no directory component).

    Returns:
        The same ``file_names`` object, after the names have been appended.
    """
    file_names.extend(
        entry
        for entry in os.listdir(folder_lakehouse)
        if os.path.isfile(os.path.join(folder_lakehouse, entry))
    )
    return file_names

def delete_csv(file_names):
    """Strip a trailing ``.csv`` extension from each file name.

    Only the suffix is removed. A name that merely contains ``.csv`` somewhere
    before the end (for example ``"a.csv.bak.csv"`` or ``"notes.csvx"``) is not
    truncated at that earlier occurrence, so the returned base name plus
    ``".csv"`` always reproduces the original name of a CSV file.

    Args:
        file_names: Iterable of file names.

    Returns:
        A new list with one entry per input name, in the same order. Names that
        do not end in ``.csv`` are returned unchanged.
    """
    suffix = ".csv"
    return [
        name[: -len(suffix)] if name.endswith(suffix) else name
        for name in file_names
    ]

# files_list() fills file_names in place and returns that same list, so its
# result can be passed straight on to delete_csv().
file_names_def = delete_csv(files_list(folder_lakehouse, file_names))

def lakehouse_ingest(spark_session, file_names_def):
    """Load each ``Files/<name>.csv`` into its own ``bronze_<name>`` Delta table.

    For every entry of ``file_names_def`` the function:

    1. reads ``Files/<name>.csv`` with a header row and schema inference,
    2. adds a ``Year`` column holding the entry itself and an
       ``ingestion_timestamp`` column holding the current timestamp,
    3. casts a fixed set of columns to ``DecimalType(20, 2)`` or ``LongType``,
    4. creates the Delta table at ``Tables/Bronze_<name>`` if it does not exist,
    5. merges the rows in, inserting those with no match on
       ``ingestion_timestamp``.

    Args:
        spark_session: SparkSession used for reading the CSV and for Delta.
        file_names_def: Iterable of CSV base names (without ``.csv``).

    Returns:
        None.
    """
    decimal_type = DecimalType(20, 2)
    long_type = LongType()
    # (column, target type) pairs, in the order the casts are applied.
    cast_plan = (
        ("Tot_Drug_Cst", decimal_type),
        ("Tot_30day_Fills", decimal_type),
        ("GE65_Tot_30day_Fills", decimal_type),
        ("GE65_Tot_Drug_Cst", decimal_type),
        ("Tot_Clms", long_type),
        ("Tot_Day_Suply", long_type),
        ("Tot_Benes", long_type),
        ("GE65_Tot_Clms", long_type),
        ("GE65_Tot_Benes", long_type),
        ("GE65_Tot_Day_Suply", long_type),
    )

    for base_name in file_names_def:
        raw_df = (
            spark_session.read.format('csv')
            .option('header', 'true')
            .option('inferschema', 'true')
            .load(f'Files/{base_name}.csv')
        )

        # Tag every row with its source name and the load time.
        bronze_df = (
            raw_df
            .withColumn('Year', F.lit(base_name))
            .withColumn("ingestion_timestamp", F.current_timestamp())
        )
        # Replace each listed column by its cast version; the column is looked
        # up attribute-style on the freshly loaded frame.
        for column_name, target_type in cast_plan:
            bronze_df = bronze_df.withColumn(
                column_name, getattr(raw_df, column_name).cast(target_type)
            )

        table_builder = (
            DeltaTable.createIfNotExists(spark_session)
            .location(f"Tables/Bronze_{base_name}")
            .tableName(f"bronze_{base_name}")
            .addColumns(bronze_df.schema)
        )
        delta_table = table_builder.execute()

        # NOTE(review): the merge key is ingestion_timestamp, which is set from
        # current_timestamp() on every run, so presumably no incoming row ever
        # matches an existing one and re-running appends the file again -
        # confirm this is the intended behaviour.
        merge_builder = delta_table.alias("existing").merge(
            bronze_df.alias("incoming"),
            "existing.ingestion_timestamp = incoming.ingestion_timestamp",
        )
        merge_builder.whenNotMatchedInsertAll().execute()

# Run the bronze ingest for every CSV base name collected above.
lakehouse_ingest(spark_session, file_names_def)

StatementMeta(, ceb3b0cb-5f9b-42e9-b049-6c3b3a415ecc, 3, Finished, Available)

In [8]:
%%sql
-- Inspect the table-level details of the Bronze_2013 table.
DESCRIBE DETAIL Bronze_2013

StatementMeta(, 28aec0db-56b3-4c47-a3f2-791539ad3ba9, 10, Finished, Available)

<Spark SQL result set with 1 rows and 14 fields>