In [31]:
# Café Rewards Data Pipeline and Analysis

## 1. Setup and Initialization

# Import necessary libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Cafe Rewards Analysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
plt.style.use('ggplot')

## 2. Raw Layer - Data Ingestion

### 2.1 Load Datasets

# Define paths to data files
data_dir = "data/raw"
offers_path = os.path.join(data_dir, "offers.csv")
customers_path = os.path.join(data_dir, "customers.csv")
events_path = os.path.join(data_dir, "events.csv")

# Load datasets
offers_df = spark.read.option("header", "true").option("inferSchema", "true").csv(offers_path)
customers_df = spark.read.option("header", "true").option("inferSchema", "true").csv(customers_path)
events_df = spark.read.option("header", "true").option("inferSchema", "true").csv(events_path)


### 2.2 Explore Raw Data

# Display schema and sample data
print("Offers Schema:")
offers_df.printSchema()
print("\nOffers Sample Data:")
offers_df.show(5)

print("\nCustomers Schema:")
customers_df.printSchema()
print("\nCustomers Sample Data:")
customers_df.show(5)

print("\nEvents Schema:")
events_df.printSchema()
print("\nEvents Sample Data:")
events_df.show(5)

# Basic statistics
print("\nBasic Statistics:")
print(f"Number of offers: {offers_df.count()}")
print(f"Number of customers: {customers_df.count()}")
print(f"Number of events: {events_df.count()}")

## 3. Trusted Layer - Data Cleaning and Transformation

### 3.1 Clean Offers Data


# Clean offers data
def clean_offers(df):
    # Drop duplicates
    cleaned_df = df.dropDuplicates(["offer_id"])
    
    return cleaned_df

trusted_offers = clean_offers(offers_df)
print("\nCleaned Offers Data:")
trusted_offers.show(5)


### 3.2 Clean Customers Data


# Clean customers data
def clean_customers(df):
    # Drop duplicates
    cleaned_df = df.dropDuplicates(["customer_id"])
    
    # Handle missing values
    cleaned_df = cleaned_df.na.drop()
    
    # Calculate age from birth year (assuming current year is 2025)
    if "birthYear" in df.columns:
        current_year = 2025
        cleaned_df = cleaned_df.withColumn("age", F.lit(current_year) - F.col("birthYear"))
    
    # Create age groups
    cleaned_df = cleaned_df.withColumn(
        "age_group",
        F.when(F.col("age") < 20, "Under 20")
         .when(F.col("age").between(20, 29), "20-29")
         .when(F.col("age").between(30, 39), "30-39")
         .when(F.col("age").between(40, 49), "40-49")
         .when(F.col("age").between(50, 59), "50-59")
         .otherwise("60+")
    )
    
    return cleaned_df

trusted_customers = clean_customers(customers_df)
print("\nCleaned Customers Data:")
trusted_customers.show(5)


### 3.3 Clean Events Data


# Clean events data
def clean_events(df):
    
    cleaned_df = df
    
    if "timestamp" in df.columns:
        cleaned_df = cleaned_df.withColumn(
            "timestamp", 
            F.to_timestamp(F.col("timestamp"))
        )
    
    return cleaned_df

trusted_events = clean_events(events_df)
print("\nCleaned Events Data:")
trusted_events.show(5)


### 3.4 Save Trusted Data


# Save trusted data 
#trusted_dir = "data/trusted"
#trusted_dir = "C:\\Users\\gabri\\Jupyter\\data\\trusted"
#os.makedirs(trusted_dir, exist_ok=True)

#trusted_offers.write.mode("overwrite").parquet(os.path.join(trusted_dir, "offers"))
#trusted_customers.write.mode("overwrite").parquet(os.path.join(trusted_dir, "customers"))
#trusted_events.write.mode("overwrite").parquet(os.path.join(trusted_dir, "events"))


## 4. Refined Layer - Data Enrichment and Aggregation

### 4.1 Create Offer Journey Dataset


