In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

In [0]:
from pyspark.sql.functions import col,lit,current_timestamp,year,month,count
from collections import defaultdict
from src.utils.config import landing_customer_root_path,bronze_operational_root_path,get_consolidated_customer_file_paths
import re

In [0]:

def customer_direct_load(customer_source_path, customer_target_path):
    """Append one landing customer JSON file to the bronze Delta location.

    Reads ``customer_source_path`` as JSON and keeps the customer columns,
    renaming ``created_timestamp`` to ``created_at``. It then adds file/load
    metadata and ``year``/``month`` partition columns, and appends the result
    as Delta to ``customer_target_path``. The output is partitioned by
    ``year``, ``month`` and ``load_source``.

    Args:
        customer_source_path: Path of the landing JSON file to load. When it is
            falsy (e.g. ``None`` because no new file was selected), nothing is
            read or written.
        customer_target_path: Root path of the bronze Delta table.

    Returns:
        ``'success'`` when the append completes, a "skipped" message when there
        is no source path, otherwise the error message as a string. The status
        is also printed.
    """
    # Guard: the path selector returns None when there is nothing new to load
    # or when it failed; don't hand None to spark.read.load.
    if not customer_source_path:
        status = 'skipped: no customer source path to load'
        print(status)
        return status

    try:
        customer_lnd_df = spark.read.format('json').load(customer_source_path)
        customer_df = customer_lnd_df.select(
            "customer_id",
            col("created_timestamp").alias("created_at"),
            "customer_name",
            "date_of_birth",
            "telephone",
            "email",
            "member_since",
            # `_metadata` is the hidden file-metadata column of file sources.
            col("_metadata.file_path").alias("source_file_path"),
            col("_metadata.file_size").alias("file_size_bytes"),
            current_timestamp().alias("load_timestamp"),
            lit("customer").alias("load_source"),
            lit("landing").alias("schema"),
            # Derive partitions from the source column rather than the
            # `created_at` alias defined in this same select: referencing a
            # sibling alias relies on lateral-column-alias support.
            year(col("created_timestamp")).alias("year"),
            month(col("created_timestamp")).alias("month"),
        )
        (
            customer_df.write
            .mode("append")
            .partitionBy("year", "month", "load_source")
            .format("delta")
            .save(customer_target_path)
        )
        status = 'success'
    except Exception as error:
        # Best-effort notebook step: report the failure instead of raising.
        status = str(error)

    print(status)
    return status
    
    


In [0]:
def _customer_file_month(path):
    """Return the month encoded in a landing customer file name as an int.

    The month is taken as the third ``_``-separated token of the file name
    stem, e.g. ``.../customers_2024_03.json`` -> ``3``. This naming scheme is
    inferred from the original parsing logic; TODO confirm against the
    landing layout.
    """
    return int(path.split('/')[-1].split('.')[0].split('_')[2])


def get_customer_source_path(customer_consolidated_paths):
    """Select the next landing customer file to load into bronze.

    Args:
        customer_consolidated_paths: Mapping of year -> list of landing file
            paths for that year. Keys may be ``str`` (e.g. ``'2024'``) or
            ``int``; they are normalised to ``int`` here.

    Behaviour:
        * If the bronze table has no rows with ``load_source = 'customer'``,
          the earliest-month file of the earliest year is chosen (fresh load).
        * Otherwise the latest loaded ``(year, month)`` partition is found. The
          file for the smallest month after it is chosen, rolling over into
          later years when the last loaded year has no newer month.

    Returns:
        The selected file path, or ``None`` when there is nothing new to load
        or an error occurred (the error is printed).
    """
    customer_file_path = None
    try:
        # Normalise year keys to int so they match the integer `year` partition
        # column read back from Delta; drop years with no files.
        paths_by_year = {
            int(key): list(paths)
            for key, paths in customer_consolidated_paths.items()
            if paths
        }
        if not paths_by_year:
            print('No landing customer files found')
            return None

        # NOTE(review): if the Delta path does not exist yet, load() raises and
        # this returns None rather than starting a fresh load; confirm the
        # bronze location is created beforehand.
        cust_brnz_df = spark.read.format('delta').load(bronze_operational_root_path)

        # Must match the literal written by customer_direct_load: lit("customer").
        last_loaded = (
            cust_brnz_df
            .filter(col('load_source') == 'customer')
            .select('year', 'month')
            .orderBy(col('year').desc(), col('month').desc())
            .first()
        )

        if last_loaded is None:
            print("First Fresh load")
            first_year = min(paths_by_year)
            customer_file_path = min(paths_by_year[first_year], key=_customer_file_month)
        else:
            print("incremental load")
            last_load_year = last_loaded.year
            last_load_month = last_loaded.month
            for load_year in sorted(y for y in paths_by_year if y >= last_load_year):
                # In the last loaded year only later months qualify; any month
                # of a later year is new.
                candidates = [
                    path for path in paths_by_year[load_year]
                    if load_year > last_load_year
                    or _customer_file_month(path) > last_load_month
                ]
                if candidates:
                    customer_file_path = min(candidates, key=_customer_file_month)
                    break
            else:
                print('No new customer file to load')
    except Exception as error:
        customer_file_path = None
        print(f'Error Occured in get_customer_source_path method : {error}')

    return customer_file_path

In [0]:
# Resolve the next landing customer file and append it to the bronze table.
customer_consolidated_paths = get_consolidated_customer_file_paths(spark, landing_customer_root_path)
customer_source_path = get_customer_source_path(customer_consolidated_paths)

# get_customer_source_path returns None when nothing new is available or on
# error; skip the load in that case instead of reading a None path.
if customer_source_path:
    customer_direct_load(customer_source_path, bronze_operational_root_path)
else:
    print('Nothing to load into bronze')
