### Overview

We create an example dataset from a [publicly available dataset](https://www.kaggle.com/datasets/frtgnn/dunnhumby-the-complete-journey). Note that these data are for demonstrative purposes only and the specifics are not intended for generalized applications.

For example, the window-to-value mapping is rough and overestimates the total value of the original dataset.

The exported data are intended to be self-contained. If users have questions regarding proper window creation etc., please contact the authors directly.

### Data Preamble

- Download the [Dunnhumby](https://www.kaggle.com/datasets/frtgnn/dunnhumby-the-complete-journey) dataset and extract it to a `data` subdirectory.
- Build and launch the provided Docker image.
- Execute the following notebook from either the container's terminal or Jupyter Notebook (localhost:8888)

This will generate a JSON file that can be read and used directly in subsequent analyses/demonstrations.

In [1]:
# Databricks notebook source
# Import libraries
import datetime
import pandas as pd
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
from dateutil.relativedelta import relativedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType

# Spark session: reuse the active session if there is one, otherwise create it
spark = SparkSession.builder.getOrCreate()

# We'll allow for lagging effects, such as a campaign having effects after the campaign ends
# The value here is arbitrary, the user can change this at their discretion
N_RESPONSE_DAYS = 90

# Start date is arbitrary, just wanted something more "human-readable"
START_DATE = datetime.date(2020, 1, 1)


def _day_number_to_date(day_number):
    """Map a 1-based day number to a calendar date (day 1 == START_DATE).

    Returns None when the day number is missing, so a null in the input
    column produces a null date instead of raising inside the Python worker
    and failing the whole Spark job.
    """
    if day_number is None:
        return None
    return START_DATE + relativedelta(days=day_number - 1)


# UDF to convert number of days to a calendar date
days_to_date = sf.udf(_day_number_to_date, DateType())

In [2]:
# Load the household-to-campaign assignments.
# pandas parses the CSV; the result is handed to Spark for the rest of the pipeline.
campaign_table = (
  spark
  .createDataFrame(pd.read_csv(r"campaign_table.csv"))
  .withColumnRenamed('household_key', 'household_id')
  .withColumnRenamed('campaign', 'campaign_id')
  .withColumnRenamed('description', 'campaign_type')
  # Reduce description to A/B/C: keep the 5th character of the upper-cased text
  .withColumn(
    'campaign_type',
    sf.substring(sf.upper(sf.col('campaign_type')), 5, 1)
  )
).cache()

display(campaign_table.limit(10).toPandas())

Unnamed: 0,campaign_type,household_id,campaign_id
0,A,17,26
1,A,27,26
2,A,212,26
3,A,208,26
4,A,192,26
5,A,187,26
6,A,183,26
7,A,142,26
8,A,140,26
9,A,134,26


In [3]:
# Campaign descriptions: campaign id, campaign type and the campaign's date window
campaign_desc = (
  spark
  .createDataFrame(pd.read_csv('campaign_desc.csv'))
  .withColumnRenamed('campaign', 'campaign_id')
  .withColumnRenamed('description', 'campaign_type')
  .withColumnRenamed('start_day', 'window_start')
  .withColumnRenamed('end_day', 'window_end')
  # Reduce description to A/B/C: keep the 5th character of the upper-cased text
  .withColumn(
    'campaign_type',
    sf.substring(sf.upper(sf.col('campaign_type')), 5, 1)
  )
  # Day number -> calendar date
  .withColumn(
    'window_start',
    days_to_date(sf.col('window_start'))
  )
  # Add the response time to the end day, then convert to a calendar date
  .withColumn(
    'window_end',
    days_to_date(sf.col('window_end') + sf.lit(N_RESPONSE_DAYS))
  )
  # NOTE: an earlier draft converted days with sf.expr("date_add(...)") instead of
  # the UDF; see https://issues.apache.org/jira/browse/SPARK-26393
  .select(
    'campaign_id',
    'campaign_type',
    'window_start',
    'window_end'
  )
  .orderBy('campaign_id')
).cache()

# display(campaign_desc.limit(10).toPandas())

In [4]:
# Sales data: raw transactions with household, day number and sales value
transaction_data = (
  spark
  .createDataFrame(pd.read_csv('transaction_data.csv'))
  .withColumnRenamed('sales_value', 'revenue_usd')
  .withColumnRenamed('household_key', 'household_id')
  # Make sure the day number is an integer before it reaches the date UDF
  .withColumn('day', sf.col('day').cast('integer'))
  # Calendar date of the transaction, derived from the day number
  .withColumn('transaction_day', days_to_date(sf.col('day')))
).cache()

# display(transaction_data.limit(10).toPandas())

In [5]:
# Roll transactions up to one row per household per day;
# daily-level granularity is all this toy example needs.
daily_keys = ['household_id', 'transaction_day']

transaction_summary = (
  transaction_data
  .groupBy(*daily_keys)
  .agg(sf.sum(sf.col('revenue_usd')).alias('revenue_usd'))
  .orderBy(*daily_keys)
).cache()

display(transaction_summary.limit(10).toPandas())

Unnamed: 0,household_id,transaction_day,revenue_usd
0,1,2020-02-20,78.66
1,1,2020-03-07,41.1
2,1,2020-03-28,26.9
3,1,2020-04-03,63.43
4,1,2020-04-10,53.45
5,1,2020-04-17,26.76
6,1,2020-04-20,23.55
7,1,2020-05-07,110.34
8,1,2020-05-16,87.44
9,1,2020-05-25,73.32


In [6]:
# Let's see if we have overlapping campaigns

# campaign_id is appended to every ordering as a tie-breaker. Without it, two
# campaigns of one household that start on the same day are ordered arbitrarily,
# and the three windows below may each pick a different order. The running sum
# could then attach a "continuation" row to the id of an unrelated campaign,
# producing duplicate campaign_record_id values.
campaign_order = ('window_start', 'campaign_id')

# Previous campaign of the SAME type for the same household
same_type_window = (
  Window
  .partitionBy('household_id', 'campaign_type')
  .orderBy(*campaign_order)
)

# Previous campaign of ANY type for the same household
household_window = (
  Window
  .partitionBy('household_id')
  .orderBy(*campaign_order)
)

# Running total over all rows in household/start order (no partitioning, so the
# ids are unique across households)
running_window = (
  Window
  .orderBy('household_id', *campaign_order)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

master_campaign = (
  campaign_table
  .join(
    campaign_desc.select('campaign_id', 'window_start', 'window_end'),
    how='left',
    on='campaign_id'
  )
  # We need to determine if there is OVERLAP between campaigns of the same type
  # If so, then consolidate into a single record. This will make the window
  # creation logic easier below.
  .withColumn(
    'window_end_lag',
    sf.lag('window_end').over(same_type_window)
  )
  # Does the previous campaign end AFTER the next campaign begins?
  .withColumn(
    'is_overlap',
    sf.col('window_end_lag') >= sf.col('window_start')
  )
  # Type of the campaign that started immediately before this one
  .withColumn(
    'campaign_type_lag',
    sf.lag('campaign_type').over(household_window)
  )
  .withColumn(
    'is_diff_type',
    sf.col('campaign_type_lag') != sf.col('campaign_type')
  )
  # The first row of each partition has no lag value, so both flags are null there
  .fillna(
    False,
    subset=['is_overlap', 'is_diff_type']
  )
  # A row opens a new record unless it overlaps its same-type predecessor AND
  # directly follows a campaign of the same type
  .withColumn(
    'is_new_window',
    (
      (~sf.col('is_overlap')) |
      (
        sf.col('is_overlap') &
        sf.col('is_diff_type')
      )
    )
  )
  # Add in a campaign_record_id: running count of the rows that open a new record
  .withColumn(
    'campaign_record_id',
    sf.sum(sf.col('is_new_window').cast('integer')).over(running_window)
  )
  .orderBy('household_id', *campaign_order)
).cache()

display(master_campaign.limit(10).toPandas())

Unnamed: 0,campaign_id,campaign_type,household_id,window_start,window_end,window_end_lag,is_overlap,campaign_type_lag,is_diff_type,is_new_window,campaign_record_id
0,29,B,1,2020-10-07,2021-02-27,,False,,False,True,1
1,8,A,1,2021-02-15,2021-07-03,,False,B,True,True,2
2,12,B,1,2021-04-21,2021-08-21,2021-02-27,False,A,True,True,3
3,13,A,1,2021-05-18,2021-10-02,2021-07-03,True,B,True,True,4
4,18,A,1,2021-08-09,2022-01-01,2021-10-02,True,A,False,False,4
5,20,C,1,2021-09-06,2022-02-13,,False,A,True,True,5
6,22,B,1,2021-09-15,2022-01-15,2021-08-21,False,C,True,True,6
7,23,B,1,2021-10-07,2022-02-12,2022-01-15,True,B,False,False,6
8,18,A,2,2021-08-09,2022-01-01,,False,,False,True,7
9,8,A,3,2021-02-15,2021-07-03,,False,,False,True,8


In [7]:
# Collapse each campaign_record_id into one row spanning its earliest start and
# latest end, so there is a single record per time slot/campaign type.
# This makes window creation simpler below.
record_keys = ['campaign_record_id', 'household_id', 'campaign_type']

master_campaign = (
  master_campaign
  .groupBy(*record_keys)
  .agg(
    sf.min(sf.col('window_start')).alias('window_start'),
    sf.max(sf.col('window_end')).alias('window_end')
  )
).cache()

# Spot check with a household_id we know has some overlapping campaign_types
household_one = master_campaign.where(master_campaign['household_id'] == 1)
display(household_one.orderBy('window_start').limit(10).toPandas())

Unnamed: 0,campaign_record_id,household_id,campaign_type,window_start,window_end
0,1,1,B,2020-10-07,2021-02-27
1,2,1,A,2021-02-15,2021-07-03
2,3,1,B,2021-04-21,2021-08-21
3,4,1,A,2021-05-18,2022-01-01
4,5,1,C,2021-09-06,2022-02-13
5,6,1,B,2021-09-15,2022-02-12


In [8]:
# Build the "treatment" windows for attribution purposes: every consolidated
# campaign record (a) defines a window, and each record (b) of the same household
# that starts inside that window contributes its campaign type as a treatment.
window_side = master_campaign.alias('a')
campaign_side = master_campaign.alias('b')

starts_inside_window = (
  (sf.col('b.window_start') >= sf.col('a.window_start')) &
  (sf.col('b.window_start') <= sf.col('a.window_end'))
)
is_same_household = sf.col('a.household_id') == sf.col('b.household_id')

window_map = (
  window_side
  .join(campaign_side, on=starts_inside_window & is_same_household)
  .select(
    'a.household_id',
    sf.col('a.campaign_record_id').alias('window_id'),
    'a.window_start',
    'a.window_end',
    'b.campaign_record_id',
    'b.campaign_type',
    # Copy of campaign_type under the name 'treatment', to conform to package requirements
    sf.col('b.campaign_type').alias('treatment')
  )
  .orderBy('window_id', 'window_start')
).cache()

display(window_map.limit(10).toPandas())

Unnamed: 0,household_id,window_id,window_start,window_end,campaign_record_id,campaign_type,treatment
0,1,1,2020-10-07,2021-02-27,1,B,B
1,1,1,2020-10-07,2021-02-27,2,A,A
2,1,2,2021-02-15,2021-07-03,2,A,A
3,1,2,2021-02-15,2021-07-03,3,B,B
4,1,2,2021-02-15,2021-07-03,4,A,A
5,1,3,2021-04-21,2021-08-21,3,B,B
6,1,3,2021-04-21,2021-08-21,4,A,A
7,1,4,2021-05-18,2022-01-01,4,A,A
8,1,4,2021-05-18,2022-01-01,5,C,C
9,1,4,2021-05-18,2022-01-01,6,B,B


In [15]:
# Pivot the window map into a wide data frame for attribution:
# one row per window, one count column per treatment value.
window_keys = ['household_id', 'window_id', 'window_start', 'window_end']

attribution_window = (
  window_map
  .groupBy(*window_keys)
  .pivot('treatment')
  .count()
  # Replace nulls in all numeric columns with 0 (no subset is given)
  .na.fill(0)
).cache()

In [16]:
# Distinct treatment labels found in the window map, sorted and frozen as a tuple
treatments = tuple(
    sorted(
        row['treatment']
        for row in window_map.select('treatment').distinct().collect()
    )
)

display(treatments)

('A', 'B', 'C')

In [17]:
# Show the first rows of the window map again before aggregating treatments per window
window_map_head = window_map.limit(10).toPandas()
display(window_map_head)

Unnamed: 0,household_id,window_id,window_start,window_end,campaign_record_id,campaign_type,treatment
0,1,1,2020-10-07,2021-02-27,1,B,B
1,1,1,2020-10-07,2021-02-27,2,A,A
2,1,2,2021-02-15,2021-07-03,2,A,A
3,1,2,2021-02-15,2021-07-03,3,B,B
4,1,2,2021-02-15,2021-07-03,4,A,A
5,1,3,2021-04-21,2021-08-21,3,B,B
6,1,3,2021-04-21,2021-08-21,4,A,A
7,1,4,2021-05-18,2022-01-01,4,A,A
8,1,4,2021-05-18,2022-01-01,5,C,C
9,1,4,2021-05-18,2022-01-01,6,B,B


In [18]:
# One row per window with the distinct treatments observed in it
treatment_data = (
    window_map
    .groupBy('window_id')
    # Don't need to maintain order in our case, so collect set is OK here
    .agg(sf.collect_set(sf.col('treatment')).alias('impressions'))
    .select(
        sf.col('window_id').alias('identifier'),
        'impressions'
    )
).cache()


# # Convert impressions column to tuple
# treatment_data['impressions'] = treatment_data.impressions.apply(lambda x: tuple(x))

# treatment_data.head()
# display(treatment_data.limit(10).toPandas().to_dict(orient='records'))

In [19]:
# Now we need REVENUE per window
window_bounds = (
  attribution_window
  .select('household_id', 'window_id', 'window_start', 'window_end')
  .alias('a')
)
daily_revenue = transaction_summary.alias('b')

# A transaction belongs to a window when it falls between the window bounds
# (inclusive) and was made by the window's household
falls_in_window = (
  (sf.col('b.transaction_day') >= sf.col('a.window_start')) &
  (sf.col('b.transaction_day') <= sf.col('a.window_end'))
)
household_matches = sf.col('a.household_id') == sf.col('b.household_id')

revenue_to_window = (
  window_bounds
  # Left join keeps windows that have no matching transactions
  .join(daily_revenue, on=falls_in_window & household_matches, how='left')
  # Sum up revenue per window
  # (add 'a.household_id' to the grouping keys when spot checking)
  .groupBy('window_id')
  .agg(sf.sum('revenue_usd').alias('revenue_usd'))
  # Windows without transactions sum to null; report them as 0
  .na.fill(0, subset=['revenue_usd'])
  .select(
    sf.col('window_id').alias('identifier'),
    # Truncate to dollars so I don't have to stare at floating point numbers
    sf.col('revenue_usd').cast('long').alias('revenue_usd')
  )
).cache()

display(revenue_to_window.limit(10).toPandas())

Unnamed: 0,identifier,revenue_usd
0,1,944
1,2,952
2,3,776
3,4,1389
4,5,717
5,6,628
6,7,332
7,8,433
8,9,199
9,10,68


In [20]:
# Combine treatments and revenue per window and bring the result into pandas
attribution_data = (
    treatment_data
    .join(
        revenue_to_window,
        on='identifier',
        how='left'
    )
    .withColumnRenamed(
        'revenue_usd',
        'value'
    )
    # Fixed row order, so repeated runs export the records in the same sequence
    .orderBy('identifier')
    .toPandas()
)

# collect_set gives no guarantee on element order; sort each set so the exported
# tuples are reproducible from run to run
attribution_data['impressions'] = attribution_data['impressions'].apply(
    lambda impressions: tuple(sorted(impressions))
)

# Convert to a list of records (one dict per window), ready to be dumped as JSON
attribution_data_json = attribution_data.to_dict(orient='records')

In [23]:
# Serialise the attribution records to example_data.json in the working directory
import json

with open('example_data.json', 'w') as out_file:
    out_file.write(json.dumps(attribution_data_json))
    