# Lab: Basic Spark Operations and Transformation

Create the SparkSession variable.

In [None]:
from pyspark.sql import SparkSession

# Build the session (or pick up an already-running one) used by the cells below.
spark = (
    SparkSession.builder
    .appName('spark_training')
    .getOrCreate()
)

# Restrict Spark's logging to errors so cell output stays readable.
spark.sparkContext.setLogLevel("ERROR")

# Bare expression: renders the session summary as the cell's output.
spark

# Loading data into a Spark environment

We are going to use multiple files in this exercise.
- `northwind_order_details.csv`
- `northwind_orders.csv`
- `northwind_products.csv`
- `northwind_categories.csv`

Load these files and take a look at the structure.

In [None]:
import os

# On Databricks (detected via its runtime env var) the lab files live in S3;
# everywhere else they are read from the repository's local data directory.
DATA_FOLDER = 's3://bah-data' if "DATABRICKS_RUNTIME_VERSION" in os.environ else '../data'

In [None]:
def _read_lab_csv(file_name):
    """Read one CSV from DATA_FOLDER, using its header row and letting Spark infer column types."""
    return spark.read.csv(f'{DATA_FOLDER}/{file_name}', header=True, inferSchema=True)


ord_detail = _read_lab_csv('northwind_order_details.csv')
orders = _read_lab_csv('northwind_orders.csv')
prod = _read_lab_csv('northwind_products.csv')
cat = _read_lab_csv('northwind_categories.csv')

# Preview the first two rows of each table to check the structure.
for _frame in (ord_detail, orders, prod, cat):
    _frame.show(2)

+-------+---------+---------+--------+--------+
|orderID|productID|unitPrice|quantity|discount|
+-------+---------+---------+--------+--------+
|  10248|       11|     14.0|      12|     0.0|
|  10248|       42|      9.8|      10|     0.0|
+-------+---------+---------+--------+--------+
only showing top 2 rows

+-------+----------+----------+-------------------+-------------------+--------------------+-------+-------+--------------------+------------------+--------+----------+--------------+-----------+
|orderID|customerID|employeeID|          orderDate|       requiredDate|         shippedDate|shipVia|freight|            shipName|       shipAddress|shipCity|shipRegion|shipPostalCode|shipCountry|
+-------+----------+----------+-------------------+-------------------+--------------------+-------+-------+--------------------+------------------+--------+----------+--------------+-----------+
|  10248|     VINET|         5|1996-07-04 00:00:00|1996-08-01 00:00:00|1996-07-16 00:00:...|      3

Which customers have the largest average freight in their orders? Make sure that your report has understandable column names.

In [None]:
# Mean freight per customer, highest first, shown under a readable column name.
avg_freight_by_customer = orders.groupBy('customerID').mean('freight')

(
    avg_freight_by_customer
    .orderBy('avg(freight)', ascending=False)
    .withColumnRenamed('avg(freight)', "Average freight per customer's order")
    .show(5)
)

+----------+------------------------------------+
|customerID|Average freight per customer's order|
+----------+------------------------------------+
|     SAVEA|                  215.60322580645163|
|     ERNSH|                  206.84633333333335|
|     QUICK|                  200.20107142857142|
|     QUEEN|                   152.5153846153846|
|     HUNGO|                  145.01263157894738|
+----------+------------------------------------+
only showing top 5 rows



Similarly, management is interested in which employees have handled the most orders. Create a DataFrame displaying this information.

In [None]:
# Number of orders per employee; busiest employees at the top.
order_counts_desc = orders.groupBy('employeeID').count()

(
    order_counts_desc
    .orderBy('count', ascending=False)
    .withColumnRenamed('count', 'Orders handled')
    .show(5)
)

+----------+--------------+
|employeeID|Orders handled|
+----------+--------------+
|         4|           156|
|         3|           127|
|         1|           123|
|         8|           104|
|         2|            96|
+----------+--------------+
only showing top 5 rows



Which employee had the smallest number of orders?

In [None]:
# Same per-employee order count as before, but sorted so the least busy come first.
order_counts_asc = orders.groupBy('employeeID').count()

