# Data Analysis of a Retail Store using Apache Spark

These tables come from the Cloudera Installation. A fictitious retail house with the following tables: customers, departments, categories, products, orders and order_items. In this notebook we will go through Spark SQL as well as Spark DF API based transformations and actions ranging from simple to quite complex.

In [1]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
from pyspark.sql.window import Window

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [2]:
# setting random seed for notebook reproducability
rnd_seed=23
np.random.seed=23
np.random.set_state=23

## 1. Understanding the Data Set:

```
mysql> describe customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+

mysql> describe departments;
+-----------------+-------------+------+-----+---------+----------------+
| Field           | Type        | Null | Key | Default | Extra          |
+-----------------+-------------+------+-----+---------+----------------+
| department_id   | int(11)     | NO   | PRI | NULL    | auto_increment |
| department_name | varchar(45) | NO   |     | NULL    |                |
+-----------------+-------------+------+-----+---------+----------------+

mysql> describe categories;
+------------------------+-------------+------+-----+---------+----------------+
| Field                  | Type        | Null | Key | Default | Extra          |
+------------------------+-------------+------+-----+---------+----------------+
| category_id            | int(11)     | NO   | PRI | NULL    | auto_increment |
| category_department_id | int(11)     | NO   |     | NULL    |                |
| category_name          | varchar(45) | NO   |     | NULL    |                |
+------------------------+-------------+------+-----+---------+----------------+

mysql> describe products;
+---------------------+--------------+------+-----+---------+----------------+
| Field               | Type         | Null | Key | Default | Extra          |
+---------------------+--------------+------+-----+---------+----------------+
| product_id          | int(11)      | NO   | PRI | NULL    | auto_increment |
| product_category_id | int(11)      | NO   |     | NULL    |                |
| product_name        | varchar(45)  | NO   |     | NULL    |                |
| product_description | varchar(255) | NO   |     | NULL    |                |
| product_price       | float        | NO   |     | NULL    |                |
| product_image       | varchar(255) | NO   |     | NULL    |                |
+---------------------+--------------+------+-----+---------+----------------+

mysql> describe orders;
+-------------------+-------------+------+-----+---------+----------------+
| Field             | Type        | Null | Key | Default | Extra          |
+-------------------+-------------+------+-----+---------+----------------+
| order_id          | int(11)     | NO   | PRI | NULL    | auto_increment |
| order_date        | datetime    | NO   |     | NULL    |                |
| order_customer_id | int(11)     | NO   |     | NULL    |                |
| order_status      | varchar(45) | NO   |     | NULL    |                |
+-------------------+-------------+------+-----+---------+----------------+

mysql> describe order_items;
+--------------------------+------------+------+-----+---------+----------------+
| Field                    | Type       | Null | Key | Default | Extra          |
+--------------------------+------------+------+-----+---------+----------------+
| order_item_id            | int(11)    | NO   | PRI | NULL    | auto_increment |
| order_item_order_id      | int(11)    | NO   |     | NULL    |                |
| order_item_product_id    | int(11)    | NO   |     | NULL    |                |
| order_item_quantity      | tinyint(4) | NO   |     | NULL    |                |
| order_item_subtotal      | float      | NO   |     | NULL    |                |
| order_item_product_price | float      | NO   |     | NULL    |                |
+--------------------------+------------+------+-----+---------+----------------+
```

## 2. Creating the Spark Session:

In [3]:
os.environ['SPARK_HOME']

'/Users/anindyas/work/spark-2.2.0-bin-hadoop2.6'

In [4]:
spark = SparkSession.builder.master("local[2]").appName("retail_database_analysis").getOrCreate()

In [5]:
spark

In [6]:
sc = spark.sparkContext
sc

In [7]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

<pyspark.sql.context.SQLContext at 0x1139b26a0>

## 3. Load the Data From Files Into DataFrames:

In [8]:
CUSTOMERS_DATA = 'data/retail_db/customers.csv'
DEPARTMENTS_DATA = 'data/retail_db/departments.csv'
CATEGORIES_DATA = 'data/retail_db/categories.csv'
PRODUCTS_DATA = 'data/retail_db/products.csv'
ORDERS_DATA = 'data/retail_db/orders.csv'
ORDER_ITEMS_DATA = 'data/retail_db/order_items.csv'

In [9]:
# define the schema, corresponding to a line in the csv data file for Customer
customers_schema = StructType([
    StructField('customer_id', IntegerType(), nullable=True),
    StructField('customer_fname', StringType(), nullable=True),
    StructField('customer_lname', StringType(), nullable=True),
    StructField('customer_email', StringType(), nullable=True),
    StructField('customer_password', StringType(), nullable=True),
    StructField('customer_street', StringType(), nullable=True),
    StructField('customer_city', StringType(), nullable=True),
    StructField('customer_state', StringType(), nullable=True),
    StructField('customer_zipcode', StringType(), nullable=True)])

