In [1]:
# Copyright 2020 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#      https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [Run in Colab](https://colab.research.google.com/github/dpanigra/cdp-workshop/blob/master/CDP_Workshop_Customer_segmentation_ltv.ipynb)

 # Overview
CDP workshop - Customer segmentation using ltv.

A step-by-step solution guide with explanations is available [here](https://cloud.google.com/architecture/building-audiences-clv).

# Setup environment

## *PIP install appropriate packages*

In [2]:
%pip install google-cloud-storage # for Storage Account
%pip install google-cloud # for cloud sdk
%pip install google-cloud-bigquery # for BigQuery
%pip install google-cloud-bigquery-storage # for BigQuery Storage client
# for data exploration
%pip install pandas 
%pip install matplotlib 
%pip install pandas_profiling 

# Restart kernel after installs
# import IPython
# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

Collecting google-cloud
  Downloading google_cloud-0.34.0-py2.py3-none-any.whl (1.8 kB)
Installing collected packages: google-cloud
Successfully installed google-cloud-0.34.0


## *Initialize all the variables*

In [4]:
CS_GCP_PROJECT = "dpani-sandbox" #@param {type:"string"}
CS_DATASET_LOCATION = "us-central1-b" #@param {type:"string"}
# The dataset name doubles as a versioning mechanism,
# e.g. different datasets could use different models
#   to compare the performance of the models
CS_BQ_DATASET_NAME = "cdp_ws_cs" #@param {type:"string"}

# Names of the tables, stored procedures and model used to build the clv
# model from the Sales and Customer tables
# (as in the customer ltv solution guide)
CS_LTV_SOL_DS_INPUT_SALES_TBL = "ltv_sol_input_sales" #@param {type:"string"}
CS_LTV_SOL_DS_INPUT_CUST_TBL = "ltv_sol_input_customer" #@param {type:"string"}
CS_LTV_SOL_DS_CLV_AGGREGATE_TBL = "ltv_sol_clv_aggregation" #@param {type:"string"}
CS_LTV_SOL_DS_FEATURE_TBL = "ltv_sol_features" #@param {type:"string"}
CS_LTV_SOL_DS_ML_FEATURE_PREP_SP = "ltv_sol_ml_prep_sp" #@param {type:"string"}
CS_LTV_SOL_DS_ML_MODEL_NAME = "ltv_sol_clv_ml_model" #@param {type:"string"}
CS_LTV_SOL_DS_ML_PREDICT_UTIL_SP = "ltv_sol_clv_ml_predict_util_sp" #@param {type:"string"}
CS_LTV_SOL_DS_ML_PREDICT_TBL = "ltv_sol_clv_predict_tbl" #@param {type:"string"}

# Outlier thresholds applied to the aggregation table
#   (used in the customer ltv calc)
CS_CLV_MAX_STDV_MONETARY = 500 #@param {type:"integer"}
CS_CLV_MAX_STDV_QTY  = 100 #@param {type:"integer"}

# Windowing parameters that drive the segmentation
#   (used in the customer ltv calc)
CS_CLV_WINDOW_LENGTH = 0 #@param {type:"integer"}
CS_CLV_WINDOW_STEP = 30 #@param {type:"integer"}
CS_CLV_WINDOW_STEP_INITIAL = 90 #@param {type:"integer"}
CS_CLV_LENGTH_FUTURE = 30 #@param {type:"integer"}

# Collect every CS-prefixed name defined so far into CS_BQ_ARGS (a dict that
# can be passed to the bq Cell magic) and mirror each value, as a string,
# into the process environment so shell commands can read it too.
import os

# The names are snapshotted into a list first, because the statements below
# bind new names in this very namespace.
cs_all_args = [name for name in list(locals()) if name.startswith('CS')]
_cs_scope = locals()
# CS_BQ_ARGS itself is left out; it is only present when the cell is re-run.
CS_BQ_ARGS = {
    name: _cs_scope[name] for name in cs_all_args if name != 'CS_BQ_ARGS'
}
for cs_each_key, cs_each_value in CS_BQ_ARGS.items():
    os.environ[cs_each_key] = str(cs_each_value)
print(CS_BQ_ARGS)

{'CS_GCP_PROJECT': 'dpani-sandbox', 'CS_DATASET_LOCATION': 'us-central1-b', 'CS_BQ_DATASET_NAME': 'cdp_ws_cs', 'CS_LTV_SOL_DS_INPUT_SALES_TBL': 'ltv_sol_input_sales', 'CS_LTV_SOL_DS_INPUT_CUST_TBL': 'ltv_sol_input_customer', 'CS_LTV_SOL_DS_CLV_AGGREGATE_TBL': 'ltv_sol_clv_aggregation', 'CS_LTV_SOL_DS_FEATURE_TBL': 'ltv_sol_features', 'CS_LTV_SOL_DS_ML_FEATURE_PREP_SP': 'ltv_sol_ml_prep_sp', 'CS_LTV_SOL_DS_ML_MODEL_NAME': 'ltv_sol_clv_ml_model', 'CS_LTV_SOL_DS_ML_PREDICT_UTIL_SP': 'ltv_sol_clv_ml_predict_util_sp', 'CS_LTV_SOL_DS_ML_PREDICT_TBL': 'ltv_sol_clv_predict_tbl', 'CS_CLV_MAX_STDV_MONETARY': 500, 'CS_CLV_MAX_STDV_QTY': 100, 'CS_CLV_WINDOW_LENGTH': 0, 'CS_CLV_WINDOW_STEP': 30, 'CS_CLV_WINDOW_STEP_INITIAL': 90, 'CS_CLV_LENGTH_FUTURE': 30}


## *Setup your Google Cloud project*

In [5]:
!export CS_GCP_PROJECT
!echo $CS_GCP_PROJECT
# set the desired Google Cloud project
!gcloud config set project $CS_GCP_PROJECT
import os
os.environ['GOOGLE_CLOUD_PROJECT'] = CS_GCP_PROJECT
# validate that the Google Cloud project has been set properly.
# !gcloud info --format='value(config.project)'

dpani-sandbox
Updated property [core/project].


## *Authenticate with Google Cloud*

### Authenticate using ServiceAccount Key file

In [6]:
# download the ServiceAccount key and provide the path to the file below
# CS_GCP_APPLICATION_CREDENTIALS = "<Full path with the file name to the above downloaded json file>"
# CS_GCP_APPLICATION_CREDENTIALS = "/Users/dpani/Downloads/dpani-sandbox-2-3073195cd132.json"

# uncomment the code below when running in a Colab environment
# authenticate using service account
# from google.colab import files
# # Upload service account key
# keyfile_upload = files.upload()
# CS_GCP_APPLICATION_CREDENTIALS = list(keyfile_upload.keys())[0]

# import os
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = CS_GCP_APPLICATION_CREDENTIALS
# # set the account
# !echo "Setting Service Account:" $CS_GCP_APPLICATION_CREDENTIALS
# !gcloud auth activate-service-account --key-file=$CS_GCP_APPLICATION_CREDENTIALS

### Authenticate using OAuth

In [7]:
# Authenticate the notebook user through OAuth.
# The sign-in flow is only triggered when the `google.colab` module is
# already loaded; otherwise this cell does nothing.
import sys

_colab_loaded = 'google.colab' in sys.modules
if _colab_loaded:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

## *Enable the below Google Cloud Services for the solution*

In [8]:
# Enable the Google Cloud service APIs this solution needs in the currently
# configured gcloud project. The four services requested are
# storage-component, bigquery, ml and notebooks.
!gcloud services enable \
    storage-component.googleapis.com \
    bigquery.googleapis.com \
    ml.googleapis.com \
    notebooks.googleapis.com

Operation "operations/acf.p2-976354649621-b7b53b5a-2778-405f-bee7-9a9a0ee78296" finished successfully.


In [9]:
# Validate that all of the required services have been enabled: list the
# project's services and keep only the lines that name one of them.
# The grep patterns are unanchored, so `ml.googleapis.com` also matches
# `automl.googleapis.com` (visible in the captured output).
!gcloud services list | grep 'storage-component.googleapis.com\|bigquery.googleapis.com\|ml.googleapis.com\|notebooks.googleapis.com'

automl.googleapis.com                   Cloud AutoML API
bigquery.googleapis.com                 BigQuery API
ml.googleapis.com                       AI Platform Training & Prediction API
notebooks.googleapis.com                Notebooks API
storage-component.googleapis.com        Cloud Storage


## *Create a BigQuery client, import the libraries, load the bigquery Cell magic*

In [10]:
# Create the BigQuery client shared by the query cells below; it is bound
# explicitly to the project chosen in the configuration cell.
from google.cloud import bigquery
bq_client = bigquery.Client(project=CS_GCP_PROJECT)
# Load the bigquery Cell magic.
# NOTE(review): %reload_ext is used instead of %load_ext, presumably so the
# cell can be re-run in a live kernel without complaint - confirm.
# %load_ext google.cloud.bigquery
%reload_ext google.cloud.bigquery

In [11]:
# Smoke test: confirm that the BigQuery client can run a query
sql = """
    SELECT name
    FROM `bigquery-public-data.usa_names.usa_1910_current`
    WHERE state = 'TX'
    LIMIT 100
"""

# Submit the Standard SQL query through bq_client, then materialise the
# result set as a pandas DataFrame and display it
smoke_test_job = bq_client.query(sql)
df = smoke_test_job.to_dataframe()
df

Unnamed: 0,name
0,Mary
1,Ruby
2,Annie
3,Willie
4,Ruth
...,...
95,Leona
96,Lucile
97,Lucy
98,Manuela


# Utility functions

## *Create the BigQuery dataset (DDL)*

In [12]:
# create_bq_ds
def _bq_location_from_gcp_location(gcp_location: str) -> str:
  """Translates a GCP zone/region/multi-region name to a BigQuery location.

      `us-*` names map to the `US` multi-region and `europe-*`/`eu` names map
      to the `EU` multi-region. Any other name has no multi-region, so the
      region itself is returned, with a trailing one-letter zone suffix
      removed (e.g. `asia-northeast1-a` becomes `asia-northeast1`).
  """
  multi_regions = {'us': 'US', 'eu': 'EU', 'europe': 'EU'}
  parts = (gcp_location or '').strip().lower().split('-')
  if parts[0] in multi_regions:
    return multi_regions[parts[0]]
  # A zone looks like <continent>-<region>-<letter>; BigQuery only knows regions.
  if len(parts) == 3 and len(parts[2]) == 1 and parts[2].isalpha():
    parts = parts[:2]
  return '-'.join(parts)


def create_bq_ds(CS_GCP_PROJECT: str,
                 CS_BQ_DATASET_NAME: str,
                 CS_LOCATION: str
                 ):
  """The function creates a BigQuery dataset if it doesn't exist.

      The idea is to create the dataset only one time: when it is already
      there the function just reports it and returns.
      Args:
        CS_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment.
          The BigQuery client is bound to this project.
        CS_BQ_DATASET_NAME:(:obj:`str`): Name of the dataset.
        CS_LOCATION:(:obj:`str`): Google Cloud zone, region or multi-region
          used to pick the location of the BigQuery dataset
          (e.g. `us-central1-b` results in `US`).
      Raises:
        RuntimeError: if the dataset can't be created; the original error
          is chained as the cause.
  """
  import traceback
  from google.cloud import bigquery
  from google.cloud.exceptions import NotFound

  # Bind the client to the requested project instead of the ambient default.
  client = bigquery.Client(project=CS_GCP_PROJECT or None)
  dataset_id = f"{CS_GCP_PROJECT}.{CS_BQ_DATASET_NAME}"

  try:
    client.get_dataset(dataset_id)  # Make an API request.
  except NotFound:
    print('Dataset {} is not found'.format(dataset_id))
  else:
    print('Dataset {} already exists'.format(dataset_id))
    return

  try:
    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)
    location = _bq_location_from_gcp_location(CS_LOCATION)
    if location:
      dataset.location = location
    dataset = client.create_dataset(dataset)  # Make an API request.
    # Report the project of the dataset itself, not the client's project.
    print('Created dataset {}.{} in location: {}.'.\
          format(dataset.project, dataset.dataset_id, dataset.location))
  except Exception as e:
    print(traceback.format_exc())
    print(e)
    raise RuntimeError(f"Can't create the BigQuery DS {dataset_id}") from e

