# Raw data preparation

## Creating directories and loading the initial file

In [0]:

# Folder holding the raw Superstore CSV that the next cell reads.
RAW_DATA_DIR = '/FileStore/tables/SuperStoreSales'
# Folder receiving the Parquet outputs for the dimension and fact tables.
PROCESSED_TABLES_DIR = '/FileStore/tables/dimensions'

dbutils.fs.mkdirs(RAW_DATA_DIR)
dbutils.fs.mkdirs(PROCESSED_TABLES_DIR)

Out[1]: True

In [0]:
# Load the raw Superstore CSV into a Spark DataFrame.
# The file is semicolon-delimited with a header row; column types are inferred by Spark.

supersales = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true", delimiter=";")
    .load("dbfs:/FileStore/tables/SuperStoreSales/Superstore.csv")
)
    

## Identifying dimensions and saving them as Parquet files

In [0]:
# Split the Superstore dataset into DataFrames that become the dimension and fact tables.

# CUSTOMER dimension: one row per distinct (Customer ID, Customer Name, Segment) combination.

df_customer = supersales.select('Customer ID', 'Customer Name', 'Segment').distinct()

# Replace spaces with underscores, as the other tables do: the sales fact table exposes
# Customer_ID, so the dimension must use the same name to join on it. Some Spark versions
# also reject spaces in Parquet field names.
df_customer_renamed = df_customer.withColumnRenamed("Customer ID", "Customer_ID")\
                                 .withColumnRenamed("Customer Name", "Customer_Name")

df_customer_renamed.write \
  .format('parquet') \
  .mode("overwrite") \
  .save("/FileStore/tables/dimensions/customersParquet")


In [0]:
# ADDRESS dimension: distinct location rows, referenced from the sales fact table by postal code.
# NOTE(review): distinct() runs over all five columns, so a postal code that appears with more
# than one City/State/Region combination yields several rows — verify uniqueness before
# treating Postal_Code as the key of this dimension.

df_address = (
    supersales
    .select('Postal Code', 'Country', 'City', 'State', 'Region')
    .distinct()
    # Rows without a postal code have nothing to join on, so they are left out of the dimension.
    .dropna(subset=['Postal Code'])
)

# Use the same space -> underscore naming as the sales fact table ("Postal Code" -> "Postal_Code")
# so the dimension column and the fact column are spelled identically.
df_address_renamed = df_address.withColumnRenamed("Postal Code", "Postal_Code")

df_address_renamed.write \
  .format('parquet') \
  .mode("overwrite") \
  .save("/FileStore/tables/dimensions/addressParquet")


In [0]:
# PRODUCT dimension: distinct products together with their category and sub-category.

df_products = supersales.select('Product ID', 'Category', 'Sub-Category', 'Product Name').distinct()

# Old name -> new name; any column not listed here keeps its current name.
product_column_names = {
    "Product ID": "Product_ID",
    "Sub-Category": "Sub_Category",
    "Product Name": "Product_Name",
}
# toDF renames positionally, so the column order of df_products is preserved.
df_products_renamed = df_products.toDF(
    *[product_column_names.get(name, name) for name in df_products.columns]
)

(
    df_products_renamed.write
    .format('parquet')
    .mode("overwrite")
    .save("/FileStore/tables/dimensions/productsParquet")
)

In [0]:
# FACT sales

from pyspark.sql.functions import regexp_replace, col


# Monetary/ratio columns that are converted to decimal(10,2) below.
SALES_DECIMAL_COLUMNS = ['Sales', 'Discount', 'Profit']

df_sales = supersales.select(
    'Row ID', 'Order ID', 'Order Date', 'Ship Date', 'Ship Mode', 'Customer ID', 'Postal Code',
    'Product ID', 'Sales', 'Quantity', 'Discount', 'Profit',
)

# These columns arrive with ',' as the decimal separator, so swap it for '.'
# before casting to a numeric type.
df_sales_converted = df_sales
for decimal_column in SALES_DECIMAL_COLUMNS:
    df_sales_converted = df_sales_converted.withColumn(
        decimal_column, regexp_replace(decimal_column, ",", ".")
    )

