# Data Processing

## 1. Configuration

In the Databricks Community or Free Edition, only serverless compute is available, and serverless compute is not allowed to access legacy directories such as FileStore.

In the new Free Edition, access to the legacy DBFS root is restricted or disabled in favor of the more secure Unity Catalog (UC) Volumes storage pattern.

To access the iFood JSON files, we need to create a managed volume in Unity Catalog.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ifood-case").getOrCreate()

In [0]:
# Unity Catalog coordinates of the volume that will hold the raw files.
catalog, schema, volume_name = 'workspace', 'default', 'ifood'

# Name of the user running the notebook, as reported by Spark SQL.
user_id = spark.sql('select current_user() as user').collect()[0]['user']

# Create the volume only when it does not exist yet.
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume_name}")

In [0]:
from glob import glob

#getting the paths of the json files
raw_files_path = f'/Workspace/Users/{user_id}/ifood-case/data/raw/'
json_files = glob(raw_files_path + '*.json')

In [0]:
import os

# Root folder of the Unity Catalog volume that receives the raw files.
catalog_volume_path = "/Volumes/{}/{}/{}/".format(catalog, schema, volume_name)

for json_file in json_files:
    source_path = f"dbfs:{json_file}"
    destination_path = f"{catalog_volume_path}{os.path.basename(json_file)}"

    # Best-effort copy into the volume: a failure is reported and the loop
    # moves on to the next file.
    # NOTE(review): json_file is a /Workspace/... path, yet it is addressed
    # with the "dbfs:" scheme - confirm this resolves on the target compute.
    try:
        dbutils.fs.cp(source_path, destination_path)
    except Exception as e:
        print(f"An error occurred: {e}")
    else:
        print(f"Successfully copied file from {source_path} to {destination_path}")

## 2. Data Loading

Load the raw data from the volume directory (Unity Catalog).

In [0]:
# catalog_volume_path already ends with "/", so strip it before joining to
# avoid building paths with a double slash ("/Volumes/.../ifood//offers.json").
_raw_volume_dir = catalog_volume_path.rstrip("/")

offers_path = f"{_raw_volume_dir}/offers.json"
profile_path = f"{_raw_volume_dir}/profile.json"
transactions_path = f"{_raw_volume_dir}/transactions.json"

# The files are read with multiLine enabled, i.e. a JSON document may span
# several lines instead of being one record per line.
offers_df = spark.read.option("multiLine", True).json(offers_path)
profiles_df = spark.read.option("multiLine", True).json(profile_path)
transactions_df = spark.read.option("multiLine", True).json(transactions_path)

## 3. General View

* Data Count
* Column type
* Data display
* Data general info
* Metrics
* Null values by column
* Unique values per table/column

In [0]:
# Row count of each raw table, printed with thousands separators.
print("\n=== Data count ===")
for _label, _frame in (
    ("Profiles", profiles_df),
    ("Offers", offers_df),
    ("Transactions", transactions_df),
):
    print(f"{_label}: {_frame.count():,}")

In [0]:
# (column, type) pairs of each raw table.
print("\n=== Dtypes ===")
for _label, _frame in (
    ("Profiles", profiles_df),
    ("Offers", offers_df),
    ("Transactions", transactions_df),
):
    print(f"{_label}: {_frame.dtypes}")

In [0]:
from pyspark.sql import functions as F

def null_count(df):
    """Show, for every column of ``df``, how many rows hold a null value.

    The result is printed with ``show()`` as a single-row table whose columns
    carry the names of the inspected columns; nothing is returned.
    """
    per_column_nulls = []
    for name in df.columns:
        # 1 for a null cell, 0 otherwise, so the sum is the null count.
        null_flag = F.when(F.col(name).isNull(), 1).otherwise(0)
        per_column_nulls.append(F.sum(null_flag).alias(name))

    df.select(*per_column_nulls).show()

In [0]:
def showUniqueValues(df):
    """Display the distinct values of each column of ``df``, one column at a time.

    A header line naming the column is printed before its distinct values are
    rendered with the DataFrame's ``display()`` method. Nothing is returned.
    """
    for col_name in df.columns:
        print("Unique values for column '{}':".format(col_name))
        distinct_values = df.select(col_name).distinct()
        distinct_values.display()

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType
from pyspark.sql import Window

