In [0]:
# Make sure Prophet is importable before the main import cell runs.
try:
    import prophet
    print("Prophet already installed.")
except ImportError:
    # Prophet is missing from this environment: install it with the %pip magic.
    # NOTE(review): the install is unpinned, so the resolved version can differ
    # between runs — consider pinning once a known-good version is chosen.
    print("Prophet not found. Installing...")
    %pip install prophet

In [0]:
# Imports

import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, sum as spark_sum, countDistinct, count
from prophet import Prophet
from prophet.plot import plot_plotly
from prophet.plot import plot_components_plotly
import plotly.express as px

## Dataset Exploration

In [0]:
# Loading Datasets
#
# Every source table is read from the Spark catalog in one pass. Each Python
# name below is bound to the catalog table at the same position in the name
# tuple (note: the variable `casual_data` reads the table "causal_data").
(
    campaign_desc,     # Campaign Related
    campaign_table,    # Campaign Related
    casual_data,       # Causal Data
    coupon_info,       # Coupon Related
    coupon_redempt,    # Coupon Related
    hh_demographic,    # Demographic Related
    product,           # Product Related
    transaction_data,  # Transaction Related
) = (
    spark.table(table_name)
    for table_name in (
        "campaign_desc",
        "campaign_table",
        "causal_data",
        "coupon",
        "coupon_redempt",
        "hh_demographic",
        "product",
        "transaction_data",
    )
)

In [0]:
# Function to get a basic idea of the table
def get_info(df):
    """Print a quick profile of a Spark DataFrame.

    Prints the row count, the column count, the column names, the schema, the
    first five rows and the per-column null/blank counts returned by
    ``empty_counts``.

    Args:
        df: Spark DataFrame to profile.
    """
    print("Number of rows: ", df.count())
    print("Number of columns: ", len(df.columns))
    print("Columns: ", df.columns)
    # printSchema() writes the schema to stdout itself and returns None, so it
    # must not be passed to print() (that printed a stray "Schema:  None").
    print("Schema: ")
    df.printSchema()
    df.show(5)
    print("Null + Blank Counts: \n", empty_counts(df))

# Function to get null counts
def empty_counts(df):
    """Return the per-column count of null or blank values as a text table.

    A value counts as empty when it is null or when its trimmed form equals
    the empty string.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        str: a one-row table (one column per input column) rendered by
        pandas ``to_string`` without the index.
    """
    def _is_empty(name):
        # Null, or nothing left after trimming.
        return F.col(name).isNull() | (F.trim(F.col(name)) == "")

    per_column = [
        F.count(F.when(_is_empty(name), name)).alias(name)
        for name in df.columns
    ]
    summary = df.select(per_column).toPandas()
    return summary.to_string(index=False)


In [0]:
# Profile campaign_desc (original note: campaign type and dates).

get_info(campaign_desc)

In [0]:
# Profile campaign_table (original note: household key, campaign, and description).

get_info(campaign_table)

In [0]:
# Profile the causal data table, held in the variable `casual_data`
# (original note: product, store, week, display, mailer).

get_info(casual_data)

In [0]:
# Profile the coupon table (original note: coupon code, product, campaign).

get_info(coupon_info)

In [0]:
# Profile coupon_redempt (original note: household, redemption day, coupon code, campaign).

get_info(coupon_redempt)

In [0]:
# Profile hh_demographic (original note: age, marital status, income, home owner).

get_info(hh_demographic)

In [0]:
# Profile product (original note: product, manufacturer, department, brand,
# commodity, sub-commodity, size).

get_info(product)

In [0]:
# Profile transaction_data (original note: sales value, day, product, quantity,
# store, retail discount, transaction time, week number, coupon discount,
# coupon match discount).

get_info(transaction_data)

In [0]:
# Insights so far...

'''
    - We have transaction data at a household level along with campaign data and demographic data.
    
    - The campaign data is split into two tables, one with campaign description and the other with household key and campaign.
        - We will join these two tables to get campaign information for each household.

    - There are no null or blank values in any of the tables.
    
    - We define the scope of the project as follows
        - We will analyze the sales and discounts to look for anomalies
        - We will create useful visualization to understand the data better.
'''

## TDQ/BDQ

In this section we will apply table specific checks to campaign, coupon and transaction data and come up with ways to handle data. Please note that in real-life scenarios all the tables have to go through these checks but for the scope of this project we are limiting to these 4 tables.

### Campaign Description Table

In [0]:
# Preview the first five rows of campaign_desc.
campaign_desc.show(5)

