<a href="https://www.kaggle.com/code/chanoncharuchinda/e-commerce-campaign-analysis?scriptVersionId=133023696" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# E-commerce Campaign Analysis

The aim of this notebook is to analyze the [E-commerce multichannel direct messaging 2021-2023](https://www.kaggle.com/datasets/mkechinov/direct-messaging?select=messages-demo.csv&sort=published) dataset.  <br> 
I will be using [PySpark](https://spark.apache.org/docs/latest/api/python/#:~:text=PySpark%20is%20the%20Python%20API,for%20interactively%20analyzing%20your%20data.) for processing and analyzing the data. 

### Installation
To use PySpark on [Kaggle](https://www.kaggle.com/), install it by running `!pip install pyspark` in a notebook cell. The optional `-q` (quiet) flag suppresses the download progress output.

##### By Chanon Charuchinda 

In [1]:
!pip install pyspark -q


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = (
    SparkSession
    .builder
    .appName("ecommerce")
    .master("local[*]")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/10 06:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
holidays_path = "/kaggle/input/direct-messaging/holidays.csv"
messages_demo_path = "/kaggle/input/direct-messaging/messages-demo.csv"
campaigns_path = "/kaggle/input/direct-messaging/campaigns.csv"

holidays = spark.read.csv(holidays_path, inferSchema = True, header=True)
message_demo = spark.read.csv(messages_demo_path, inferSchema = True, header=True)
campaigns = spark.read.csv(campaigns_path, inferSchema = True, header=True)


                                                                                

In [4]:
print("Message Demo Schema")
message_demo.printSchema()
print("Campaign Schema")
campaigns.printSchema()

Message Demo Schema
root
 |-- id: integer (nullable = true)
 |-- message_id: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- message_type: string (nullable = true)
 |-- client_id: long (nullable = true)
 |-- channel: string (nullable = true)
 |-- category: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- email_provider: string (nullable = true)
 |-- stream: string (nullable = true)
 |-- date: date (nullable = true)
 |-- sent_at: timestamp (nullable = true)
 |-- is_opened: string (nullable = true)
 |-- opened_first_time_at: timestamp (nullable = true)
 |-- opened_last_time_at: timestamp (nullable = true)
 |-- is_clicked: string (nullable = true)
 |-- clicked_first_time_at: timestamp (nullable = true)
 |-- clicked_last_time_at: timestamp (nullable = true)
 |-- is_unsubscribed: string (nullable = true)
 |-- unsubscribed_at: timestamp (nullable = true)
 |-- is_hard_bounced: string (nullable = true)
 |-- hard_bounced_at: timestamp (nullable = t

### Problem with unique campaign identifier
The documentation mentions that the unique identifier for each campaign should be `campaign_type`+`campaign_id`. However, no column named `campaign_id` was found in `campaigns.csv`. Hence, I assume it was meant to be the column `id`, so the combination `campaign_type`+`id` is used as the unique identifier for each campaign.

In the following notebook cell, I show that `id` alone cannot serve as a unique identifier, since several `id` values occur more than once.

### Solution to create the link between two datasets
In `messages-demo.csv`, the `message_type` column corresponds to the `campaign_type` column in `campaigns.csv`. To link the two datasets, I will use the following combinations:
1. `campaign_type` + `id`: for `campaigns.csv`
2. `message_type` + `campaign_id`: for `messages-demo.csv`
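
The idea behind the composite key can be sketched in plain Python, so it runs without a Spark session. The rows below are hypothetical toy data, not values read from the dataset, and the variable names are illustrative only. The sketch shows that an `id` shared by two campaign types collides when used alone, while the `(type, id)` pair stays unique and can be used to match messages to campaigns.

```python
from collections import Counter

# Hypothetical toy rows standing in for campaigns.csv: (campaign_type, id)
campaigns = [("bulk", 64), ("trigger", 64), ("bulk", 111), ("transactional", 27)]

# Hypothetical toy rows standing in for messages-demo.csv:
# (message_type, campaign_id, client_id)
messages = [
    ("bulk", 64, 1001),
    ("trigger", 64, 1002),
    ("bulk", 111, 1003),
    ("bulk", 64, 1004),
]

# Using id alone: 64 appears under two campaign types, so it is not unique
id_counts = Counter(cid for _, cid in campaigns)
duplicate_ids = [cid for cid, n in id_counts.items() if n > 1]

# Using the (type, id) pair: no key occurs more than once
key_counts = Counter(campaigns)
duplicate_keys = [key for key, n in key_counts.items() if n > 1]

# Link messages to campaigns through the same composite key
known_campaigns = set(campaigns)
messages_per_campaign = Counter(
    (mtype, cid) for mtype, cid, _ in messages if (mtype, cid) in known_campaigns
)

print("duplicate ids:", duplicate_ids)
print("duplicate composite keys:", duplicate_keys)
print("messages per campaign:", dict(messages_per_campaign))
```

In PySpark the same link could be expressed as a join on the two key columns, or on a single concatenated key column as done in this notebook.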


In [5]:
print("Using id as a unique identifier: (filter key that count isn't 1)")
campaign_groupID = (
    campaigns
    .select("id","campaign_type","channel")
    .groupBy("id")
    .count()
    .filter(F.col("count") != 1)
)
campaign_groupID.show(campaign_groupID.count())

print("Using campaign_type+id as unique identifier: (filter key that count isn't 1)")
campaign_withUniqueIdenti = (
    campaigns
    .withColumn("campaign_id", F.concat(F.col("campaign_type"),F.col("id")))
)
(
    campaign_withUniqueIdenti
    .groupBy("campaign_id")
    .count()
    .filter(F.col("count") != 1)
).show()

Using id as a unique identifier: (filter key that count isn't 1)
+---+-----+
| id|count|
+---+-----+
|368|    2|
|372|    2|
| 64|    2|
|366|    2|
|374|    2|
|381|    2|
|373|    2|
+---+-----+

Using campaign_type+id as unique identifier: (filter key that count isn't 1)
+-----------+-----+
|campaign_id|count|
+-----------+-----+
+-----------+-----+



The following cell shows that the `campaign_type` and `message_type` columns contain the same set of values.

In [6]:
print("From campaigns.csv")
(
    campaigns
    .select("campaign_type")
    .distinct()
).show()
print("From message_demo.csv")
(
    message_demo
    .select("message_type")
    .distinct()
).show()

From campaigns.csv
+-------------+
|campaign_type|
+-------------+
|         bulk|
|      trigger|
|transactional|
+-------------+

From message_demo.csv
+-------------+
| message_type|
+-------------+
|         bulk|
|      trigger|
|transactional|
+-------------+



                                                                                

A similar procedure creates a unique campaign identifier on the message side by combining the `message_type` and `campaign_id` columns.

In [7]:
message_demo_withUniqueIdenti = (
    message_demo
    .withColumn("campaign_id", F.concat(F.col("message_type"),F.col("campaign_id")))
)
print("Snippet of the number of users that receive notification for each campaigns")
(
    message_demo_withUniqueIdenti
    .groupBy("campaign_id")
    .count()
    .orderBy(F.desc(F.col("count")))
).show()

Snippet of the number of users that receive notification for each campaigns
+----------------+-------+
|     campaign_id|  count|
+----------------+-------+
|         bulk111|1045187|
|          bulk64|1037324|
|          bulk79| 921673|
|         bulk150| 781357|
|         bulk230| 651859|
|    trigger18212| 557548|
|    trigger18012| 472521|
|    trigger18011| 391891|
|    trigger18016| 302380|
|         bulk143| 283063|
|         bulk152| 274633|
|    trigger18010| 241400|
|         bulk296| 231972|
| transactional27| 188578|
|         bulk136| 177363|
|transactional221| 168998|
| transactional32| 168455|
|         bulk366| 165878|
|         bulk171| 154495|
|         bulk257| 147859|
+----------------+-------+
only showing top 20 rows



                                                                                

# Analysis

In this section, I pose questions about the data and try to answer them using PySpark.

### Question: How successful was each campaign?
I will investigate the `is_purchased` flag, which indicates whether the customer made a purchase within 24 hours after opening the notification.

In [8]:
campaignData = (
    campaign_withUniqueIdenti
    .select("campaign_id","channel","topic","started_at","finished_at","total_count")
)
(
    campaignData
    .orderBy(F.desc(F.col("total_count")))
).show(truncate = False)

+-----------+-------+--------+--------------------------+-------------------+-----------+
|campaign_id|channel|topic   |started_at                |finished_at        |total_count|
+-----------+-------+--------+--------------------------+-------------------+-----------+
|bulk4918   |email  |sale out|2022-06-15 06:30:07.193285|2022-06-15 09:38:54|5371769    |
|bulk1961   |email  |sale out|2021-11-23 07:22:46.753197|null               |5366976    |
|bulk2003   |email  |sale out|2021-11-24 07:03:20.136238|2021-11-24 14:31:56|5327892    |
|bulk9110   |email  |sale out|2022-12-30 07:00:11.67307 |2022-12-30 10:54:27|5251007    |
|bulk2499   |email  |sale out|2022-01-03 07:00:11.774563|2022-01-03 11:19:05|5186414    |
|bulk4679   |email  |sale out|2022-06-01 08:22:32.520607|2022-06-01 15:50:43|5110574    |
|bulk4744   |email  |sale out|2022-06-04 11:18:39.563776|2022-06-04 14:00:21|5036440    |
|bulk8252   |email  |sale out|2022-11-30 06:51:11.67617 |2022-11-30 11:04:51|4957576    |
|bulk7879 

In [9]:
message_demoData = (
    message_demo_withUniqueIdenti
    .select(
        "campaign_id",
        "client_id",
        "sent_at",
        "opened_first_time_at",
        "opened_last_time_at",
        "is_clicked",
        "clicked_first_time_at",
        "clicked_last_time_at",
        "is_purchased",
        "purchased_at"
    )
)
(
    message_demoData
).show(truncate = False)

+----------------+-------------------+-------------------+--------------------+-------------------+----------+---------------------+--------------------+------------+-------------------+
|campaign_id     |client_id          |sent_at            |opened_first_time_at|opened_last_time_at|is_clicked|clicked_first_time_at|clicked_last_time_at|is_purchased|purchased_at       |
+----------------+-------------------+-------------------+--------------------+-------------------+----------+---------------------+--------------------+------------+-------------------+
|transactional31 |1515915625489833514|2021-04-30 11:27:43|2021-05-04 05:47:05 |2021-05-04 05:47:05|t         |2021-05-04 05:47:46  |2021-05-04 05:47:46 |t           |2021-05-06 16:40:38|
|transactional32 |1515915625489220445|2021-04-30 08:00:35|2021-05-04 15:38:20 |2021-05-04 15:38:20|f         |null                 |null                |f           |null               |
|transactional32 |1515915625489854185|2021-04-30 05:56:37|2021-05

### Result

In [10]:
notificationEvaluation = (
    message_demoData
    .withColumn("validate_clicked", F.when((F.col("is_clicked") == "t") & (F.col("clicked_first_time_at").isNotNull()), 1)
                                    .otherwise(0))
    .withColumn("validate_purchase", F.when((F.col("is_purchased") == "t") & (F.col("purchased_at").isNotNull()), 1)
                                      .otherwise(0))
    .groupBy(F.col("campaign_id"))
    .agg(
        # Total number of notifications sent
        F.count(F.col("campaign_id")).alias("total_notification_send"),
        # Total number of notifications clicked
        F.sum(F.col("validate_clicked")).alias("total_clicked"),
        # Percentage Clicked
        F.round((F.col("total_clicked") * 100 / F.col("total_notification_send")),2).alias("clicked_percentage"),
        # Total number of purchase
        F.sum(F.col("validate_purchase")).alias("total_purchased"),
        # Total Revenue (assume that average price per item is 60)
        F.round(F.col("total_purchased") * 60.0).alias("total_revenue"),
        # Conversion rate
        F.round(F.col("total_purchased") * 100 / F.col("total_clicked"), 2).alias("conversion_rate")
    )
)
(
    notificationEvaluation
    .orderBy(F.desc(F.col("conversion_rate")))
).show(truncate = False)



+----------------+-----------------------+-------------+------------------+---------------+-------------+---------------+
|campaign_id     |total_notification_send|total_clicked|clicked_percentage|total_purchased|total_revenue|conversion_rate|
+----------------+-----------------------+-------------+------------------+---------------+-------------+---------------+
|transactional29 |142985                 |5764         |4.03              |1536           |92160.0      |26.65          |
|transactional33 |4376                   |195          |4.46              |30             |1800.0       |15.38          |
|transactional27 |188578                 |11226        |5.95              |1384           |83040.0      |12.33          |
|bulk361         |59475                  |3992         |6.71              |480            |28800.0      |12.02          |
|trigger18010    |241400                 |8447         |3.5               |1012           |60720.0      |11.98          |
|transactional55 |6458  
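
As a sanity check, the derived columns can be recomputed by hand from the raw counts. The sketch below is plain Python and uses the counts from the `transactional29` row of the output above. The helper name `campaign_metrics` is hypothetical, and the price of 60 per purchase is the same assumption made in the aggregation cell.

```python
def campaign_metrics(sent, clicked, purchased, price_per_item=60.0):
    """Recompute the per-campaign metrics from raw counts.

    Returns None for a rate whose denominator is zero.
    """
    clicked_percentage = round(clicked * 100 / sent, 2) if sent else None
    conversion_rate = round(purchased * 100 / clicked, 2) if clicked else None
    return {
        "clicked_percentage": clicked_percentage,
        # Revenue uses the assumed average price per item (60)
        "total_revenue": purchased * price_per_item,
        "conversion_rate": conversion_rate,
    }


# Counts taken from the transactional29 row of the output table
metrics = campaign_metrics(sent=142985, clicked=5764, purchased=1536)
print(metrics)
```

The values agree with the table: 4.03 % of the notifications were clicked, 26.65 % of the clicks led to a purchase, and the estimated revenue is 92160.0. Note that `conversion_rate` here is purchases per click, not purchases per notification sent.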

                                                                                

# Continue ... 

# Thank you for checking out my notebook

This is a work in progress, so I will update the notebook from time to time.