
### Connecting ADLS Gen2 with Azure Databricks Notebook

In [0]:
'''
1. Created an App Registration in the Azure portal.
2. Created a client secret under Certificates & secrets for that App Registration.
3. In the ADLS Gen2 storage account, under Access control (IAM) -> Add role assignment, assigned a role
   (e.g. Storage Blob Data Contributor) to the App Registration's service principal to establish
   connectivity between ADLS and Azure Databricks.
'''

storage_account = "dmgproductionadls2"                     # Storage account name
application_id = "7729fa92-7775-4604-9238-20b3d90fdecf"    # Application (client) ID from App Registration
directory_id = "72a18fcd-e918-4ce9-aea4-0d2739f1bc60"      # Directory (tenant) ID from App Registration

# Client secret value from App Registration -> Certificates & secrets.
# Read it from a Databricks secret scope instead of hard-coding it in the notebook
# ("adls-scope" and "adls-client-secret" are hypothetical scope and key names).
client_secret = dbutils.secrets.get(scope="adls-scope", key="adls-client-secret")

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")
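
The five settings above differ only in their key prefix and share the same `<storage_account>.dfs.core.windows.net` suffix, so they could be generated by one helper and applied in a loop. A minimal sketch, assuming the same OAuth client-credentials setup; the function name `adls_oauth_conf` and all argument values are hypothetical placeholders, not real credentials:

```python
# Hypothetical helper: builds the five ABFS OAuth settings used above for one
# storage account, keyed by the full Spark configuration name.
def adls_oauth_conf(storage_account, client_id, tenant_id, client_secret):
    suffix = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder values only; these are not real credentials.
conf = adls_oauth_conf("mystorageacct", "my-client-id", "my-tenant-id", "my-secret")
print(conf["fs.azure.account.oauth2.client.endpoint.mystorageacct.dfs.core.windows.net"])
# https://login.microsoftonline.com/my-tenant-id/oauth2/token

# In a Databricks notebook the settings would then be applied with:
# for key, value in conf.items():
#     spark.conf.set(key, value)
```

Keeping the key construction in one place makes it harder to mistype one of the five long configuration names when connecting a second storage account.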


### Accessing the bronze layer and getting all the file names for further processing

In [0]:
base_path = 'abfss://bronze@dmgproductionadls2.dfs.core.windows.net/Production/'

# Each table has its own folder under base_path (e.g. Production/Culture/Culture.parquet).
# dbutils.fs.ls returns FileInfo entries; folder names keep their trailing "/".
tables_list = [entry.name for entry in dbutils.fs.ls(base_path)]
print(tables_list)

['BillOfMaterials/', 'Culture/', 'Illustration/', 'Location/', 'Product/', 'ProductCategory/', 'ProductCostHistory/', 'ProductDescription/', 'ProductInventory/', 'ProductListPriceHistory/', 'ProductModel/', 'ProductModelIllustration/', 'ProductModelProductDescriptionCulture/', 'ProductPhoto/', 'ProductProductPhoto/', 'ProductReview/', 'ProductSubcategory/', 'ScrapReason/', 'TransactionHistory/', 'TransactionHistoryArchive/', 'UnitMeasure/', 'WorkOrder/', 'WorkOrderRouting/']



### Reading the Bronze layer files into Spark dataframes dynamically

In [0]:
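df = {}
for table in tables_list:
    file_name = table.rstrip("/")                      # e.g. "Culture/" -> "Culture"
    file_df = "df_" + file_name                        # dictionary key, e.g. "df_Culture"
    file_path = base_path + table + file_name + ".parquet"
    df[file_df] = spark.read.parquet(file_path)        # 'header' is a CSV option; Parquet stores its own schema
    print(f'The name of the dataframe is {file_df} and count is {df[file_df].count()}')
    print("The data is:")
    df[file_df].show(2)                                # show() prints the rows itself and returns None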

The name of the dataframe is df_BillOfMaterials and count is 2679
+-----------------+-----------------+-----------+-------------------+-------------------+---------------+--------+--------------+-------------------+
|BillOfMaterialsID|ProductAssemblyID|ComponentID|          StartDate|            EndDate|UnitMeasureCode|BOMLevel|PerAssemblyQty|       ModifiedDate|
+-----------------+-----------------+-----------+-------------------+-------------------+---------------+--------+--------------+-------------------+
|              893|             NULL|        749|2010-05-26 00:00:00|               NULL|            EA |       0|          1.00|2010-05-12 00:00:00|
|              271|             NULL|        750|2010-03-04 00:00:00|2010-05-03 00:00:00|            EA |       0|          1.00|2010-05-03 00:00:00|
+-----------------+-----------------+-----------+-------------------+-------------------+---------------+--------+--------------+-------------------+
only showing top 2 rows

The data 
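
Every table follows the same layout: one folder per table containing a Parquet file of the same name (for example `Production/Culture/Culture.parquet`). Keeping that path logic in a small pure function makes it easy to check without a cluster. A minimal sketch, assuming the layout holds for every folder; `parquet_paths` is a hypothetical helper, not part of the notebook:

```python
# Hypothetical helper reproducing the path convention of the reading loop:
# folder "Culture/" -> key "df_Culture" -> <base_path>Culture/Culture.parquet
def parquet_paths(base_path, folder_names):
    paths = {}
    for folder in folder_names:
        name = folder.rstrip("/")  # folder names from dbutils.fs.ls end with "/"
        paths["df_" + name] = f"{base_path}{name}/{name}.parquet"
    return paths


base = "abfss://bronze@dmgproductionadls2.dfs.core.windows.net/Production/"
paths = parquet_paths(base, ["Culture/", "Product/"])
print(paths["df_Culture"])
# abfss://bronze@dmgproductionadls2.dfs.core.windows.net/Production/Culture/Culture.parquet
```

Each resulting path would then be passed to `spark.read.parquet`, as the loop above does.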


## Data Cleaning


#### Defining the function to check null values

In [0]:
from pyspark.sql.functions import col, count, when

def check_nulls(df, df_name):
    # Initialize an empty list to store the count expressions
    counts = []
    
    # Iterate over each column and apply the count condition
    for c in df.columns:
        counts.append(count(when(col(c).isNull(), 1)).alias(c))
    
    # Apply select with the count expressions
    null_counts_df = df.select(*counts)
    
    # Collect results as a dictionary
    null_counts = null_counts_df.collect()[0].asDict()
   
    # Check if any column has nulls
    if any(value > 0 for value in null_counts.values()):
        print(f"!!!!! Null values detected in {df_name}!!!!!")
        print(null_counts)
    else:
        print(f"<<OK>> No nulls found in {df_name} <<OK>>")
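
The function relies on two Spark SQL rules: `when()` without `otherwise()` returns NULL for rows that do not match, and `count()` ignores NULLs. So `count(when(col(c).isNull(), 1))` counts exactly the NULLs in column `c`, and all columns are counted in a single `select`. A plain-Python illustration of the same logic; the helper `count_nulls` is hypothetical, and the two sample rows are the first two rows of df_BillOfMaterials shown earlier:

```python
# Plain-Python illustration of count(when(col(c).isNull(), 1)):
# when() without otherwise() gives NULL for non-matching rows, and count()
# skips NULLs, so the result is the number of NULLs in each column.
def count_nulls(rows, columns):
    counts = {}
    for c in columns:
        flags = [1 if row.get(c) is None else None for row in rows]  # like when(isNull, 1)
        counts[c] = sum(1 for f in flags if f is not None)           # like count(...)
    return counts


rows = [
    {"BillOfMaterialsID": 893, "ProductAssemblyID": None, "EndDate": None},
    {"BillOfMaterialsID": 271, "ProductAssemblyID": None, "EndDate": "2010-05-03"},
]
print(count_nulls(rows, ["BillOfMaterialsID", "ProductAssemblyID", "EndDate"]))
# {'BillOfMaterialsID': 0, 'ProductAssemblyID': 2, 'EndDate': 1}
```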

In [0]:
for table in df.keys():
    print("\nThe  table is  :", table)
    print("The count of df:",df[table].count())
    check_nulls(df[table],table)


The  table is  : df_BillOfMaterials
The count of df: 2679
!!!!! Null values detected in df_BillOfMaterials!!!!!
{'BillOfMaterialsID': 0, 'ProductAssemblyID': 103, 'ComponentID': 0, 'StartDate': 0, 'EndDate': 2480, 'UnitMeasureCode': 0, 'BOMLevel': 0, 'PerAssemblyQty': 0, 'ModifiedDate': 0}

The  table is  : df_Culture
The count of df: 8
<<OK>> No nulls found in df_Culture <<OK>>

The  table is  : df_Illustration
The count of df: 5
<<OK>> No nulls found in df_Illustration <<OK>>

The  table is  : df_Location
The count of df: 14
<<OK>> No nulls found in df_Location <<OK>>

The  table is  : df_Product
The count of df: 504
!!!!! Null values detected in df_Product!!!!!
{'ProductID': 0, 'Name': 0, 'ProductNumber': 0, 'MakeFlag': 0, 'FinishedGoodsFlag': 0, 'Color': 248, 'SafetyStockLevel': 0, 'ReorderPoint': 0, 'StandardCost': 0, 'ListPrice': 0, 'Size': 293, 'SizeUnitMeasureCode': 328, 'WeightUnitMeasureCode': 299, 'Weight': 299, 'DaysToManufacture': 0, 'ProductLine': 226, 'Class': 257, 'Sty


The output above shows that most of the DataFrames contain no null values, but the ones below do. Let's review each of them to decide whether the data needs to be imputed.

- df_BillOfMaterials 
- df_Product 
- df_ProductCostHistory 
- df_ProductListPriceHistory 
- df_ProductModel 
- df_WorkOrder 

In [0]:
# Fixing the null values in the table df_BillOfMaterials
from pyspark.sql.functions import col, when, add_months

# display Original dataframe
print("The Original table is  :")
df['df_BillOfMaterials'].show(5)

# Where EndDate is NULL, set it to 2 months after StartDate.
df['df_BillOfMaterials'] = df['df_BillOfMaterials'].withColumn("EndDate", when(col("EndDate").isNull(), add_months(col("StartDate"), 2)).otherwise(col("EndDate")))
# Where ProductAssemblyID is NULL, set it to 0.
df['df_BillOfMaterials'] = df['df_BillOfMaterials'].withColumn("ProductAssemblyID", when(col("ProductAssemblyID").isNull(), 0).otherwise(col("ProductAssemblyID")))

# Show updated DataFrame
print("The Updated table is  :")
df['df_BillOfMaterials'].show(5)

# Validating the null values
check_nulls(df['df_BillOfMaterials'], 'df_BillOfMaterials')

The Original table is  :
+-----------------+-----------------+-----------+-------------------+-------------------+---------------+--------+--------------+-------------------+
|BillOfMaterialsID|ProductAssemblyID|ComponentID|          StartDate|            EndDate|UnitMeasureCode|BOMLevel|PerAssemblyQty|       ModifiedDate|
+-----------------+-----------------+-----------+-------------------+-------------------+---------------+--------+--------------+-------------------+
|              893|             NULL|        749|2010-05-26 00:00:00|               NULL|            EA |       0|          1.00|2010-05-12 00:00:00|
|              271|             NULL|        750|2010-03-04 00:00:00|2010-05-03 00:00:00|            EA |       0|          1.00|2010-05-03 00:00:00|
|               34|             NULL|        750|2010-05-04 00:00:00|               NULL|            EA |       0|          1.00|2010-04-20 00:00:00|
|              830|             NULL|        751|2010-05-26 00:00:00|      
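
The EndDate imputation relies on `add_months`. In Spark 3.x, `add_months` keeps the day of month and, when the target month is shorter, clamps it to that month's last day (the same rule as `java.time.LocalDate.plusMonths`). The hypothetical helper below mimics that rule in plain Python, which is handy for checking expected EndDate values by hand:

```python
import calendar
from datetime import date


def add_months(d, months):
    """Sketch of Spark 3.x add_months: keep the day of month, clamped to the target month's length."""
    month_index = d.month - 1 + months
    year = d.year + month_index // 12
    month = month_index % 12 + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))


print(add_months(date(2010, 5, 26), 2))   # 2010-07-26, matching BillOfMaterialsID 893
print(add_months(date(2010, 12, 31), 2))  # 2011-02-28 (31 February does not exist)
```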

In [0]:
from pyspark.sql.functions import date_add

In [0]:
from pyspark.sql.functions import col, when, avg,date_add

# Show original DataFrame
print("The Original table is  :")
df['df_Product'].show(5)

# Update "SizeUnitMeasureCode" as "CM" where it's NULL, as all existing values are "CM".
# The literal keeps a trailing space to match the space-padded 3-character codes (e.g. "EA ").
df['df_Product'] = df['df_Product'].withColumn("SizeUnitMeasureCode",when((col("SizeUnitMeasureCode").isNull()), "CM ").otherwise(col("SizeUnitMeasureCode")))

# Update "WeightUnitMeasureCode" as "LB" where it's NULL, as all existing values are either "LB" or "G".
df['df_Product'] = df['df_Product'].withColumn("WeightUnitMeasureCode",when((col("WeightUnitMeasureCode").isNull()), "LB ").otherwise(col("WeightUnitMeasureCode")))

# Compute the mean of "Size", rounded to a whole number
size_mean = round(df['df_Product'].select(avg(col("Size"))).collect()[0][0])
# Update "Size" where it's NULL
df['df_Product'] = df['df_Product'].withColumn("Size", when(col("Size").isNull(), size_mean).otherwise(col("Size")))

# Compute the mean of "Weight"
weight_mean = df['df_Product'].select(avg(col("Weight"))).collect()[0][0]
# Update "Weight" where it's NULL, using the Weight mean (not the Size mean)
df['df_Product'] = df['df_Product'].withColumn("Weight", when(col("Weight").isNull(), weight_mean).otherwise(col("Weight")))

# Update "Color" as "No Color" where it's NULL
df['df_Product'] = df['df_Product'].withColumn("Color",when(col("Color").isNull(),"No Color").otherwise(col("Color")))

# Update "ProductLine" as "No ProductLine" where it's NULL
df['df_Product'] = df['df_Product'].withColumn("ProductLine",when(col("ProductLine").isNull(),"No ProductLine").otherwise(col("ProductLine")))

# Update "Class" as "Not Available" where it's NULL
df['df_Product'] = df['df_Product'].withColumn("Class",when(col("Class").isNull(),"Not Available").otherwise(col("Class")))

# Update "Style" as "Not Available" where it's NULL
df['df_Product'] = df['df_Product'].withColumn("Style",when(col("Style").isNull(),"Not Available").otherwise(col("Style")))

# Update "ProductSubcategoryID" as 0 where it's NULL
df['df_Product'] = df['df_Product'].withColumn("ProductSubcategoryID",when(col("ProductSubcategoryID").isNull(),0).otherwise(col("ProductSubcategoryID")))

# Update "ProductModelID" as 0 where it's NULL
df['df_Product'] = df['df_Product'].withColumn("ProductModelID",when(col("ProductModelID").isNull(),0).otherwise(col("ProductModelID")))

# Update "SellEndDate" as 1 year (365 days) from SellStartDate where it's NULL
df['df_Product'] = df['df_Product'].withColumn("SellEndDate",when(col("SellEndDate").isNull(),date_add(col("SellStartDate"),365)).otherwise(col("SellEndDate")))

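# Update "DiscontinuedDate" as "9999-12-31", as all the values are NULL. The literal is cast to
# timestamp so the column keeps its timestamp type (without the cast, Spark may coerce it to string).
from pyspark.sql.functions import lit
df['df_Product'] = df['df_Product'].withColumn("DiscontinuedDate",when(col("DiscontinuedDate").isNull(),lit("9999-12-31").cast("timestamp")).otherwise(col("DiscontinuedDate")))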

# Show Updated DataFrame
print("The Updated table is  :")
df['df_Product'].show(5)

check_nulls(df['df_Product'],'df_Product')

The Original table is  :
+---------+--------------------+-------------+--------+-----------------+-----+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+--------------------+--------------+-------------------+-----------+----------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|MakeFlag|FinishedGoodsFlag|Color|SafetyStockLevel|ReorderPoint|StandardCost|ListPrice|Size|SizeUnitMeasureCode|WeightUnitMeasureCode|Weight|DaysToManufacture|ProductLine|Class|Style|ProductSubcategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+--------+-----------------+-----+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+--------------------+--------------
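
When numeric columns are filled with a mean, each column should be filled from its own mean (for example, Weight from the Weight average, not from the Size average). `avg()` ignores NULLs, so the mean is taken over the non-null values only. A plain-Python sketch of that rule; `mean_impute` and the sample values are hypothetical and not taken from the Product table:

```python
def mean_impute(values, ndigits=None):
    """Fill None entries with the mean of the non-None values, as avg() ignores NULLs."""
    present = [v for v in values if v is not None]
    if not present:
        return list(values)  # nothing to average; leave the column unchanged
    mean = sum(present) / len(present)
    if ndigits is not None:
        mean = round(mean, ndigits)
    return [mean if v is None else v for v in values]


# Illustrative values only.
weights = [2.0, None, 4.0, None]
sizes = [40, None, 44, 50]
print(mean_impute(weights))   # [2.0, 3.0, 4.0, 3.0]
print(mean_impute(sizes, 0))  # filled from the sizes' own mean, not the weights'
```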

In [0]:
# Fixing the null values in the table df_ProductCostHistory

# Show Original DataFrame
print("The Original table is  :")
df['df_ProductCostHistory'].show(5)

# Update EndDate where it's NULL as 1 year (365 days) from the StartDate.
df['df_ProductCostHistory']=df['df_ProductCostHistory'].withColumn("EndDate",when(col("EndDate").isNull(),date_add(col("StartDate"),365)).otherwise(col("EndDate")))

# Show Updated DataFrame
print("The Updated table is  :")
df['df_ProductCostHistory'].show(5)

check_nulls(df['df_ProductCostHistory'],'df_ProductCostHistory')

The Original table is  :
+---------+-------------------+-------------------+------------+-------------------+
|ProductID|          StartDate|            EndDate|StandardCost|       ModifiedDate|
+---------+-------------------+-------------------+------------+-------------------+
|      707|2011-05-31 00:00:00|2012-05-29 00:00:00|     12.0278|2012-05-29 00:00:00|
|      707|2012-05-30 00:00:00|2013-05-29 00:00:00|     13.8782|2013-05-29 00:00:00|
|      707|2013-05-30 00:00:00|               NULL|     13.0863|2013-05-16 00:00:00|
|      708|2011-05-31 00:00:00|2012-05-29 00:00:00|     12.0278|2012-05-29 00:00:00|
|      708|2012-05-30 00:00:00|2013-05-29 00:00:00|     13.8782|2013-05-29 00:00:00|
+---------+-------------------+-------------------+------------+-------------------+
only showing top 5 rows

The Updated table is  :
+---------+-------------------+-------------------+------------+-------------------+
|ProductID|          StartDate|            EndDate|StandardCost|       Modif
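
The EndDate steps for the cost and list-price history tables add a fixed 365 days with `date_add`, which is not always the same as one calendar year: when the interval contains 29 February, the result is one day short of the anniversary. A plain-Python illustration using two StartDate values from the output above; `add_days` is a hypothetical stand-in for Spark's `date_add`. If calendar-year semantics were preferred, `add_months(col("StartDate"), 12)` would be the Spark alternative.

```python
from datetime import date, timedelta


def add_days(d, days):
    """Hypothetical stand-in for Spark's date_add: shift a date by a fixed number of days."""
    return d + timedelta(days=days)


# StartDate values taken from the ProductCostHistory output above.
for start in [date(2011, 5, 31), date(2013, 5, 30)]:
    print(start, "->", add_days(start, 365))
# 2011-05-31 -> 2012-05-30   (the interval contains 29 February 2012)
# 2013-05-30 -> 2014-05-30
```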

In [0]:
# Fixing the null values in the table df_ProductListPriceHistory

# display Original dataframe
print("The Original table is  :")
df['df_ProductListPriceHistory'].show(5)

from pyspark.sql.functions import col, when, date_add

# Update EndDate where it's NULL as 1 year from the StartDate. 
df['df_ProductListPriceHistory'] = df['df_ProductListPriceHistory'].withColumn("EndDate", when(col("EndDate").isNull(), date_add(col("StartDate"), 365)).otherwise(col("EndDate")))

# display updated DataFrame
print("The corrected table is :")
df['df_ProductListPriceHistory'].show(5)

# Validating the null values
check_nulls(df['df_ProductListPriceHistory'],'df_ProductListPriceHistory')

The Original table is  :
+---------+-------------------+-------------------+---------+-------------------+
|ProductID|          StartDate|            EndDate|ListPrice|       ModifiedDate|
+---------+-------------------+-------------------+---------+-------------------+
|      707|2011-05-31 00:00:00|2012-05-29 00:00:00|  33.6442|2012-05-29 00:00:00|
|      707|2012-05-30 00:00:00|2013-05-29 00:00:00|  33.6442|2013-05-29 00:00:00|
|      707|2013-05-30 00:00:00|               NULL|  34.9900|2013-05-09 00:00:00|
|      708|2011-05-31 00:00:00|2012-05-29 00:00:00|  33.6442|2012-05-29 00:00:00|
|      708|2012-05-30 00:00:00|2013-05-29 00:00:00|  33.6442|2013-05-29 00:00:00|
+---------+-------------------+-------------------+---------+-------------------+
only showing top 5 rows

The corrected table is :
+---------+-------------------+-------------------+---------+-------------------+
|ProductID|          StartDate|            EndDate|ListPrice|       ModifiedDate|
+---------+------------

In [0]:
# Fixing the null values in the table df_ProductModel

# display Original DataFrame
print("The Original table is :")
df['df_ProductModel'].show(5)

df['df_ProductModel']=df['df_ProductModel'].withColumn("CatalogDescription",when(col("CatalogDescription").isNull(),"No Description").otherwise(col("CatalogDescription")))
df['df_ProductModel']=df['df_ProductModel'].withColumn("Instructions",when(col("Instructions").isNull(),"No Instructions").otherwise(col("Instructions")))

# display updated DataFrame
print("The corrected table is :")
df['df_ProductModel'].show(5)

# Validating the null values
check_nulls(df['df_ProductModel'],'df_ProductModel')

The Original table is :
+--------------+------------------+------------------+------------+--------------------+-------------------+
|ProductModelID|              Name|CatalogDescription|Instructions|             rowguid|       ModifiedDate|
+--------------+------------------+------------------+------------+--------------------+-------------------+
|             1|      Classic Vest|              NULL|        NULL|29321d47-1e4c-4aa...|2013-04-30 00:00:00|
|             2|       Cycling Cap|              NULL|        NULL|474fb654-3c96-4cb...|2011-05-01 00:00:00|
|             3|Full-Finger Gloves|              NULL|        NULL|a75483fe-3c47-4aa...|2012-04-30 00:00:00|
|             4|Half-Finger Gloves|              NULL|        NULL|14b56f2a-d4aa-40a...|2012-04-30 00:00:00|
|             5| HL Mountain Frame|              NULL|        NULL|fdd5407b-c2db-49d...|2011-05-01 00:00:00|
+--------------+------------------+------------------+------------+--------------------+----------------

In [0]:
# Fixing the null values in the table df_WorkOrder

# display Original DataFrame
print("The Original table is :")
df['df_WorkOrder'].show(5)

# Update "ScrapReasonID" as 0 where it's NULL
df['df_WorkOrder']=df['df_WorkOrder'].withColumn("ScrapReasonID",when(col("ScrapReasonID").isNull(),0).otherwise(col("ScrapReasonID")))

# display updated DataFrame
print("The corrected table is :")
df['df_WorkOrder'].show(5)

# Validating the null values
check_nulls(df['df_WorkOrder'],'df_WorkOrder')

The Original table is :
+-----------+---------+--------+----------+-----------+-------------------+-------------------+-------------------+-------------+-------------------+
|WorkOrderID|ProductID|OrderQty|StockedQty|ScrappedQty|          StartDate|            EndDate|            DueDate|ScrapReasonID|       ModifiedDate|
+-----------+---------+--------+----------+-----------+-------------------+-------------------+-------------------+-------------+-------------------+
|          1|      722|       8|         8|          0|2011-06-03 00:00:00|2011-06-13 00:00:00|2011-06-14 00:00:00|         NULL|2011-06-13 00:00:00|
|          2|      725|      15|        15|          0|2011-06-03 00:00:00|2011-06-13 00:00:00|2011-06-14 00:00:00|         NULL|2011-06-13 00:00:00|
|          3|      726|       9|         9|          0|2011-06-03 00:00:00|2011-06-13 00:00:00|2011-06-14 00:00:00|         NULL|2011-06-13 00:00:00|
|          4|      729|      16|        16|          0|2011-06-03 00:00:00|2
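
The constant-value fills in these cells (ProductModel, WorkOrder and the string and ID columns of Product) all follow one pattern: a default per column, applied only where the value is NULL. PySpark's `DataFrame.fillna` accepts a dict mapping column names to replacement values (int, float, bool or string), so such steps could be written as a single call; date defaults such as SellEndDate fall outside those value types and still need `when`/`otherwise`. The hypothetical helper below sketches the same semantics in plain Python, using the first two ProductModel rows shown above:

```python
def fill_defaults(rows, defaults):
    """Hypothetical analogue of DataFrame.fillna(dict): fill only listed columns, only where None."""
    return [
        {k: (defaults[k] if v is None and k in defaults else v) for k, v in row.items()}
        for row in rows
    ]


rows = [
    {"ProductModelID": 1, "Name": "Classic Vest", "CatalogDescription": None, "Instructions": None},
    {"ProductModelID": 2, "Name": "Cycling Cap", "CatalogDescription": None, "Instructions": None},
]
filled = fill_defaults(rows, {"CatalogDescription": "No Description", "Instructions": "No Instructions"})
print(filled[0])
# {'ProductModelID': 1, 'Name': 'Classic Vest', 'CatalogDescription': 'No Description', 'Instructions': 'No Instructions'}

# The PySpark equivalent would be a single call:
# df['df_ProductModel'] = df['df_ProductModel'].fillna(
#     {"CatalogDescription": "No Description", "Instructions": "No Instructions"})
```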

In [0]:
for table in df.keys():
    print("\nThe  table is  :", table)
    print("The count of df:",df[table].count())
    check_nulls(df[table],table)


The  table is  : df_BillOfMaterials
The count of df: 2679
<<OK>> No nulls found in df_BillOfMaterials <<OK>>

The  table is  : df_Culture
The count of df: 8
<<OK>> No nulls found in df_Culture <<OK>>

The  table is  : df_Illustration
The count of df: 5
<<OK>> No nulls found in df_Illustration <<OK>>

The  table is  : df_Location
The count of df: 14
<<OK>> No nulls found in df_Location <<OK>>

The  table is  : df_Product
The count of df: 504
<<OK>> No nulls found in df_Product <<OK>>

The  table is  : df_ProductCategory
The count of df: 4
<<OK>> No nulls found in df_ProductCategory <<OK>>

The  table is  : df_ProductCostHistory
The count of df: 395
<<OK>> No nulls found in df_ProductCostHistory <<OK>>

The  table is  : df_ProductDescription
The count of df: 762
<<OK>> No nulls found in df_ProductDescription <<OK>>

The  table is  : df_ProductInventory
The count of df: 1069
<<OK>> No nulls found in df_ProductInventory <<OK>>

The  table is  : df_ProductListPriceHistory
The count of df: 


View each table's schema, then convert the timestamp columns to date in all tables, since the time portion of every timestamp value is zero (00:00:00).
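
Truncating a timestamp to a date is lossless only if every value has a zero time part. A plain-Python sketch of that check; `safe_to_truncate` and the sample values are illustrative. On a DataFrame, one option would be counting the rows where `col(c) != date_trunc("day", col(c))`.

```python
from datetime import datetime


def safe_to_truncate(timestamps):
    """True if every non-None timestamp is exactly midnight, so converting to date loses nothing."""
    return all(
        ts is None or ts.time() == datetime.min.time()
        for ts in timestamps
    )


print(safe_to_truncate([datetime(2010, 5, 26), datetime(2010, 5, 12), None]))  # True
print(safe_to_truncate([datetime(2010, 5, 26, 13, 30)]))                      # False
```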

In [0]:
# Create the function to print the Schema. 
def print_schema(df, df_name):
    print("\nThe Schema of the table {} is :".format(df_name))
    df.printSchema()

In [0]:
# Displaying the schema of each table to identify the timestamp columns
for file in df.keys():
    print_schema(df[file],file)


The Schema of the table df_BillOfMaterials is :
root
 |-- BillOfMaterialsID: integer (nullable = true)
 |-- ProductAssemblyID: integer (nullable = true)
 |-- ComponentID: integer (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)
 |-- UnitMeasureCode: string (nullable = true)
 |-- BOMLevel: integer (nullable = true)
 |-- PerAssemblyQty: decimal(8,2) (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)


The Schema of the table df_Culture is :
root
 |-- CultureID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)


The Schema of the table df_Illustration is :
root
 |-- IllustrationID: integer (nullable = true)
 |-- Diagram: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)


The Schema of the table df_Location is :
root
 |-- LocationID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- CostRate: decimal(10,4) (nullable = true)

In [0]:
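# Converting the timestamp columns to date (the time part of every value is 00:00:00)
from pyspark.sql.functions import col, to_date

for file in df.keys():
    print("\n",file," in Original Schema")
    print_schema(df[file],file)
    # Select columns by data type rather than by name, and unpack the (name, type)
    # pairs into new names: a loop variable called `col` would shadow
    # pyspark.sql.functions.col and break later cells such as check_nulls.
    for col_name, col_type in df[file].dtypes:
        if col_type == 'timestamp':
            # to_date on a timestamp column drops the time part; no format string is needed
            df[file] = df[file].withColumn(col_name, to_date(col(col_name)))

    print("\n",file," in transformed Schema")
    print_schema(df[file],file)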


 df_BillOfMaterials  in Original Schema

The Schema of the table df_BillOfMaterials is :
root
 |-- BillOfMaterialsID: integer (nullable = true)
 |-- ProductAssemblyID: integer (nullable = true)
 |-- ComponentID: integer (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)
 |-- UnitMeasureCode: string (nullable = true)
 |-- BOMLevel: integer (nullable = true)
 |-- PerAssemblyQty: decimal(8,2) (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)


 df_BillOfMaterials  in transformed Schema

The Schema of the table df_BillOfMaterials is :
root
 |-- BillOfMaterialsID: integer (nullable = true)
 |-- ProductAssemblyID: integer (nullable = true)
 |-- ComponentID: integer (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- UnitMeasureCode: string (nullable = true)
 |-- BOMLevel: integer (nullable = true)
 |-- PerAssemblyQty: decimal(8,2) (nullable = true)
 |-- ModifiedDate: date (null
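
Choosing the columns to convert by their type can be checked without a cluster, because `DataFrame.dtypes` is a plain list of (name, type-string) pairs. A name-based test such as `'Date' in col_name` would also pick up any string column whose name happens to contain "Date", whereas a type-based test does not. A plain-Python sketch using the df_BillOfMaterials schema printed above; `timestamp_columns` is a hypothetical helper:

```python
def timestamp_columns(dtypes):
    """Names of columns whose Spark type string is 'timestamp' (input shaped like DataFrame.dtypes)."""
    return [name for name, dtype in dtypes if dtype == "timestamp"]


# (name, type) pairs matching the df_BillOfMaterials schema printed above
bom_dtypes = [
    ("BillOfMaterialsID", "int"), ("ProductAssemblyID", "int"), ("ComponentID", "int"),
    ("StartDate", "timestamp"), ("EndDate", "timestamp"), ("UnitMeasureCode", "string"),
    ("BOMLevel", "int"), ("PerAssemblyQty", "decimal(8,2)"), ("ModifiedDate", "timestamp"),
]
print(timestamp_columns(bom_dtypes))  # ['StartDate', 'EndDate', 'ModifiedDate']
```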

In [0]:
# Displaying the first rows of each table to confirm the timestamp columns are now dates
for file in df.keys():
    df[file].show(5)

+-----------------+-----------------+-----------+----------+----------+---------------+--------+--------------+------------+
|BillOfMaterialsID|ProductAssemblyID|ComponentID| StartDate|   EndDate|UnitMeasureCode|BOMLevel|PerAssemblyQty|ModifiedDate|
+-----------------+-----------------+-----------+----------+----------+---------------+--------+--------------+------------+
|              893|                0|        749|2010-05-26|2010-07-26|            EA |       0|          1.00|  2010-05-12|
|              271|                0|        750|2010-03-04|2010-05-03|            EA |       0|          1.00|  2010-05-03|
|               34|                0|        750|2010-05-04|2010-07-04|            EA |       0|          1.00|  2010-04-20|
|              830|                0|        751|2010-05-26|2010-07-26|            EA |       0|          1.00|  2010-05-12|
|             2074|                0|        752|2010-07-08|2010-09-08|            EA |       0|          1.00|  2010-06-24|
