In [9]:
# 01-data-preparation.ipynb
# Customer Lifetime Value Prediction - Data Preparation

# Imports
import pandas as pd
import numpy as np
from google.cloud import storage
from google.cloud import bigquery

# Configuration
PROJECT_ID = "clv-prediction-mlops"
REGION = "us-central1"
BUCKET_NAME = "clv-prediction-data"

# Create GCS bucket
!gsutil mb -l {REGION} gs://{BUCKET_NAME}

print(f"Project: {PROJECT_ID}")
print(f"Bucket: gs://{BUCKET_NAME}")
print("Setup complete")

Creating gs://clv-prediction-data/...
ServiceException: 409 A Cloud Storage bucket named 'clv-prediction-data' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
Project: clv-prediction-mlops
Bucket: gs://clv-prediction-data
Setup complete


In [2]:
# Download UCI Online Retail II dataset
!pip install openpyxl --quiet

!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00502/online_retail_II.xlsx -O /tmp/retail.xlsx

# Load data
df = pd.read_excel('/tmp/retail.xlsx', sheet_name=None)

# Dataset has 2 sheets (Year 2009-2010 and Year 2010-2011)
print("Sheets:", list(df.keys()))

# Combine both sheets
df_2009 = df['Year 2009-2010']
df_2010 = df['Year 2010-2011']

data = pd.concat([df_2009, df_2010], ignore_index=True)

print(f"\nTotal rows: {len(data):,}")
print(f"Date range: {data['InvoiceDate'].min()} to {data['InvoiceDate'].max()}")
print(f"\nColumns: {list(data.columns)}")
print(f"\nSample:")
data.head()

Sheets: ['Year 2009-2010', 'Year 2010-2011']

Total rows: 1,067,371
Date range: 2009-12-01 07:45:00 to 2011-12-09 12:50:00

Columns: ['Invoice', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'Price', 'Customer ID', 'Country']

Sample:


Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
3,489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01 07:45:00,2.1,13085.0,United Kingdom
4,489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01 07:45:00,1.25,13085.0,United Kingdom


In [3]:
# Data quality check
print("="*60)
print("DATA QUALITY CHECK")
print("="*60)

print(f"\nMissing values:")
print(data.isnull().sum())

print(f"\nUnique customers: {data['Customer ID'].nunique():,}")
print(f"Unique products: {data['StockCode'].nunique():,}")
print(f"Unique countries: {data['Country'].nunique()}")

print(f"\nNegative quantities (returns): {(data['Quantity'] < 0).sum():,}")
print(f"Zero/negative prices: {(data['Price'] <= 0).sum():,}")

DATA QUALITY CHECK

Missing values:
Invoice             0
StockCode           0
Description      4382
Quantity            0
InvoiceDate         0
Price               0
Customer ID    243007
Country             0
dtype: int64

Unique customers: 5,942
Unique products: 5,305
Unique countries: 43

Negative quantities (returns): 22,950
Zero/negative prices: 6,207


In [4]:
# Data cleaning
print("="*60)
print("DATA CLEANING")
print("="*60)

# Starting count
print(f"Starting rows: {len(data):,}")

# Remove rows without Customer ID (can't calculate CLV without it)
data_clean = data.dropna(subset=['Customer ID'])
print(f"After removing null Customer ID: {len(data_clean):,}")

# Convert Customer ID to integer
data_clean['Customer ID'] = data_clean['Customer ID'].astype(int)

# Remove cancelled orders (Invoice starts with 'C')
data_clean = data_clean[~data_clean['Invoice'].astype(str).str.startswith('C')]
print(f"After removing cancellations: {len(data_clean):,}")

# Remove zero/negative prices
data_clean = data_clean[data_clean['Price'] > 0]
print(f"After removing zero/negative prices: {len(data_clean):,}")

# Calculate line total
data_clean['Total'] = data_clean['Quantity'] * data_clean['Price']

# Keep only positive totals (remove returns for now)
data_clean = data_clean[data_clean['Total'] > 0]
print(f"After removing returns: {len(data_clean):,}")

print(f"\nFinal dataset: {len(data_clean):,} rows")
print(f"Unique customers: {data_clean['Customer ID'].nunique():,}")