(
    order_counts_asc
    .orderBy('count', ascending=True)
    .withColumnRenamed('count', 'Orders handled')
    .show(5)
)

+----------+--------------+
|employeeID|Orders handled|
+----------+--------------+
|         5|            42|
|         9|            43|
|         6|            67|
|         7|            72|
|         2|            96|
+----------+--------------+
only showing top 5 rows



Is this a good metric to assess employee performance? Some orders are more valuable than others. Calculate the total amount each employee's orders brought to the company.

Note: Don't forget discounts.

In [None]:
import pyspark.sql.functions as s_f

# Revenue of a single order line: price x quantity, scaled by (1 - discount),
# rounded to 4 decimals. Later cells reuse this expression.
gross_line_amount = s_f.col('unitPrice') * s_f.col('quantity')
discount_multiplier = s_f.lit(1) - s_f.col('discount')
order_revenue_expr = s_f.round(gross_line_amount * discount_multiplier, 4)

# Attach the handling employee to every order line, then total revenue per employee.
lines_with_employee = orders.select('orderID', 'employeeID').join(ord_detail, 'orderID', 'inner')

(
    lines_with_employee
    .withColumn('order_revenue', order_revenue_expr)
    .groupBy('employeeID')
    .sum('order_revenue')
    .orderBy('sum(order_revenue)', ascending=True)
    .withColumnRenamed('sum(order_revenue)', 'Total order revenue')
    .show()
)


+----------+-------------------+
|employeeID|Total order revenue|
+----------+-------------------+
|         5|         68792.2825|
|         6|         73913.1295|
|         9|  77308.06650000002|
|         7| 124568.23500000002|
|         8| 126862.27749999995|
|         2|         166537.755|
|         1| 192107.60450000007|
|         3| 202812.84299999996|
|         4| 232890.84600000005|
+----------+-------------------+



Using a similar approach, help regional managers understand which countries bring the most revenue to the company.

In [None]:
# Attach the destination country to every order line, then total revenue per country.
lines_with_country = orders.select('orderID', 'shipCountry').join(ord_detail, 'orderID', 'inner')

(
    lines_with_country
    .withColumn('order_revenue', order_revenue_expr)
    .groupBy('shipCountry')
    .sum('order_revenue')
    .orderBy('sum(order_revenue)', ascending=False)
    .withColumnRenamed('sum(order_revenue)', 'Total order revenue')
    .show(5)
)

+-----------+-------------------+
|shipCountry|Total order revenue|
+-----------+-------------------+
|        USA| 245584.61050000007|
|    Germany|        230284.6335|
|    Austria| 128003.83850000001|
|     Brazil| 106925.77650000002|
|     France|  81358.32249999998|
+-----------+-------------------+
only showing top 5 rows



Another line of inquiry is about warehouse efficiency - how soon we ship orders to clients. Calculate the statistics (avg, stdev, min, max) on days between order creation and shipping date.

Note: Make sure your date columns are properly formatted.
Hint: `pyspark.sql.functions.datediff()` might be useful here

In [None]:
# Re-type both columns as dates (parsed with the pattern below) so that the
# difference between them can be computed in whole days.
for _date_column in ('orderDate', 'shippedDate'):
    orders = orders.withColumn(
        _date_column,
        s_f.to_date(s_f.col(_date_column), 'yyyy-MM-dd HH:mm:ss.000'),
    )

In [None]:
# Days elapsed from orderDate to shippedDate for every order.
delay_days = orders.withColumn('warehouse_delay', s_f.datediff('shippedDate', 'orderDate'))

# Summary statistics of that delay.
delay_days.select('warehouse_delay').describe().show()

+-------+------------------+
|summary|   warehouse_delay|
+-------+------------------+
|  count|               809|
|   mean| 8.491965389369591|
| stddev|6.8386820004845355|
|    min|                 1|
|    max|                37|
+-------+------------------+



Another department in the company is interested in understanding how our company's products are split across categories.

Using the information in the `products` and `categories` tables, find out which categories hold the highest percentage of our warehouse value.