In [10]:
departments_schema = StructType([
    StructField('department_id', IntegerType(), nullable=True),
    StructField('category_name', StringType(), nullable=True)])

In [11]:
categories_schema = StructType([
    StructField('category_id', IntegerType(), nullable=True),
    StructField('category_department_id', IntegerType(), nullable=True),
    StructField('category_name', StringType(), nullable=True)])

In [12]:
products_schema = StructType([
    StructField('product_id', IntegerType(), nullable=True),
    StructField('product_category_id', IntegerType(), nullable=True),
    StructField('product_name', StringType(), nullable=True),
    StructField('product_description', StringType(), nullable=True),
    StructField('product_price', FloatType(), nullable=True),
    StructField('product_image', StringType(), nullable=True)])

In [13]:
orders_schema = StructType([
    StructField('order_id', IntegerType(), nullable=True),
    StructField('order_date', StringType(), nullable=True),
    StructField('order_customer_id', IntegerType(), nullable=True),
    StructField('order_status', StringType(), nullable=True)])

In [14]:
order_items_schema = StructType([
    StructField('order_item_id', IntegerType(), nullable=True),
    StructField('order_item_order_id', IntegerType(), nullable=True),
    StructField('order_item_product_id', IntegerType(), nullable=True),
    StructField('order_item_quantity', IntegerType(), nullable=True),
    StructField('order_item_subtotal', FloatType(), nullable=True),
    StructField('order_item_product_price', FloatType(), nullable=True)])

In [15]:
# Load data
customers_df = spark.read.csv(path=CUSTOMERS_DATA, schema=customers_schema).cache()
departments_df = spark.read.csv(path=DEPARTMENTS_DATA, schema=departments_schema).cache()
categories_df = spark.read.csv(path=CATEGORIES_DATA, schema=categories_schema).cache()
products_df = spark.read.csv(path=PRODUCTS_DATA, schema=products_schema).cache()
orders_df = spark.read.csv(path=ORDERS_DATA, schema=orders_schema).cache()
order_items_df = spark.read.csv(path=ORDER_ITEMS_DATA, schema=order_items_schema).cache()

In [16]:
customers_df.createOrReplaceTempView("customers")

In [17]:
customers_df.select([col(choice) for choice in np.random.choice(customers_df.columns, size=5, replace=False)]).show(5)

+--------------+----------------+--------------------+--------------+-----------+
|customer_lname|customer_zipcode|     customer_street|customer_fname|customer_id|
+--------------+----------------+--------------------+--------------+-----------+
|     Hernandez|           78521|  6303 Heather Plaza|       Richard|          1|
|       Barrett|           80126|9526 Noble Embers...|          Mary|          2|
|         Smith|           00725|3422 Blue Pioneer...|           Ann|          3|
|         Jones|           92069|  8324 Little Common|          Mary|          4|
|        Hudson|           00725|10 Crystal River ...|        Robert|          5|
+--------------+----------------+--------------------+--------------+-----------+
only showing top 5 rows



In [18]:
departments_df.createOrReplaceTempView("departments")

In [19]:
departments_df.show(5)

+-------------+-------------+
|department_id|category_name|
+-------------+-------------+
|            2|      Fitness|
|            3|     Footwear|
|            4|      Apparel|
|            5|         Golf|
|            6|     Outdoors|
+-------------+-------------+
only showing top 5 rows



In [20]:
categories_df.createOrReplaceTempView("categories")

In [21]:
categories_df.show(5)

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
+-----------+----------------------+-------------------+
only showing top 5 rows



In [22]:
products_df.createOrReplaceTempView("products")

In [23]:
products_df.show(5)

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|               null|        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|               null|       129.99|http://images.acm...|
|         3|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|               null|       199.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 

In [24]:
orders_df.createOrReplaceTempView("orders")

In [25]:
orders_df.show(5)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [26]:
order_items_df.createOrReplaceTempView("order_items")

In [27]:
order_items_df.show(5)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
+-------------+-

## 4. Data Analysis:

In [28]:
import re

def strip_margin(text):
    nomargin = re.sub('\n[ \t]*\|', ' ', text)
    trimmed = re.sub('\s+', ' ', nomargin)
    return trimmed

### 4.1 Get How many Orders were placed:

**SQL:**

In [29]:
spark.sql("select count(1) from orders").show()

+--------+
|count(1)|
+--------+
|   68883|
+--------+



**DF API:**

In [30]:
orders_df.count()

68883

### 4.2 Get Average Revenue Per Order:

**SQL:**

In [31]:
# Some orders are cancelled and they do not have corresponding entries in order_items, 
# so we need count(distinct oi.order_item_order_id)
spark.sql(strip_margin(
        """SELECT sum(oi.order_item_subtotal) / count(distinct oi.order_item_order_id) as avg_rev_per_order
          |FROM orders o JOIN order_items oi 
          |    ON o.order_id = oi.order_item_order_id
        """)).show()

+-----------------+
|avg_rev_per_order|
+-----------------+
|597.6322996016944|
+-----------------+