In [0]:
# Compute all four range statistics in ONE Spark aggregation. The previous
# version called toPandas() four times, collecting the whole table to the
# driver once per print statement.
campaign_desc_range = campaign_desc.agg(
    F.min("CAMPAIGN").alias("min_campaign"),
    F.max("CAMPAIGN").alias("max_campaign"),
    F.min("START_DAY").alias("first_start_day"),
    F.max("END_DAY").alias("last_end_day"),
).first()

print("MIN CAMPAIGN VALUE:", campaign_desc_range["min_campaign"])
print("Max CAMPAIGN VALUE:", campaign_desc_range["max_campaign"])

print("DAY when first campaign was started:", campaign_desc_range["first_start_day"])
print("DAY when the last campaign ended:", campaign_desc_range["last_end_day"])

In [0]:
print("CAMPAIGN_DESC - TDQ + BDQ CHECKS")
print("="*60)

# 1. Uniqueness check on CAMPAIGN key: the key is unique when the number of
#    distinct CAMPAIGN values equals the number of rows.
total = campaign_desc.count()
unique = campaign_desc.select(F.countDistinct("CAMPAIGN")).first()[0]
is_unique = (total == unique)
status = "PASSED" if is_unique else "FAILED - DUPLICATES FOUND!"

print(f"CAMPAIGN key uniqueness check => {status}")
print(f"   Total rows     : {total:,}")
print(f"   Unique values  : {unique:,}")
if not is_unique:
    print("   Duplicates present!")

# 2. BDQ: START_DAY <= END_DAY check
#    Filter + count instead of summing a temporary 0/1 flag column: count() is
#    always an int, whereas sum() over an empty table is None and would make
#    the `> 0` comparison below raise. It also leaves campaign_desc untouched
#    (no flag column to add and drop again).
invalid_rows = campaign_desc.filter(
    F.col("START_DAY").cast("int") > F.col("END_DAY").cast("int")
)
invalid_count = invalid_rows.count()

print("\nSTART_DAY vs END_DAY check")
print(f"   Cases where START_DAY > END_DAY : {invalid_count}")
if invalid_count > 0:
    print("   START_DAY > END_DAY for some rows! => BDQ FAILURE")
    display(invalid_rows.select("CAMPAIGN", "DESCRIPTION", "START_DAY", "END_DAY"))
else:
    print("   START_DAY <= END_DAY for all rows! => PASSED")

# Final verdict
if is_unique and invalid_count == 0:
    print("\nFINAL VERDICT: ALL CHECKS PASSED - CAMPAIGN_DESC IS CLEAN")
else:
    print("\nFINAL VERDICT: ISSUES FOUND - NEEDS ATTENTION")

In [0]:
# Hence unique on CAMPAIGN key level

### Campaign Table

In [0]:
# Preview the first five rows of campaign_table.
campaign_table.show(5)

In [0]:
# One Spark aggregation for both statistics instead of collecting the whole
# table to pandas twice.
campaign_table_range = campaign_table.agg(
    F.min("CAMPAIGN").alias("min_campaign"),
    F.max("CAMPAIGN").alias("max_campaign"),
).first()

print("MIN CAMPAIGN VALUE:", campaign_table_range["min_campaign"])
print("Max CAMPAIGN VALUE:", campaign_table_range["max_campaign"])

In [0]:
df = campaign_table
total_rows = df.count()

# Distinct-combination counts for the two candidate keys, computed together:
#   1. (DESCRIPTION + household_key + CAMPAIGN)
#   2. (household_key + CAMPAIGN)
composite_unique, household_campaign_unique = df.select(
    F.countDistinct("DESCRIPTION", "household_key", "CAMPAIGN"),
    F.countDistinct("household_key", "CAMPAIGN"),
).first()

# A key is unique when its distinct count equals the row count.
composite_verdict = (
    "PASSED" if composite_unique == total_rows else "FAILED - DUPLICATES EXIST!"
)
household_verdict = (
    "PASSED"
    if household_campaign_unique == total_rows
    else "FAILED - SAME HOUSEHOLD GOT SAME CAMPAIGN MULTIPLE TIMES!"
)

# Report
print("UNIQUE KEY ANALYSIS")
print("="*50)
print(f"Total rows: {total_rows:,}")

print(f"\n1. Unique on (DESCRIPTION + household_key + CAMPAIGN) => {composite_unique:,}")
print(f"   => {composite_verdict}")

print(f"\n2. Unique on (household_key + CAMPAIGN) => {household_campaign_unique:,}")
print(f"   => {household_verdict}")

