In [0]:
# 1. Schema function
from pyspark.sql.functions import coalesce, col, concat, crc32, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

def get_schemas():
    """Return the explicit Spark schema for every supported file type.

    Returns
    -------
    dict
        Maps the file type name ('Accounts', 'Customers', 'Loan_Payments',
        'Loans', 'Transactions') to a ``StructType``. Every field is
        declared nullable.
    """
    def _struct(*columns):
        # Each entry is (column name, Spark type class); a fresh type
        # instance is created per field and every field is nullable.
        return StructType([StructField(name, dtype(), True) for name, dtype in columns])

    return {
        'Accounts': _struct(
            ('account_id', IntegerType),
            ('customer_id', IntegerType),
            ('account_type', StringType),
            ('balance', DoubleType),
        ),
        'Customers': _struct(
            ('customer_id', IntegerType),
            ('first_name', StringType),
            ('last_name', StringType),
            ('address', StringType),
            ('city', StringType),
            ('state', StringType),
            ('zip', StringType),
        ),
        'Loan_Payments': _struct(
            ('payment_id', IntegerType),
            ('loan_id', IntegerType),
            ('payment_date', DateType),
            ('payment_amount', DoubleType),
        ),
        'Loans': _struct(
            ('loan_id', IntegerType),
            ('customer_id', IntegerType),
            ('loan_amount', DoubleType),
            ('interest_rate', DoubleType),
            ('loan_term', IntegerType),
        ),
        'Transactions': _struct(
            ('transaction_id', IntegerType),
            ('account_id', IntegerType),
            ('transaction_date', DateType),
            ('transaction_amount', DoubleType),
            ('transaction_type', StringType),
        ),
    }



In [0]:
# 2. Check for nulls and fill null values
def clean_data(df, file_type):
    """Drop rows with missing identifiers and fill the remaining nulls.

    Parameters
    ----------
    df : DataFrame
        Rows of one file type.
    file_type : str
        'Accounts', 'Customers', 'Loan_Payments', 'Loans' or 'Transactions'.
        Any other value returns ``df`` untouched.

    Returns
    -------
    DataFrame
        Rows whose identifier columns are all non-null, with the listed
        placeholder values substituted for nulls in the other columns.

    Notes
    -----
    The identifier checks are combined with AND. Combining them with OR let
    a row with a null primary key through whenever its other id was set;
    a null key can never match an existing row on an equality join, so such
    rows would be inserted again on every load.
    """
    # file_type -> (identifier columns that must all be present,
    #               placeholder values for nulls in the other columns)
    rules = {
        'Accounts': (
            ['account_id', 'customer_id'],
            {'account_type': 'Unknown', 'balance': 0.0},
        ),
        'Customers': (
            ['customer_id'],
            {'first_name': 'Missing', 'last_name': 'Missing', 'address': 'Unknown',
             'city': 'Unknown', 'state': 'Unknown', 'zip': '000000'},
        ),
        'Loan_Payments': (
            ['payment_id', 'loan_id'],
            {'payment_date': '1900-01-01', 'payment_amount': -1.0},
        ),
        'Loans': (
            ['loan_id', 'customer_id'],
            {'loan_amount': -1.0, 'interest_rate': -1.0, 'loan_term': -1},
        ),
        'Transactions': (
            ['transaction_id', 'account_id'],
            {'transaction_date': '1900-01-01', 'transaction_amount': -1.0,
             'transaction_type': 'Unknown'},
        ),
    }

    # Unknown file types pass through unchanged.
    if file_type not in rules:
        return df

    id_columns, placeholders = rules[file_type]

    # Build "id1 IS NOT NULL AND id2 IS NOT NULL ..." over the identifier columns.
    ids_present = col(id_columns[0]).isNotNull()
    for name in id_columns[1:]:
        ids_present = ids_present & col(name).isNotNull()

    return df.filter(ids_present).na.fill(placeholders)

In [0]:
# 3. Process single file function
def process_file(file_type, year, month, day):
    """Load one bronze CSV drop, de-duplicate and clean it, and overwrite silver.

    Parameters
    ----------
    file_type : str
        Key into ``get_schemas()``; also the folder name under bronze/silver.
    year, month, day
        Path components of the bronze folder, interpolated as given.

    Raises
    ------
    KeyError
        If ``file_type`` has no schema.
    """
    schemas = get_schemas()
    source_dir = f'/mnt/project2/bronze/{file_type}/{year}/{month}/{day}'
    target_dir = f'/mnt/project2/silver/{file_type}'

    # Read with the explicit schema; the CSV files carry a header row.
    raw = (
        spark.read
        .format('csv')
        .options(header='true')
        .schema(schemas[file_type])
        .load(source_dir)
    )

    # Exact duplicate rows are removed before the per-type cleaning rules run.
    cleaned = clean_data(raw.dropDuplicates(), file_type)

    # The silver folder for this file type is replaced on every call.
    cleaned.write.format('parquet').mode('overwrite').save(target_dir)
    print(f"Written to Silver: {file_type}")