**DF API:**

In [32]:
# how best to join two DataFrames without having a duplicated colum? Mention them as expression
(orders_df.join(order_items_df, orders_df.order_id == order_items_df.order_item_order_id)
 .select(['order_item_subtotal', 'order_item_order_id'])
 .select((F.sum('order_item_subtotal') / F.countDistinct('order_item_order_id')).alias('avg_rev_per_order'))
 .show())

+-----------------+
|avg_rev_per_order|
+-----------------+
|597.6322996016944|
+-----------------+



### 4.3 Get Average Revenue Per Day:

**SQL:**

In [33]:
# Some orders are cancelled and they do not have corresponding entries in order_items, 
# so we need count(distinct oi.order_item_order_id)
spark.sql(strip_margin(
        """SELECT o.order_date, sum(oi.order_item_subtotal) / count(distinct oi.order_item_order_id) as avg_rev_per_day
          |FROM orders o JOIN order_items oi 
          |    ON o.order_id = oi.order_item_order_id
          |GROUP BY o.order_date 
          |ORDER BY o.order_date
        """)).show(truncate=False)

+---------------------+-----------------+
|order_date           |avg_rev_per_day  |
+---------------------+-----------------+
|2013-07-25 00:00:00.0|587.5330286848134|
|2013-07-26 00:00:00.0|585.9234878147109|
|2013-07-27 00:00:00.0|577.5676682063512|
|2013-07-28 00:00:00.0|551.4119109020958|
|2013-07-29 00:00:00.0|635.5883909684641|
|2013-07-30 00:00:00.0|564.5363838698838|
|2013-07-31 00:00:00.0|630.9955146643533|
|2013-08-01 00:00:00.0|608.4982189502356|
|2013-08-02 00:00:00.0|587.8871075517388|
|2013-08-03 00:00:00.0|599.1628419048382|
|2013-08-04 00:00:00.0|594.3201416863335|
|2013-08-05 00:00:00.0|592.8305590897799|
|2013-08-06 00:00:00.0|579.68106844792  |
|2013-08-07 00:00:00.0|583.906170096101 |
|2013-08-08 00:00:00.0|588.4743191939134|
|2013-08-09 00:00:00.0|629.4593056380147|
|2013-08-10 00:00:00.0|586.3113241756664|
|2013-08-11 00:00:00.0|551.5472206441007|
|2013-08-12 00:00:00.0|612.4790563343757|
|2013-08-13 00:00:00.0|604.1594044945457|
+---------------------+-----------

**DF API:**

In [34]:
# how best to join two DataFrames without having a duplicated colum? Mention them as expression
(orders_df.join(order_items_df, orders_df.order_id == order_items_df.order_item_order_id)
 .select(['order_date', 'order_item_subtotal', 'order_item_order_id'])
 .groupBy('order_date')
 .agg((F.sum('order_item_subtotal') / F.countDistinct('order_item_order_id')).alias('avg_rev_per_day'))
 .orderBy('order_date')
 .show(truncate=False))

+---------------------+-----------------+
|order_date           |avg_rev_per_day  |
+---------------------+-----------------+
|2013-07-25 00:00:00.0|587.5330286848134|
|2013-07-26 00:00:00.0|585.9234878147109|
|2013-07-27 00:00:00.0|577.5676682063512|
|2013-07-28 00:00:00.0|551.4119109020958|
|2013-07-29 00:00:00.0|635.5883909684641|
|2013-07-30 00:00:00.0|564.5363838698838|
|2013-07-31 00:00:00.0|630.9955146643533|
|2013-08-01 00:00:00.0|608.4982189502356|
|2013-08-02 00:00:00.0|587.8871075517388|
|2013-08-03 00:00:00.0|599.1628419048382|
|2013-08-04 00:00:00.0|594.3201416863335|
|2013-08-05 00:00:00.0|592.8305590897799|
|2013-08-06 00:00:00.0|579.68106844792  |
|2013-08-07 00:00:00.0|583.906170096101 |
|2013-08-08 00:00:00.0|588.4743191939134|
|2013-08-09 00:00:00.0|629.4593056380147|
|2013-08-10 00:00:00.0|586.3113241756664|
|2013-08-11 00:00:00.0|551.5472206441007|
|2013-08-12 00:00:00.0|612.4790563343757|
|2013-08-13 00:00:00.0|604.1594044945457|
+---------------------+-----------

### 4.4 Get Highest Priced Product:

**SQL:**

In [35]:
spark.sql(strip_margin(
        """SELECT p.* 
          |FROM products p
          |WHERE p.product_price = (SELECT max(q.product_price) FROM products q)
        """)).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+



**SQL Using Window Function:**

In [36]:
spark.sql(strip_margin(
        """SELECT * 
          |FROM ( 
          |    SELECT *, 
          |         rank() OVER (ORDER BY product_price DESC) as rank
          |    FROM products) tmp
          |WHERE rank <= 1
        """)).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|rank|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|   1|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+