def find_outliers(df, numeric_columns):
    """Add a per-row ``total_outliers`` count based on the 1.5 * IQR rule.

    For each column in ``numeric_columns`` the exact first and third quartiles
    are computed (``approxQuantile`` with ``relativeError=0``). A row is
    flagged for that column when its value lies outside
    ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]``. The flags are then added up per row.

    Args:
        df: Spark DataFrame to inspect.
        numeric_columns: Names of the numeric columns to test.

    Returns:
        ``df`` with one extra column, ``total_outliers``: the number of
        inspected columns in which that row is an outlier (0 when none).
        The temporary ``is_outlier_*`` flag columns created here are dropped.
    """
    # Local imports: the notebook later star-shadows builtins such as ``sum``,
    # so the per-row total is built with an explicit reduce instead.
    from functools import reduce
    from operator import add

    flag_columns = []
    for column in numeric_columns:
        # One pass for both quartiles; relativeError=0 requests exact values.
        quartiles = df.approxQuantile(column, [0.25, 0.75], relativeError=0)
        if len(quartiles) < 2:
            # No quantiles were returned (e.g. no non-null values), so no
            # bounds can be derived and nothing is flagged for this column.
            continue

        q1, q3 = quartiles
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        flag_column = 'is_outlier_{}'.format(column)
        df = df.withColumn(
            flag_column,
            f.when((df[column] > upper_bound) | (df[column] < lower_bound), 1).otherwise(0),
        )
        flag_columns.append(flag_column)

    # Row-wise sum of every flag. The previous implementation overwrote
    # ``total_outliers`` on each iteration with a whole-table window sum, so
    # every row received the same value taken from the last column only.
    total = reduce(add, (f.col(name) for name in flag_columns), f.lit(0))
    df = df.withColumn('total_outliers', total)

    if flag_columns:
        df = df.drop(*flag_columns)

    return df
    

### 3.1 Profiles

First insights for profiles data.

In [0]:

profiles_df.display()

In [0]:
profiles_df.printSchema()

In [0]:
profiles_df.summary().display()

In [0]:
profiles_df.groupBy("gender").count().orderBy("count", ascending=False).show()

In [0]:
null_count(profiles_df)

In [0]:
showUniqueValues(profiles_df)

In [0]:
from pyspark.sql.functions import col


# Which ages occur among the profiles that have no credit card limit?
_limit_is_missing = col("credit_card_limit").isNull()
result_df = profiles_df.filter(_limit_is_missing).select("age").distinct()

result_df.show()

In [0]:
from pyspark.sql.functions import col


# Which genders occur among the profiles that have no credit card limit?
_limit_is_missing = col("credit_card_limit").isNull()
result_df = profiles_df.filter(_limit_is_missing).select("gender").distinct()

result_df.show()

In [0]:
# Drop null values from limit and gender 
profiles_df = profiles_df.dropna(subset=['gender', 'credit_card_limit'])
print("shape: ", (profiles_df.count(), len(profiles_df.columns)))

In [0]:
profiles_df.groupBy("gender").count().orderBy("count", ascending=False).show()

In [0]:
from pyspark.sql.functions import countDistinct

distinct_ids = profiles_df.select(countDistinct("id"))
distinct_ids.show()

**Initial Comments (Profiles Data):**

Distribution by gender:
- 8484 profiles with gender 'M'
- 6129 with gender 'F'
- 212 with gender 'O' (other)
- 2175 with null values for the gender information.

Profiles with a null credit_card_limit also have a null gender and age 118 (count = 2175). This indicates default or invalid data, so these profiles were removed to clean the dataset.

After removal, the dataset contains 14825 profiles, all with distinct ids:

- **8484** profiles with gender 'M'
- 6129 with gender 'F'
- 212 with gender 'O' (other)





### 3.2 Offers

First insights for offers data.

In [0]:
offers_df.display()

In [0]:
offers_df.summary().display()

In [0]:
null_count(offers_df)

In [0]:
showUniqueValues(offers_df)

In [0]:
offers_df.groupBy("offer_type").count().orderBy("count", ascending=False).show()

**Initial Comments (Offers Data):**

Offer type distribution:
- 4 BOGO
- 4 discount
- 3 informational

- Null values = 0
- Each offer has at least 2 distribution channels.



### 3.3 Transactions

In [0]:
transactions_df.display()

In [0]:
transactions_df.summary().display()

In [0]:
null_count(transactions_df)

In [0]:
showUniqueValues(transactions_df)

In [0]:
transactions_df.groupBy("event").count().orderBy("count", ascending=False).show()

**Initial Comments (Transactions Data):**

Event Distribution:
- transaction = 138953
- offer received =  76277
- offer viewed =  57725
- offer completed = 33579

## 4. Preprocessing

### 4.1 Categorical mapping


Since most machine learning algorithms require numerical input to make predictions, encoding methods convert categorical variables into numbers, enabling algorithms to identify patterns and relationships in the data.