# When (household_key + CAMPAIGN) repeats, show how many pairs are affected
if household_campaign_unique < total_rows:
    duplicates = (
        df.groupBy("household_key", "CAMPAIGN")
        .count()
        .where(F.col("count") > 1)
    )
    affected_households = duplicates.count()
    print(f"\n   Affected (household + campaign) pairs : {affected_households}")
    print("   Sample duplicates:")
    display(duplicates.sort(F.col("count").desc()).limit(10))

### Coupon Redemption Table

In [0]:
# Preview the first five rows of coupon_redempt.
coupon_redempt.show(5)

In [0]:
# One Spark aggregation for all three statistics instead of collecting the
# whole table to pandas three times.
coupon_redempt_range = coupon_redempt.agg(
    F.min("CAMPAIGN").alias("min_campaign"),
    F.max("CAMPAIGN").alias("max_campaign"),
    F.min("DAY").alias("first_redemption_day"),
).first()

print("MIN CAMPAIGN VALUE:", coupon_redempt_range["min_campaign"])
print("Max CAMPAIGN VALUE:", coupon_redempt_range["max_campaign"])

# Earliest DAY present in the redemption table
print(coupon_redempt_range["first_redemption_day"])

In [0]:
# Checking for duplicated rows: the table is unique on the candidate key when
# de-duplicating on it does not reduce the row count.

redemption_key = ["household_key", "DAY", "CAMPAIGN", "COUPON_UPC"]

total_rows = coupon_redempt.count()
distinct_rows = coupon_redempt.select(*redemption_key).dropDuplicates().count()

print("Total rows:    ", total_rows)
print("Distinct rows: ", distinct_rows)

if total_rows != distinct_rows:
    print("\n Data is NOT unique at household_key x DAY x CAMPAIGN level x COUPON_UPC")
else:
    print("\n Data IS at household_key x DAY x CAMPAIGN level x COUPON_UPC (no duplicates).")

### Transaction Data

In [0]:
# Preview the first five rows of transaction_data.
transaction_data.show(5)

In [0]:
# Spot check on single-unit rows of one product, to see how SALES_VALUE relates
# to quantity and discounts (original note: SALES_VALUE is the value after
# multiplying by quantity and subtracting the discount).

(
    transaction_data
    .filter((F.col("PRODUCT_ID") == 1004906) & (F.col("QUANTITY") == 1))
    .show(5)
)

In [0]:
# Aggregate in Spark: the previous version collected the entire transaction
# table to the driver with toPandas() twice just to read two scalars.
transaction_day_range = transaction_data.agg(
    F.min("DAY").alias("min_day"),
    F.max("DAY").alias("max_day"),
).first()

print("Min Day:", transaction_day_range["min_day"])
print("Max Day:", transaction_day_range["max_day"])

In [0]:
"""
    Sanity Checks:
        - SALES_VALUE and QUANTITY should always be positive
        - DISCOUNT should always be negative or 0
        - The transaction data seems to be on a household_key x day x basket_id x product_id level. This has to be confirmed.
"""

In [0]:
# High Level Check
# A row is unexpected when a sales value or quantity is negative, or when any
# of the three discount columns is positive.

condition = F.expr(
    "SALES_VALUE < 0"
    " OR QUANTITY < 0"
    " OR RETAIL_DISC > 0"
    " OR COUPON_DISC > 0"
    " OR COUPON_MATCH_DISC > 0"
)

bad_count = transaction_data.where(condition).count()

print(
    "ERROR: SALES_VALUE, QUANTITY, DISCOUNTS are not as expected"
    if bad_count > 0
    else "SALES_VALUE, QUANTITY, DISCOUNTS are as expected"
)


In [0]:
# Detailed Check

# Flag column name -> rule that marks a row as suspicious.
flag_rules = {
    "FLAG_SALES_VALUE": F.col("SALES_VALUE") < 0,
    "FLAG_QUANTITY": F.col("QUANTITY") <= 0,
    "FLAG_RETAIL_DISC": F.col("RETAIL_DISC") > 0,
    "FLAG_COUPON_DISC": F.col("COUPON_DISC") > 0,
    "FLAG_MATCH_DISC": F.col("COUPON_MATCH_DISC") > 0,
}

# Attach one boolean column per rule, in the order declared above.
transaction_flagged = transaction_data
for flag_name, rule in flag_rules.items():
    transaction_flagged = transaction_flagged.withColumn(flag_name, rule)

