# Transform the raw data

## Data access to the ADLS Gen2 storage account

In [0]:
# Configure OAuth (service principal / client credentials) access to the
# salesdataprojectstorage ADLS Gen2 account for this Spark session.
#
# The client secret is fetched from a Databricks secret scope rather than being written
# into the notebook: notebook source and exports get shared and committed, so a literal
# secret here is effectively public.
# NOTE(review): the scope and key names below are placeholders -- create them (or point
# them at the existing scope) before running, and rotate the secret that was previously
# embedded in this cell.
sp_client_secret = dbutils.secrets.get(scope="salesdataproject", key="sp-client-secret")

spark.conf.set("fs.azure.account.auth.type.salesdataprojectstorage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.salesdataprojectstorage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.salesdataprojectstorage.dfs.core.windows.net", "e1b0a35e-7eb7-4201-9dd2-5b287eb997c0")
spark.conf.set("fs.azure.account.oauth2.client.secret.salesdataprojectstorage.dfs.core.windows.net", sp_client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.salesdataprojectstorage.dfs.core.windows.net", "https://login.microsoftonline.com/729afaa2-2f3c-4167-bff8-2b2452c198ee/oauth2/token")

# Data Loading

In [0]:

# Load the calendar CSV from the raw container. The first row is treated as the
# header and Spark infers the column types.
calendar_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/calendar/calendar.csv"
calendar_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(calendar_data_path)
)

# Render the loaded calendar rows in the notebook output
display(calendar_df)

In [0]:

# Load Product_Categories.csv from the raw container (header row, inferred types).
product_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/products/Product_Categories.csv"
product_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(product_data_path)
)

# Render the loaded product category rows
display(product_df)

In [0]:

# Load Product_SubCategories.csv from the raw container (header row, inferred types).
product_subcategory_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/products/Product_SubCategories.csv"
product_subcategory_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(product_subcategory_data_path)
)

# Render the loaded product subcategory rows
display(product_subcategory_df)

In [0]:

# Load Customers.csv from the raw container (header row, inferred types).
customer_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/Customers/Customers.csv"
customer_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(customer_data_path)
)

# Render the loaded customer rows
display(customer_df)

In [0]:

# Load Returns.csv from the raw container (header row, inferred types).
return_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/Returns/Returns.csv"
return_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(return_data_path)
)

# Render the loaded return rows
display(return_df)

In [0]:

