In [0]:
# Evaluate the `spark` session object so the cell shows its summary; it is not
# created in this notebook -- presumably the runtime (Databricks) provides it.
spark

## Loading Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Data Access Using an Azure AD App Registration (Service Principal)

In [0]:
# Authenticate to the ADLS Gen2 account with an Azure AD service principal
# (OAuth 2.0 client-credentials flow). Every setting is scoped to a single
# storage account through the host suffix appended to each config key.
storage_host = "awadls5.dfs.core.windows.net"
tenant_id = "39425619-35e1-47a3-9de9-c8aea38b3890"

# The client secret must never live in notebook source. It is fetched from a
# Databricks secret scope at run time; the scope/key names below are
# placeholders -- create them (or point at your existing ones) before running.
# NOTE(review): the secret that used to be hard-coded in this cell has been
# exposed in source and should be rotated in Azure AD.
client_secret = dbutils.secrets.get(scope="aw-adls", key="sp-client-secret")

oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "cd805989-79a1-422d-8ca0-97e4c3ffd136",
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Apply each setting under its account-scoped key, e.g.
# "fs.azure.account.auth.type.awadls5.dfs.core.windows.net".
for setting_key, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_key}.{storage_host}", setting_value)

## Loading Data from ADLS

In [0]:
# Read the bronze-layer AW-Calendar CSV data, taking column names from the
# header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_cal = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-Calendar")
)

In [0]:
# Read the bronze-layer AW-Customers CSV data, taking column names from the
# header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_cust = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-Customers")
)

In [0]:
# Read the bronze-layer AW-ProductCategories CSV data, taking column names
# from the header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_proCat = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-ProductCategories")
)

In [0]:
# Read the bronze-layer AW-ProductSubcategories CSV data, taking column names
# from the header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_proSubCat = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-ProductSubcategories")
)

In [0]:
# Read the bronze-layer AW-Products CSV data, taking column names from the
# header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_pro = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-Products")
)

In [0]:
# Read the bronze-layer AW-Returns CSV data, taking column names from the
# header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_ret = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-Returns")
)

In [0]:
# Read every bronze-layer path matching the "AW-Sales*" glob into a single
# DataFrame, taking column names from the header row and letting Spark infer
# the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_sales = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-Sales*")
)

In [0]:
# Read the bronze-layer AW-Territories CSV data, taking column names from the
# header row and letting Spark infer the column types.
# Uses abfss:// (TLS) so the read goes over the same secure scheme as the
# silver-layer writes further down, instead of the plain abfs:// scheme.
df_ter = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@awadls5.dfs.core.windows.net/AW-Territories")
)

## Transformations

### Calendar Dataset

In [0]:
# Add "Month" and "Year" columns extracted from the "Date" column.
df_cal = (
    df_cal
    .withColumn("Month", month("Date"))
    .withColumn("Year", year("Date"))
)

In [0]:
# Persist the calendar DataFrame to the silver container as Parquet.
# "overwrite" replaces the previous output: df_cal is rebuilt from the full
# bronze read on every run, so "append" would duplicate every row each time
# the notebook is re-run.
(
    df_cal.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-Calendar")
    .save()
)

### Customer Dataset

In [0]:
# Build "FullName" by joining the prefix, first name and last name columns
# with a single space as separator.
df_cust = df_cust.withColumn(
    "FullName",
    concat_ws(
        " ",
        col("prefix"),
        col("FirstName"),
        col("LastName"),
    ),
)

In [0]:
# Persist the customers DataFrame to the silver container as Parquet.
# "overwrite" replaces the previous output: df_cust is rebuilt from the full
# bronze read on every run, so "append" would duplicate every row each time
# the notebook is re-run.
(
    df_cust.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-Customers")
    .save()
)

### Product Subcategories Dataset

In [0]:
# Persist the product-subcategories DataFrame, unchanged from the bronze read,
# to the silver container as Parquet.
# "overwrite" replaces the previous output: the frame is read in full from
# bronze on every run, so "append" would duplicate every row on a re-run.
(
    df_proSubCat.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-ProductSubcategories")
    .save()
)

### Products Dataset

In [0]:
# Overwrite two columns with the leading token of their current value:
#   ProductSKU  -> text before the first "-"
#   ProductName -> text before the first " "
df_pro = (
    df_pro
    .withColumn("ProductSKU", split(col("ProductSKU"), "-").getItem(0))
    .withColumn("ProductName", split(col("ProductName"), " ").getItem(0))
)

In [0]:
# Persist the products DataFrame to the silver container as Parquet.
# "overwrite" replaces the previous output: df_pro is rebuilt from the full
# bronze read on every run, so "append" would duplicate every row each time
# the notebook is re-run.
(
    df_pro.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-Products")
    .save()
)

### Returns Dataset

In [0]:
# Persist the returns DataFrame, unchanged from the bronze read, to the silver
# container as Parquet.
# "overwrite" replaces the previous output: the frame is read in full from
# bronze on every run, so "append" would duplicate every row on a re-run.
(
    df_ret.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-Returns")
    .save()
)

### Territories Dataset

In [0]:
# Persist the territories DataFrame, unchanged from the bronze read, to the
# silver container as Parquet.
# "overwrite" replaces the previous output: the frame is read in full from
# bronze on every run, so "append" would duplicate every row on a re-run.
(
    df_ter.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-Territories")
    .save()
)

### Sales Dataset

In [0]:
# Reshape the sales data in three steps:
#   1. parse "OrderDate" into a timestamp,
#   2. replace every "S" in "OrderNumber" with "T",
#   3. add "multiply" = OrderLineItem * OrderQuantity.
df_sales = (
    df_sales
    .withColumn("OrderDate", to_timestamp(col("OrderDate")))
    .withColumn("OrderNumber", regexp_replace(col("OrderNumber"), "S", "T"))
    .withColumn("multiply", col("OrderLineItem") * col("OrderQuantity"))
)

In [0]:
# Persist the sales DataFrame to the silver container as Parquet.
# "overwrite" replaces the previous output: df_sales is rebuilt from every
# bronze "AW-Sales*" path on each run, so "append" would duplicate every row
# each time the notebook is re-run.
(
    df_sales.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awadls5.dfs.core.windows.net/AW-Sales")
    .save()
)

In [0]:
# Count non-null order numbers per OrderDate and render the result.
(
    df_sales
    .groupBy("OrderDate")
    .agg(count("OrderNumber").alias("TotalOrders"))
    .display()
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Inner-join sales to products on ProductKey, keeping only the product key,
# the product name and the order number.
sa = (
    df_sales
    .join(df_pro, df_sales["ProductKey"] == df_pro["ProductKey"], how="inner")
    .select(
        df_pro["ProductKey"],
        df_pro["productName"],
        df_sales["OrderNumber"],
    )
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Count non-null order numbers per product name and render the result.
(
    sa
    .groupBy("productName")
    .agg(count("OrderNumber"))
    .display()
)

Databricks visualization. Run in Databricks to view.