# Number of flagged rows per rule (booleans summed as 0/1).
violation_counts = [
    F.sum(F.col(flag_name).cast("int")).alias(label)
    for flag_name, label in [
        ("FLAG_SALES_VALUE", "bad_sales_value"),
        ("FLAG_QUANTITY", "bad_quantity"),
        ("FLAG_RETAIL_DISC", "bad_retail_disc"),
        ("FLAG_COUPON_DISC", "bad_coupon_disc"),
        ("FLAG_MATCH_DISC", "bad_match_disc"),
    ]
]
transaction_flagged.select(*violation_counts).show()


In [0]:
# Sample of rows flagged for a positive RETAIL_DISC

transaction_flagged.where("FLAG_RETAIL_DISC").show(5, truncate=False)


In [0]:
# Sample of rows flagged for a non-positive QUANTITY

transaction_flagged.where("FLAG_QUANTITY").show(5, truncate=False)


In [0]:
# Cleaning transaction data to remove transactions with anomalies.
# Keep only rows with a strictly positive QUANTITY and a non-positive
# RETAIL_DISC. `> 0` (rather than `!= 0`) matches the FLAG_QUANTITY rule
# (`QUANTITY <= 0`) and the stated sanity rule that QUANTITY is always
# positive; `!= 0` would have let negative quantities through.

transaction_data = transaction_data.filter(
    (F.col("QUANTITY") > 0) &
    (F.col("RETAIL_DISC") <= 0)
)


In [0]:
# Running the high level check again on the cleaned transaction data
# (same rules: negative sales value or quantity, or any positive discount).

condition = F.expr(
    "SALES_VALUE < 0"
    " OR QUANTITY < 0"
    " OR RETAIL_DISC > 0"
    " OR COUPON_DISC > 0"
    " OR COUPON_MATCH_DISC > 0"
)

bad_count = transaction_data.where(condition).count()

print(
    "ERROR: SALES_VALUE, QUANTITY, DISCOUNTS are not as expected"
    if bad_count > 0
    else "SALES_VALUE, QUANTITY, DISCOUNTS are as expected"
)


In [0]:
# Checking the uniqueness level of the data: the expected grain holds when
# de-duplicating on it does not reduce the row count.

expected_grain = ["HOUSEHOLD_KEY", "BASKET_ID", "DAY", "PRODUCT_ID"]

# 1. Total rows
total_rows = transaction_data.count()

# 2. Distinct rows at the expected granularity
distinct_rows = transaction_data.select(*expected_grain).dropDuplicates().count()

print("Total rows:    ", total_rows)
print("Distinct grain:", distinct_rows)

if total_rows != distinct_rows:
    print("\n Data is NOT at HOUSEHOLD_KEY x DAY x BASKET_ID x PRODUCT_ID level.")
    print("   There are duplicate rows for that combination.")
else:
    print("\n Data is at HOUSEHOLD_KEY x DAY x BASKET_ID x PRODUCT_ID level (no duplicates).")


In [0]:
# Insights so far...

"""
    - Both campaign_desc and campaign_table are ready to use.
    - The first campaign was launched on DAY 224 and the first redemption was seen on DAY 225 which checks out
    - We have 2 years worth of transaction data from Day-1 to Day-711.
    - Anomalies exist in the transaction table with respect to RETAIL_DISC and QUANTITY.
        - The rows with the anomalies will be marked and reported to the relevant teams 
        - The rows are removed for further analysis.
    - Data is at HOUSEHOLD_KEY x DAY x BASKET_ID x PRODUCT_ID level (no duplicates).
"""

## Feature Engineering

In [0]:
# Aim of this exercise...

"""
    - We need a combined campaign table that maps each household key to its campaigns, as well as the start and end day of each campaign
        - This will help us better understand the customer behaviour during campaigns
    - We will create a daily transaction table for each customer to understand the customer behaviour during campaigns
    - Come up with a master table that can be used to understand the impact of campaigns on customer behaviour, we will have flags for campaign availability and redemption by the customers to track the impact of campaigns, awareness and redemption.
    - Create a daily level table summarising business level metrics such as sales, quantity, discounts etc.
"""

In [0]:
# Attach each campaign's start/end day to the household-campaign mapping.
# Left join: every campaign_table row is kept even when campaign_desc has no
# matching CAMPAIGN.
campaign_windows = campaign_desc.select("CAMPAIGN", "START_DAY", "END_DAY")

combined_campaign = campaign_table.join(campaign_windows, on="CAMPAIGN", how="left")

combined_campaign.show(20, truncate=False)

In [0]:
# Row counts before and after the left join with campaign_desc; equal counts
# mean the join did not duplicate any campaign_table row.
print(campaign_table.count())
print(combined_campaign.count())

In [0]:
# Transaction aggregation table: one row per household_key x DAY holding the
# summed quantity, sales value, retail discount and coupon discount.

