In [0]:
%run "./includes/common_functions"

In [0]:
%run "./includes/configuration"

#### Get notebook parameters (source object name, load type, incremental folder)

In [0]:
# Pull the three job parameters from the notebook widgets in one pass; the
# widget names are matched positionally to the variables on the left.
source_object_name, load_type, incre_folder_name = (
    dbutils.widgets.get(widget_name)
    for widget_name in ('Source_Object_Name', 'Load_Type', 'Incre_Folder_Name')
)

#### Read source file

In [0]:
# Resolve the raw-zone location for this run and load it into `df`.
#   full_load   -> <raw_folder_path>/<source_object_name>
#   incremental -> <raw_folder_path>/<source_object_name>/<incre_folder_name>
# Any other Load_Type is rejected here so the failure names the bad parameter
# instead of surfacing later as an undefined `df`.
if load_type == 'full_load':
    raw_source_path = f'{raw_folder_path}/{source_object_name}'
elif load_type == 'incremental':
    raw_source_path = f'{raw_folder_path}/{source_object_name}/{incre_folder_name}'
else:
    raise ValueError(
        f"Unsupported Load_Type {load_type!r}; expected 'full_load' or 'incremental'."
    )

# Parquet files carry their own schema, so the CSV-style header/inferSchema
# options are not passed to the reader.
df = spark.read.parquet(raw_source_path)

#### Check null values

In [0]:
# Null counts on the freshly loaded frame, before any cleaning.
# count_null is not defined in this notebook; presumably it comes from the
# %run includes at the top (TODO confirm).
count_null(df)

S_ID_null_count,Tin_No_null_count,Company_Name_null_count,Date_of_Reg_null_count,SubCity_null_count,Town_null_count,Telephone_null_count,Fax_null_count,EMail_null_count,Business_License_No_null_count,Business_Type_null_count
0,0,0,0,955,2323,1078,26287,0,1,0


#### Validate data types and handle null values

In [0]:
# Re-assert each column's declared type and replace nulls with a per-type default.
#
# Columns are matched on the exact Spark type name (`DataType.typeName()`),
# not on a substring of the type's string form. Substring matching also hits
# unrelated or nested types (e.g. "int" inside "interval", or "string" inside
# an array-of-strings type) and would cast those columns to a scalar.
#
# Rule table: Spark type name -> (cast target, label printed in the log line,
# null replacement). A replacement of None means nulls are left untouched
# (boolean and date columns). Types not listed here are skipped entirely.
type_rules = {
    "integer": ("int", "Integer", 0),
    "float": ("float", "Float", 0),
    "double": ("double", "Double", 0),
    "string": ("string", "String", "Unknown"),
    "boolean": ("boolean", "Boolean", None),
    "date": ("date", "Date", None),
}

for field in df.schema.fields:
    column_name = field.name
    rule = type_rules.get(field.dataType.typeName())
    if rule is None:
        # No validation rule for this type; leave the column as it is.
        continue

    cast_target, type_label, null_default = rule

    # A cast maps null to null, so no explicit isNotNull() guard is needed.
    df = df.withColumn(column_name, col(column_name).cast(cast_target))
    print(f"{column_name}: {type_label} type validation applied.")

    if null_default is not None:
        df = df.na.fill({column_name: null_default})

S_ID: Double type validation applied.
Tin_No: String type validation applied.
Company_Name: String type validation applied.
Date_of_Reg: String type validation applied.
SubCity: String type validation applied.
Town: String type validation applied.
Telephone: String type validation applied.
Fax: String type validation applied.
EMail: String type validation applied.
Business_License_No: String type validation applied.
Business_Type: String type validation applied.


#### Re-check null counts to verify the fill

In [0]:
# Same null report as before, now on the cleaned frame, to confirm the fill step.
# count_null is not defined in this notebook; presumably it comes from the
# %run includes at the top (TODO confirm).
count_null(df)

S_ID_null_count,Tin_No_null_count,Company_Name_null_count,Date_of_Reg_null_count,SubCity_null_count,Town_null_count,Telephone_null_count,Fax_null_count,EMail_null_count,Business_License_No_null_count,Business_Type_null_count
0,0,0,0,0,0,0,0,0,0,0


#### Check for duplicate key values

In [0]:
# Candidate key columns across the source objects; only the ones present in
# this frame are checked, one at a time.
primary_key_columns = ["S_ID", "PO_ID", "TRANSPORTATION_ID", "P_ID"]
for key_column in primary_key_columns:
    if key_column not in df.columns:
        continue

    # Key values that occur on more than one row.
    repeated_keys = df.groupBy(key_column).count().where(col('count') > 1)
    if repeated_keys.count() == 0:
        print("No Duplicates found!")
        continue

    # Keep a single row per key value.
    df = df.dropDuplicates([key_column])
    print("Duplicates dropped!")

No Duplicates found!


#### Write transformed data to bronze container in delta format

In [0]:
# Persist the cleaned frame to the bronze zone in Delta format.
#   full_load   -> overwrite the table at <bronze_folder_path>/<source_object_name>,
#                  then vacuum files older than 24 hours.
#   incremental -> append to <bronze_folder_path>/<source_object_name>/<incre_folder_name>.
# Any other Load_Type raises instead of silently writing nothing.
bronze_table_path = f'{bronze_folder_path}/{source_object_name}'
retention_check_key = 'spark.databricks.delta.retentionDurationCheck.enabled'

if load_type == 'full_load':
    df.write.mode('overwrite').format('delta').save(bronze_table_path)

    # Creating a Delta Table instance
    from delta.tables import DeltaTable
    dt = DeltaTable.forPath(spark, bronze_table_path)

    # Removing old history. The retention safety check is switched off only
    # for the duration of the vacuum (presumably because 24 hours is below
    # Delta's default threshold; confirm) and then restored, so the relaxed
    # setting does not leak into the rest of the session.
    previous_retention_check = spark.conf.get(retention_check_key, 'true')
    spark.conf.set(retention_check_key, 'false')
    try:
        dt.vacuum(retentionHours=24)
    finally:
        spark.conf.set(retention_check_key, previous_retention_check)

elif load_type == 'incremental':
    # NOTE(review): this path is a sub-folder of the full-load table location;
    # confirm that nesting one Delta location inside another is intended.
    df.write.mode('append').format('delta').save(f'{bronze_table_path}/{incre_folder_name}')

else:
    raise ValueError(
        f"Unsupported Load_Type {load_type!r}; expected 'full_load' or 'incremental'."
    )