# PySpark in the context of sales data 

Sales and customer data provide important insight for companies. Analytics now plays an important role in providing information about sales trends, customer profiles, warnings about (un)profitable products, sales volume forecasts and stock supply, as well as in personalizing customer relationships and giving product recommendations. All this information needs to be delivered in time, preferably in a (semi)automated fashion.

These data can become so large that analytical tools and software run into performance issues. For example, many businesses embrace spreadsheet software like Excel for understandable reasons, but it breaks down with large data. Many data scientists work with Python or R, but modules like Pandas also become slow and run out of memory with large data.

Apache Spark enables analyses of large data sets. It does so by processing data in parallel, distributing the work over the available threads and cores. It can therefore improve performance on a cluster but also on a single machine [1]. It handles both (i) unstructured data such as text and (ii) structured data such as DataFrames arranged in columns. Spark does not require loading all data into memory before processing, and it is typically faster than, for example, Hadoop MapReduce.
Spark is a multi-language tool. Interfaces exist for Scala, Java, Python and R users, and it can be used in the cloud. These and other features make it a suitable platform for large-scale data analyses. Google Trends suggests that PySpark (Spark with a Python interface) enjoys increasing popularity.

This blog post walks through a short but detailed example of using PySpark on the Online retail sales data [2]. These data are arranged in columns, containing for example invoice number, invoice date, quantity, price and product description. The chosen data serve as an example; their size would still work in Pandas for most single-machine users, even though it would be slower.

This post does not provide a benchmark, as was done previously [1]. Rather, it gives hands-on analytical steps with code (i.e., concatenating data, removing data records, renaming columns, replacing strings, casting data types, creating new features, filtering data). It thereby offers a first glimpse into the world of PySpark.

**IMPORT MODULES**

The Jupyter notebook file *PySpark-retaildata.ipynb* can be found at GitHub: https://github.com/RRighart/Retail
The notebook can be run in Google Colab. To install PySpark, type in the first cell `!pip install pyspark`. 

An alternative is using Docker. A PySpark notebook can be started with https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook .

After PySpark is installed and the Jupyter notebook is up and running, we first need to import the modules and create a Spark session:

In [1]:
import time

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
# Explicit imports replace `from pyspark.sql.functions import *`, which
# would shadow Python built-ins such as sum, min, max and round.
from pyspark.sql.functions import (
    col, count, from_unixtime, isnan, unix_timestamp, weekofyear, when,
)

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
spark.version

'2.4.5'

Note that the Spark version used here is 2.4.5, which can be found with the command `spark.version`.

**LOADING AND DISPLAYING THE DATA**

The Online retail data [2] can be downloaded from http://archive.ics.uci.edu/ml/machine-learning-databases/00502/ . The data sheets should be converted to online1.csv and online2.csv to facilitate loading from disk. The command `pwd` or `os.getcwd()` can be used to find the current directory from which PySpark will load the files. Below it can be seen that PySpark only takes a couple of seconds, whereas Pandas would take a couple of minutes on the same machine. Keep in mind that the comparison is not entirely like for like: Spark evaluates lazily, so the call below scans the files to infer the column types but does not yet materialize the rows in memory.

In [4]:
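def load_data(path):
    """Read a CSV file such as "online1.csv" or "online2.csv" into a Spark DataFrame."""
    t1 = time.time()
    # header=True uses the first row as column names; inferSchema=True makes
    # Spark scan the file once to determine the column types.
    dat = spark.read.options(header=True, inferSchema=True).csv(path)
    t2 = time.time()
    print("Duration:", np.round(t2 - t1, 2), "seconds")
    return dat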

In [5]:
df1 = load_data('online1.csv')

Duration: 5.44 seconds


In [6]:
df2 = load_data('online2.csv')

Duration: 1.76 seconds


The size of the DataFrames (their shape, in Pandas terms) can be obtained with the following code:

In [7]:
def datashape(data):
    print("Data shape (rows, columns):", data.count(), "x", len(data.columns))
    
datashape(df1)
datashape(df2)

Data shape (rows, columns): 525461 x 8
Data shape (rows, columns): 541910 x 8


To have all the data together in one DataFrame, df1 and df2 will be concatenated vertically.

In [8]:
df = df1.unionByName(df2)

datashape(df)

Data shape (rows, columns): 1067371 x 8


The following displays the first 5 rows. The command `.limit(5)` sets the number of rows that is displayed, comparable to `.head(5)` in Pandas, and will be used frequently throughout the text.

In [9]:
df.limit(5).show()

+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|   InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|12/1/2009 7:45| 6.95|      13085|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|12/1/2009 7:45|  2.1|      13085|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|12/1/2009 7:45| 1.25|      13085|United Kingdom|
+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+



This is the traditional Spark DataFrame output.
With the following setting, DataFrames will from now on be displayed in a Pandas-like format.

In [10]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

A number of descriptive statistics can be obtained, like count, standard deviation, mean, minimum and maximum.

In [11]:
summary = df.describe().toPandas()
summary = summary.T
summary.columns = summary.iloc[0]
summary = summary.drop(summary.index[0])
summary

summary,count,mean,stddev,min,max
Invoice,1067371,537608.1499316233,26662.4504469045,489434,C581569
StockCode,1067371,29011.161534536903,18822.94286618918,10002,m
Description,1062989,21848.25,922.9197780233488,DOORMAT UNION JACK GUNS AND ROSES,wrongly sold sets
Quantity,1067371,9.9388984711033,172.7057940767536,-80995,80995
InvoiceDate,1067371,,,1/10/2010 10:26,9/9/2011 9:52
Price,1067371,4.649387727417394,123.55305872146346,-53594.36,38970.0
Customer ID,824364,15324.63850435002,1697.4644503793093,12346,18287
Country,1067371,,,Australia,West Indies


The *count* column suggests that there are missing values in *Description* and *Customer ID*. The number of missing values (NaN or null) per column is displayed by the following code:

In [12]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,0,4382,0,0,0,243007,0
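The expression above combines two tests because Spark distinguishes a null (no value at all) from NaN (a floating-point "not a number"). The following is a plain-Python analogue on made-up rows, with `None` standing in for null. The names `rows` and `is_missing` are illustrative and not part of PySpark; the sketch only shows why both checks are needed.

```python
import math

# Toy rows (not taken from the retail data) containing both kinds of gaps.
rows = [
    {"Description": "PINK CHERRY LIGHTS", "Price": 6.75},
    {"Description": None, "Price": 2.1},                            # null: no value
    {"Description": "WHITE CHERRY LIGHTS", "Price": float("nan")},  # NaN: a float
]


def is_missing(value):
    """Return True for None (null) and for a floating-point NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


# Count the missing entries per column, as the Spark expression does.
missing_per_column = {
    column: sum(is_missing(row[column]) for row in rows)
    for column in rows[0]
}
print(missing_per_column)
```

A check for `None` alone would miss the NaN in *Price*, and a NaN check alone would miss the null in *Description*.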


Cancelled transactions start with a capital C in the column Invoice. These will be removed by the following.

In [13]:
df = df[~df['Invoice'].startswith("C")]

The datashape function confirms that cancelled transactions have been removed.

In [14]:
datashape(df)

Data shape (rows, columns): 1047877 x 8


**CHANGING NAME AND DATA TYPE**

The column name *Customer ID* contains a white space that can cause problems under certain circumstances, so it is better to rename it.

In [15]:
df = df.withColumnRenamed('Customer ID', 'CustomerID')

Further, the *Country* value EIRE can be replaced by Ireland. The `subset` argument restricts the replacement to the *Country* column. The result can be verified by `df.filter(df.Country == "Ireland").limit(5)`

In [16]:
df = df.replace("EIRE", "Ireland", subset=["Country"])

Information about the column types can be obtained from the schema. `df.printSchema()` prints the same information as a tree, and `df.explain()` shows the execution plan behind the DataFrame.

In [17]:
df.schema

StructType(List(StructField(Invoice,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(Price,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

Knowing the column types, we could now cast the column Quantity from integer to float.

In [18]:
df = df.withColumn("Quantity", col("Quantity").cast("Float"))

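The *InvoiceDate* shows up as a string. To exploit the time-related information, it is best to convert it to a timestamp. We will do this by creating a new column *n_InvoiceDate*. The pattern `M/d/yyyy H:mm` matches the unpadded values in the data (for example `12/1/2009 7:45`); Spark 3 and later parse dates more strictly and may reject such values under the zero-padded pattern `MM/dd/yyyy HH:mm`.

In [19]:
df = df.withColumn("n_InvoiceDate", F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))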
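As a quick sanity check outside Spark, the month-first reading of the raw strings can be reproduced with Python's own `datetime`. This is only a sketch: `raw_dates` and `to_iso` are illustrative names, and the three values are copied from tables shown in this post.

```python
from datetime import datetime

# Raw values as they appear in the InvoiceDate column (no zero padding).
raw_dates = ["12/1/2009 7:45", "1/15/2010 8:13", "12/9/2011 9:15"]


def to_iso(raw):
    """Parse a month/day/year string and format it as 'YYYY-MM-DD HH:MM:SS'."""
    parsed = datetime.strptime(raw, "%m/%d/%Y %H:%M")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")


for raw in raw_dates:
    print(raw, "->", to_iso(raw))
```

The first value comes out as 1 December 2009 rather than 12 January 2009, which agrees with the converted column shown in the outputs of this post.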

Sorting the new dates, we can now display the number of records (rows) as a function of date and time.

In [20]:
df.groupby("n_InvoiceDate").count().sort("n_InvoiceDate", ascending=True).limit(10)

n_InvoiceDate,count
2009-12-01 07:45:00,8
2009-12-01 07:46:00,4
2009-12-01 09:06:00,19
2009-12-01 09:08:00,23
2009-12-01 09:24:00,17
2009-12-01 09:28:00,19
2009-12-01 09:43:00,2
2009-12-01 09:44:00,4
2009-12-01 09:46:00,23
2009-12-01 09:50:00,7


It is also possible to select a range of dates. The following shows all sales on 15 January 2010 between 8:00 and 10:00.

In [21]:
df[(df["n_InvoiceDate"] > "2010-01-15 08:00:00") & (df["n_InvoiceDate"] < "2010-01-15 10:00:00")]

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,n_InvoiceDate
494508,84380,SET OF 3 BUTTERFL...,24.0,1/15/2010 8:13,1.25,16353,United Kingdom,2010-01-15 08:13:00
494508,21174,POTTERING IN THE ...,12.0,1/15/2010 8:13,1.95,16353,United Kingdom,2010-01-15 08:13:00
494508,21041,RED SPOTTY OVEN G...,12.0,1/15/2010 8:13,2.95,16353,United Kingdom,2010-01-15 08:13:00
494508,21154,RED SPOTTY OVEN G...,20.0,1/15/2010 8:13,1.25,16353,United Kingdom,2010-01-15 08:13:00
494509,21700,BIG DOUGHNUT FRID...,36.0,1/15/2010 8:14,0.85,16353,United Kingdom,2010-01-15 08:14:00
494509,84378,SET OF 3 HEART CO...,12.0,1/15/2010 8:14,1.25,16353,United Kingdom,2010-01-15 08:14:00
494510,21733,RED HANGING HEART...,6.0,1/15/2010 8:59,2.55,17850,United Kingdom,2010-01-15 08:59:00
494510,85123A,WHITE HANGING HEA...,6.0,1/15/2010 8:59,2.55,17850,United Kingdom,2010-01-15 08:59:00
494510,15056N,EDWARDIAN PARASOL...,6.0,1/15/2010 8:59,4.95,17850,United Kingdom,2010-01-15 08:59:00
494510,82483,WOOD 2 DRAWER CAB...,4.0,1/15/2010 8:59,4.95,17850,United Kingdom,2010-01-15 08:59:00
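The range filter is applied to the converted column rather than to the raw *InvoiceDate* text for a reason: strings such as `1/15/2010 8:13` compare character by character, not chronologically. A small plain-Python sketch of the effect, using three raw values that appear earlier in this post (the variable names are illustrative):

```python
from datetime import datetime

raw = ["9/9/2011 9:52", "1/10/2010 10:26", "12/1/2009 7:45"]

# Sorting the raw strings compares them character by character.
as_text = sorted(raw)

# Sorting on the parsed value compares actual points in time.
as_time = sorted(raw, key=lambda s: datetime.strptime(s, "%m/%d/%Y %H:%M"))

print("text order:", as_text)
print("time order:", as_time)
```

The text order starts with `1/10/2010 10:26` and ends with `9/9/2011 9:52`, exactly the *min* and *max* that the summary table reports for *InvoiceDate*, whereas the earliest record in time is `12/1/2009 7:45`.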


**CONVERTING DATA**

Time-series analyses often require different time units, like seconds, minutes, hours, days, weeks, months, years. For example, if we want to display the sales per week, we could use the function `weekofyear`, which translates a date to its week number (1 to 53).

In [22]:
df = df.withColumn("weekofyear", weekofyear("n_InvoiceDate"))
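Spark's `weekofyear` follows the ISO 8601 week definition, which Python's `date.isocalendar()` implements as well. The sketch below uses three dates from the tables in this post to illustrate one caveat: because the data span several years, the week number alone does not identify a unique week. The variable names are illustrative.

```python
from datetime import date

dates = [date(2009, 12, 1), date(2011, 1, 18), date(2011, 12, 9)]

# ISO week number only: two different years collapse onto the same value.
weeks = [d.isocalendar()[1] for d in dates]

# (ISO year, ISO week) pairs keep the weeks of different years apart.
year_weeks = [tuple(d.isocalendar())[:2] for d in dates]

print(weeks)
print(year_weeks)
```

When aggregating sales per week over the full period, grouping on the year together with the week number is therefore the safer choice.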

Multiple columns can be used in a computation. The *Amount* spent on each invoice line can be computed by multiplying the *Price* of a single product by its *Quantity*.

In [23]:
df = df.withColumn("Amount", col("Quantity") * col("Price"))
df.limit(5)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,n_InvoiceDate,weekofyear,Amount
489434,85048,15CM CHRISTMAS GL...,12.0,12/1/2009 7:45,6.95,13085,United Kingdom,2009-12-01 07:45:00,49,83.4
489434,79323P,PINK CHERRY LIGHTS,12.0,12/1/2009 7:45,6.75,13085,United Kingdom,2009-12-01 07:45:00,49,81.0
489434,79323W,WHITE CHERRY LIGHTS,12.0,12/1/2009 7:45,6.75,13085,United Kingdom,2009-12-01 07:45:00,49,81.0
489434,22041,"""RECORD FRAME 7""""...",48.0,12/1/2009 7:45,2.1,13085,United Kingdom,2009-12-01 07:45:00,49,100.8
489434,21232,STRAWBERRY CERAMI...,24.0,12/1/2009 7:45,1.25,13085,United Kingdom,2009-12-01 07:45:00,49,30.0


**FILTERING DATA**

How do we select rows that contain specific products? We can use the command `isin`, which is very similar to the Pandas `isin` function:

In [24]:
df[df.Description.isin('WHITE HANGING HEART T-LIGHT HOLDER')].limit(5)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,n_InvoiceDate,weekofyear,Amount
489442,85123A,WHITE HANGING HEA...,6.0,12/1/2009 9:46,2.95,13635,United Kingdom,2009-12-01 09:46:00,49,17.700000000000003
489446,85123A,WHITE HANGING HEA...,32.0,12/1/2009 10:06,2.55,13758,United Kingdom,2009-12-01 10:06:00,49,81.6
489465,85123A,WHITE HANGING HEA...,64.0,12/1/2009 10:52,2.55,13767,United Kingdom,2009-12-01 10:52:00,49,163.2
489517,85123A,WHITE HANGING HEA...,1.0,12/1/2009 11:34,2.95,16329,United Kingdom,2009-12-01 11:34:00,49,2.95
489519,85123A,WHITE HANGING HEA...,32.0,12/1/2009 11:37,2.55,17700,United Kingdom,2009-12-01 11:37:00,49,81.6
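Values such as 17.700000000000003 in the *Amount* column are not a Spark problem but a property of binary floating-point arithmetic: 2.95 has no exact binary representation. A minimal plain-Python sketch of the same multiplication, next to an exact decimal alternative (variable names are illustrative):

```python
from decimal import Decimal

quantity = 6
price_text = "2.95"

# Binary floating point: the product can carry a tiny rounding error.
amount_float = quantity * float(price_text)

# Decimal arithmetic on the original text keeps the cents exact.
amount_decimal = quantity * Decimal(price_text)

print(amount_float)
print(amount_decimal)
```

For display purposes rounding to two decimals is usually enough. If exact monetary values matter, casting *Price* to a decimal type in Spark, for example `cast("decimal(10,2)")`, is one option to consider.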


If we want to search our data by keyword, we can use the command `like`, in which `%` matches any sequence of characters.

In [25]:
df[df.Description.like('%WHITE%')].limit(5)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,n_InvoiceDate,weekofyear,Amount
489434,79323W,WHITE CHERRY LIGHTS,12.0,12/1/2009 7:45,6.75,13085,United Kingdom,2009-12-01 07:45:00,49,81.0
489436,22142,CHRISTMAS CRAFT W...,12.0,12/1/2009 9:06,1.45,13078,United Kingdom,2009-12-01 09:06:00,49,17.4
489436,21333,CLASSIC WHITE FRAME,6.0,12/1/2009 9:06,2.95,13078,United Kingdom,2009-12-01 09:06:00,49,17.700000000000003
489439,85014B,RED/WHITE DOTS RU...,3.0,12/1/2009 9:28,5.95,12682,France,2009-12-01 09:28:00,49,17.85
489441,84029E,RED WOOLLY HOTTIE...,36.0,12/1/2009 9:44,2.95,18087,United Kingdom,2009-12-01 09:44:00,49,106.2
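To make the pattern syntax concrete, here is a rough plain-Python emulation of SQL `LIKE` matching: `%` stands for any run of characters, `_` for exactly one character, and the comparison is case-sensitive. The helper `sql_like` is a hypothetical illustration and not a PySpark function; escape characters are deliberately left out of this sketch.

```python
import re


def sql_like(value, pattern):
    """Case-sensitive match of `value` against a SQL LIKE pattern.

    '%' matches any run of characters and '_' matches exactly one.
    Escape characters are not handled in this sketch.
    """
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


descriptions = [
    "WHITE CHERRY LIGHTS",
    "CLASSIC WHITE FRAME",
    "PINK CHERRY LIGHTS",
    "white cherry lights",
]
matches = [d for d in descriptions if sql_like(d, "%WHITE%")]
print(matches)
```

The lower-case variant is not matched, so descriptions with inconsistent capitalization would need to be normalized first, for example by upper-casing the column before filtering.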


To find big buyers, probably organizational customers, select rows where *Quantity* is larger than 50000. Note that `df.where(df.Quantity > 50000)` gives the same result, as `where` is an alias for `filter`.

In [26]:
df.filter(df.Quantity > 50000)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,n_InvoiceDate,weekofyear,Amount
541431,23166,MEDIUM CERAMIC TO...,74215.0,1/18/2011 10:01,1.04,12346,United Kingdom,2011-01-18 10:01:00,3,77183.6
581483,23843,"PAPER CRAFT , LIT...",80995.0,12/9/2011 9:15,2.08,16446,United Kingdom,2011-12-09 09:15:00,49,168469.6


Let us count the number of data records per country and sort the output, which shows that the United Kingdom clearly leads the list.

In [27]:
df.groupBy("Country").count().sort("count", ascending=False).limit(10)

Country,count
United Kingdom,964680
Ireland,17354
Germany,16703
France,13941
Netherlands,5093
Spain,3720
Switzerland,3137
Belgium,3069
Portugal,2562
Australia,1815


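Sort can be used for *n_InvoiceDate* as well. The result lists the number of records per timestamp in chronological order; to see which hours customers prefer for purchasing, the timestamps would first have to be reduced to the hour of the day (for example with `F.hour`) before counting.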

In [28]:
df.groupby("n_InvoiceDate").count().sort("n_InvoiceDate", ascending=True).limit(10)

n_InvoiceDate,count
2009-12-01 07:45:00,8
2009-12-01 07:46:00,4
2009-12-01 09:06:00,19
2009-12-01 09:08:00,23
2009-12-01 09:24:00,17
2009-12-01 09:28:00,19
2009-12-01 09:43:00,2
2009-12-01 09:44:00,4
2009-12-01 09:46:00,23
2009-12-01 09:50:00,7
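The output above is still at one-minute resolution. To see which hours attract the most records, the timestamps can be reduced to the hour of the day before counting. The following plain-Python sketch does this for the ten rows shown above; in Spark, grouping on `F.hour("n_InvoiceDate")` would be the analogous step. The variable names are illustrative.

```python
from collections import Counter
from datetime import datetime

# (timestamp, number of records) pairs copied from the output above.
per_timestamp = [
    ("2009-12-01 07:45:00", 8),
    ("2009-12-01 07:46:00", 4),
    ("2009-12-01 09:06:00", 19),
    ("2009-12-01 09:08:00", 23),
    ("2009-12-01 09:24:00", 17),
    ("2009-12-01 09:28:00", 19),
    ("2009-12-01 09:43:00", 2),
    ("2009-12-01 09:44:00", 4),
    ("2009-12-01 09:46:00", 23),
    ("2009-12-01 09:50:00", 7),
]

# Add up the record counts per hour of the day.
per_hour = Counter()
for stamp, n_records in per_timestamp:
    hour = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S").hour
    per_hour[hour] += n_records

for hour, n_records in sorted(per_hour.items()):
    print(f"{hour:02d}:00  {n_records}")
```

On these ten rows the 9 o'clock hour dominates; the full data set would of course be needed for a reliable picture.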


**USING SQL IN PYSPARK**

Spark also supports SQL (Structured Query Language), which traditionally plays an important role in managing relational databases. With SQL queries a subset of the data can, for example, be exported to CRM and KPI sales software. This offers a lot of flexibility for data analyses. Let us experiment with some useful SQL queries, such as select and filter. We first need to register the DataFrame as a temporary view in the Spark session (`registerTempTable` has been deprecated since Spark 2.0 in favour of `createOrReplaceTempView`).

In [29]:
df.createOrReplaceTempView("df")

We first select a couple of columns, for example *Description* and *Quantity*. If you want to select all columns, simply use the star: `spark.sql("select * from df").show(3)`

In [30]:
spark.sql("select Description, Quantity from df").limit(5)

Description,Quantity
15CM CHRISTMAS GL...,12.0
PINK CHERRY LIGHTS,12.0
WHITE CHERRY LIGHTS,12.0
"""RECORD FRAME 7""""...",48.0
STRAWBERRY CERAMI...,24.0


Select the columns *Description* and *Quantity*, and only those rows where *Quantity* equals 6.

In [31]:
spark.sql("select Description, Quantity from df where Quantity = 6").limit(5)

Description,Quantity
CLASSIC WHITE FRAME,6.0
CHRISTMAS CRAFT H...,6.0
CHRISTMAS CRAFT H...,6.0
STRIPES DESIGN MO...,6.0
FELTCRAFT DOLL ROSIE,6.0


Select the columns *Description*, *Quantity*, and *Country* where *Quantity* equals 6 and *Country* is United Kingdom.

In [32]:
spark.sql("select Description, Quantity, Country from df where Quantity=6 AND Country = 'United Kingdom'").limit(5)

Description,Quantity,Country
CLASSIC WHITE FRAME,6.0,United Kingdom
CHRISTMAS CRAFT H...,6.0,United Kingdom
CHRISTMAS CRAFT H...,6.0,United Kingdom
STRIPES DESIGN MO...,6.0,United Kingdom
FELTCRAFT DOLL ROSIE,6.0,United Kingdom


SQL can also be used to show distinct (unique) values in a column. To limit space, only five are displayed here.

In [33]:
spark.sql("SELECT DISTINCT Country from df").limit(5)

Country
Sweden
Singapore
Germany
RSA
France


We can also count the number of distinct values, that is, how many countries the data contain in total.

In [34]:
spark.sql("SELECT COUNT(DISTINCT Country) from df")

count(DISTINCT Country)
43


Using SQL we can also exclude certain values. For example, exclude all records with United Kingdom. 

In [35]:
spark.sql("select Description, Quantity, Country from df where Quantity=6 AND NOT Country = 'United Kingdom'").limit(5)

Description,Quantity,Country
SET OF THREE VINT...,6.0,France
SET/3 RUSSIAN DOL...,6.0,France
CREAM FELT EASTER...,6.0,Australia
POTTING SHED TWINE,6.0,Australia
FOUR HOOK WHITE ...,6.0,Ireland
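The queries in this section use standard SQL, so their logic can be tried on a toy table without a Spark session. The sketch below uses Python's built-in `sqlite3` module and five rows copied from the outputs in this post. SQLite is a different engine from Spark SQL, so this only illustrates the `WHERE`, `AND`, `NOT` and `COUNT(DISTINCT ...)` semantics, not Spark itself.

```python
import sqlite3

# Five rows copied from outputs shown in this post.
rows = [
    ("PINK CHERRY LIGHTS", 12.0, "United Kingdom"),
    ("WHITE CHERRY LIGHTS", 12.0, "United Kingdom"),
    ("CLASSIC WHITE FRAME", 6.0, "United Kingdom"),
    ("FELTCRAFT DOLL ROSIE", 6.0, "United Kingdom"),
    ("POTTING SHED TWINE", 6.0, "Australia"),
]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE df (Description TEXT, Quantity REAL, Country TEXT)")
con.executemany("INSERT INTO df VALUES (?, ?, ?)", rows)

# Rows with Quantity 6 from the United Kingdom.
uk_six = con.execute(
    "SELECT Description FROM df "
    "WHERE Quantity = 6 AND Country = 'United Kingdom'"
).fetchall()

# Rows with Quantity 6 from any other country.
not_uk_six = con.execute(
    "SELECT Description, Country FROM df "
    "WHERE Quantity = 6 AND NOT Country = 'United Kingdom'"
).fetchall()

# Number of distinct countries in the toy table.
n_countries = con.execute("SELECT COUNT(DISTINCT Country) FROM df").fetchone()[0]
con.close()

print(uk_six, not_uk_six, n_countries)
```

`NOT Country = 'United Kingdom'` can equally be written as `Country <> 'United Kingdom'`.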


It is possible to add a new column that codes UK (1) versus not-UK (0). You could then use `df.filter(df.Country == "United Kingdom").limit(5)` and `df.filter(df.Country == "France").limit(5)` to check whether the column codes UK versus not-UK correctly.

In [36]:
df = df.withColumn("Country_UK", when(col("Country") == "United Kingdom", 1).otherwise(0))

Next, we could count UK versus not-UK using the new column Country_UK.

In [37]:
df.groupBy("Country_UK").count()

Country_UK,count
1,964680
0,83197


We can also compute the mean *Amount* and the summed *Quantity* per group.

In [38]:
df.groupBy("Country_UK").agg({"Amount": "mean", "Quantity": "sum"})

Country_UK,avg(Amount),sum(Quantity)
1,18.360805279469652,9061154.0
0,37.28039221366158,2038330.0
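The group results can be cross-checked against each other with some simple arithmetic. The sketch below copies the counts, means and sums from the two outputs above and assumes that every row has a non-missing *Amount* (no missing values were reported for *Quantity* or *Price*). Note that the overall mean is a weighted mean of the group means, not their simple average.

```python
# Group results copied from the outputs above:
# (row count, mean Amount, summed Quantity).
groups = {
    "UK": (964680, 18.360805279469652, 9061154.0),
    "not-UK": (83197, 37.28039221366158, 2038330.0),
}

total_rows = sum(n for n, _, _ in groups.values())
total_quantity = sum(q for _, _, q in groups.values())

# The overall mean weights each group mean by its number of rows.
overall_mean = sum(n * mean for n, mean, _ in groups.values()) / total_rows

print(total_rows, total_quantity, round(overall_mean, 2))
```

The total row count equals the 1047877 rows that remained after the cancelled transactions were removed, which confirms that the new column assigns every record to exactly one of the two groups.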


**FINAL THOUGHTS**

That is it for now! In the current blog I have chosen to write out the code line by line. The code can of course be wrapped in functions, with the advantage that it can serve as a pre-processing script for different datasets with your own parameters. Further, smaller subsets of the data can easily be exported and visualized in, for example, Plotly.

There are many other exciting features and developments in Spark:
1. Koalas provides a Pandas API on Apache Spark, with similar capabilities but in a big data environment. This is particularly good news for people who already work in Pandas and need a quick translation of their code to PySpark.
2. Pandas UDFs are a newer feature that lets Spark apply Pandas functions to chunks of a DataFrame in parallel.
3. There are excellent solutions using PySpark in the cloud. For example, AWS has big data platforms such as Elastic MapReduce (EMR) that support PySpark.
4. Spark Streaming allows real-time data analysis.
5. MLlib allows scalable machine learning in Spark.
6. GraphX enables graph computations.

If you have any questions or anything you think is missing in this tutorial, please feel free to share. Any suggestions for topics are welcome!

**REFERENCES**

[1] Wang, G., Xin, R., and Damji, J. (2018). Benchmarking Apache Spark on a single node machine. https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html

[2] Chen, D., Laing Sain, S., and Guo, K. (2012). Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining. Journal of Database Marketing & Customer Strategy Management, 19, 197-208.