In [0]:
# List the secrets registered in the "dbclientretail" scope to confirm the expected key exists.
secret_scope_keys = dbutils.secrets.list("dbclientretail")
secret_scope_keys


[SecretMetadata(key='clientsecretretail')]

In [0]:
# Pull the service principal's client secret from the Databricks secret scope
# so the value itself is never written into the notebook.
secret = dbutils.secrets.get(scope="dbclientretail", key="clientsecretretail")

In [0]:
# Service principal identity used to authenticate against the storage account.
client_id = "6e518dc2-6192-4de7-bfd8-a2081c0ed276"
directory_id = "80afa3f4-1563-46f1-a268-04f5fb610dd9"
client_secret = secret  # value fetched from the secret scope in the previous cell

# OAuth settings passed to dbutils.fs.mount(...) as extra_configs by the mount cells.
token_endpoint = f"https://login.microsoftonline.com/{directory_id}/oauth2/token"
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": token_endpoint,
}


In [0]:
# Storage account and container names reused by the mount and read cells.
storage_account_name = "dlretail"
container_name = "raw"
processed_container_name = "curated"

# DBFS path where the raw container is exposed, and the ABFS URI it points to.
mount_point = f"/mnt/{storage_account_name}/{container_name}"
raw_source = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# If something is already mounted at this path, remove it before mounting again.
mounted_paths = [m.mountPoint for m in dbutils.fs.mounts()]
if mount_point in mounted_paths:
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted existing mount at {mount_point}")

# Mount with the service-principal OAuth settings; a failure is reported, not raised.
try:
    dbutils.fs.mount(source=raw_source, mount_point=mount_point, extra_configs=configs)
    print(f"Mounted {container_name} successfully at {mount_point}")
except Exception as err:
    print(f"Error mounting: {err}")

/mnt/dlretail/raw has been unmounted.
Unmounted existing mount at /mnt/dlretail/raw
Mounted raw successfully at /mnt/dlretail/raw


In [0]:
# Show which files are available in the mounted raw container.
raw_listing = dbutils.fs.ls("/mnt/dlretail/raw")
raw_listing

[FileInfo(path='dbfs:/mnt/dlretail/raw/address.csv', name='address.csv', size=39775, modificationTime=1732363959000),
 FileInfo(path='dbfs:/mnt/dlretail/raw/city.csv', name='city.csv', size=15349, modificationTime=1732363974000),
 FileInfo(path='dbfs:/mnt/dlretail/raw/country.csv', name='country.csv', size=2455, modificationTime=1732363989000),
 FileInfo(path='dbfs:/mnt/dlretail/raw/customer.csv', name='customer.csv', size=50056, modificationTime=1732364006000),
 FileInfo(path='dbfs:/mnt/dlretail/raw/employees.csv', name='employees.csv', size=290, modificationTime=1732363941000)]

In [0]:
# Render every mount currently registered in the workspace as a table.
current_mounts = dbutils.fs.mounts()
display(current_mounts)

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/dlretail/curated,abfss://curated@dlretail.dfs.core.windows.net/,
/Volume,DbfsReserved,
/volumes,DbfsReserved,
/mnt/dlretail/raw,abfss://raw@dlretail.dfs.core.windows.net/,
/,DatabricksRoot,


In [0]:
# Tabular listing of the files under the current mount_point.
mount_listing = dbutils.fs.ls(mount_point)
display(mount_listing)

path,name,size,modificationTime
dbfs:/mnt/dlretail/raw/address.csv,address.csv,39775,1732363959000
dbfs:/mnt/dlretail/raw/city.csv,city.csv,15349,1732363974000
dbfs:/mnt/dlretail/raw/country.csv,country.csv,2455,1732363989000
dbfs:/mnt/dlretail/raw/customer.csv,customer.csv,50056,1732364006000
dbfs:/mnt/dlretail/raw/employees.csv,employees.csv,290,1732363941000


In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import *

# Explicit schemas for the five raw CSV files. Every column is nullable.

# address.csv
schema_address = (
    StructType()
    .add("address_id", IntegerType(), True)
    .add("address", StringType(), True)
    .add("address2", StringType(), True)
    .add("district", StringType(), True)
    .add("city_id", IntegerType(), True)
    .add("postal_code", StringType(), True)
    .add("phone", StringType(), True)
    .add("last_update", TimestampType(), True)
)