One-hot encoding was chosen because the data consists of nominal categories and the number of unique categories is small. One-hot encoding will also be able to handle new data (not used in training).

In [0]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col, lit, when

def generateOneHot(df, cat_col, alias):
    """Append 0/1 indicator columns for the categories found in ``cat_col``.

    Args:
        df: Spark DataFrame to encode. For array columns it must also contain
            an ``id`` column, which is used as the join key.
        cat_col: Name of the categorical column (array or string typed).
        alias: Name given to the exploded element column; only used for array
            columns.

    Returns:
        * Array column: ``df`` joined with one indicator column per distinct
          array element, named after the element.
        * String column: ``df`` plus one ``<cat_col>_<value>`` column per
          distinct non-null value, added in sorted order.
        * Any other type: ``df`` unchanged (a message is printed).
    """
    col_type = df.schema[cat_col].dataType

    if isinstance(col_type, ArrayType):
        exploded = df.select("id", F.explode(cat_col).alias(alias))
        indicators = exploded.groupBy("id").pivot(alias).agg(F.lit(1))
        indicator_columns = [name for name in indicators.columns if name != "id"]

        # Fill AFTER the left join: ids that produced no exploded rows are
        # absent from ``indicators`` and would otherwise keep null flags.
        return (
            df.join(indicators, on="id", how="left_outer")
            .na.fill(0, subset=indicator_columns)
        )

    if isinstance(col_type, StringType):
        distinct_rows = df.select(cat_col).distinct().collect()
        # Skip the null category (it would only create an all-zero
        # "<cat_col>_None" column) and sort so the column order is stable.
        unique_values = sorted(row[0] for row in distinct_rows if row[0] is not None)

        for value in unique_values:
            col_name = f"{cat_col}_{value}"
            df = df.withColumn(col_name, when(col(cat_col) == value, lit(1)).otherwise(lit(0)))
        return df

    print(f"Column '{cat_col}' is neither ArrayType nor StringType, it is {type(col_type)}.")
    return df

In [0]:
offers_df = generateOneHot(offers_df, "channels", "channel")
#offers_df = offers_df.drop("channels")
offers_df.show()

In [0]:
offers_df = generateOneHot(offers_df, "offer_type", "type")
offers_df.show()

In [0]:
#offers_df = offers_df.drop("offer_type")

In [0]:
offers_df.show()

In [0]:
profiles_df = generateOneHot(profiles_df, "gender", "type")

In [0]:
#profiles_df = profiles_df.drop("gender")

In [0]:
profiles_df.show()

In [0]:
transactions_df = generateOneHot(transactions_df, "event", "type")

In [0]:
# The one-hot columns derived from the event labels contain spaces; rename
# them to snake_case. (The original chain repeated the first rename twice.)
transactions_df = (
    transactions_df
    .withColumnRenamed("event_offer received", "event_offer_received")
    .withColumnRenamed("event_offer viewed", "event_offer_viewed")
    .withColumnRenamed("event_offer completed", "event_offer_completed")
)

### 4.2 Data Transformation

Create Age groups

In [0]:
# Bucket profiles into coarse age bands. Note that the ">50" label is also
# assigned to age 50 itself; rows matching no band get "unknown".
_age = col("age")
_age_group = (
    when(_age < 25, "<25")
    .when((_age >= 25) & (_age < 35), "25-34")
    .when((_age >= 35) & (_age < 50), "35-49")
    .when(_age >= 50, ">50")
    .otherwise("unknown")
)
profiles_df = profiles_df.withColumn("age_group", _age_group)

In [0]:
profiles_df.show()

Format the value column (transactions data) so that the dataset has the columns 'transaction_amount', 'offer_id', 'reward' and 'time_since_test_start'.

In [0]:
from pyspark.sql.functions import col, lit, when

# Promote the fields nested under ``value`` to top-level columns, then drop
# the original ``value`` column. Both spellings of the offer id field
# ("offer id" and "offer_id") are kept here.
_value_fields = {
    "transaction_amount": "value.amount",
    "offer id": "value.`offer id`",
    "offer_id": "value.offer_id",
    "reward": "value.reward",
}
for _target, _source in _value_fields.items():
    transactions_df = transactions_df.withColumn(_target, col(_source))
transactions_df = transactions_df.drop("value")

transactions_df.show(10)

The offer id is stored in two different fields depending on the record ('offer_id' and 'offer id'), so we need to unify them into a single offer id column:

In [0]:
from pyspark.sql.functions import col


result_df = transactions_df.filter(col("offer_id").isNull() & col("offer id").isNotNull()) \
              .select("event") \
              .distinct()

