# PySpark Tutorial - Learning Guide

### This tutorial was built in a Python virtual environment running under WSL, for better compatibility with the most recent versions of PySpark.

---

#### Python 3.12.3

#### Package versions:

* pyspark==4.1.0

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [2]:
spark = (
    SparkSession.builder
    .appName("Datacamp Pyspark Tutorial")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/19 22:41:04 WARN Utils: Your hostname, Ryzen, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/19 22:41:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/19 22:41:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Exploratory Data Analysis

In [3]:
df = spark.read.csv("./data/datacamp_ecommerce.csv", header=True, escape='"', inferSchema=True)

In [4]:
df.show(5, truncate=False)  # first 5 rows, without truncating long strings

+-----+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+
|Index|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+-----+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+
|0    |536571   |84991    |60 TEATIME FAIRY CAKE CASES       |1       |12/24/10 11:43|0.55     |14006     |United Kingdom|
|1    |536122   |84625    |PINK CHERRY LIGHTS STRING SET     |10      |12/3/10 14:14 |4.65     |15456     |United Kingdom|
|2    |536122   |22197    |POPCORN HOLDER                    |5       |12/3/10 14:14 |1.65     |17865     |United Kingdom|
|3    |539108   |84969    |BOX OF 6 ASSORTED COLOUR TEASPOONS|3       |12/26/10 13:44|4.25     |19121     |United Kingdom|
|4    |539108   |22616    |PACK OF 12 LONDON TISSUES         |2       |12/26/10 13:44|1.45     |19841     |United Kingdom|
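+-----+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+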

Number of rows in the DataFrame

In [5]:
df.count()

10000

Number of unique customers in the DataFrame

In [6]:
df.select('CustomerID').distinct().count()

5655

Number of distinct customers per country, in descending order (a proxy for where most purchases come from)

In [7]:
(
    df.groupBy("Country")
    .agg(F.countDistinct("CustomerID").alias("country_count"))
    .orderBy(F.desc("country_count"))
    .show()
)

+--------------+-------------+
|       Country|country_count|
+--------------+-------------+
|United Kingdom|         5170|
|        France|          313|
|       Germany|          193|
|   Netherlands|          180|
|         Spain|          169|
|          EIRE|          151|
|     Australia|          111|
|      Portugal|           87|
|       Belgium|           86|
|        Norway|           58|
|       Finland|           56|
|       Austria|           52|
|   Switzerland|           46|
|       Denmark|           42|
+--------------+-------------+



Latest purchase date and time

In [15]:
df.withColumn(
    "date",
    F.coalesce(
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("M/d/yy H:mm")),
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("MM/dd/yy HH:mm")),
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("M/d/yyyy H:mm")),
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("MM/dd/yyyy HH:mm")),
        F.to_timestamp(F.col("InvoiceDate"))  # last resort
    ),
).select(F.max("date")).show()

+-------------------+
|          max(date)|
+-------------------+
|2010-12-31 07:38:00|
+-------------------+
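
The `F.coalesce` chain above tries each pattern in order and keeps the first non-null result, since `F.try_to_timestamp` returns null when a pattern does not match. The same first-match-wins idea can be sketched in plain Python, outside Spark. This is only an illustration: the `FORMATS` list, the `parse_invoice_date` helper, and the sample strings are hypothetical, and Python's `strptime` directives are not the same as Spark's datetime patterns.

```python
from datetime import datetime

# Hypothetical format list: two-digit year first, then four-digit year.
# Python's %m, %d and %H accept both padded and unpadded numbers when parsing.
FORMATS = ["%m/%d/%y %H:%M", "%m/%d/%Y %H:%M"]


def parse_invoice_date(text):
    """Return the first successful parse, or None if no format matches."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue  # this format did not match, try the next one
    return None


# Hypothetical sample values, not taken from the dataset.
samples = ["12/24/10 11:43", "12/3/2010 14:14", "not a date"]
parsed = [parse_invoice_date(s) for s in samples]
print(parsed)
```

Ordering matters in both versions: the most common pattern goes first, and an unparseable value falls through to a null (`None` here) rather than raising.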



Earliest purchase date and time

In [17]:
df.withColumn(
    "date",
    F.coalesce(
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("M/d/yy H:mm")),
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("MM/dd/yy HH:mm")),
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("M/d/yyyy H:mm")),
        F.try_to_timestamp(F.col("InvoiceDate"), F.lit("MM/dd/yyyy HH:mm")),
        F.to_timestamp(F.col("InvoiceDate"))  # last resort
    ),
).select(F.min("date")).show()

+-------------------+
|          min(date)|
+-------------------+
|2010-12-01 08:10:00|
+-------------------+



### Data Pre-processing

We will use the existing variables to derive three new informative features: recency, frequency, and monetary value (RFM).

RFM is commonly used in marketing to evaluate a client's value based on their:

* Recency: How recently has each customer made a purchase?
* Frequency: How often have they bought something?
* Monetary Value: How much money do they spend on average when making purchases?
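
To make the three definitions concrete, here is a minimal plain-Python sketch on a handful of made-up rows. It is not the tutorial's PySpark implementation, and it rests on assumptions that are not in the text above: recency is measured in whole days before the latest purchase in the data, frequency is the number of distinct invoices, and monetary value is the average spend per invoice (`Quantity * UnitPrice` summed per invoice). The rows themselves and the `parse` helper are hypothetical.

```python
from datetime import datetime

# Hypothetical rows: (CustomerID, InvoiceNo, InvoiceDate, Quantity, UnitPrice)
rows = [
    (1, "A1", "12/1/10 8:10", 2, 1.50),
    (1, "A2", "12/20/10 9:00", 1, 4.00),
    (2, "B1", "12/10/10 14:30", 5, 2.00),
]


def parse(ts):
    """Parse an InvoiceDate string such as '12/1/10 8:10'."""
    return datetime.strptime(ts, "%m/%d/%y %H:%M")


# Assumption: recency is measured against the latest purchase in the data.
reference = max(parse(ts) for _, _, ts, _, _ in rows)

# Collect per-customer last purchase time, distinct invoices and total spend.
per_customer = {}
for customer, invoice, ts, quantity, unit_price in rows:
    entry = per_customer.setdefault(
        customer, {"last": parse(ts), "invoices": set(), "spend": 0.0}
    )
    entry["last"] = max(entry["last"], parse(ts))
    entry["invoices"].add(invoice)
    entry["spend"] += quantity * unit_price

# Derive the three RFM features from the collected values.
rfm = {
    customer: {
        "recency_days": (reference - e["last"]).days,
        "frequency": len(e["invoices"]),
        "monetary": e["spend"] / len(e["invoices"]),
    }
    for customer, e in per_customer.items()
}
print(rfm)
```

Other definitions are equally common, for example total spend instead of average spend, or recency measured from a fixed reporting date, so the choices above should be read as one option among several.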