# Expected Proceeds Prediction Workflow (Local Version)

This notebook demonstrates the end-to-end workflow for predicting expected proceeds at day 8 and day 100 for different user types, running locally while connecting to Snowflake.

In [1]:
# !pip3 uninstall -y pandas numpy pyarrow snowflake-connector-python snowflake-snowpark-python

In [2]:
!pip3 install numpy==1.23.5
!pip3 install pandas==1.5.3
!pip3 install pyarrow==10.0.1
!pip3 install "snowflake-connector-python[pandas]"
!pip3 install snowflake-snowpark-python

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools

In [3]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os

# Import custom modules
from country_utils import add_signup_country_group
from data_utils import split_data_by_date, split_data_by_user_type
from trial_predictions import TrialPredictionModel
from direct_purchase_predictions import DirectPurchasePredictionModel
from lag_purchase_predictions import LagPurchasePredictionModel
from prediction_processor import PredictionProcessor

## Connect to Snowflake

We'll use the configuration from our `config.py` file to connect to Snowflake.

In [4]:
# Import Snowflake connection config
import config

# Snowflake connection
from snowflake.snowpark import Session

def get_snowflake_session():
    """Create and return a Snowflake session"""
    connection_parameters = {
        "account": config.SNOWFLAKE_ACCOUNT,
        "user": config.SNOWFLAKE_USER,
        "role": config.SNOWFLAKE_ROLE,
        "warehouse": config.SNOWFLAKE_WAREHOUSE,
        "database": config.SNOWFLAKE_DATABASE,
        "schema": config.SNOWFLAKE_SCHEMA,
        "authenticator": config.SNOWFLAKE_AUTHENTICATOR
    }
    
    session = Session.builder.configs(connection_parameters).create()
    print(f"Connected to Snowflake as {config.SNOWFLAKE_USER}")
    return session

# Create a Snowflake session
session = get_snowflake_session()

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...
Going to open: https://blinkist-useast_1_virginia.snowflakecomputing.com/console/login?login_name=meri-kris.jaama%40go1.com&browser_mode_redirect_port=51967&proof_key=ymFxTHEyvXH4gjdo5nWA%2BOya73bcG%2FeuvWssvBPkpug%3D to authenticate...
Connected to Snowflake as meri-kris.jaama@go1.com


## 1. Load Data

First, we'll load the necessary data from Snowflake.

In [5]:
# Fetch data from Snowflake
input_query = """
    SELECT 
        *
    FROM blinkist_dev.dbt_mjaama.exp_proceeds_input
    """
    
input_df = session.sql(input_query).to_pandas()

product_query = """
    select sku as product_name, price 
    from BLINKIST_PRODUCTION.reference_tables.product_dim
    where is_purchasable;
    """
    
product_df = session.sql(product_query).to_pandas()

print(f"Loaded {len(input_df)} input records and {len(product_df)} product records")

# Display the first few rows of each dataset
print("\nInput Data Sample:")
display(input_df.head())

print("\nProduct Data Sample:")
display(product_df.head())

Loaded 8060581 input records and 830 product records

Input Data Sample:


Unnamed: 0,REPORT_DATE,CHANNEL_GROUP,MARKETING_NETWORK_ID,ACCOUNT_ID,CAMPAIGN_NAME,CAMPAIGN_ID,ADGROUP_NAME,ADGROUP_ID,TARGET_MARKET,EUR_MARKETING_SPEND,...,STARTED_CONTENT,N_CONTENT_STARTS,FINISHED_CONTENT,N_CONTENT_FINISHES,SPACE_USER,SPACE_OWNER,SPACE_ADDED_TITLE,EUR_PROCEEDS_D0,EUR_PROCEEDS_D8,EUR_PROCEEDS_D100
0,2022-09-12,display_and_programmatic,uac,6277818731,UAC_ACi_tCPA_Android_GA4F_PurchaseCompleted_US...,16439524975,,,us,848.314323,...,0,0,0,0,0,0,0,0.0,0.0,0.0
1,2022-09-12,display_and_programmatic,uac,6277818731,UAC_ACi_tCPA_Android_GA4F_PurchaseCompleted_US...,16439524975,,,us,848.314323,...,0,0,0,0,0,0,0,0.0,0.0,0.0
2,2022-09-12,display_and_programmatic,uac,6277818731,UAC_ACi_tCPA_Android_GA4F_PurchaseCompleted_US...,16439524975,,,us,848.314323,...,1,1,0,0,0,0,0,0.0,0.0,0.0
3,2022-09-12,display_and_programmatic,uac,6277818731,UAC_ACi_tCPA_Android_GA4F_PurchaseCompleted_US...,16439524975,,,us,848.314323,...,0,0,0,0,0,0,0,0.0,0.0,0.0
4,2024-07-15,display_and_programmatic,uac,6277818731,UAC_US_CA_Jan24,20901003728,,,us,734.791192,...,0,0,0,0,0,0,0,0.0,0.0,0.0



