<a href="https://colab.research.google.com/github/Jairo-PeC/cafe-rewards-Jairo-PeC/blob/main/Ballastlane_Data_Engineer_exercise_Jairo_Pe%C3%B1a.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Ballastlane - Data Engineer exercise

### Setting up kaggle connection to upload data into RAW folder

In [7]:
#Install kagglehub to download data
!pip install kagglehub



In [8]:
# Upload kaggle.json manually
from google.colab import files
files.upload()


Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"jairopeacruz","key":"7bdd2ddb2e94fe2957c017498abed502"}'}

In [9]:
# Set correct directory permissions
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [10]:
import kagglehub
import shutil
import os

# Define the target directory
save_path = "/content/caf-rewards-offer/raw"

# Download dataset
dataset_path = kagglehub.dataset_download("arshmankhalid/caf-rewards-offer-dataset")

# Move files to directory
os.makedirs(save_path, exist_ok=True)

for file in os.listdir(dataset_path):
    shutil.move(os.path.join(dataset_path, file), save_path)

print("Dataset saved to:", save_path)
print("Files:", os.listdir(save_path))



Dataset saved to: /content/caf-rewards-offer/raw
Files: ['customers.csv', 'data_dictionary.csv', 'events.csv', 'offers.csv']


### Retrieving raw tables to clean and save data into trusted layer as columnar type files (.parquet)

In [11]:
!pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("cafRewardsOffer").getOrCreate()
spark



In [12]:
raw_path = "/content/caf-rewards-offer/raw"
files = os.listdir(raw_path)

# Dictionary to store DataFrames
dfs = {}

# Read all CSV files into DataFrames
for file in files:
    if file.endswith(".csv") and not file.endswith('data_dictionary.csv'):
        file_path = os.path.join(raw_path, file)
        df_name = file.replace(".csv", "")  # Use filename as key (without .csv)

        # Read CSV into a DataFrame
        df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
        dfs[df_name] = df
        df.printSchema()
        print(f"Loaded {file} into DataFrame: {df_name}")
        df.show(5)


root
 |-- customer_id: string (nullable = true)
 |-- became_member_on: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- income: integer (nullable = true)

Loaded customers.csv into DataFrame: customers
+--------------------+----------------+------+---+------+
|         customer_id|became_member_on|gender|age|income|
+--------------------+----------------+------+---+------+
|68be06ca386d4c319...|        20170212|  NULL|118|  NULL|
|0610b486422d4921a...|        20170715|     F| 55|112000|
|38fe809add3b4fcf9...|        20180712|  NULL|118|  NULL|
|78afa995795e4d85b...|        20170509|     F| 75|100000|
|a03223e636434f42a...|        20170804|  NULL|118|  NULL|
+--------------------+----------------+------+---+------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- value: string (nullable = true)
 |-- time: integer (nullable = true)

Loaded events.csv into DataFrame: e

In [13]:
##Initialize spark and retrieve data from RAW layer
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

# Paths
raw_path = "/content/caf-rewards-offer/raw"
trusted_path = "/content/caf-rewards-offer/trusted"
os.makedirs(trusted_path, exist_ok=True)

# Load DataFrames
offers_df = spark.read.option("header", "true").option("inferSchema", "true").csv(os.path.join(raw_path, "offers.csv"))
customers_df = spark.read.option("header", "true").option("inferSchema", "true").csv(os.path.join(raw_path, "customers.csv"))
events_df = spark.read.option("header", "true").option("inferSchema", "true").csv(os.path.join(raw_path, "events.csv"))

offers_df.printSchema()
customers_df.printSchema()
events_df.printSchema()