daily_totals = [
    F.sum(metric).alias(metric)
    for metric in ("QUANTITY", "SALES_VALUE", "RETAIL_DISC", "COUPON_DISC")
]

transaction_daily = transaction_data.groupBy("household_key", "DAY").agg(*daily_totals)

transaction_daily.show(10, truncate=False)


In [0]:
# Number of household_key x DAY rows in the daily aggregate.
transaction_daily.count()

In [0]:
# Start from the household_key x DAY aggregate built in the previous cell, NOT
# from the raw line-item table. Joining raw transaction_data and then
# collapsing with F.first() kept the values of ONE arbitrary line item per
# household/day instead of the daily totals.
t = transaction_daily.alias("t")
c = combined_campaign.alias("c")

# Join only on matching household AND day in campaign window. A household/day
# can match several overlapping campaigns, so this may duplicate daily rows.
joined = (
    t.join(
        c,
        (F.col("t.household_key") == F.col("c.household_key")) &
        (F.col("t.DAY") >= F.col("c.START_DAY")) &
        (F.col("t.DAY") <= F.col("c.END_DAY")),
        how="left"
    )
    # keep only the transaction fields + CAMPAIGN (drop duplicate household_key from c)
    .select(
        F.col("t.household_key").alias("household_key"),
        F.col("t.DAY").alias("DAY"),
        F.col("t.QUANTITY").alias("QUANTITY"),
        F.col("t.SALES_VALUE").alias("SALES_VALUE"),
        F.col("t.RETAIL_DISC").alias("RETAIL_DISC"),
        F.col("t.COUPON_DISC").alias("COUPON_DISC"),
        F.col("c.CAMPAIGN").alias("CAMPAIGN")
    )
)

# Collapse back to one row per household_key x DAY and create campaign_flag.
# Every duplicate produced by the join carries the same daily totals, so
# F.first() is safe here; campaign_flag is 1 when at least one campaign window
# covered that household/day.
transaction_daily = (
    joined
    .groupBy("household_key", "DAY")
    .agg(
        F.first("QUANTITY").alias("QUANTITY"),
        F.first("SALES_VALUE").alias("SALES_VALUE"),
        F.first("RETAIL_DISC").alias("RETAIL_DISC"),
        F.first("COUPON_DISC").alias("COUPON_DISC"),
        F.max(F.when(F.col("CAMPAIGN").isNotNull(), 1).otherwise(0)).alias("campaign_flag")
    )
)

transaction_daily.show(5, truncate=False)
transaction_daily.count()


In [0]:
# Collapse coupon_redempt to one row per household_key x DAY with a flag.
redemption_days = (
    coupon_redempt
    .select("household_key", "DAY")
    .dropDuplicates()
    .withColumn("coupon_redempt_flag", F.lit(1))
)

transaction_daily = (
    transaction_daily
    # Dropping a previously attached flag makes this cell safe to re-run:
    # without it a second run joins two `coupon_redempt_flag` columns and the
    # coalesce below fails on the ambiguous name. drop() is a no-op when the
    # column is absent.
    .drop("coupon_redempt_flag")
    .join(redemption_days, on=["household_key", "DAY"], how="left")
    # Fill nulls with 0 for households/days with no redemption
    .withColumn(
        "coupon_redempt_flag",
        F.coalesce(F.col("coupon_redempt_flag"), F.lit(0))
    )
)

# (The mid-cell `.count()` was removed: its result was never shown or stored,
# so it only triggered an extra Spark job.)
transaction_daily.show(20, truncate=False)


In [0]:
# Row count after attaching the coupon redemption flag. The flag was joined
# with a left join against de-duplicated household_key x DAY pairs, so the
# count is expected to match the count before that join.
transaction_daily.count()

In [0]:
# Business Level Table: one row per DAY with rounded totals and the number of
# distinct households that transacted under a campaign / redeemed a coupon.

# household_key when the flag is set, otherwise null (nulls are ignored by
# countDistinct).
campaign_households = F.when(F.col("campaign_flag") == 1, F.col("household_key"))
redeeming_households = F.when(F.col("coupon_redempt_flag") == 1, F.col("household_key"))

business_daily = (
    transaction_daily
    .groupBy("DAY")
    .agg(
        F.round(F.sum("SALES_VALUE"), 2).alias("total_sales"),
        F.round(F.sum("RETAIL_DISC"), 2).alias("total_retail_disc"),
        F.round(F.sum("COUPON_DISC"), 2).alias("total_coupon_disc"),
        F.countDistinct(campaign_households).alias("campaign_count"),
        F.countDistinct(redeeming_households).alias("coupon_redempt_count"),
    )
    .na.fill(0)
    .sort("DAY")
)

