In [0]:
spark

In [0]:
# prepare csv file url and container access details

storage_account_key = "vAh0MYzXTTOUynsGtdwk6AFZT+0dNPazQiueINdLM2hyZY+cjJQNMmaawILe0DSKB01YmGf85Ma1+AStxBfsqw=="

storage_account = "myfirstblobstoragesample"
container_name = "databrickssample"
file_name = "super_store.csv"

csv_file_url = f"wasbs://{container_name}@{storage_account}.blob.core.windows.net/{file_name}"

# Set up the Spark configuration for Azure Blob Storage
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.blob.core.windows.net",
    storage_account_key
)

In [0]:
# read CSV file and load it in to dataframe
csvDF = spark.read.csv(csv_file_url, header=True, inferSchema=True)

In [0]:
# show above dataframe
csvDF.show(5, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------
 RowID        | 1                                                           
 OrderID      | CA-2016-152156                                              
 OrderDate    | 2016-11-08                                                  
 ShipDate     | 2016-11-11                                                  
 ShipMode     | Second Class                                                
 CustomerID   | CG-12520                                                    
 CustomerName | Claire Gute                                                 
 Segment      | Consumer                                                    
 Country      | United States                                               
 City         | Henderson                                                   
 State        | Kentucky                                                    
 Postal Code  | 42420                                                       

In [0]:
csvDF.printSchema()

root
 |-- RowID: integer (nullable = true)
 |-- OrderID: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- ShipMode: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)



In [0]:
# change datatype for : Sales - double, Quantity - integer, Discount - float

from pyspark.sql import functions as F
from pyspark.sql.types import *

updatedDF = csvDF.withColumn("Sales", F.col("Sales").cast(DoubleType())) \
.withColumn("Quantity", F.col("Quantity").cast(IntegerType())) \
.withColumn("Discount", F.col("Discount").cast(FloatType()))

In [0]:
updatedDF.printSchema()

root
 |-- RowID: integer (nullable = true)
 |-- OrderID: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- ShipMode: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: float (nullable = true)
 |-- Profit: double (nullable = true)



In [0]:
#Add Column - 'OrderYear' from 'Order Date' for further analytical queries
resultDF = updatedDF.withColumn("OrderYear",F.year(F.col("OrderDate")))
resultDF.printSchema()

root
 |-- RowID: integer (nullable = true)
 |-- OrderID: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- ShipMode: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: float (nullable = true)
 |-- Profit: double (nullable = true)
 |-- OrderYear: integer (nullable = true)