**DF API:**

In [37]:
(products_df
 .select('*')
 .filter(col('product_price') == products_df.select(F.max('product_price')).collect()[0][0])
 .show())

+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+



**DF API Using Window Function:**

In [38]:
windowSpec = Window.orderBy(products_df['product_price'].desc())

In [39]:
products_df.select('*', F.rank().over(windowSpec).alias('rank')).filter(col('rank') <= 1).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|rank|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|   1|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+



### 4.5 Get Highest Revenue Earning Products:

**SQL:**

In [40]:
spark.sql(strip_margin(
        """SELECT products.* 
          |FROM products, (SELECT order_item_product_id, sum(oi.order_item_subtotal) as product_revenue
          |                    FROM order_items oi 
          |                    GROUP BY order_item_product_id 
          |                    ORDER BY product_revenue DESC 
          |                    LIMIT 1) a
          |WHERE product_id = a.order_item_product_id
        """)).show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1004|                 45|Field & Stream Sp...|               null|       399.98|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+



**DF API:**

In [41]:
# 1. Get the sum of revenue of all the products grouped by order_item_product_id from order_items table
# 2. Sort the result in descending order of their revenues
# 3. Take only the first one from the sorted order using the limit() function
# 4. Join with the prorcuts column to get the product details
(order_items_df.select(['order_item_product_id', 'order_item_subtotal'])
     .groupBy('order_item_product_id')
     .agg(F.sum('order_item_subtotal').alias('product_revenue'))
     .orderBy('product_revenue', ascending=False)
     .limit(1)
     .join(products_df, order_items_df.order_item_product_id == products_df.product_id)
     .select(products_df.columns)
     .show())

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1004|                 45|Field & Stream Sp...|               null|       399.98|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+



### 4.6 Get Number of Orders By Status:

**SQL:**

In [42]:
spark.sql(strip_margin(
        """SELECT order_status, count(1) as total
          |FROM orders o
          |GROUP BY o.order_status
        """)).show()

+---------------+-----+
|   order_status|total|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



**DF API:**

In [43]:
orders_df.groupBy(orders_df.order_status).count().withColumnRenamed('count', 'total').show()

+---------------+-----+
|   order_status|total|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



### 4.7 Get Number of Orders By Order Date and Order Status:

**SQL:**

In [44]:
spark.sql(strip_margin(
        """SELECT order_date, order_status, count(1) as total
          |FROM orders o
          |GROUP BY order_date, o.order_status
        """)).show(truncate=False)

+---------------------+---------------+-----+
|order_date           |order_status   |total|
+---------------------+---------------+-----+
|2013-08-16 00:00:00.0|COMPLETE       |43   |
|2013-08-30 00:00:00.0|CLOSED         |17   |
|2013-09-10 00:00:00.0|COMPLETE       |80   |
|2013-10-05 00:00:00.0|SUSPECTED_FRAUD|4    |
|2013-12-02 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-09 00:00:00.0|ON_HOLD        |9    |
|2013-12-20 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-23 00:00:00.0|PAYMENT_REVIEW |2    |
|2014-01-02 00:00:00.0|CLOSED         |15   |
|2014-02-11 00:00:00.0|CANCELED       |3    |
|2014-02-14 00:00:00.0|ON_HOLD        |11   |
|2014-02-21 00:00:00.0|PROCESSING     |25   |
|2014-05-13 00:00:00.0|SUSPECTED_FRAUD|3    |
|2014-06-27 00:00:00.0|PENDING        |26   |
|2014-07-16 00:00:00.0|ON_HOLD        |3    |
|2013-08-16 00:00:00.0|PENDING_PAYMENT|30   |
|2013-08-29 00:00:00.0|PROCESSING     |31   |
|2013-09-10 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-09-25 00:00:00.0|CLOSED     

**DF API:**

In [45]:
(orders_df
 .groupBy([orders_df.order_date, orders_df.order_status])
 .count().withColumnRenamed('count', 'total')
 .show(truncate=False))

+---------------------+---------------+-----+
|order_date           |order_status   |total|
+---------------------+---------------+-----+
|2013-08-16 00:00:00.0|COMPLETE       |43   |
|2013-08-30 00:00:00.0|CLOSED         |17   |
|2013-09-10 00:00:00.0|COMPLETE       |80   |
|2013-10-05 00:00:00.0|SUSPECTED_FRAUD|4    |
|2013-12-02 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-09 00:00:00.0|ON_HOLD        |9    |
|2013-12-20 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-23 00:00:00.0|PAYMENT_REVIEW |2    |
|2014-01-02 00:00:00.0|CLOSED         |15   |
|2014-02-11 00:00:00.0|CANCELED       |3    |
|2014-02-14 00:00:00.0|ON_HOLD        |11   |
|2014-02-21 00:00:00.0|PROCESSING     |25   |
|2014-05-13 00:00:00.0|SUSPECTED_FRAUD|3    |
|2014-06-27 00:00:00.0|PENDING        |26   |
|2014-07-16 00:00:00.0|ON_HOLD        |3    |
|2013-08-16 00:00:00.0|PENDING_PAYMENT|30   |
|2013-08-29 00:00:00.0|PROCESSING     |31   |
|2013-09-10 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-09-25 00:00:00.0|CLOSED     