## *Load data from Google Cloud Storage into a BigQuery table (DDL)*

In [13]:
def load_table_uri_autodetect_csv(CS_GCP_PROJECT: str,
                 CS_BQ_DATASET_NAME: str,
                 CS_BQ_TABLE_ID: str,
                 uri: str
                 ):
  """The function loads CSV file(s) from Cloud Storage into a BQ table.

      The schema is auto-detected and the first row of each file is skipped
      as a header. The function waits for the load job to finish and then
      prints the row count reported for the destination table.

      NOTE(review): no write disposition is configured, so BigQuery's default
      for load jobs applies; if that default is "append", calling this twice
      for the same table loads the data twice - confirm.

      Args:
        CS_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment.
          The BigQuery client is bound to it, so the `dataset.table`
          destination is resolved inside this project.
        CS_BQ_DATASET_NAME:(:obj:`str`): Name of the dataset.
        CS_BQ_TABLE_ID:(:obj:`str`): Name of the table.
        uri:(:obj:`str`): Google Cloud Storage uri of the csv file
          (wildcards such as `gs://bucket/sales_*` are passed through as is).
  """
  # [START bigquery_load_table_gcs_csv_autodetect]
  from google.cloud import bigquery

  table_id = f"{CS_BQ_DATASET_NAME}.{CS_BQ_TABLE_ID}"

  # Bind the client to the requested project; previously the project argument
  # was ignored and the ambient default project was used instead.
  client = bigquery.Client(project=CS_GCP_PROJECT or None)

  job_config = bigquery.LoadJobConfig(
      autodetect=True,
      skip_leading_rows=1,
      # The source format defaults to CSV, so the line below is optional.
      source_format=bigquery.SourceFormat.CSV,
  )
  load_job = client.load_table_from_uri(
      uri, table_id, job_config=job_config
  )  # Make an API request.
  load_job.result()  # Waits for the job to complete.
  destination_table = client.get_table(table_id)
  print("Loaded {} rows in table {}.".format(destination_table.num_rows,
                                             table_id))
  # [END bigquery_load_table_gcs_csv_autodetect]