Product Data Sample:


Unnamed: 0,PRODUCT_NAME,PRICE
0,play_subscr.offer.12m.intro.notrial,79.99
1,play_subscription.crm.12m.63.offer_intro,79.99
2,20638744.2024.apple.pro.1m.intro_offer_25.v1,19.99
3,2024_recurly_pro_12m_120_notrial,119.99
4,20334398.2024.apple.pro.12m.intro_offer_40.v2,139.99


In [6]:
# Convert column names to lowercase for compatibility with country_utils.py
input_df.columns = input_df.columns.str.lower()
print(input_df.columns)



Index(['report_date', 'channel_group', 'marketing_network_id', 'account_id',
       'campaign_name', 'campaign_id', 'adgroup_name', 'adgroup_id',
       'target_market', 'eur_marketing_spend', 'impressions', 'clicks',
       'user_id', 'signup_country', 'signup_country_subdivision', 'us_state',
       'signup_client_platform', 'is_trial_subscription',
       'is_trial_autorenewal_on_d0', 'is_direct_subscription', 'product_name',
       'plan_tier', 'days_to_purchase', 'started_content', 'n_content_starts',
       'finished_content', 'n_content_finishes', 'space_user', 'space_owner',
       'space_added_title', 'eur_proceeds_d0', 'eur_proceeds_d8',
       'eur_proceeds_d100'],
      dtype='object')


## 2. Set Inference Date

We'll set the inference date to yesterday by default, but you can change it to any date you want.

In [7]:
# Set inference date (default: yesterday)
inference_date = (datetime.now() - timedelta(days=100)).strftime('%Y-%m-%d')
print(f"Inference date: {inference_date}")

# Uncomment and modify the line below to use a different date
# inference_date = '2023-05-01'

Inference date: 2024-12-03


## 3. Add Country Groups

We'll add country groups to the input data to help with analysis.

In [8]:
# Add country groups
input_df = add_signup_country_group(input_df)

