### Loading, preprocessing and running OLAP analyses on large datasets with pySpark + SQL


This is an example project demonstrating how to load large datasets into Spark and then use the spark.sql interface to run OLAP analyses as SQL queries.
The data are from the Corporacion Favorita dataset, hosted on Kaggle
(https://www.kaggle.com/c/favorita-grocery-sales-forecasting/data).
They record sales at Favorita, a chain of grocery stores with locations throughout Ecuador.
Each row of the main table, unit_sales, gives the unit sales of one item (item ID) at one store (store ID) on one date; negative unit sales indicate returns.
Two more tables provide additional information:

- stores: store ID | geographic data
- items: item ID | product category

#### 1. Initialize Spark, load data

In [1]:
# Create a SparkSession via the builder.
# Here Spark runs on the local machine using all available cores (local[*]);
# to use a standalone cluster instead, pass its master URL, spark://<host>:<port>,
# e.g. spark://ec2-18-188-22-23.us-east-2.compute.amazonaws.com:7077

from pyspark.sql import SparkSession
import time

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName("test") \
                    .getOrCreate()

In [2]:
folder_path = "/Users/cstoneki/Documents/data/favorita"
table_names = ["unit_sales", "stores", "items"]

In [3]:
# Print the size of each CSV file
import os

def format_file_size(size):
    """Format a byte count with the largest unit for which the value is at least 0.1."""
    units = [(1024**4, "TB"), (1024**3, "GB"), (1024**2, "MB"), (1024, "kB"), (1, "B")]
    for unit, name in units:
        if size >= 0.1 * unit or unit == 1:
            return "%.2f %s" % (size / unit, name)

for t in table_names:
    label = t + ":"
    size = format_file_size(os.path.getsize(os.path.join(folder_path, t + ".csv")))
    # Right-align the size so that every line is 30 characters wide
    print(label + size.rjust(30 - len(label)))

unit_sales:            4.65 GB
stores:                1.35 kB
items:                99.45 kB


At 4.65 GB, unit_sales is fairly large, yet still small enough to demonstrate Spark on a personal computer.

In [5]:
# Supplying explicit schemas sets the correct column types and avoids the
# extra pass over the data that inferSchema=True would need
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType

schemas = {}

# Specify column names and types
schemas['unit_sales'] = StructType([
    StructField("id", IntegerType()),
    StructField("date", StringType()),
    StructField("store_nbr", IntegerType()),
    StructField("item_nbr", IntegerType()),
    StructField("unit_sales", FloatType()),
    StructField("onpromotion", BooleanType())  # raw values are True/False, empty when unknown
])
schemas['stores'] = StructType([
    StructField("store_nbr", IntegerType()),
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("type", StringType()),
    StructField("cluster", IntegerType())
])
schemas['items'] = StructType([
    StructField("item_nbr", IntegerType()),
    StructField("family", StringType()),
    StructField("class", IntegerType()),
    StructField("perishable", IntegerType())
])

In [6]:
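# spark.read.csv is lazy: with an explicit schema it does not read the data yet,
# so these timings cover building the DataFrames, not scanning the files
for t in table_names:
    t0 = time.time()
    df = spark.read.csv(folder_path + "/" + t + ".csv", header=True, schema=schemas[t])
    df.createOrReplaceTempView(t)
    t_elapsed = time.time() - t0
    print("Loaded "+t+" in %.2f s"%t_elapsed)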


Loaded unit_sales in 1.16 s
Loaded stores in 0.03 s
Loaded items in 0.02 s


In [7]:
# Show the first three rows of each table
for t in table_names:
    print(t)
    query = "SELECT * FROM " + t + " LIMIT 3"
    spark.sql(query).show()

unit_sales
+---+----------+---------+--------+----------+-----------+
| id|      date|store_nbr|item_nbr|unit_sales|onpromotion|
+---+----------+---------+--------+----------+-----------+
|  0|2013-01-01|       25|  103665|       7.0|       null|
|  1|2013-01-01|       25|  105574|       1.0|       null|
|  2|2013-01-01|       25|  105575|       2.0|       null|
+---+----------+---------+--------+----------+-----------+

stores
+---------+-----+---------+----+-------+
|store_nbr| city|    state|type|cluster|
+---------+-----+---------+----+-------+
|        1|Quito|Pichincha|   D|     13|
|        2|Quito|Pichincha|   D|     13|
|        3|Quito|Pichincha|   D|      8|
+---------+-----+---------+----+-------+

items
+--------+---------+-----+----------+
|item_nbr|   family|class|perishable|
+--------+---------+-----+----------+
|   96995|GROCERY I| 1093|         0|
|   99197|GROCERY I| 1067|         0|
|  103501| CLEANING| 3008|         0|
+--------+---------+-----+----------+



#### 2. OLAP analyses

Favorita runs large grocery stores that sell many different types of products.
As a first step, we want to find out how much of the total sales volume each individual product family accounts for.

In [39]:
#get the percentage of total sales represented by each product family
query = """
SELECT family, 100*sales_by_family/(SUM(sales_by_family) OVER()) AS percent_sales
FROM
(SELECT i.family AS family, SUM(u.unit_sales) AS sales_by_family
 FROM unit_sales AS u
 INNER JOIN items AS i
 ON i.item_nbr = u.item_nbr
 GROUP BY i.family)
"""

df2 = spark.sql(query)
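
The expression SUM(sales_by_family) OVER () uses an empty OVER clause, so every row of the per-family subquery sees the grand total and each family's share comes out without a self-join or a second query. As a small illustration only, here is the same query shape run against Python's built-in sqlite3 module on toy tables (the rows are hypothetical, and SQLite needs version 3.25 or newer for window functions). The 100.0 literal matters in SQLite, where dividing two integers truncates; Spark SQL's / always returns a fractional result.

```python
import sqlite3

# Toy stand-ins for the unit_sales and items tables (hypothetical values)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE items (item_nbr INTEGER, family TEXT);
CREATE TABLE unit_sales (item_nbr INTEGER, unit_sales REAL);
INSERT INTO items VALUES (1, 'BEVERAGES'), (2, 'BEVERAGES'), (3, 'DAIRY');
-- the negative value plays the role of a return
INSERT INTO unit_sales VALUES (1, 35.0), (2, 30.0), (3, 40.0), (1, -5.0);
""")

# Aggregate per family, then divide by the grand total from the window function
query = """
SELECT family, 100.0 * sales_by_family / SUM(sales_by_family) OVER () AS percent_sales
FROM (SELECT i.family AS family, SUM(u.unit_sales) AS sales_by_family
      FROM unit_sales AS u
      INNER JOIN items AS i ON i.item_nbr = u.item_nbr
      GROUP BY i.family) AS f
ORDER BY family
"""
rows = conn.execute(query).fetchall()
for family, pct in rows:
    print(f"{family:<10} {pct:6.2f}")
```

Here BEVERAGES nets 60 units and DAIRY 40, so the shares are 60.00 and 40.00. In Spark, an OVER () without PARTITION BY moves all rows to a single partition, which is harmless here because the subquery leaves only one row per family.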

In [44]:
from pyspark.sql.functions import format_number 
df2 = df2.orderBy("family", ascending=True)
df2.select(['family', format_number(df2['percent_sales'], 2).alias('percent_sales')]).show(df2.count())

+--------------------+-------------+
|              family|percent_sales|
+--------------------+-------------+
|          AUTOMOTIVE|         0.05|
|           BABY CARE|         0.00|
|              BEAUTY|         0.03|
|           BEVERAGES|        20.21|
|               BOOKS|         0.00|
|        BREAD/BAKERY|         3.92|
|         CELEBRATION|         0.07|
|            CLEANING|         9.08|
|               DAIRY|         6.01|
|                DELI|         2.25|
|                EGGS|         1.45|
|        FROZEN FOODS|         1.31|
|           GROCERY I|        31.99|
|          GROCERY II|         0.18|
|            HARDWARE|         0.01|
|  HOME AND KITCHEN I|         0.17|
| HOME AND KITCHEN II|         0.14|
|     HOME APPLIANCES|         0.00|
|           HOME CARE|         1.49|
|          LADIESWEAR|         0.06|
|     LAWN AND GARDEN|         0.05|
|            LINGERIE|         0.06|
|    LIQUOR,WINE,BEER|         0.72|
|           MAGAZINES|         0.02|
|

The main categories are GROCERY I (31.99% of unit sales), BEVERAGES (20.21%) and PRODUCE. Next, we break down sales by geographic location.

In [35]:
#what geographic information do we have?
spark.sql("SELECT DISTINCT city FROM stores").show()
spark.sql("SELECT DISTINCT state FROM stores").show()

+-------------+
|         city|
+-------------+
|      Quevedo|
|       Cuenca|
|     Guaranda|
|Santo Domingo|
|       Playas|
|         Puyo|
|        Quito|
|        Manta|
|    Latacunga|
|    Guayaquil|
|         Loja|
|       Ibarra|
|    El Carmen|
|       Ambato|
|      Machala|
|        Daule|
|      Cayambe|
|      Salinas|
|     Libertad|
|     Babahoyo|
+-------------+
only showing top 20 rows

+--------------------+
|               state|
+--------------------+
|              Manabi|
|            Cotopaxi|
|           Pichincha|
|          Chimborazo|
|              Guayas|
|                Loja|
|         Santa Elena|
|            Imbabura|
|              El Oro|
|               Azuay|
|             Bolivar|
|          Tungurahua|
|             Pastaza|
|Santo Domingo de ...|
|            Los Rios|
|          Esmeraldas|
+--------------------+



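Mainland Ecuador's states fall into three geographic regions: La Sierra (the Andean highlands), La Costa (the coast) and El Oriente (the Amazon region). Introducing these gives a more meaningful, larger-scale grouping.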

In [27]:
# Create a table in the Spark session that links states to regions.
# The table would normally persist into later sessions; to keep things simple,
# drop and recreate it on each run, since it is very small
_ = spark.sql("DROP TABLE IF EXISTS regions")
_ = spark.sql("CREATE TABLE IF NOT EXISTS regions(region varchar(255), state varchar(255))")

In [28]:
# Populate regions only if it does not contain any data yet
if spark.sql("SELECT * FROM regions LIMIT 1").count() == 0:

    la_sierra_states = ["Carchi", "Imbabura", "Pichincha", "Cotopaxi", "Tungurahua",
                        "Bolivar", "Chimborazo", "Canar", "Azuay", "Loja"]

    # Note: "Santo Domingo de ..." is the 20-character truncation printed by show();
    # it will not match the full state name stored in stores, so look up the full
    # value with show(truncate=False) before relying on this entry
    la_costa_states = ["Esmeraldas", "Manabi", "Santo Domingo de ...",
                       "Los Rios", "Santa Elena", "Guayas", "El Oro"]

    el_oriente_states = ["Sucumbios", "Napo", "Orellana", "Pastaza",
                         "Morona Santiago", "Zamora Chinchipe"]

    # Insert one (region, state) record per state using INSERT INTO ... SELECT
    # (older Spark versions do not implement INSERT INTO ... VALUES)
    for region, states in [("La Sierra", la_sierra_states),
                           ("La Costa", la_costa_states),
                           ("El Oriente", el_oriente_states)]:
        for state in states:
            spark.sql("INSERT INTO regions SELECT temp.* FROM (SELECT '%s', '%s') temp"
                      % (region, state))

In [29]:
spark.sql("SELECT * FROM regions").show()

+----------+--------------------+
|    region|               state|
+----------+--------------------+
|  La Costa|              El Oro|
|  La Costa|            Los Rios|
| La Sierra|               Azuay|
| La Sierra|          Chimborazo|
|El Oriente|    Zamora Chinchipe|
|El Oriente|             Pastaza|
| La Sierra|            Imbabura|
|  La Costa|          Esmeraldas|
|El Oriente|     Morona Santiago|
|  La Costa|         Santa Elena|
|  La Costa|              Guayas|
| La Sierra|          Tungurahua|
|  La Costa|              Manabi|
|El Oriente|            Orellana|
| La Sierra|                Loja|
| La Sierra|               Canar|
| La Sierra|              Carchi|
| La Sierra|             Bolivar|
|  La Costa|Santo Domingo de ...|
| La Sierra|            Cotopaxi|
+----------+--------------------+
only showing top 20 rows
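
Before joining, it can be worth checking that every state in stores has a matching row in regions: a state without a match gets a null region and is silently dropped by the WHERE r.region IS NOT NULL filter in the ROLLUP query. The sketch below shows such a check as a LEFT JOIN ... IS NULL anti-join, run on toy tables in Python's sqlite3 (the rows are hypothetical, with Pastaza deliberately left out of regions). In Spark SQL the same check could be written as SELECT DISTINCT s.state FROM stores AS s LEFT ANTI JOIN regions AS r ON r.state = s.state.

```python
import sqlite3

# Toy stand-ins for stores and regions (hypothetical; Pastaza is left unmapped on purpose)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stores (store_nbr INTEGER, state TEXT);
CREATE TABLE regions (region TEXT, state TEXT);
INSERT INTO stores VALUES (1, 'Pichincha'), (2, 'Guayas'), (3, 'Pastaza'), (4, 'Guayas');
INSERT INTO regions VALUES ('La Sierra', 'Pichincha'), ('La Costa', 'Guayas');
""")

# Anti-join: keep store states for which no region row exists
unmatched = [row[0] for row in conn.execute("""
SELECT DISTINCT s.state
FROM stores AS s
LEFT JOIN regions AS r ON r.state = s.state
WHERE r.state IS NULL
ORDER BY s.state
""")]
print(unmatched)
```

An empty list means every store's state maps to a region; anything else lists the states whose sales would be excluded.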



Now join all four tables and use GROUP BY ROLLUP to compute total sales for each (family, region) pair, together with a subtotal for each family and a grand total:

In [34]:
#get a summary of sales by product family, further broken down into region
query = """
SELECT i.family, r.region, SUM(u.unit_sales) AS total_sales
FROM unit_sales AS u 
LEFT JOIN items AS i
ON i.item_nbr = u.item_nbr
LEFT JOIN stores AS s
ON s.store_nbr = u.store_nbr
LEFT JOIN regions AS r
ON r.state = s.state
WHERE r.region IS NOT NULL AND i.family IS NOT NULL
GROUP BY ROLLUP(i.family, r.region)
"""


df = spark.sql(query)
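
To make the grouping sets explicit, here is a minimal pure-Python sketch (not Spark) of what GROUP BY ROLLUP(family, region) computes: sums at the (family, region) level, at the (family) level with region rolled up to None, and a grand total with both rolled up. The function name rollup_sum and the toy rows are hypothetical.

```python
from collections import defaultdict

def rollup_sum(rows):
    """Sum sales over the grouping sets (family, region), (family,) and (),
    as GROUP BY ROLLUP(family, region) does; None marks a rolled-up column."""
    totals = defaultdict(float)
    for family, region, sales in rows:
        totals[(family, region)] += sales  # detail level
        totals[(family, None)] += sales    # per-family subtotal
        totals[(None, None)] += sales      # grand total
    return dict(totals)

# Hypothetical (family, region, unit_sales) rows
sales = [
    ("BEVERAGES", "La Costa", 10.0),
    ("BEVERAGES", "La Sierra", 15.0),
    ("DAIRY", "La Sierra", 5.0),
    ("BEVERAGES", "La Costa", 2.0),
]

# Sort with None first, matching Spark's default null ordering for ascending sorts
for (family, region), total in sorted(rollup_sum(sales).items(),
                                      key=lambda kv: (kv[0][0] or "", kv[0][1] or "")):
    print(family, region, total)
```

The sketch, like the SQL, cannot tell a rolled-up None from a genuinely missing value, which is one reason to filter out null families and regions before grouping. Spark SQL also offers grouping() and grouping_id() to flag which columns were rolled up in each output row.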

In [49]:
df = df.orderBy(["family", "region"], ascending=True)
df.select(['family', 'region', format_number(df['total_sales'], 2).alias('total_sales')]).show(df.count())

+--------------------+----------+----------------+
|              family|    region|     total_sales|
+--------------------+----------+----------------+
|                null|      null|1,037,775,952.75|
|          AUTOMOTIVE|      null|      528,085.00|
|          AUTOMOTIVE|El Oriente|        3,677.00|
|          AUTOMOTIVE|  La Costa|      149,390.00|
|          AUTOMOTIVE| La Sierra|      375,018.00|
|           BABY CARE|      null|        9,189.00|
|           BABY CARE|El Oriente|          196.00|
|           BABY CARE|  La Costa|        3,456.00|
|           BABY CARE| La Sierra|        5,537.00|
|              BEAUTY|      null|      324,079.00|
|              BEAUTY|El Oriente|        1,050.00|
|              BEAUTY|  La Costa|       67,781.00|
|              BEAUTY| La Sierra|      255,248.00|
|           BEVERAGES|      null|  210,618,755.00|
|           BEVERAGES|El Oriente|      857,889.00|
|           BEVERAGES|  La Costa|   59,062,960.00|
|           BEVERAGES| La Sierr

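The resulting table shows total sales by product family, further subdivided by region. Rows with a null region are the per-family subtotals, and the row where both family and region are null is the grand total. Because the WHERE clause already removes rows with a missing family or region, these nulls can only come from ROLLUP.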

Finally, we look at seasonal (month-by-month) trends in sales for selected product families; the query below uses LAWN AND GARDEN, and further families can be added to the IN list.

In [53]:
query = """
SELECT MONTH(CAST(u.date AS DATE)) AS month, SUM(u.unit_sales) AS total_sales
FROM unit_sales AS u
LEFT JOIN items AS i
ON i.item_nbr = u.item_nbr
WHERE i.family IN ('LAWN AND GARDEN')
GROUP BY month
ORDER BY month
"""
df3 = spark.sql(query)

In [55]:
df3 = df3.orderBy(["month"], ascending=True)
df3.select(['month', format_number(df3['total_sales'], 2).alias('total_sales')]).show(df3.count())

+-----+-----------+
|month|total_sales|
+-----+-----------+
|    1|  48,207.00|
|    2|  53,696.00|
|    3|  52,824.00|
|    4|  49,691.00|
|    5|  63,874.00|
|    6|  53,628.00|
|    7|  54,999.00|
|    8|  40,458.00|
|    9|  27,286.00|
|   10|  29,538.00|
|   11|  28,116.00|
|   12|  46,521.00|
+-----+-----------+



Sales of LAWN AND GARDEN products show fairly strong seasonality: the average monthly total for September to November (about 28,300 units) is only about 53% of the average for January to July (about 53,800 units).
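
One caveat when comparing months: assuming unit_sales is the Kaggle training file, whose dates run from 2013-01-01 to 2017-08-15, January to July each include one more year of data than September to December. The sketch below takes the monthly totals printed above and divides each group of months by the number of calendar days it actually covers. The date range is an assumption to verify against the data (e.g. with MIN(date) and MAX(date)), and days with no sales rows are not accounted for.

```python
from collections import Counter
from datetime import date, timedelta

# Monthly LAWN AND GARDEN totals copied from the query output above
monthly_totals = {1: 48207, 2: 53696, 3: 52824, 4: 49691, 5: 63874, 6: 53628,
                  7: 54999, 8: 40458, 9: 27286, 10: 29538, 11: 28116, 12: 46521}

def days_per_month(start, end):
    """Count the calendar days in [start, end] that fall in each month (1-12)."""
    counts = Counter()
    day = start
    while day <= end:
        counts[day.month] += 1
        day += timedelta(days=1)
    return counts

# Assumption: the data cover 2013-01-01 to 2017-08-15
days = days_per_month(date(2013, 1, 1), date(2017, 8, 15))

def per_day_rate(totals, day_counts, months):
    """Total sales over `months` divided by the number of days observed in them."""
    return sum(totals[m] for m in months) / sum(day_counts[m] for m in months)

sep_nov, jan_jul = [9, 10, 11], list(range(1, 8))
raw_ratio = (sum(monthly_totals[m] for m in sep_nov) / len(sep_nov)) / \
            (sum(monthly_totals[m] for m in jan_jul) / len(jan_jul))
ratio = per_day_rate(monthly_totals, days, sep_nov) / per_day_rate(monthly_totals, days, jan_jul)
print("raw ratio of monthly averages: %.2f" % raw_ratio)
print("ratio of per-day averages:     %.2f" % ratio)
```

Under this assumption the ratio rises from about 0.53 to about 0.66, so the autumn dip shrinks once coverage is equalized but does not disappear.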

This concludes the example analyses using pySpark + SQL. A worthwhile next step would be to integrate Spark with Tableau to create visualizations.