In [0]:
#4. Read parquet files
def read_silver(file_type):
    """Return the silver Parquet data for ``file_type`` as a DataFrame."""
    return spark.read.format('parquet').load(f"/mnt/project2/silver/{file_type}")


In [0]:
#5. Generate hashkey
def generate_hashkey(df):
    """Append a ``hashkey`` column holding a CRC32 over all columns of ``df``.

    Parameters
    ----------
    df : DataFrame
        Input rows; every existing column contributes to the hash.

    Returns
    -------
    DataFrame
        ``df`` plus a ``hashkey`` column.

    Notes
    -----
    ``concat`` returns NULL as soon as one of its inputs is NULL, which would
    make the whole hash NULL for any row with a missing value. Each column is
    therefore cast to string and a NULL is replaced by an empty string before
    concatenation. For rows without NULLs the concatenated text is intended
    to be the same as with a plain ``concat`` of the columns.

    Values are joined without a separator, so different rows can produce the
    same concatenated text (e.g. '1','23' vs '12','3'). Adding a separator
    would change every previously stored hash, so it is left as is.
    """
    hash_inputs = [coalesce(col(name).cast('string'), lit('')) for name in df.columns]
    return df.withColumn("hashkey", crc32(concat(*hash_inputs)))


In [0]:
#6. Table Schemas
-- Gold SCD1 table for accounts: the business columns plus a row hash and
-- audit columns. The format is pinned to Delta because the merge step opens
-- this location with DeltaTable.forPath.
CREATE TABLE IF NOT EXISTS hive_metastore.scd1.accounts_scd1
(
  account_id int,
  customer_id int,
  account_type string,
  balance double,
  hashkey bigint,          -- row hash compared during merge to detect changes
  createdby string,        -- audit: set when the row is inserted
  createddate timestamp,
  updatedby string,        -- audit: set on insert and on every update
  updateddate timestamp
)
USING DELTA
LOCATION '/mnt/project2/gold/Accounts';


-- Gold SCD1 table for customers: the business columns plus a row hash and
-- audit columns. The format is pinned to Delta because the merge step opens
-- this location with DeltaTable.forPath.
CREATE TABLE IF NOT EXISTS hive_metastore.scd1.customers_scd1
(
  customer_id int,
  first_name string,
  last_name string,
  address string,
  city string,
  state string,
  zip string,
  hashkey bigint,          -- row hash compared during merge to detect changes
  createdby string,        -- audit: set when the row is inserted
  createddate timestamp,
  updatedby string,        -- audit: set on insert and on every update
  updateddate timestamp
)
USING DELTA
LOCATION '/mnt/project2/gold/Customers';

-- Gold SCD1 table for loan payments: the business columns plus a row hash
-- and audit columns. The format is pinned to Delta because the merge step
-- opens this location with DeltaTable.forPath.
CREATE TABLE IF NOT EXISTS hive_metastore.scd1.loans_payments_scd1
(
  payment_id int,
  loan_id int,
  payment_date date,
  payment_amount double,
  hashkey bigint,          -- row hash compared during merge to detect changes
  createdby string,        -- audit: set when the row is inserted
  createddate timestamp,
  updatedby string,        -- audit: set on insert and on every update
  updateddate timestamp
)
USING DELTA
LOCATION '/mnt/project2/gold/Loan_Payments';

-- Gold SCD1 table for loans: the business columns plus a row hash and audit
-- columns. The format is pinned to Delta because the merge step opens this
-- location with DeltaTable.forPath.
CREATE TABLE IF NOT EXISTS hive_metastore.scd1.loans_scd1
(
  loan_id int,
  customer_id int,
  loan_amount double,
  interest_rate double,
  loan_term int,
  hashkey bigint,          -- row hash compared during merge to detect changes
  createdby string,        -- audit: set when the row is inserted
  createddate timestamp,
  updatedby string,        -- audit: set on insert and on every update
  updateddate timestamp
)
USING DELTA
LOCATION '/mnt/project2/gold/Loans';

-- Gold SCD1 table for transactions: the business columns plus a row hash and
-- audit columns. The format is pinned to Delta because the merge step opens
-- this location with DeltaTable.forPath.
CREATE TABLE IF NOT EXISTS hive_metastore.scd1.transactions_scd1
(
  transaction_id int,
  account_id int,
  transaction_date date,
  transaction_amount double,
  transaction_type string,
  hashkey bigint,          -- row hash compared during merge to detect changes
  createdby string,        -- audit: set when the row is inserted
  createddate timestamp,
  updatedby string,        -- audit: set on insert and on every update
  updateddate timestamp
)
USING DELTA
LOCATION '/mnt/project2/gold/Transactions';