# Display the distribution of country groups
country_group_counts = input_df['signup_country_group'].value_counts()
print("Country Group Distribution:")
display(country_group_counts)


  us_df = temp_df.loc[(temp_df.report_date >= six_months_ago) & (temp_df.signup_country == "US") & (
  row_df = temp_df.loc[(temp_df.report_date >= six_months_ago) & (


Country Group Distribution:


row_verylow     2207500
other           1542260
de_at           1216331
gb_ca_au         731143
row_low          667066
row_mod          566265
row_high         367173
in               203749
row_veryhigh     108315
Name: signup_country_group, dtype: int64

## 4. Split Data by Date

We'll split the data into inference, training_d8, and training_d100 datasets based on dates.

In [9]:
# Split data by date
training_window_days = 180  # Use 1/2 year of training data

inference_df, training_d8_df, training_d100_df = split_data_by_date(
    input_df,
    inference_date=inference_date,
    training_window_days=training_window_days,
    date_column='report_date'
)

print(f"Inference data: {len(inference_df)} records")
print(f"Training d8 data: {len(training_d8_df)} records")
print(f"Training d100 data: {len(training_d100_df)} records")

# Check if we have enough inference data
if len(inference_df) == 0:
    print(f"WARNING: No inference data found for {inference_date}. Please check the date or your data source.")

Inference data: 9624 records
Training d8 data: 1530130 records
Training d100 data: 1232896 records


## 5. Split Data by User Type

We'll split the data by user type to train separate models for each type.

In [10]:
# Split data by user type
trial_inference, day0_payers_inference, other_inference = split_data_by_user_type(inference_df)
trial_training_d8, day0_payers_training_d8, other_training_d8 = split_data_by_user_type(training_d8_df)
trial_training_d100, day0_payers_training_d100, other_training_d100 = split_data_by_user_type(training_d100_df)

print(f"Trial users: {len(trial_inference)} inference, {len(trial_training_d8)} training d8, {len(trial_training_d100)} training d100")
print(f"Day 0 payers: {len(day0_payers_inference)} inference, {len(day0_payers_training_d8)} training d8, {len(day0_payers_training_d100)} training d100")
print(f"Other users: {len(other_inference)} inference, {len(other_training_d8)} training d8, {len(other_training_d100)} training d100")

# Plot the distribution of user types
user_types = ['Trial Users', 'Day 0 Payers', 'Other Users']
inference_counts = [len(trial_inference), len(day0_payers_inference), len(other_inference)]


Trial users: 260 inference, 84423 training d8, 109907 training d100
Day 0 payers: 1541 inference, 51978 training d8, 27150 training d100
Other users: 7820 inference, 1392952 training d8, 1095593 training d100


## 6. Train Models

We'll train separate models for each user type.

In [11]:
# Train models
print("Training trial model...")
trial_model = TrialPredictionModel(product_dim_df=product_df)
trial_model.fit(trial_training_d8, trial_training_d100)

print("\nTraining direct purchase model...")
direct_model = DirectPurchasePredictionModel()
direct_model.fit(day0_payers_training_d8, day0_payers_training_d100)

print("\nTraining lag purchase model...")
lag_model = LagPurchasePredictionModel(product_dim_df=product_df)
lag_model.fit(other_training_d8, other_training_d100)

Training trial model...

Training direct purchase model...

Training lag purchase model...


## 7. Make Predictions

We'll make predictions for each user type and combine them.

In [12]:
# Make predictions
predictions = []

try:
    if not trial_inference.empty:
        print("Predicting for trial users...")
        trial_predictions = trial_model.predict(trial_inference)
        trial_predictions['user_type'] = 'trial'
        predictions.append(trial_predictions)
        print(f"Generated {len(trial_predictions)} predictions for trial users")
except Exception as e:
    print(f"Error predicting for trial users: {str(e)}")

try:
    if not day0_payers_inference.empty:
        print("\nPredicting for day 0 payers...")
        direct_predictions = direct_model.predict(day0_payers_inference)
        direct_predictions['user_type'] = 'day0_payer'
        predictions.append(direct_predictions)
        print(f"Generated {len(direct_predictions)} predictions for day 0 payers")
except Exception as e:
    print(f"Error predicting for day 0 payers: {str(e)}")

try:
    if not other_inference.empty:
        print("\nPredicting for other users...")
        lag_predictions = lag_model.predict(other_inference)
        lag_predictions['user_type'] = 'other'
        predictions.append(lag_predictions)
        print(f"Generated {len(lag_predictions)} predictions for other users")
except Exception as e:
    print(f"Error predicting for other users: {str(e)}")

# Check if we have any predictions
if not predictions:
    print("No predictions generated - inference data may be empty")
else:
    # Combine predictions
    predictions_df = pd.concat(predictions, ignore_index=True)  # Changed to ignore_index=True for cleaner indexing
    
    # Add inference date column
    predictions_df['inference_date'] = inference_date
    
    print(f"\nTotal predictions: {len(predictions_df)}")
    
    # Display a sample of the predictions
    print("\nSample predictions:")
    display(predictions_df.head())

Predicting for trial users...
Error predicting for trial users: 'product_name'

Predicting for day 0 payers...
Direct model: Found 316 cases where D100 < D8. Fixing...
Generated 1541 predictions for day 0 payers

Predicting for other users...
Lag model: Found 587 cases where D100 < D8. Fixing...
Generated 7820 predictions for other users

Total predictions: 9361

Sample predictions:


Unnamed: 0,report_date,channel_group,marketing_network_id,account_id,campaign_name,campaign_id,adgroup_name,adgroup_id,target_market,eur_marketing_spend,...,space_owner,space_added_title,eur_proceeds_d0,eur_proceeds_d8,eur_proceeds_d100,signup_country_group,expected_proceeds_d8,expected_proceeds_d100,user_type,inference_date
0,2024-12-03,display_and_programmatic,uac,4190078950,UAC_tROAS_High-Value,20906726572,,,row,4133.990143,...,0,0,86.2915,86.2915,86.2915,de_at,86.2915,86.2915,day0_payer,2024-12-03
1,2024-12-03,display_and_programmatic,appiness,AppinessXUA1,Blinkist_iOS_638d,unknown,,,row,150.0,...,0,0,64.271315,64.271315,64.271315,other,64.271315,64.271315,day0_payer,2024-12-03
2,2024-12-03,display_and_programmatic,appiness,AppinessxUA2,Blinkist_iOS_532a,unknown,,,row,2525.0,...,0,0,8.992456,8.992456,8.992456,other,8.992456,19.312912,day0_payer,2024-12-03
3,2024-12-03,display_and_programmatic,appiness,AppinessxUA2,Blinkist_iOS_532a,unknown,,,row,2525.0,...,0,0,7.216698,7.216698,7.216698,row_veryhigh,7.216698,17.964824,day0_payer,2024-12-03
4,2024-12-03,display_and_programmatic,appiness,AppinessxUA2,Blinkist_iOS_532a,unknown,,,row,2525.0,...,0,0,83.926916,83.926916,83.926916,row_veryhigh,83.926916,83.926916,day0_payer,2024-12-03


In [13]:
# Print samples of 30 rows per user_type
for user_type in predictions_df['user_type'].unique():
    print(f"\n\n=== Sample of 30 rows for {user_type} users ===")
    sample_df = predictions_df[predictions_df['user_type'] == user_type].head(30)
    
    # Select key columns for display
    display_columns = ['user_id', 'report_date', 'signup_country_group', 
                       'eur_proceeds_d0', 'eur_proceeds_d8', 'eur_proceeds_d100', 'expected_proceeds_d8', 'expected_proceeds_d100']
    
    # Display the sample
    display(sample_df[display_columns])
    
    # Print some summary statistics
    print(f"\nSummary statistics for {user_type} users:")
    stats_df = sample_df[['eur_proceeds_d0', 'eur_proceeds_d8', 'eur_proceeds_d100', 'expected_proceeds_d8', 'expected_proceeds_d100']].describe()
    display(stats_df)



=== Sample of 30 rows for day0_payer users ===


Unnamed: 0,user_id,report_date,signup_country_group,eur_proceeds_d0,eur_proceeds_d8,eur_proceeds_d100,expected_proceeds_d8,expected_proceeds_d100
0,674f5df0b6dc7f0027d86f18,2024-12-03,de_at,86.2915,86.2915,86.2915,86.2915,86.2915
1,674f31346382850028631897,2024-12-03,other,64.271315,64.271315,64.271315,64.271315,64.271315
2,674f0565185b040029865c5c,2024-12-03,other,8.992456,8.992456,8.992456,8.992456,19.312912
3,674ea3b338146e002b893074,2024-12-03,row_veryhigh,7.216698,7.216698,7.216698,7.216698,17.964824
4,674edfda05d944002897cc37,2024-12-03,row_veryhigh,83.926916,83.926916,83.926916,83.926916,83.926916
5,674ebc5a38146e002b8937d3,2024-12-03,gb_ca_au,5.620283,5.620283,5.620283,5.620283,11.473137
6,674e7d818397fa00260e1829,2024-12-03,gb_ca_au,23.195,23.195,23.195,23.195,23.278391
7,674e6ddc9dff1b002a8a46e7,2024-12-03,other,8.992456,8.992456,8.992456,8.992456,24.126686
8,674f7053185b0400278677af,2024-12-03,gb_ca_au,6.672701,6.672701,6.672701,6.672701,15.177908
9,674f45f00d19bf00269ab214,2024-12-03,de_at,12.444112,12.444112,12.444112,12.444112,12.444112



Summary statistics for day0_payer users:


Unnamed: 0,eur_proceeds_d0,eur_proceeds_d8,eur_proceeds_d100,expected_proceeds_d8,expected_proceeds_d100
count,30.0,30.0,30.0,30.0,30.0
mean,20.75377,20.75377,20.75377,20.75377,25.442759
std,26.293113,26.293113,26.293113,26.293113,24.452745
min,1.574371,1.574371,1.574371,1.574371,3.283281
25%,7.570023,7.570023,7.570023,7.570023,14.618113
50%,11.605249,11.605249,11.605249,11.605249,17.926573
75%,17.020488,17.020488,17.020488,17.020488,20.871823
max,102.264206,102.264206,102.264206,102.264206,102.264206




=== Sample of 30 rows for other users ===


Unnamed: 0,user_id,report_date,signup_country_group,eur_proceeds_d0,eur_proceeds_d8,eur_proceeds_d100,expected_proceeds_d8,expected_proceeds_d100
1541,674eb48bc2f2a8002992cb18,2024-12-03,row_high,0.0,0.0,0.0,0.0,0.0
1542,674e829866a17300298b6a54,2024-12-03,de_at,0.0,0.0,0.0,0.085232,0.451243
1543,674f5afcb6dc7f0025d87009,2024-12-03,row_veryhigh,0.0,0.0,0.0,0.153494,0.82349
1544,674f585b0fbe47002524427c,2024-12-03,row_veryhigh,0.0,0.0,0.0,0.153494,0.82349
1545,674f70db0fbe4700292448cc,2024-12-03,de_at,0.0,0.0,0.0,0.054728,0.10489
1546,674f269af7ea850028f12114,2024-12-03,de_at,0.0,0.0,0.0,0.085232,0.451243
1547,674eff2763828500266305ea,2024-12-03,de_at,0.0,0.0,0.0,0.054728,0.10489
1548,674f013b219e780027079021,2024-12-03,de_at,0.0,0.0,0.0,0.085232,0.451243
1549,674efbe1f7ea850024f11691,2024-12-03,de_at,0.0,0.0,0.0,0.085232,0.451243
1550,674effbb0d90de0029a0b03c,2024-12-03,de_at,0.0,0.0,0.0,0.085232,0.451243



Summary statistics for other users:


Unnamed: 0,eur_proceeds_d0,eur_proceeds_d8,eur_proceeds_d100,expected_proceeds_d8,expected_proceeds_d100
count,30.0,30.0,30.0,30.0,30.0
mean,0.0,0.0,0.0,0.064765,0.2996
std,0.0,0.0,0.0,0.052692,0.261341
min,0.0,0.0,0.0,0.0,0.0
25%,0.0,0.0,0.0,0.00863,0.05353
50%,0.0,0.0,0.0,0.085232,0.186772
75%,0.0,0.0,0.0,0.085232,0.451243
max,0.0,0.0,0.0,0.153494,0.82349


In [14]:

# Create output directory if it doesn't exist
os.makedirs('output', exist_ok=True)

# Save to CSV
output_path = f"output/predictions_{inference_date}.csv"
predictions_df.to_csv(output_path, index=False)
print(f"Predictions saved locally to {output_path}")

# Optionally save to Snowflake
save_to_snowflake = True  # Set to True if you want to save to Snowflake

if save_to_snowflake:
    print("\nSaving results to Snowflake...")
    
    # Reconnect to Snowflake
    session = get_snowflake_session()
    
    # Convert to Snowpark DataFrame
    snowpark_df = session.create_dataframe(predictions_df)
    
    # Save to Snowflake table
    table_name = "BLINKIST_DEV.DBT_MJAAMA.USER_LEVEL_DAILY_EXPECTED_PROCEEDS"
    
    # Append to existing table or create new one
    snowpark_df.write.mode("append").save_as_table(table_name)
    
    print(f"Predictions for {inference_date} saved to {table_name}")


Predictions saved locally to output/predictions_2024-12-03.csv

Saving results to Snowflake...
Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...
Going to open: https://blinkist-useast_1_virginia.snowflakecomputing.com/console/login?login_name=meri-kris.jaama%40go1.com&browser_mode_redirect_port=52465&proof_key=dadrxp5YwCxcjzI7rrSP8R6%2FjDnnKFV5zzNGsjMMACw%3D to authenticate...
Connected to Snowflake as meri-kris.jaama@go1.com
Predictions for 2024-12-03 saved to BLINKIST_DEV.DBT_MJAAMA.USER_LEVEL_DAILY_EXPECTED_PROCEEDS


## 12. Close Snowflake Session

Finally, we'll close the Snowflake session.

In [15]:
# Close Snowflake session
session.close()
print("Snowflake session closed")

Snowflake session closed
