# Infra Config

In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# reloaded before each cell executes and edits to local modules are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
# Infrastructure settings for the churn pipeline.
PROJECT_ID = "looker-sandbox-323013"
REGION = "europe-west4"
# GCS bucket that holds the pipeline artifacts.
BUCKET_URI = "gs://vertex-ai-order-items-churn"
# GCS prefix handed to the pipeline definition and the PipelineJob as pipeline_root.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/order_items_churn"
# Regional Vertex AI API host derived from REGION.
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"

In [3]:
# Disabled alternative: derive the account from the third line of
# `gcloud auth list`, stripping the "*" marker and surrounding whitespace.
#shell_output = !gcloud auth list 2>/dev/null
#SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
#print(SERVICE_ACCOUNT)

# Hard-coded service account. The "-compute@developer" suffix looks like the
# Compute Engine default account — TODO confirm.
# NOTE(review): no active cell in this notebook reads SERVICE_ACCOUNT.
SERVICE_ACCOUNT = "1001913874856-compute@developer.gserviceaccount.com"

# Disabled: grant that account objectCreator / objectViewer on the pipeline bucket.
#! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
#! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

In [55]:
! gsutil iam ch aminehakkou@google.com:roles/storage.objectCreator $BUCKET_URI

CommandException: Incorrect public member type for binding aminehakkou@google.com:roles/storage.objectCreator


# Components Definition

## Artifact types documentation: https://github.com/kubeflow/pipelines/blob/55a2fb5c20011b01945c9867ddff0d39e9db1964/sdk/python/kfp/v2/components/types/artifact_types.py#L255-L256

In [4]:
from datetime import datetime
# Second-resolution local timestamp (YYYYMMDDHHMMSS) used to suffix table and run names.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [21]:
!{sys.executable} -m pip install --no-deps kfp==1.8.14