display(business_daily)




In [0]:
# Development so far...

"""
    - We have built a combined campaign tables that maps every household to the campaign they were part of along with the days.
    - We have a daily level master transaction table with flags for campaign enrollment and coupon redemption.
    - We have a daily level business table with metrics for sales, retail discount, coupon discount, campaign enrollment and coupon redemption.
"""

## Discrepancy Check

In [0]:
"""
  - We want to check if all the coupon discount is accounted for in the redemption table. Also, are all the coupon redemptions accounted for in the campaign table.
"""

In [0]:
# Amount of unaccounted coupon discount: total COUPON_DISC on household/days
# that show a coupon discount but have no redemption flag.

unredeemed_discount_days = transaction_daily.where(
    (F.col("COUPON_DISC") < 0) & (F.col("coupon_redempt_flag") == 0)
)

result = (
    unredeemed_discount_days
    .agg(F.sum("COUPON_DISC").alias("total_coupon_disc"))
    .first()["total_coupon_disc"]
)

print(result)


## ML Timeseries Anomaly Detection

In [0]:
"""
    We will detect anomalies against three criteria:
        - Sales
        - Retail Discount
        - Coupon Discount
"""

In [0]:
# Bring the daily sales series to pandas and add the `ds` datetime column that
# Prophet requires. The integer DAY is converted to a calendar date as an
# offset in days from 1960-01-01.
daily_pd = (
    business_daily
    .select("DAY", "total_sales")
    .orderBy("DAY")
    .toPandas()
    .assign(
        ds=lambda frame: pd.to_datetime(frame["DAY"], unit="D", origin="1960-01-01")
    )
)

### Sales

In [0]:
# Prophet target: expose total_sales as `y` and keep only the two columns
# Prophet reads (ds, y).
daily_pd = daily_pd.assign(y=daily_pd["total_sales"]).loc[:, ["ds", "y"]]
daily_pd.head()


In [0]:
# Sales model: yearly and weekly seasonality, no daily seasonality, and a 95%
# uncertainty interval (its bounds are used later as the anomaly band).
sales_model_options = {
    "yearly_seasonality": True,
    "weekly_seasonality": True,
    "daily_seasonality": False,
    "interval_width": 0.95,
}
m_sales = Prophet(**sales_model_options)

m_sales.fit(daily_pd)


In [0]:
# Predict over the historical dates plus the next 30 days
future_sales = m_sales.make_future_dataframe(periods=30)
forecast_sales = m_sales.predict(future_sales)

# Last rows of the point forecast and its interval bounds
forecast_sales.tail()[["ds", "yhat", "yhat_lower", "yhat_upper"]]

In [0]:
# Put the actuals (y) next to the forecast and its interval bounds
merged = pd.merge(
    daily_pd,
    forecast_sales[["ds", "yhat", "yhat_lower", "yhat_upper"]],
    on="ds",
    how="left",
)

# An observation is an anomaly when it falls outside the forecast interval
# (strictly above the upper bound or strictly below the lower bound).
merged["is_anomaly"] = (
    merged["y"].gt(merged["yhat_upper"]) | merged["y"].lt(merged["yhat_lower"])
)

merged.head()


In [0]:
# Extract anomaly dates.
# .copy() gives an independent frame, so adding the DAY column below does not
# write into a slice of `merged` (which raised SettingWithCopyWarning).
anomaly_sales_pd = merged.loc[merged["is_anomaly"], ["ds", "y", "yhat_upper"]].copy()

# Convert ds back to the integer DAY (days since 1960-01-01) for joining back
# to Spark tables
anomaly_sales_pd["DAY"] = (anomaly_sales_pd["ds"] - pd.Timestamp("1960-01-01")).dt.days

anomaly_sales = spark.createDataFrame(anomaly_sales_pd[["DAY", "y", "yhat_upper"]])

anomaly_sales.show()

In [0]:
# Interactive Plotly chart of the sales model's forecast, built with Prophet's
# plot_plotly helper.
fig = plot_plotly(m_sales, forecast_sales)
fig.update_layout(title="Daily Sales Forecast + Anomalies", height=600)
fig.show()

In [0]:
# Component breakdown of the sales forecast, built with Prophet's
# plot_components_plotly helper.
fig2 = plot_components_plotly(m_sales, forecast_sales)
fig2.update_layout(height=600)
fig2.show()

In [0]:
# Flag observations outside the forecast interval (same rule as `is_anomaly`)
merged["anomaly_flag"] = (
    merged["y"].lt(merged["yhat_lower"]) | merged["y"].gt(merged["yhat_upper"])
)

