## README
- This notebook cleans and samples the transaction data to obtain the final modelling datasets.
- It should be run twice in the customer segmentation process: once to obtain a sample for fitting the model (y.csv, x.csv, h.csv, products.csv), and once to process the 'to segment' customer set (y_segmentation.csv, x_segmentation.csv, h_segmentation.csv, products_segmentation.csv) used in the *CTM-3 Customer Segmentation* notebook.

The notebook consists of the following three parts.

PART 1 - The data pre-processing steps

0. Only keep in-store transactions
1. Remove 'to-go' and Belgian shops
2. Remove 'abusive' bonus cards
3. Only keep customers shopping more than X weeks
4. Aggregate all products to lowest_level taxonomy
5. Take % random sample OR set of customers we want to segment  

PART 2 - Resulting datasets

(!) For fitting the model: these 4 datasets need to be fully saved as csv - and the header row should be deleted manually for df_y, df_x and df_h - to be used in Python with the following names (!)
- **y.csv**: customer_id, basket_id, product_id
- **x.csv**: basket_id, trip-specific variables
- **h.csv**: customer_id, customer-specific variables 
- **products.csv**: product_id, lowest_level ((!) must also be uploaded to Databricks for later alignment purposes in segmentation (!))

(!) For customer segmentation, upload these files to Databricks: (done automatically in the notebook) (!)
Currently, these files are saved to hive_metastore.default, but a separate location might be better.

- Renamed y.csv to **y_segmentation.csv** 
- **customers_segmentation.csv**: customer_id, customerID

PART 3 - Descriptive statistics
- descriptive statistics for df_y, df_x and df_h


The details that need to be changed when using the notebook as data pre-processing step for fitting instead of segmenting:

- Timeframe (cmd5, optional)
- Sample size (cmd23)
- Uncomment line of code that uploads products.csv to Databricks (cmd 44)

In [0]:
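import pyspark.sql.functions as F
from pyspark.sql import Window  # Window specifications are used when building the ID columns in Part 2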

In [0]:
%run /CODELIBRARY/main

## (!) Global Parameters - Timeframe (!)

In [0]:
# Select desired timeframe of the final sample 

# For fitting the CTM model in the thesis: 202309 until 202318

# For customer segmentation, choose the desired timeframe 
start_date = get_monday_of_weekkey('202325')
end_date = get_monday_of_weekkey('202333')  

# Part 1: Data pre-processing

## 0. Only keep in-store transactions

In [0]:
# Load sales transactions dataset
df_sales_transactions = get_table_sales_transactions(start_date, end_date)

# Keep only transactions that occurred in-store
df_sales_transactions_offline = df_sales_transactions.filter('OnlineFlag = 0')

## 1. Remove 'to-go' and Belgian shops

In [0]:
# Define function to get relevant stores, with label of what store it is
def get_store_banners(min_store_sales_date):
  """StoreNbr 1200 and 9998 are also located at headquarters, there is no column that allows for filtering"""
  return (
    get_table_store()
    # Removes headquarters (Zaandam, Coffee company) and DC's. Non-valid StoreNbr's
    .filter(~F.col('StoreNbr').between(100, 1000))
    .filter(~F.col('StoreNbr').isin(STORES_HSC))
    .filter(F.col('StoreSalesEndDt') >= min_store_sales_date)
    .filter(F.col('StoreFormatNbr') != -1) # Invalid store format
    .filter(F.col('StoreFormatNbr') != 7) # Remove gall stores
    .withColumn(
    'StoreFormat', F
      .when(F.col('StoreNbr').isin(STORES_BE), 'BEL')
      .when(F.col('StoreNbr').isin(STORES_TOGO), 'ToGo')
      .when(F.col('FranchiseInd') == 'J', 'FR')
      .when(F.col('FranchiseInd') == 'N', 'WWM')
    )
    .select('StoreNbr', 'StoreFormat', 'StoreFormatNbr')
  )

# Collect store numbers of to-go and Belgian stores (despite its name, the variable below holds both formats)
# store_numbers_to_go = get_store_banners('2022-01-01').filter(F.col("StoreFormat") == 'ToGo').select('StoreNbr', 'StoreFormat')
store_numbers_to_go = get_store_banners('2022-01-01') \
    .filter((F.col("StoreFormat") == 'ToGo') | (F.col("StoreFormat") == 'BEL')) \
    .select('StoreNbr', 'StoreFormat')

# Anti-join with sales transactions to remove transactions from to-go and Belgian stores
df_sales_transactions_without_togo = (
  df_sales_transactions_offline
  .join(store_numbers_to_go, 'StoreNbr', 'leftanti')
)

## 2. Remove 'abusive' bonus cards

In [0]:
# Load abusive bonus cards
src_abusive_cardholders = spark.read.format('delta').load('/mnt/sa-datalake-prd/Projects/CustomerTransformation/CustomerData/FixedFeatures/customerid_abusive/') 

# Filter out transactions with abusive bonus cards
df_sales_transactions_without_togo_abusive = (
  df_sales_transactions_without_togo
  .join(src_abusive_cardholders, 'CustomerID', 'leftanti')
)

## 3. Only keep customers shopping more than X weeks


In [0]:
# Set threshold for shopping more than X weeks
threshold_amount_of_weeks = 1 

In [0]:
# Add a new column to obtain the week number of a shopping trip
df_sales_transactions_without_togo_abusive = df_sales_transactions_without_togo_abusive.withColumn('Week', F.weekofyear('TransactionEndDts'))

# Calculate the number of weeks each customer made a purchase
df_weeks_purchased = df_sales_transactions_without_togo_abusive.groupBy('CustomerID').agg(F.countDistinct('Week').alias('WeeksPurchased'))

# Keep only customers who purchased in at least threshold_amount_of_weeks distinct weeks
df_loyal_customers = df_weeks_purchased.filter(F.col('WeeksPurchased') >= threshold_amount_of_weeks)

# Join back with sales transaction data
df_sales_transactions_without_togo_abusive_loyal = (
  df_sales_transactions_without_togo_abusive
  .join(df_loyal_customers, 'CustomerID', 'inner')
)
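
The week filter above can be illustrated outside Spark. The following is a minimal plain-Python sketch, not the notebook's implementation: the function name `loyal_customers` and the toy data are hypothetical. It also keys each week on the ISO year, which is an assumption added here; the notebook uses the week number alone, so week numbers from different years would coincide if the timeframe crossed a year boundary.

```python
from collections import defaultdict
from datetime import date


def loyal_customers(transactions, min_weeks):
    """Return IDs of customers who shopped in at least `min_weeks` distinct ISO weeks."""
    weeks_by_customer = defaultdict(set)
    for customer_id, trip_date in transactions:
        iso_year, iso_week, _ = trip_date.isocalendar()
        # Keying on (year, week) keeps the same week number of two different years apart
        weeks_by_customer[customer_id].add((iso_year, iso_week))
    return {c for c, weeks in weeks_by_customer.items() if len(weeks) >= min_weeks}


# Hypothetical toy data: (CustomerID, shopping date)
transactions = [
    ("A", date(2023, 6, 19)),  # ISO week 25
    ("A", date(2023, 6, 21)),  # same week, counted once
    ("A", date(2023, 6, 26)),  # ISO week 26
    ("B", date(2023, 6, 20)),  # ISO week 25 only
]

print(loyal_customers(transactions, min_weeks=2))
```

With a threshold of 1, as set in this notebook, every customer with at least one transaction passes the filter.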

## 4. Aggregate all products to lowest_level taxonomy

In [0]:
# Get all product taxonomies
src_taxonomy = spark.read.csv('/mnt/sa-datalake-prd/Personal/Diederik/Taxonomies/product_taxonomy_paths_longest.csv', header=True, inferSchema="true", sep=';')

# Join lowest level taxonomy information with sales transactions
df_sales_transactions_without_togo_abusive_loyal_lowest_level = (
  df_sales_transactions_without_togo_abusive_loyal
  .join(src_taxonomy, 'RetailItemNbr', 'inner')
)

# Remove products with lowest level null 
df_sales_transactions_without_togo_abusive_loyal_lowest_level = df_sales_transactions_without_togo_abusive_loyal_lowest_level.dropna(subset=['lowest_level'])

df_sales_transactions_without_togo_abusive_loyal_lowest_level.display()

## (!) 5. Sample size (!)
- depends on use for fitting or segmentation

In [0]:
# Set % size of subset

# Sample size used for fitting the model in thesis: 0.00014
# sample_size = 0.0001

# OR 

# Sample size for customer segmentation (0.5 means 50% of customers)
sample_size = 0.2 # For customer segmentation 

In [0]:
# First, get a list of all customer IDs that are in the resulting sales transactions dataset
df_customer_IDs = df_sales_transactions_without_togo_abusive_loyal_lowest_level.select("CustomerID").distinct()

# Select a random sample of these customer IDs for fitting the model OR use all customers for customer segmentation
df_final_sample_IDs = df_customer_IDs.sample(sample_size)
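
Spark's `DataFrame.sample(fraction)` draws each row independently, so the resulting sample size is only approximately `fraction` times the number of customers, and without a seed the sample differs between runs. The plain-Python sketch below illustrates that behaviour under stated assumptions: `bernoulli_sample` and the ID range are hypothetical and only mimic the idea, not Spark's internal sampler.

```python
import random


def bernoulli_sample(ids, fraction, seed):
    """Keep each ID independently with probability `fraction`, reproducibly for a given seed."""
    rng = random.Random(seed)
    return [i for i in ids if rng.random() < fraction]


customer_ids = list(range(1000))  # hypothetical customer IDs
first = bernoulli_sample(customer_ids, 0.2, seed=42)
second = bernoulli_sample(customer_ids, 0.2, seed=42)

# The size is close to, but not necessarily exactly, 20% of the input
print(len(first), first == second)
```

In the notebook, passing a `seed` argument to `sample` would make the selected customers reproducible across runs, which matters because the fitted model and the saved product mapping depend on the sample.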

# Part 2: Resulting Datasets

## Sample size
- Should contain 125K transactions max. for the CTM model in Python!!!
- When using too many transactions for fitting the model, the following error will occur when estimating the model in Python:

zsh: segmentation fault  python estimate.py -MODEL CTM -M 3 -N_ITER 2000 -N_SAVE_PER 500

In [0]:
# Keep only relevant columns
df_sales_transactions_without_togo_abusive_loyal_lowest_level = (
  df_sales_transactions_without_togo_abusive_loyal_lowest_level
  .select('CustomerID', 'RetailItemNbr', 'TransactionID', 'Date', 'EventTimestamp', 'lowest_level', 'QuantityCE', 'SalesGoodsExclDiscountEUR', 'SalesGoodsEUR')
)

# Final dataset
df_final_sample_transactions = (
df_sales_transactions_without_togo_abusive_loyal_lowest_level
.join(df_final_sample_IDs, 'CustomerID', 'inner')
)

# Count how many customers in the final sample
customers_final_sample = df_final_sample_IDs.agg(F.countDistinct('CustomerID')).collect()[0][0]

# Count shopping trips 
shopping_trips_final_sample = df_final_sample_transactions.agg(F.countDistinct('TransactionID')).collect()[0][0]

# Count transactions
transactions_final_sample = df_final_sample_transactions.count()

# Count unique products
different_products_final_sample = df_final_sample_transactions.agg(F.countDistinct('lowest_level')).collect()[0][0] 

print("The final sample contains", customers_final_sample, "customers")
print("The final sample contains", shopping_trips_final_sample, "shopping trips")
print("The final sample contains", transactions_final_sample, "transactions")
print("The final sample contains", different_products_final_sample, "different products")

## Products dataset

In [0]:
# Define a window specification for customer_id
customer_window_spec = Window.orderBy("CustomerID")

# Add a new column 'customer_id' with new indices for customers ranging from 0 to I-1, ordered by lowest CustomerID values
df_final_sample_transactions = df_final_sample_transactions.withColumn("customer_id", F.dense_rank().over(customer_window_spec) - 1)

# Define a window specification for basket_id
basket_window_spec = Window.orderBy("CustomerID", "Date", "TransactionID")

# Add a new column 'basket_id' with sequential indices for each transaction across all customers 
df_final_sample_transactions = df_final_sample_transactions.withColumn("basket_id", F.dense_rank().over(basket_window_spec) - 1)

# Determine unique lowest_level values and assign each a zero-based product_id
unique_retail_items = df_final_sample_transactions.select("lowest_level").distinct()
unique_retail_items = unique_retail_items.withColumn("product_id", F.row_number().over(Window.orderBy("lowest_level")) - 1)

# Join the original DataFrame with the unique_retail_items DataFrame
df_final_sample = df_final_sample_transactions.join(
    unique_retail_items,
    on="lowest_level",
    how="left"
)

# Show the updated DataFrame
df_final_sample.display()

# Split rows with quantityCE > 1 into multiple rows
exploded_df = df_final_sample.withColumn("exploded", F.explode(F.expr("split(repeat('1', QuantityCE), '')")))

# Create a new DataFrame with quantityCE = 1 for each exploded row
df_final_sample = exploded_df.drop("QuantityCE", "exploded")

# Create a mapping of product_id to lowest_level to be downloaded later as input for the model 
df_products = df_final_sample.select('product_id', 'lowest_level').dropDuplicates(['product_id'])
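
The re-indexing and quantity expansion in this cell can be pictured with a small plain-Python sketch. It is a simplified illustration, not the notebook's code: `dense_index` and the toy rows are hypothetical, and baskets are ordered by (CustomerID, TransactionID) only, whereas the notebook also orders by Date.

```python
def dense_index(values):
    """Map each distinct value to 0..K-1 in sorted order (the effect of dense_rank() - 1)."""
    return {value: index for index, value in enumerate(sorted(set(values)))}


# Hypothetical toy rows
rows = [
    {"CustomerID": 907, "TransactionID": "t2", "lowest_level": "milk", "QuantityCE": 2},
    {"CustomerID": 115, "TransactionID": "t1", "lowest_level": "bread", "QuantityCE": 1},
    {"CustomerID": 907, "TransactionID": "t2", "lowest_level": "bread", "QuantityCE": 1},
]

customer_ids = dense_index(r["CustomerID"] for r in rows)
basket_ids = dense_index((r["CustomerID"], r["TransactionID"]) for r in rows)
product_ids = dense_index(r["lowest_level"] for r in rows)

y = []
for r in rows:
    triple = (
        customer_ids[r["CustomerID"]],
        basket_ids[(r["CustomerID"], r["TransactionID"])],
        product_ids[r["lowest_level"]],
    )
    # A row with QuantityCE = q becomes q identical rows of quantity 1
    y.extend([triple] * r["QuantityCE"])

print(y)
```

Each tuple in `y` corresponds to one row of the Y dataset (customer_id, basket_id, product_id).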

## Customers dataset (only relevant for segmentation (!))

In [0]:
# Store a dataframe that maps customer_id - customerID
df_customers_segmentation = df_final_sample.select('customer_id', 'CustomerID').dropDuplicates(['customer_id'])

# Display result
display(df_customers_segmentation)

## Dataset Y 
- customer_id, basket_id, product_id 

In [0]:
# Generate Y dataset 
df_y = df_final_sample.select('customer_id', 'basket_id', 'product_id')
df_y.display()


# Display summary statistics of Y dataset 
# IDs are zero-based, so each count is the maximum ID plus one
number_of_baskets = df_y.agg(F.max(F.col("basket_id"))).first()[0] + 1
number_of_customers = df_y.agg(F.max(F.col("customer_id"))).first()[0] + 1
number_of_products = df_y.agg(F.countDistinct(F.col('product_id'))).first()[0]

print('number of baskets', number_of_baskets)
print('number of customers', number_of_customers)
print('number of unique products', number_of_products)

## Dataset X
- basket_id, trip-specific variables (Promotion, Day of Week, Time of Day)

In [0]:
# Parse 'EventTimestamp' from its string form into a timestamp column
df_final_sample = df_final_sample.withColumn("EventTimestamp", F.expr("to_timestamp(EventTimestamp, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ')"))
# Add a dummy column 'IsWeekend' (1 for weekend, 0 for weekday); dayofweek returns 1 for Sunday and 7 for Saturday
df_final_sample = df_final_sample.withColumn("IsWeekend", F.when(F.dayofweek(F.col("EventTimestamp")).isin([1, 7]), 1).otherwise(0))

# Add a dummy column After5PM, indicating whether 'EventTimeStamp' is after or before 5pm (1 for evening, 0 for day)
df_final_sample = df_final_sample.withColumn("After5PM", F.when(F.hour(F.col("EventTimestamp")) >= 17, 1).otherwise(0))

# Add dummy columns for each month (1 if the shopping trip took place in that month, 0 otherwise)
month_names = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
for month_number, month_name in enumerate(month_names, start=1):
    df_final_sample = df_final_sample.withColumn(month_name, F.when(F.month(F.col("EventTimestamp")) == month_number, 1).otherwise(0))

# Display the updated DataFrame
df_final_sample.display()

In [0]:
# Calculate the total discount for each basket
df_discount = df_final_sample.groupBy('basket_id').agg(
    F.sum(F.col('SalesGoodsExclDiscountEUR') - F.col('SalesGoodsEUR')).alias('total_discount'),
    F.sum('SalesGoodsExclDiscountEUR').alias('total_sales_excl_discount')
)

# Calculate the discount as a fraction of the total basket value (0.10 means a 10% discount)
df_discount = df_discount.withColumn(
    'basket_discount_percentage',
    (F.col('total_discount') / F.col('total_sales_excl_discount'))
)

# Join the discount DataFrame back to the original DataFrame
df_final_sample_transactions_final_discounts = df_final_sample.join(df_discount, on='basket_id', how='left')

# Display the updated DataFrame
df_final_sample_transactions_final_discounts.select('customer_id', 'basket_id', 'product_id', 'SalesGoodsExclDiscountEUR', 'SalesGoodsEUR', 'total_discount', 'total_sales_excl_discount', 'basket_discount_percentage', 'IsWeekend', 'After5PM', 'January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December').display()
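
As a worked example of the basket discount calculation, here is a minimal plain-Python sketch. The function `basket_discount_fraction` and the toy amounts are hypothetical, and returning `None` for baskets with zero gross value is an assumption made here to avoid a division by zero.

```python
from collections import defaultdict


def basket_discount_fraction(lines):
    """Return {basket_id: total discount / total gross value} for (basket_id, gross, net) lines."""
    totals = defaultdict(lambda: [0.0, 0.0])  # basket_id -> [total discount, total gross]
    for basket_id, gross, net in lines:
        totals[basket_id][0] += gross - net
        totals[basket_id][1] += gross
    # None marks baskets whose gross value is zero
    return {b: (d / g if g else None) for b, (d, g) in totals.items()}


# Hypothetical toy data: (basket_id, SalesGoodsExclDiscountEUR, SalesGoodsEUR)
lines = [
    (0, 2.00, 1.50),  # 0.50 EUR discount
    (0, 3.00, 3.00),  # no discount
    (1, 4.00, 4.00),  # no discount
]

fractions = basket_discount_fraction(lines)
print(fractions)
```

Basket 0 has a discount of 0.50 on a gross value of 5.00, so its fraction is 0.10; basket 1 has no discount.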

In [0]:
# Display the final X dataset 
df_x = df_final_sample_transactions_final_discounts.select('basket_id', 'basket_discount_percentage', 'IsWeekend', 'After5PM', 'January', 'February', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December')
df_x = df_x.drop_duplicates(subset=['basket_id'])
display(df_x)

# Display the number of baskets 
number_of_baskets = df_x.agg(F.max(F.col("basket_id"))).first()[0] + 1  # basket_id is zero-based
print('number of baskets', number_of_baskets)

## Dataset H
- customer_id, customer-specific variables (Age, Location) 

In [0]:
# Load customer master table 
src_customer_master = get_table_customer_master()

# Join the final dataset with the customer_master table, to find the Membernbr for each offline customer 
df_final_transactions_with_member_numbers = (
df_final_sample
.join(src_customer_master, 'CustomerID', 'left')
)

# Load customer demographics table 
df_customer_demographics = get_table_customers_privacylocker_thor()

# Using each customer's member number, add available customer Age and Location information to each customer 
df_customer_demographics_offline = (
  df_final_transactions_with_member_numbers
  .join(df_customer_demographics, 'MemberNbr', 'left')
  .select('customer_id', 'CityName', 'DateOfBirth')
)

df_customer_demographics_offline = df_customer_demographics_offline.drop_duplicates(subset=['customer_id'])

# Display customer information (Location, Age)
df_customer_demographics_offline.display()

In [0]:
# Create dummies for age 
df_ages = df_customer_demographics_offline.withColumn('Age', F.datediff(F.current_date(), F.col('DateOfBirth')) / 365)
df_ages = df_ages.withColumn('UnknownAge', F.when(F.col('DateOfBirth').isNull(), 1).otherwise(0))
df_ages = df_ages.withColumn('Age_0_25', F.when((F.col('Age') >= 0) & (F.col('Age') <= 25), 1).otherwise(0))
df_ages = df_ages.withColumn('Age_25_35', F.when((F.col('Age') > 25) & (F.col('Age') <= 35), 1).otherwise(0))
df_ages = df_ages.withColumn('Age_35_45', F.when((F.col('Age') > 35) & (F.col('Age') <= 45), 1).otherwise(0))
df_ages = df_ages.withColumn('Age_45_55', F.when((F.col('Age') > 45) & (F.col('Age') <= 55), 1).otherwise(0))
df_ages = df_ages.withColumn('Age_55_65', F.when((F.col('Age') > 55) & (F.col('Age') <= 65), 1).otherwise(0))
df_ages = df_ages.withColumn('Age_65_plus', F.when(F.col('Age') > 65, 1).otherwise(0))
df_ages = df_ages.drop('Age')

# Create dummy for large city or not. A large city is a city with more than 250,000 inhabitants
cities = ['AMSTERDAM', 'ROTTERDAM', "'S-GRAVENHAGE", 'UTRECHT'] # Cities with more than 250,000 inhabitants

df_ages_and_cities = df_ages.withColumn('IsCity', F.when(F.col('CityName').isin(cities), 1).otherwise(0))
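
The age dummies are meant to be mutually exclusive: a customer falls into exactly one bucket, or into `UnknownAge` when the date of birth is missing. The plain-Python sketch below illustrates that bucketing; `age_dummies` and the example dates are hypothetical, and the bucket bounds are taken from the column names above.

```python
from datetime import date

# (name, lower bound, upper bound); only the first bucket includes its lower bound
AGE_BUCKETS = [
    ("Age_0_25", 0, 25),
    ("Age_25_35", 25, 35),
    ("Age_35_45", 35, 45),
    ("Age_45_55", 45, 55),
    ("Age_55_65", 55, 65),
]


def age_dummies(date_of_birth, today):
    """Return a dict of 0/1 dummies with at most one bucket set to 1."""
    names = ["UnknownAge"] + [name for name, _, _ in AGE_BUCKETS] + ["Age_65_plus"]
    dummies = dict.fromkeys(names, 0)
    if date_of_birth is None:
        dummies["UnknownAge"] = 1
        return dummies
    age = (today - date_of_birth).days / 365  # approximate age in years
    if age > 65:
        dummies["Age_65_plus"] = 1
        return dummies
    for name, lower, upper in AGE_BUCKETS:
        above_lower = age >= lower if lower == 0 else age > lower
        if above_lower and age <= upper:
            dummies[name] = 1
    return dummies


print(age_dummies(date(1990, 1, 1), today=date(2024, 1, 1)))
print(age_dummies(None, today=date(2024, 1, 1)))
```

Using half-open intervals for every bucket after the first ensures that a customer aged exactly 25, 35, and so on is counted once rather than twice.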

In [0]:
# Display the number of customers in the sample and structure the H dataset
df_h = df_ages_and_cities.select('customer_id', 'UnknownAge', 'Age_0_25','Age_25_35', 'Age_35_45', 'Age_55_65', 'Age_65_plus', 'IsCity')
number_of_customers = df_y.agg(F.max(F.col("customer_id"))).first()[0] + 1  # customer_id is zero-based
print('number of customers', number_of_customers)

## In case of data pre-processing for fitting the CTM model, download these files:
## (!) Final datasets - download as CSV (all rows) as input for CTM model in Python (!) 

- Make sure to delete the column headers (manually) from y.csv, x.csv and h.csv, otherwise you will get a Python error of this kind:

ValueError: could not convert string 'basket_discount_percentage' to float64 at row 0, column 1.
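
Deleting the header row by hand is easy to forget. As one possible alternative, the header can be stripped with a few lines of Python after downloading. This is a minimal sketch using only the standard library: the helper `strip_header` and the file names are hypothetical, and the demo writes a small stand-in file to a temporary directory.

```python
import csv
import os
import tempfile


def strip_header(src_path, dst_path):
    """Copy a CSV file to `dst_path` without its first (header) row."""
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        reader = csv.reader(src)
        next(reader, None)  # skip the header row if the file is not empty
        csv.writer(dst).writerows(reader)


# Demo on a small stand-in for a downloaded x.csv
with tempfile.TemporaryDirectory() as tmp_dir:
    downloaded = os.path.join(tmp_dir, "x_downloaded.csv")
    cleaned = os.path.join(tmp_dir, "x.csv")
    with open(downloaded, "w", newline="") as f:
        csv.writer(f).writerows([
            ["basket_discount_percentage", "IsWeekend", "After5PM"],
            ["0.1", "1", "0"],
            ["0.0", "0", "1"],
        ])
    strip_header(downloaded, cleaned)
    with open(cleaned, newline="") as f:
        remaining_rows = list(csv.reader(f))

print(remaining_rows)
```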

In [0]:
# (!) download df_y as csv (!) 
df_y.display()

In [0]:
# (!) download df_x as csv (!) 
df_x.select('basket_discount_percentage', 'IsWeekend', 'After5PM').display()

In [0]:
# (!) download df_h as csv (!) 
df_h.drop('customer_id').display(header=None)

In [0]:
# (!) download df_products as csv (!)
df_products.display()

### (!) comment out following cell when used to generate segmenting set (!) 

In [0]:
# Write df_products to Databricks, must be done ONLY if this notebook is used for fitting the model
# Because we need to save the product id - lowest level alignment to be used in customer segmentation part 

# WHEN USED FOR GENERATING A 'TO SEGMENT' SET, THE FOLLOWING LINE MUST BE COMMENTED OUT! 

# df_products.write.mode("overwrite").option('overwriteSchema', True).saveAsTable('products')

## In case of data pre-processing for customer segmentation, write these files to Databricks:
## (!) Final datasets - upload to Databricks for customer segmentation (!)

In [0]:
# We need to align the product_id - lowest_level mapping when fitting the model with the mapping used in segmentation. 
# By running this cell, we ensure that the products in y_segmentation get the same product_id's as in the fitted model. That is needed to ensure that the counts_phi values
# correspond to the correct products. 

# Obtain the df_products that was used when fitting the model (from above or manually upload an earlier products file to Databricks)
df_products = table('default.products') 

# Join the to segment customer sample with the product_id - lowest_level mapping from the fitted model 
df_final_sample = df_final_sample.withColumnRenamed("product_id", "product_id_segmentation")
df_final_sample_with_original_product_ids = df_final_sample.join(df_products, 'lowest_level', 'left')

# Filter out lowest levels that did not exist when fitting the model 
df_final_sample_segmentation = df_final_sample_with_original_product_ids.filter(F.col('product_id').isNotNull())

# Display the updated sample, with the same lowest_level - product_id mapping as when fitting the model
display(df_final_sample_segmentation)

# Create y_segmentation with the updated product id's
df_y_segmentation = df_final_sample_segmentation.select('customer_id', 'basket_id', 'product_id')
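
The alignment step can be summarised with a small plain-Python sketch. It is an illustration under stated assumptions: `align_product_ids`, the mapping, and the toy rows are hypothetical. Products that were not present when the model was fitted have no product_id and are dropped, which may leave some baskets smaller or empty.

```python
def align_product_ids(rows, fitted_products):
    """Replace lowest_level by the product_id from the fitted model; drop unknown products."""
    aligned = []
    for customer_id, basket_id, lowest_level in rows:
        product_id = fitted_products.get(lowest_level)
        if product_id is not None:
            aligned.append((customer_id, basket_id, product_id))
    return aligned


# Hypothetical mapping saved when fitting the model: lowest_level -> product_id
fitted_products = {"bread": 0, "milk": 1}

# Hypothetical 'to segment' rows: (customer_id, basket_id, lowest_level)
rows = [
    (0, 0, "milk"),
    (0, 0, "oat drink"),  # unknown to the fitted model, so it is dropped
    (1, 1, "bread"),
]

y_segmentation = align_product_ids(rows, fitted_products)
print(y_segmentation)
```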

In [0]:
# (!) Upload df_y_segmentation to Databricks (!)
df_y_segmentation.write.mode("overwrite").option('overwriteSchema', True).saveAsTable('y_segmentation')
df_y_segmentation.display()

In [0]:
# (!) Upload df_customers_segmentation to Databricks (!)
# So that in the final results, we can retrieve the original customer IDs of 'to-segment' customers 
df_customers_segmentation.write.mode("overwrite").option('overwriteSchema', True).saveAsTable('customers_segmentation')
df_customers_segmentation.display()

# Part 3: Descriptive Statistics

### Descriptive statistics y 

In [0]:
print('number of baskets', number_of_baskets)
print('number of customers', number_of_customers)
print('number of unique products', number_of_products)

### Descriptive statistics x

In [0]:
# Compute the summary statistics
summary_df = df_x.agg(
    F.avg(F.col('basket_discount_percentage')).alias('Average_discount'),
    F.sum(F.col('IsWeekend')).alias('IsWeekend'),
    F.sum(F.col('After5PM')).alias('After5PM'),
    F.sum(F.col('January')).alias('January'),
    F.sum(F.col('February')).alias('February'),
    F.sum(F.col('April')).alias('April'),
    F.sum(F.col('May')).alias('May'),
    F.sum(F.col('June')).alias('June'),
    F.sum(F.col('July')).alias('July'),
    F.sum(F.col('August')).alias('August'),
    F.sum(F.col('September')).alias('September'),
    F.sum(F.col('October')).alias('October'),
    F.sum(F.col('November')).alias('November'),
    F.sum(F.col('December')).alias('December')
)

# Compute the frequency and percentage of 1's for dummy variables
dummy_cols = ['IsWeekend', 'After5PM', 'January', 'February', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']

total_count = df_x.count()  # Count once; the number of rows does not change inside the loop
for col_name in dummy_cols:
    sum_col = df_x.agg(F.sum(F.col(col_name))).collect()[0][0]
    percentage = (sum_col / total_count) * 100
    summary_df = summary_df.withColumn(col_name + '_Frequency', F.lit(sum_col))\
                           .withColumn(col_name + '_Percentage', F.lit(percentage))

# Drop the columns not needed in the final table
summary_df = summary_df.drop('basket_id', *dummy_cols)

# Show the final table
summary_df.display()

### Descriptive statistics h

In [0]:
# Compute the summary statistics
summary_df_h = df_h.agg(
    F.sum(F.col('UnknownAge')).alias('UnknownAge_Frequency'),
    F.sum(F.col('Age_0_25')).alias('Age_0_25_Frequency'),
    F.sum(F.col('Age_25_35')).alias('Age_25_35_Frequency'),
    F.sum(F.col('Age_35_45')).alias('Age_35_45_Frequency'),
    F.sum(F.col('Age_55_65')).alias('Age_55_65_Frequency'),
    F.sum(F.col('Age_65_plus')).alias('Age_65_plus_Frequency'),
    F.sum(F.col('IsCity')).alias('IsCity_Frequency')
)

# Compute the percentage of 1's for dummy variables
dummy_cols_h = ['UnknownAge', 'Age_0_25', 'Age_25_35', 'Age_35_45', 'Age_55_65', 'Age_65_plus', 'IsCity']

total_count = df_h.count()  # Count once; the number of rows does not change inside the loop
for col_name in dummy_cols_h:
    sum_col = df_h.agg(F.sum(F.col(col_name))).collect()[0][0]
    percentage = (sum_col / total_count) * 100
    summary_df_h = summary_df_h.withColumn(col_name + '_Percentage', F.lit(percentage))

# Show the final table for df_h
summary_df_h.display()