## *Delete a dataset in BigQuery (DDL)*

In [14]:
# delete the BigQuery dataset...!!! BE CAREFUL !!!
def delete_dataset(dataset_id):
    """Drop a BigQuery dataset together with everything stored in it.

    Not recommended for production environments; it is handy during the
    iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    A dataset that no longer exists is not treated as an error.
    Args:
        dataset_id(:obj:`str`): The BigQuery dataset to delete,
            e.g. 'your-project.your_dataset'
    """
    from google.cloud import bigquery

    # delete_contents removes the dataset's contents along with it;
    # not_found_ok suppresses the error for an already-deleted dataset.
    removal_options = {"delete_contents": True, "not_found_ok": True}
    bigquery.Client().delete_dataset(dataset_id, **removal_options)  # API request
    print(f"Deleted dataset '{dataset_id}'.")

In [15]:
## *Execute query in BigQuery (DDL + DML)*

## *Execute SQL BigQuery*


In [16]:
# execute_sql
def execute_sql(sql_query: str):
  """Executes the SQL in BigQuery and prints every returned row.

    The function blocks until the query job has completed. Nothing is
    returned; rows are only printed.

    Args:
        sql_query:(:obj:`str`): SQL query (or multi-statement script)
          to execute
    Raises:
        RuntimeError: if submitting or running the query fails; the
          original error is chained as the cause.
  """
  import traceback
  from google.cloud import bigquery

  # A single client is enough (it used to be constructed twice).
  client = bigquery.Client()
  try:
    query_job = client.query(sql_query)  # Make an API request.
    results = query_job.result()  # Waits for job to complete.
    # Reported only once the job has really finished.
    print("Query executed.")
    for row in results:
      print (row)
  except Exception as e:
    print(traceback.format_exc())
    print(e)
    raise RuntimeError(f"Can't execute the query {sql_query}") from e

In [17]:
# delete BigQuery table if not needed...!!! BE CAREFUL !!!
def delete_table(table_id):
  """Drop a single BigQuery table.

    Not recommended for production environments; it is handy during the
    iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    No `not_found_ok` flag is passed to the client call.
    Args:
      table_id(:obj:`str`): The BigQuery table to delete
  """
  from google.cloud import bigquery

  bq = bigquery.Client()
  bq.delete_table(table_id)  # API request
  print(f"Deleted table '{table_id}'.")

## *Deletes an ML model* (DDL)

In [18]:
# delete the BQML model if not needed...!!! BE CAREFUL !!!
def delete_model(model_id):
  """Drop a BigQuery ML model.

    Not recommended for production environments; it is handy during the
    iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      model_id(:obj:`str`): The BigQuery ML model to delete,
        e.g. 'your-project.your_dataset.your_model'
  """
  from google.cloud import bigquery

  bq = bigquery.Client()
  bq.delete_model(model_id)  # API request
  print(f"Deleted model '{model_id}'.")

# Collect phase

## *Creates the BigQuery dataset* (DDL)

In [19]:
# Create the BigQuery dataset that holds every table, procedure and model
# of this workshop (nothing is created when it already exists)
create_bq_ds(
    CS_GCP_PROJECT=CS_GCP_PROJECT,
    CS_BQ_DATASET_NAME=CS_BQ_DATASET_NAME,
    CS_LOCATION=CS_DATASET_LOCATION,
)

Dataset dpani-sandbox.cdp_ws_cs is not found
Created dataset dpani-sandbox.cdp_ws_cs in location: US.


## *Create and populate Sales and Customer table* (DDL + DML)

In [20]:
# Load the sales data: every object matching the `sales_*` wildcard of the
# public solutions bucket goes into the sales input table
load_table_uri_autodetect_csv(
    CS_GCP_PROJECT=CS_GCP_PROJECT,
    CS_BQ_DATASET_NAME=CS_BQ_DATASET_NAME,
    CS_BQ_TABLE_ID=CS_LTV_SOL_DS_INPUT_SALES_TBL,
    uri="gs://solutions-public-assets/analytics-componentized-patterns/ltv/sales_*",
)

Loaded 20467201 rows in table cdp_ws_cs.ltv_sol_input_sales.


In [21]:
# Load the customer (CRM) data from the public solutions bucket into the
# customer input table
load_table_uri_autodetect_csv(
    CS_GCP_PROJECT=CS_GCP_PROJECT,
    CS_BQ_DATASET_NAME=CS_BQ_DATASET_NAME,
    CS_BQ_TABLE_ID=CS_LTV_SOL_DS_INPUT_CUST_TBL,
    uri="gs://solutions-public-assets/analytics-componentized-patterns/ltv/crm.csv",
)