# Load every file under the raw Sales/ folder (the trailing * is a glob) into a
# single DataFrame; each file's first row is a header and types are inferred.
sales_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/Sales/*"
sales_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(sales_data_path)
)

# Render the combined sales rows
display(sales_df)

In [0]:
# Spot check: show only the sales rows whose OrderDate falls in 2017
from pyspark.sql.functions import year

df = sales_df.where(
    year(sales_df["OrderDate"]) == 2017
)
display(df)

In [0]:

# Load Territories.csv from the raw container (header row, inferred types).
territories_data_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/Territories/Territories.csv"
territories_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema=True)
    .load(territories_data_path)
)

# Render the loaded territory rows
display(territories_df)

## Transformations

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import * 

# Add Year, Month and Day (day of month) columns derived from the Date column
calendar_trans = (
    calendar_df
    .withColumn("Year", year(calendar_df["Date"]))
    .withColumn("Month", month(calendar_df["Date"]))
    .withColumn("Day", dayofmonth(calendar_df["Date"]))
)
display(calendar_trans)

In [0]:
# Persist the enriched calendar to the transformed container as snappy-compressed
# parquet, replacing whatever is already stored at that location.
output_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/calendar"
(
    calendar_trans.write
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save(output_path)
)

### Transforming customer data

In [0]:
# First attempt at a full name using concat(). concat() returns NULL for the whole
# value when any of its inputs is NULL, so a row with a NULL Prefix, FirstName or
# LastName gets a NULL fullname. trans_cus is rebuilt with concat_ws() in the next cell.
trans_cus = customer_df.withColumn(
    "fullname",
    concat(
        col("Prefix"), lit(" "),
        col("FirstName"), lit(" "),
        col("LastName"),
    ),
)
display(trans_cus)

In [0]:
# Build fullname by joining Prefix, FirstName and LastName with single spaces
# via concat_ws(); this is the version of trans_cus that gets written out below.
trans_cus = customer_df.withColumn(
    "fullname",
    concat_ws(" ", col("Prefix"), col("FirstName"), col("LastName")),
)
display(trans_cus)

In [0]:
# Persist the customers (including fullname) to the transformed container as
# snappy-compressed parquet, replacing any existing output.
customer_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/customer"
(
    trans_cus.write
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save(customer_path)
)

## Subcategories Transformation

In [0]:
# Persist the product subcategories (unchanged from the raw load) to the transformed
# container as snappy-compressed parquet, replacing any existing output.
subcategories_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/product_subcategory"
(
    product_subcategory_df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save(subcategories_path)
)

In [0]:
# Show the product categories frame (loaded from Product_Categories.csv) again for reference
display(product_df)

In [0]:
# Load Products.csv (the per-product detail rows) from the raw container;
# the first row is the header and column types are inferred.
product_details_path = "abfss://rawdatasales@salesdataprojectstorage.dfs.core.windows.net/products/Products.csv"
product_details_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(product_details_path)
)
display(product_details_df)
            

### Transformation of Product data

In [0]:
# Derive three helper columns, each taking the first piece of a delimited value:
#   ProductCode    <- part of ProductSKU before the first "-"
#   ProductSubName <- part of ProductName before the first space
#   ModelNumber    <- part of ModelName before the first "-"
trans_product = (
    product_details_df
    .withColumn("ProductCode", split(col("ProductSKU"), "-")[0])
    .withColumn("ProductSubName", split(col("ProductName"), " ")[0])
    .withColumn("ModelNumber", split(col("ModelName"), "-")[0])
)
trans_product.display()

In [0]:
# Persist the enriched product details to the transformed container as
# snappy-compressed parquet, replacing any existing output.
#
# The destination gets its own variable so that product_details_path keeps pointing
# at the raw Products.csv. Reusing that name for the output folder meant that
# re-running the load cell afterwards would try to read this parquet folder as CSV.
product_details_output_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/product_details"
(
    trans_product.write.format("parquet")
    .option("compression", "snappy")
    .mode("overwrite")
    .save(product_details_output_path)
)

In [0]:
# Persist the returns (unchanged from the raw load) to the transformed container
# as snappy-compressed parquet, replacing any existing output.
return_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/return"
(
    return_df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save(return_path)
)

In [0]:
# Persist the territories (unchanged from the raw load) to the transformed container
# as snappy-compressed parquet, replacing any existing output.
territories_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/territories"
(
    territories_df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save(territories_path)
)

In [0]:
# Persist the product categories to the transformed container as snappy-compressed
# parquet, replacing any existing output.
#
# The source frame is product_df (loaded from Product_Categories.csv). This cell
# previously wrote product_subcategory_df, which put subcategory rows under the
# product_categories path and duplicated the product_subcategory output.
categories_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/product_categories"
(
    product_df.write.format("parquet")
    .option("compression", "snappy")
    .mode("overwrite")
    .save(categories_path)
)

## Transformation of Sales data

In [0]:
# Show the sales rows as loaded, before the columns below are rewritten
sales_df.display()

# Convert StockDate to a timestamp and replace every "S" in OrderNumber with "T".
# sales_df is rebound to the transformed frame; later cells use this version.
sales_df = (
    sales_df
    .withColumn("StockDate", to_timestamp(col("StockDate")))
    .withColumn("OrderNumber", regexp_replace(col("OrderNumber"), "S", "T"))
)


In [0]:
# Attach product attributes to each sales row (inner join on ProductKey) and
# compute Revenue = OrderQuantity * ProductPrice for that row.
sales_revenue_df = (
    sales_df
    .join(trans_product, ["ProductKey"])
    .withColumn("Revenue", col("OrderQuantity") * col("ProductPrice"))
)
sales_revenue_df.display()

In [0]:
# Total revenue per order year, sorted by year.
# `sum` here is pyspark.sql.functions.sum (brought in by the star import above),
# not the Python builtin.
(
    sales_revenue_df
    .withColumn("OrderYear", year("OrderDate"))
    .groupBy("OrderYear")
    .agg(sum("Revenue"))
    .orderBy("OrderYear")
    .display()
)
    


Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the transformed sales rows to the transformed container as
# snappy-compressed parquet, replacing any existing output.
sales_path = "abfss://transformdatasales@salesdataprojectstorage.dfs.core.windows.net/sales"
(
    sales_df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save(sales_path)
)

In [0]:
# Inner-join subcategories to the raw product details on ProductSubcategoryKey
# and display the combined rows (feeds the visualizations attached to this cell).
(
    product_subcategory_df
    .join(product_details_df, ["ProductSubcategoryKey"])
    .display()
)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Total revenue per order year and ProductSubName (the first word of ProductName,
# derived when trans_product was built), sorted by year and then name. The result
# feeds the visualizations attached to this cell.
#
# sales_revenue_df already carries the product columns from its join with
# trans_product, so the former second join against product_details_df has been
# removed: it only added a duplicate copy of every product column plus extra join
# work, and contributed nothing to the grouping or the sum.
# NOTE(review): results are unchanged provided ProductKey is unique in
# product_details_df -- confirm against the data.
# `sum` is pyspark.sql.functions.sum (from the star import), not the builtin.
(
    sales_revenue_df
    .withColumn("OrderYear", year("OrderDate"))
    .groupBy("OrderYear", "ProductSubName")
    .agg(sum("Revenue").alias("Total Sum"))
    .orderBy("OrderYear", "ProductSubName")
    .display()
)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.