In [0]:
#7. Merge 

from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, current_timestamp

def merge_new_data(file_type, src_df):
    """Upsert new or changed rows from ``src_df`` into the gold Delta table.

    Rows are matched on the business key of ``file_type``. A source row is
    merged when its key is absent from the target or when its ``hashkey``
    differs from the target's. Matched rows are overwritten (SCD type 1).

    Parameters
    ----------
    file_type : str
        'Accounts', 'Customers', 'Loan_Payments', 'Loans' or 'Transactions';
        selects the gold path and the business key.
    src_df : DataFrame
        Incoming rows. Must contain the business key and a ``hashkey`` column.

    Raises
    ------
    KeyError
        If ``file_type`` has no registered business key.
    """
    join_keys = {
        'Accounts': 'account_id',
        'Customers': 'customer_id',
        'Loan_Payments': 'payment_id',
        'Loans': 'loan_id',
        'Transactions': 'transaction_id'
    }
    join_key = join_keys[file_type]
    gold_path = f"/mnt/project2/gold/{file_type}"

    delta_tbl = DeltaTable.forPath(spark, gold_path)
    src = src_df.alias('src')
    tgt = delta_tbl.toDF().alias('tgt')

    join_condition = col(f'src.{join_key}') == col(f'tgt.{join_key}')

    # After the left join a missing target key means the row is new.
    is_new = col(f'tgt.{join_key}').isNull()
    # Null-safe comparison: a plain `!=` evaluates to NULL when either hash
    # is NULL, and filter() drops NULL, so such rows would never be updated.
    is_changed = ~col('src.hashkey').eqNullSafe(col('tgt.hashkey'))

    # Keep only the source rows that actually need to be written.
    filtered_src = (
        src.join(tgt, join_condition, 'left')
        .filter(is_new | is_changed)
        .select('src.*')
    )
    src_cols = filtered_src.columns

    # On update every source column is copied except the creation audit
    # fields, which must keep their original values.
    update_expr = {
        name: f"src.{name}"
        for name in src_cols
        if name not in ('createdby', 'createddate')
    }
    update_expr['updatedby'] = lit('Databricks-updated')
    update_expr['updateddate'] = current_timestamp()

    # On insert every source column is copied and all four audit fields are
    # stamped, overriding any audit values carried by the source.
    insert_expr = {name: f"src.{name}" for name in src_cols}
    insert_expr['createdby'] = lit('Databricks')
    insert_expr['createddate'] = current_timestamp()
    insert_expr['updatedby'] = lit('Databricks')
    insert_expr['updateddate'] = current_timestamp()

    (
        delta_tbl.alias('tgt')
        .merge(filtered_src.alias('src'), join_condition)
        .whenMatchedUpdate(set=update_expr)
        .whenNotMatchedInsert(values=insert_expr)
        .execute()
    )

    print(f"Merged data into Gold table: {file_type}")


In [0]:
#8 Join Function
def perform_final_merge(files_found):
    """Join the five silver datasets and write the combined view to gold.

    Parameters
    ----------
    files_found : container of str
        File types available for this run. The join only runs when all of
        'Accounts', 'Customers', 'Loan_Payments', 'Loans' and 'Transactions'
        are present; otherwise a message is printed and nothing is written.
    """
    required_files = ['Accounts', 'Customers', 'Loan_Payments', 'Loans', 'Transactions']

    # Guard clause: bail out as soon as one required file type is missing.
    if any(name not in files_found for name in required_files):
        print("Skipping merge logic. Not all required files are available.")
        return

    print("All required files available.")

    # Load each silver dataset once, keyed by file type.
    load_order = ('Accounts', 'Customers', 'Transactions', 'Loan_Payments', 'Loans')
    silver = {
        name: spark.read.format('parquet').load(f'/mnt/project2/silver/{name}')
        for name in load_order
    }

    # Accounts -> customers -> transactions.
    account_side = (
        silver['Accounts']
        .join(silver['Customers'], 'customer_id', 'inner')
        .join(silver['Transactions'], 'account_id', 'inner')
    )

    # Loan payments -> loans; this side carries customer_id from loans.
    loan_side = silver['Loan_Payments'].join(silver['Loans'], 'loan_id', 'inner')

    output_columns = [
        'account_id',
        'customer_id',
        'payment_id',
        'loan_id',
        'transaction_id',
        'transaction_date',
        'transaction_amount',
        'payment_amount',
        'payment_date',
        'loan_amount',
        'balance',
    ]
    merged_final = (
        account_side
        .join(loan_side, 'customer_id', 'inner')
        .select(*output_columns)
    )

    # The combined view is written as Parquet and replaces any previous output.
    merged_final.write.format('parquet').mode('overwrite').save('/mnt/project2/gold/MergedFinal')
    print("Merged data written to Gold layer")