### 4.8 Get all CANCELED orders with amount greater than \$1000:

**SQL:**

In [46]:
spark.sql(strip_margin(
    """SELECT q.* 
      |FROM (SELECT o.order_id, o.order_date, o.order_customer_id, o.order_status, sum(oi.order_item_subtotal) as order_total 
      |      FROM orders o JOIN order_items oi 
      |          ON o.order_id = oi.order_item_order_id 
      |      WHERE o.order_status = 'CANCELED' 
      |      GROUP BY o.order_id, o.order_date, o.order_customer_id, o.order_status) q 
      |WHERE q.order_total >= 1000 
      |ORDER BY q.order_id
    """)).show(truncate=False)

+--------+---------------------+-----------------+------------+------------------+
|order_id|order_date           |order_customer_id|order_status|order_total       |
+--------+---------------------+-----------------+------------+------------------+
|753     |2013-07-29 00:00:00.0|5094             |CANCELED    |1129.75           |
|2012    |2013-08-04 00:00:00.0|5165             |CANCELED    |1499.8600311279297|
|2144    |2013-08-05 00:00:00.0|7932             |CANCELED    |1099.900032043457 |
|2189    |2013-08-06 00:00:00.0|6829             |CANCELED    |1029.9400253295898|
|2271    |2013-08-06 00:00:00.0|7603             |CANCELED    |1229.9300231933594|
|2754    |2013-08-09 00:00:00.0|8946             |CANCELED    |1109.9500274658203|
|3551    |2013-08-14 00:00:00.0|5363             |CANCELED    |1299.8700408935547|
|4354    |2013-08-20 00:00:00.0|7268             |CANCELED    |1047.9000244140625|
|4801    |2013-08-23 00:00:00.0|11630            |CANCELED    |1016.9500217437744|
|533

**DF API:**

In [47]:
(orders_df
 .filter(col('order_status') == 'CANCELED')
 .join(order_items_df, orders_df.order_id == order_items_df.order_item_order_id)
 .groupBy(orders_df.order_id, orders_df.order_date, orders_df.order_customer_id, orders_df.order_status)
 .agg(F.sum('order_item_subtotal').alias('order_total'))
 .filter(col('order_total') >= 1000)
 .orderBy('order_id')
 .show())

+--------+--------------------+-----------------+------------+------------------+
|order_id|          order_date|order_customer_id|order_status|       order_total|
+--------+--------------------+-----------------+------------+------------------+
|     753|2013-07-29 00:00:...|             5094|    CANCELED|           1129.75|
|    2012|2013-08-04 00:00:...|             5165|    CANCELED|1499.8600311279297|
|    2144|2013-08-05 00:00:...|             7932|    CANCELED| 1099.900032043457|
|    2189|2013-08-06 00:00:...|             6829|    CANCELED|1029.9400253295898|
|    2271|2013-08-06 00:00:...|             7603|    CANCELED|1229.9300231933594|
|    2754|2013-08-09 00:00:...|             8946|    CANCELED|1109.9500274658203|
|    3551|2013-08-14 00:00:...|             5363|    CANCELED|1299.8700408935547|
|    4354|2013-08-20 00:00:...|             7268|    CANCELED|1047.9000244140625|
|    4801|2013-08-23 00:00:...|            11630|    CANCELED|1016.9500217437744|
|    5331|2013-0

### 4.9 Sort Products by Category and Price:

**SQL:**

In [48]:
spark.sql(strip_margin(
        """SELECT p.* 
          |FROM products p
          |ORDER BY p.product_category_id ASC, p.product_price DESC
        """)).show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|        16|                  2|Riddell Youth 360...|               null|       299.99|http://images.acm...|
|        11|                  2|Fitness Gear 300 ...|               null|       209.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|               null|       199.99|http://images.acm...|
|        14|                  2|Quik Shade Summit...|               null|       199.99|http://images.acm...|
|        12|                  2|Under Armour Men'...|               null|       139.99|http://images.acm...|
|        23|                  2|Under Armour Men'...|               null|       139.99|http://images.acm...|
|         6|       

**DF API:**

In [49]:
(products_df
 .orderBy(['product_category_id', 'product_price'], ascending=[1,0])
 .show())

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|        16|                  2|Riddell Youth 360...|               null|       299.99|http://images.acm...|
|        11|                  2|Fitness Gear 300 ...|               null|       209.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|               null|       199.99|http://images.acm...|
|        14|                  2|Quik Shade Summit...|               null|       199.99|http://images.acm...|
|        12|                  2|Under Armour Men'...|               null|       139.99|http://images.acm...|
|        23|                  2|Under Armour Men'...|               null|       139.99|http://images.acm...|
|         6|       