# Scatter of daily sales: regular days in black, anomalies in red
sales_scatter_options = dict(
    x="ds",
    y="y",
    color="anomaly_flag",
    color_discrete_map={False: "black", True: "red"},
    title="Sales Anomalies Highlighted",
    labels={"ds": "Date", "y": "Total Daily Sales"},
    height=500,
)
fig3 = px.scatter(merged, **sales_scatter_options)

fig3.show()

### Retail Discount

In [0]:
# Spark -> pandas, then derive the Prophet columns:
#   ds - calendar date, DAY counted in days from 1960-01-01
#   y  - magnitude of the daily retail discount (the raw values are negative)
# campaign_count is kept unchanged as an extra regressor (households in
# campaigns, intended to explain normal discount spikes).
retail_pd = (
    business_daily
    .select("DAY", "total_retail_disc", "campaign_count")
    .orderBy("DAY")
    .toPandas()
    .assign(
        ds=lambda frame: pd.to_datetime(frame["DAY"], unit="D", origin="1960-01-01"),
        y=lambda frame: frame["total_retail_disc"].abs(),
    )
    .loc[:, ["ds", "y", "campaign_count"]]
)
retail_pd.head()

In [0]:
# Retail discount model: same seasonality and interval settings as the sales
# model, plus campaign_count as an additional regressor.
retail_model_options = {
    "yearly_seasonality": True,
    "weekly_seasonality": True,
    "daily_seasonality": False,
    "interval_width": 0.95,
}
m_retail = Prophet(**retail_model_options)

# The regressor must be registered before fitting
m_retail.add_regressor("campaign_count")

m_retail.fit(retail_pd)

In [0]:
# Future dataframe (historical + 30 days ahead)
future_retail = m_retail.make_future_dataframe(periods=30)

# Regressor values:
#   - historical dates keep their ACTUAL campaign_count, so the bounds used
#     for anomaly detection reflect the real campaign activity;
#   - the 30 future dates, where the value is unknown, carry the last known
#     value forward.
# Assigning the constant to the whole column (as before) also overwrote the
# historical values and removed the regressor's effect from the history.
last_campaign = retail_pd["campaign_count"].iloc[-1]
future_retail = future_retail.merge(
    retail_pd[["ds", "campaign_count"]],
    on="ds",
    how="left"
)
future_retail["campaign_count"] = future_retail["campaign_count"].fillna(last_campaign)

forecast_retail = m_retail.predict(future_retail)

# Merge actuals with forecast bounds
merged_retail = retail_pd.merge(
    forecast_retail[["ds", "yhat", "yhat_lower", "yhat_upper"]],
    on="ds",
    how="left"
)

# Flag BOTH high and low anomalies (magnitude too big or too small)
merged_retail["is_anomaly"] = (
    (merged_retail["y"] > merged_retail["yhat_upper"]) |
    (merged_retail["y"] < merged_retail["yhat_lower"])
)

In [0]:
# Interactive Plotly chart of the retail discount model's forecast, with a
# range slider enabled on the x axis.
fig_ret = plot_plotly(m_retail, forecast_retail)
fig_ret.update_layout(title="Daily Retail Discount Forecast + Actuals", height=600)
fig_ret.update_xaxes(rangeslider_visible=True)
fig_ret.show()

In [0]:
# Scatter of the daily retail discount magnitude, coloured by the anomaly flag.
# NOTE(review): regular points are drawn in white here but in black on the
# sales scatter — confirm they stay visible on the notebook's plot background.
fig_ret_scatter = px.scatter(
    merged_retail,
    x="ds",
    y="y",
    color="is_anomaly",
    color_discrete_map={False: "white", True: "red"},
    title="Retail Discount Anomalies (High + Low)",
    labels={"ds": "Date", "y": "Daily |RETAIL_DISC|"},
    height=500
)
fig_ret_scatter.show()

In [0]:
# Rows flagged as retail discount anomalies.
# .copy() gives an independent frame, so adding the DAY column below does not
# write into a slice of `merged_retail` (which raised SettingWithCopyWarning).
retail_anom_pd = merged_retail.loc[
    merged_retail["is_anomaly"], ["ds", "y", "yhat_upper", "yhat_lower"]
].copy()

# Convert ds back to the integer DAY (days since 1960-01-01)
retail_anom_pd["DAY"] = (retail_anom_pd["ds"] - pd.Timestamp("1960-01-01")).dt.days

retail_anomalies = spark.createDataFrame(retail_anom_pd[["DAY", "y", "yhat_upper", "yhat_lower"]])
retail_anomalies.show()

