# Data Loading

Loading Calendar Data

In [0]:
# Approach 1:
# df = spark.read \
#     .option("header", True) \
#     .option("inferSchema", True) \
#     .csv("abfss://bronze@awstordatalake.dfs.core.windows.net/Calendar")
# df.display()

In [0]:
# Approach 2
# calendar_df = spark.read.format("csv") \
#     .option("header", True) \
#     .option("inferSchema", True) \
#     .load("abfss://bronze@awstordatalake.dfs.core.windows.net/Calendar")

In [0]:
# Folder names under the bronze container; the loading loop reads each one
# as CSV and exposes it as "<lowercased folder name>_df".
folders = [
    "Calendar", "Customers",
    "Product_Categories", "Product_Subcategories", "Products",
    "Returns",
    "Sales_2015", "Sales_2016", "Sales_2017",
    "Territories",
]

In [0]:
# Read every folder in `folders` from the bronze container as CSV (first row
# used as the header, column types inferred) and bind the result to a
# notebook-level variable, e.g. "Sales_2015" -> sales_2015_df.
for i in folders:
    path = f"abfss://bronze@awstordatalake.dfs.core.windows.net/{i}"
    df = (
        spark.read.format("csv")
        .options(header=True, inferSchema=True)
        .load(path)
    )

    # Variable name: lower-cased folder name plus "_df", spaces -> underscores.
    var_name = (i.lower() + "_df").replace(" ", "_")
    # Later cells refer to these DataFrames by name, so publish into globals().
    globals()[var_name] = df
    print(f"Created DataFrame: {var_name}")

In [0]:
# Show the calendar DataFrame that the loading loop registered as `calendar_df`.
calendar_df.display()

# Transformations

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#### 1. Calendar Data

In [0]:
# Add Year and Month columns, both extracted from the Date column.
calendar_df = (
    calendar_df
    .withColumn("Year", year("Date"))
    .withColumn("Month", month("Date"))
)
# calendar_df.display()

In [0]:
# Write the calendar DataFrame to the silver container as Parquet;
# mode "overwrite" replaces whatever is already at the target path.
(
    calendar_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Calendar")
    .save()
)

#### 2. Customers Data

In [0]:
# Add FullName: FirstName and LastName joined with a single space.
customers_df = customers_df.withColumn(
    "FullName",
    concat_ws(" ", "FirstName", "LastName"),
)

In [0]:
# Remove the Prefix column from the customers DataFrame before it is written out.
customers_df = customers_df.drop("Prefix")

In [0]:
# Write the customers DataFrame to the silver container as Parquet;
# mode "overwrite" replaces whatever is already at the target path.
(
    customers_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Customers")
    .save()
)

#### 3. Products Data

In [0]:
# Reduce two columns to their leading segment:
#   ProductSKU  -> the part before the first "-"
#   ProductName -> the part before the first space
# NOTE(review): both columns are replaced under their original names, so the
# full SKU and full product name are not carried forward.
products_df = (
    products_df
    .withColumn("ProductSKU", split(col("ProductSKU"), "-").getItem(0))
    .withColumn("ProductName", split(col("ProductName"), " ").getItem(0))
)

In [0]:
# Write the products DataFrame to the silver container as Parquet;
# mode "overwrite" replaces whatever is already at the target path.
(
    products_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Products")
    .save()
)

#### 4. Returns Data

In [0]:
# Returns data is written to the silver container as loaded (no transformations),
# as Parquet; mode "overwrite" replaces whatever is already at the target path.
(
    returns_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Returns")
    .save()
)

#### 5. Product Categories Data

In [0]:
# Product categories are written to the silver container as loaded (no
# transformations), as Parquet; mode "overwrite" replaces existing output.
(
    product_categories_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Product_Categories")
    .save()
)

#### 6. Product Sub-Categories Data

In [0]:
# Product subcategories are written to the silver container as loaded (no
# transformations), as Parquet; mode "overwrite" replaces existing output.
# NOTE(review): the output folder is spelled "Product_SubCategories" (capital C),
# unlike the bronze folder "Product_Subcategories" -- confirm downstream readers
# expect this casing before changing it.
(
    product_subcategories_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Product_SubCategories")
    .save()
)

#### 7. Sales Data (2015, 2016, and 2017)

In [0]:
# Stack the three yearly sales DataFrames into a single DataFrame.
# unionByName matches columns by name. Plain union() matches them by position,
# so a yearly file whose columns came in a different order would be silently
# misaligned; with unionByName a genuine column-name mismatch raises instead.
sales_df = (
    sales_2015_df
    .unionByName(sales_2016_df)
    .unionByName(sales_2017_df)
)

In [0]:
# StockDate is converted with to_timestamp (no explicit format given), and every
# "S" in OrderNumber is replaced by "T" (the pattern is unanchored, so it is not
# limited to a leading character).
sales_df = (
    sales_df
    .withColumn("StockDate", to_timestamp("StockDate"))
    .withColumn("OrderNumber", regexp_replace(col("OrderNumber"), "S", "T"))
)

In [0]:
# Write the combined sales DataFrame to the silver container as Parquet;
# mode "overwrite" replaces whatever is already at the target path.
(
    sales_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Sales")
    .save()
)

#### 8. Territories Data

In [0]:
# Territories are written to the silver container as loaded (no
# transformations), as Parquet; mode "overwrite" replaces existing output.
(
    territories_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@awstordatalake.dfs.core.windows.net/Territories")
    .save()
)