### 4.10 Get the topmost 5 products overall sorted by Price Highest to Lowest:
**sortN: get top 5 products by price overall; globalSorting**

**SQL:**

In [50]:
spark.sql(strip_margin(
        """SELECT product_id, product_category_id, product_name, product_price
          |FROM products
          |ORDER BY product_price DESC
          |LIMIT 5
        """)).show(truncate=False)

+----------+-------------------+------------------------------------------------+-------------+
|product_id|product_category_id|product_name                                    |product_price|
+----------+-------------------+------------------------------------------------+-------------+
|208       |10                 |SOLE E35 Elliptical                             |1999.99      |
|66        |4                  |SOLE F85 Treadmill                              |1799.99      |
|199       |10                 |SOLE F85 Treadmill                              |1799.99      |
|496       |22                 |SOLE F85 Treadmill                              |1799.99      |
|1048      |47                 |"Spalding Beast 60"" Glass Portable Basketball "|1099.99      |
+----------+-------------------+------------------------------------------------+-------------+



**DF API:**

In [51]:
(products_df
    .select('product_id', 'product_category_id', 'product_name', 'product_price')
     .orderBy('product_price', ascending=0)
     .limit(5)
     .show(truncate=False))

+----------+-------------------+------------------------------------------------+-------------+
|product_id|product_category_id|product_name                                    |product_price|
+----------+-------------------+------------------------------------------------+-------------+
|208       |10                 |SOLE E35 Elliptical                             |1999.99      |
|66        |4                  |SOLE F85 Treadmill                              |1799.99      |
|199       |10                 |SOLE F85 Treadmill                              |1799.99      |
|496       |22                 |SOLE F85 Treadmill                              |1799.99      |
|1048      |47                 |"Spalding Beast 60"" Glass Portable Basketball "|1099.99      |
+----------+-------------------+------------------------------------------------+-------------+



### 4.11 Get the topmost 5 products in each category where the products are sorted by Price Highest to Lowest:
**sort: sortingByKey, sort() by price per category**

**SQL:**

In [52]:
spark.sql(strip_margin(
        """SELECT product_category_id, product_id, product_name, product_price, row_num
          |FROM ( 
          |     SELECT q.*, row_number() OVER (PARTITION BY q.product_category_id ORDER BY q.product_price) as row_num 
          |     FROM products q)
          |WHERE row_num <= 5 
          |ORDER BY product_category_id, row_num
          |      
        """)).show(truncate=False)

+-------------------+----------+----------------------------------------------+-------------+-------+
|product_category_id|product_id|product_name                                  |product_price|row_num|
+-------------------+----------+----------------------------------------------+-------------+-------+
|2                  |18        |Reebok Men's Full Zip Training Jacket         |29.97        |1      |
|2                  |22        |Kijaro Dual Lock Chair                        |29.99        |2      |
|2                  |9         |Nike Adult Vapor Jet 3.0 Receiver Gloves      |50.0         |3      |
|2                  |21        |Under Armour Kids' Highlight RM Football Clea |54.99        |4      |
|2                  |1         |Quest Q64 10 FT. x 10 FT. Slant Leg Instant U |59.98        |5      |
|3                  |38        |Nike Men's Hypervenom Phantom Premium FG Socc |0.0          |1      |
|3                  |39        |Nike Women's Pro Victory Compression Bra      |21.

**DF API:**

In [53]:
windowSpec = Window.partitionBy(products_df['product_category_id']).orderBy(products_df['product_price'].asc())

In [54]:
(products_df
    .select('product_category_id', 'product_id', 'product_name', 'product_price', F.row_number().over(windowSpec).alias('row_num'))
    .filter(col('row_num') <= 5)
    .orderBy('product_category_id', 'row_num')
    .show(truncate=False))

+-------------------+----------+----------------------------------------------+-------------+-------+
|product_category_id|product_id|product_name                                  |product_price|row_num|
+-------------------+----------+----------------------------------------------+-------------+-------+
|2                  |18        |Reebok Men's Full Zip Training Jacket         |29.97        |1      |
|2                  |22        |Kijaro Dual Lock Chair                        |29.99        |2      |
|2                  |9         |Nike Adult Vapor Jet 3.0 Receiver Gloves      |50.0         |3      |
|2                  |21        |Under Armour Kids' Highlight RM Football Clea |54.99        |4      |
|2                  |1         |Quest Q64 10 FT. x 10 FT. Slant Leg Instant U |59.98        |5      |
|3                  |38        |Nike Men's Hypervenom Phantom Premium FG Socc |0.0          |1      |
|3                  |39        |Nike Women's Pro Victory Compression Bra      |21.

### RANK and DENSE_RANK

**RANK** gives us the ranking within our ordered partition. Ties are assigned the same rank, with the next ranking(s) skipped. So, if we have 3 items at rank 2, the next rank listed would be ranked 5.

