<a href="https://colab.research.google.com/github/FelipeTufaile/customer_lifetime_value/blob/main/notebooks/Feature_Processing_Customer_Spend_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installing Kaggle API

In [None]:
# Installing Kaggle API
!pip install kaggle



## Configuring Kaggle Credentials

Set up the Kaggle API credentials by following https://www.kaggle.com/docs/api:
```
%%shell
mkdir ~/.kaggle
echo \{\"username\":\"{your kaggle username}\",\"key\":\"{your kaggle api key}\"\} > ~/.kaggle/kaggle.json
pip install kaggle
```

In [None]:
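%%shell
# -p avoids an error when the directory already exists (e.g. when the cell is re-run)
mkdir -p ~/.kaggle
echo \{\"username\":\"{your kaggle username}\",\"key\":\"{your kaggle api key}\"\} > ~/.kaggle/kaggle.json
# Restricting permissions so that the key file is readable only by the current user
chmod 600 ~/.kaggle/kaggle.json
pip install kaggle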





## Installing PySpark

Since part of the feature engineering process may involve dealing with large amounts of data, we will be using PySpark.
To facilitate future work, the final processed table will be stored as a parquet file in Google Drive.

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [1 InRelease 5,484 B/129[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [1 InRelease 46.0 kB/129[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [1 InRelease 83.7 kB/129[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Connected to r2u.stat.i[0m                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcont[0m                                                                               Ign:4 https://r

'/usr/local/lib/python3.10/dist-packages/pyspark'

## Loading Libraries

In [2]:
# Importing Numpy library
import numpy as np

# Importing Pandas library
import pandas as pd

# Importing datetime library
from datetime import datetime, timedelta

# importing the zipfile module
from zipfile import ZipFile

# Importing gzip library
import gzip

# Importing plotting libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Importing userdata from Google Colab Library
from google.colab import userdata, drive

# Importing pyspark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType
import pyspark.sql.functions as f
from pyspark.sql.window import Window

# Creating a spark session
spark = SparkSession.builder.appName("DataProcessingPySpark").getOrCreate()

spark

In [3]:
# Mounting Google Drive Folder
# This step gives us access to data stored in our Google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Downloading Customer Spend Model Data

This notebook uses the customer spend model data available on Kaggle. The dataset can be found at the following link: https://www.kaggle.com/competitions/customer-spend-model/data.
The task requested by the author of the competition is to **predict customer purchases from a German book company**. To do so, we are provided with the transaction history for each customer up to Nov 24, 2014.
The output that we are requested to provide is the log of the estimated amount spent by each customer after Nov 24, 2014. To avoid log(0), we are requested to adjust the log calculation to log(x+1), where x is the estimate of the future amount spent.
For more details about the task, see https://www.kaggle.com/competitions/customer-spend-model/overview.
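
As a rough illustration of the adjusted target, the log(x+1) transformation and its inverse can be sketched with the Python standard library. The function names `log_target` and `spend_from_log_target` are hypothetical, and the natural logarithm is an assumption; the competition page should be checked for the exact base.

```python
import math


def log_target(amount_spent: float) -> float:
    """Return log(x + 1) for a non-negative spend x (natural log assumed)."""
    if amount_spent < 0:
        raise ValueError("amount_spent must be non-negative")
    # log1p(x) computes log(1 + x) and stays accurate for very small x
    return math.log1p(amount_spent)


def spend_from_log_target(value: float) -> float:
    """Invert log_target: recover x from log(x + 1)."""
    return math.expm1(value)


# A customer with no future spend maps to a target of exactly zero
zero_spend_target = log_target(0.0)

# Round trip for an illustrative (made-up) spend of 25.0
recovered_spend = spend_from_log_target(log_target(25.0))
```

A model trained on this target predicts in log space, so its outputs would need the inverse step before being read as currency amounts.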

In [None]:
# Downloading the Customer Spend Model Data from Kaggle
!kaggle competitions download -c customer-spend-model

customer-spend-model.zip: Skipping, found more recently modified local copy (use --force to force download)


## Unzipping the Transactions Dataset

This step is necessary to unzip the csv files downloaded in the previous cell.

In [None]:
# Loading customer-spend-model.zip and creating a zip object
with ZipFile("../content/customer-spend-model.zip", 'r') as zip_object:

    # Extracting the customer file from the zip into the Google Drive data folder
    zip_object.extract("customer.csv", path="../content/drive/MyDrive/Colab/Sandbox/Data")

    # Extracting the orders file from the zip into the Google Drive data folder
    zip_object.extract("orders.csv", path="../content/drive/MyDrive/Colab/Sandbox/Data")

# The with statement closes the zip object automatically, so no explicit close() is needed

## Processing Data Using PySpark

### Loading Orders Dataframe

**orders.csv**: all orders prior to 11/25/2014 for training (n=5,551) and test (n=11,230) sets. You should find 353,687 records plus a header.

- **id**: unique customer identifier;
- **orddate**: order date;
- **ordnum**: order number;
- **category**:
    - category identifier:
        - 1 = fiction;
        - 3 = classics;
        - 5 = cartoons;
        - 6 = legends;
        - 7 = philosophy;
        - 8 = religion;
        - 9 = psychology;
        - 10 = linguistics;
        - 12 = art;
        - 14 = music;
        - 17 = art reprints;
        - 19 = history;
        - 20 = contemporary history;
        - 21 = economy;
        - 22 = politics;
        - 23 = science;
        - 26 = computer science;
        - 27 = traffic, railroads;
        - 30 = maps;
        - 31 = travel guides;
        - 35 = health;
        - 36 = cooking;
        - 37 = learning;
        - 38 = games and riddles;
        - 39 = sports;
        - 40 = hobby;
        - 41 = nature/animals/plants;
        - 44 = encyclopedia;
        - 50 = videos, DVDs;
        - 99 = non books
- **qty**: quantity;
- **price**: price paid;

In [4]:
# Reading orders dataframe
orders_df = spark.read.format("csv").option("header", "true").load("../content/drive/MyDrive/Colab/Sandbox/Data/orders.csv")

# Show orders dataframe
orders_df.show()

+---+---------+------+--------+---+------------+
| id|  orddate|ordnum|category|qty|       price|
+---+---------+------+--------+---+------------+
|957|10FEB2008| 38650|      35|  1|5.0106582642|
|957|10FEB2008| 38650|      35|  1|20.426101685|
|957|10FEB2008| 38650|      19|  1|20.400543213|
|957|15MAR2008| 48972|      40|  1|25.539016724|
|957|22NOV2008|150011|      40|  1|14.316169739|
|957|22NOV2008|150011|      40|  1|8.5896987915|
|957|03OCT2009|286151|      19|  1|15.313186646|
|957|04APR2010|376779|      14|  1|12.782295227|
|957|04APR2010|376779|      14|  1|5.0873527527|
|957|04APR2010|376779|      35|  1|6.5445327759|
|957|14AUG2011|622093|      99|  1|8.6919555664|
|957|14AUG2011|622093|      19|  1|10.174705505|
|957|14AUG2011|622093|       5|  1|15.236495972|
|957|10SEP2011|639810|      99|  1|9.9497375488|
|957|10SEP2011|639810|      35|  1|10.200271606|
|957|10SEP2011|639810|      99|  1|6.5445327759|
|957|10SEP2011|639810|      99|  1|2.5564575195|
|957|10SEP2011|63981

### Creating a Transaction Dataframe

In [5]:
# Creating a transaction dataframe
trans_df = (

    # Referencing orders dataframe
    orders_df

    # Converting the transaction date format
    .withColumn("trans_dt", f.date_format(f.to_date(f.col("orddate"), "ddMMMyyyy"), 'yyyy-MM-dd'))

    # Selecting columns
    .select(
        f.col("id").alias("cust_id"),
        f.trunc("trans_dt", "month").alias("ref_dt"),
        f.col("trans_dt"),
        #f.col("category"),                             # Will not be used right now
        #f.col("qty").alias("trans_qnt"),               # Will not be used right now
        f.col("price").alias("trans_amt")
    )

    # Considering only purchase amounts higher than 0. Purchase amounts <= 0 do not make sense.
    .filter(f.col("trans_amt") > 0)

    # Aggregating by customer id and reference month (first day of the transaction month)
    .groupBy(["cust_id", "ref_dt"])
    .agg(
        f.min("trans_dt").alias("trans_dt"),
        f.sum("trans_amt").alias("trans_amt")
    )

    # Ordering by customer id and transaction date
    .orderBy(f.asc("cust_id"), f.asc("trans_dt"))

# The caching step here aims to avoid crashing the feature processing, possibly due to long DAGs.
).cache()

# Counting the number of rows and caching the dataframe
print(f"Number of rows in the transaction dataframe: {trans_df.count()}")

# Printing the transaction dataframe
trans_df.show()

Number of rows in the transaction dataframe: 92245
+--------+----------+----------+------------------+
| cust_id|    ref_dt|  trans_dt|         trans_amt|
+--------+----------+----------+------------------+
|  100021|2009-02-01|2009-02-25|      6.6212272644|
|  100021|2010-01-01|2010-01-20|     33.0805740352|
|  100021|2010-03-01|2010-03-06|     45.8117370608|
|  100021|2010-04-01|2010-04-14|        50.1065979|
|  100021|2010-05-01|2010-05-10|27.047328949399997|
|  100021|2010-08-01|2010-08-28|      50.899093628|
|  100021|2010-09-01|2010-09-26|      15.287620544|
|10005188|2012-07-01|2012-07-28|30.451723098600002|
|10005188|2012-08-01|2012-08-26|     45.2999877928|
|10005188|2013-05-01|2013-05-08|24.799987792899998|
|10005188|2013-09-01|2013-09-22|42.849990844000004|
|10005188|2014-06-01|2014-06-15|      19.799987793|
|10005188|2014-10-01|2014-10-04|     19.8999938964|
|10009396|2012-08-01|2012-08-07|     24.9506454467|
|10009396|2013-01-01|2013-01-05|189.09993743780004|
|10009396|201
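
To make the date conversion and monthly aggregation in the cell above easier to follow, here is a small sketch in plain Python rather than PySpark. The sample rows are copied from the orders output shown earlier; the helper `parse_orddate` and the `monthly` dictionary are hypothetical names used only for illustration.

```python
from collections import defaultdict
from datetime import date

# Month abbreviations as they appear in the orddate column (e.g. 10FEB2008)
MONTHS = {
    name: number
    for number, name in enumerate(
        ["JAN", "FEB", "MAR", "APR", "MAY", "JUN",
         "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"],
        start=1,
    )
}


def parse_orddate(text: str) -> date:
    """Parse a date written as day, month abbreviation, year (e.g. '10FEB2008')."""
    return date(int(text[5:]), MONTHS[text[2:5].upper()], int(text[:2]))


# Sample rows copied from the orders output: (id, orddate, price)
rows = [
    ("957", "10FEB2008", 5.0106582642),
    ("957", "10FEB2008", 20.426101685),
    ("957", "10FEB2008", 20.400543213),
    ("957", "15MAR2008", 25.539016724),
]

# One entry per (customer id, month start): earliest date and total amount
monthly = defaultdict(lambda: {"trans_dt": None, "trans_amt": 0.0})
for cust_id, orddate, price in rows:
    if price <= 0:
        continue  # mirrors the filter on non-positive amounts
    trans_dt = parse_orddate(orddate)
    ref_dt = trans_dt.replace(day=1)  # month start, like f.trunc(..., "month")
    entry = monthly[(cust_id, ref_dt)]
    if entry["trans_dt"] is None or trans_dt < entry["trans_dt"]:
        entry["trans_dt"] = trans_dt
    entry["trans_amt"] += price
```

The three February rows collapse into a single record for 2008-02-01, which is the same one-row-per-customer-per-month shape that `trans_df` has.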

## Creating a Customer Lifetime Value Dataframe

Next, we will create a customer lifetime value dataframe. This dataframe is a "snapshot" of each customer's behavior pattern on 24 Nov 2014, built from each customer's transaction history. Features commonly used to describe customer behavior are RFML features (Recency, Frequency, Monetary and Lifetime). Therefore, we will calculate RFML features along with additional features (e.g. rolling 3-month transaction amounts).
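
For intuition, the four RFML quantities can be sketched for a single customer in plain Python. This is a simplified illustration and not the notebook's PySpark implementation: the function name `rfml` and the transactions are made up, and months are assumed to be 365/12 days long.

```python
from datetime import date

DAYS_PER_MONTH = 365 / 12  # assumed average month length


def rfml(transactions, ref_dt):
    """Compute RFML features from a list of (date, amount) pairs up to ref_dt."""
    past = [(d, a) for d, a in transactions if d <= ref_dt]
    if not past:
        raise ValueError("no transactions on or before the reference date")
    first_dt = min(d for d, _ in past)
    last_dt = max(d for d, _ in past)
    # Frequency counts distinct months with at least one purchase
    active_months = {(d.year, d.month) for d, _ in past}
    return {
        "recency": (ref_dt - last_dt).days / DAYS_PER_MONTH,
        "frequency": len(active_months),
        "monetary": sum(a for _, a in past),
        "lifetime": (ref_dt - first_dt).days / DAYS_PER_MONTH,
    }


# Made-up transactions for one hypothetical customer
example = rfml(
    [(date(2014, 1, 10), 20.0), (date(2014, 1, 25), 5.0), (date(2014, 6, 15), 30.0)],
    ref_dt=date(2014, 11, 24),
)
```

Here the two January purchases count as one active month, so the frequency is 2 while the monetary value is the sum of all three amounts.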

In [6]:
# Calculating the first transaction date across all customers, truncated to the first day of its month
start_date = trans_df.agg({"trans_dt": "min"}).withColumnRenamed("min(trans_dt)", "date").collect()[0].date[:-2]+'01'

print(f"First transaction date across all customers: {start_date}")

First transaction date across all customers: 2007-11-01


In [7]:
# Calculating the last transaction date across all customers, truncated to the first day of its month
end_date = trans_df.agg({"trans_dt": "max"}).withColumnRenamed("max(trans_dt)", "date").collect()[0].date[:-2]+'01'

print(f"Last transaction date across all customers: {end_date}")

Last transaction date across all customers: 2014-11-01


Although our specific task is to generate predictions for purchases after 24 Nov 2014, we can train the model on a much broader timeframe. That is, instead of using only the "snapshot" on 24 Nov 2014, we can generate a dataframe containing information on each customer's behavior for every month/year available for that customer.

The next feature processing step generates this dataframe, with one row for each valid combination of month/year and customer id.
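
The idea of "one row per valid month per customer" can be sketched in plain Python before moving to PySpark. The start and end dates are the ones printed above; the customers `A` and `B`, their first purchase dates, and the helper `month_starts` are hypothetical.

```python
from datetime import date


def month_starts(start: date, end: date):
    """Return the first day of every month from start to end, inclusive."""
    months = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        months.append(date(year, month, 1))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return months


# Calendar spanning the first and last transaction months printed above
calendar = month_starts(date(2007, 11, 1), date(2014, 11, 1))

# Made-up first purchase dates for two hypothetical customers
first_purchase = {"A": date(2014, 9, 17), "B": date(2013, 12, 2)}

# Cross join of customers and months, keeping only months on or after
# the month of each customer's first purchase
grid = [
    (cust_id, ref_dt)
    for cust_id, first_dt in first_purchase.items()
    for ref_dt in calendar
    if ref_dt >= first_dt.replace(day=1)
]
```

Customer `A` contributes 3 rows (September to November 2014) and customer `B` contributes 12, so customers who joined earlier supply more training rows.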

In [10]:
# Create a PySpark calendar dataframe with month-start (MS) frequency
calendar_df = (
  spark.createDataFrame(pd.DataFrame({'ref_dt': pd.date_range(start=start_date, end=end_date, freq='MS')}))
  .withColumn("ref_dt", f.to_date("ref_dt"))
).cache()

# Counting the number of rows and caching the dataframe
print(f"Number of rows in the calendar dataframe: {calendar_df.count()}")

Number of rows in the calendar dataframe: 85


In [8]:
# Creating a customer id dataframe
# This dataframe should contain all distinct customer ids available in the transaction dataframe
customers_id_df = trans_df.select(f.col('cust_id')).distinct().cache()

# Counting the number of rows and caching the dataframe
print(f"Number of rows in the customer id dataframe: {customers_id_df.count()}")

Number of rows in the customer id dataframe: 16661


In [11]:
# Creating a cross-join dataframe
cross_join_df = (

    # Referencing the calendar table
    calendar_df

    # Performing a crossjoin with distinct customer ids and distinct transaction dates.
    # This should result in one record per month per customer
    .crossJoin(customers_id_df)

    # Adding aggregated transaction information about customers
    .join(trans_df, on=["cust_id", "ref_dt"], how="left")

    # Calculating the first transaction date per customer
    .withColumn("first_trans_dt", f.min("trans_dt").over(Window.partitionBy(["cust_id"])))

    # Selecting only the reference dates on or after the month of each customer's first transaction
    .filter(f.col("ref_dt") >= f.trunc("first_trans_dt", "month"))

    # Replacing null transaction amounts with zero
    .withColumn("trans_amt", f.when(f.col("trans_amt").isNull(), f.lit(0)).otherwise(f.col("trans_amt")))

    # Calculating the last transaction date per customer
    .withColumn("last_trans_dt", f.last("trans_dt", ignorenulls=True).over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(Window.unboundedPreceding, 0)))

    # Ordering columns
    .select(
        "cust_id",
        "ref_dt",
        "trans_amt",
        "first_trans_dt",
        "last_trans_dt"
    )

).cache()


# Counting the number of rows and caching the dataframe
print(f"Number of rows in the cross-join dataframe: {cross_join_df.count()}")

# Printing cross-join dataframe
cross_join_df.show()

Number of rows in the cross-join dataframe: 749989
+--------+----------+-------------+--------------+-------------+
| cust_id|    ref_dt|    trans_amt|first_trans_dt|last_trans_dt|
+--------+----------+-------------+--------------+-------------+
|10094474|2013-12-01| 30.799732208|    2013-12-02|   2013-12-02|
|10094474|2014-01-01|          0.0|    2013-12-02|   2013-12-02|
|10094474|2014-02-01|          0.0|    2013-12-02|   2013-12-02|
|10094474|2014-03-01|          0.0|    2013-12-02|   2013-12-02|
|10094474|2014-04-01|          0.0|    2013-12-02|   2013-12-02|
|10094474|2014-05-01| 25.849992752|    2013-12-02|   2014-05-04|
|10094474|2014-06-01|36.9399871822|    2013-12-02|   2014-06-18|
|10094474|2014-07-01|          0.0|    2013-12-02|   2014-06-18|
|10094474|2014-08-01|          0.0|    2013-12-02|   2014-06-18|
|10094474|2014-09-01|          0.0|    2013-12-02|   2014-06-18|
|10094474|2014-10-01| 29.989990234|    2013-12-02|   2014-10-07|
|10094474|2014-11-01|          0.0|    

In [15]:
# Creating a customer lifetime value dataframe
cltv_df = (

    # Referencing the cross-join table
    cross_join_df

    # Get month number from reference date
    .withColumn("ref_month", f.month("ref_dt"))

    # Get year from reference date
    .withColumn("ref_year", f.year("ref_dt"))

    # Calculating first transaction amount per customer
    .withColumn("first_trans_amt", f.first("trans_amt", ignorenulls=True).over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(Window.unboundedPreceding, 0)))

    # Creating a transaction indicator
    .withColumn("trans_ind", f.when(f.col("trans_amt") > 0, f.lit(1)).otherwise(f.lit(0)))

    # Calculating the recency of each customer: the number of months between the reference date and the last transaction date
    .withColumn("recency", f.datediff(f.col("ref_dt"), f.col("last_trans_dt"))/(365/12))

    # Calculating Frequency: The number of months with transactions per customer between the first transaction date and the reference date
    .withColumn("frequency", f.sum("trans_ind").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(Window.unboundedPreceding, 0)))

    # Calculating Monetary: The total amount spent per customer between the first transaction date and the reference date
    .withColumn("monetary", f.sum("trans_amt").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(Window.unboundedPreceding, 0)))

    # Calculating the lifetime of each customer: the number of months between the reference date and the first transaction date
    .withColumn("lifetime", f.datediff(f.col("ref_dt"), f.col("first_trans_dt"))/(365/12))

    # Calculating cycle length: the average number of months between subsequent transaction
    .withColumn("cycle_length", f.col("lifetime") / f.col("frequency"))

    # Calculating rolling 3 months transaction quantity
    .withColumn("trans_qnt_R03m_lag0", f.sum("trans_ind").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-2, 0)))
    .withColumn("trans_qnt_R03m_lag1", f.sum("trans_ind").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-5, -3)))
    .withColumn("trans_qnt_R03m_lag2", f.sum("trans_ind").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-8, -6)))
    .withColumn("trans_qnt_R03m_lag3", f.sum("trans_ind").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-11, -9)))

    # Calculating rolling 3 months transaction amount
    .withColumn("trans_amt_R03m_lag0", f.sum("trans_amt").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-2, 0)))
    .withColumn("trans_amt_R03m_lag1", f.sum("trans_amt").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-5, -3)))
    .withColumn("trans_amt_R03m_lag2", f.sum("trans_amt").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-8, -6)))
    .withColumn("trans_amt_R03m_lag3", f.sum("trans_amt").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(-11, -9)))

    # Calculating the transaction amount for next 3 months
    # This column can be used as a target column (as an alternative for logtarg) for model training
    .withColumn("trans_amt_R03m_lead1",  f.sum("trans_amt").over(Window.partitionBy(["cust_id"]).orderBy(f.asc("ref_dt")).rowsBetween(1, 3)))

    # Fill missing values with 0
    # This step sets the rolling 3-month features to 0 in cases where there is not enough transaction history to calculate them
    .fillna(0)

    # Ordering and selecting columns
    .select(
      "cust_id",
      "ref_dt",
      "ref_year",
      "ref_month",
      "first_trans_amt",
      "cycle_length",
      "recency",
      "frequency",
      "monetary",
      "lifetime",
      "trans_qnt_R03m_lag0",
      "trans_qnt_R03m_lag1",
      "trans_qnt_R03m_lag2",
      "trans_qnt_R03m_lag3",
      "trans_amt_R03m_lag0",
      "trans_amt_R03m_lag1",
      "trans_amt_R03m_lag2",
      "trans_amt_R03m_lag3",
      "trans_amt_R03m_lead1", # This feature can be used as target
    )

    # Ordering by customer id and reference date
    .orderBy(f.asc("cust_id"), f.asc("ref_dt"))

).cache()

print(f"The dataframe has: {cltv_df.count()} rows")

The dataframe has: 749989 rows
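
The row offsets passed to `rowsBetween` in the cell above can be hard to read at a glance. The sketch below mimics them on a plain Python list, where each element stands for one month of a single customer; the helper `window_sum` and the monthly amounts are made up for illustration, and an empty window is treated as 0 to mirror the `fillna(0)` step.

```python
def window_sum(values, index, start_offset, end_offset):
    """Sum values from index + start_offset to index + end_offset, inclusive.

    Positions outside the list are ignored; an empty window sums to 0.
    """
    lo = max(index + start_offset, 0)
    hi = min(index + end_offset, len(values) - 1)
    return sum(values[lo:hi + 1]) if lo <= hi else 0


# Made-up monthly amounts for one hypothetical customer (oldest month first)
monthly_amounts = [10, 0, 0, 20, 5, 0, 0, 30]
current = 5  # position of the reference month in the list

lag0 = window_sum(monthly_amounts, current, -2, 0)    # current month and the 2 before it
lag1 = window_sum(monthly_amounts, current, -5, -3)   # the 3 months before that
lag2 = window_sum(monthly_amounts, current, -8, -6)   # older than the available history
lead1 = window_sum(monthly_amounts, current, 1, 3)    # the next 3 months (target)
```

Because the history here is only 8 months long, `lag2` has no rows to sum and falls back to 0, and `lead1` only sees the 2 future months that exist.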


In [16]:
# Printing a sample of the dataset
cltv_df.show()

+-------+----------+--------+---------+---------------+-------------------+--------------------+---------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
|cust_id|    ref_dt|ref_year|ref_month|first_trans_amt|       cycle_length|             recency|frequency|          monetary|           lifetime|trans_qnt_R03m_lag0|trans_qnt_R03m_lag1|trans_qnt_R03m_lag2|trans_qnt_R03m_lag3|trans_amt_R03m_lag0|trans_amt_R03m_lag1|trans_amt_R03m_lag2|trans_amt_R03m_lag3|trans_amt_R03m_lead1|
+-------+----------+--------+---------+---------------+-------------------+--------------------+---------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
| 100021|2009-02-01

## Storing the Processed Dataset as Parquet Files

In [17]:
# Defining dataframe path
file_path = "../content/drive/MyDrive/Colab/Sandbox/customer_spend_model"

# Storing Customer Lifetime Value Data
cltv_df.write.format("parquet").mode("overwrite").option("overwriteSchema", "true").save(file_path)