result_df.show()

In [0]:
result_df = transactions_df.filter(col("offer id").isNull() & col("offer_id").isNotNull()) \
              .select("event") \
              .distinct()

result_df.show()

In [0]:
result_df = transactions_df.filter(col("offer id").isNotNull() & col("offer_id").isNotNull()) \
              .select("event") \
              .distinct()

result_df.show()

In [0]:
result_df = transactions_df.filter(col("offer id").isNull() & col("offer_id").isNull()) \
              .select("event") \
              .distinct()

result_df.show()

**Insights:**
- Transaction Events have null offer ids.
- Offer completed events populate the offer_id column.
- Offer received and offer viewed events populate the offer id column.


In [0]:

# Take "offer id" when present, otherwise "offer_id", and keep only the
# unified column.
_unified_offer_id = F.coalesce(F.col("offer id"), F.col("offer_id"))
transactions_df = transactions_df.withColumn("offer_id_processed", _unified_offer_id)
transactions_df = transactions_df.drop("offer id", "offer_id")

In [0]:
transactions_df.show()

In [0]:
offers_df = offers_df.withColumnRenamed("id", "offer_id")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date, date_format
from pyspark.ml.feature import StringIndexer
from pyspark.sql.window import Window
     

In [0]:
offers_df.show()

In [0]:
profiles_df = profiles_df.withColumnRenamed("id", "profile_id")

In [0]:
profiles_df.show()

In [0]:
from pyspark.sql.functions import date_format, to_date, col

profiles_df = profiles_df.withColumn("date_registered_on", to_date(col("registered_on"), "yyyyMMdd"))

In [0]:
profiles_df.show()

In [0]:
profiles_df = profiles_df.withColumn('year_month', date_format(col('date_registered_on'), 'yyyy-MM'))

In [0]:
profiles_df = profiles_df.drop(col("registered_on"))

In [0]:
profiles_df.show()

## 5. EDA

Exploratory Data Analysis

### 5.1 Profiles Analysis

In [0]:
import matplotlib.pyplot as plt

In [0]:

print("=== Ages Summary == ")

# Summary statistics of age, cast to double.
_age_as_double = col("age").cast("double")
profiles_df.select(_age_as_double).describe().show()

# Bring the ages to pandas for plotting.
profiles_age_pd = profiles_df.select(_age_as_double.alias("age")).toPandas()
_ages = profiles_age_pd["age"].dropna()

plt.figure(figsize=(6, 4))
plt.hist(_ages, bins=20, edgecolor='black')
plt.title("Age Distribution")
plt.xlabel("Age")
plt.ylabel("Count Profiles")
plt.tight_layout()
plt.show()



In [0]:
# Number of profiles per age group, ordered by ascending count.
_profiles_by_group = profiles_df.groupBy("age_group")
age_group_distribution_pyspark = _profiles_by_group.count().orderBy("count")

print("=== Age Group Distribution ===")
age_group_distribution_pyspark.show()

# Bring the small aggregate to pandas for plotting.
age_distribution_pandas = age_group_distribution_pyspark.toPandas()

import matplotlib.pyplot as plt

_groups = age_distribution_pandas["age_group"]
_counts = age_distribution_pandas["count"]

plt.figure(figsize=(10, 6))
plt.bar(_groups, _counts, color='skyblue')
plt.title("Age Group Distribution")
plt.xlabel("Age Group")
plt.ylabel("Count")
plt.show()


In [0]:
# Credit card limits as a pandas Series.
_limits_frame = profiles_df.select("credit_card_limit").toPandas()
credit_card_limit_pd = _limits_frame["credit_card_limit"]

# Histogram of the limits across all profiles.
plt.figure(figsize=(10, 6))
plt.hist(credit_card_limit_pd, bins=30, color='skyblue', edgecolor='black')
plt.title('Credit Card Limit Distribution')
plt.xlabel('Credit Card Limit')
plt.ylabel('Count Profiles')
plt.tight_layout()
plt.show()

In [0]:
credit_by_age_df = profiles_df.select("age", "credit_card_limit", "age_group").toPandas()

# ``count()`` tallies the profiles that have a (non-null) credit card limit in
# each age group - it is a number of profiles, not a limit amount. The axis
# label and title therefore describe a count.
# NOTE(review): if the intent is to compare limit amounts across groups,
# aggregate with ``.mean()`` (or ``.median()``) instead of ``.count()``.
count_per_age = (
    credit_by_age_df.groupby("age_group")["credit_card_limit"]
    .count().sort_values(ascending=True)
    .reset_index()
)
count_per_age.plot.bar(x="age_group", y="credit_card_limit", rot=0, color="orange", legend=False)
plt.xlabel('Age Group')
plt.ylabel('Count Profiles')
plt.title('Profiles with a Credit Card Limit by Age Group')