**DENSE_RANK** again gives us the ranking within our ordered partition, but the ranks are consecutive. No ranks are skipped if there are ranks with multiple items.

The Following 3 examples plays with the rank(), dense_rank() and row_number() functions.

### 4.12 Get topN products by price in each category:
**topN: For each product category get the top 5 records. i.e. top 5 ranked products in each category (some of the products may have same price so the top 5 products will all be distinct products but their prices may not be distinct 5. So, the number of distinct prices <= 5 in the top5 but the count distinct products may be >= 5. top 5 ranked products does not necessary mean there will be exactly 5 products may be less or more too.**

**SQL:**

In [55]:
spark.sql(strip_margin(
        """SELECT product_category_id, product_id, product_name, product_price, rank
          |FROM ( 
          |     SELECT q.*, rank() OVER (PARTITION BY q.product_category_id ORDER BY q.product_price DESC) as rank 
          |     FROM products q)
          |WHERE rank <= 5 
          |ORDER BY product_category_id, rank
          |      
        """)).show(truncate=False)

+-------------------+----------+------------------------------------------------+-------------+----+
|product_category_id|product_id|product_name                                    |product_price|rank|
+-------------------+----------+------------------------------------------------+-------------+----+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1   |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2   |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3   |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3   |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |5   |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |5   |
|3                  |40        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99    

**DF API:**

In [56]:
windowSpec = Window.partitionBy(products_df['product_category_id']).orderBy(products_df['product_price'].desc())

In [57]:
(products_df
    .select('product_category_id', 'product_id', 'product_name', 'product_price', F.rank().over(windowSpec).alias('rank'))
    .filter(col('rank') <= 5)
    .orderBy('product_category_id', 'rank')
    .show(truncate=False))

+-------------------+----------+------------------------------------------------+-------------+----+
|product_category_id|product_id|product_name                                    |product_price|rank|
+-------------------+----------+------------------------------------------------+-------------+----+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1   |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2   |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3   |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3   |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |5   |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |5   |
|3                  |40        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99    

### 4.13 Get 'topN priced' products in each category:
**topDenseN: For each category get top 5 priced products i.e. if there are 10 products with distinct top 5 prices, the DF should give us all 10 products, so all the products will be distinct as well as we will get 5 different distinct prices too among them.**

**SQL:**

In [58]:
spark.sql(strip_margin(
        """SELECT product_category_id, product_id, product_name, product_price, dense_rank
          |FROM ( 
          |     SELECT q.*, dense_rank() OVER (PARTITION BY q.product_category_id ORDER BY q.product_price DESC) as dense_rank 
          |     FROM products q)
          |WHERE dense_rank <= 5 
          |ORDER BY product_category_id, dense_rank
          |      
        """)).show(truncate=False)

+-------------------+----------+------------------------------------------------+-------------+----------+
|product_category_id|product_id|product_name                                    |product_price|dense_rank|
+-------------------+----------+------------------------------------------------+-------------+----------+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1         |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2         |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3         |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3         |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |4         |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |4         |
|2                  |6         |Jorda

**DF API:**

In [59]:
windowSpec = Window.partitionBy(products_df['product_category_id']).orderBy(products_df['product_price'].desc())

In [60]:
(products_df
    .select('product_category_id', 'product_id', 'product_name', 'product_price', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(col('dense_rank') <= 5)
    .orderBy('product_category_id', 'dense_rank')
    .show(truncate=False))

+-------------------+----------+------------------------------------------------+-------------+----------+
|product_category_id|product_id|product_name                                    |product_price|dense_rank|
+-------------------+----------+------------------------------------------------+-------------+----------+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1         |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2         |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3         |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3         |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |4         |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |4         |
|2                  |6         |Jorda

### 4.14 Get the Customer Id with max revenue on Daily basis:

**SQL:**

In [61]:
# 1. Join orders and order_items and group by order_date and customer_id to calculate the revenue per customer per day
# 2. Sort the above using rank() function to get the maximum revenue per day
# 3. Select only rows with rank = 1, that will give the customer_id with max revenue
# 4. Join with customers table to get the customer details
spark.sql(strip_margin(
        """SELECT a. order_date, a.order_customer_id, concat_ws(' ', c.customer_fname, c.customer_lname) as customer_name, a.order_total
          |FROM ( 
          |     SELECT *, rank() OVER (PARTITION BY b.order_date ORDER BY b.order_total DESC) as rank 
          |     FROM ( 
          |          SELECT o.order_date, o.order_customer_id, round(sum(oi.order_item_subtotal), 2) as order_total 
          |          FROM orders o INNER JOIN order_items oi 
          |              ON o.order_id = oi.order_item_order_id  
          |          WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD' 
          |          GROUP BY o.order_date, o.order_customer_id) b 
          |     ) a INNER JOIN customers c
          |           ON a.order_customer_id = c.customer_id
          |WHERE rank = 1 
          |ORDER BY order_date
          |      
        """)).show(truncate=False)

+---------------------+-----------------+----------------+-----------+
|order_date           |order_customer_id|customer_name   |order_total|
+---------------------+-----------------+----------------+-----------+
|2013-07-25 00:00:00.0|11941            |Jeffrey Pugh    |1649.8     |
|2013-07-26 00:00:00.0|32               |Alice Smith     |2009.75    |
|2013-07-27 00:00:00.0|11491            |David Smith     |1379.88    |
|2013-07-28 00:00:00.0|5738             |Mildred Taylor  |1499.87    |
|2013-07-29 00:00:00.0|2632             |John Smith      |1389.86    |
|2013-07-29 00:00:00.0|5182             |Thomas Morgan   |1389.86    |
|2013-07-30 00:00:00.0|10029            |Mary Silva      |1529.92    |
|2013-07-31 00:00:00.0|1175             |Mary Gray       |1699.91    |
|2013-08-01 00:00:00.0|9151             |Aaron Smith     |1709.82    |
|2013-08-02 00:00:00.0|5548             |Michael Crawford|1594.92    |
|2013-08-03 00:00:00.0|9572             |Mary Nelson     |1569.79    |
|2013-

**DF API:**

In [62]:
rev_per_day_per_cust = (orders_df
                         .select('order_date', 'order_id', 'order_customer_id', 'order_status')
                         .where((col('order_status') != 'CANCELED') & (col('order_status') != 'SUSPECTED_FRAUD'))
                         .join(order_items_df, orders_df.order_id == order_items_df.order_item_order_id)
                         .select(['order_date', 'order_customer_id', 'order_item_subtotal'])
                         .groupBy(['order_date', 'order_customer_id'])
                         .agg(F.round(F.sum('order_item_subtotal'), 2).alias('order_total'))).cache()

In [63]:
rev_per_day_per_cust.show(5, truncate=False)

+---------------------+-----------------+-----------+
|order_date           |order_customer_id|order_total|
+---------------------+-----------------+-----------+
|2013-07-26 00:00:00.0|7710             |1199.82    |
|2013-07-27 00:00:00.0|1180             |1129.94    |
|2013-07-30 00:00:00.0|5511             |319.97     |
|2013-07-31 00:00:00.0|12018            |347.94     |
|2013-08-03 00:00:00.0|6698             |709.94     |
+---------------------+-----------------+-----------+
only showing top 5 rows



In [64]:
windowSpec = Window.partitionBy(rev_per_day_per_cust['order_date']).orderBy(rev_per_day_per_cust['order_total'].desc())

In [65]:
top_cust_per_day_by_rev = (rev_per_day_per_cust
                           .select('*', F.rank().over(windowSpec).alias('rank'))
                           .filter(col('rank') == 1)
                           .orderBy('order_date')).cache()

In [66]:
top_cust_per_day_by_rev.show(5, truncate=False)

+---------------------+-----------------+-----------+----+
|order_date           |order_customer_id|order_total|rank|
+---------------------+-----------------+-----------+----+
|2013-07-25 00:00:00.0|11941            |1649.8     |1   |
|2013-07-26 00:00:00.0|32               |2009.75    |1   |
|2013-07-27 00:00:00.0|11491            |1379.88    |1   |
|2013-07-28 00:00:00.0|5738             |1499.87    |1   |
|2013-07-29 00:00:00.0|2632             |1389.86    |1   |
+---------------------+-----------------+-----------+----+
only showing top 5 rows



In [67]:
(top_cust_per_day_by_rev
 .join(customers_df, top_cust_per_day_by_rev.order_customer_id == customers_df.customer_id, how='inner')
 .select('order_date', 'order_customer_id', F.concat_ws(' ', 'customer_fname', 'customer_lname').alias('customer_name'), 'order_total')
 .show(truncate=False))

+---------------------+-----------------+----------------+-----------+
|order_date           |order_customer_id|customer_name   |order_total|
+---------------------+-----------------+----------------+-----------+
|2013-07-25 00:00:00.0|11941            |Jeffrey Pugh    |1649.8     |
|2013-07-26 00:00:00.0|32               |Alice Smith     |2009.75    |
|2013-07-27 00:00:00.0|11491            |David Smith     |1379.88    |
|2013-07-28 00:00:00.0|5738             |Mildred Taylor  |1499.87    |
|2013-07-29 00:00:00.0|2632             |John Smith      |1389.86    |
|2013-07-29 00:00:00.0|5182             |Thomas Morgan   |1389.86    |
|2013-07-30 00:00:00.0|10029            |Mary Silva      |1529.92    |
|2013-07-31 00:00:00.0|1175             |Mary Gray       |1699.91    |
|2013-08-01 00:00:00.0|9151             |Aaron Smith     |1709.82    |
|2013-08-02 00:00:00.0|5548             |Michael Crawford|1594.92    |
|2013-08-03 00:00:00.0|9572             |Mary Nelson     |1569.79    |
|2013-

In [68]:
spark.stop()