Loaded 100000 rows in table cdp_ws_cs.ltv_sol_input_customer.


# Transform phase

### *Data prep and transform for the Customer LTV model*

In [22]:
# SQL script that (re)creates the aggregate table: one row per customer and
# order day, built from the sales input table. The script is echoed and then
# run through execute_sql.
#
# How the nested SELECTs work (innermost first):
#   1. Sales rows are grouped per (customer_id, order_id). The earliest
#      transaction date is taken as the order day, qty * unit_price is summed
#      into the order value, and MIN(qty) is kept to detect returns.
#   2. Orders are grouped per (customer_id, order_day). A day counts as having
#      returns when the smallest quantity is negative. Window STDDEVs of the
#      daily value and quantity are computed per customer.
#   3. Rows whose stddev reaches CS_CLV_MAX_STDV_MONETARY or
#      CS_CLV_MAX_STDV_QTY are dropped; rows with a NULL stddev are kept.
cs_ltv_sol_create_aggregate_order_tbl = f"""
  DECLARE MAX_STDV_MONETARY INT64 DEFAULT {CS_CLV_MAX_STDV_MONETARY};
  DECLARE MAX_STDV_QTY INT64 DEFAULT {CS_CLV_MAX_STDV_QTY};

  CREATE OR REPLACE TABLE `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_CLV_AGGREGATE_TBL}` AS
  SELECT
    customer_id,
    order_day,
    ROUND(day_value_after_returns, 2) AS value,
    day_qty_after_returns as qty_articles,
    day_num_returns AS num_returns,
    CEIL(avg_time_to_return) AS time_to_return
  FROM (
    SELECT
      customer_id,
      order_day,
      SUM(order_value_after_returns) AS day_value_after_returns,
      STDDEV(SUM(order_value_after_returns)) OVER(PARTITION BY customer_id ORDER BY SUM(order_value_after_returns)) AS stdv_value,
      SUM(order_qty_after_returns) AS day_qty_after_returns,
      STDDEV(SUM(order_qty_after_returns)) OVER(PARTITION BY customer_id ORDER BY SUM(order_qty_after_returns)) AS stdv_qty,
      CASE
        WHEN MIN(order_min_qty) < 0 THEN count(1)
        ELSE 0
      END AS day_num_returns,
      CASE
        WHEN MIN(order_min_qty) < 0 THEN AVG(time_to_return)
        ELSE NULL
      END AS avg_time_to_return
    FROM (
      SELECT
        customer_id,
        order_id,
        -- Gives the order date vs return(s) dates.
        MIN(transaction_date) AS order_day,
        MAX(transaction_date) AS return_final_day,
        DATE_DIFF(MAX(transaction_date), MIN(transaction_date), DAY) AS time_to_return,
        -- Aggregates all products in the order
        -- and all products returned later.
        SUM(qty * unit_price) AS order_value_after_returns,
        SUM(qty) AS order_qty_after_returns,
        -- If negative, order has qty return(s).
        MIN(qty) order_min_qty
      FROM
        `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_INPUT_SALES_TBL}`
      GROUP BY
        customer_id,
        order_id)
    GROUP BY
      customer_id,
      order_day)
  WHERE
    -- [Optional] Remove dates with outliers per a customer.
    (stdv_value < MAX_STDV_MONETARY
      OR stdv_value IS NULL) AND
    (stdv_qty < MAX_STDV_QTY
      OR stdv_qty IS NULL);
"""
print (cs_ltv_sol_create_aggregate_order_tbl)
execute_sql(cs_ltv_sol_create_aggregate_order_tbl)


  DECLARE MAX_STDV_MONETARY INT64 DEFAULT 500;
  DECLARE MAX_STDV_QTY INT64 DEFAULT 100;

  CREATE OR REPLACE TABLE `cdp_ws_cs.ltv_sol_clv_aggregation` AS
  SELECT
    customer_id,
    order_day,
    ROUND(day_value_after_returns, 2) AS value,
    day_qty_after_returns as qty_articles,
    day_num_returns AS num_returns,
    CEIL(avg_time_to_return) AS time_to_return
  FROM (
    SELECT
      customer_id,
      order_day,
      SUM(order_value_after_returns) AS day_value_after_returns,
      STDDEV(SUM(order_value_after_returns)) OVER(PARTITION BY customer_id ORDER BY SUM(order_value_after_returns)) AS stdv_value,
      SUM(order_qty_after_returns) AS day_qty_after_returns,
      STDDEV(SUM(order_qty_after_returns)) OVER(PARTITION BY customer_id ORDER BY SUM(order_qty_after_returns)) AS stdv_qty,
      CASE
        WHEN MIN(order_min_qty) < 0 THEN count(1)
        ELSE 0
      END AS day_num_returns,
      CASE
        WHEN MIN(order_min_qty) < 0 THEN AVG(time_to_return)
        ELSE 

### Total aggregate sales record (DML)

In [23]:
# Count the records of the aggregate table
aggregate_count_sql = '''
  SELECT count (*) AS uci_total_sales_aggregate_records
  FROM `%s.%s`
''' % (CS_BQ_DATASET_NAME, CS_LTV_SOL_DS_CLV_AGGREGATE_TBL)
df = bq_client.query(aggregate_count_sql).to_dataframe()
# display the single-row result holding the total
df

Unnamed: 0,uci_total_sales_aggregate_records
0,362212


## Create a Features table that you will use to model clv (DDL)

In [24]:
# DDL that (re)creates the empty features table for the clv model.
# CREATE OR REPLACE is used, so running this cell discards any rows the
# table already holds.
# The column order matters: the stored procedure that fills this table uses
# an INSERT ... SELECT without a column list.
# `target_monetary` is the column later used as the model label.
cs_ltv_sol_ds_create_feature_tbl = f"""
  CREATE OR REPLACE TABLE `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_FEATURE_TBL}`
  (
    customer_id INT64,
    monetary FLOAT64,
    frequency INT64,
    recency INT64,
    T INT64,
    time_between FLOAT64,
    avg_basket_value FLOAT64,
    avg_basket_size FLOAT64,
    has_returns STRING,
    avg_time_to_return FLOAT64,
    num_returns INT64,
    -- threshold DATE,
    -- step INT64,
    target_monetary FLOAT64,
  )
"""
print (cs_ltv_sol_ds_create_feature_tbl)
execute_sql(cs_ltv_sol_ds_create_feature_tbl)


  CREATE OR REPLACE TABLE `cdp_ws_cs.ltv_sol_features`
  (
    customer_id INT64,
    monetary FLOAT64,
    frequency INT64,
    recency INT64,
    T INT64,
    time_between FLOAT64,
    avg_basket_value FLOAT64,
    avg_basket_size FLOAT64,
    has_returns STRING,
    avg_time_to_return FLOAT64,
    num_returns INT64,
    -- threshold DATE,
    -- step INT64,
    target_monetary FLOAT64,
  )

Querty executed.


## Create a stored proc to populate the Features table (DDL + DML)

In [25]:
# Create (or replace) the stored procedure that populates the Features table.
# This cell only defines the procedure; it is invoked by a later cell.
#
# What the procedure does:
#   * Reads the first and last order_day from the aggregate table.
#   * Walks a THRESHOLD_DATE forward from the first day: the first step is
#     WINDOW_STEP_INITIAL days, every later step is WINDOW_STEP days. The loop
#     is left once the threshold comes within WINDOW_STEP days of the last
#     order day.
#   * For each threshold it INSERTs one row per customer. The features come
#     from orders between WINDOW_START and the threshold, where WINDOW_START
#     is the first order day when WINDOW_LENGTH is 0. They are joined to
#     target_monetary, the customer's summed value over all orders up to
#     threshold + LENGTH_FUTURE. The target is cumulative because the
#     `order_day > THRESHOLD_DATE` filter is commented out in the SQL.
#   * The INSERT has no column list, so the SELECT order must match the
#     column order of the Features table.
cs_ltv_sol_ds_create_sp_populate_feature_tbl = f"""
  CREATE OR REPLACE PROCEDURE `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_FEATURE_PREP_SP}`(
    WINDOW_LENGTH INT64,         -- How many days back for inputs transactions.
    WINDOW_STEP INT64,          -- How many days between thresholds.
    WINDOW_STEP_INITIAL INT64,  -- How many days for the first window.        
    LENGTH_FUTURE INT64        -- How many days to predict for.  
  )

  BEGIN

  DECLARE MIN_DATE DATE;        -- Date of the first order in the dataset.                                     
  DECLARE MAX_DATE DATE;        -- Date of the final order in the dataset.
  DECLARE THRESHOLD_DATE DATE;  -- Date that separates inputs orders from target orders.  
  DECLARE WINDOW_START DATE;    -- Date at which an input transactions window starts.
  DECLARE STEP INT64 DEFAULT 1; -- Index of the window being run.

  -- Aggregates per date per customers.
  -- Creates the inputs and targets accross multiple threshold dates.
  SET (MIN_DATE, MAX_DATE) = (
    SELECT AS STRUCT 
      MIN(order_day) AS min_days,
      MAX(order_day) AS max_days
    FROM
      `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_CLV_AGGREGATE_TBL}`
  );

  SET THRESHOLD_DATE = MIN_DATE;

  LOOP
    -- Can choose a longer original window in case 
    -- there were not many orders in the early days.
    IF STEP = 1 THEN
      SET THRESHOLD_DATE = DATE_ADD(THRESHOLD_DATE, INTERVAL WINDOW_STEP_INITIAL DAY); 
    ELSE
      SET THRESHOLD_DATE = DATE_ADD(THRESHOLD_DATE, INTERVAL WINDOW_STEP DAY);
    END IF;
    SET STEP = STEP + 1;

    IF THRESHOLD_DATE >= DATE_SUB(MAX_DATE, INTERVAL (WINDOW_STEP) DAY) THEN
      LEAVE;
    END IF;

    -- Takes all transactions before the threshold date unless you decide
    -- to use a different window lenght to test model performance.
    IF WINDOW_LENGTH != 0 THEN
      SET WINDOW_START = DATE_SUB(THRESHOLD_DATE, INTERVAL WINDOW_LENGTH DAY);
    ELSE
      SET WINDOW_START = MIN_DATE;
    END IF;

    INSERT `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_FEATURE_TBL}`
    SELECT
      -- CASE
      --   WHEN THRESHOLD_DATE <= DATE_SUB(MAX_DATE, INTERVAL LENGTH_FUTURE DAY) THEN 'UNASSIGNED'
      --   ELSE 'TEST'
      -- END AS dataset,
      tf.customer_id,
      ROUND(tf.monetary_orders, 2) AS monetary,
      tf.cnt_orders AS frequency,
      tf.recency,
      tf.T,
      ROUND(tf.recency/cnt_orders, 2) AS time_between,
      ROUND(tf.avg_basket_value, 2) AS avg_basket_value,
      ROUND(tf.avg_basket_size, 2) AS avg_basket_size,
      has_returns,
      CEIL(avg_time_to_return) AS avg_time_to_return,
      num_returns,
      -- THRESHOLD_DATE AS threshold,
      -- STEP - 1 AS step,
      ROUND(tt.target_monetary, 2) AS target_monetary,
    FROM (
        -- This SELECT uses only data before THRESHOLD_DATE to make features.
        SELECT
          customer_id,
          SUM(value) AS monetary_orders,
          DATE_DIFF(MAX(order_day), MIN(order_day), DAY) AS recency,
          DATE_DIFF(THRESHOLD_DATE, MIN(order_day), DAY) AS T,
          COUNT(DISTINCT order_day) AS cnt_orders,
          AVG(qty_articles) avg_basket_size,
          AVG(value) avg_basket_value,
          CASE
            WHEN SUM(num_returns) > 0 THEN 'y'
            ELSE 'n'
          END AS has_returns,
          AVG(time_to_return) avg_time_to_return,
          THRESHOLD_DATE AS threshold,
          SUM(num_returns) num_returns,
        FROM
          `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_CLV_AGGREGATE_TBL}`
        WHERE
          order_day <= THRESHOLD_DATE AND
          order_day >= WINDOW_START
        GROUP BY
          customer_id
      ) tf
    INNER JOIN (
      -- This SELECT uses all orders that happened between threshold and 
      -- threshold + LENGTH_FUTURE to calculte the target monetary.
      SELECT
        customer_id,
        SUM(value) target_monetary
      FROM
        `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_CLV_AGGREGATE_TBL}`
      WHERE
        order_day <= DATE_ADD(THRESHOLD_DATE, INTERVAL LENGTH_FUTURE DAY)
        -- Overall value is similar to predicting only what's after threshold.
        -- and the prediction performs better. We can substract later.
        -- AND order_day > THRESHOLD_DATE
      GROUP BY
        customer_id) tt
    ON
      tf.customer_id = tt.customer_id;

  END LOOP;
  END
"""
print (cs_ltv_sol_ds_create_sp_populate_feature_tbl)
execute_sql(cs_ltv_sol_ds_create_sp_populate_feature_tbl)


  CREATE OR REPLACE PROCEDURE `cdp_ws_cs.ltv_sol_ml_prep_sp`(
    WINDOW_LENGTH INT64,         -- How many days back for inputs transactions.
    WINDOW_STEP INT64,          -- How many days between thresholds.
    WINDOW_STEP_INITIAL INT64,  -- How many days for the first window.        
    LENGTH_FUTURE INT64        -- How many days to predict for.  
  )

  BEGIN

  DECLARE MIN_DATE DATE;        -- Date of the first order in the dataset.                                     
  DECLARE MAX_DATE DATE;        -- Date of the final order in the dataset.
  DECLARE THRESHOLD_DATE DATE;  -- Date that separates inputs orders from target orders.  
  DECLARE WINDOW_START DATE;    -- Date at which an input transactions window starts.
  DECLARE STEP INT64 DEFAULT 1; -- Index of the window being run.

  -- Aggregates per date per customers.
  -- Creates the inputs and targets accross multiple threshold dates.
  SET (MIN_DATE, MAX_DATE) = (
    SELECT AS STRUCT 
      MIN(order_day) AS min_days,
 

In [26]:
# populate the feature table
# with the current dataset it takes ~1 minute to complete the execution
#
# The script passes the four CS_CLV_* window settings to the feature-prep
# stored procedure.
# If the CALL fails, the EXCEPTION handler SELECTs the error details instead
# of re-raising. A failure therefore shows up as a row printed by execute_sql
# rather than as a Python exception.
# NOTE(review): the procedure appends rows with INSERT. Re-running this cell
# without first re-creating the feature table presumably inserts the same
# rows a second time - confirm.
cs_ltv_sol_ds_invoke_sp_populate_feature_tbl = f"""
  DECLARE WINDOW_LENGTH INT64 DEFAULT {CS_CLV_WINDOW_LENGTH};
  DECLARE WINDOW_STEP INT64 DEFAULT {CS_CLV_WINDOW_STEP};
  DECLARE WINDOW_STEP_INITIAL INT64 DEFAULT {CS_CLV_WINDOW_STEP_INITIAL};
  DECLARE LENGTH_FUTURE INT64 DEFAULT {CS_CLV_LENGTH_FUTURE};
  BEGIN 
      CALL `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_FEATURE_PREP_SP}`(WINDOW_LENGTH, WINDOW_STEP, WINDOW_STEP_INITIAL, LENGTH_FUTURE);
  EXCEPTION WHEN ERROR THEN
    SELECT
      @@error.message,
      @@error.stack_trace,
      @@error.statement_text,
      @@error.formatted_stack_trace;
  END
"""
print (cs_ltv_sol_ds_invoke_sp_populate_feature_tbl)
execute_sql(cs_ltv_sol_ds_invoke_sp_populate_feature_tbl)


  DECLARE WINDOW_LENGTH INT64 DEFAULT 0;
  DECLARE WINDOW_STEP INT64 DEFAULT 30;
  DECLARE WINDOW_STEP_INITIAL INT64 DEFAULT 90;
  DECLARE LENGTH_FUTURE INT64 DEFAULT 30;
  BEGIN 
      CALL `cdp_ws_cs.ltv_sol_ml_prep_sp`(WINDOW_LENGTH, WINDOW_STEP, WINDOW_STEP_INITIAL, LENGTH_FUTURE);
  EXCEPTION WHEN ERROR THEN
    SELECT
      @@error.message,
      @@error.stack_trace,
      @@error.statement_text,
      @@error.formatted_stack_trace;
  END

Querty executed.


## Create the clv model (DDL)

In [27]:
# create the clv model
# takes ~1 hour and 30 mins
#
# Trains (or replaces) a BigQuery ML model of type AUTOML_REGRESSOR on the
# features table. `target_monetary` is the label, the optimization objective
# is MINIMIZE_MAE, and customer_id is excluded from the training columns.
cs_ltv_sol_ds_create_clv_model = f"""
  CREATE OR REPLACE MODEL `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_MODEL_NAME}`
        OPTIONS(MODEL_TYPE="AUTOML_REGRESSOR",
                INPUT_LABEL_COLS=["target_monetary"],
                OPTIMIZATION_OBJECTIVE="MINIMIZE_MAE")
  AS SELECT
    * EXCEPT(customer_id)
  FROM
    `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_FEATURE_TBL}`
"""
print (cs_ltv_sol_ds_create_clv_model)
execute_sql(cs_ltv_sol_ds_create_clv_model)


  CREATE OR REPLACE MODEL `cdp_ws_cs.ltv_sol_clv_ml_model`
        OPTIONS(MODEL_TYPE="AUTOML_REGRESSOR",
                INPUT_LABEL_COLS=["target_monetary"],
                OPTIMIZATION_OBJECTIVE="MINIMIZE_MAE")
  AS SELECT
    * EXCEPT(customer_id)
  FROM
    `cdp_ws_cs.ltv_sol_features`

Querty executed.


## Predict Customer ltv (DDL + DML)

In [28]:
# Create (or replace) the stored procedure that builds the prediction table.
# This cell only defines the procedure; it is invoked by a later cell.
#
# What the procedure does:
#   * Reads the first and last order_day from the aggregate table and
#     predicts from the last order day (PREDICT_FROM_DATE = MAX_DATE).
#   * Computes per-customer features over orders between WINDOW_START and
#     PREDICT_FROM_DATE, where WINDOW_START is the first order day when
#     WINDOW_LENGTH is 0.
#   * Feeds those features to ML.PREDICT with the clv model and joins the
#     predictions to the customer table on customer_id.
#   * (Re)creates the prediction table with customer details,
#     monetary_so_far, monetary_predicted, and monetary_future
#     (predicted minus so far).
cs_create_sp_populate_predict_tbl = f"""
CREATE OR  REPLACE PROCEDURE `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_PREDICT_UTIL_SP}`(
  WINDOW_LENGTH INT64
)
BEGIN
  -- How many days back for inputs transactions. 0 means from the start.
  -- DECLARE WINDOW_LENGTH INT64 DEFAULT WINDOW_LENGTH;
  -- Date at which an input transactions window starts.
  DECLARE WINDOW_START DATE;

  -- Date of the first transaction in the dataset.
  DECLARE MIN_DATE DATE;
  -- Date of the final transaction in the dataset.
  DECLARE MAX_DATE DATE;
  -- Date from which you want to predict.
  DECLARE PREDICT_FROM_DATE DATE;

  SET (MIN_DATE, MAX_DATE) = (
    SELECT AS STRUCT
      MIN(order_day) AS min_days,
      MAX(order_day) AS max_days
    FROM
      `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_CLV_AGGREGATE_TBL}`
  );

  -- You can set any date here. In production, it is generally today.
  SET PREDICT_FROM_DATE = MAX_DATE;
  IF WINDOW_LENGTH != 0 THEN
    SET WINDOW_START = DATE_SUB(PREDICT_FROM_DATE, INTERVAL WINDOW_LENGTH DAY);
  ELSE
    SET WINDOW_START = MIN_DATE;
  END IF;

  CREATE OR REPLACE TABLE `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_PREDICT_TBL}`
  AS (
  SELECT
    cust_tbl.customer_id,
    cust_tbl.full_name,
    cust_tbl.job_title,
    cust_tbl.email,
    monetary AS monetary_so_far,
    ROUND(predicted_target_monetary, 2) AS monetary_predicted,
    ROUND(predicted_target_monetary - monetary, 2) AS monetary_future
  FROM
    ML.PREDICT(
      -- To use your own model, set the model name here.
      MODEL `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_MODEL_NAME}`,
      (
        SELECT
          agg_tmp_tbl.customer_id,
          ROUND(monetary_orders, 2) AS monetary,
          cnt_orders AS frequency,
          recency,
          T,
          ROUND(recency/cnt_orders, 2) AS time_between,
          ROUND(avg_basket_value, 2) AS avg_basket_value,
          ROUND(avg_basket_size, 2) AS avg_basket_size,
          has_returns,
          CEIL(avg_time_to_return) AS avg_time_to_return,
          num_returns
        FROM (
          SELECT
            agg_tbl.customer_id,
            SUM(value) AS monetary_orders,
            DATE_DIFF(MAX(order_day), MIN(order_day), DAY) AS recency,
            DATE_DIFF(PREDICT_FROM_DATE, MIN(order_day), DAY) AS T,
            COUNT(DISTINCT order_day) AS cnt_orders,
            AVG(qty_articles) avg_basket_size,
            AVG(value) avg_basket_value,
            CASE
              WHEN SUM(num_returns) > 0 THEN 'y'
              ELSE 'n'
            END AS has_returns,
            AVG(time_to_return) avg_time_to_return,
            SUM(num_returns) num_returns,
          FROM
            `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_CLV_AGGREGATE_TBL}` agg_tbl
          WHERE
            order_day <= PREDICT_FROM_DATE AND
            order_day >= WINDOW_START
          GROUP BY
            agg_tbl.customer_id
        ) agg_tmp_tbl
      )
    ) pred_temp,
    `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_INPUT_CUST_TBL}` cust_tbl
    where 
    pred_temp.customer_id = cust_tbl.customer_id
  );
END
"""
print (cs_create_sp_populate_predict_tbl)
execute_sql(cs_create_sp_populate_predict_tbl)


CREATE OR  REPLACE PROCEDURE `cdp_ws_cs.ltv_sol_clv_ml_predict_util_sp`(
  WINDOW_LENGTH INT64
)
BEGIN
  -- How many days back for inputs transactions. 0 means from the start.
  -- DECLARE WINDOW_LENGTH INT64 DEFAULT WINDOW_LENGTH;
  -- Date at which an input transactions window starts.
  DECLARE WINDOW_START DATE;

  -- Date of the first transaction in the dataset.
  DECLARE MIN_DATE DATE;
  -- Date of the final transaction in the dataset.
  DECLARE MAX_DATE DATE;
  -- Date from which you want to predict.
  DECLARE PREDICT_FROM_DATE DATE;

  SET (MIN_DATE, MAX_DATE) = (
    SELECT AS STRUCT
      MIN(order_day) AS min_days,
      MAX(order_day) AS max_days
    FROM
      `cdp_ws_cs.ltv_sol_clv_aggregation`
  );

  -- You can set any date here. In production, it is generally today.
  SET PREDICT_FROM_DATE = MAX_DATE;
  IF WINDOW_LENGTH != 0 THEN
    SET WINDOW_START = DATE_SUB(PREDICT_FROM_DATE, INTERVAL WINDOW_LENGTH DAY);
  ELSE
    SET WINDOW_START = MIN_DATE;
  END IF;

  CREATE O

In [29]:
# Create and predict the clv of customers
# with the current dataset it takes ~1.5 minute to complete the execution
#
# The script calls the predict stored procedure with CS_CLV_WINDOW_LENGTH.
# If the CALL fails, the EXCEPTION handler SELECTs the error details instead
# of re-raising. A failure therefore shows up as a row printed by execute_sql
# rather than as a Python exception.
cs_ltv_sol_ds_invoke_sp_populate_predict_tbl = f"""
DECLARE WINDOW_LENGTH INT64 DEFAULT {CS_CLV_WINDOW_LENGTH};
BEGIN
    CALL `{CS_BQ_DATASET_NAME}.{CS_LTV_SOL_DS_ML_PREDICT_UTIL_SP}`(WINDOW_LENGTH);
    EXCEPTION WHEN ERROR THEN
    SELECT
      @@error.message,
      @@error.stack_trace,
      @@error.statement_text,
      @@error.formatted_stack_trace;
END
"""
print (cs_ltv_sol_ds_invoke_sp_populate_predict_tbl)
execute_sql(cs_ltv_sol_ds_invoke_sp_populate_predict_tbl)


DECLARE WINDOW_LENGTH INT64 DEFAULT 0;
BEGIN
    CALL `cdp_ws_cs.ltv_sol_clv_ml_predict_util_sp`(WINDOW_LENGTH);
    EXCEPTION WHEN ERROR THEN
    SELECT
      @@error.message,
      @@error.stack_trace,
      @@error.statement_text,
      @@error.formatted_stack_trace;
END

Querty executed.


In [30]:
# Pull the prediction table into a pandas DataFrame and show summary statistics.
# To work with a sample only, narrow the query below (e.g. add a WHERE clause).
cs_predict_tbl_select_all_sql = '''
  SELECT *
  FROM `%s.%s`
''' % (CS_BQ_DATASET_NAME,CS_LTV_SOL_DS_ML_PREDICT_TBL)
df = bq_client.query(cs_predict_tbl_select_all_sql).to_dataframe()
df.describe()

Unnamed: 0,customer_id,monetary_so_far,monetary_predicted,monetary_future
count,99070.0,99070.0,99070.0,99070.0
mean,50008.361563,1524.427797,1525.180655,0.752854
std,28862.604042,1158.682822,1158.815462,0.576954
min,1.0,0.0,0.87,-23.76
25%,25021.25,652.23,652.7225,0.42
50%,50010.5,1279.3,1279.895,0.68
75%,74992.5,2131.7975,2132.495,1.03
max,99999.0,31088.88,31065.12,10.58


# Analyze phase

## Display customers list with their ltv

In [31]:
# Show ten customers, ordered by customer_id, together with their predicted LTV.
# Adjust the query below (e.g. add a WHERE clause) to grab a different sample.
drop_columns = ['monetary_so_far', 'monetary_future']
df = (
    bq_client.query('''
  SELECT *
  FROM `%s.%s`
  ORDER BY customer_id
  # ORDER BY monetary_predicted desc
  LIMIT 10
''' % (CS_BQ_DATASET_NAME,CS_LTV_SOL_DS_ML_PREDICT_TBL))
    .to_dataframe()
    # Hide the columns listed in drop_columns; drop() returns a new frame,
    # so nothing is mutated in place.
    .drop(columns=drop_columns)
)
df

Unnamed: 0,customer_id,full_name,job_title,email,monetary_predicted
0,1,Eric Hilton,Product Manager,jkocher0@163-example.com,415.03
1,2,Fritz S Hillis,Product Manager,lvandermark1@sciencedirect-example.com,1496.64
2,3,Joann Merrill,CEO,enorthwood2@biglobe.ne-example.jp,1122.01
3,4,Elvin Rochon,Software Engineer,eoller3@comcast-example.net,293.92
4,5,Rebecca V Larson,CIO,srippen4@wunderground-example.com,3850.99
5,6,Siobhan Riley,Vice President,ydouberday5@admin-example.ch,1031.98
6,7,Erica Z Coffey,Software Engineer,meddisforth6@jalbum-example.net,1720.93
7,8,Melissia P Cespedes,Software Engineer,alockie7@cafepress-example.com,1544.95
8,9,Ida K Rodriquez,Software Engineer,fharbidge8@sitemeter-example.com,759.28
9,10,Tena F Bloomberg,CIO,fabrahamovitz9@tiny-example.cc,5696.04


In [33]:
# Show the ten customers with the highest predicted LTV.
# Adjust the query below (e.g. add a WHERE clause) to grab a different sample.
drop_columns = ['monetary_so_far', 'monetary_future']
df = (
    bq_client.query('''
  SELECT *
  FROM `%s.%s`
  # ORDER BY id
  ORDER BY monetary_predicted desc
  LIMIT 10
''' % (CS_BQ_DATASET_NAME,CS_LTV_SOL_DS_ML_PREDICT_TBL))
    .to_dataframe()
    # Hide the columns listed in drop_columns; drop() returns a new frame,
    # so nothing is mutated in place.
    .drop(columns=drop_columns)
)
df

Unnamed: 0,customer_id,full_name,job_title,email,monetary_predicted
0,39030,Crystle N Hake,Vice President,rkalfu45@myspace-example.com,31065.12
1,23553,Shannon S Ashley,CIO,nhavocki68@360-example.cn,27738.73
2,28722,Melvin Strand,Product Manager,dlafflinam5t@nih-example.gov,20986.28
3,15918,Delmar Motley,CIO,mchantca5@ox.ac-example.uk,20073.68
4,35073,Judith Potter,Software Engineer,rbellshamr28@surveymonkey-example.com,19320.57
5,14480,Casey W Burress,CEO,hrameauxb67@mozilla-example.com,17465.78
6,99422,Orval X Hutton,CEO,oriseley24pp@independent.co-example.uk,16931.41
7,54786,Rosemary Zacharias,Product Manager,hduling169t@bbb-example.org,15924.72
8,38732,Lesley N Trussell,Vice President,jholdintvv@opera-example.com,14387.3
9,61895,Amanda B Holland,CIO,fpirolini1bra@ustream-example.tv,13193.63


# Google Cloud Resource Clean up

## Delete the BigQuery Dataset

In [None]:
# Clean-up step: delete the BigQuery dataset named by CS_BQ_DATASET_NAME.
# The call is intentionally left commented out; uncomment the line below to run it.
# delete_dataset(CS_BQ_DATASET_NAME)

Deleted dataset 'cdp_cs'.


## Delete the Google Cloud Project
To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, **delete the project**.

The easiest way to eliminate billing is to delete the project you created for the tutorial.

**Caution**: Deleting a project has the following effects:
* *Everything in the project is deleted.* If you used an existing project for this tutorial, when you delete it, you also delete any other work you've done in the project.
* <b>Custom project IDs are lost.</b> When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an <b>appspot.com</b> URL, delete selected resources inside the project instead of deleting the whole project.

If you plan to explore multiple tutorials and quickstarts, reusing projects can help you avoid exceeding project quota limits.
<br>
<ol type="1">
    <li>In the Cloud Console, go to the <b>Manage resources</b> page:
    <a href="https://console.cloud.google.com/iam-admin/projects">Go to the Manage resources page</a></li>
    <li>In the project list, select the project that you want to delete and then click <b>Delete</b> (the trash icon).</li>
    <li>In the dialog, type the project ID and then click <b>Shut down</b> to delete the project. </li>
</ol>