In [None]:
# Stock value of each product (unit price x units in stock), summed per category name.
category_value = (
    prod
    .select('categoryID', (s_f.col('unitPrice') * s_f.col('unitsInStock')).alias('value'))
    .join(cat, 'categoryID', 'inner')
    .groupBy('categoryName')
    .agg(s_f.sum('value').alias('Warehouse value'))
)

# Grand total over all categories; the aggregate yields a single row, so
# first()[0] pulls out the scalar.
total_warehouse_value = category_value.agg(s_f.sum('Warehouse value')).first()[0]

# The exercise asks for each category's *percentage* of the warehouse value,
# so show the share next to the absolute figure, largest first.
(
    category_value
    .withColumn(
        'Share of warehouse value (%)',
        s_f.round(s_f.col('Warehouse value') / s_f.lit(total_warehouse_value) * 100, 2),
    )
    .orderBy('Warehouse value', ascending=False)
    .show()
)

+--------------+------------------+
|  categoryName|        sum(value)|
+--------------+------------------+
|       Seafood|          13010.35|
|     Beverages|          12480.25|
|    Condiments|          12023.55|
|Dairy Products|           11271.2|
|   Confections|10392.199999999999|
|  Meat/Poultry|           5729.45|
|Grains/Cereals|            5594.5|
|       Produce|3549.3500000000004|
+--------------+------------------+



What is the most profitable product?

In [None]:
# Revenue of every order line, keyed by product.
product_line_revenue = ord_detail.select(
    'orderID', 'productID', order_revenue_expr.alias('order_revenue')
)
product_names = prod.select('productID', 'productName')

# Total revenue per product, best sellers first; names are shown untruncated.
(
    product_line_revenue
    .join(product_names, 'productID')
    .groupBy(['productID', 'productName'])
    .sum('order_revenue')
    .withColumnRenamed('sum(order_revenue)', 'Total product revenue')
    .orderBy('Total product revenue', ascending=False)
    .show(truncate=False)
)

+---------+-------------------------------+---------------------+
|productID|productName                    |Total product revenue|
+---------+-------------------------------+---------------------+
|38       |Côte de Blaye                  |141396.735           |
|29       |Thüringer Rostbratwurst        |80368.67199999999    |
|59       |Raclette Courdavault           |71155.7              |
|62       |Tarte au sucre                 |47234.97             |
|60       |Camembert Pierrot              |46825.48             |
|56       |Gnocchi di nonna Alice         |42593.06             |
|51       |Manjimup Dried Apples          |41819.65             |
|17       |Alice Mutton                   |32698.379999999997   |
|18       |Carnarvon Tigers               |29171.875            |
|28       |Rössle Sauerkraut              |25696.640000000007   |
|72       |Mozzarella di Giovanni         |24900.13             |
|43       |Ipoh Coffee                    |23526.699999999997   |
|20       

Finally, find the most profitable categories from orders.

In [None]:
# Revenue of every order line, keyed by product.
category_line_revenue = ord_detail.select(
    'orderID', 'productID', order_revenue_expr.alias('order_revenue')
)
product_to_category = prod.select('productID', 'productName', 'categoryID')
category_names = cat.select('categoryID', 'categoryName')

# Map each line to its product's category, then total revenue per category, highest first.
(
    category_line_revenue
    .join(product_to_category, 'productID')
    .join(category_names, 'categoryID', 'inner')
    .groupBy(['categoryID', 'categoryName'])
    .sum('order_revenue')
    .withColumnRenamed('sum(order_revenue)', 'Total category revenue')
    .orderBy('Total category revenue', ascending=False)
    .show()
)

+----------+--------------+----------------------+
|categoryID|  categoryName|Total category revenue|
+----------+--------------+----------------------+
|         1|     Beverages|    267868.17999999993|
|         4|Dairy Products|            234507.285|
|         3|   Confections|    167357.22499999995|
|         6|  Meat/Poultry|           163022.3595|
|         8|       Seafood|    131261.73750000002|
|         2|    Condiments|    106047.08500000002|
|         7|       Produce|     99984.57999999999|
|         5|Grains/Cereals|     95744.58750000001|
+----------+--------------+----------------------+

