In [0]:
dbutils.fs.ls('mnt/bronze/Sales/')      #Connecting to bronze container

[FileInfo(path='dbfs:/mnt/bronze/Sales/CountryRegionCurrency/', name='CountryRegionCurrency/', size=0, modificationTime=1712105093000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/CreditCard/', name='CreditCard/', size=0, modificationTime=1712105086000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/Currency/', name='Currency/', size=0, modificationTime=1712105094000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/CurrencyRate/', name='CurrencyRate/', size=0, modificationTime=1712105077000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/Customer/', name='Customer/', size=0, modificationTime=1712105077000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/PersonCreditCard/', name='PersonCreditCard/', size=0, modificationTime=1712105084000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/SalesOrderDetail/', name='SalesOrderDetail/', size=0, modificationTime=1712105080000),
 FileInfo(path='dbfs:/mnt/bronze/Sales/SalesOrderHeader/', name='SalesOrderHeader/', size=0, modificationTime=1712105085000),
 FileInfo(path='dbfs:/mnt/bronze/S

In [0]:
dbutils.fs.ls('mnt/silver/')        #Connecting to silver container. At present, it will return nothing as its empty. 

[FileInfo(path='dbfs:/mnt/silver/Sales/', name='Sales/', size=0, modificationTime=1712187216000)]

In [0]:
input_path = '/mnt/bronze/Sales/Customer/Customer.parquet'       #We have the taken the input path of the bronze containerccc

In [0]:
df = spark.read.format('parquet').load(input_path)      #Creating a PySpark dataframe for the input file

In [0]:
df.show(5)

+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|CustomerID|PersonID|StoreID|TerritoryID|AccountNumber|             rowguid|        ModifiedDate|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|         1|    NULL|    934|          1|   AW00000001|3f5ae95e-b87d-4ae...|2014-09-12 11:15:...|
|         2|    NULL|   1028|          1|   AW00000002|e552f657-a9af-4a7...|2014-09-12 11:15:...|
|         3|    NULL|    642|          4|   AW00000003|130774b1-db21-4ef...|2014-09-12 11:15:...|
|         4|    NULL|    932|          4|   AW00000004|ff862851-1daa-404...|2014-09-12 11:15:...|
|         5|    NULL|   1026|          4|   AW00000005|83905bdc-6f5e-4f7...|2014-09-12 11:15:...|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
only showing top 5 rows



When you create a PySpark dataframe, it's stored in a temporary location, making modifications straightforward.

In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType

df = df.withColumn("ModifiedDate", date_format(from_utc_timestamp(df["ModifiedDate"].cast(TimestampType()), "UTC"), "yyyy-MM-dd"))

In [0]:
df.show(5)

+----------+--------+-------+-----------+-------------+--------------------+------------+
|CustomerID|PersonID|StoreID|TerritoryID|AccountNumber|             rowguid|ModifiedDate|
+----------+--------+-------+-----------+-------------+--------------------+------------+
|         1|    NULL|    934|          1|   AW00000001|3f5ae95e-b87d-4ae...|  2014-09-12|
|         2|    NULL|   1028|          1|   AW00000002|e552f657-a9af-4a7...|  2014-09-12|
|         3|    NULL|    642|          4|   AW00000003|130774b1-db21-4ef...|  2014-09-12|
|         4|    NULL|    932|          4|   AW00000004|ff862851-1daa-404...|  2014-09-12|
|         5|    NULL|   1026|          4|   AW00000005|83905bdc-6f5e-4f7...|  2014-09-12|
+----------+--------+-------+-----------+-------------+--------------------+------------+
only showing top 5 rows



In [0]:
%sql

-- Converting the dataframe to a temporary view and make use of SQL to view it.
SELECT 1 AS column1

column1
1


# Doing Transformation for all the tables (Changing datetime to date format)

In [0]:
table_name = []

# Iterating through the bronze container, getting the directory name, and appending it to the table name array.
for i in dbutils.fs.ls('mnt/bronze/Sales/'):        
    table_name.append(i.name.split('/')[0])

In [0]:
table_name

['CountryRegionCurrency',
 'CreditCard',
 'Currency',
 'CurrencyRate',
 'Customer',
 'PersonCreditCard',
 'SalesOrderDetail',
 'SalesOrderHeader',
 'SalesOrderHeaderSalesReason',
 'SalesPerson',
 'SalesPersonQuotaHistory',
 'SalesReason',
 'SalesTaxRate',
 'SalesTerritory',
 'SalesTerritoryHistory',
 'ShoppingCartItem',
 'SpecialOffer',
 'SpecialOfferProduct',
 'Store']

## Level 1 Transformation
                -

In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType

# Outer for loop: - iterate through the array of tables that we have just created
for i in table_name:
    path = '/mnt/bronze/Sales/' + i +'/' + i + '.parquet'       #generating the input path
    df = spark.read.format('parquet').load(path)        #creating the dataframe
    column = df.columns     #iterating through the column names to check if any column is having "date" in it.

    for col in column:
        if "Date" in col or "date" in col:
            df = df.withColumn(col, date_format(from_utc_timestamp(df[col].cast(TimestampType()), "UTC"), "yyyy-MM-dd")) #performing transformation

    output_path = '/mnt/silver/Sales/' + i + '/'        #aftertransformation, transfering the output to silver container
    df.write.format('delta').mode("overwrite").save(output_path) #writing the transformed df to the datalake using the output path in delta format 

Data from Source to Bronze container: Parquet format<br>
Data from Bronze to Gold container: Delta format<br>
Data from Silver to Gold container: Delta format<br>

Delta format is built on top of Parquet format and it is developed by databricks. Delta format will have all the features that Parquet format already has along with some additional features ex: we can track different version history, it can also handle different schemas even if it is chaged in future.

In [0]:
df.show(5)

+----------------+--------------------+-------------+--------------------+--------------------+------------+
|BusinessEntityID|                Name|SalesPersonID|        Demographics|             rowguid|ModifiedDate|
+----------------+--------------------+-------------+--------------------+--------------------+------------+
|             292|Next-Door Bike Store|          279|<StoreSurvey xmln...|a22517e3-848d-4eb...|  2014-09-12|
|             294|Professional Sale...|          276|<StoreSurvey xmln...|b50ca50b-c601-4a1...|  2014-09-12|
|             296|      Riders Company|          277|<StoreSurvey xmln...|337c3688-1339-4e1...|  2014-09-12|
|             298|  The Bike Mechanics|          275|<StoreSurvey xmln...|7894f278-f0c8-4d1...|  2014-09-12|
|             300|   Nationwide Supply|          286|<StoreSurvey xmln...|c3fc9705-a8c4-4f3...|  2014-09-12|
+----------------+--------------------+-------------+--------------------+--------------------+------------+
only showing top 5 