In [0]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import when, col

# Split points for the ten-year bins: 0, 10, ..., 90 and an open-ended top.
splits = list(range(0, 100, 10)) + [float('inf')]

# Assign every profile the index of the bin its age falls into.
bucketizer = Bucketizer(splits=splits, inputCol="age", outputCol="bucket_id")
df_buck = bucketizer.transform(profiles_df)

# Human-readable label for bucket ids 0..8; any other id is labelled "90+".
_bin_labels = ["0-9", "10-19", "20-29", "30-39", "40-49", "50-59", "60-69", "70-79", "80-89"]
_age_bin = None
for _bucket, _label in enumerate(_bin_labels):
    _is_bucket = col("bucket_id") == float(_bucket)
    _age_bin = when(_is_bucket, _label) if _age_bin is None else _age_bin.when(_is_bucket, _label)

profiles_df = df_buck.withColumn("age_bin", _age_bin.otherwise("90+"))

# Show the results
profiles_df.show()


In [0]:
credit_by_age_bin_df = profiles_df.select("age", "credit_card_limit", "age_bin").toPandas()

# ``count()`` tallies the profiles that have a (non-null) credit card limit in
# each age bin - it is a number of profiles, not a limit amount. The axis
# label and title therefore describe a count.
# NOTE(review): if the intent is to compare limit amounts across bins,
# aggregate with ``.mean()`` (or ``.median()``) instead of ``.count()``.
count_per_age_bin = (
    credit_by_age_bin_df.groupby("age_bin")["credit_card_limit"]
    .count()
    .reset_index()
)

count_per_age_bin.plot.bar(x="age_bin", y="credit_card_limit", rot=0, color="orange", legend=False)
plt.xlabel('Age Bin')
plt.ylabel('Count Profiles')
plt.title('Profiles with a Credit Card Limit by Age Bin')

In [0]:
credit_by_gender_df = profiles_df.select("gender", "credit_card_limit").toPandas()

# ``count()`` tallies the profiles that have a (non-null) credit card limit
# per gender - it is a number of profiles, not a limit amount. The axis label
# and title therefore describe a count.
# NOTE(review): if the intent is to compare limit amounts across genders,
# aggregate with ``.mean()`` (or ``.median()``) instead of ``.count()``.
count_per_gender = (
    credit_by_gender_df.groupby("gender")["credit_card_limit"]
    .count().sort_values(ascending=True)
    .reset_index()
)

count_per_gender.plot.bar(x="gender", y="credit_card_limit", rot=0, color="orange", legend=False)
plt.xlabel('Gender')
plt.ylabel('Count Profiles')
plt.title('Profiles with a Credit Card Limit by Gender')

In [0]:

    # Overlay one semi-transparent histogram of credit card limits per gender
    # on the same axes so the distributions can be compared.
    # NOTE(review): the whole cell is indented; outside a notebook this would
    # raise "unexpected indent" - confirm the indentation is intentional.
    for gender in sorted(credit_by_gender_df["gender"].unique()):
        # Rows belonging to the current gender only.
        subset = credit_by_gender_df[credit_by_gender_df["gender"] == gender]
        plt.hist(
            subset["credit_card_limit"],
            bins=10,
            alpha=0.6,
            label=str(gender),
            edgecolor='black',
            linewidth=0.5
        )
    plt.xlabel("Credit Card Limit", fontsize=12)
    plt.ylabel("Profiles", fontsize=12)
    plt.title("Distribution of Credit Card Limit by Gender", fontsize=14, fontweight='bold')
    # The legend entries come from the ``label`` passed to each histogram.
    plt.legend(title="Gender", fontsize=10)
    plt.show()

In [0]:
import pandas as pd 
profiles_pd = profiles_df.toPandas()
profiles_pd.groupby(pd.to_datetime(profiles_pd['date_registered_on']).dt.to_period('M'))['profile_id'].count().plot.line()
plt.show()

In [0]:

correlation = profiles_pd[["age", "credit_card_limit"]].corr()
print(correlation)
     

***Small correlation between age and credit_card_limit***

In [0]:
# Map gender to an integer code: F -> 1, M -> 2, O -> 3, anything else -> 0.
# NOTE(review): the codes are arbitrary labels with no natural order, so a
# correlation computed on them should be read with care.
_gender = col("gender")
_gender_code = (
    when(_gender == "F", 1)
    .when(_gender == "M", 2)
    .when(_gender == "O", 3)
    .otherwise(0)
)
profiles_df_numeric = profiles_df.withColumn("gender_numeric", _gender_code)
profiles_df_numeric.show()