DATA CLEANING
Starting rows: 1,067,371
After removing null Customer ID: 824,364


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_clean['Customer ID'] = data_clean['Customer ID'].astype(int)


After removing cancellations: 805,620
After removing zero/negative prices: 805,549
After removing returns: 805,549

Final dataset: 805,549 rows
Unique customers: 5,878


In [19]:
!bq mk --dataset clv-predictions-mlops:retail_data

Dataset 'clv-predictions-mlops:retail_data' successfully created.


In [20]:
#load data into BigQuery
!bq load \
    --source_format=PARQUET \
    --replace \
    clv-predictions-mlops:retail_data.transactions \
    gs://clv-prediction-data/raw/transactions.parquet

Waiting on bqjob_r99dc24d65a1b05f_0000019b0afebec5_1 ... (3s) Current status: DONE   


In [22]:
#confirm data is in BigQuery
!bq query --use_legacy_sql=false \
    'SELECT COUNT(*) as row_count FROM `clv-predictions-mlops.retail_data.transactions`'

+-----------+
| row_count |
+-----------+
|    805549 |
+-----------+


In [26]:
# Check date range - convert nanoseconds to timestamp
!bq query --use_legacy_sql=false 'SELECT TIMESTAMP_MICROS(CAST(MIN(InvoiceDate)/1000 AS INT64)) as earliest, TIMESTAMP_MICROS(CAST(MAX(InvoiceDate)/1000 AS INT64)) as latest FROM `clv-predictions-mlops.retail_data.transactions`'

+---------------------+---------------------+
|      earliest       |       latest        |
+---------------------+---------------------+
| 2009-12-01 07:45:00 | 2011-12-09 12:50:00 |
+---------------------+---------------------+


In [27]:
# Calculate RFM features from first 12 months (Dec 2009 - Nov 2010)
# These features will predict spend in months 13-24

query = """
CREATE OR REPLACE TABLE `clv-predictions-mlops.retail_data.customer_features` AS

WITH feature_period AS (
    -- First 12 months of data for building features
    SELECT *,
        TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) AS purchase_time
    FROM `clv-predictions-mlops.retail_data.transactions`
    WHERE TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) < '2010-12-01'
),

target_period AS (
    -- Next 12 months for target variable (what we predict)
    SELECT *,
        TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) AS purchase_time
    FROM `clv-predictions-mlops.retail_data.transactions`
    WHERE TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) >= '2010-12-01'
),

rfm_features AS (
    -- Calculate RFM for each customer from feature period
    SELECT 
        `Customer ID`,
        
        -- Recency: days since last purchase (from end of feature period)
        DATE_DIFF(DATE('2010-12-01'), DATE(MAX(purchase_time)), DAY) AS recency_days,
        
        -- Frequency: number of unique orders
        COUNT(DISTINCT Invoice) AS frequency,
        
        -- Monetary: total spend
        ROUND(SUM(Quantity * Price), 2) AS monetary
        
    FROM feature_period
    GROUP BY `Customer ID`
),

target AS (
    -- Calculate target: total spend in months 13-24
    SELECT 
        `Customer ID`,
        ROUND(SUM(Quantity * Price), 2) AS target_clv
    FROM target_period
    GROUP BY `Customer ID`
)

-- Join features with target
SELECT 
    r.*,
    COALESCE(t.target_clv, 0) AS target_clv  -- 0 if customer didn't buy in target period
FROM rfm_features r
LEFT JOIN target t ON r.`Customer ID` = t.`Customer ID`
"""

# Run it
from google.cloud import bigquery
client = bigquery.Client(project="clv-predictions-mlops")
job = client.query(query)
job.result()
print("Created customer_features table")

Created customer_features table


In [28]:
# Check the features table
!bq query --use_legacy_sql=false 'SELECT COUNT(*) as customers FROM `clv-predictions-mlops.retail_data.customer_features`'

+-----------+
| customers |
+-----------+
|      4266 |
+-----------+


In [29]:
# Preview sample rows
!bq query --use_legacy_sql=false 'SELECT * FROM `clv-predictions-mlops.retail_data.customer_features` LIMIT 5'