### Coupon Discount

In [0]:
# Spark -> pandas, then derive the Prophet columns:
#   ds - calendar date, DAY counted in days from 1960-01-01
#   y  - magnitude of the daily coupon discount
# campaign_count and coupon_redempt_count are kept unchanged as regressors.
coupon_pd = (
    business_daily
    .select("DAY", "total_coupon_disc", "campaign_count", "coupon_redempt_count")
    .orderBy("DAY")
    .toPandas()
    .assign(
        ds=lambda frame: pd.to_datetime(frame["DAY"], unit="D", origin="1960-01-01"),
        y=lambda frame: frame["total_coupon_disc"].abs(),
    )
    .loc[:, ["ds", "y", "campaign_count", "coupon_redempt_count"]]
)
coupon_pd.head()


In [0]:
# Coupon discount model: same seasonality and interval settings as the other
# models, with two additional regressors intended to explain normal
# campaign-driven spikes.
coupon_model_options = {
    "yearly_seasonality": True,
    "weekly_seasonality": True,
    "daily_seasonality": False,
    "interval_width": 0.95,
}
m_coupon = Prophet(**coupon_model_options)

# Regressors must be registered before fitting
for regressor_name in ("campaign_count", "coupon_redempt_count"):
    m_coupon.add_regressor(regressor_name)

m_coupon.fit(coupon_pd)


In [0]:
# Future dataframe (historical + 30 days ahead)
future_coupon = m_coupon.make_future_dataframe(periods=30)

# Regressor values:
#   - historical dates keep their ACTUAL values, so the bounds used for
#     anomaly detection reflect the real campaign / redemption activity;
#   - the 30 future dates, where the values are unknown, carry the last known
#     value of each regressor forward.
# Assigning the constants to the whole columns (as before) also overwrote the
# historical values and removed the regressors' effect from the history.
coupon_regressors = ["campaign_count", "coupon_redempt_count"]
future_coupon = future_coupon.merge(
    coupon_pd[["ds"] + coupon_regressors],
    on="ds",
    how="left"
)
for regressor_name in coupon_regressors:
    future_coupon[regressor_name] = future_coupon[regressor_name].fillna(
        coupon_pd[regressor_name].iloc[-1]
    )

# Predict
forecast_coupon = m_coupon.predict(future_coupon)

# Merge actuals with forecast bounds
merged_coupon = coupon_pd.merge(
    forecast_coupon[["ds", "yhat", "yhat_lower", "yhat_upper"]],
    on="ds",
    how="left"
)

# Detect BOTH high + low anomalies
merged_coupon["is_anomaly"] = (
    (merged_coupon["y"] > merged_coupon["yhat_upper"]) |
    (merged_coupon["y"] < merged_coupon["yhat_lower"])
)

In [0]:
# Interactive Plotly chart of the coupon discount model's forecast, with the
# x-axis range slider hidden.
fig_coupon = plot_plotly(m_coupon, forecast_coupon)
fig_coupon.update_layout(title="Daily Coupon Discount Forecast + Anomalies", height=600)
fig_coupon.update_xaxes(rangeslider_visible=False)
fig_coupon.show()

In [0]:
# Scatter of the daily coupon discount magnitude, coloured by the anomaly flag.
# Colours follow the sales scatter (regular = black, anomaly = red). The
# previous mapping drew regular points in red and anomalies in blue, the
# opposite of the other anomaly charts, where red marks an anomaly.
fig_coupon_scatter = px.scatter(
    merged_coupon,
    x="ds",
    y="y",
    color="is_anomaly",
    color_discrete_map={False: "black", True: "red"},
    title="Coupon Discount Anomalies (High + Low)",
    labels={"ds": "Date", "y": "|COUPON_DISC|"},
    height=500
)

fig_coupon_scatter.show()

In [0]:
# Rows flagged as coupon discount anomalies.
# .copy() gives an independent frame, so adding the DAY column below does not
# write into a slice of `merged_coupon` (which raised SettingWithCopyWarning).
coupon_anom_pd = merged_coupon.loc[
    merged_coupon["is_anomaly"], ["ds", "y", "yhat_upper", "yhat_lower"]
].copy()

# Convert ds back to the integer DAY (days since 1960-01-01)
coupon_anom_pd["DAY"] = (coupon_anom_pd["ds"] - pd.Timestamp("1960-01-01")).dt.days

coupon_anomalies = spark.createDataFrame(
    coupon_anom_pd[["DAY", "y", "yhat_upper", "yhat_lower"]]
)

coupon_anomalies.show()