In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


>>> **Please see the dataset creation logic in the later sections**

**General Plan:**
- Understand the problem and the data tables
- Observe the data
- Join the three tables based on account_id and offer_id
- Create the labels for the first classifier, which defines whether the user had a transaction after receiving an offer or not
- Do exploratory analysis based on profile info, credit limit, gender, min value of the offer and so on
- Create a dataset for those transactions without offer or after a due time
- Train both models and test them with test sets
- Create the final uplift model
- Mention the problems with the dataset, like the lack of a date, which might result in some kind of data leak


In this notebook I take a look at the data and based on their characteristics create two datasets for uplift modeling

## Data loading and initial data preparations

In [0]:
# Obtain the Spark session for this notebook (an existing one is reused).
spark = (
    SparkSession.builder
    .appName("marketing-offers")
    .getOrCreate()
)

In [0]:
# Read the three raw JSON inputs: transactions, user profiles and offers.
df_transactions = spark.read.json(
    "/Volumes/workspace/default/data/transactions.json"
)
df_profiles = spark.read.json(
    "/Volumes/workspace/default/data/profile.json"
)
df_offers = spark.read.json(
    "/Volumes/workspace/default/data/offers.json"
)

In [0]:
# Preview the raw transactions table (first 20 rows, long cells truncated).
df_transactions.show(n=20, truncate=True)

In [0]:
# Standardize the transaction schema:
# - keep the core fields (account_id, event, time_since_test_start);
# - unify the two spellings of the offer identifier inside the `value` struct:
#   use "value.offer_id" and fall back to "value.`offer id`" when it is null;
# - lift amount and reward out of the `value` struct into top-level columns.
#
# This cell rebinds `df_transactions`, so after it has run once the nested
# `value` column no longer exists. The guard turns a second run of the cell
# into a no-op instead of failing because `value.*` cannot be resolved.
if "value" in df_transactions.columns:
    df_transactions = df_transactions.select(
        "account_id",
        "event",
        "time_since_test_start",
        F.coalesce(F.col("value.offer_id"), F.col("value.`offer id`")).alias("offer_id"),
        F.col("value.amount").alias("amount"),
        F.col("value.reward").alias("reward")
    )

df_transactions.show(5, truncate=False)

In [0]:
# Keep only the rows that carry an amount (amount is not null).
# NOTE(review): the variable name suggests "offer received" events, but the
# filter selects on `amount` -- presumably these are transaction rows; confirm
# the intent before relying on this frame.
amount_present = F.col("amount").isNotNull()
df_offers_received = df_transactions.where(amount_present)

df_offers_received.show(10, truncate=False)

In [0]:
# Give the profile key the same name as in the transactions table so the
# joins below can match on a shared `account_id` column.
df_profiles = df_profiles.withColumnRenamed(existing="id", new="account_id")
df_profiles.show(n=20, truncate=True)

In [0]:
# Preview the offers table (first 20 rows, long cells truncated).
df_offers.show(n=20, truncate=True)

## Joining the tables

### transactions + **profile**

In [0]:
# Attach profile attributes to every transaction row; rows whose account_id
# has no matching profile are dropped by the inner join.
profile_transactions = df_transactions.join(
    df_profiles,
    on=["account_id"],
    how="inner",
)
profile_transactions.show(n=20, truncate=True)

### Understanding the data

In [0]:
# Preview the rows that reference an offer.
profile_transactions.where(F.col("offer_id").isNotNull()).show()

In [0]:
# Preview the rows whose timestamp is different from the test start (0.0).
profile_transactions.where(F.col("time_since_test_start") != 0.0).show()

In [0]:
# Preview transaction events with a non-zero amount.
(
    profile_transactions
    .where(F.col("event") == "transaction")
    .where(F.col("amount") != 0.0)
    .show()
)

In [0]:
# Preview every event that is not a transaction.
profile_transactions.where(F.col("event") != "transaction").show()

In [0]:
# Distinct values of time_since_test_start.
# De-duplicate inside Spark first so that only the distinct rows are brought
# to the driver, instead of collecting the whole column and de-duplicating in
# Python. The resulting set of Row objects is the same.
set(
    profile_transactions
    .select("time_since_test_start")
    .distinct()
    .collect()
)