+-------------+--------------+-----------+----------+------------+
| Customer ID | recency_days | frequency | monetary | target_clv |
+-------------+--------------+-----------+----------+------------+
|       16473 |            1 |         1 |   154.72 |     316.11 |
|       17820 |            1 |         1 |   183.56 |        0.0 |
|       17378 |            1 |         1 |    10.95 |        0.0 |
|       15939 |            1 |         1 |  2945.38 |    6115.01 |
|       17826 |            1 |         1 |   134.59 |        0.0 |
+-------------+--------------+-----------+----------+------------+


In [30]:
# Target CLV distribution stats
!bq query --use_legacy_sql=false 'SELECT ROUND(MIN(target_clv),2) as min_clv, ROUND(AVG(target_clv),2) as avg_clv, ROUND(MAX(target_clv),2) as max_clv, COUNTIF(target_clv = 0) as churned_customers FROM `clv-predictions-mlops.retail_data.customer_features`'

+---------+---------+-----------+-------------------+
| min_clv | avg_clv |  max_clv  | churned_customers |
+---------+---------+-----------+-------------------+
|     0.0 | 1743.76 | 287491.91 |              1540 |
+---------+---------+-----------+-------------------+


In [31]:
# Expanded feature set - more behavioral signals
query = """
CREATE OR REPLACE TABLE `clv-predictions-mlops.retail_data.customer_features` AS

WITH feature_period AS (
    SELECT *,
        TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) AS purchase_time,
        Quantity * Price AS line_total
    FROM `clv-predictions-mlops.retail_data.transactions`
    WHERE TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) < '2010-12-01'
),

target_period AS (
    SELECT *,
        Quantity * Price AS line_total
    FROM `clv-predictions-mlops.retail_data.transactions`
    WHERE TIMESTAMP_MICROS(CAST(InvoiceDate/1000 AS INT64)) >= '2010-12-01'
),

customer_features AS (
    SELECT 
        `Customer ID`,
        
        -- RFM Features
        DATE_DIFF(DATE('2010-12-01'), DATE(MAX(purchase_time)), DAY) AS recency_days,
        COUNT(DISTINCT Invoice) AS frequency,
        ROUND(SUM(line_total), 2) AS monetary,
        
        -- Order-level features
        ROUND(SUM(line_total) / COUNT(DISTINCT Invoice), 2) AS avg_order_value,
        ROUND(COUNT(DISTINCT Invoice) / 12.0, 2) AS orders_per_month,
        
        -- Item-level features
        COUNT(*) AS total_items_purchased,
        COUNT(DISTINCT StockCode) AS unique_products,
        ROUND(AVG(Quantity), 2) AS avg_quantity_per_line,
        ROUND(AVG(Price), 2) AS avg_unit_price,
        
        -- Time-based features
        DATE_DIFF(DATE(MAX(purchase_time)), DATE(MIN(purchase_time)), DAY) AS customer_tenure_days,
        COUNT(DISTINCT DATE(purchase_time)) AS unique_purchase_days,
        
        -- Engagement features
        ROUND(COUNT(DISTINCT StockCode) / COUNT(DISTINCT Invoice), 2) AS products_per_order
        
    FROM feature_period
    GROUP BY `Customer ID`
),

target AS (
    SELECT 
        `Customer ID`,
        ROUND(SUM(line_total), 2) AS target_clv
    FROM target_period
    GROUP BY `Customer ID`
)

SELECT 
    f.*,
    COALESCE(t.target_clv, 0) AS target_clv
FROM customer_features f
LEFT JOIN target t ON f.`Customer ID` = t.`Customer ID`
"""

client = bigquery.Client(project="clv-predictions-mlops")
job = client.query(query)
job.result()
print("Updated customer_features with expanded feature set")

Updated customer_features with expanded feature set


In [32]:
# Install PySpark
!pip install pyspark -q

In [45]:
!gcloud services enable dataproc.googleapis.com --project=clv-predictions-mlops

In [46]:
# Create Dataproc cluster
!gcloud dataproc clusters create clv-spark-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --single-node \
    --master-machine-type=n1-standard-4 \
    --master-boot-disk-size=100GB \
    --image-version=2.1-debian11 \
    --project=clv-predictions-mlops \
    --enable-component-gateway