In [0]:
resultDF.show(2, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------
 RowID        | 1                                                           
 OrderID      | CA-2016-152156                                              
 OrderDate    | 2016-11-08                                                  
 ShipDate     | 2016-11-11                                                  
 ShipMode     | Second Class                                                
 CustomerID   | CG-12520                                                    
 CustomerName | Claire Gute                                                 
 Segment      | Consumer                                                    
 Country      | United States                                               
 City         | Henderson                                                   
 State        | Kentucky                                                    
 Postal Code  | 42420                                                       

In [0]:
# display orders by Year
resultDF.groupBy("OrderYear") \
    .agg(F.count("RowID").alias("TotalOrders")) \
    .orderBy("OrderYear") \
    .show(20, truncate=False, vertical=True)

-RECORD 0-----------
 OrderYear   | 2014 
 TotalOrders | 1993 
-RECORD 1-----------
 OrderYear   | 2015 
 TotalOrders | 2102 
-RECORD 2-----------
 OrderYear   | 2016 
 TotalOrders | 2587 
-RECORD 3-----------
 OrderYear   | 2017 
 TotalOrders | 3312 



In [0]:
# Top 10 customers based on sales
resultDF.groupBy("CustomerName") \
        .agg(F.round(F.sum("Sales"), 2).alias("Total_Sales")) \
        .orderBy(F.desc("Total_Sales")) \
.show(10, truncate=False, vertical=False)

+------------------+-----------+
|CustomerName      |Total_Sales|
+------------------+-----------+
|Sean Miller       |25043.05   |
|Tamara Chand      |19017.85   |
|Raymond Buch      |15117.34   |
|Tom Ashbrook      |14595.62   |
|Adrian Barton     |14355.61   |
|Sanjit Chand      |14142.33   |
|Ken Lonsdale      |14071.92   |
|Hunter Lopez      |12873.3    |
|Sanjit Engle      |12209.44   |
|Christopher Conant|12129.07   |
+------------------+-----------+
only showing top 10 rows



In [0]:
# Top10 Products based on Quantity

resultDF.groupBy("ProductName") \
        .agg(F.round(F.sum("Quantity"), 2).alias("Total_Quantity")) \
        .orderBy(F.desc("Total_Quantity")) \
.show(10, truncate=False, vertical=False)

+------------------------------------------------------------+--------------+
|ProductName                                                 |Total_Quantity|
+------------------------------------------------------------+--------------+
|"Tennsco Stur-D-Stor Boltless Shelving, 5 Shelves, 24"" Deep|4381          |
|"Tenex 46"" x 60"" Computer Anti-Static Chairmat            |2264          |
|"Rubbermaid ClusterMat Chairmats, Mat Size- 66"" x 60""     |2061          |
|"Belkin 19"" Vented Equipment Shelf                         |1395          |
|Wilson Jones Ledger-Size, Piano-Hinge Binder, 2"            |1209          |
|"Tyvek Interoffice Envelopes, 9 1/2"" x 12 1/2""            |935           |
|"Belkin 19"" Center-Weighted Shelf                          |846           |
|"Wilson Jones Elliptical Ring 3 1/2"" Capacity Binders      |830           |
|"Xerox Color Copier Paper, 11"" x 17""                      |591           |
|"Eldon Delta Triangular Chair Mat, 52"" x 58""              |54

In [0]:
# What are the total sales, and profit per quarter of every year

resultDF.groupBy("OrderYear", \
            F.quarter("OrderDate").alias("Quarter_Of_Year")) \
        .agg(F.round(F.sum("Sales"), 2).alias("Total_Sales"), \
             F.round(F.sum("Profit"), 2).alias("Total_Profit")) \
        .orderBy("OrderYear", "Quarter_Of_Year") \
.show(10, truncate=False, vertical=False)

+---------+---------------+-----------+------------+
|OrderYear|Quarter_Of_Year|Total_Sales|Total_Profit|
+---------+---------------+-----------+------------+
|2014     |1              |73807.36   |3622.56     |
|2014     |2              |86283.22   |11244.38    |
|2014     |3              |142601.19  |12644.68    |
|2014     |4              |179072.02  |21549.2     |
|2015     |1              |68630.68   |9200.96     |
|2015     |2              |86626.14   |12423.69    |
|2015     |3              |128609.75  |16747.01    |
|2015     |4              |180559.67  |23385.27    |
|2016     |1              |91971.55   |11316.65    |
|2016     |2              |134672.65  |16191.94    |
+---------+---------------+-----------+------------+
only showing top 10 rows



In [0]:
# which region is the most profitable

resultDF.groupBy("Region") \
        .agg(F.round(F.sum("Profit") , 2).alias("Total_Profit")) \
        .orderBy(F.desc("Total_Profit")) \
.show(1, truncate=False, vertical=False)

+------+------------+
|Region|Total_Profit|
+------+------------+
|West  |107303.7    |
+------+------------+
only showing top 1 row



In [0]:
# what are the 10 least profitable states

resultDF.groupBy("State") \
        .agg(F.round(F.sum("Profit"), 2).alias("Total_Profit")) \
        .orderBy("Total_Profit") \
.show(10, truncate=False, vertical=False)


+--------------+------------+
|State         |Total_Profit|
+--------------+------------+
|Texas         |-25523.99   |
|Ohio          |-16591.51   |
|Pennsylvania  |-15432.48   |
|Illinois      |-12020.47   |
|North Carolina|-7475.03    |
|Colorado      |-6486.08    |
|Tennessee     |-5275.77    |
|Florida       |-3378.91    |
|Arizona       |-3199.77    |
|Oregon        |-1205.48    |
+--------------+------------+
only showing top 10 rows



In [0]:
# how many customers do we have?

resultDF.agg(F.countDistinct("CustomerID").alias("No_Of_Customers")) \
.show(1, truncate=False, vertical=False)

+---------------+
|No_Of_Customers|
+---------------+
|793            |
+---------------+



In [0]:
# which is the most used shipped mode 

resultDF.groupBy("ShipMode") \
        .agg(F.count("ShipMode").alias("Ship_Mode_Count")) \
        .orderBy(F.desc("Ship_Mode_Count")) \
.show(1, truncate=False, vertical=False)

+--------------+---------------+
|ShipMode      |Ship_Mode_Count|
+--------------+---------------+
|Standard Class|5968           |
+--------------+---------------+
only showing top 1 row