# Replace spaces in column names with underscores (e.g. "Customer ID" -> "Customer_ID").
df_sales_renamed = df_sales_converted.select(
    [col(c).alias(c.replace(" ", "_")) for c in df_sales_converted.columns]
)

# Cast by column name on df_sales_renamed itself, rather than through a Column object
# taken from another DataFrame (df_sales_converted), which only resolves by accident.
# NOTE(review): strings that still fail to parse become null rather than raising.
df_sales_final = df_sales_renamed
for decimal_column in SALES_DECIMAL_COLUMNS:
    df_sales_final = df_sales_final.withColumn(
        decimal_column, col(decimal_column).cast('decimal(10,2)')
    )

df_sales_final.write \
  .format('parquet') \
  .mode("overwrite") \
  .save("/FileStore/tables/dimensions/salesParquet")

## Snowflake database connection setup

In [0]:
# Options for connecting to Snowflake (consumed by write_to_snowflake).
# Credentials come from a Databricks secret scope so they are never stored in the notebook.
# NOTE(review): the scope/key names are placeholders — create them (e.g. with the Databricks
# CLI) or change them to match your workspace before running.
SNOWFLAKE_SECRET_SCOPE = "snowflake"

options = {
  # Bare account hostname, as documented for the connector's sfURL option (no "https://").
  "sfUrl": "qjyzviq-wj82064.snowflakecomputing.com",
  "sfUser": dbutils.secrets.get(scope=SNOWFLAKE_SECRET_SCOPE, key="user"),
  "sfPassword": dbutils.secrets.get(scope=SNOWFLAKE_SECRET_SCOPE, key="password"),
  "sfDatabase": "SUPERSALESSTORE",
  "sfSchema": "STAGING",
  "sfWarehouse": "COMPUTE_WH"
}

In [0]:
# Function for inserting data into the STAGING tables of the Snowflake database

def write_to_snowflake(df, table_name, mode="append"):
  """Write a Spark DataFrame to a Snowflake table via the Snowflake Spark connector.

  Connection settings come from the module-level `options` dict.

  Args:
    df: Spark DataFrame to write.
    table_name: Target table, passed to the connector as the `dbtable` option.
    mode: Spark save mode. Defaults to "append", so running the load twice
      inserts the same rows twice; pass "overwrite" to replace the table contents.

  Returns:
    True on success, or the exception raised by the write on failure. The failure
    is also printed, so it is visible even when the caller ignores the return value.
  """
  try:
    df.write \
      .format("snowflake") \
      .options(**options) \
      .option("dbtable", table_name) \
      .mode(mode) \
      .save()
    return True

  except Exception as e:
    # Report instead of silently swallowing; the exception is still returned to callers.
    print(f"Failed to write table {table_name} to Snowflake: {e!r}")
    return e

## Loading the Parquet files and preparing them for insertion

In [0]:
# Read back the Parquet outputs and map each one to its Snowflake STAGING table name.

customers, address, products, sales = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        "/FileStore/tables/dimensions/customersParquet",
        "/FileStore/tables/dimensions/addressParquet",
        "/FileStore/tables/dimensions/productsParquet",
        "/FileStore/tables/dimensions/salesParquet",
    )
)

# Insertion order: address, customers, products, then the sales fact table.
tables_to_insert = dict(
    ADDRESS=address,
    CUSTOMERS=customers,
    PRODUCTS=products,
    SALES=sales,
)

## Inserting the data into the Snowflake STAGING tables

In [0]:
# Attempt every table, then fail the cell if any write did not succeed.
# write_to_snowflake returns True on success and the raised exception otherwise.
failed_tables = {}
for table, df in tables_to_insert.items():
    result = write_to_snowflake(df, table)
    if result is not True:
        failed_tables[table] = result

if failed_tables:
    raise RuntimeError(f"Failed to write tables to Snowflake: {failed_tables!r}")
    