root
 |-- offer_id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- difficulty: integer (nullable = true)
 |-- reward: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- channels: string (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- became_member_on: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- income: integer (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- value: string (nullable = true)
 |-- time: integer (nullable = true)



In [14]:
###CLEANSING AND TRANSFORMING OFFERS TABLE
# Dropping duplicates
trusted_offers_df = offers_df.dropDuplicates(["offer_id"])

# Ensure difficulty, reward, and duration are non-negative
trusted_offers_df = trusted_offers_df.withColumn("difficulty", when(col("difficulty") < 0, 0).otherwise(col("difficulty")))
trusted_offers_df = trusted_offers_df.withColumn("reward", when(col("reward") < 0, 0).otherwise(col("reward")))
trusted_offers_df = trusted_offers_df.withColumn("duration", when(col("duration") < 0, 0).otherwise(col("duration")))

# Create boolean columns for each channel
trusted_offers_df = trusted_offers_df.withColumn("channels", split(regexp_replace(col("channels"), "[\\[\\]']", ""), ", "))
trusted_offers_df = trusted_offers_df.withColumn("email", array_contains(col("channels"), "email")) \
                     .withColumn("mobile", array_contains(col("channels"), "mobile")) \
                     .withColumn("social", array_contains(col("channels"), "social")) \
                     .withColumn("web", array_contains(col("channels"), "web"))

trusted_offers_df = trusted_offers_df.drop("channels")


trusted_offers_df.show(5)

+--------------------+-------------+----------+------+--------+-----+------+------+----+
|            offer_id|   offer_type|difficulty|reward|duration|email|mobile|social| web|
+--------------------+-------------+----------+------+--------+-----+------+------+----+
|0b1e1539f2cc45b7b...|     discount|        20|     5|      10| true| false| false|true|
|2298d6c36e964ae4a...|     discount|         7|     3|       7| true|  true|  true|true|
|2906b810c7d441179...|     discount|        10|     2|       7| true|  true| false|true|
|3f207df678b143eea...|informational|         0|     0|       4| true|  true| false|true|
|4d5c57ea9a6940dd8...|         bogo|        10|    10|       5| true|  true|  true|true|
+--------------------+-------------+----------+------+--------+-----+------+------+----+
only showing top 5 rows



In [15]:
###CLEANSING AND TRANSFORMING CUSTOMERS TABLE

# Dropping duplicates
trusted_customers_df = customers_df.dropDuplicates(["customer_id"])

# Force column back to INT (if it's in the wrong format)
#customers_df = customers_df.withColumn("became_member_on", col("became_member_on").cast("int"))
# Convert to string, then to proper date (yyyymmdd)
#customers_df = customers_df.withColumn("date",to_date(col("became_member_on").cast("int").cast("string"), "yyyyMMdd"))
trusted_customers_df = trusted_customers_df.withColumn("became_member_on", to_date(col("became_member_on"), "yyyyMMdd"))

# Validate gender values
valid_genders = ["M", "F", "O"]
trusted_customers_df = trusted_customers_df.withColumn("gender", when(col("gender").isin(valid_genders), col("gender")).otherwise(lit(None)))

# Replace age = 118 with NULL because it is meaningless
customers_df = customers_df.withColumn("age", when(col("age") == 118, None).otherwise(col("age")))

# Create a new column "age_group"
trusted_customers_df = trusted_customers_df.withColumn(
    "age_group",
    when(col("age").isNull(), "N/A")
    .when(col("age") < 25, "Under 25")
    .when((col("age") >= 25) & (col("age") <= 40), "25-40")
    .when((col("age") >= 41) & (col("age") <= 60), "41-60")
    .otherwise("61 and above")
)

trusted_customers_df.show(5)

+--------------------+----------------+------+---+------+------------+
|         customer_id|became_member_on|gender|age|income|   age_group|
+--------------------+----------------+------+---+------+------------+
|0009655768c64bdeb...|      2017-04-21|     M| 33| 72000|       25-40|
|00116118485d4dfda...|      2018-04-25|  NULL|118|  NULL|61 and above|
|0011e0d4e6b944f99...|      2018-01-09|     O| 40| 57000|       25-40|
|0020c2b971eb4e918...|      2016-03-04|     F| 59| 90000|       41-60|
|0020ccbbb6d84e358...|      2016-11-11|     F| 24| 60000|    Under 25|
+--------------------+----------------+------+---+------+------------+
only showing top 5 rows



In [16]:
### CLEANSING AND TRANSFORMING EVENTS TABLE
# Dropping duplicates
trusted_events_df = events_df.dropDuplicates()

# Extract values from the 'value' column
trusted_events_df = trusted_events_df.withColumn(
    "offer_id",
    when(col("event").isin(["offer received", "offer viewed", "offer completed"]),
         regexp_extract(col("value"), "'offer[ _]id': '([a-z0-9]+)'", 1))
).withColumn(
    "transaction_amount",
    when(col("event") == "transaction",
         regexp_extract(col("value"), "'amount': ([0-9.]+)", 1).cast(FloatType()))
).select(
    "customer_id",
    "offer_id",
    "event",
    "time",
    "transaction_amount"
)

# Show the final output
trusted_events_df.show(10)


+--------------------+--------------------+--------------+----+------------------+
|         customer_id|            offer_id|         event|time|transaction_amount|
+--------------------+--------------------+--------------+----+------------------+
|f8c45b7197e2414da...|ae264e3637204a6fb...|offer received|   0|              NULL|
|93c230e2ff31416c9...|9b98b8c7a33c4b65b...|offer received|   0|              NULL|
|554e8f98231e467e9...|3f207df678b143eea...|offer received|   0|              NULL|
|3d29f05a4e2140f78...|4d5c57ea9a6940dd8...|offer received|   0|              NULL|
|2e868c31f1bf4085a...|2298d6c36e964ae4a...|offer received|   0|              NULL|
|e5fec103d6ab47399...|f19421c1d4aa40978...|offer received|   0|              NULL|
|cce4f9124fcf46d1b...|ae264e3637204a6fb...|offer received|   0|              NULL|
|1bd0ff7244cb49cc9...|f19421c1d4aa40978...|offer received|   0|              NULL|
|df07a83f4b8343779...|9b98b8c7a33c4b65b...|offer received|   0|              NULL|
|e1a

In [17]:
### SAVE DATA AS PARQUET
trusted_offers_df.write.mode("overwrite").parquet(os.path.join(trusted_path, "trusted_offers.parquet"))
trusted_customers_df.write.mode("overwrite").parquet(os.path.join(trusted_path, "trusted_customers.parquet"))
trusted_events_df.write.mode("overwrite").parquet(os.path.join(trusted_path, "trusted_events.parquet"))

print("Cleaned and standarized data saved in Parquet format at:", trusted_path)

Cleaned and standarized data saved in Parquet format at: /content/caf-rewards-offer/trusted


### Enrichment layer creation alongside dim_time table creation in order to make events_fact from dim_events final table

In [18]:
### Retrieving data from trusted layer
enriched_path = "/content/caf-rewards-offer/enriched"
os.makedirs(enriched_path, exist_ok=True)

trusted_events_df = spark.read.parquet(os.path.join(trusted_path, "trusted_events.parquet"))

trusted_events_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- offer_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- transaction_amount: float (nullable = true)



In [19]:
### ENRICHED LAYER TRANSFORMATIONS

# Create dim_time (1-hour granularity)
dim_time_df = spark.range(0, 721, 1).select(
    col("id").alias("time_id"),
    (col("id") % 24).alias("hour"),
    (floor(col("id") / 24) + 1).alias("day"),
    concat_ws(" ", lit("Day"), (floor(col("id") / 24) + 1)).alias("period_day")
)


dim_time_df.show(10)

# Split into offer events and transactions
offer_events_df = trusted_events_df.filter(col("event").isin(["offer received", "offer viewed", "offer completed"]))
transaction_events_df = trusted_events_df.filter(col("event") == "transaction")

# Pivot offer events
offer_pivoted_df = offer_events_df.groupBy("customer_id", "offer_id").agg(
    max(when(col("event") == "offer received", col("time"))).alias("received_time_id"),
    max(when(col("event") == "offer viewed", col("time"))).alias("viewed_time_id"),
    max(when(col("event") == "offer completed", col("time"))).alias("completed_time_id")
)

# Join transactions where time matches completed_time_id
fact_events_with_linked_df = offer_pivoted_df.join(
    transaction_events_df.alias("t"),
    (offer_pivoted_df.customer_id == col("t.customer_id")) &
    (offer_pivoted_df.completed_time_id == col("t.time")),
    "left_outer"
).select(
    offer_pivoted_df.customer_id,
    offer_pivoted_df.offer_id,
    "received_time_id",
    "viewed_time_id",
    "completed_time_id",
    col("t.time").alias("transaction_time_id"),
    col("t.transaction_amount")
)

# Add standalone transactions (no match with completed_time_id)
standalone_transactions_df = transaction_events_df.join(
    offer_pivoted_df.alias("o"),
    (transaction_events_df.customer_id == col("o.customer_id")) &
    (transaction_events_df.time == col("o.completed_time_id")),
    "left_anti"  # Keep transactions not linked to any offer completion
).select(
    transaction_events_df.customer_id,
    lit(None).cast("string").alias("offer_id"),
    lit(None).cast("integer").alias("received_time_id"),
    lit(None).cast("integer").alias("viewed_time_id"),
    lit(None).cast("integer").alias("completed_time_id"),
    transaction_events_df.time.alias("transaction_time_id"),
    transaction_events_df.transaction_amount
)

# Union offer-linked and standalone transactions
fact_events_df = fact_events_with_linked_df.union(standalone_transactions_df)

fact_events_df.show(30)

trusted_events_df.filter(col("event") == "transaction").show(30)

dim_time_df.write.mode("overwrite").parquet(os.path.join(enriched_path, "dim_time.parquet"))
fact_events_df.write.mode("overwrite").parquet(os.path.join(enriched_path, "fact_events.parquet"))

+-------+----+---+----------+
|time_id|hour|day|period_day|
+-------+----+---+----------+
|      0|   0|  1|     Day 1|
|      1|   1|  1|     Day 1|
|      2|   2|  1|     Day 1|
|      3|   3|  1|     Day 1|
|      4|   4|  1|     Day 1|
|      5|   5|  1|     Day 1|
|      6|   6|  1|     Day 1|
|      7|   7|  1|     Day 1|
|      8|   8|  1|     Day 1|
|      9|   9|  1|     Day 1|
+-------+----+---+----------+
only showing top 10 rows

+--------------------+--------------------+----------------+--------------+-----------------+-------------------+------------------+
|         customer_id|            offer_id|received_time_id|viewed_time_id|completed_time_id|transaction_time_id|transaction_amount|
+--------------------+--------------------+----------------+--------------+-----------------+-------------------+------------------+
|0cfb5a7caa0e4bc89...|4d5c57ea9a6940dd8...|             576|           594|              642|                642|             21.01|
|2e0a05df6cda44328...|

In [20]:
###Retrieve final dimensions and facts
trusted_offers_df = spark.read.parquet(os.path.join(trusted_path, "trusted_offers.parquet"))
trusted_customers_df = spark.read.parquet(os.path.join(trusted_path, "trusted_customers.parquet"))
dim_time_df = spark.read.parquet(os.path.join(enriched_path, "dim_time.parquet"))
fact_events_df = spark.read.parquet(os.path.join(enriched_path, "fact_events.parquet"))

# Register as views for SQL
fact_events_df.createOrReplaceTempView("fact_events")
dim_time_df.createOrReplaceTempView("dim_time")
trusted_offers_df.createOrReplaceTempView("dim_offer")
trusted_customers_df.createOrReplaceTempView("dim_customer")

trusted_offers_df.printSchema()
trusted_customers_df.printSchema()
dim_time_df.printSchema()
fact_events_df.printSchema()

root
 |-- offer_id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- difficulty: integer (nullable = true)
 |-- reward: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- email: boolean (nullable = true)
 |-- mobile: boolean (nullable = true)
 |-- social: boolean (nullable = true)
 |-- web: boolean (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- became_member_on: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- income: integer (nullable = true)
 |-- age_group: string (nullable = true)

root
 |-- time_id: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- day: long (nullable = true)
 |-- period_day: string (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- offer_id: string (nullable = true)
 |-- received_time_id: integer (nullable = true)
 |-- viewed_time_id: integer (nullable = true)
 |-- completed_time_id: integer (nullable = true)
 |

In [21]:
# completion_summary: Aggregate completions by offer_id for channel analysis
completion_summary_df = fact_events_df.groupBy("offer_id").agg(
    count(when(col("received_time_id").isNotNull(), 1)).alias("offers_received"),
    count(when(col("completed_time_id").isNotNull(), 1)).alias("offers_completed")
).join(
    trusted_offers_df,
    "offer_id",
    "inner"
).select(
    "offer_id",
    "offers_received",
    "offers_completed",
    "offer_type",
    "email",
    "mobile",
    "social",
    "web"
)

# customer_completion_summary: Customer-level completion status with age
customer_completion_summary_df = fact_events_df.groupBy("customer_id").agg(
    max(when(col("received_time_id").isNotNull(), 1).otherwise(0)).alias("received_any"),
    max(when(col("completed_time_id").isNotNull(), 1).otherwise(0)).alias("completed_any")
).join(
    trusted_customers_df,
    "customer_id",
    "left"
).groupBy("age_group").agg(
    sum(when(col("received_any") == 1, 1)).alias("total_customers"),
    sum(when(col("completed_any") == 1, 1)).alias("completed_customers"),
    sum(when(col("completed_any") == 0, 1)).alias("not_completed_customers")
).filter(col("total_customers").isNotNull())

# time_to_complete: Time difference for completed offers
time_to_complete_df = fact_events_df.filter(
    col("received_time_id").isNotNull() & col("completed_time_id").isNotNull()
).select(
    "customer_id",
    "offer_id",
    (col("completed_time_id") - col("received_time_id")).alias("time_to_complete_hours")
)

completion_summary_df.write.mode("overwrite").parquet(os.path.join(enriched_path, "completion_summary.parquet"))
customer_completion_summary_df.write.mode("overwrite").parquet(os.path.join(enriched_path, "customer_completion_summary.parquet"))
time_to_complete_df.write.mode("overwrite").parquet(os.path.join(enriched_path, "time_to_complete.parquet"))

# Show samples
completion_summary_df.show(10, truncate=False)
customer_completion_summary_df.show(10, truncate=False)
time_to_complete_df.show(10, truncate=False)

+--------------------------------+---------------+----------------+-------------+-----+------+------+-----+
|offer_id                        |offers_received|offers_completed|offer_type   |email|mobile|social|web  |
+--------------------------------+---------------+----------------+-------------+-----+------+------+-----+
|0b1e1539f2cc45b7b9fa7c272da2e1d7|6374           |2978            |discount     |true |false |false |true |
|4d5c57ea9a6940dd891ad53e9dbe8da0|6330           |2885            |bogo         |true |true  |true  |true |
|9b98b8c7a33c4b65b9aebfe6a799e6d9|6355           |3784            |bogo         |true |true  |false |true |
|f19421c1d4aa40978ebb69ca19b0e20d|6262           |3741            |bogo         |true |true  |true  |true |
|fafdcd668e3743c1bb461111dcafc2a4|6332           |4530            |discount     |true |true  |true  |true |
|ae264e3637204a6fb9bb56bc8210ddfd|6374           |3177            |bogo         |true |true  |true  |false|
|5a8bc65990b245e5a138643cd4e

### Answering analytical questions

In [22]:
# Register as views for SQL
completion_summary_df.createOrReplaceTempView("completion_summary")
customer_completion_summary_df.createOrReplaceTempView("customer_completion_summary")
time_to_complete_df.createOrReplaceTempView("time_to_complete")

In [23]:
### Which marketing channel is the most effective in terms of offer completion rate?
channel_completion_query = """
WITH channel_counts AS (
    SELECT
        'email' AS channel,
        SUM(offers_received) AS total_received,
        SUM(offers_completed) AS total_completed
    FROM completion_summary
    WHERE email = TRUE
    UNION ALL
    SELECT
        'mobile' AS channel,
        SUM(offers_received),
        SUM(offers_completed)
    FROM completion_summary
    WHERE mobile = TRUE
    UNION ALL
    SELECT
        'social' AS channel,
        SUM(offers_received),
        SUM(offers_completed)
    FROM completion_summary
    WHERE social = TRUE
    UNION ALL
    SELECT
        'web' AS channel,
        SUM(offers_received),
        SUM(offers_completed)
    FROM completion_summary
    WHERE web = TRUE
)
SELECT
    channel,
    total_received,
    total_completed,
    CONCAT(ROUND(total_completed / total_received * 100, 2), '%') AS completion_rate
FROM channel_counts
ORDER BY completion_rate DESC
"""
spark.sql(channel_completion_query).show()

+-------+--------------+---------------+---------------+
|channel|total_received|total_completed|completion_rate|
+-------+--------------+---------------+---------------+
|    web|         50594|          25819|         51.03%|
| social|         37943|          18754|         49.43%|
|  email|         63288|          28996|         45.82%|
| mobile|         56914|          26018|         45.71%|
+-------+--------------+---------------+---------------+



In [24]:
### How is the age distribution of customers who completed offers compared to those who did not?
age_distribution_query = """
SELECT
    age_group,
    total_customers,
    completed_customers,
    not_completed_customers,
    CONCAT(ROUND(completed_customers / total_customers * 100, 2), '%') AS completion_rate
FROM customer_completion_summary
ORDER BY CASE
        WHEN age_group = 'Under 25' THEN 1
        WHEN age_group = '25-40' THEN 2
        WHEN age_group = '41-60' THEN 3
        WHEN age_group = '61 and above' THEN 4
        ELSE 5
    END
"""
spark.sql(age_distribution_query).show()

+------------+---------------+-------------------+-----------------------+---------------+
|   age_group|total_customers|completed_customers|not_completed_customers|completion_rate|
+------------+---------------+-------------------+-----------------------+---------------+
|    Under 25|            876|                609|                    267|         69.52%|
|       25-40|           2426|               1823|                    603|         75.14%|
|       41-60|           5979|               4920|                   1061|         82.29%|
|61 and above|           7713|               5422|                   2295|          70.3%|
+------------+---------------+-------------------+-----------------------+---------------+



In [25]:
### What is the average time taken by customers to complete an offer after receiving it?
avg_time_query = """
SELECT
    ROUND(AVG(time_to_complete_hours), 2) AS avg_completion_hours
FROM time_to_complete
"""
spark.sql(avg_time_query).show()

+--------------------+
|avg_completion_hours|
+--------------------+
|               52.59|
+--------------------+

