# **Data Management with Databricks: Adventure Works Challenge**

On Kaggle, I found a sample database for a fictional multinational company that sells bikes, accessories, and clothing. The sample database contains several tables: customers, products, returns, and sales. In this notebook, I create a database and Delta tables using Databricks. Once the sales database exists, I take a first look at the data, clean it, and visualize it. The first step is to load the CSV files, so let's get started with Databricks.

Before I load the CSV files, I upload them to the Catalog. After storing the files, I want to check what is there. Because the Catalog already holds several folders, I list only the contents of the Adventure Works folder. With dbutils, I can inspect the file info. Here is the result:

In [0]:
#display information about the content of the catalog or folder of Adventure Works
dbutils.fs.ls("dbfs:/FileStore/AW")

Out[68]: [FileInfo(path='dbfs:/FileStore/AW/AdventureWorks_Customers.csv', name='AdventureWorks_Customers.csv', size=1963594, modificationTime=1717340887000),
 FileInfo(path='dbfs:/FileStore/AW/AdventureWorks_Products.csv', name='AdventureWorks_Products.csv', size=63509, modificationTime=1717340886000),
 FileInfo(path='dbfs:/FileStore/AW/AdventureWorks_Returns.csv', name='AdventureWorks_Returns.csv', size=87401, modificationTime=1717340887000),
 FileInfo(path='dbfs:/FileStore/AW/AdventureWorks_Sales_2015.csv', name='AdventureWorks_Sales_2015.csv', size=194786, modificationTime=1717340887000),
 FileInfo(path='dbfs:/FileStore/AW/AdventureWorks_Sales_2016.csv', name='AdventureWorks_Sales_2016.csv', size=1786110, modificationTime=1717340889000),
 FileInfo(path='dbfs:/FileStore/AW/AdventureWorks_Sales_2017.csv', name='AdventureWorks_Sales_2017.csv', size=2187175, modificationTime=1717340891000)]
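
The FileInfo entries report size in bytes and modificationTime in milliseconds since the Unix epoch. As a small aside, here is a plain-Python sketch (it does not call dbutils; the tuples are copied from the listing above, and the helper name to_utc is my own) that turns those fields into readable values:

```python
from datetime import datetime, timezone

# (name, size in bytes, modificationTime in ms) copied from the listing above
files = [
    ("AdventureWorks_Customers.csv", 1963594, 1717340887000),
    ("AdventureWorks_Products.csv", 63509, 1717340886000),
    ("AdventureWorks_Returns.csv", 87401, 1717340887000),
    ("AdventureWorks_Sales_2015.csv", 194786, 1717340887000),
    ("AdventureWorks_Sales_2016.csv", 1786110, 1717340889000),
    ("AdventureWorks_Sales_2017.csv", 2187175, 1717340891000),
]


def to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


# Total size of all six CSV files in bytes
total_bytes = sum(size for _, size, _ in files)

for name, size, ms in files:
    print(f"{name:32s} {size / 1e6:6.2f} MB  {to_utc(ms).isoformat()}")
print(f"total: {total_bytes / 1e6:.2f} MB")
```

So the six files together are only a few megabytes, which is small enough to read without any tuning.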

Next, I read the CSV files using the Spark DataFrame API (spark.read with options). I start with the sales data, which is split across three datasets with the same structure. Each dataset contains data from a different year: 2015, 2016, and 2017.
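
To see which files a wildcard such as AdventureWorks_Sales_*.csv would pick up, here is a minimal sketch in plain Python. It uses fnmatch only as a stand-in: Spark resolves path patterns itself, so treat this as an illustration of how * behaves, not as Spark's implementation. The file names are copied from the folder listing.

```python
from fnmatch import fnmatch

# File names copied from the folder listing
names = [
    "AdventureWorks_Customers.csv",
    "AdventureWorks_Products.csv",
    "AdventureWorks_Returns.csv",
    "AdventureWorks_Sales_2015.csv",
    "AdventureWorks_Sales_2016.csv",
    "AdventureWorks_Sales_2017.csv",
]

pattern = "AdventureWorks_Sales_*.csv"

# Keep only the names that match the wildcard pattern
sales_files = [name for name in names if fnmatch(name, pattern)]
print(sales_files)
```

Only the three yearly sales files match, so a single read with this pattern can load all three years into one DataFrame.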

In [0]:
# Read the three sales CSV files at once using the Spark DataFrame API;
# the * wildcard matches the 2015, 2016, and 2017 files
sales_raw_df = spark.read.option("header","true").csv("dbfs:/FileStore/AW/AdventureWorks_Sales_*.csv")

# Show the first 5 rows of the DataFrame
sales_raw_df.show(n=5, truncate=False)

+---------+----------+-----------+----------+-----------+-------------+-------------+---------+-------------+-------------+
|OrderDate|StockDate |OrderNumber|ProductKey|CustomerKey|OrderLineItem|OrderQuantity|Region   |Country      |Continent    |
+---------+----------+-----------+----------+-----------+-------------+-------------+---------+-------------+-------------+
|1/1/2017 |12/13/2003|SO61285    |529       |23791      |2            |2            |Northwest|United States|North America|
|1/1/2017 |9/24/2003 |SO61285    |214       |23791      |3            |1            |Northwest|United States|North America|
|1/1/2017 |9/4/2003  |SO61285    |540       |23791      |1            |1            |Northwest|United States|North America|
|1/1/2017 |9/28/2003 |SO61301    |529       |16747      |2            |2            |Northwest|United States|North America|
|1/1/2017 |10/21/2003|SO61301    |377       |16747      |1            |1            |Northwest|United States|North America|
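+---------+----------+-----------+----------+-----------+-------------+-------------+---------+-------------+-------------+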

In [0]:
sales_raw_df.count()

Out[70]: 56046

The combined sales DataFrame contains 56,046 rows, covering 2015 through 2017. Now, let's create the 'Sales Database'. I write each DataFrame as a Delta table in overwrite mode and save it in the 'Sales Database'. I do this for the sales, products, returns, and customers tables.

In [0]:
# First, create Database SalesDB if it doesn't exist
dbsales = "SalesDB"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {dbsales}")
spark.sql(f"USE {dbsales}")

Out[71]: DataFrame[]

In [0]:
sales_raw_df.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable("SALES_RAW")

In [0]:
# Read the other csv files using spark dataframeAPI
customers_raw_df = spark.read.option("header","true").csv("dbfs:/FileStore/AW/AdventureWorks_Customers.csv")
products_raw_df = spark.read.option("header","true").csv("dbfs:/FileStore/AW/AdventureWorks_Products.csv")
returns_raw_df = spark.read.option("header","true").csv("dbfs:/FileStore/AW/AdventureWorks_Returns.csv")


In [0]:
## Create Delta Tables 
customers_raw_df.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable("CUSTOMERS_RAW")
products_raw_df.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable("PRODUCTS_RAW")
returns_raw_df.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable("RETURNS_RAW")


Let's show the contents of the 'Sales Database' I created. You can either use Python or SQL to show the tables.

In [0]:
display(spark.sql("SHOW TABLES"))

database,tableName,isTemporary
salesdb,customers,False
salesdb,customers_raw,False
salesdb,products,False
salesdb,products_raw,False
salesdb,returns,False
salesdb,returns_raw,False
salesdb,sales,False
salesdb,sales_raw,False


In [0]:
%sql
-- Switch to a SQL cell using %sql
SHOW TABLES

database,tableName,isTemporary
salesdb,customers,False
salesdb,customers_raw,False
salesdb,products,False
salesdb,products_raw,False
salesdb,returns,False
salesdb,returns_raw,False
salesdb,sales,False
salesdb,sales_raw,False


Let's use SQL to count the rows in the raw sales table. Also, let's show the details of the Delta table 'sales_raw'. Lastly, let's look at the first rows of each table using SQL.

**Sales Table**

In [0]:
%sql
select count(*) from sales_raw;

count(1)
56046


In [0]:
%sql

describe DETAIL sales_raw;

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,b81e1a79-eb1a-44b9-84bb-8145a1c63430,spark_catalog.salesdb.sales_raw,,dbfs:/user/hive/warehouse/salesdb.db/sales_raw,2024-06-04T17:47:34.012+0000,2024-06-09T13:14:09.000+0000,List(),3,624724,Map(),1,2,"List(appendOnly, invariants)",Map()


In [0]:
%sql

select * from sales_raw limit 5;

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,OrderLineItem,OrderQuantity,Region,Country,Continent
1/1/2015,9/21/2001,SO45080,332,14657,1,1,Northwest,United States,North America
1/1/2015,12/5/2001,SO45079,312,29255,1,1,Southwest,United States,North America
1/1/2015,10/29/2001,SO45082,350,11455,1,1,Australia,Australia,Pacific
1/1/2015,11/16/2001,SO45081,338,26782,1,1,Canada,Canada,North America
1/2/2015,12/15/2001,SO45083,312,14947,1,1,United Kingdom,United Kingdom,Europe


**Customers Table**

In [0]:
%sql

select * from customers_raw limit 5;

CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner
11000,MR.,JON,YANG,4/8/1966,M,M,jon24@adventure-works.com,"$90,000",2,Bachelors,Professional,Y
11001,MR.,EUGENE,HUANG,5/14/1965,S,M,eugene10@adventure-works.com,"$60,000",3,Bachelors,Professional,N
11002,MR.,RUBEN,TORRES,8/12/1965,M,M,ruben35@adventure-works.com,"$60,000",3,Bachelors,Professional,Y
11003,MS.,CHRISTY,ZHU,2/15/1968,S,F,christy12@adventure-works.com,"$70,000",0,Bachelors,Professional,N
11004,MRS.,ELIZABETH,JOHNSON,8/8/1968,S,F,elizabeth5@adventure-works.com,"$80,000",5,Bachelors,Professional,Y


**Products Table**

In [0]:
%sql
select count(*) from products_raw;

count(1)
293


In [0]:
%sql

select * from products_raw limit 5;

ProductKey,CategoryName,ProductSubcategory,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
214,Accessories,Helmets,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.0863,34.99
215,Accessories,Helmets,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.0278,33.6442
218,Clothing,Socks,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.3963,9.5
219,Clothing,Socks,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.3963,9.5
220,Accessories,Helmets,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.0278,33.6442


**Product Returns Table**

In [0]:
%sql

select * from returns_raw limit 5;

ReturnDate,Continent,Country,Region,ProductKey,ReturnQuantity
1/18/2015,Pacific,Australia,Australia,312,1
1/18/2015,Europe,United Kingdom,United Kingdom,310,1
1/21/2015,Europe,Germany,Germany,346,1
1/22/2015,North America,United States,Southwest,311,1
2/2/2015,North America,Canada,Canada,312,1


#### Transform Data in the Delta Table

All of the loaded data is still stored as strings. However, some columns hold dates or numbers. For example, in the 'Sales table', OrderDate and StockDate are dates, and OrderQuantity is a numerical value. So, let's convert these columns so that we can use them in the data analysis.

**Sales Table**

In [0]:
#read Delta Table using spark dataframe
sales_df=  spark.read.table("salesdb.sales_raw")
#And show the first 5 rows
sales_df.show(n=5,truncate=False)

+---------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|OrderDate|StockDate |OrderNumber|ProductKey|CustomerKey|OrderLineItem|OrderQuantity|Region        |Country       |Continent    |
+---------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|1/1/2015 |9/21/2001 |SO45080    |332       |14657      |1            |1            |Northwest     |United States |North America|
|1/1/2015 |12/5/2001 |SO45079    |312       |29255      |1            |1            |Southwest     |United States |North America|
|1/1/2015 |10/29/2001|SO45082    |350       |11455      |1            |1            |Australia     |Australia     |Pacific      |
|1/1/2015 |11/16/2001|SO45081    |338       |26782      |1            |1            |Canada        |Canada        |North America|
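|1/2/2015 |12/15/2001|SO45083    |312       |14947      |1            |1            |United Kingdom|United Kingdom|Europe       |
+---------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+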

In [0]:
# Use withColumn and DateType to convert OrderDate from a string to a date
from pyspark.sql.functions import *
from pyspark.sql.types import DateType
from datetime import datetime
# Parse strings such as '1/1/2015' (month/day/year); nulls are passed through unchanged
func = udf(lambda x: datetime.strptime(x,'%m/%d/%Y').date() if x else None, DateType())
sales_df =  sales_df.withColumn("OrderDate",func(col('OrderDate')))
sales_df.show(n=5,truncate=False)

+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|OrderDate |StockDate |OrderNumber|ProductKey|CustomerKey|OrderLineItem|OrderQuantity|Region        |Country       |Continent    |
+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|2015-01-01|9/21/2001 |SO45080    |332       |14657      |1            |1            |Northwest     |United States |North America|
|2015-01-01|12/5/2001 |SO45079    |312       |29255      |1            |1            |Southwest     |United States |North America|
|2015-01-01|10/29/2001|SO45082    |350       |11455      |1            |1            |Australia     |Australia     |Pacific      |
|2015-01-01|11/16/2001|SO45081    |338       |26782      |1            |1            |Canada        |Canada        |North America|
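|2015-01-02|12/15/2001|SO45083    |312       |14947      |1            |1            |United Kingdom|United Kingdom|Europe       |
+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+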

In [0]:
# Convert StockDate in the same way, reusing the UDF defined in the previous cell
sales_df =  sales_df.withColumn("StockDate",func(col('StockDate')))
sales_df.show(n=5,truncate=False)

+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|OrderDate |StockDate |OrderNumber|ProductKey|CustomerKey|OrderLineItem|OrderQuantity|Region        |Country       |Continent    |
+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|2015-01-01|2001-09-21|SO45080    |332       |14657      |1            |1            |Northwest     |United States |North America|
|2015-01-01|2001-12-05|SO45079    |312       |29255      |1            |1            |Southwest     |United States |North America|
|2015-01-01|2001-10-29|SO45082    |350       |11455      |1            |1            |Australia     |Australia     |Pacific      |
|2015-01-01|2001-11-16|SO45081    |338       |26782      |1            |1            |Canada        |Canada        |North America|
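|2015-01-02|2001-12-15|SO45083    |312       |14947      |1            |1            |United Kingdom|United Kingdom|Europe       |
+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+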

The dates in the 'Sales table' are now converted. Let's also check for missing values.
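
The date conversion relies on datetime.strptime with the format '%m/%d/%Y'. Here is a minimal plain-Python sketch of that parsing step on its own, outside Spark. The helper name parse_us_date and the None handling are my own additions for illustration:

```python
from datetime import date, datetime


def parse_us_date(text):
    """Parse a month/day/year string such as '1/2/2015'; return None for missing values."""
    if not text:
        return None
    return datetime.strptime(text, "%m/%d/%Y").date()


samples = ["1/1/2015", "12/15/2001", None]
parsed = [parse_us_date(value) for value in samples]
print(parsed)
```

Note that '%m' and '%d' accept values without a leading zero, which is why strings like 1/1/2015 parse correctly. Spark also ships a built-in to_date function that takes a column and a format string, which may be worth considering as an alternative to a Python UDF.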

In [0]:
# Count missing values for each column
display(sales_df.select([count(when(col(c).isNull(),c)).alias(c) for c in sales_df.columns]))

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,OrderLineItem,OrderQuantity,Region,Country,Continent
0,0,0,0,0,0,0,0,0,0


In [0]:
# Convert the order quantity to an integer
from pyspark.sql.types import IntegerType
sales_df =  sales_df.withColumn("OrderQuantity",sales_df["OrderQuantity"].cast(IntegerType()))
sales_df.show(n=5,truncate=False)

+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|OrderDate |StockDate |OrderNumber|ProductKey|CustomerKey|OrderLineItem|OrderQuantity|Region        |Country       |Continent    |
+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+
|2015-01-01|2001-09-21|SO45080    |332       |14657      |1            |1            |Northwest     |United States |North America|
|2015-01-01|2001-12-05|SO45079    |312       |29255      |1            |1            |Southwest     |United States |North America|
|2015-01-01|2001-10-29|SO45082    |350       |11455      |1            |1            |Australia     |Australia     |Pacific      |
|2015-01-01|2001-11-16|SO45081    |338       |26782      |1            |1            |Canada        |Canada        |North America|
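|2015-01-02|2001-12-15|SO45083    |312       |14947      |1            |1            |United Kingdom|United Kingdom|Europe       |
+----------+----------+-----------+----------+-----------+-------------+-------------+--------------+--------------+-------------+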

No values are missing in the 'Sales table'. Also, I converted the order quantity into a numerical value. I do the same for the other tables.

**Customers Table**

In [0]:
#customers_df
#read Delta Table using spark dataframe
customers_df=  spark.read.table("salesdb.customers_raw")
#And show the first 5 rows
customers_df.show(n=5,truncate=False)

+-----------+------+---------+--------+---------+-------------+------+------------------------------+------------+-------------+--------------+------------+---------+
|CustomerKey|Prefix|FirstName|LastName|BirthDate|MaritalStatus|Gender|EmailAddress                  |AnnualIncome|TotalChildren|EducationLevel|Occupation  |HomeOwner|
+-----------+------+---------+--------+---------+-------------+------+------------------------------+------------+-------------+--------------+------------+---------+
|11000      |MR.   |JON      |YANG    |4/8/1966 |M            |M     |jon24@adventure-works.com     |$90,000     |2            |Bachelors     |Professional|Y        |
|11001      |MR.   |EUGENE   |HUANG   |5/14/1965|S            |M     |eugene10@adventure-works.com  |$60,000     |3            |Bachelors     |Professional|N        |
|11002      |MR.   |RUBEN    |TORRES  |8/12/1965|M            |M     |ruben35@adventure-works.com   |$60,000     |3            |Bachelors     |Professional|Y        

In [0]:
#Convert the date of birth to the correct format
func = udf(lambda x: datetime.strptime(x,'%m/%d/%Y'),DateType())
customers_df =  customers_df.withColumn("BirthDate",func(col('BirthDate')))
customers_df.show(n=5,truncate=False)

+-----------+------+---------+--------+----------+-------------+------+------------------------------+------------+-------------+--------------+------------+---------+
|CustomerKey|Prefix|FirstName|LastName|BirthDate |MaritalStatus|Gender|EmailAddress                  |AnnualIncome|TotalChildren|EducationLevel|Occupation  |HomeOwner|
+-----------+------+---------+--------+----------+-------------+------+------------------------------+------------+-------------+--------------+------------+---------+
|11000      |MR.   |JON      |YANG    |1966-04-08|M            |M     |jon24@adventure-works.com     |$90,000     |2            |Bachelors     |Professional|Y        |
|11001      |MR.   |EUGENE   |HUANG   |1965-05-14|S            |M     |eugene10@adventure-works.com  |$60,000     |3            |Bachelors     |Professional|N        |
|11002      |MR.   |RUBEN    |TORRES  |1965-08-12|M            |M     |ruben35@adventure-works.com   |$60,000     |3            |Bachelors     |Professional|Y  

In [0]:
# Count missing values for each column
display(customers_df.select([count(when(col(c).isNull(),c)).alias(c) for c in customers_df.columns]))

CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner
0,130,0,0,0,0,0,0,0,0,0,0,0


The Prefix column has 130 missing values. This is not important for the data analysis, so I ignore it here.
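
The missing-value check counts, for each column, how many rows hold a null. The same idea in plain Python looks roughly like this. The rows below are made up for illustration (they are not taken from the dataset), and count_missing is a hypothetical helper name:

```python
# Made-up rows for illustration only; None stands for a missing value
rows = [
    {"CustomerKey": "1", "Prefix": "MR.", "FirstName": "A"},
    {"CustomerKey": "2", "Prefix": None, "FirstName": "B"},
    {"CustomerKey": "3", "Prefix": None, "FirstName": "C"},
]


def count_missing(rows):
    """Return a dict mapping each column name to its number of None values."""
    columns = rows[0].keys()
    return {c: sum(1 for row in rows if row[c] is None) for c in columns}


missing = count_missing(rows)
print(missing)
```

A column with a non-zero count, like Prefix here, is a candidate for either filling, dropping, or ignoring, depending on whether the analysis needs it.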

In [0]:
#Total children is also an integer
customers_df =  customers_df.withColumn("TotalChildren",customers_df["TotalChildren"].cast(IntegerType()))

In [0]:
from pyspark.sql.functions import regexp_replace
# Strip every non-digit character (the '$' and ',') from AnnualIncome, then cast it to an integer
customers_df = customers_df.withColumn("AnnualIncome", regexp_replace("AnnualIncome", '[^0-9]', ''))
customers_df =  customers_df.withColumn("AnnualIncome",customers_df["AnnualIncome"].cast(IntegerType()))
# And finally show the table
customers_df.show(n=5,truncate=False)

+-----------+------+---------+--------+----------+-------------+------+------------------------------+------------+-------------+--------------+------------+---------+
|CustomerKey|Prefix|FirstName|LastName|BirthDate |MaritalStatus|Gender|EmailAddress                  |AnnualIncome|TotalChildren|EducationLevel|Occupation  |HomeOwner|
+-----------+------+---------+--------+----------+-------------+------+------------------------------+------------+-------------+--------------+------------+---------+
|11000      |MR.   |JON      |YANG    |1966-04-08|M            |M     |jon24@adventure-works.com     |90000       |2            |Bachelors     |Professional|Y        |
|11001      |MR.   |EUGENE   |HUANG   |1965-05-14|S            |M     |eugene10@adventure-works.com  |60000       |3            |Bachelors     |Professional|N        |
|11002      |MR.   |RUBEN    |TORRES  |1965-08-12|M            |M     |ruben35@adventure-works.com   |60000       |3            |Bachelors     |Professional|Y  
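
The AnnualIncome cleanup uses the pattern '[^0-9]' to delete everything that is not a digit. A minimal plain-Python sketch of the same regular expression, with a hypothetical helper name clean_income, shows what happens to values such as $90,000:

```python
import re


def clean_income(text):
    """Remove every non-digit character and return the result as an int (None if nothing is left)."""
    digits = re.sub(r"[^0-9]", "", text)
    return int(digits) if digits else None


cleaned = [clean_income(value) for value in ["$90,000", "$60,000", "$70,000"]]
print(cleaned)

# Caveat: the pattern also removes decimal points and minus signs
print(clean_income("$1,234.50"))
```

This works well for whole-dollar amounts like the ones in the customers table. For values with cents or negative amounts, the pattern would need to keep the '.' and '-' characters.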

**Product Returns Table**

In [0]:
#returns_df
#read Delta Table using spark dataframe
returns_df=  spark.read.table("salesdb.returns_raw")
#And show the first 5 rows
returns_df.show(n=5,truncate=False)

+----------+-------------+--------------+--------------+----------+--------------+
|ReturnDate|Continent    |Country       |Region        |ProductKey|ReturnQuantity|
+----------+-------------+--------------+--------------+----------+--------------+
|1/18/2015 |Pacific      |Australia     |Australia     |312       |1             |
|1/18/2015 |Europe       |United Kingdom|United Kingdom|310       |1             |
|1/21/2015 |Europe       |Germany       |Germany       |346       |1             |
|1/22/2015 |North America|United States |Southwest     |311       |1             |
|2/2/2015  |North America|Canada        |Canada        |312       |1             |
+----------+-------------+--------------+--------------+----------+--------------+
only showing top 5 rows



In [0]:
#Convert the date of returns to the correct format
func = udf(lambda x: datetime.strptime(x,'%m/%d/%Y'),DateType())
returns_df =  returns_df.withColumn("ReturnDate",func(col('ReturnDate')))
returns_df.show(n=5,truncate=False)

+----------+-------------+--------------+--------------+----------+--------------+
|ReturnDate|Continent    |Country       |Region        |ProductKey|ReturnQuantity|
+----------+-------------+--------------+--------------+----------+--------------+
|2015-01-18|Pacific      |Australia     |Australia     |312       |1             |
|2015-01-18|Europe       |United Kingdom|United Kingdom|310       |1             |
|2015-01-21|Europe       |Germany       |Germany       |346       |1             |
|2015-01-22|North America|United States |Southwest     |311       |1             |
|2015-02-02|North America|Canada        |Canada        |312       |1             |
+----------+-------------+--------------+--------------+----------+--------------+
only showing top 5 rows



In [0]:
# Count missing values for each column
display(returns_df.select([count(when(col(c).isNull(),c)).alias(c) for c in returns_df.columns]))

ReturnDate,Continent,Country,Region,ProductKey,ReturnQuantity
0,0,0,0,0,0


In [0]:
#convert return quantity to a number
returns_df =  returns_df.withColumn("ReturnQuantity",returns_df["ReturnQuantity"].cast(IntegerType()))
returns_df.show(n=5,truncate=False)

+----------+-------------+--------------+--------------+----------+--------------+
|ReturnDate|Continent    |Country       |Region        |ProductKey|ReturnQuantity|
+----------+-------------+--------------+--------------+----------+--------------+
|2015-01-18|Pacific      |Australia     |Australia     |312       |1             |
|2015-01-18|Europe       |United Kingdom|United Kingdom|310       |1             |
|2015-01-21|Europe       |Germany       |Germany       |346       |1             |
|2015-01-22|North America|United States |Southwest     |311       |1             |
|2015-02-02|North America|Canada        |Canada        |312       |1             |
+----------+-------------+--------------+--------------+----------+--------------+
only showing top 5 rows



**Products Table**

In [0]:
#read Delta Table using spark dataframe
products_df=  spark.read.table("salesdb.products_raw")
#And show the first 5 rows
products_df.show(n=5,truncate=False)

+----------+------------+------------------+----------+-----------------------+-------------------+---------------------------------------------------------------------------------------------+------------+-----------+------------+-----------+------------+
|ProductKey|CategoryName|ProductSubcategory|ProductSKU|ProductName            |ModelName          |ProductDescription                                                                           |ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|
+----------+------------+------------------+----------+-----------------------+-------------------+---------------------------------------------------------------------------------------------+------------+-----------+------------+-----------+------------+
|214       |Accessories |Helmets           |HL-U509-R |Sport-100 Helmet, Red  |Sport-100          |Universal fit, well-vented, lightweight , snap-on visor.                                     |Red         |0          |0          

In [0]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, round, format_number

# Convert the cost and price columns to DoubleType and round them to 2 decimal places
products_df = products_df.withColumn("ProductCost", round(col("ProductCost").cast(DoubleType()), 2))
products_df = products_df.withColumn("ProductPrice", round(col("ProductPrice").cast(DoubleType()), 2))

# Format the values as strings with exactly 2 decimal places.
# Note: format_number returns strings and inserts thousands separators (e.g. 1,234.50),
# so these columns are no longer numeric after this step
products_df = products_df.withColumn("ProductCost", format_number(col("ProductCost"), 2))
products_df = products_df.withColumn("ProductPrice", format_number(col("ProductPrice"), 2))
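
One thing to keep in mind with format_number: it produces a string in a '#,###,###.##' style, so values of 1,000 or more contain a comma. The following plain-Python analogue (not Spark code; the price is a hypothetical value and try_float is my own helper) sketches why such a string no longer converts back to a number cleanly:

```python
price = 1234.5  # hypothetical price, not taken from the dataset

# Python analogue of formatting with grouping separators and 2 decimals
formatted = format(price, ",.2f")
print(formatted)


def try_float(text):
    """Return the text as a float, or None if it cannot be parsed."""
    try:
        return float(text)
    except ValueError:
        return None


print(try_float(formatted))             # grouped string: cannot be parsed
print(try_float(format(9.5, ",.2f")))   # small value without a comma: parses fine
```

How Spark treats such a string in later arithmetic depends on its cast settings, so I would treat any totals computed from formatted columns with care and prefer the rounded DoubleType columns for calculations.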


**Create Delta Table for Adjustments**

In [0]:
# Save all the adjustments in the sales database
spark.sql("USE salesdb")

# Create Delta tables for the adjusted sales, customers, returns, and products tables

sales_df.write.mode("overwrite").format("delta").saveAsTable("SALES")
customers_df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("CUSTOMERS")
returns_df.write.mode("overwrite").format("delta").saveAsTable("RETURNS")
products_df.write.mode("overwrite").format("delta").saveAsTable("PRODUCTS")


# Validate that the tables were created successfully
display(spark.sql("SHOW TABLES"))

database,tableName,isTemporary
salesdb,customers,False
salesdb,customers_raw,False
salesdb,products,False
salesdb,products_raw,False
salesdb,returns,False
salesdb,returns_raw,False
salesdb,sales,False
salesdb,sales_raw,False


#### Data Visualization

First, I take another look at the products and sales tables, which makes the analysis easier to follow. Databricks has some nice built-in data visualizations. I use SQL.

In [0]:
%sql

select * from products limit 5;

ProductKey,CategoryName,ProductSubcategory,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
214,Accessories,Helmets,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.09,34.99
215,Accessories,Helmets,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.03,33.64
218,Clothing,Socks,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.4,9.5
219,Clothing,Socks,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.4,9.5
220,Accessories,Helmets,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.03,33.64


In [0]:
%sql

select * from sales limit 5;

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,OrderLineItem,OrderQuantity,Region,Country,Continent
2015-01-01,2001-09-21,SO45080,332,14657,1,1,Northwest,United States,North America
2015-01-01,2001-12-05,SO45079,312,29255,1,1,Southwest,United States,North America
2015-01-01,2001-10-29,SO45082,350,11455,1,1,Australia,Australia,Pacific
2015-01-01,2001-11-16,SO45081,338,26782,1,1,Canada,Canada,North America
2015-01-02,2001-12-15,SO45083,312,14947,1,1,United Kingdom,United Kingdom,Europe


**Ordered Quantity by Country**

In [0]:
%sql

select Country, sum(OrderQuantity) from sales group by Country;

Country,sum(OrderQuantity)
Germany,7950
France,7862
United States,29823
Canada,10894
Australia,17951
United Kingdom,9694


Databricks visualization. Run in Databricks to view.

**Total Sales by Country**

In [0]:
%sql
select S.Country, SUM(P.ProductPrice * S.OrderQuantity) AS TotalSales from salesdb.sales S join
salesdb.products P 
on S.ProductKey=P.ProductKey
group by S.Country order by TotalSales desc;

Country,TotalSales
United States,1287494.9799999343
Australia,828808.2599999894
United Kingdom,439549.5500000054
Germany,383710.85000000335
France,360054.970000004
Canada,309688.4300000038


Databricks visualization. Run in Databricks to view.
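
The query above joins each sales line to its product price and then sums price times quantity per country. To experiment with that join-and-aggregate pattern outside Databricks, here is a small sketch using Python's built-in sqlite3 module. The table contents are made up for illustration, and SQLite's SQL dialect differs from Spark SQL in details, so this is only an approximation of the notebook's query:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (ProductKey INTEGER, ProductPrice REAL);
    CREATE TABLE sales (Country TEXT, ProductKey INTEGER, OrderQuantity INTEGER);
    -- made-up rows for illustration only
    INSERT INTO products VALUES (1, 10.0), (2, 2.5);
    INSERT INTO sales VALUES ('Canada', 1, 2), ('Canada', 2, 4), ('France', 2, 2);
""")

# Join sales to products on ProductKey, then total price * quantity per country
query = """
    SELECT S.Country, ROUND(SUM(P.ProductPrice * S.OrderQuantity), 2) AS TotalSales
    FROM sales S
    JOIN products P ON S.ProductKey = P.ProductKey
    GROUP BY S.Country
    ORDER BY TotalSales DESC
"""
totals = conn.execute(query).fetchall()
print(totals)
conn.close()
```

Wrapping the sum in ROUND, as the later queries in this notebook do, also avoids long floating-point tails in the output.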

**Total Profit by Country**

In [0]:

%sql
select S.Country, ROUND((SUM(P.ProductPrice * S.OrderQuantity) - SUM(P.ProductCost * S.OrderQuantity)), 2) AS TotalProfit from salesdb.sales S join
salesdb.products P 
on S.ProductKey=P.ProductKey
group by S.Country order by TotalProfit desc;

Country,TotalProfit
United States,242167.56
Canada,91201.84
Australia,74397.78
France,68687.69
United Kingdom,51734.22
Germany,47704.88


Databricks visualization. Run in Databricks to view.

**Most Sales by Product Category**

In [0]:
%sql
select P.CategoryName, SUM(S.OrderQuantity) as QuantitySold from salesdb.sales S join
salesdb.products P 
on S.ProductKey=P.ProductKey
group by P.CategoryName order by QuantitySold desc;

CategoryName,QuantitySold
Accessories,57809
Bikes,13929
Clothing,12436


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
select P.CategoryName, ROUND(SUM(P.ProductPrice * S.OrderQuantity),2) AS TotalSales from salesdb.sales S join
salesdb.products P 
on S.ProductKey=P.ProductKey
group by P.CategoryName order by TotalSales desc;

CategoryName,TotalSales
Bikes,2337239.92
Accessories,906656.58
Clothing,365410.54


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
select P.ProductSubcategory, SUM(S.OrderQuantity) as QuantitySold from salesdb.sales S join
salesdb.products P 
on S.ProductKey=P.ProductKey
where P.ProductSubcategory like '%Bikes%' group by P.ProductSubcategory order by QuantitySold desc;

ProductSubcategory,QuantitySold
Road Bikes,7099
Mountain Bikes,4706
Touring Bikes,2124


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
select P.ProductSubcategory, ROUND(SUM(P.ProductPrice * S.OrderQuantity),2) AS TotalSales from salesdb.sales S join
salesdb.products P 
on S.ProductKey=P.ProductKey
where P.ProductSubcategory like '%Bikes%' group by P.ProductSubcategory order by TotalSales desc;

ProductSubcategory,TotalSales
Road Bikes,1272615.98
Mountain Bikes,671178.44
Touring Bikes,393445.5


Databricks visualization. Run in Databricks to view.

**Returned Products**

In [0]:
%sql
select * from returns limit 5;

ReturnDate,Continent,Country,Region,ProductKey,ReturnQuantity
2015-01-18,Pacific,Australia,Australia,312,1
2015-01-18,Europe,United Kingdom,United Kingdom,310,1
2015-01-21,Europe,Germany,Germany,346,1
2015-01-22,North America,United States,Southwest,311,1
2015-02-02,North America,Canada,Canada,312,1


**Quantity Returned by Country**

In [0]:
%sql

select Country, SUM(ReturnQuantity) as quantityreturned from returns group by Country;

Country,quantityreturned
Germany,163
France,186
United States,633
Canada,238
Australia,404
United Kingdom,204


Databricks visualization. Run in Databricks to view.

**Costs of Returned Products**

By Country:

In [0]:
%sql
select R.Country, ROUND(SUM(P.ProductCost * R.ReturnQuantity),2) AS TotalCostsReturns from salesdb.returns R join
salesdb.products P 
on R.ProductKey=P.ProductKey
group by R.Country order by TotalCostsReturns desc;

Country,TotalCostsReturns
United States,33817.66
Australia,22291.15
Germany,12243.13
United Kingdom,11498.13
France,7335.2
Canada,5806.41


Databricks visualization. Run in Databricks to view.

By bike subcategory:

In [0]:
%sql
select P.ProductSubcategory, ROUND(SUM(P.ProductCost * R.ReturnQuantity),2) AS TotalCostsReturns from salesdb.returns R join
salesdb.products P 
on R.ProductKey=P.ProductKey
where P.ProductSubcategory like '%Bikes%' group by P.ProductSubcategory order by TotalCostsReturns desc;


ProductSubcategory,TotalCostsReturns
Road Bikes,50227.5
Touring Bikes,18710.29
Mountain Bikes,10502.66


Databricks visualization. Run in Databricks to view.

**Returned Products by Date**

In [0]:
%sql
select date_trunc('month',ReturnDate) as MONTH, SUM(ReturnQuantity) as quantityreturned from returns group by 1 order by 1 asc;

MONTH,quantityreturned
2015-01-01T00:00:00.000+0000,4
2015-02-01T00:00:00.000+0000,4
2015-03-01T00:00:00.000+0000,9
2015-04-01T00:00:00.000+0000,14
2015-05-01T00:00:00.000+0000,11
2015-06-01T00:00:00.000+0000,4
2015-07-01T00:00:00.000+0000,3
2015-08-01T00:00:00.000+0000,6
2015-09-01T00:00:00.000+0000,2
2015-10-01T00:00:00.000+0000,11


Databricks visualization. Run in Databricks to view.
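
The monthly view works by truncating each ReturnDate to the first day of its month and summing the quantities per month. A minimal plain-Python sketch of that idea follows; the (date, quantity) pairs are made up for illustration and are not rows from the returns table:

```python
from collections import Counter
from datetime import date

# Made-up (ReturnDate, ReturnQuantity) pairs for illustration only
returns = [
    (date(2015, 1, 5), 1),
    (date(2015, 1, 20), 1),
    (date(2015, 2, 3), 1),
]

monthly = Counter()
for return_date, quantity in returns:
    # Truncate the date to the first day of its month, then add up the quantities
    monthly[return_date.replace(day=1)] += quantity

result = sorted(monthly.items())
print(result)
```

Sorting by the truncated date gives the same chronological order as the ORDER BY clause in the SQL query.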

#### End of this Notebook