In [0]:
# Quick sanity check per event type: the min / max / mean of the event time,
# to get a feel for the data and spot possible outliers.
event_time = "time_since_test_start"
(
    profile_transactions
    .groupBy("event")
    .agg(
        F.min(event_time).alias("min_time"),
        F.max(event_time).alias("max_time"),
        F.avg(event_time).alias("avg_time"),
    )
    .show()
)


In [0]:
# Number of transaction events that happened after the test start.
(
    profile_transactions
    .where(F.col("event") == "transaction")
    .where(F.col("time_since_test_start") > 0)
    .count()
)

In [0]:
# Number of "offer received" events that happened after the test start.
(
    profile_transactions
    .where(F.col("event") == "offer received")
    .where(F.col("time_since_test_start") > 0)
    .count()
)


## Joining the table with offers

In [0]:
# Rename the offers key so it matches the `offer_id` column of the events,
# then attach offer attributes to every event row. The left join keeps rows
# that have no offer_id; their offer columns are null.
df_offers = df_offers.withColumnRenamed(existing="id", new="offer_id")
joint_data = profile_transactions.join(
    df_offers,
    on=["offer_id"],
    how="left",
)
joint_data.show(n=20, truncate=True)

In [0]:
# Preview the joined rows that reference an offer.
joint_data.where(F.col("offer_id").isNotNull()).show()

In [0]:
# Row count per event type in the joined table.
(
    joint_data
    .groupBy("event")
    .count()
    .show(n=20, truncate=True)
)

## Creating the main dataset with labels

### Business Logic of Labeling Data

This code builds a **labeled dataset** that links each user (profile) with each offer they received and determines whether the offer led to a transaction.  

1. **Join offers and transactions**  
   - Each user (`account_id`) may receive multiple offers and may also have multiple transactions.  
   - The join matches offers with transactions belonging to the same user.  

2. **Identify offer-driven transactions**  
   - A transaction is considered influenced by an offer if it happens *after* the offer start time and *within the offer’s validity window (`duration`)*.  
   - If such a transaction exists, we label it with `led_to_tx = 1`. Otherwise, it gets `0`.  

   Mathematically, for each transaction `t` and offer `o`:  

![Screenshot From 2025-08-21 11-07-59.png](./imgs/Screenshot From 2025-08-21 11-07-59.png "Screenshot From 2025-08-21 11-07-59.png")

3. **Aggregate per offer**  
   - Since an offer may be linked with multiple transactions, we take the **maximum** of `led_to_tx`.  
   - This ensures that if *any* transaction falls inside the offer window, the offer is labeled as successful (`offer_led_to_transaction = 1`).  

4. **Enrich with user and offer details**  
   - Finally, we join with `df_profiles` (user demographics, behavior, etc.) and `df_offers` (offer characteristics such as type, channel, discount).  
   - The resulting dataset has one row per `(account_id, offer_id)`, with features from the user profile and offer plus the target label (`offer_led_to_transaction`).  

---

This creates the **training dataset** for modeling the effectiveness of offers: predicting whether a given offer to a specific user will result in a transaction.


In [0]:
# `offers`: one row per offer delivery. Only "offer received" events mark the
# start of an offer's validity window. Other event types may also carry an
# offer_id (the two key spellings unified earlier suggest so) but occur later;
# using them as window starts would stretch the labelling window beyond
# start + duration.
offers = joint_data.filter(
    F.col("offer_id").isNotNull() & (F.col("event") == "offer received")
)
# `transactions`: every purchase event, used both for labelling and for the
# control-group definitions.
transactions = joint_data.filter(F.col("event") == "transaction")

In [0]:
# Preview the transaction events (first 20 rows, long cells truncated).
transactions.show(n=20, truncate=True)

In [0]:
# Build one labelled row per (account_id, offer_id).
offer_time = F.col("o.time_since_test_start")
tx_time = F.col("t.time_since_test_start")

# A transaction counts for an offer when it is strictly after the offer time
# and no later than offer time + duration.
tx_in_offer_window = (tx_time > offer_time) & (tx_time <= offer_time + F.col("o.duration"))

# Pair every offer row with every transaction of the same account. The left
# join keeps offers of accounts without transactions; their flag becomes 0.
offer_tx_pairs = offers.alias("o").join(
    transactions.alias("t"), on="account_id", how="left"
)