In [0]:
profiles_pd = profiles_df_numeric.toPandas()
correlation = profiles_pd[["gender_numeric", "credit_card_limit"]].corr()
print(correlation)

***No correlation between gender and credit_card_limit***

In [0]:
profiles_df = profiles_df.withColumn(
    "days_since_start",
    F.datediff(F.current_date(), F.col("date_registered_on"))
)

In [0]:
profiles_df.show()

In [0]:
profiles_pd = profiles_df.toPandas()
correlation = profiles_pd[["days_since_start", "credit_card_limit"]].corr()
print(correlation)

***Small correlation between days since registration and credit_card_limit***

***Insights:***
* Correlation between age and credit_card_limit:
  The limit increases with age, with age bin "50-59" presenting the highest limits (it decreases for higher ages).
* No correlation between gender and credit_card_limit:
  Males have bigger limits, but are also much more represented in the database.
* Correlation between days since registration and credit_card_limit:
  Indicates that a longer financial history is reflected in higher credit limits.
* The years 2017 and 2018 had a significant number of profile registrations.


In [0]:
numeric_columns = ['age',
 'credit_card_limit']

In [0]:
profiles_df.display()

In [0]:


new_profiles_df = find_outliers(profiles_df, numeric_columns)
new_profiles_df.show()


In [0]:
new_profiles_df_with_no_outliers = new_profiles_df.filter(new_profiles_df['total_Outliers']<=1)
new_profiles_df_with_no_outliers = new_profiles_df_with_no_outliers.select(*profiles_df.columns)

new_profiles_df_with_no_outliers.show()


In [0]:
data_with_outliers = new_profiles_df.filter(new_profiles_df['total_Outliers']>=2)
data_with_outliers.show()

No outliers detected. The suspicious records (invalid credit_card_limit and age values) had already been excluded from the data.

Save processed data...

In [0]:
#processed_files_path = f'/Workspace/Users/{user_id}/ifood-case/data/processed/'

In [0]:
catalog_volume_path = f"/Volumes/{catalog}/{schema}/{volume_name}/data/processed"

In [0]:
profiles_df.dtypes


In [0]:
from pyspark.sql.types import IntegerType, DoubleType

profiles_df = profiles_df.withColumn("age", col("age").cast(IntegerType()))

In [0]:
# Persist the processed profiles as Parquet, replacing any previous run.
profiles_processed_path = f"{catalog_volume_path}/profiles_processed.parquet"

profiles_df.write.mode("overwrite").parquet(profiles_processed_path)

# Message fixed: "parquet" was misspelled and the quote was unbalanced.
print(f"'profiles_processed.parquet' saved in: {profiles_processed_path}")

### 5.2 Offers Analysis


In [0]:
offers_df = offers_df.withColumn("channels_count", F.size(F.col("channels")))

In [0]:
offers_df.display()

Summary and distributions were already observed in the Initial View step.

In [0]:
offers_df.dtypes

In [0]:
from pyspark.sql.types import IntegerType, DoubleType

offers_df = offers_df.withColumn("discount_value", col("discount_value").cast(DoubleType()))
offers_df = offers_df.withColumn("min_value", col("min_value").cast(DoubleType()))

In [0]:
offers_df.dtypes

In [0]:
# Persist the processed offers as Parquet, replacing any previous run.
offers_processed_path = f"{catalog_volume_path}/offers_processed.parquet"

offers_df.write.mode("overwrite").parquet(offers_processed_path)

# Message fixed: "parquet" was misspelled and the quote was unbalanced.
print(f"'offers_processed.parquet' saved in: {offers_processed_path}")

### 5.3 Transactions Analysis


In [0]:
transactions_df.display()

In [0]:
from pyspark.sql.functions import (
    coalesce, col, lit, count as spark_count, sum as spark_sum, 
    concat, substring, to_date, countDistinct, date_format, max as spark_max
)

In [0]:
max_since_test_start = transactions_df.agg(spark_max(col("time_since_test_start"))).collect()[0][0]
print("Max time_since_test_start:", max_since_test_start)

In [0]:
# Remove negative values
transactions_df = transactions_df.withColumn(
    "transaction_amount",
    when(col("transaction_amount") < 0, None).otherwise(col("transaction_amount"))
)

In [0]:
transactions_df = transactions_df.withColumn(
    "is_success",
    when(col("event") == "transaction", lit(1)).otherwise(lit(0))
)