# Create a dataset that captures the full journey of an offer
def create_offer_journey(events_df, offers_df, customers_df):
    # Filter events by type

    event_col = "event" if "event" in events_df.columns else None
    if not event_col:
        # Try to find a column that might contain event types
        event_candidates = [col for col in events_df.columns if "event" in col.lower() or "type" in col.lower()]
        event_col = event_candidates[0] if event_candidates else None
    
    if not event_col:
        raise ValueError("Could not find event type column in events dataframe")

    cust_col = next((col for col in events_df.columns if "customer_id" in col.lower()), None)
    if not cust_col:
        raise ValueError("Could not find customer ID column in events dataframe")

    offer_col = next((col for col in events_df.columns if "value" in col.lower()), None)
    if not offer_col:
        raise ValueError("Could not find offer ID column in events dataframe")

    time_col = next((col for col in events_df.columns if "time" in col.lower()), None)
    if not time_col:
        raise ValueError("Could not find timestamp column in events dataframe")

    
    received_offers = events_df.filter(F.lower(F.col(event_col)).like("%receive%"))
    viewed_offers = events_df.filter(F.lower(F.col(event_col)).like("%view%"))
    completed_offers = events_df.filter(F.lower(F.col(event_col)).like("%complete%"))
    
    # Join the different event types to create a journey
    journey = received_offers.select(
        F.col(cust_col),
        F.col(offer_col),
        F.col(time_col).alias("received_time")
    )
    
    # Left join with viewed events
    journey = journey.join(
        viewed_offers.select(
            F.col(cust_col),
            F.col(offer_col),
            F.col(time_col).alias("viewed_time")
        ),
        [cust_col, offer_col],
        "left"
    )
    
    # Left join with completed events
    journey = journey.join(
        completed_offers.select(
            F.col(cust_col),
            F.col(offer_col),
            F.col(time_col).alias("completed_time")
        ),
        [cust_col, offer_col],
        "left"
    )
    
    # Calculate time differences
    journey = journey.withColumn(
        "time_to_view_hours",
        F.when(F.col("viewed_time").isNotNull(),
               (F.unix_timestamp("viewed_time") - F.unix_timestamp("received_time")) / 3600)
        .otherwise(None)
    )
    
    journey = journey.withColumn(
        "time_to_complete_hours",
        F.when(F.col("completed_time").isNotNull(),
               (F.unix_timestamp("completed_time") - F.unix_timestamp("received_time")) / 3600)
        .otherwise(None)
    )
    
    # Add completion flag
    journey = journey.withColumn(
        "is_completed",
        F.when(F.col("completed_time").isNotNull(), 1).otherwise(0)
    )
    
    # Join with offers to get offer details
    journey = journey.join(
        offers_df,
        offer_col,
        "inner"
    )
    
    # Join with customers to get customer details
    journey = journey.join(
        customers_df,
        cust_col,
        "inner"
    )
    
    return journey

# Create the offer journey dataset
offer_journey = create_offer_journey(trusted_events, trusted_offers, trusted_customers)
print("\nOffer Journey Dataset:")
offer_journey.show(5)


### 4.2 Create Marketing Channel Metrics


# Analyze marketing channel effectiveness
def analyze_channel_effectiveness(journey_df):
    channel_metrics = journey_df.groupBy("channel").agg(
        F.count("offer_id").alias("total_offers"),
        F.sum("is_completed").alias("completed_offers"),
        (F.sum("is_completed") / F.count("offer_id") * 100).alias("completion_rate")
    ).orderBy(F.desc("completion_rate"))
    
    return channel_metrics

channel_metrics = analyze_channel_effectiveness(offer_journey)
print("\nMarketing Channel Effectiveness:")
channel_metrics.show()


### 4.3 Create Age Distribution Metrics


# Analyze age distribution of offer completion
def analyze_age_distribution(journey_df):
    age_metrics = journey_df.groupBy("age_group").agg(
        F.count("offer_id").alias("total_offers"),
        F.sum("is_completed").alias("completed_offers"),
        (F.sum("is_completed") / F.count("offer_id") * 100).alias("completion_rate"),
        F.count(F.when(F.col("is_completed") == 0, 1)).alias("not_completed_offers")
    ).orderBy("age_group")
    
    return age_metrics

age_metrics = analyze_age_distribution(offer_journey)
print("\nAge Distribution of Offer Completion:")
age_metrics.show()

# Visualize the age distribution
age_pandas = age_metrics.toPandas()

plt.figure(figsize=(12, 6))
x = age_pandas['age_group']
y1 = age_pandas['completed_offers']
y2 = age_pandas['not_completed_offers']

plt.bar(x, y1, color='green', label='Completed Offers')
plt.bar(x, y2, bottom=y1, color='red', label='Not Completed Offers')