# Collapse the pairs: the label is 1 if any paired transaction is in the window.
offer_outcomes = (
    offer_tx_pairs
    .withColumn("led_to_tx", F.when(tx_in_offer_window, 1).otherwise(0))
    .groupBy("o.account_id", "o.offer_id")
    .agg(F.max("led_to_tx").alias("offer_led_to_transaction"))
)

# Enrich the label with profile and offer attributes.
labeled_data = (
    offer_outcomes
    .join(df_profiles, on="account_id", how="inner")
    .join(df_offers, on="offer_id", how="inner")
)

In [0]:
# Preview the labelled dataset (first 20 rows, long cells truncated).
labeled_data.show(n=20, truncate=True)

In [0]:
# Number of (account, offer) pairs labelled 0, i.e. no transaction in the window.
labeled_data.where(F.col("offer_led_to_transaction") == 0).count()

In [0]:
# Join the `channels` values into one comma-separated string before exporting,
# then write the labelled dataset with a header row, replacing earlier output.
channels_as_text = F.concat_ws(",", F.col("channels"))
labeled_data_csv_ready = labeled_data.withColumn("channels", channels_as_text)
(
    labeled_data_csv_ready.write
    .mode("overwrite")
    .option("header", "true")
    .csv("/Volumes/workspace/default/data/labeled_data_with_offer.csv")
)

### Control group data
This is a group whose transactions don't depend on offers, or who have no transactions at all.

I will use this to calculate the uplift.
Here I consider users who had at least one transaction within the 30 days before an offer to be loyal users. Users whose only transactions are older than that are not counted as potential buyers.
A short time gap may produce **False Negatives**, while a long time gap might lead to **False Positives**.


### User Profile Classes Definition

We define two types of users based on their transaction behavior relative to offers:

---

**Case 1: Users who never transacted**

![Screenshot From 2025-08-21 11-12-37.png](./imgs/Screenshot From 2025-08-21 11-12-37.png "Screenshot From 2025-08-21 11-12-37.png")

---

**Case 2: Users who transacted outside the offer window (loyal buyers)**

![Screenshot From 2025-08-21 11-12-48.png](./imgs/Screenshot From 2025-08-21 11-12-48.png "Screenshot From 2025-08-21 11-12-48.png")

---

**Final dataset**

![Screenshot From 2025-08-21 11-13-01.png](./imgs/Screenshot From 2025-08-21 11-13-01.png "Screenshot From 2025-08-21 11-13-01.png")

---

Here:
- `class = 0` → **Never buyers** (no transactions at all).  
- `class = 1` → **Faithful buyers** (those who transacted in the last 30 days before an offer, independent of offers).


In [0]:
# Look-back window before an offer, in the units of time_since_test_start.
loyalty_lookback = 30

# class 0: profiles with no transaction event at all (anti-join on account_id).
never_transacted_users = (
    df_profiles.alias("p")
    .join(transactions.alias("t"), on="account_id", how="left_anti")
    .select("p.*", F.lit(0).alias("class"))
)

# class 1: accounts with at least one transaction strictly before an offer
# row's time and at most `loyalty_lookback` before it.
offer_ts = F.col("o.time_since_test_start")
tx_ts = F.col("t.time_since_test_start")
tx_shortly_before_offer = (tx_ts < offer_ts) & (tx_ts >= offer_ts - loyalty_lookback)

loyal_account_ids = (
    offers.alias("o")
    .join(transactions.alias("t"), on="account_id", how="inner")
    .where(tx_shortly_before_offer)
    .select("o.account_id")
    .distinct()
)
outside_window_users = (
    loyal_account_ids
    .join(df_profiles, on="account_id", how="inner")
    .withColumn("class", F.lit(1))
)

# Stack both groups; columns are matched by name, not by position.
user_profiles_classes = never_transacted_users.unionByName(outside_window_users)

In [0]:
# Preview the class-0 profiles (users without any transaction).
user_profiles_classes.where(F.col("class") == 0).show()

In [0]:
# Number of class-1 rows (users with a transaction in the look-back window before an offer).
outside_window_users.count()

In [0]:
# Export the control-group profiles as CSV with a header row, replacing any
# earlier output at this location.
(
    user_profiles_classes.write
    .mode("overwrite")
    .option("header", "true")
    .csv("/Volumes/workspace/default/data/user_profiles_transaction_classes.csv")
)