# PySpark Tutorial

This is a DataCamp tutorial on PySpark.

https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark

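# Installing and Importing Libraries

Ensure PySpark (and a Java runtime) is installed in the environment, e.g. with pip install pyspark. Reading the .xlsx file below also requires pandas and openpyxl.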

In [1]:
from pyspark.sql import SparkSession
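# Wildcard import: Spark functions such as max and min shadow the Python built-ins of the same name (later cells rely on the Spark versions)
from pyspark.sql.functions import *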
from pyspark.sql.types import *

# Pandas
import pandas as pd

# Creating a SparkSession

A SparkSession is the entry point to all Spark functionality, and is required to build a DataFrame in PySpark. Run the following lines of code to initialize a SparkSession:

In [2]:
from pyspark.sql import SparkSession 

spark = (
    SparkSession.builder
    .appName("Datacamp Pyspark Tutorial")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)



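Using the code above, we built a SparkSession and set a name for the application. The two spark.memory.offHeap settings let Spark use memory outside the JVM heap for certain operations, and the size of that off-heap pool was specified manually (10g). Off-heap memory is not managed by the JVM garbage collector, which can reduce garbage-collection overhead; when it is enabled, spark.memory.offHeap.size must be set to a positive value. These settings do not cache any data by themselves.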

# Creating the DataFrame

The data was downloaded manually from the UCI Machine Learning Repository: https://archive.ics.uci.edu/dataset/352/online+retail

In [3]:
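import pandas as pd
# Spark has no built-in .xlsx reader, so load the workbook with pandas first (requires openpyxl)
pandas_df = pd.read_excel("datacamp_ecommerse.xlsx")
# Convert to a Spark DataFrame; the rows travel from the driver inside each task,
# which is what triggers the "task of very large size" warning on the actions below
df = spark.createDataFrame(pandas_df)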

In [4]:
df.show(5, 0)

25/10/07 16:27:08 WARN TaskSetManager: Stage 0 contains a task of very large size (3433 KiB). The maximum recommended task size is 1000 KiB.


+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+---------+-----


In [5]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



The DataFrame consists of 8 variables:

1. InvoiceNo: The unique identifier of each customer invoice (invoice numbers starting with "C" indicate cancellations)
2. StockCode: The unique identifier of each item in stock
3. Description: The item purchased by the customer
4. Quantity: The number of units of each item purchased in a single invoice
5. InvoiceDate: The date and time of the purchase
6. UnitPrice: The price of one unit of each item
7. CustomerID: The unique identifier assigned to each customer (stored as a double because pandas turns missing IDs into NaN)
8. Country: The country from which the purchase was made

# Data Exploration

In [6]:
df.count()


541909

There are 541,909 rows in the DataFrame.

In [7]:
# Number of unique customers
df.select("CustomerID").distinct().count()



4373

## Customers by country

In [8]:
# Number of unique customers in each country
df.groupBy("Country").agg(count_distinct("CustomerID").alias('country_count')).show() 



+------------------+-------------+
|           Country|country_count|
+------------------+-------------+
|            Sweden|            8|
|         Singapore|            1|
|           Germany|           95|
|               RSA|            1|
|            France|           88|
|            Greece|            4|
|European Community|            1|
|           Belgium|           25|
|           Finland|           12|
|             Malta|            2|
|       Unspecified|            5|
|             Italy|           15|
|              EIRE|            4|
|         Lithuania|            1|
|            Norway|           10|
|             Spain|           31|
|           Denmark|            9|
|         Hong Kong|            1|
|           Iceland|            1|
|            Israel|            5|
+------------------+-------------+
only showing top 20 rows


In [9]:
# Same aggregation, ordered by the number of customers (descending)
df.groupBy('Country').agg(count_distinct('CustomerID').alias('country_count')).orderBy(desc('country_count')).show()



+---------------+-------------+
|        Country|country_count|
+---------------+-------------+
| United Kingdom|         3951|
|        Germany|           95|
|         France|           88|
|          Spain|           31|
|        Belgium|           25|
|    Switzerland|           22|
|       Portugal|           20|
|          Italy|           15|
|        Finland|           12|
|        Austria|           11|
|         Norway|           10|
|        Denmark|            9|
|Channel Islands|            9|
|      Australia|            9|
|    Netherlands|            9|
|         Sweden|            8|
|         Cyprus|            8|
|          Japan|            8|
|         Poland|            6|
|    Unspecified|            5|
+---------------+-------------+
only showing top 20 rows


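## Date range of purchases

When was the most recent purchase made by a customer on the e-commerce platform?

To find the latest purchase, the 'InvoiceDate' column must be a timestamp so that PySpark's 'max()' function compares dates rather than strings. In the DataCamp tutorial the column is read as a string and has to be converted; here pandas has already parsed it as a timestamp (see the schema above), so the conversion below mainly acts as a safeguard.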

In [10]:
df = df.withColumn(
    "date",  # New column name
    coalesce(  # coalesce() returns the first non-null value (evaluated left to right)
        to_timestamp(col("InvoiceDate"), "yy/MM/dd HH:mm"),  # First attempt: parse with the pattern yy/MM/dd HH:mm (e.g. "17/07/15 14:30" -> 2017-07-15 14:30:00)
        to_timestamp(col("InvoiceDate"), "yyyy-MM-dd HH:mm:ss"),  # If the first attempt returns null, try the next pattern
        to_timestamp(col("InvoiceDate")),  # Best-effort fallback using Spark's default parsing
    ),
)
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|               date|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingd



In [11]:
df.select(max("date")).show()



+-------------------+
|          max(date)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



When was the earliest purchase made by a customer on the e-commerce platform?

In [12]:
df.select(min("date")).show()

+-------------------+
|          min(date)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+





# Data Pre-Processing

**RFM** is commonly used in marketing to evaluate a client's value based on their:

1. **Recency**: How recently has each customer made a purchase?
2. **Frequency**: How often have they bought something?
3. **Monetary Value**: How much money do they spend on average when making purchases?


## Recency

First, let's calculate recency: how long before the end of the dataset each customer last made a purchase. This can be achieved in two steps:

### Assign a recency score to each customer

We will subtract the latest date in the DataFrame from every purchase date. The result, in seconds, is zero or negative: a value of 0 means the purchase was made on the latest date (the highest recency), and more negative values mean the customer was last seen longer ago.

#### Adding a column with a constant value

There are three possible ways to add the latest date as a constant column:

**1. .collect() + .lit()**

In [13]:
# Get the latest invoice date as a single Python value on the driver
max_date = df.agg(max("InvoiceDate").alias("max_date")).collect()[0]["max_date"]
df = df.withColumn("from_date", lit(max_date))
df.show()




+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-



In PySpark, the second argument of withColumn() must be a **Spark Column** expression, not a plain Python value; passing a plain value raises an error.

A Column describes a computation that Spark evaluates for every row on the executors, and a bare Python literal (such as the integer 123) is not an expression Spark can place in the query plan.

This is why you need lit() (short for literal, from pyspark.sql.functions) to wrap a Python value (number, string, date, etc.) into a **column expression** that Spark can handle.

**In this case:**
- max_date is a **single Python value** (.collect() brings it from Spark to the driver)
- To attach it to every row as a constant column, it must be turned into a **Spark column**
- That's exactly what lit() does

**2. Using a cross join with an aggregate**

In [14]:
# Aggregate to find the max InvoiceDate (still a DataFrame, not collected)
max_df = df.agg(max("InvoiceDate").alias("max_date2")) # Produces a 1-row DataFrame 

# Cross join to add it to all rows
df = df.crossJoin(max_df) # Attaches the single row to every row of df (since max_df has only one row, the join is efficient)

df.show()



+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|          max_date2|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|2011-12-09 12:50:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|2011-12-09 12:50:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|2011-12-09 12



**3. Using a window function**

In [15]:
from pyspark.sql import Window

df = df.withColumn(
    "max_date3",
    max("InvoiceDate").over(Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)) # Defines a window over the entire dataset (first to last row); with no partitionBy(), Spark moves all rows into one partition
)

df.show()

25/10/07 16:27:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+-------------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|          max_date2|          max_date3|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+-------------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|2011-12-09 12:50:00|2011-12-09 12:50:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|2011-12-09 12:50:00|2011-12-09 12:50:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-0

---

**Comparison:**

---
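**1. .collect() + lit()**
- The aggregation runs distributed; only the single result value is brought to the driver
- Simple, but runs an extra Spark job before the column is added
- Works at any data size, since just one value is collected

**2. crossJoin**
- Distributed, and stays lazy until an action runs
- Clear; Spark can broadcast the one-row side, so the join is cheap
- Good for medium to large datasets

**3. window function**
- Elegant, no join
- Without partitionBy(), all rows are moved to a single partition (see the WindowExec warning above), so it does not scale well
- Best when the data is small or you are already using window operations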

In [16]:
df = df.drop("max_date2", "max_date3")
df.show()



+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|2010-12-01 08:26:00|2011-12-09 12:50:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|2010-



### Select the most recent purchase

In [None]:
# Seconds between each purchase and the latest date in the dataset (0 = latest date, negative = earlier)
df2 = df.withColumn("recency", col("date").cast("long") - col("from_date").cast("long"))

# Within each customer, order purchases from most to least recent
w = Window.partitionBy("CustomerID").orderBy(desc("recency"))

# Keep only the most recent purchase per customer (this already yields one row per customer)
df2 = df2.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")

df2.show()


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|  recency|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------------+-------------------+---------+
|  C541433|    23166|MEDIUM CERAMIC TO...|  -74215|2011-01-18 10:17:00|     1.04|   12346.0|United Kingdom|2011-01-18 10:17:00|2011-12-09 12:50:00|-28089180|
|   581180|    23497|CLASSIC CHROME BI...|      12|2011-12-07 15:52:00|     1.45|   12347.0|       Iceland|2011-12-07 15:52:00|2011-12-09 12:50:00|  -161880|
|   568172|    23077| DOUGHNUT LIP GLOSS |     120|2011-09-25 13:13:00|     1.25|   12348.0|       Finland|2011-09-25 13:13:00|2011-12-09 12:50:00| -6482220|
|   577609|    23112|PARISIENNE CURIO ...|       2|2