plt.title('Offer Completion by Age Group')
plt.xlabel('Age Group')
plt.ylabel('Number of Offers')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Plot completion rate by age group
plt.figure(figsize=(10, 5))
plt.bar(age_pandas['age_group'], age_pandas['completion_rate'], color='blue')
plt.title('Offer Completion Rate by Age Group')
plt.xlabel('Age Group')
plt.ylabel('Completion Rate (%)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


### 4.4 Analyze Time to Completion

# Analyze the average time taken to complete offers
def analyze_completion_time(journey_df):
    # Filter for completed offers only
    completed_df = journey_df.filter(F.col("is_completed") == 1)
    
    # Calculate average time to completion
    avg_time = completed_df.agg(
        F.avg("time_to_complete_hours").alias("avg_hours"),
        F.stddev("time_to_complete_hours").alias("stddev_hours"),
        F.min("time_to_complete_hours").alias("min_hours"),
        F.max("time_to_complete_hours").alias("max_hours")
    )
    
    # Get distribution of completion times
    time_bins = completed_df.withColumn(
        "hour_bin", 
        F.floor(F.col("time_to_complete_hours") / 6) * 6
    ).groupBy("hour_bin").count().orderBy("hour_bin")
    
    return avg_time, time_bins

avg_time, time_bins = analyze_completion_time(offer_journey)
print("\nAverage Time to Completion:")
avg_time.show()

# Visualize time to completion
time_bins_pd = time_bins.toPandas()
plt.figure(figsize=(12, 6))
plt.bar(time_bins_pd['hour_bin'], time_bins_pd['count'], width=5)
plt.axvline(x=avg_time.collect()[0]['avg_hours'], color='red', linestyle='--', 
            label=f"Average: {avg_time.collect()[0]['avg_hours']:.2f} hours")
plt.title('Distribution of Time to Offer Completion')
plt.xlabel('Hours to Complete (binned by 6-hour intervals)')
plt.ylabel('Count of Offers')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


### 4.5 Save Refined Data

# Save refined datasets for future reference
refined_dir = "data/refined"
os.makedirs(refined_dir, exist_ok=True)

channel_metrics.write.mode("overwrite").parquet(os.path.join(refined_dir, "channel_metrics"))
age_metrics.write.mode("overwrite").parquet(os.path.join(refined_dir, "age_metrics"))
offer_journey.write.mode("overwrite").parquet(os.path.join(refined_dir, "offer_journey"))


## 5. Answer Analytical Questions

### 5.1 Most Effective Marketing Channel

# channel_metrics = spark.read.parquet(os.path.join(refined_dir, "channel_metrics"))

print("\nQuestion 1: Which marketing channel is the most effective in terms of offer completion rate?")
most_effective = channel_metrics.orderBy(F.desc("completion_rate")).first()
print(f"The most effective marketing channel is '{most_effective['channel']}' with a completion rate of {most_effective['completion_rate']:.2f}%")

# Visualize all channels
channel_pd = channel_metrics.toPandas()
plt.figure(figsize=(10, 6))
plt.bar(channel_pd['channel'], channel_pd['completion_rate'], color='blue')
plt.title('Offer Completion Rate by Marketing Channel')
plt.xlabel('Channel')
plt.ylabel('Completion Rate (%)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


### 5.2 Age Distribution Comparison

# age_metrics = spark.read.parquet(os.path.join(refined_dir, "age_metrics"))

print("\nQuestion 2: How is the age distribution of customers who completed offers compared to those who did not?")
print("Age distribution metrics:")
age_metrics.show()

### 5.3 Average Time to Completion


print("\nQuestion 3: What is the average time taken by customers to complete an offer after receiving it?")
avg_hours = avg_time.collect()[0]['avg_hours']
print(f"The average time to complete an offer after receiving it is {avg_hours:.2f} hours (approximately {avg_hours/24:.2f} days)")



# Clean up the Spark session
spark.stop()


Offers Schema:
root
 |-- offer_id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- difficulty: integer (nullable = true)
 |-- reward: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- channels: string (nullable = true)


Offers Sample Data:
+--------------------+-------------+----------+------+--------+--------------------+
|            offer_id|   offer_type|difficulty|reward|duration|            channels|
+--------------------+-------------+----------+------+--------+--------------------+
|ae264e3637204a6fb...|         bogo|        10|    10|       7|['email', 'mobile...|
|4d5c57ea9a6940dd8...|         bogo|        10|    10|       5|['web', 'email', ...|
|3f207df678b143eea...|informational|         0|     0|       4|['web', 'email', ...|
|9b98b8c7a33c4b65b...|         bogo|         5|     5|       7|['web', 'email', ...|
|0b1e1539f2cc45b7b...|     discount|        20|     5|      10|    ['web', 'email']|
+--------------------+-----------

AnalysisException: [UNRESOLVED_USING_COLUMN_FOR_JOIN] USING column `customer_id;event;value;time` cannot be resolved on the right side of the join. The right-side columns: [`channels`, `difficulty`, `duration`, `offer_id`, `offer_type`, `reward`].