# Tutorial 1

The link to the tutorial page is: https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark

Alongside the pyspark syntax, I use pandas to compare both solutions.

## Installing everything needed for this notebook to work

To create an environment in which this notebook works, you need Anaconda installed on your computer. Then run:

```bash
conda create -n da python=3.8 pandas
conda activate da
conda install pyspark jupyter seaborn pyarrow
```

## Setting up the notebook

In [171]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [96]:
from pyspark.sql import SparkSession
# from pyspark.sql.functions import regexp_replace, count_distinct, count, desc, asc, col
# from pyspark.sql.functions import to_timestamp, udf, sum, first
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType, LongType

import pandas as pd
import numpy as np

In [3]:
spark = (
    SparkSession.builder
    .appName("Datacamp Pyspark Tutorial")
    .config("spark.memory.offHeap.enabled","true")
    .config("spark.memory.offHeap.size","10g")
    .getOrCreate()
)

23/01/02 18:57:15 WARN Utils: Your hostname, kango-linux resolves to a loopback address: 127.0.1.1; using 192.168.0.45 instead (on interface enp4s0)
23/01/02 18:57:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/02 18:57:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Dataset

The dataset used for this tutorial is taken from here:

https://archive.ics.uci.edu/ml/datasets/online+retail#

This is a transnational data set which contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered non-store online retailer. The company mainly sells unique all-occasion gifts. Many of its customers are wholesalers.

Dataset columns:

- **InvoiceNo**: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation.
- **StockCode**: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
- **Description**: Product (item) name. Nominal.
- **Quantity**: The quantities of each product (item) per transaction. Numeric.
- **InvoiceDate**: Invoice date and time. Numeric, the day and time when each transaction was generated.
- **UnitPrice**: Unit price. Numeric, product price per unit in sterling.
- **CustomerID**: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
- **Country**: Country name. Nominal, the name of the country where each customer resides.

**Citation:** Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).

Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

## Reading the dataset

In [4]:
df_spark = spark.read.csv("/files/cedric/datasets/uci/online_retail.csv", header=True)

In [5]:
df_pandas = pd.read_csv("/files/cedric/datasets/uci/online_retail.csv")

In [6]:
df_spark

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]

In [7]:
df_pandas

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 01/12/2010 08:26 | 2,55 | 17850.0 | United Kingdom |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 01/12/2010 08:26 | 3,39 | 17850.0 | United Kingdom |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 01/12/2010 08:26 | 2,75 | 17850.0 | United Kingdom |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 01/12/2010 08:26 | 3,39 | 17850.0 | United Kingdom |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 01/12/2010 08:26 | 3,39 | 17850.0 | United Kingdom |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 541904 | 581587 | 22613 | PACK OF 20 SPACEBOY NAPKINS | 12 | 09/12/2011 12:50 | 0,85 | 12680.0 | France |
| 541905 | 581587 | 22899 | CHILDREN'S APRON DOLLY GIRL | 6 | 09/12/2011 12:50 | 2,1 | 12680.0 | France |
| 541906 | 581587 | 23254 | CHILDRENS CUTLERY DOLLY GIRL | 4 | 09/12/2011 12:50 | 4,15 | 12680.0 | France |
| 541907 | 581587 | 23255 | CHILDRENS CUTLERY CIRCUS PARADE | 4 | 09/12/2011 12:50 | 4,15 | 12680.0 | France |


By default, displaying `df_spark` only lists the column names and types, whereas pandas displays a preview of the data.

We can have a look at the *schema* of the dataset: pyspark uses the `printSchema` method where pandas uses `info`:

In [8]:
df_spark.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   InvoiceNo    541909 non-null  object 
 1   StockCode    541909 non-null  object 
 2   Description  540455 non-null  object 
 3   Quantity     541909 non-null  int64  
 4   InvoiceDate  541909 non-null  object 
 5   UnitPrice    541909 non-null  object 
 6   CustomerID   406829 non-null  float64
 7   Country      541909 non-null  object 
dtypes: float64(1), int64(1), object(6)
memory usage: 33.1+ MB


A noticeable difference is that pyspark has read every column as a string. pandas does a bit better, but *UnitPrice* still shows as a string (`object`).
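To see where these pandas types come from, here is a minimal sketch on a hypothetical three-row CSV (made-up values, laid out like *online_retail.csv* with quoted decimal commas). A single missing value forces an integer column to `float64`, because `NaN` is a float, and `"2,55"` is not a number for the default parser. The `decimal` and `dtype` options fix both:

```python
import io

import pandas as pd

# Hypothetical excerpt: quoted decimal commas and one missing CustomerID.
csv_text = (
    'InvoiceNo,UnitPrice,CustomerID\n'
    '536365,"2,55",17850\n'
    '536366,"3,39",\n'
    '536367,"2,75",13047\n'
)

# Default parsing: the missing value turns CustomerID into float64,
# and UnitPrice stays a string because "2,55" is not a valid number.
default = pd.read_csv(io.StringIO(csv_text))
print(default.dtypes)

# decimal="," parses UnitPrice; the nullable "Int64" dtype keeps
# CustomerID as integers, with <NA> for the missing value.
tuned = pd.read_csv(io.StringIO(csv_text), decimal=",", dtype={"CustomerID": "Int64"})
print(tuned.dtypes)
```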

With pyspark it is also possible to infer the schema:

In [10]:
df_spark = spark.read.csv("/files/cedric/datasets/uci/online_retail.csv", header=True, inferSchema=True)

In [11]:
df_spark

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: string, CustomerID: int, Country: string]

Now *Quantity* and *CustomerID* are interpreted as integers. pandas, on the other hand, has interpreted *CustomerID* as a float: the column contains missing values, and a regular `int64` column cannot hold `NaN`. In both cases *UnitPrice* is interpreted as a string, because the CSV file uses `,` as the decimal separator. So we need some further tuning:

In [12]:
df_pandas = pd.read_csv(
    "/files/cedric/datasets/uci/online_retail.csv", 
    decimal=",", 
    dtype={"CustomerID": "Int64"},  # Int64 is a nullable integer type in pandas, it won't work with int or int64
)

In [13]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   InvoiceNo    541909 non-null  object 
 1   StockCode    541909 non-null  object 
 2   Description  540455 non-null  object 
 3   Quantity     541909 non-null  int64  
 4   InvoiceDate  541909 non-null  object 
 5   UnitPrice    541909 non-null  float64
 6   CustomerID   406829 non-null  Int64  
 7   Country      541909 non-null  object 
dtypes: Int64(1), float64(1), int64(1), object(5)
memory usage: 33.6+ MB


That's better. With pyspark we cannot apply exactly the same strategy, so we need to fix the data type after reading. We could first try to specify our own schema instead of relying on schema inference as above:

In [14]:
schema = StructType([
    StructField("InvoiceNo", StringType(), True),  # The last argument enables to state whether the field is nullable
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", FloatType(), True),  
    StructField("CustomerId", IntegerType(), True),
    StructField("Country", StringType(), True),
])
df_spark = spark.read.csv("/files/cedric/datasets/uci/online_retail.csv", header=True, schema=schema)

The problem is that *UnitPrice* cannot be parsed as a float because of the `,` decimal separator, so Spark silently turns every value into `null`, as the output below shows. We therefore need to declare it as a string in the schema and change the type afterwards:

In [15]:
df_spark.select("UnitPrice").show(5)

+---------+
|UnitPrice|
+---------+
|     null|
|     null|
|     null|
|     null|
|     null|
+---------+
only showing top 5 rows
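pyspark's default `PERMISSIVE` mode turns values it cannot parse into `null` instead of failing. A rough pandas counterpart is `pd.to_numeric(..., errors="coerce")`; this sketch uses a few hypothetical raw values to show both the silent nulls and the replace-then-convert approach used below:

```python
import pandas as pd

# Raw UnitPrice strings as they appear in the CSV (decimal comma).
raw = pd.Series(["2,55", "3,39", "2,75"])

# errors="coerce" turns unparseable values into NaN instead of raising,
# similar to the nulls pyspark produced with FloatType in the schema.
coerced = pd.to_numeric(raw, errors="coerce")
print(coerced.isna().all())

# Replace the separator first (regex=False: treat "," literally), then convert:
# the pandas counterpart of regexp_replace followed by cast(FloatType()).
fixed = pd.to_numeric(raw.str.replace(",", ".", regex=False))
print(fixed.tolist())
```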



In [16]:
schema = StructType([
    StructField("InvoiceNo", StringType(), True),  # The last argument enables to state whether the field is nullable
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", StringType(), True),  
    StructField("CustomerId", IntegerType(), True),
    StructField("Country", StringType(), True),
])
df_spark = spark.read.csv("/files/cedric/datasets/uci/online_retail.csv", header=True, schema=schema)

In [17]:
df_spark = df_spark.withColumn("UnitPrice", F.regexp_replace("UnitPrice", ",", "."))
# The UnitPrice appearing here: ^                           ^
# is the name of the output column.                         |
# The UnitPrice appearing here:------------------------------
# is the name of the input column.
df_spark = df_spark.withColumn("UnitPrice", df_spark["UnitPrice"].cast(FloatType()))

In [18]:
df_spark.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)



In [19]:
df_spark.select("UnitPrice").show(5)

+---------+
|UnitPrice|
+---------+
|     2.55|
|     3.39|
|     2.75|
|     3.39|
|     3.39|
+---------+
only showing top 5 rows



All good!

Now let's assume that we want to make the conversion after reading with pandas instead:

In [20]:
df_pandas = pd.read_csv(
    "/files/cedric/datasets/uci/online_retail.csv",  
    dtype={"CustomerID": "Int64"},  # Int64 is a nullable integer type in pandas, it won't work with int or int64
)

In [21]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype 
---  ------       --------------   ----- 
 0   InvoiceNo    541909 non-null  object
 1   StockCode    541909 non-null  object
 2   Description  540455 non-null  object
 3   Quantity     541909 non-null  int64 
 4   InvoiceDate  541909 non-null  object
 5   UnitPrice    541909 non-null  object
 6   CustomerID   406829 non-null  Int64 
 7   Country      541909 non-null  object
dtypes: Int64(1), int64(1), object(6)
memory usage: 33.6+ MB


In [22]:
df_pandas["UnitPrice"] = df_pandas["UnitPrice"].str.replace(",", ".").astype("float")

In [23]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   InvoiceNo    541909 non-null  object 
 1   StockCode    541909 non-null  object 
 2   Description  540455 non-null  object 
 3   Quantity     541909 non-null  int64  
 4   InvoiceDate  541909 non-null  object 
 5   UnitPrice    541909 non-null  float64
 6   CustomerID   406829 non-null  Int64  
 7   Country      541909 non-null  object 
dtypes: Int64(1), float64(1), int64(1), object(5)
memory usage: 33.6+ MB


Cool, we now have consistent types for the pandas and pyspark dataframes. We can display a few rows of the dataset: pyspark uses `show` where pandas uses `head`:

In [24]:
df_spark.show(5)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows



In [25]:
df_pandas.head(5)

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 01/12/2010 08:26 | 2.55 | 17850 | United Kingdom |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 01/12/2010 08:26 | 3.39 | 17850 | United Kingdom |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 01/12/2010 08:26 | 2.75 | 17850 | United Kingdom |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 01/12/2010 08:26 | 3.39 | 17850 | United Kingdom |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 01/12/2010 08:26 | 3.39 | 17850 | United Kingdom |


pyspark also has a `head` method to select the first few rows of the dataset:

In [26]:
df_spark.head(5)

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='01/12/2010 08:26', UnitPrice=2.549999952316284, CustomerId=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='01/12/2010 08:26', UnitPrice=3.390000104904175, CustomerId=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate='01/12/2010 08:26', UnitPrice=2.75, CustomerId=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate='01/12/2010 08:26', UnitPrice=3.390000104904175, CustomerId=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, InvoiceDate='01/12/2010 08:26', UnitPrice=3.390000104904175, CustomerId=17850

However, the result is no longer a dataframe: it is a list of `Row` objects collected to the driver. The closer equivalent of pandas `head` is therefore `limit`, which returns a new dataframe:

In [27]:
df_spark.limit(5).show()

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
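The `UnitPrice=2.549999952316284` values returned by `head` come from declaring *UnitPrice* as `FloatType`, a 32-bit float: `show` prints 2.55, but the stored value is only the nearest 32-bit approximation of 2.55. The standard-library sketch below reproduces that rounding with `struct` (the helper `as_float32` is a name made up for this example). Declaring the column as `DoubleType` instead would keep 64-bit precision.

```python
import struct


def as_float32(x: float) -> float:
    """Round a 64-bit Python float to 32-bit precision and widen it back."""
    return struct.unpack("f", struct.pack("f", x))[0]


price = as_float32(2.55)
print(price)  # 2.549999952316284, as returned by df_spark.head(5)

# A Python UDF receives this widened value, so multiplying it by the
# quantity reproduces the digits of the UDF-based Amount column.
print(6 * price)  # 15.299999713897705
```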



## Exploratory data analysis

We can get the list of columns in pyspark and pandas using the `columns` attribute:

In [28]:
df_spark.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerId',
 'Country']

In [29]:
df_pandas.columns.to_list()  # without to_list, an Index is returned instead of a list

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

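To get the *shape* of the dataframes, pyspark differs from pandas: a pyspark dataframe has no `shape` attribute, so we combine `count()` (the number of rows, which triggers a Spark job) with the length of `columns`: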

In [30]:
df_spark.count(), len(df_spark.columns)

(541909, 8)

In [31]:
df_pandas.shape

(541909, 8)

To count unique values, pyspark and pandas use different approaches:

In [32]:
df_spark.select('CustomerID').distinct().count()

4373

In [33]:
df_pandas["CustomerID"].unique().shape[0]

4373
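Since *CustomerID* contains missing values, both counts above include the missing value as one distinct value: `distinct()` keeps a `null` row and `unique()` returns the missing marker. pandas `nunique` drops missing values by default, as does pyspark's `F.count_distinct`. A small sketch on a hypothetical column:

```python
import numpy as np
import pandas as pd

# Hypothetical CustomerID column with a repeated value and a missing one.
customer_ids = pd.Series([17850, 17850, 13047, np.nan])

n_unique_incl_nan = customer_ids.unique().shape[0]  # NaN counts as a value
n_unique = customer_ids.nunique()  # NaN dropped by default
n_unique_keep_nan = customer_ids.nunique(dropna=False)
print(n_unique_incl_nan, n_unique, n_unique_keep_nan)
```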

If we want to count how many records (invoice lines) we have for each country, we need a *groupby* operation. Here again the syntax varies but the concept remains the same:

In [34]:
df_spark.groupby("Country").agg(F.count("Country").alias("count")).show(100)

+--------------------+------+
|             Country| count|
+--------------------+------+
|              Sweden|   462|
|             Germany|  9495|
|              France|  8557|
|             Belgium|  2069|
|             Finland|   695|
|               Italy|   803|
|                EIRE|  8196|
|           Lithuania|    35|
|              Norway|  1086|
|               Spain|  2533|
|             Denmark|   389|
|             Iceland|   182|
|              Israel|   297|
|     Channel Islands|   758|
|              Cyprus|   622|
|         Switzerland|  2002|
|               Japan|   358|
|              Poland|   341|
|            Portugal|  1519|
|           Australia|  1259|
|             Austria|   401|
|             Bahrain|    19|
|      United Kingdom|495478|
|         Netherlands|  2371|
|           Singapore|   229|
|              Greece|   146|
|           Hong Kong|   288|
|United Arab Emirates|    68|
|             Lebanon|    45|
|        Saudi Arabia|    10|
|         

In [35]:
df_pandas.groupby("Country")["Country"].count()
# or 
#df_pandas.groupby("Country").agg({"Country": "count"})

Country
Australia                 1259
Austria                    401
Bahrain                     19
Belgium                   2069
Brazil                      32
Canada                     151
Channel Islands            758
Cyprus                     622
Czech Republic              30
Denmark                    389
EIRE                      8196
European Community          61
Finland                    695
France                    8557
Germany                   9495
Greece                     146
Hong Kong                  288
Iceland                    182
Israel                     297
Italy                      803
Japan                      358
Lebanon                     45
Lithuania                   35
Malta                      127
Netherlands               2371
Norway                    1086
Poland                     341
Portugal                  1519
RSA                         58
Saudi Arabia                10
Singapore                  229
Spain                     2533


The counts are identical, but the order differs: pyspark returns the groups in no particular order, while pandas sorts them by key. We can sort the rows by count:

In [37]:
df_spark.groupby("Country").agg(F.count("Country").alias("count")).orderBy(F.desc("count")).show(100)
# or, an alternative:
# df_spark.groupBy("Country").agg(F.count("Country").alias("count")).orderBy(F.col("count").desc()).show(100)

+--------------------+------+
|             Country| count|
+--------------------+------+
|      United Kingdom|495478|
|             Germany|  9495|
|              France|  8557|
|                EIRE|  8196|
|               Spain|  2533|
|         Netherlands|  2371|
|             Belgium|  2069|
|         Switzerland|  2002|
|            Portugal|  1519|
|           Australia|  1259|
|              Norway|  1086|
|               Italy|   803|
|     Channel Islands|   758|
|             Finland|   695|
|              Cyprus|   622|
|              Sweden|   462|
|         Unspecified|   446|
|             Austria|   401|
|             Denmark|   389|
|               Japan|   358|
|              Poland|   341|
|              Israel|   297|
|                 USA|   291|
|           Hong Kong|   288|
|           Singapore|   229|
|             Iceland|   182|
|              Canada|   151|
|              Greece|   146|
|               Malta|   127|
|United Arab Emirates|    68|
|  Europea

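*Note*: pyspark also has a `sort` method, which is an alias of `orderBy` and sorts the whole dataframe as well. Only `sortWithinPartitions` sorts each partition independently, in which case the global order is not guaranteed.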

In [38]:
df_pandas.groupby("Country")["Country"].count().sort_values(ascending=False)

Country
United Kingdom          495478
Germany                   9495
France                    8557
EIRE                      8196
Spain                     2533
Netherlands               2371
Belgium                   2069
Switzerland               2002
Portugal                  1519
Australia                 1259
Norway                    1086
Italy                      803
Channel Islands            758
Finland                    695
Cyprus                     622
Sweden                     462
Unspecified                446
Austria                    401
Denmark                    389
Japan                      358
Poland                     341
Israel                     297
USA                        291
Hong Kong                  288
Singapore                  229
Iceland                    182
Canada                     151
Greece                     146
Malta                      127
United Arab Emirates        68
European Community          61
RSA                         58


Technically speaking, there is a small difference between the two results: pandas outputs a `Series` whereas pyspark outputs a `DataFrame`. It is possible to output a dataframe with pandas as well, to get exactly the same result:

In [39]:
(
    df_pandas
    .groupby("Country")
    .agg({"Country": "count"})
    .rename(columns={"Country": "count"})
    .sort_values("count", ascending=False)
    .reset_index()
)

| | Country | count |
|---|---|---|
| 0 | United Kingdom | 495478 |
| 1 | Germany | 9495 |
| 2 | France | 8557 |
| 3 | EIRE | 8196 |
| 4 | Spain | 2533 |
| 5 | Netherlands | 2371 |
| 6 | Belgium | 2069 |
| 7 | Switzerland | 2002 |
| 8 | Portugal | 1519 |
| 9 | Australia | 1259 |
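As a shortcut, pandas `value_counts` performs the groupby, the count and the descending sort in one call (pyspark's `groupBy("Country").count()` is a similar shortcut for the aggregation part). Note also that `count` on a column skips missing values, like `F.count` on a column, while `size` counts every row of the group. A sketch on hypothetical rows:

```python
import numpy as np
import pandas as pd

# Hypothetical rows: two countries, one row with a missing CustomerID.
df = pd.DataFrame({
    "Country": ["France", "France", "Germany", "France"],
    "CustomerID": [12680, np.nan, 12662, 12680],
})

# value_counts = groupby + count + sort_values(ascending=False).
counts = df["Country"].value_counts()
print(counts)

# count() ignores missing values in the counted column; size() counts all rows.
print(df.groupby("Country")["CustomerID"].count())
print(df.groupby("Country").size())
```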


Let's count distinct (unique) customers per country:

In [42]:
(
    df_spark
    .groupby("Country")
    .agg(F.count_distinct("CustomerID").alias("count"))
    .orderBy(F.desc("count"))
).show(100)

+--------------------+-----+
|             Country|count|
+--------------------+-----+
|      United Kingdom| 3950|
|             Germany|   95|
|              France|   87|
|               Spain|   31|
|             Belgium|   25|
|         Switzerland|   21|
|            Portugal|   19|
|               Italy|   15|
|             Finland|   12|
|             Austria|   11|
|              Norway|   10|
|     Channel Islands|    9|
|           Australia|    9|
|         Netherlands|    9|
|             Denmark|    9|
|              Sweden|    8|
|              Cyprus|    8|
|               Japan|    8|
|              Poland|    6|
|              Israel|    4|
|                 USA|    4|
|              Greece|    4|
|              Canada|    4|
|         Unspecified|    4|
|                EIRE|    3|
|             Bahrain|    2|
|               Malta|    2|
|United Arab Emirates|    2|
|           Singapore|    1|
|                 RSA|    1|
|           Lithuania|    1|
|  European Co

In [43]:
(
    df_pandas
    .groupby("Country")
    .agg({"CustomerID": "nunique"})  # nunique is the count of unique (distinct) values
    .rename(columns={"CustomerID": "count"})
    .sort_values("count", ascending=False)
    .reset_index()
)

| | Country | count |
|---|---|---|
| 0 | United Kingdom | 3950 |
| 1 | Germany | 95 |
| 2 | France | 87 |
| 3 | Spain | 31 |
| 4 | Belgium | 25 |
| 5 | Switzerland | 21 |
| 6 | Portugal | 19 |
| 7 | Italy | 15 |
| 8 | Finland | 12 |
| 9 | Austria | 11 |


We can now try to find the most recent order on the platform. To do this, we need to convert the *InvoiceDate* column into a timestamp:

In [44]:
df_spark.select("InvoiceDate").show(5)

+----------------+
|     InvoiceDate|
+----------------+
|01/12/2010 08:26|
|01/12/2010 08:26|
|01/12/2010 08:26|
|01/12/2010 08:26|
|01/12/2010 08:26|
+----------------+
only showing top 5 rows



As we can see, the time format for *InvoiceDate* is `dd/MM/yyyy HH:mm`:

In [47]:
df_spark = df_spark.withColumn("InvoiceDate", F.to_timestamp("InvoiceDate", "dd/MM/yyyy HH:mm"))

In [48]:
df_spark.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



Perfect, *InvoiceDate* has been converted to timestamp. Let's do the same with pandas:

In [49]:
# pd.to_datetime(df_pandas["InvoiceDate"]).dt.date
# pd.to_datetime(df_pandas["InvoiceDate"]).dt.time
df_pandas["InvoiceDate"] = pd.to_datetime(df_pandas["InvoiceDate"], format="%d/%m/%Y %H:%M")
# Unfortunately, pandas uses strftime-style codes (%d/%m/%Y %H:%M), not Spark's dd/MM/yyyy HH:mm patterns.

In [50]:
df_pandas.head()

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850 | United Kingdom |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 2010-12-01 08:26:00 | 2.75 | 17850 | United Kingdom |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |


The result matches what we obtained with pyspark.
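The explicit `format` matters for this dataset: a string such as `01/12/2010` is ambiguous, and without a format pandas reads it month first. A minimal sketch, using the first *InvoiceDate* of the dataset, of the Spark-to-strftime mapping and of the silent misparse:

```python
from datetime import datetime

import pandas as pd

raw = "01/12/2010 08:26"  # first InvoiceDate of the dataset: 1 December 2010

# Spark's "dd/MM/yyyy HH:mm" corresponds to strftime's "%d/%m/%Y %H:%M".
parsed = datetime.strptime(raw, "%d/%m/%Y %H:%M")
print(parsed)  # 2010-12-01 08:26:00

# Without a format, pandas assumes month first for this layout and
# silently reads the same string as 12 January 2010.
guessed = pd.to_datetime(raw)
explicit = pd.to_datetime(raw, format="%d/%m/%Y %H:%M")
print(guessed, explicit)
```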

Let's now compute the amount of each record by multiplying *UnitPrice* by *Quantity*:

In [51]:
df_spark = df_spark.withColumn("Amount", F.col("Quantity") * F.col("UnitPrice"))
df_spark.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|   Amount|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|15.299999|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|     22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.34|
+---------+---------+--------------------+--------+-----

In [53]:
df_pandas["Amount"] = df_pandas["Quantity"] * df_pandas["UnitPrice"]
df_pandas.head()

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country | Amount |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850 | United Kingdom | 15.3 |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom | 20.34 |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 2010-12-01 08:26:00 | 2.75 | 17850 | United Kingdom | 22.0 |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom | 20.34 |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom | 20.34 |


There are alternatives if we want to apply a function row by row. In pandas this approach is much slower than the vectorized version and should not be used; it is only shown to illustrate the difference with pyspark:

In [54]:
def amount(data):
    data["Amount"] = data["Quantity"] * data["UnitPrice"]
    return data
df_pandas = df_pandas.apply(amount, axis=1)

The equivalent for pyspark is to define a user function using udf (user-defined function):

In [56]:
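from pyspark.sql.types import DoubleType

# Without returnType, F.udf declares a StringType result; DoubleType keeps Amount numeric.
@F.udf(returnType=DoubleType())
def amount(quantity, unit_price):
    # Plain Python multiplication, called once per row on a Python worker.
    return quantity * unit_price
df_spark = df_spark.withColumn("Amount", amount("Quantity", "UnitPrice"))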

In [57]:
df_spark.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|            Amount|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|15.299999713897705|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.34000062942505|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.34000062942505|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.

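This yields the same values, up to floating-point precision (the UDF works on 64-bit Python floats). It can also be faster than the pandas `apply` version, since Spark spreads the work over several cores, but a Python UDF remains slower than a built-in column expression such as `F.col("Quantity") * F.col("UnitPrice")`, because every row has to be sent to a Python worker. Prefer built-in functions whenever they exist.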
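The same trade-off exists in pandas. This sketch, on a hypothetical three-row frame, checks that the vectorized product and the row-wise `apply` give identical results; only the cost differs, since `apply(axis=1)` calls a Python function once per row:

```python
import pandas as pd

# Hypothetical three-line excerpt with the two columns involved.
df = pd.DataFrame({"Quantity": [6, 6, 8], "UnitPrice": [2.55, 3.39, 2.75]})

# Vectorized: a single multiplication over whole columns (recommended).
vectorized = df["Quantity"] * df["UnitPrice"]

# Row-wise: a Python function called once per row, similar in spirit to a UDF.
row_wise = df.apply(lambda row: row["Quantity"] * row["UnitPrice"], axis=1)

print(vectorized.equals(row_wise))
```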

We can then compute the total amount for each invoice and customer while keeping the invoice date. We will only display the five orders with the highest amount:

In [58]:
(
    df_spark
    .groupby(["InvoiceNo", "CustomerID"])
    .agg(F.first("InvoiceDate").alias("InvoiceDate"), F.sum("Amount").alias("Amount"))
    .orderBy(F.desc("Amount"))
    .limit(5)
).show()

+---------+----------+-------------------+-----------------+
|InvoiceNo|CustomerID|        InvoiceDate|           Amount|
+---------+----------+-------------------+-----------------+
|   581483|     16446|2011-12-09 09:15:00|168469.5938205719|
|   541431|     12346|2011-01-18 10:01:00|77183.59716892242|
|   574941|      null|2011-11-07 17:42:00|52940.93936300278|
|   576365|      null|2011-11-14 17:55:00| 50653.9094145298|
|   556444|     15098|2011-06-10 15:28:00|          38970.0|
+---------+----------+-------------------+-----------------+





In [59]:
(
    df_pandas
    .groupby(["InvoiceNo", "CustomerID"], as_index=False)
    .agg({"InvoiceDate": "first", "Amount": "sum"})
    .sort_values("Amount", ascending=False)
    .head(5)
)

| | InvoiceNo | CustomerID | InvoiceDate | Amount |
|---|---|---|---|---|
| 18503 | 581483 | 16446 | 2011-12-09 09:15:00 | 168469.6 |
| 1909 | 541431 | 12346 | 2011-01-18 10:01:00 | 77183.6 |
| 7926 | 556444 | 15098 | 2011-06-10 15:28:00 | 38970.0 |
| 12419 | 567423 | 17450 | 2011-09-20 11:05:00 | 31698.16 |
| 8112 | 556917 | 12415 | 2011-06-15 13:37:00 | 22775.93 |
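As an aside, pandas also offers `nlargest`, which returns the same rows as `sort_values(...).head(n)` in a single call. A sketch on a few hypothetical per-invoice totals (amounts taken from the output above, in shuffled order):

```python
import pandas as pd

# Hypothetical per-invoice totals, deliberately unsorted.
totals = pd.DataFrame({
    "InvoiceNo": ["556444", "581483", "567423", "541431"],
    "Amount": [38970.0, 168469.6, 31698.16, 77183.6],
})

# nlargest(n, column) keeps the n rows with the highest values, in descending order.
top2 = totals.nlargest(2, "Amount")
same = totals.sort_values("Amount", ascending=False).head(2)
print(top2["InvoiceNo"].tolist())
```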


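We can see a difference in the way rows with a null *CustomerID* are handled: pandas `groupby` drops rows whose group key is null by default, whereas pyspark keeps them in a `null` group. We can replicate pyspark's behavior in pandas with `dropna=False`: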

In [60]:
(
    df_pandas
    .groupby(["InvoiceNo", "CustomerID"], as_index=False, dropna=False)
    .agg({"InvoiceDate": "first", "Amount": "sum"})
    .sort_values("Amount", ascending=False)
    .head(5)
)

| | InvoiceNo | CustomerID | InvoiceDate | Amount |
|---|---|---|---|---|
| 22025 | 581483 | 16446.0 | 2011-12-09 09:15:00 | 168469.6 |
| 2303 | 541431 | 12346.0 | 2011-01-18 10:01:00 | 77183.6 |
| 18776 | 574941 | | 2011-11-07 17:42:00 | 52940.94 |
| 19468 | 576365 | | 2011-11-14 17:55:00 | 50653.91 |
| 9741 | 556444 | 15098.0 | 2011-06-10 15:28:00 | 38970.0 |
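To see the two defaults side by side, here is a minimal sketch on a hypothetical three-row frame (not the retail data): the default pandas `groupby` silently drops the group whose *CustomerID* is missing, while `dropna=False` keeps it as its own group, which matches what the pyspark `groupby` did above.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data: invoice "B" has no customer ID.
df = pd.DataFrame({
    "InvoiceNo": ["A", "A", "B"],
    "CustomerID": [17850.0, 17850.0, np.nan],
    "Amount": [10.0, 5.0, 100.0],
})
keys = ["InvoiceNo", "CustomerID"]

# pandas default (dropna=True): the group with a NaN key is dropped.
default = df.groupby(keys, as_index=False)["Amount"].sum()

# dropna=False: the NaN key forms its own group, as in pyspark.
with_nulls = df.groupby(keys, as_index=False, dropna=False)["Amount"].sum()

print(default)
print(with_nulls)
```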


But in this context it is probably better to change the behavior of pyspark and drop the rows with a missing key:

In [61]:
(
    df_spark
    .dropna(how="any", subset=["InvoiceNo", "CustomerID"])
    .groupby(["InvoiceNo", "CustomerID"])
    .agg(F.first("InvoiceDate").alias("InvoiceDate"), F.sum("Amount").alias("Amount"))
    .orderBy(F.desc("Amount"))
    .limit(5)
).show()

+---------+----------+-------------------+------------------+
|InvoiceNo|CustomerID|        InvoiceDate|            Amount|
+---------+----------+-------------------+------------------+
|   581483|     16446|2011-12-09 09:15:00| 168469.5938205719|
|   541431|     12346|2011-01-18 10:01:00| 77183.59716892242|
|   556444|     15098|2011-06-10 15:28:00|           38970.0|
|   567423|     17450|2011-09-20 11:05:00|31698.160014390945|
|   556917|     12415|2011-06-15 13:37:00|22775.929987192154|
+---------+----------+-------------------+------------------+



This requires dropping the rows with null values before the `groupby`, which is done with the `dropna` method: with `how="any"`, a row is dropped as soon as one of the `subset` columns is null.
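`dropna` takes the same `how` and `subset` arguments in pyspark and in pandas. A minimal pandas sketch on a hypothetical frame shows the difference between `how="any"` and `how="all"`, and that once incomplete keys are dropped, it no longer matters whether the `groupby` keeps null groups:

```python
import numpy as np
import pandas as pd

# Hypothetical toy data with missing keys.
df = pd.DataFrame({
    "InvoiceNo": ["A", "A", "B", None],
    "CustomerID": [17850.0, 17850.0, np.nan, np.nan],
    "Amount": [10.0, 5.0, 100.0, 7.0],
})
keys = ["InvoiceNo", "CustomerID"]

# how="any": drop a row if at least one key is null (the last two rows go).
any_null = df.dropna(how="any", subset=keys)
# how="all": drop a row only if every key is null (only the last row goes).
all_null = df.dropna(how="all", subset=keys)

# After how="any", keeping or dropping null groups gives the same result.
kept = any_null.groupby(keys, as_index=False, dropna=False)["Amount"].sum()
dropped = any_null.groupby(keys, as_index=False)["Amount"].sum()
pd.testing.assert_frame_equal(kept, dropped)
```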

To compare the time-series functions of pandas and pyspark, we are going to add a new column containing an epoch value, i.e. the total number of seconds elapsed since 01/01/1970 00:00.

In [62]:
df_spark.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|            Amount|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|15.299999713897705|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.34000062942505|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.34000062942505|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.

In [109]:
epoch = F.to_timestamp(F.lit("01/01/1970 00:00"), "dd/MM/yyyy HH:mm").cast(LongType())
df_spark = (
    df_spark
    .withColumn("TimeDelta", F.col("InvoiceDate").cast(LongType()) - epoch)
)
df_spark.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|            Amount| TimeDelta|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|15.299999713897705|1291191960|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.34000062942505|1291191960|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|              22.0|1291191960|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom| 20.34000062942505|1291191960|
|   536365|   84029E|RED WOOLLY HO

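`lit` creates a literal column that is then converted to a timestamp with `to_timestamp`. The newly created column is the time delta between the invoice date and the epoch.

To express it in seconds, both timestamps are cast to a long type (which gives the number of seconds since the Unix epoch) before the subtraction.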
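The same arithmetic can be checked with Python's standard `datetime` module. This minimal sketch takes the first *InvoiceDate* from the table above and reproduces its *TimeDelta* value, 1291191960; it assumes the dates are naive, which is why reading the invoice date as UTC gives the same number.

```python
from datetime import datetime, timezone

# First InvoiceDate from the output above and the notebook's reference date.
invoice_date = datetime(2010, 12, 1, 8, 26)
epoch = datetime(1970, 1, 1, 0, 0)

# Difference in whole seconds, like the LongType subtraction in pyspark.
time_delta = int((invoice_date - epoch).total_seconds())
print(time_delta)  # 1291191960

# Same value when the invoice date is read as UTC and turned into a Unix timestamp.
unix_seconds = int(invoice_date.replace(tzinfo=timezone.utc).timestamp())
print(unix_seconds)  # 1291191960
```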

In pandas, we can compute the time delta between two timestamps and then convert the delta into seconds:

In [114]:
df_pandas["TimeDelta"] = (
    (df_pandas["InvoiceDate"] - pd.to_datetime("01/01/1970 00:00", format="%d/%m/%Y %H:%M")).dt.total_seconds()
)

In [115]:
df_pandas.head()

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country | Amount | TimeDelta |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850 | United Kingdom | 15.3 | 1291192000.0 |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom | 20.34 | 1291192000.0 |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 2010-12-01 08:26:00 | 2.75 | 17850 | United Kingdom | 22.0 | 1291192000.0 |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom | 20.34 | 1291192000.0 |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom | 20.34 | 1291192000.0 |


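OK, we get the same results ;) The pandas values only look rounded (1291192000.0 instead of 1291191960) because of how the floats are displayed; `total_seconds` returns the exact number of seconds.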
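If integer seconds are preferred, closer to the `LongType` column in pyspark, the time delta can be floor-divided by one second instead of calling `total_seconds`. A minimal sketch, using two invoice dates copied from the outputs in this notebook:

```python
import pandas as pd

# Two invoice dates taken from the outputs in this notebook.
dates = pd.Series(pd.to_datetime(["2010-12-01 08:26:00", "2011-12-09 12:50:00"]))
epoch = pd.Timestamp("1970-01-01 00:00")

# float64 seconds, as in the pandas TimeDelta column
as_float = (dates - epoch).dt.total_seconds()
# integer seconds, obtained by floor division by one second
as_int = (dates - epoch) // pd.Timedelta(seconds=1)

print(as_float.tolist())  # [1291191960.0, 1323435000.0]
print(as_int.tolist())    # [1291191960, 1323435000]
```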

We can use the new information to get the latest order on the platform:

In [127]:
df_spark.select(F.max("InvoiceDate"), F.max("TimeDelta")).show()

+-------------------+--------------+
|   max(InvoiceDate)|max(TimeDelta)|
+-------------------+--------------+
|2011-12-09 12:50:00|    1323435000|
+-------------------+--------------+



In [128]:
df_pandas["InvoiceDate"].max(), df_pandas["TimeDelta"].max()

(Timestamp('2011-12-09 12:50:00'), 1323435000.0)
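Converting the maximum *TimeDelta* back to a date confirms that both maxima describe the same instant. A minimal pandas sketch; in pyspark, `F.from_unixtime` performs the reverse conversion, but it renders the result in the session time zone, so it only prints the same value when that zone is UTC.

```python
import pandas as pd

# max(TimeDelta) reported by both libraries above
max_seconds = 1323435000

latest = pd.to_datetime(max_seconds, unit="s")
print(latest)  # 2011-12-09 12:50:00
```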

And we can get the latest order per country:

In [139]:
(
    df_spark
    .groupby("Country")
    .agg(F.max("InvoiceDate").alias("InvoiceDate"))
    .orderBy("Country")
).show(100)

+--------------------+-------------------+
|             Country|        InvoiceDate|
+--------------------+-------------------+
|           Australia|2011-11-24 12:30:00|
|             Austria|2011-12-08 10:26:00|
|             Bahrain|2011-05-19 17:47:00|
|             Belgium|2011-12-09 10:10:00|
|              Brazil|2011-04-15 10:25:00|
|              Canada|2011-08-25 11:27:00|
|     Channel Islands|2011-12-08 11:53:00|
|              Cyprus|2011-12-02 11:21:00|
|      Czech Republic|2011-11-18 15:45:00|
|             Denmark|2011-12-05 16:48:00|
|                EIRE|2011-12-08 15:54:00|
|  European Community|2011-07-21 10:24:00|
|             Finland|2011-12-07 11:27:00|
|              France|2011-12-09 12:50:00|
|             Germany|2011-12-09 12:16:00|
|              Greece|2011-12-06 09:56:00|
|           Hong Kong|2011-11-14 13:27:00|
|             Iceland|2011-12-07 15:52:00|
|              Israel|2011-10-04 14:55:00|
|               Italy|2011-12-06 09:35:00|
|          

In [140]:
df_pandas.groupby("Country", as_index=False).agg({"InvoiceDate": "max"}).sort_values("Country")

| | Country | InvoiceDate |
|---|---|---|
| 0 | Australia | 2011-11-24 12:30:00 |
| 1 | Austria | 2011-12-08 10:26:00 |
| 2 | Bahrain | 2011-05-19 17:47:00 |
| 3 | Belgium | 2011-12-09 10:10:00 |
| 4 | Brazil | 2011-04-15 10:25:00 |
| 5 | Canada | 2011-08-25 11:27:00 |
| 6 | Channel Islands | 2011-12-08 11:53:00 |
| 7 | Cyprus | 2011-12-02 11:21:00 |
| 8 | Czech Republic | 2011-11-18 15:45:00 |
| 9 | Denmark | 2011-12-05 16:48:00 |


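This gives us only the timestamp and the country. If we want to keep all the columns of one order per country, we need another approach (only one product line of that order will be shown).

The funny thing is that in both cases we are going to use the same pandas function. It sorts the incoming data (the rows of one *Country* group in this case) by *InvoiceDate* and keeps only the first row. Since the sort is ascending, this returns the *earliest* order of each country (for Australia, 2010-12-01 instead of the 2011-11-24 found above); using `.tail(1)` instead of `.head(1)` would return the latest one.
When calling this function from pyspark with `applyInPandas`, we need to specify the schema of the returned data. In this example the returned schema is exactly the same as the input one.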

In [172]:
def apply_pandas(data):
    return data.sort_values("InvoiceDate").head(1).reset_index(drop=True)

In [173]:
df_spark.groupby("Country").applyInPandas(apply_pandas, schema=df_spark.schema).orderBy("Country").show(100)



+---------+---------+--------------------+--------+-------------------+---------+----------+--------------------+-------------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|             Country|             Amount| TimeDelta|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------------+-------------------+----------+
|   536389|    22941|CHRISTMAS LIGHTS ...|       6|2010-12-01 10:03:00|      8.5|     12431|           Australia|               51.0|1291197780|
|  C538971|    22153|ANGEL DECORATION ...|     -48|2010-12-15 11:39:00|     0.42|     12865|             Austria| -20.15999937057495|1292413140|
|   539500|   72802B|OCEAN SCENT CANDL...|      54|2010-12-20 11:02:00|     3.81|      null|             Bahrain| 205.73999691009521|1292842920|
|   537026|    84375|SET OF 20 KIDS CO...|      12|2010-12-03 16:35:00|      2.1|     12395|             Belgium|  25.199998855590

                                                                                

In [174]:
df_pandas.groupby("Country", as_index=False).apply(apply_pandas)

| | | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country | Amount | TimeDelta |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 536389 | 22941 | CHRISTMAS LIGHTS 10 REINDEER | 6 | 2010-12-01 10:03:00 | 8.5 | 12431.0 | Australia | 51.0 | 1291198000.0 |
| 1 | 0 | C538971 | 22153 | ANGEL DECORATION STARS ON DRESS | -48 | 2010-12-15 11:39:00 | 0.42 | 12865.0 | Austria | -20.16 | 1292413000.0 |
| 2 | 0 | 539500 | 72802B | OCEAN SCENT CANDLE IN JEWELLED BOX | 54 | 2010-12-20 11:02:00 | 3.81 | | Bahrain | 205.74 | 1292843000.0 |
| 3 | 0 | 537026 | 84375 | SET OF 20 KIDS COOKIE CUTTERS | 12 | 2010-12-03 16:35:00 | 2.1 | 12395.0 | Belgium | 25.2 | 1291394000.0 |
| 4 | 0 | 550201 | 22423 | REGENCY CAKESTAND 3 TIER | 16 | 2011-04-15 10:25:00 | 10.95 | 12769.0 | Brazil | 175.2 | 1302863000.0 |
| 5 | 0 | 546533 | 20886 | BOX OF 9 PEBBLE CANDLES | 12 | 2011-03-14 13:53:00 | 1.95 | 15388.0 | Canada | 23.4 | 1300111000.0 |
| 6 | 0 | 538002 | 22690 | DOORMAT HOME SWEET HOME BLUE | 2 | 2010-12-09 11:48:00 | 7.95 | 14932.0 | Channel Islands | 15.9 | 1291895000.0 |
| 7 | 0 | 538826 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 64 | 2010-12-14 12:58:00 | 2.55 | 12370.0 | Cyprus | 163.2 | 1292331000.0 |
| 8 | 0 | 545072 | 22930 | BAKING MOULD HEART MILK CHOCOLATE | 18 | 2011-02-28 08:43:00 | 2.55 | 12781.0 | Czech Republic | 45.9 | 1298883000.0 |
| 9 | 0 | 538003 | 22847 | BREAD BIN DINER STYLE IVORY | 8 | 2010-12-09 12:05:00 | 14.95 | 12429.0 | Denmark | 119.6 | 1291896000.0 |


We get the same result once again :)
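In plain pandas, the same per-group row selection can also be written without a custom function. A minimal sketch on a hypothetical four-row frame: after an ascending sort, `groupby(...).head(1)` keeps the earliest order of each country, while `tail(1)` or `idxmax` keep the latest one.

```python
import pandas as pd

# Hypothetical toy orders: two countries with two orders each.
df = pd.DataFrame({
    "Country": ["France", "France", "Spain", "Spain"],
    "InvoiceNo": ["1", "2", "3", "4"],
    "InvoiceDate": pd.to_datetime(
        ["2011-01-01", "2011-06-01", "2011-03-01", "2011-02-01"]
    ),
})

ordered = df.sort_values("InvoiceDate")

# First row of each group after an ascending sort: earliest order per country.
earliest = ordered.groupby("Country").head(1)
# Last row of each group: latest order per country.
latest = ordered.groupby("Country").tail(1)
# Same latest rows, selected through the index label of the maximum date.
latest_idx = df.loc[df.groupby("Country")["InvoiceDate"].idxmax()]

print(earliest)
print(latest)
```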

OK, now let's see how we can use masking with pyspark to filter data:

In [184]:
mask = (df_spark["Amount"] > 500.0)
df_spark.filter(mask).show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|           Amount| TimeDelta|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------------+----------+
|   536387|    79321|       CHILLI LIGHTS|     192|2010-12-01 09:58:00|     3.82|     16029|United Kingdom|733.4399871826172|1291197480|
|   536387|    22780|LIGHT GARLAND BUT...|     192|2010-12-01 09:58:00|     3.37|     16029|United Kingdom|647.0399780273438|1291197480|
|   536387|    22779|WOODEN OWLS LIGHT...|     192|2010-12-01 09:58:00|     3.37|     16029|United Kingdom|647.0399780273438|1291197480|
|   536387|    22466|FAIRY TALE COTTAG...|     432|2010-12-01 09:58:00|     1.45|     16029|United Kingdom|626.4000205993652|1291197480|
|   536387|    21731|RED TOADSTOOL LED...

In [185]:
mask = (df_pandas["Amount"] > 500.0)
df_pandas[mask].head(5)

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country | Amount | TimeDelta |
|---|---|---|---|---|---|---|---|---|---|---|
| 178 | 536387 | 79321 | CHILLI LIGHTS | 192 | 2010-12-01 09:58:00 | 3.82 | 16029 | United Kingdom | 733.44 | 1291197000.0 |
| 179 | 536387 | 22780 | LIGHT GARLAND BUTTERFILES PINK | 192 | 2010-12-01 09:58:00 | 3.37 | 16029 | United Kingdom | 647.04 | 1291197000.0 |
| 180 | 536387 | 22779 | WOODEN OWLS LIGHT GARLAND | 192 | 2010-12-01 09:58:00 | 3.37 | 16029 | United Kingdom | 647.04 | 1291197000.0 |
| 181 | 536387 | 22466 | FAIRY TALE COTTAGE NIGHTLIGHT | 432 | 2010-12-01 09:58:00 | 1.45 | 16029 | United Kingdom | 626.4 | 1291197000.0 |
| 182 | 536387 | 21731 | RED TOADSTOOL LED NIGHT LIGHT | 432 | 2010-12-01 09:58:00 | 1.25 | 16029 | United Kingdom | 540.0 | 1291197000.0 |


The mask is created in exactly the same way. The only difference is that in pyspark we apply it with the `filter` method (`where` is an alias) instead of indexing the DataFrame.
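Masks can also be combined with `&` (and), `|` (or) and `~` (not); pyspark `Column` objects overload the same operators, so a combined mask can be passed to `filter` in the same way. A minimal pandas sketch on a hypothetical three-row frame:

```python
import pandas as pd

# Hypothetical rows shaped like the retail data.
df = pd.DataFrame({
    "Country": ["United Kingdom", "France", "United Kingdom"],
    "Amount": [733.44, 650.0, 20.34],
})

# Parentheses are required: & and | bind more tightly than > and !=.
large_abroad = (df["Amount"] > 500.0) & (df["Country"] != "United Kingdom")
# ~ inverts a mask.
small = ~(df["Amount"] > 500.0)

print(df[large_abroad])
print(df[small])
```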