In [0]:
# Persist the processed transactions as Parquet, replacing any previous run.
# The target used to be "offers_processed.parquet", which silently overwrote
# the offers output with transaction rows.
transactions_processed_path = f"{catalog_volume_path}/transactions_processed.parquet"

transactions_df.write.mode("overwrite").parquet(transactions_processed_path)

# Message fixed: "parquet" was misspelled and the quote was unbalanced.
print(f"'transactions_processed.parquet' saved in: {transactions_processed_path}")

In [0]:
from pyspark.sql.functions import col, when, sum as spark_sum, avg

# Per-account activity metrics: transaction count/amount/average and the
# number of offers received, viewed and completed.
_is_transaction = col("event") == "transaction"

_per_account = transactions_df.groupBy("account_id").agg(
    spark_sum(when(_is_transaction, 1).otherwise(0)).alias("count_profile_transactions"),
    spark_sum(when(_is_transaction, col("transaction_amount")).otherwise(0)).alias("profile_amount"),
    avg(when(_is_transaction, col("transaction_amount"))).alias("profile_avg_amount"),
    spark_sum(when(col("event_offer_received") == 1, 1).otherwise(0)).alias("count_received_profile"),
    spark_sum(when(col("event_offer_viewed") == 1, 1).otherwise(0)).alias("count_viewed_profile"),
    spark_sum(when(col("event_offer_completed") == 1, 1).otherwise(0)).alias("count_completed_profile"),
)

# Rates are relative to the offers received; accounts that received none get 0.0.
_received = col("count_received_profile")
_conversion_rate = when(_received > 0, col("count_completed_profile") / _received).otherwise(lit(0.0))
_view_rate = when(_received > 0, col("count_viewed_profile") / _received).otherwise(lit(0.0))

transactions_metrics = (
    _per_account
    .withColumn("profile_conversion_rate", _conversion_rate)
    .withColumn("profile_view_rate", _view_rate)
)
transactions_metrics.display()

In [0]:
# Persist the per-account metrics as Parquet, replacing any previous run.
transactions_metrics_path = f"{catalog_volume_path}/transactions_metrics.parquet"

transactions_metrics.write.mode("overwrite").parquet(transactions_metrics_path)

# Message fixed: "parquet" was misspelled and the quote was unbalanced.
print(f"'transactions_metrics.parquet' saved in: {transactions_metrics_path}")

## 6. Data Aggregation

Generate final dataset


* Events


In [0]:
from pyspark.sql.functions import (
    col, min, when, count, sum, avg, lit, array_contains, lag,
    countDistinct
)

# One row per (account, offer) pair. ``min``/``count`` here are the Spark
# aggregate functions imported above, not the Python builtins.
# For each pair: earliest time the offer was received / viewed / completed
# and how many times each of those events occurred.
events_agg = (
    transactions_df
    .filter(col("offer_id_processed").isNotNull())
    .groupBy("account_id", "offer_id_processed")
    .agg(
        min(when(col("event_offer_received") == 1, col("time_since_test_start"))).alias("received"),
        min(when(col("event_offer_viewed")==1, col("time_since_test_start"))).alias("viewed"),
        min(when(col("event_offer_completed")==1, col("time_since_test_start"))).alias("completed"),
        count(when(col("event_offer_received")==1, True)).alias("count_received"),
        count(when(col("event_offer_viewed")==1, True)).alias("count_viewed"),
        count(when(col("event_offer_completed")==1, True)).alias("count_completed")
    )
)


# Keep only the pairs where the offer was actually received
events = events_agg.filter(col("count_received") > 0)

events.display()

# ``events`` has one row per (account, offer) pair, so this is a number of
# pairs, not a number of distinct profiles; the message now says so.
print(f"{events.count()} profile/offer pairs with a received offer")

Events + Offers

In [0]:
# Flag offers whose discount value is at least 5.
_is_above_average = (col("discount_value") >= 5).cast("int")
offers_features = offers_df.withColumn("above_average_discount", _is_above_average)

# Align the key name with the offers table before joining.
events = events.withColumnRenamed("offer_id_processed", "offer_id")

# Attach the offer attributes and compute the expiry time of each offer:
# first time it was received plus its duration.
_with_offer_data = events.join(offers_features, on="offer_id", how="left")
events_offers = _with_offer_data.withColumn("valid_until", col("received") + col("duration"))

events_offers.display()

Transactions:
- Get client transactions associated with an offer.
- Target feature (model).
- Aggregate profile data.


In [0]:
from pyspark.sql import functions as F

