# DISCLAIMER
Copyright 2021 Google LLC. 

*This solution, including any related sample code or data, is made available on an “as is,” “as available,” and “with all faults” basis, solely for illustrative purposes, and without warranty or representation of any kind. This solution is experimental, unsupported and provided solely for your convenience. Your use of it is subject to your agreements with Google, as applicable, and may constitute a beta feature as defined under those agreements. To the extent that you make any data available to Google in connection with your use of the solution, you represent and warrant that you have all necessary and appropriate rights, consents and permissions to permit Google to use and process that data. By using any portion of this solution, you acknowledge, assume and accept all risks, known and unknown, associated with its usage, including with respect to your deployment of any portion of this solution in your systems, or usage in connection with your business, if at all.*

# Crystalvalue Demo: Predictive Customer Lifetime Value for a Retail Store

Crystalvalue is a comprehensive, best-practice solution for running end-to-end customer lifetime value (LTV) modelling on Google Cloud Vertex AI.

This demo runs the Crystalvalue Python library in a notebook, from feature engineering to scheduling predictions. It uses the Online Retail II dataset from Kaggle, which contains transactions for a UK retail store between 1 December 2009 and 9 December 2011. More details on this dataset can be found [here](https://www.kaggle.com/mashlyn/online-retail-ii-uci).

This notebook assumes that it is being run from within a [Google Cloud Platform AI Notebook](https://console.cloud.google.com/vertex-ai/notebooks/list/instances) with a Compute Engine default service account (the default setting when an AI Notebook is created) and with a standard Python 3 environment.

If you would like to share feedback about Crystalvalue, please email crystalvalue@google.com.

# Clone the Crystalvalue codebase

If you have not cloned the Crystalvalue codebase yet, open up a terminal and execute the following commands.

To clone the code you can authenticate by following these [steps](https://g3doc.corp.google.com/company/teams/gtech/ads/das/cse/faq/tools/professional-services-googlesource-com.md#working-from-gcp) as a Googler. To grant customer access to the codebase follow these [steps](https://g3doc.corp.google.com/company/teams/gtech/ads/das/cse/faq/tools/professional-services-googlesource-com.md#external-users).

```git clone https://professional-services.googlesource.com/solutions/crystalvalue```

Copy the notebook into your current working directory and run it there (in the parent directory of the crystalvalue directory).

```cp ./crystalvalue/crystalvalue_demo_notebook.ipynb .```

# Set up - Downloading the dataset

To use Kaggle's public API, you must first authenticate using an API token. To create one, go to your Kaggle account page and click 'Create New API Token' (see https://www.kaggle.com/docs/api). This downloads an API token file called `kaggle.json`. Put this file in your working directory and run the following commands from your AI Notebook.

In [None]:
!pip install kaggle

The Kaggle CLI expects `kaggle.json` to be in a folder called `.kaggle` in your home directory.

In [None]:
!mkdir -p ~/.kaggle

In [None]:
!cp kaggle.json ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json
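If you prefer to stay in Python, the shell commands above can be sketched with the standard library. The helper name `install_kaggle_token` is hypothetical (it is not part of Kaggle or Crystalvalue); it creates the folder, copies the token and restricts the file to owner-only access, which is what the Kaggle CLI asks for when it warns that your API key is readable by other users.

```python
import os
import shutil


def install_kaggle_token(token_path="kaggle.json",
                         kaggle_dir=os.path.expanduser("~/.kaggle")):
    """Hypothetical helper: copies the Kaggle API token to where the CLI reads it."""
    # Equivalent of `mkdir -p`: no error if the folder already exists.
    os.makedirs(kaggle_dir, exist_ok=True)
    destination = os.path.join(kaggle_dir, "kaggle.json")
    shutil.copyfile(token_path, destination)
    # Owner read/write only, equivalent of `chmod 600`.
    os.chmod(destination, 0o600)
    return destination
```

Calling `install_kaggle_token()` from the working directory that holds `kaggle.json` has the same effect as the three shell cells.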

In [None]:
!kaggle datasets download -d mashlyn/online-retail-ii-uci

In [None]:
!sudo apt-get install -y unzip

In [None]:
!unzip online-retail-ii-uci.zip -d data/

This creates `online_retail_II.csv` in the `data/` directory, which we will import into BigQuery in the next steps.
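If `apt-get` is not available on your machine, Python's built-in `zipfile` module can extract the archive instead of `unzip`. This is a minimal sketch; `extract_archive` is a hypothetical helper, and its default paths assume the file names used above.

```python
import os
import zipfile


def extract_archive(archive_path="online-retail-ii-uci.zip", destination="data"):
    """Hypothetical helper: extracts a zip archive and returns its member names."""
    os.makedirs(destination, exist_ok=True)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(destination)
        return archive.namelist()
```

Running `extract_archive()` in the working directory should leave `online_retail_II.csv` under `data/`, just like the `unzip` cell.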

# Installing dependencies and initializing Crystalvalue

First, if you don't already have one, create a dataset in [BigQuery](https://console.cloud.google.com/bigquery) to use for this analysis. The dataset should be in a [location where Vertex AI services are available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions).

In [None]:
%pip install -q -r './crystalvalue/requirements.txt'

In [None]:
import pandas as pd

from crystalvalue import crystalvalue
from google.cloud import bigquery

In [None]:
# Create BigQuery client for cloud authentication.
bigquery_client = bigquery.Client()

In [None]:
# Read the data and rename the columns to be BigQuery friendly (no spaces).
data = pd.read_csv('./data/online_retail_II.csv')
data.columns = data.columns.str.replace(' ', '')
data.head()
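The `str.replace(' ', '')` call above only removes spaces. A slightly more defensive sketch keeps only letters, digits and underscores (the character set allowed in classic BigQuery column names), prefixes an underscore when a name would start with a digit, and stops if cleaning makes two names collide. The helper `bigquery_safe_column_names` is hypothetical.

```python
import re


def bigquery_safe_column_names(columns):
    """Hypothetical helper: returns BigQuery-friendly versions of column names."""
    safe = []
    for name in columns:
        # Drop every character that is not a letter, digit or underscore.
        cleaned = re.sub(r"[^A-Za-z0-9_]", "", str(name))
        # Names must start with a letter or underscore.
        if not cleaned or cleaned[0].isdigit():
            cleaned = "_" + cleaned
        safe.append(cleaned)
    if len(set(safe)) != len(safe):
        raise ValueError(f"Cleaned column names are not unique: {safe}")
    return safe
```

Usage: `data.columns = bigquery_safe_column_names(data.columns)` turns `Customer ID` into `CustomerID`, matching the column name used in the rest of this notebook.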

In [None]:
# Load the data to BigQuery.
DATASET_ID = 'your_dataset_name'  # The name of the dataset that you created.
TABLE_NAME = 'online_retail_data'  # The name of the table that will be created in the dataset.
LOCATION = 'europe-west4'  # The location of your BigQuery dataset. Here we use 'europe-west4'.

bigquery_job = bigquery_client.load_table_from_dataframe(
      dataframe=data,
      destination=f'{bigquery_client.project}.{DATASET_ID}.{TABLE_NAME}',
      location=LOCATION).result()

In [None]:
# Initialize the CrystalValue class with the relevant parameters.
pipeline = crystalvalue.CrystalValue(
  bigquery_client=bigquery_client,
  dataset_id=DATASET_ID,
  customer_id_column='CustomerID',
  date_column='InvoiceDate',
  value_column='Price',  #  Column to use for LTV calculation.
  days_lookback=90,  #  How many days in the past to use for feature engineering.
  days_lookahead=365,  #  How many days in the future to use for value prediction.
  ignore_columns=['Invoice'],  #  A list of columns in your input table to ignore.
  location=LOCATION
)  
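To make `days_lookback` and `days_lookahead` concrete, the sketch below computes one plausible layout of the time windows. It is an assumption, not Crystalvalue's code: for training, the last `days_lookahead` days of data provide the target value and the `days_lookback` days before them provide the features; for prediction, features come from the last `days_lookback` days and the prediction covers the `days_lookahead` days after the last date in the table. Both helpers are hypothetical and use half-open `[start, end)` intervals.

```python
from datetime import timedelta


def training_windows(last_date, days_lookback=90, days_lookahead=365):
    """Hypothetical helper: ((feature_start, feature_end), (label_start, label_end))."""
    # The most recent `days_lookahead` days supply the target value...
    label_start = last_date - timedelta(days=days_lookahead)
    # ...and the `days_lookback` days before them supply the features.
    feature_start = label_start - timedelta(days=days_lookback)
    return (feature_start, label_start), (label_start, last_date)


def prediction_windows(last_date, days_lookback=90, days_lookahead=365):
    """Hypothetical helper: features end at last_date; predictions cover what follows."""
    feature_window = (last_date - timedelta(days=days_lookback), last_date)
    prediction_window = (last_date, last_date + timedelta(days=days_lookahead))
    return feature_window, prediction_window
```

With the Online Retail II data ending on 9 December 2011, `training_windows(date(2011, 12, 9))` gives a feature window from 10 September 2010 to 9 December 2010 and a label window from 9 December 2010 to 9 December 2011. Under this layout, training needs at least `days_lookback + days_lookahead` (here 455) days of history, which the roughly two years in this dataset provide.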

# Data Checks

CrystalValue runs checks on your data to determine whether it is suitable for LTV modelling and raises errors if it is not. It also outputs a new BigQuery table in your dataset called `crystalvalue_data_statistics` with key information such as the number of customers, the number of transactions and the analysis time period. Use this information to check for outliers or anomalies (e.g. negative prices).

In [None]:
summary_statistics = pipeline.run_data_checks(
    transaction_table_name=TABLE_NAME)
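For a quick local look alongside the BigQuery checks, a pandas sketch can compute similar headline statistics on the `data` DataFrame loaded earlier. The helper `summarize_transactions` and its output keys are hypothetical and do not reproduce the schema of `crystalvalue_data_statistics`. Each row of Online Retail II is an invoice line, so the row count is not the number of invoices.

```python
import pandas as pd


def summarize_transactions(df, customer_col="CustomerID", date_col="InvoiceDate",
                           value_col="Price"):
    """Hypothetical helper: headline statistics for a transaction DataFrame."""
    dates = pd.to_datetime(df[date_col])
    return {
        "n_rows": int(len(df)),
        "n_customers": int(df[customer_col].nunique()),  # Missing IDs are not counted.
        "n_missing_customer_ids": int(df[customer_col].isna().sum()),
        "first_date": dates.min(),
        "last_date": dates.max(),
        "n_days": int((dates.max() - dates.min()).days),
        "n_non_positive_values": int((df[value_col] <= 0).sum()),
    }
```

Usage: `summarize_transactions(data)`. A non-zero `n_non_positive_values` is the kind of anomaly the cleaning step below addresses.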

If you need a custom data cleaning routine, use the `.run_query()` method. The example below removes transactions with zero or negative prices. This method can also run custom feature engineering scripts in place of the automated `.feature_engineer()` method in the next step. This data cleaning routine can be scheduled as part of the pipeline that we define later (for model training and prediction).

In [None]:
# Run a quick data cleaning routine that removes transactions with zero or negative prices.
# Backticks keep the table reference valid when the project ID contains dashes.
query = f"""
SELECT *
FROM `{bigquery_client.project}.{DATASET_ID}.{TABLE_NAME}`
WHERE Price > 0
"""

pipeline.run_query(
    query_sql=query,
    destination_table_name=TABLE_NAME,
    location=LOCATION)

# Feature Engineering

Crystalvalue takes a transaction-level or browsing-level dataset and creates a machine-learning-ready dataset that can be ingested by AutoML. Data types are inferred automatically from the BigQuery schema unless they are provided through the `feature_types` parameter of the `.feature_engineer()` method. Data transformations are applied automatically depending on the data type. The processing happens in BigQuery, and the executed SQL script can optionally be written to a file in your directory. By default, the features are created in a BigQuery table called `crystalvalue_train_data`.

In [None]:
crystalvalue_train_data = pipeline.feature_engineer(
  transaction_table_name=TABLE_NAME,
  write_executed_query_file='crystalvalue/executed_train_query.sql'  # (Optional) File path to write the executed SQL query.
)  
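The real feature engineering is generated as SQL and runs in BigQuery. To illustrate the idea only, here is a pandas sketch that turns transactions into one row per customer: simple aggregates over the lookback window plus the total value in the following window as the target. The function name, feature names and windowing are assumptions and will not match the columns in `crystalvalue_train_data`. Customers with no activity in the lookback window get no row, in line with the note later in this notebook that only customers active during the lookback window receive predictions.

```python
import pandas as pd


def customer_features_and_target(df, feature_start, label_start, label_end,
                                 customer_col="CustomerID", date_col="InvoiceDate",
                                 value_col="Price"):
    """Hypothetical helper: per-customer lookback features and lookahead target."""
    feature_start, label_start, label_end = (
        pd.Timestamp(feature_start), pd.Timestamp(label_start), pd.Timestamp(label_end))
    df = df.assign(**{date_col: pd.to_datetime(df[date_col])})
    # Half-open windows: [feature_start, label_start) and [label_start, label_end).
    lookback = df[(df[date_col] >= feature_start) & (df[date_col] < label_start)]
    lookahead = df[(df[date_col] >= label_start) & (df[date_col] < label_end)]

    features = lookback.groupby(customer_col).agg(
        n_rows=(value_col, "count"),
        total_value=(value_col, "sum"),
        mean_value=(value_col, "mean"),
        last_active=(date_col, "max"),
    )
    features["days_since_last"] = (label_start - features["last_active"]).dt.days
    features = features.drop(columns="last_active")

    # Target: total value in the lookahead window; 0 for customers who did not return.
    future = lookahead.groupby(customer_col)[value_col].sum()
    features["future_value"] = future.reindex(features.index, fill_value=0.0)
    return features
```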

# Model Training

Crystalvalue leverages [Vertex AI (Tabular) AutoML](https://cloud.google.com/vertex-ai/docs/training/automl-api), which requires a [Vertex AI Dataset](https://cloud.google.com/vertex-ai/docs/datasets/create-dataset-api) as input. CrystalValue automatically creates a Vertex AI Dataset from your input table as part of the training step of the pipeline. Training typically takes two hours or more. The Vertex AI Dataset will have the display name `crystalvalue_dataset`. The model will have the display name `crystalvalue_model`, but it also receives a unique model ID, so if you train multiple models they are not overwritten and can be identified by their IDs. By default, CrystalValue uses the following parameters:
*  A predefined split with a random 15% of users in test, 15% in validation and 70% in training.
*  Optimization objective: minimize root-mean-squared error (RMSE). This is recommended but can be changed to [MAE or RMSLE](https://cloud.google.com/automl-tables/docs/train#opt-obj).
*  1 node hour of training (1000 milli node hours), which we recommend starting with. [Modify this in line with the number of rows](https://cloud.google.com/automl-tables/docs/train#training_a_model) in the dataset when you are ready to productionise. See [pricing](https://cloud.google.com/automl-tables/pricing) for details.
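A customer-level 70/15/15 split can be made deterministic by hashing the customer ID, so every row for a customer lands in the same split and reruns give the same assignment. This is a sketch of the general technique, not how CrystalValue assigns its predefined split; the function `assign_split` and its `TRAIN`/`VALIDATE`/`TEST` labels are illustrative.

```python
import hashlib


def assign_split(customer_id, test_fraction=0.15, validation_fraction=0.15):
    """Hypothetical helper: deterministic TRAIN/VALIDATE/TEST label for a customer."""
    # Hash the ID so the assignment is stable across runs and machines.
    digest = hashlib.sha256(str(customer_id).encode("utf-8")).hexdigest()
    # Map the hash to a number in [0, 1) with 1/10,000 resolution.
    position = int(digest, 16) % 10_000 / 10_000
    if position < test_fraction:
        return "TEST"
    if position < test_fraction + validation_fraction:
        return "VALIDATE"
    return "TRAIN"
```

Hashing rather than random sampling means no split column has to be stored between runs: the same customer always gets the same label.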

In this example we keep all the default settings, so training the model is as simple as calling `pipeline.train()`.

To make fast predictions later, you can deploy the model using the `.deploy_model()` method.

In [None]:
# Creates a Vertex AI Dataset and trains an AutoML model.
model_object = pipeline.train()

In [None]:
model_object = pipeline.deploy_model()

# Model Evaluation

To evaluate the model, we use the following criteria:

* The Spearman correlation, a measure of how well the model **ranked** the lifetime value of customers in the test set. It ranges from -1 (worst) to 1 (best).
* The normalised Gini coefficient, another measure of how well the model **ranked** the lifetime value of customers in the test set, compared to a random ranking. It ranges from 0 (worst) to 1 (best).
* The normalised Mean Absolute Error (MAE%), a measure of the **error** of the model's lifetime value predictions in the test set.
* `top_x_percent_predicted_customer_value_share`: the proportion of value (i.e. total profit or revenue) in the test set that is accounted for by the top x% of customers ranked by predicted value.

These outputs are sent to a BigQuery table (by default called `crystalvalue_evaluation`). Subsequent model evaluations append model performance evaluation metrics to this table to allow for comparison across models.

In [None]:
metrics = pipeline.evaluate_model()

# Generating predictions

Once model training is done, you can generate predictions. Before prediction, features must be engineered in exactly the same way as for model training. This is done with the `.feature_engineer()` method by setting the parameter `query_type='predict_query'`. By default, the features are created in a BigQuery table called `crystalvalue_predict_data`. The model makes predictions for all customers in the provided input table that have any activity during the lookback window. The pLTV predictions cover the period starting from the last date in the input table (not today's date).

Once you start the training, you can view your model training progress here:  
https://console.cloud.google.com/vertex-ai/training/training-pipelines  
Once the training is finished, check out your Dataset (with statistics and distributions) and Model (with feature importance) in the UI:  
 https://console.cloud.google.com/vertex-ai/datasets   
 https://console.cloud.google.com/vertex-ai/models

In [None]:
crystalvalue_predict_data = pipeline.feature_engineer(
    transaction_table_name=TABLE_NAME,  # An existing BigQuery table in your dataset containing the data to predict with.
    query_type='predict_query')

predictions = pipeline.predict(
    input_table=crystalvalue_predict_data,
    destination_table='predictions'  # The BigQuery table to append predictions to. It is created if it does not exist yet.
)

# Scheduling predictions to occur every day

Crystalvalue uses [Vertex Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) to schedule and monitor ML predictions and retraining in a serverless manner. The example below shows how to set up the model to create predictions automatically from new input data in the source BigQuery table every day at 1am. The frequency and timing can be changed using the cron schedule below. Once the pipeline is set up, you can view it [here](https://console.cloud.google.com/vertex-ai/pipelines). For a tutorial on setting up Vertex Pipelines, see [this guide](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline) or this [example notebook](https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/master/ai-platform-unified/notebooks/official/pipelines/pipelines_intro_kfp.ipynb).

Vertex AI Pipelines needs a Cloud Storage bucket to store pipeline artifacts (the pipeline root). Use the code below to create one.

In [None]:
BUCKET_NAME = f"gs://{bigquery_client.project}-crystalvalue" 
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root"

In [None]:
!gsutil mb -l $LOCATION $BUCKET_NAME

To use Vertex AI Pipelines with Crystalvalue, we also need to build a Docker image, which will be stored in Google Cloud Container Registry. The following code builds the image and pushes it to your [GCP Container Registry](https://cloud.google.com/container-registry).


In [None]:
!docker build -t crystalvalue .
!docker tag crystalvalue gcr.io/{bigquery_client.project}/crystalvalue
!docker push gcr.io/{bigquery_client.project}/crystalvalue

Each Kubeflow component below is a self-contained function that imports its own libraries and re-initializes its own objects, because each component runs in its own container. Read more about [Kubeflow components](https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/).

In [None]:
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
from kfp.v2.google.client import AIPlatformClient

# Each component's source code is packaged and run in its own container, so it
# cannot read variables defined in this notebook. Replace the placeholder
# strings ('your-project-id', 'your_dataset_name', 'your_model_id') with your values.


@component(base_image=f"gcr.io/{bigquery_client.project}/crystalvalue:latest")
def crystalvalue_data_checks():
  from crystalvalue import crystalvalue
  from google.cloud import bigquery
  bigquery_client = bigquery.Client(project='your-project-id')  # Add your project.
  dataset_id = 'your_dataset_name'  # Add your dataset.
  table_name = 'online_retail_data'
  location = 'europe-west4'  # Use the same location as your BigQuery dataset.
  pipeline = crystalvalue.CrystalValue(
    bigquery_client=bigquery_client,
    dataset_id=dataset_id,
    customer_id_column='CustomerID',
    date_column='InvoiceDate',
    value_column='Price',
    days_lookback=90,
    days_lookahead=365,
    ignore_columns=['Invoice'],
    location=location)
  summary_statistics = pipeline.run_data_checks(
    transaction_table_name=table_name)


@component(base_image=f"gcr.io/{bigquery_client.project}/crystalvalue:latest")
def feature_engineering():
  from crystalvalue import crystalvalue
  from google.cloud import bigquery
  bigquery_client = bigquery.Client(project='your-project-id')  # Add your project.
  dataset_id = 'your_dataset_name'  # Add your dataset.
  table_name = 'online_retail_data'
  location = 'europe-west4'  # Use the same location as your BigQuery dataset.
  pipeline = crystalvalue.CrystalValue(
    bigquery_client=bigquery_client,
    dataset_id=dataset_id,
    customer_id_column='CustomerID',
    date_column='InvoiceDate',
    value_column='Price',
    days_lookback=90,
    days_lookahead=365,
    ignore_columns=['Invoice'],
    location=location)
  # Build prediction features (without query_type, training features are built).
  data = pipeline.feature_engineer(
    transaction_table_name=table_name,
    query_type='predict_query')


@component(base_image=f"gcr.io/{bigquery_client.project}/crystalvalue:latest")
def predict():
  from crystalvalue import crystalvalue
  from google.cloud import bigquery
  bigquery_client = bigquery.Client(project='your-project-id')  # Add your project.
  dataset_id = 'your_dataset_name'  # Add your dataset.
  location = 'europe-west4'  # Use the same location as your BigQuery dataset.
  pipeline = crystalvalue.CrystalValue(
    bigquery_client=bigquery_client,
    dataset_id=dataset_id,
    customer_id_column='CustomerID',
    date_column='InvoiceDate',
    value_column='Price',
    days_lookback=90,
    days_lookahead=365,
    ignore_columns=['Invoice'],
    location=location,
    model_id='your_model_id')  # Add your model ID here, e.g. '45252345252624534'.
  pipeline.batch_predict(
    input_table_name='crystalvalue_predict_data',  # Default table written by feature_engineer(query_type='predict_query').
    destination_table='predictions')


@dsl.pipeline(
    name="crystalvaluepipeline",
    description="Runs predictions for crystalvalue",
    pipeline_root=PIPELINE_ROOT,
)
def crystalvalue_pipeline():
    summary = crystalvalue_data_checks()
    # The components pass no outputs to each other, so set the run order explicitly.
    data = feature_engineering().after(summary)
    result = predict().after(data)

compiler.Compiler().compile(
  pipeline_func=crystalvalue_pipeline,
  package_path="crystalvaluepipeline.json"
)

# Choose a region compatible with Vertex Pipelines.
# This doesn't have to be the same as your data location.
api_client = AIPlatformClient(
    project_id=bigquery_client.project,
    region='europe-west4',
)

In [None]:
# Optional: check that your pipeline runs.
# api_client.create_run_from_job_spec(job_spec_path="crystalvaluepipeline.json")

In [None]:
# Create the scheduled pipeline.
# Adjust time zone and cron schedule as necessary.
response = api_client.create_schedule_from_job_spec(
    job_spec_path="crystalvaluepipeline.json",
    schedule="0 1 * * *",
    time_zone="America/Los_Angeles")
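The schedule string `"0 1 * * *"` is standard cron syntax: minute 0, hour 1, every day of the month, every month, every day of the week, interpreted in the given `time_zone`. As a sanity check for a daily schedule, this hypothetical helper computes the next run time after a given timezone-aware moment; it deliberately handles only the `M H * * *` form and is not part of the Vertex AI or Kubeflow APIs.

```python
from datetime import timedelta


def next_daily_run(cron_schedule, now):
    """Hypothetical helper: next run strictly after `now` for a daily 'M H * * *' schedule."""
    minute, hour, day_of_month, month, day_of_week = cron_schedule.split()
    if (day_of_month, month, day_of_week) != ("*", "*", "*"):
        raise ValueError("This sketch only handles daily 'M H * * *' schedules.")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has passed, so the next run is at the same wall-clock time tomorrow.
        candidate += timedelta(days=1)
    return candidate
```

For example, `next_daily_run("0 1 * * *", datetime.now(ZoneInfo("America/Los_Angeles")))` (with `from zoneinfo import ZoneInfo`, Python 3.9+) gives the next 1am in Los Angeles.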

You can view your running and scheduled pipelines at:
https://console.cloud.google.com/vertex-ai/pipelines