# Broze To Silver Transformation (Level 1)

This Notebook's purpose is to **transform** the raw data in ***bronze*** conatainer and **store** it onto the ***silver*** container for the further processing.  
In this **Level 1** notebook basic transformation are performed, which includes 
1. Identification & Removal of **Null** values.
2. **Feature Selection** for reduced dataset.
3. Modification of **colomns** names.
4. **Type Casting** of **'date'** colomn.

In [None]:
# Displays the files & folder of SalesLT folder in bronze container.
dbutils.fs.ls("/mnt/bronze/SalesLT")

In [None]:
# Listing the files in silver container which will be empty for now.
dbutils.fs.ls("/mnt/silver")

Since each file is stored under it's own specific folder, creating a variable *'table_names'* which containes all the table names.


In [None]:
table_names = []

for i in dbutils.fs.ls("/mnt/bronze/SalesLT"):
    table_names.append(i.name.split('/')[0])

Since we can't transform all tables at once, Each table is *loaded* as dataframe *transformed* and *stored* onto the ***silver*** container. This goes for all tables one after the other.

To automate the transformations for all tables in one run, *For Loop* is used to iterate over *table_names*.

In the dataset to change the date column format from ***timestamp*** to ***YYYY-MM-DD***, *For Loop and IF condition* is used to find the columns which contains string *'date' Or 'Date'* in it. which then is changed into requied format i.e, ***YYYY-MM-DD***.

Once the ***df*** is transformed it is stored onto ***silver*** container accordingly. 

To Store/Load the tranformed df, ***write*** method is used where format option is set to ***delta***.  
Here ***mode*** can have two values:
- ***overwrite*** - Replaces all the existing data with the current data.
- ***append*** - Appends/Adds the current data to already existing data. It is mainly used for increamental load

Since this project does not follow increamental load, the option is set to ***overwrite***.

In [None]:
from pyspark.sql.functions import from_utc_timestamp, date_format, col
from pyspark.sql.types import TimestampType

for table in table_names: # To iterate over the tables
    table_path = "/mnt/bronze/SalesLT/" + table + "/" + table + ".parquet"
    df = spark.read.format('parquet').load(table_path)
    columns = df.columns

    for column in columns: # To iterate over the columns of each tabel
        if 'Date' in column or 'date' in column: # Finds columns which has 'Date' or 'date' string.
            df = df.withColumn(column, date_format(from_utc_timestamp(col(column).cast(TimestampType()), 'UTC'), 'yyyy-MM-dd')) # Converting timestamp to date format

    output_path = "/mnt/silver/SalesLT/" + table + "/" # Loding path in silver container.
    df.write.format('delta').mode('overwrite').save(output_path) # Loading/Overwriting the data