Waiting on operation [projects/clv-predictions-mlops/regions/us-central1/operations/109c126e-6898-38a6-a727-ec7734a9b49d].
Waiting for cluster creation operation...                                      
Waiting for cluster creation operation...done.                                 
Created [https://dataproc.googleapis.com/v1/projects/clv-predictions-mlops/regions/us-central1/clusters/clv-spark-cluster] Cluster placed in zone [us-central1-a].


In [59]:
script = '''from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.appName("CLV-TextFeatures").getOrCreate()

transactions = spark.read.format("bigquery").option("table", "clv-predictions-mlops.retail_data.transactions").load()

transactions = transactions.withColumn("purchase_time", (F.col("InvoiceDate") / 1000000000).cast(TimestampType()))

cutoff = F.lit("2010-12-01").cast(TimestampType())
transactions = transactions.filter(F.col("purchase_time") < cutoff)

print(f"Transactions after filter: {transactions.count()}")

customer_text = transactions.groupBy("Customer ID").agg(
    F.concat_ws(" ", F.collect_list("Description")).alias("all_descriptions"),
    F.count("*").alias("num_items"),
    F.countDistinct("Description").alias("unique_descriptions")
)

customer_text = customer_text.withColumn("clean_text", F.lower(F.regexp_replace(F.col("all_descriptions"), "[^a-zA-Z0-9 ]", " ")))
customer_text = customer_text.withColumn("clean_text", F.trim(F.regexp_replace(F.col("clean_text"), " +", " ")))

output = customer_text.select(F.col("Customer ID").alias("customer_id"), "clean_text", "num_items", "unique_descriptions")

output.write.format("bigquery").option("table", "clv-predictions-mlops.retail_data.customer_text").option("temporaryGcsBucket", "clv-prediction-data").mode("overwrite").save()

print(f"Processed {output.count()} customers")
spark.stop()
'''

with open("text_features.py", "w") as f:
    f.write(script)
    
print("Script written")

Script written


In [60]:
!gcloud dataproc jobs submit pyspark text_features.py \
    --cluster=clv-spark-cluster \
    --region=us-central1 \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.32.2.jar \
    --project=clv-predictions-mlops

Job [1676e58b3a9848a9972c2c44147646f3] submitted.
Waiting for job output...
25/12/11 02:08:58 INFO SparkEnv: Registering MapOutputTracker
25/12/11 02:08:58 INFO SparkEnv: Registering BlockManagerMaster
25/12/11 02:08:58 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/12/11 02:08:58 INFO SparkEnv: Registering OutputCommitCoordinator
25/12/11 02:08:59 INFO DataprocSparkPlugin: Registered 128 driver metrics
25/12/11 02:09:00 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at clv-spark-cluster-m.us-central1-a.c.clv-predictions-mlops.internal./10.128.0.3:8032
25/12/11 02:09:00 INFO AHSProxy: Connecting to Application History server at clv-spark-cluster-m.us-central1-a.c.clv-predictions-mlops.internal./10.128.0.3:10200
25/12/11 02:09:01 INFO Configuration: resource-types.xml not found
25/12/11 02:09:01 INFO ResourceUtils: Unable to find 'resource-types.xml'.
25/12/11 02:09:03 INFO YarnClientImpl: Submitted application application_1765418166196_0002
25/12/11 0

In [61]:
!bq query --use_legacy_sql=false 'SELECT COUNT(*) as customers FROM `clv-predictions-mlops.retail_data.customer_text`'

+-----------+
| customers |
+-----------+
|      4266 |
+-----------+


In [62]:
# Preview the text data
!bq query --use_legacy_sql=false 'SELECT customer_id, LEFT(clean_text, 100) as text_preview, num_items FROM `clv-predictions-mlops.retail_data.customer_text` LIMIT 3'

+-------------+-------------------------------------+-----------+
| customer_id |            text_preview             | num_items |
+-------------+-------------------------------------+-----------+
|       12362 | postage                             |         1 |
|       12404 | adjustment by john on 26 01 2010 16 |         1 |
|       12466 | adjustment by john on 26 01 2010 16 |         1 |
+-------------+-------------------------------------+-----------+


In [63]:
!gcloud dataproc clusters delete clv-spark-cluster --region=us-central1 --quiet --project=clv-predictions-mlops

Waiting on operation [projects/clv-predictions-mlops/regions/us-central1/operations/334b2792-0969-314e-813c-529ee4827c06].
Waiting for cluster deletion operation...done.                                 
Deleted [https://dataproc.googleapis.com/v1/projects/clv-predictions-mlops/regions/us-central1/clusters/clv-spark-cluster].


In [66]:
# Pull text data to notebook
!pip install sentence-transformers -q

In [67]:
from sentence_transformers import SentenceTransformer
import pandas as pd

# Load text data from BigQuery
query = "SELECT customer_id, clean_text FROM `clv-predictions-mlops.retail_data.customer_text`"
client = bigquery.Client(project="clv-predictions-mlops")
text_df = client.query(query).to_dataframe()

print(f"Loaded {len(text_df)} customers")

Loaded 4266 customers


In [68]:
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(text_df['clean_text'].tolist(), show_progress_bar=True)
print(f"Embeddings shape: {embeddings.shape}")

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/134 [00:00<?, ?it/s]

Embeddings shape: (4266, 384)


In [69]:
# Get numerical features from BigQuery
features_query = "SELECT * FROM `clv-predictions-mlops.retail_data.customer_features`"
features_df = client.query(features_query).to_dataframe()

print(f"Numerical features: {features_df.shape}")
print(f"Columns: {features_df.columns.tolist()}")

Numerical features: (4266, 14)
Columns: ['Customer ID', 'recency_days', 'frequency', 'monetary', 'avg_order_value', 'orders_per_month', 'total_items_purchased', 'unique_products', 'avg_quantity_per_line', 'avg_unit_price', 'customer_tenure_days', 'unique_purchase_days', 'products_per_order', 'target_clv']


In [70]:
# Convert embeddings to dataframe
embedding_cols = [f'emb_{i}' for i in range(384)]
embeddings_df = pd.DataFrame(embeddings, columns=embedding_cols)
embeddings_df['customer_id'] = text_df['customer_id'].values

# Merge with numerical features
features_df = features_df.rename(columns={'Customer ID': 'customer_id'})
final_df = features_df.merge(embeddings_df, on='customer_id', how='inner')

print(f"Final dataset: {final_df.shape}")
print(f"Features: 12 numerical + 384 embeddings = {final_df.shape[1] - 2} features + customer_id + target")

Final dataset: (4266, 398)
Features: 12 numerical + 384 embeddings = 396 features + customer_id + target


In [71]:
# Separate features and target
feature_cols = [c for c in final_df.columns if c not in ['customer_id', 'target_clv']]
X = final_df[feature_cols]
y = final_df['target_clv']

print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
print(f"Target range: ${y.min():.0f} - ${y.max():.0f}")
print(f"Target mean: ${y.mean():.0f}")

X shape: (4266, 396)
y shape: (4266,)
Target range: $0 - $287492
Target mean: $1744


In [72]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f"Train: {X_train.shape[0]} customers")
print(f"Test: {X_test.shape[0]} customers")

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Log transform target (helps with the wide range $0 - $287K)
y_train_log = np.log1p(y_train)  # log(1 + y) handles zeros
y_test_log = np.log1p(y_test)

print(f"Target log range: {y_train_log.min():.2f} - {y_train_log.max():.2f}")

Train: 3412 customers
Test: 854 customers
Target log range: 0.00 - 12.57


In [73]:
# Save final dataset to GCS for next notebook
final_df.to_parquet('/tmp/clv_features.parquet')

!gsutil cp /tmp/clv_features.parquet gs://clv-prediction-data/features/clv_features.parquet

print("Saved features to GCS")

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Copying file:///tmp/clv_features.parquet [Content-Type=application/octet-stream]...
/ [1 files][  9.1 MiB/  9.1 MiB]                                                
Operation completed over 1 objects/9.1 MiB.                                      
Saved features to GCS
