In [0]:
%run /Users/sunnynuri12@gmail.com/Big_project_01/Data_Loader

In [0]:
## Calling the Data Loader.
# DataLoader is not defined in this notebook; it is expected to come from the
# Data_Loader notebook pulled in by the %run cell above.
# NOTE(review): create_df() presumably returns a Spark DataFrame built from the
# CSV path -- confirm against the Data_Loader notebook.
dl=DataLoader("/FileStore/Big_Proj_01/Bronze_Layer/Insurance_Company.csv")
# return_df is a notebook-level name that later cells read.
return_df=dl.create_df()


In [0]:
from pyspark.sql.functions import col, date_format, to_date

## getting a df from loader and cleaning the data.
class DataCleaning():
    """Standardises the date columns and fills missing values of a DataFrame.

    The frame to clean is supplied to the constructor; ``clean_data`` builds
    and returns a new DataFrame from it.
    """

    ## init method
    def __init__(self,df:'DataFrame'):
        """Store the frame that ``clean_data`` will operate on.

        Args:
            df: DataFrame containing the ``Opened``/``Closed`` date columns and
                the columns filled in ``clean_data``.
        """
        self.df=df

    def clean_data(self):
        """Return a cleaned version of ``self.df``.

        * ``Opened`` and ``Closed`` are parsed with ``to_date`` using the
          ``MM/dd/yyyy`` pattern.
        * Nulls in the listed text columns become ``"Not Available"`` and nulls
          in ``Recovery`` become ``0``.

        Returns:
            A new DataFrame with the transformations above applied.
        """
        ## Standardizing dates
        # Operate on the frame handed to the constructor rather than on a
        # notebook-level global, so the class cleans whatever it was given.
        standard_dates=(
            self.df
            .withColumn("Opened",to_date("Opened","MM/dd/yyyy"))
            .withColumn("Closed",to_date("Closed","MM/dd/yyyy"))
        )

        ## marking not available in all string columns where null is present
        # NOTE(review): fillna only applies a replacement whose type matches the
        # column type -- confirm that Recovery is numeric in the loaded frame.
        cleaned_df=standard_dates.fillna({"Coverage":"Not Available",
                                "SubCoverage":"Not Available",
                                "Reason":"Not Available",
                                "SubReason":"Not Available",
                                "Disposition":"Not Available",
                                "Conclusion":"Not Available",
                                "Status":"Not Available",
                                "Recovery":0})
        return cleaned_df
        
        
    

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, max, monotonically_increasing_id

class DimTables():
    """Creates and incrementally extends the dimension Delta tables.

    Each dimension table holds the distinct values of one column of the cleaned
    frame together with an integer ID column.
    """

    def __init__(self,df:"DataFrame"):
        """Store the raw frame and a cleaned version of it.

        Args:
            df: DataFrame to derive the dimension values from.
        """
        self.df=df
        # Create an instance of DataCleaning inside DimTables
        self.cleaned_df = DataCleaning(self.df).clean_data()

    def create_dim_tables(self,schema,column_name,table_name):
        """Append the not-yet-known values of ``column_name`` to one dimension table.

        Args:
            schema: StructType used for an empty stand-in frame when the Delta
                table cannot be loaded (e.g. on the first run).
            column_name: Column of the cleaned frame that holds the dimension values.
            table_name: Folder name of the Delta table; also the prefix of the
                generated ID column (``f"{table_name}ID"``).

        Side effects:
            Appends the new rows (value + ID) to the Delta table at
            ``/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/{table_name}``.
        """
        # Local imports keep this block self-contained.
        from pyspark.sql import Window
        from pyspark.sql.functions import max as spark_max, row_number

        table_path = f"/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/{table_name}"
        id_column = f"{table_name}ID"

        # Distinct, non-null values of the dimension column. A null can never
        # match in the anti-join below, so it would be appended again on every run.
        distinct_df = self.cleaned_df.select(column_name).dropna().distinct()

        # Try to load the existing dimension table.
        try:
            existing_data_df = spark.read.format("delta").load(table_path)
        except Exception:
            # Best effort: treat any load failure as "table does not exist yet"
            # and continue with an empty frame of the expected schema.
            existing_data_df = spark.createDataFrame([], schema)

        # Keep only the values that are not already present in the table.
        new_data_only = distinct_df.join(existing_data_df, column_name, "leftanti")

        # Highest ID handed out so far. The aggregate yields None for an empty
        # table, which is mapped to 0 so numbering starts at 1.
        max_id = existing_data_df.agg(spark_max(id_column)).collect()[0][0] or 0

        # Consecutive IDs continuing after max_id. row_number() is used instead
        # of monotonically_increasing_id(): the latter is a 64-bit value with
        # the partition index in its upper bits, so casting it to an integer
        # can yield the same ID for rows in different partitions.
        # The un-partitioned window pulls all new rows into one partition,
        # which is acceptable for small dimension tables.
        id_window = Window.orderBy(column_name)
        new_data_with_id = new_data_only.withColumn(
            id_column,
            (row_number().over(id_window) + max_id).cast(IntegerType()),
        )

        # Append only the new rows; existing rows stay untouched.
        new_data_with_id.write.format("delta").mode("append").save(table_path)

    @staticmethod
    def _dim_schema(column_name):
        """Build the two-column schema (``<column_name>ID``, ``<column_name>``) of a dimension."""
        return StructType([
            StructField(f"{column_name}ID", IntegerType(), nullable=False),
            StructField(column_name, StringType(), nullable=False),
        ])

    def create_all_dims(self):
        """Create or extend every dimension table.

        Calls ``create_dim_tables`` once per configured dimension. Returns
        nothing.
        """
        # (column_name, table_name) for each dimension.
        dim_configurations = [
            ("Company", "company"),
            ("Reason", "reason"),
            ("SubReason", "subreason"),
            ("Coverage", "coverage"),
            ("SubCoverage", "subcoverage"),
            ("Disposition", "disposition"),
            ("Conclusion", "conclusion"),
            ("Status", "status"),
        ]

        # Loop through each dimension configuration and create dimension tables
        for column_name, table_name in dim_configurations:
            self.create_dim_tables(
                schema=self._dim_schema(column_name),
                column_name=column_name,
                table_name=table_name,
            )