# Purchase events only, with the event time renamed to ``started_time``.
transactions = (
    transactions_df
    .filter(col("event") == "transaction")
    .select(
        col("account_id").alias("account_id"),
        col("time_since_test_start").alias("started_time"),
        col("transaction_amount")
    )
)

# Join condition: same account, and the transaction happened between the time
# the offer was received and the time it expired (both bounds inclusive).
transactions_events_offers = (
    (events_offers.account_id == transactions.account_id) &
    (transactions.started_time >= events_offers.received) &
    (transactions.started_time <= events_offers.valid_until)
)
# Left join keeps every (account, offer) pair, even without a matching transaction.
events_transactions = events_offers.join(transactions, transactions_events_offers, how="left")

# Drop the duplicated account_id coming from the transactions side of the join.
events_transactions_ = events_transactions.drop(transactions["account_id"]) 

# Total amount spent by the account while each offer was valid.
amount_agg = events_transactions_.groupBy("account_id", "offer_id").agg(F.sum("transaction_amount").alias("amount_during_offer"))

# Pairs without any transaction in the validity window get an amount of 0.0.
events_amount = (
    events_offers.join(amount_agg, on=["account_id", "offer_id"], how="left")
    .fillna({"amount_during_offer": 0.0})
)


# Target: 1 when the offer was completed no later than ``valid_until``, else 0.
target_feat = events_amount.withColumn(
    "converged",
    when(
        (col("completed").isNotNull()) & (col("completed") <= col("valid_until")),
        lit(1)
    ).otherwise(lit(0))
)

# Class balance of the target.
target_feat.groupBy("converged").count().show()

# Per-account behaviour: event and transaction counts, amounts, and offer
# funnel counts. ``count``/``sum``/``avg`` are the Spark aggregates imported
# from pyspark.sql.functions in an earlier cell, not the Python builtins.
customer_behavior = (
    transactions_df
    .groupBy("account_id")
    .agg(
        count("*").alias("events_by_customer"),
        count(when(col("event_transaction")==1, True)).alias("count_transactions"),
        sum(when(col("event_transaction")==1, col("transaction_amount")).otherwise(0.0)).alias("amount_by_customer"),
        avg(when(col("event_transaction")==1, col("transaction_amount")).otherwise(None)).alias("avg_amount_by_customer"),
        count(when(col("event_offer_received")==1, True)).alias("count_received_customer"),
        count(when(col("event_offer_viewed")==1, True)).alias("count_viewed_customer"),
        count(when(col("event_offer_completed")==1, True)).alias("count_completed_customer")
    )
    # Rates are relative to the offers received; 0.0 when none was received.
    .withColumn(
        "conversion_rate",
        when(col("count_received_customer") > 0,
             col("count_completed_customer") / col("count_received_customer")).otherwise(lit(0.0))
    )
    .withColumn(
        "view_rate",
        when(col("count_received_customer") > 0,
             col("count_viewed_customer") / col("count_received_customer")).otherwise(lit(0.0))
    )
)

# Per-account offer variety and number of converged offers.
offer_types = (
    target_feat
    .groupBy("account_id")
    .agg(
        countDistinct("offer_type").alias("customer_offer_types"),
        sum("converged").alias("customer_completed_offers")
    )
)

# Profiles keyed by ``account_id`` so they can be joined with the aggregates.
profiles_model = profiles_df.withColumnRenamed("profile_id", "account_id")
# Left joins keep every profile; profiles without activity get zeros.
customers_features = (
    profiles_model
    .join(customer_behavior, on="account_id", how="left")
    .join(offer_types, on="account_id", how="left")
    .fillna({
        "events_by_customer": 0,
        "count_transactions": 0,
        "amount_by_customer": 0.0,
        "avg_amount_by_customer": 0.0,
        "count_received_customer": 0,
        "count_viewed_customer": 0,
        "count_completed_customer": 0,
        "conversion_rate": 0.0,
        "view_rate": 0.0,
        "customer_offer_types": 0,
        "customer_completed_offers": 0
    })
)

Current success rate = 0,439%

In [0]:

target_feat.display()

Target Feature

In [0]:
# Attach the customer-level features to every (account, offer) row of the
# target table; the left join keeps all target rows.
dataset = target_feat.join(
    customers_features,
    on="account_id",
    how="left"
)

# Label fixed: it previously read "Count Rowns:".
print("Row count:", dataset.count())

In [0]:
# Persist the final modelling dataset as Parquet, replacing any previous run.
final_path = f"{catalog_volume_path}/final_dataset.parquet"

dataset.write.mode("overwrite").parquet(final_path)

# Message fixed: "parquet" was misspelled and the quote was unbalanced.
print(f"'final_dataset.parquet' saved in: {final_path}")