# city.csv
schema_city = (
    StructType()
    .add("city_id", IntegerType(), True)
    .add("city", StringType(), True)
    .add("country_id", IntegerType(), True)
    .add("last_update", TimestampType(), True)
)

# country.csv
schema_country = (
    StructType()
    .add("country_id", IntegerType(), True)
    .add("country", StringType(), True)
    .add("last_update", TimestampType(), True)
)

# customer.csv
schema_customer = (
    StructType()
    .add("customer_id", IntegerType(), True)
    .add("store_id", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("email", StringType(), True)
    .add("address_id", IntegerType(), True)
    .add("activebool", StringType(), True)
    .add("create_date", TimestampType(), True)
    .add("last_update", TimestampType(), True)
    .add("active", IntegerType(), True)
)

# employees.csv
schema_employees = (
    StructType()
    .add("emp_id", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("birthdate", TimestampType(), True)
    .add("hire_date", TimestampType(), True)
    .add("Salary", DoubleType(), True)
)

# Load each raw CSV (first row is a header) using its explicit schema.
raw_dir = f"/mnt/{storage_account_name}/{container_name}"

df1 = spark.read.csv(f"{raw_dir}/address.csv", schema=schema_address, header=True)
df2 = spark.read.csv(f"{raw_dir}/city.csv", schema=schema_city, header=True)
df3 = spark.read.csv(f"{raw_dir}/country.csv", schema=schema_country, header=True)
df4 = spark.read.csv(f"{raw_dir}/customer.csv", schema=schema_customer, header=True)
df5 = spark.read.csv(f"{raw_dir}/employees.csv", schema=schema_employees, header=True)

# Preview the first 15 rows of every DataFrame, in load order.
for frame in (df1, df2, df3, df4, df5):
    frame.show(15)

+----------+--------------------+--------+--------------+-------+-----------+-----------+-----------+
|address_id|             address|address2|      district|city_id|postal_code|      phone|last_update|
+----------+--------------------+--------+--------------+-------+-----------+-----------+-----------+
|         1|   47 MySakila Drive|    NULL|       Alberta|    300|       NULL|       NULL|       NULL|
|         2|  28 MySQL Boulevard|    NULL|           QLD|    576|       NULL|       NULL|       NULL|
|         3|   23 Workhaven Lane|    NULL|       Alberta|    300|       NULL|14033335568|       NULL|
|         4|1411 Lillydale Drive|    NULL|           QLD|    576|       NULL| 6172235589|       NULL|
|         5|      1913 Hanoi Way|    NULL|      Nagasaki|    463|      35200|28303384290|       NULL|
|         6|    1121 Loja Avenue|    NULL|    California|    449|      17886|8.38635E+11|       NULL|
|         7|   692 Joliet Street|    NULL|        Attika|     38|      83579|4.484

In [0]:
# Remove last_update from every table, and address2 from the address table as well.
df1 = df1.drop("last_update", "address2")
df2 = df2.drop("last_update")
df3 = df3.drop("last_update")
df4 = df4.drop("last_update")
# schema_employees declares no last_update column, so this drop leaves df5 unchanged.
df5 = df5.drop("last_update")

df1.show(15)

+----------+--------------------+--------------+-------+-----------+-----------+
|address_id|             address|      district|city_id|postal_code|      phone|
+----------+--------------------+--------------+-------+-----------+-----------+
|         1|   47 MySakila Drive|       Alberta|    300|       NULL|       NULL|
|         2|  28 MySQL Boulevard|           QLD|    576|       NULL|       NULL|
|         3|   23 Workhaven Lane|       Alberta|    300|       NULL|14033335568|
|         4|1411 Lillydale Drive|           QLD|    576|       NULL| 6172235589|
|         5|      1913 Hanoi Way|      Nagasaki|    463|      35200|28303384290|
|         6|    1121 Loja Avenue|    California|    449|      17886|8.38635E+11|
|         7|   692 Joliet Street|        Attika|     38|      83579|4.48477E+11|
|         8|    1566 Inegl Manor|      Mandalay|    349|      53561|7.05814E+11|
|         9|     53 Idfu Parkway|        Nantou|    361|      42399|10655648674|
|        10|1795 Santiago de

In [0]:
# Switch on Adaptive Query Execution (AQE) for this Spark session.
spark.conf.set(
    "spark.sql.adaptive.enabled",
    "true",
)

In [0]:
# Checking existing partitions
# Each DataFrame gets its own variable. Previously a single variable was
# overwritten three times, so every printed line showed df3's partition count.
partitions_df1 = df1.rdd.getNumPartitions()
partitions_df2 = df2.rdd.getNumPartitions()
partitions_df3 = df3.rdd.getNumPartitions()

# Kept with its previous final value (df3's count) in case another cell still reads it.
mydefault_partitions = partitions_df3

print(f"Number of partitions in df1: {partitions_df1}")
print(f"Number of partitions in df2: {partitions_df2}")
print(f"Number of partitions in df3: {partitions_df3}")

Number of partitions in df1: 1
Number of partitions in df2: 1
Number of partitions in df3: 1


In [0]:
# Count the rows of the address, city and country DataFrames, then print each total.
row_count1, row_count2, row_count3 = [frame.count() for frame in (df1, df2, df3)]

for total in (row_count1, row_count2, row_count3):
    print(total)

603
601
109


In [0]:
# Raise the partition count: 12 for df1 and df2, 10 for df3.
df1 = df1.repartition(12)
df2 = df2.repartition(12)
df3 = df3.repartition(10)

# Confirm the new partition counts.
for frame in (df1, df2, df3):
    print(f"Number of partitions: {frame.rdd.getNumPartitions()}")

Number of partitions: 12
Number of partitions: 12
Number of partitions: 10


In [0]:
# Reduce df3 to 5 partitions with coalesce and confirm the result.
df3 = df3.coalesce(5)

df3_partition_count = df3.rdd.getNumPartitions()
print(f"Number of partitions: {df3_partition_count}")

Number of partitions: 5


In [0]:
# Mounting curated container
# Exposes the "curated" container at /mnt/<storage account>/curated using the
# service-principal OAuth settings held in `configs`.

storage_account_name = "dlretail"
container_name = "raw"
processed_container_name = "curated"

# From here on mount_point refers to the curated container, not the raw one.
mount_point = f"/mnt/{storage_account_name}/{processed_container_name}"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    # Unmount the existing mount point
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted existing mount at {mount_point}")

try:
    dbutils.fs.mount(
        source=f"abfss://{processed_container_name}@{storage_account_name}.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs
    )
    # Report the container that was actually mounted. The message previously
    # interpolated container_name and so announced "raw" for the curated mount.
    print(f"Mounted {processed_container_name} successfully at {mount_point}")
except Exception as e:
    print(f"Error mounting: {e}")

/mnt/dlretail/curated has been unmounted.
Unmounted existing mount at /mnt/dlretail/curated
Mounted raw successfully at /mnt/dlretail/curated


In [0]:
# Inspect what is currently stored in the curated container.
curated_listing = dbutils.fs.ls("/mnt/dlretail/curated")
curated_listing

[]

In [0]:
from delta.tables import DeltaTable

# Base DBFS path of the curated container.
curated_container_path = "/mnt/dlretail/curated"

# Write every DataFrame to <curated>/<name> in Delta format.
# Mode "append" adds to whatever already exists at the target path,
# so re-running this cell writes the same rows again.
curated_outputs = [
    ("df1", df1),
    ("df2", df2),
    ("df3", df3),
    ("df4", df4),
    ("df5", df5),
]
for table_name, frame in curated_outputs:
    frame.write.format("delta").mode("append").save(f"{curated_container_path}/{table_name}")

# Preview three rows of each DataFrame that was written.
for _, frame in curated_outputs:
    frame.show(3)

+----------+--------------------+--------+-------+-----------+-----------+
|address_id|             address|district|city_id|postal_code|      phone|
+----------+--------------------+--------+-------+-----------+-----------+
|         4|1411 Lillydale Drive|     QLD|    576|       NULL| 6172235589|
|        16|    808 Bhopal Manor| Haryana|    582|      10672|4.65888E+11|
|        28|       96 Tafuna Way|  Crdoba|    128|      99865| 9.3473E+11|
+----------+--------------------+--------+-------+-----------+-----------+
only showing top 3 rows

+-------+-----------+----------+
|city_id|       city|country_id|
+-------+-----------+----------+
|      3|  Abu Dhabi|       101|
|     15| al-Qadarif|        89|
|     27|Antofagasta|        22|
+-------+-----------+----------+
only showing top 3 rows

+----------+----------+
|country_id|   country|
+----------+----------+
|         2|   Algeria|
|        12|Bangladesh|
|        22|     Chile|
+----------+----------+
only showing top 3 rows

+