Collecting kfp==1.8.14
  Downloading kfp-1.8.14.tar.gz (304 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m304.3/304.3 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: kfp
  Building wheel for kfp (setup.py) ... [?25ldone
[?25h  Created wheel for kfp: filename=kfp-1.8.14-py3-none-any.whl size=426472 sha256=2d5431651f858a114143f8ef17d7439557ad8e3eb9e51a9cae982d7a23a9597c
  Stored in directory: /home/jupyter/.cache/pip/wheels/e7/b7/6d/b22f3f664269a163d3f0d15e01b723fa8695ca00f99983031e
Successfully built kfp
Installing collected packages: kfp
  Attempting uninstall: kfp
    Found existing installation: kfp 1.8.13
    Uninstalling kfp-1.8.13:
      Successfully uninstalled kfp-1.8.13
Successfully installed kfp-1.8.14


In [None]:
import sys
# Show the installed packages whose name contains "kfp", using the kernel's
# own interpreter (sys.executable) rather than whichever pip is first on PATH.
!{sys.executable} -m pip freeze | grep kfp

In [96]:
import json
from kfp.v2 import dsl
from kfp import components

from custom_components import (
    automl_eval, 
    get_bq_job_output_table,
    sklearn_trainer,
    sklearn_validator,
    generate_model_card,
    export_from_bq_to_gcs,
    tfdv
)

# Load the feature-generation query; the context manager closes the file
# handle as soon as the text has been read.
with open("sql/get_input_data.sql", "r") as sql_file:
    SQL_QUERY = sql_file.read()

#tfdv_generate_statistics = components.load_component_from_url("https://github.com/GoogleCloudPlatform/vertex-pipelines-end-to-end-samples/blob/main/pipelines/kfp_components/tfdv/generate_statistics.py")

# KFP v2 pipeline definition named "churn".
# Active steps, by display name: "Generate Training Data" (BigQuery query),
# "Export to GCS", "Generate data statistics", "Model Training",
# "Model Evaluation" and "Generate Model Card".
# The body reads the module-level PROJECT_ID, SQL_QUERY and TIMESTAMP values.
@dsl.pipeline(
    name='churn',
    description='A pipeline to detect churn from order_items transactions',
    pipeline_root=PIPELINE_ROOT
)
def pipeline():
    
    # Disabled experiment: stand-alone model card step.
    #model_card = generate_model_card.generate_model_card(
    #    gcp_project_id = PROJECT_ID
    #)
    
    # Disabled experiment: model card generation inside a ParallelFor loop.
    #dsl.ParallelFor(["toto", "tata"]) as item:
    #    item.set_display_name("toto")
    #    model_card_loop = generate_model_card.generate_model_card(gcp_project_id = PROJECT_ID)
    
    
    
    
    # Imported inside the function body. Of these names only BigqueryQueryJobOp
    # is used by the active steps; the others are referenced solely by the
    # commented-out AutoML variant.
    from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
    from google_cloud_pipeline_components.aiplatform import (
        AutoMLTabularTrainingJobRunOp, 
        EndpointCreateOp, 
        ModelDeployOp,
        TabularDatasetCreateOp
    )
    
    # Run SQL_QUERY and write the result to
    # <PROJECT_ID>.churn_featuresets.churn_featureset_<TIMESTAMP>.
    # NOTE(review): the job location is hard-coded to "US" while REGION is
    # "europe-west4" — confirm this matches the dataset's location.
    bq_read = BigqueryQueryJobOp(
        project = PROJECT_ID,
        query = SQL_QUERY,
        location = "US",
        job_configuration_query = json.dumps(
            {
               "destinationTable": {
                   "projectId": PROJECT_ID,
                   "datasetId": "churn_featuresets",
                   "tableId": "churn_featureset_" + TIMESTAMP
               }
            }
        )
    ).set_display_name("Generate Training Data")
    
    # Custom component fed with the query's destination table; its
    # "gcs_dataset" output is consumed by the statistics step below.
    gcs_export = export_from_bq_to_gcs.export_from_bq_to_gcs(
        gcp_project_id = PROJECT_ID,
        bq_job_output = bq_read.outputs['destination_table']
    ).set_display_name("Export to GCS")
    
    # Statistics over the exported "*.csv" files, with Dataflow disabled and
    # "is_churner" passed as the label feature.
    gen_statistics = tfdv.generate_statistics(
        project_id = PROJECT_ID,
        dataset = gcs_export.outputs["gcs_dataset"],
        file_pattern = "*.csv",
        use_dataflow = False,
        tfdv_stats_options = {
            "label_feature" : "is_churner"
        }

    ).set_display_name("Generate data statistics")
    
    # Disabled variant: statistics computed from a Vertex AI tabular dataset.
    #featureset = get_bq_job_output_table.get_bq_job_output_table(
    #    bq_job_output = bq_read.outputs['destination_table']
    #)
    #tabular_dataset = TabularDatasetCreateOp(
    #    display_name = "churn_" + TIMESTAMP,
    #    bq_source = featureset.output,
    #    project = PROJECT_ID
    #)
    #
    #gen_statistics = generate_statistics(
    #    dataset=tabular_dataset.outputs["dataset"]
    #).set_display_name("Generate data statistics")
    
    #my_dict = my_list = ['foo', 'bar']
    #with dsl.ParallelFor(my_dict) as item:
    
    # Training step: receives the query's destination table and produces the
    # "model" output used by the evaluation step.
    sklearn_model = sklearn_trainer.sklearn_trainer(
        input_table = bq_read.outputs['destination_table'],
        gcp_project_id = PROJECT_ID
    ).set_display_name("Model Training")

    # Evaluation step: receives the trained model, the same table and a JSON
    # threshold string ({"roc": 0.8}); how the threshold is enforced is defined
    # inside the custom component, not here.
    sklearn_eval = sklearn_validator.sklearn_validator(        
        input_model = sklearn_model.outputs["model"],
        input_table = bq_read.outputs['destination_table'],
        gcp_project_id = PROJECT_ID,
        thresholds_dict_str = '{"roc":0.8}'
    ).set_display_name("Model Evaluation")
    
    # Model card step: receives the table and the evaluation step's "metrics" output.
    model_card = generate_model_card.generate_model_card(
        gcp_project_id = PROJECT_ID ,
        input_table = bq_read.outputs['destination_table'],
        metrics = sklearn_eval.outputs['metrics']
    ).set_display_name("Generate Model Card")
    
    #featureset = get_bq_job_output_table.get_bq_job_output_table(
    #    bq_job_output = bq_read.outputs['destination_table']
    #)
    #tabular_dataset = TabularDatasetCreateOp(
    #    display_name = "churn_" + TIMESTAMP,
    #    bq_source = featureset.output,
    #    project = PROJECT_ID
    #)
    #automl_training = AutoMLTabularTrainingJobRunOp(
    #    project = PROJECT_ID,
    #    display_name = "AutoML Training",
    #    model_display_name = "automl_" + TIMESTAMP,
    #    
    #    optimization_prediction_type="classification",
    #    optimization_objective="minimize-log-loss",
    #    
    #    budget_milli_node_hours=1000,
    #    disable_early_stopping = False,
    #    
    #    dataset = tabular_dataset.outputs["dataset"],
    #    #column_specs={
    #    #    "lifetime_orders": "numeric",
    #    #    "nb_orders_last_12_days": "numeric",
    #    #    "nb_orders_last_15_days": "numeric",
    #    #    "nb_orders_last_7_days": "numeric"
    #    #},
    #    target_column = "is_churner",
    #    
    #    predefined_split_column_name = "split"
    #)
    #model_eval_task = automl_eval.automl_classification_model_eval_metrics(
    #    project = PROJECT_ID,
    #    location = REGION,
    #    thresholds_dict_str = '{"auRoc": 0.95}',
    #    model = automl_training.outputs["model"]
    #)
        

In [97]:
from kfp.v2 import compiler  # noqa: F811

# Compile the KFP `pipeline` function into the JSON spec that the PipelineJob
# cell loads through template_path.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="churn_pipeline.json",
)

# Training Pipeline

In [98]:
import google.cloud.aiplatform as aip

# Label the run with the notebook's timestamp.
DISPLAY_NAME = f"churn_{TIMESTAMP}"

# NOTE(review): neither project nor location is passed, so the SDK defaults
# apply. The recorded run was created in us-central1, not REGION — confirm
# that this is intended.
job = aip.PipelineJob(
    template_path="churn_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    display_name=DISPLAY_NAME,
)

# Submit the compiled pipeline; the recorded output shows the call polling the job state.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/1001913874856/locations/us-central1/pipelineJobs/churn-20221120165658
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1001913874856/locations/us-central1/pipelineJobs/churn-20221120165658')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/churn-20221120165658?project=1001913874856
PipelineJob projects/1001913874856/locations/us-central1/pipelineJobs/churn-20221120165658 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1001913874856/locations/us-central1/pipelineJobs/churn-20221120165658 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1001913874856/locations/us-central1/pipelineJobs/churn-20221120165658 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1001913874856/locations/us-central1/pipelineJobs/churn-20221120165658 current state:
PipelineState.PIPEL

In [37]:
from google.cloud import bigquery
# Training data: every row of the feature set except the VALIDATE split.
sql = """
    SELECT *
    FROM `looker-sandbox-323013.churn_featuresets.churn_featureset_20220929092510`
    where split != 'VALIDATE'
"""

client = bigquery.Client()
query_job = client.query(sql)
df = query_job.to_dataframe()

df.head(5)

Unnamed: 0,user_id,bucket,lifetime_orders,nb_orders_last_7_days,nb_orders_last_15_days,nb_orders_last_12_days,is_churner,split
0,72978,2,3,3,3,2,False,TRAIN
1,72587,4,3,3,3,0,False,TRAIN
2,72591,4,3,3,3,0,False,TRAIN
3,72951,5,3,3,3,1,False,TRAIN
4,72791,6,3,3,3,0,False,TRAIN


In [50]:
# Identifying test rows.
# Select with a boolean mask through .loc: feeding index *labels* to the
# positional .iloc indexer is only correct while df keeps a default 0..n-1 index.
is_test = df['split'] == 'TEST'
test_rows = df.loc[is_test]
print(test_rows.index)
print(test_rows['is_churner'])

Int64Index([1962, 1963, 1964, 1965, 1966, 1967, 1968, 1969, 1970, 1971,
            ...
            4196, 4197, 4198, 4199, 4200, 4201, 4202, 4203, 4204, 4205],
           dtype='int64', length=504)
1962    False
1963    False
1964    False
1965    False
1966    False
        ...  
4201    False
4202    False
4203    False
4204    False
4205    False
Name: is_churner, Length: 504, dtype: bool


In [63]:
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import PredefinedSplit
from sklearn.pipeline import Pipeline
import joblib
import pandas
import numpy as np

class columnDropperTransformer:
    """Pipeline step that removes a fixed list of columns from a DataFrame.

    The step is stateless: ``fit`` learns nothing and ``transform`` returns a
    new frame without the configured columns.
    """

    def __init__(self,columns):
        # Column labels removed by ``transform``.
        self.columns=columns

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer itself."""
        return self

    def transform(self,X,y=None):
        """Return ``X`` without the configured columns; ``X`` is left untouched."""
        return X.drop(columns=self.columns)

#train_rows = df.iloc[df[df['split']=='TRAIN'].index]
#test_rows = df.iloc[df[df['split']=='TEST'].index]

# Fixed seed so the randomized search and the forest give the same result on every run.
RANDOM_SEED = 42

# PredefinedSplit expects ONE entry per sample: -1 keeps the row in the
# training set, 0 puts it in the single validation fold. Passing the index
# labels of the TEST rows instead would create one fold per distinct label.
test_fold = np.where(df['split'] == 'TEST', 0, -1)

# Named `churn_model` rather than `pipeline` so the KFP `pipeline` function
# defined earlier in the notebook is not overwritten by this cell.
churn_model = Pipeline(
    [
        ("columnDropper", columnDropperTransformer(['user_id', 'bucket', 'split'])),
        ('scaler', StandardScaler()),
        #('feature_selection', SelectKBest(chi2, k=2)),
        (
            'classification',
            RandomizedSearchCV(
                RandomForestClassifier(random_state=RANDOM_SEED),
                param_distributions = {
                    # NOTE(review): np.arange(1, 2, 3) evaluates to [1], so only
                    # single-tree forests are tried — confirm the intended range.
                    "n_estimators": np.arange(1, 2, 3),
                    "max_depth": [3, 5]
                },
                n_iter = 2,
                refit = True,
                random_state = RANDOM_SEED,
                cv = PredefinedSplit(test_fold = test_fold) # Predefined split (no cross validation)
            )
        )
    ]
)

# Features are every column except the label; identifier columns are removed
# by the columnDropper step inside the pipeline.
X, Y = df.drop(['is_churner'], axis=1), df['is_churner']

churn_model.fit(X, Y)

# Export the classifier to a file
joblib.dump(churn_model, 'model.joblib')

['model.joblib']

In [64]:
from google.cloud import bigquery
# Hold-out data: only the rows of the feature set tagged VALIDATE.
sql = """
    SELECT *
    FROM `looker-sandbox-323013.churn_featuresets.churn_featureset_20220929092510`
    where split = 'VALIDATE'
"""

client = bigquery.Client()
validation_job = client.query(sql)
validation_df = validation_job.to_dataframe()

validation_df.head(5)

Unnamed: 0,user_id,bucket,lifetime_orders,nb_orders_last_7_days,nb_orders_last_15_days,nb_orders_last_12_days,is_churner,split
0,50333,7,1,0,1,0,False,VALIDATE
1,64795,7,1,0,1,0,False,VALIDATE
2,31728,7,1,0,1,0,False,VALIDATE
3,53224,7,1,0,1,0,False,VALIDATE
4,49469,7,1,0,1,0,False,VALIDATE


In [65]:
# Reload the fitted estimator from 'model.joblib', the file written by the
# training cell. joblib.load unpickles its input, so only load files produced
# by a trusted source.
pipe = joblib.load('model.joblib')

In [110]:
# Split the validation frame into the label column and the remaining feature columns.
Y_val = validation_df['is_churner']
X_val = validation_df.drop(columns=['is_churner'])

# Per-row class probabilities from the reloaded model.
pipe.predict_proba(X_val)

array([[0.92590751, 0.07409249],
       [0.92590751, 0.07409249],
       [0.92590751, 0.07409249],
       ...,
       [0.98461538, 0.01538462],
       [0.98461538, 0.01538462],
       [0.98461538, 0.01538462]])

In [10]:
from custom_components import sklearn_trainer