# Build (or extend) every dimension table from the loaded frame.
obj1=DimTables(return_df)
# create_all_dims() has no return statement, so dim_tables is bound to None;
# the useful effect of this call is the Delta writes it performs.
dim_tables=obj1.create_all_dims()


In [0]:
class FactTable():
    """Builds the fact table by replacing dimension values with their IDs."""

    def __init__(self, df:'Dataframe'):
        """Keep the incoming frame and a cleaned version of it."""
        self.df=df
        self.cleaned_df = DataCleaning(self.df).clean_data()

    def load_fact(self):
        """Join the cleaned frame to the eight dimension tables and save the result.

        Every dimension is left-joined on its value column; the output keeps
        the eight ID columns plus FileNo, the opened/closed dates and the
        recovery amount, and overwrites the ``fact`` Delta table.
        """
        # Delta locations, listed in the order in which they are read.
        dim_locations = {
            "company": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/company/",
            "conclusion": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/conclusion/",
            "coverage": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/coverage/",
            "disposition": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/disposition/",
            "reason": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/reason/",
            "status": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/status/",
            "subcoverage": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/subcoverage/",
            "subreason": "/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/subreason/",
        }
        dims = {}
        for dim_key, location in dim_locations.items():
            dims[dim_key] = spark.read.format("delta").load(location)

        # (dimension, join column, ID column) in join order, which is also the
        # order of the ID columns in the output.
        join_plan = (
            ("company", "Company", "CompanyID"),
            ("conclusion", "Conclusion", "ConclusionID"),
            ("coverage", "coverage", "CoverageID"),
            ("disposition", "disposition", "DispositionID"),
            ("reason", "reason", "ReasonID"),
            ("subcoverage", "subcoverage", "SubcoverageID"),
            ("subreason", "subreason", "SubreasonID"),
            ("status", "status", "StatusID"),
        )

        source = self.cleaned_df
        joined = source
        id_columns = []
        for dim_key, join_col, id_col in join_plan:
            dim = dims[dim_key]
            joined = joined.join(dim, source[join_col] == dim[join_col], "left")
            id_columns.append(dim[id_col].alias(id_col))

        # Columns carried over from the cleaned frame under their fact names.
        passthrough = [
            source["FileNo"].alias("FileNo"),
            source["Opened"].alias("OpenedDate"),
            source["Closed"].alias("ClosedDate"),
            source["Recovery"].alias("RecoveryAmount"),
        ]

        fact_df = joined.select(*id_columns, *passthrough)
        fact_df.write.format("delta").mode("overwrite").save("/FileStore/Big_Proj_01/Silver_Layer/Delta_Tables/fact")
       