In [138]:
# PARAMETERIZED VARIABLES
# NOTE(review): the "{{...}}" placeholders are presumably substituted by an
# external templating step before this notebook runs -- confirm with the caller.

# Project, organization and storage locations.
PROJECT_ID = "{{PROJECT_ID}}"
ORGANIZATION_ID = "{{ORGANIZATION_ID}}"
TARGET_BUCKET = "{{TARGET_BUCKET}}"
BUCKET_PIPELINE = "{{BUCKET_PIPELINE}}"

# Source table and serialized model/scaler artifacts.
DATAFOUNDATION_TABLE = "{{DATAFOUNDATION_TABLE}}"
MODEL_URI = "{{MODEL_URI}}"
SCALER_URI = "{{SCALER_URI}}"

# Audience identifier and the Pub/Sub topic notified about the result file.
AUDIENCE_ID = "{{AUDIENCE_ID}}"
TOPIC_ID = "{{TOPIC_ID}}"

In [139]:
# # EXAMPLE PARAMETER VALUES (kept commented out; uncomment to override the template placeholders for a manual run)

# PROJECT_ID = "sbox-ext-collab-prd-50f1"
# TARGET_BUCKET = "gs://model-results-lookalike-model-sandbox"
# ORGANIZATION_ID = "CUSTOMER_A"
# BUCKET_PIPELINE = "gs://bucket-collab-prd/Arifian/vertex_pipeline_files"
# DATAFOUNDATION_TABLE = "owned_summary.df_customer_data_profile"
# MODEL_URI = "gs://bucket-collab-prd/Arifian/vertex_pipeline_files/pickle_files/knn.pkl"


# SCALER_URI = "gs://bucket-collab-prd/Arifian/vertex_pipeline_files/pickle_files/scaler.pkl"
# AUDIENCE_ID = 1
# TOPIC_ID = "tempbucket-to-audience-table"

In [140]:
# Base name reused for the pipeline definition, the compiled template file and the job display name.
pipeline_name = f"custom_lookalike_{ORGANIZATION_ID}"

In [141]:
from datetime import datetime

REGION = "asia-southeast2"

# Wall-clock timestamp of this notebook run, formatted as YYYYmmddHHMMSS.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Per-organization prefix inside the target bucket.
TARGET_GCS_LOCATION = f"{TARGET_BUCKET}/{ORGANIZATION_ID}"

# Per-organization pipeline root under the pipeline bucket.
PIPELINE_ROOT = "/".join(
    [BUCKET_PIPELINE, "pipeline_root", f"custom_lookalike_{ORGANIZATION_ID}"]
)

In [142]:
import google.cloud.aiplatform as aip

from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)

In [143]:
# Configure the Vertex AI SDK defaults; BUCKET_PIPELINE is also used as the staging bucket.
aip.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_PIPELINE,
)

In [144]:
import tempfile
# Local scratch directory: the component YAML files and the compiled pipeline JSON are written here.
tmpdir = tempfile.gettempdir()

In [145]:
# query from BQ

@component(
    base_image='python:3.9',
    packages_to_install=[
        "pandas==1.5.2", "google-cloud-bigquery==2.34.4", "pyarrow==10.0.1"
    ],
    output_component_file=tmpdir+"/query_from_bq.yaml"
)
def query_from_bq(
    data: Output[Dataset], table_full_name: str,
    project_id: str
):
    """Query the profile table in BigQuery and store the rows as parquet.

    Args:
        data: Output dataset; the query result is written to ``data.path``.
        table_full_name: Table name interpolated into the FROM clause.
        project_id: Project passed to the BigQuery client.
    """
    from google.cloud import bigquery

    # NOTE(review): the partition filter below is hard-coded to '2022-10-01'.
    query = f"""SELECT 
    msisdn,
    total_arpu, data_usage_in_mb, 
    data_usage_duration, 
    total_topups, 
    ARPU_1m, 
    number_of_topups_1m, 
    total_topups_1m, 
    total_usage_GB_1m, 
    daily_GB_consumption_rate_1m,
    number_of_topups_2m, 
    total_topups_2m, 
    total_usage_GB_2m
FROM `{table_full_name}` 
WHERE (partition_month = '2022-10-01')
"""
    bq_client = bigquery.Client(project_id)
    df_audience = bq_client.query(query).to_dataframe()
    print('data stored to df')

    # index=False keeps the pandas index out of the parquet file.
    df_audience.to_parquet(data.path, index=False)
    print('data stored to parquet')

In [146]:
# preprocessing

@component(
    base_image='python:3.9',
    packages_to_install=[
        "pandas==1.5.2", "pyarrow==10.0.1", "scikit-learn==1.1.3", "joblib==1.2.0", "gcsfs==2022.11.0"
    ],
    output_component_file=tmpdir+"/preprocess.yaml"
)
def preprocess(
    data: Input[Dataset], preprocessed_data: Output[Dataset],
    scaler_uri: str = None
):
    """Build the model feature matrix from the queried data.

    Selects the numeric feature columns, replaces missing values with 0 and,
    when ``scaler_uri`` is given, applies the stored scaler. The ``msisdn``
    identifier is never part of the output, so the written frame has the same
    columns whether or not a scaler is used. Row order is preserved.

    Args:
        data: Input dataset (parquet) containing ``msisdn`` and the feature columns.
        preprocessed_data: Output dataset; the feature frame is written to its path.
        scaler_uri: Optional GCS location of a joblib-serialized scaler.

    Raises:
        KeyError: If an expected column is missing from the input data.
    """
    import pandas as pd
    import gcsfs, joblib

    id_column = 'msisdn'
    feature_columns = [
        'total_arpu', 'data_usage_in_mb',
        'data_usage_duration',
        'total_topups',
        'ARPU_1m',
        'number_of_topups_1m',
        'total_topups_1m',
        'total_usage_GB_1m',
        'daily_GB_consumption_rate_1m',
        'number_of_topups_2m',
        'total_topups_2m',
        'total_usage_GB_2m',
    ]

    df_data = pd.read_parquet(data.path)
    # Fail on a missing identifier column, but keep it out of the features.
    df_selected = df_data[[id_column] + feature_columns]
    # fillna returns a new frame; no in-place mutation of a column selection.
    df_preprocessed = df_selected[feature_columns].fillna(0)

    if scaler_uri:
        fs = gcsfs.GCSFileSystem()
        with fs.open(scaler_uri, "rb") as f:
            # NOTE(review): joblib.load unpickles arbitrary objects -- scaler_uri
            # must point to a trusted artifact.
            scaler = joblib.load(f)

        X_test_scaled = scaler.transform(df_preprocessed)
        df_preprocessed = pd.DataFrame(X_test_scaled, columns=feature_columns)

    df_preprocessed.to_parquet(preprocessed_data.path)

In [147]:
# model inference

@component(
    base_image='python:3.9',
    packages_to_install=[
        "pandas==1.5.2", "pyarrow==10.0.1", "scikit-learn==1.1.3", "joblib==1.2.0", "gcsfs==2022.11.0"
    ],
    output_component_file=tmpdir+"/predict.yaml"
)
def predict(
    model_uri: str, preprocessed_data: Input[Dataset], 
    original_data: Input[Dataset],
    prediction_result: Output[Dataset]
):
    """Score the preprocessed rows and keep the msisdn of rows scored 1.

    Args:
        model_uri: GCS location of the joblib-serialized model.
        preprocessed_data: Parquet dataset passed to ``model.predict``.
        original_data: Parquet dataset providing the ``msisdn`` column; scores
            are attached to it positionally.
        prediction_result: Output dataset; the selected ``msisdn`` rows are
            written to its path as parquet.
    """
    import pandas as pd
    import gcsfs, joblib

    features = pd.read_parquet(preprocessed_data.path)
    audience = pd.read_parquet(original_data.path)

    gcs = gcsfs.GCSFileSystem()
    with gcs.open(model_uri, "rb") as model_file:
        model = joblib.load(model_file)

    # Attach the prediction to the original rows, then keep only score == 1.
    audience['score'] = model.predict(features)
    is_selected = audience['score'] == 1
    selected_msisdn = audience.loc[is_selected, ['msisdn']]

    selected_msisdn.to_parquet(prediction_result.path)

In [148]:
# write to gcs

@component(
    base_image='python:3.9',
    # "datetime" is part of the standard library; the PyPI package of that name
    # is an unrelated distribution, so it is not installed here.
    packages_to_install=[
        "pandas==1.5.2", "gcsfs==2022.11.0", "pyarrow==10.0.1"
    ],
    output_component_file=tmpdir+"/write_to_gcs.yaml"
)
def write_to_gcs(
    data: Input[Dataset], project_id: str, 
    audience_id: int, bucket: str
) -> NamedTuple('Outputs', [('data_gcs_path', str)]):
    """Write the predicted audience as a header-less CSV and return its path.

    Args:
        data: Parquet dataset containing an ``msisdn`` column.
        project_id: Accepted for interface compatibility; not used in the body.
        audience_id: Value written into the ``audience_id`` column of every row.
        bucket: Destination prefix; a trailing "/" is ignored.

    Returns:
        A named tuple whose ``data_gcs_path`` field is the path of the written CSV.
    """
    from datetime import datetime
    from typing import NamedTuple

    import pandas as pd

    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

    df_to_store = pd.read_parquet(data.path)
    df_to_store['audience_id'] = audience_id
    # Fixed column order: audience_id first, then msisdn.
    df_to_store = df_to_store[['audience_id', 'msisdn']]

    # Strip a trailing slash so the object path never contains "//".
    path = bucket.rstrip("/")+f"/custom_lookalike_{TIMESTAMP}.csv"

    df_to_store.to_csv(path, index=False, header=False)

    outputs = NamedTuple('Outputs', [('data_gcs_path', str)])
    return outputs(path)

In [149]:
# publish to pubsub

@component(
    base_image='python:3.9',
    # No "datetime" entry: the function does not use it, and the PyPI package
    # of that name is not the standard-library module.
    packages_to_install=[
        "google-cloud-pubsub==2.13.4"
    ],
    output_component_file=tmpdir+"/aud_data_to_pubsub.yaml"
)
def aud_data_to_pubsub(
    data_gcs_path: str, topic_id: str,
    project_id: str, audience_id: int
):
    """Publish a JSON message describing the written audience file to Pub/Sub.

    The message body is ``{"audience_id": ..., "file_path": ...}`` encoded as
    UTF-8. The function blocks until the publish future resolves and prints
    its result.

    Args:
        data_gcs_path: Path of the audience file, sent as ``file_path``.
        topic_id: Pub/Sub topic name within ``project_id``.
        project_id: Project that owns the topic.
        audience_id: Audience identifier, sent as ``audience_id``.
    """
    import json
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)

    message = {
        'audience_id': audience_id,
        'file_path': data_gcs_path,
    }

    future = publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    # result() waits for the publish to complete and raises if it failed.
    print(future.result())
    print(f"Published messages to {topic_path}.")


In [150]:
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_from_component

# The three wrapped steps share one machine type.
CUSTOM_JOB_MACHINE_TYPE = 'n1-highmem-16'

# Wrap each component as a custom job; the display name matches the component name.
query_from_bq_v2, preprocess_v2, predict_v2 = (
    create_custom_training_job_from_component(
        component_func,
        display_name=job_display_name,
        machine_type=CUSTOM_JOB_MACHINE_TYPE,
    )
    for component_func, job_display_name in (
        (query_from_bq, 'query_from_bq'),
        (preprocess, 'preprocess'),
        (predict, 'predict'),
    )
)

In [151]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=pipeline_name.replace("_","-").lower(),
)
def pipeline(
    model_uri: str,
    target_bucket: str,
    datafoundation_table_fullname: str,
    project_id: str,
    topic_id: str,
    audience_id: int,
    data_gcs_path: str,
    scaler_uri: str = None
):
    """Chain query -> preprocess -> predict -> write to GCS -> publish to Pub/Sub.

    Args:
        model_uri: Location of the serialized model used by the predict step.
        target_bucket: Destination prefix for the result CSV.
        datafoundation_table_fullname: Table queried by the first step.
        project_id: Project used by every step.
        topic_id: Pub/Sub topic that receives the result notification.
        audience_id: Audience identifier attached to the result and the message.
        data_gcs_path: Not referenced by any step in this function; the path
            that is published comes from the write_to_gcs output.
        scaler_uri: Optional location of the serialized scaler.
    """
    # Steps 1-3 run as custom jobs and therefore also take project/location.
    bq_task = query_from_bq_v2(
        table_full_name=datafoundation_table_fullname,
        project_id=project_id,
        project=project_id,
        location=REGION,
    )
    raw_data = bq_task.outputs["data"]

    preprocess_task = preprocess_v2(
        data=raw_data,
        scaler_uri=scaler_uri,
        project=project_id,
        location=REGION,
    )

    predict_task = predict_v2(
        model_uri=model_uri,
        preprocessed_data=preprocess_task.outputs["preprocessed_data"],
        original_data=raw_data,
        project=project_id,
        location=REGION,
    )

    # Steps 4-5 are plain components.
    write_task = write_to_gcs(
        data=predict_task.outputs["prediction_result"],
        bucket=target_bucket,
        project_id=project_id,
        audience_id=audience_id,
    )

    aud_data_to_pubsub(
        data_gcs_path=write_task.outputs['data_gcs_path'],
        topic_id=topic_id,
        project_id=project_id,
        audience_id=audience_id,
    )

In [152]:
from kfp.v2 import compiler

# Compile the pipeline function into a JSON template in the scratch directory.
compiled_pipeline_path = tmpdir+f"/{pipeline_name}.json"
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=compiled_pipeline_path,
)



In [None]:
DISPLAY_NAME = pipeline_name + "_" + TIMESTAMP

# Treat an empty value the same as the literal 'None': no scaler is applied.
scaler_uri_value = None if SCALER_URI in (None, "", "None") else SCALER_URI

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=tmpdir+f"/{pipeline_name}.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project_id": PROJECT_ID,
        "target_bucket": TARGET_GCS_LOCATION,
        "model_uri": MODEL_URI,
        "scaler_uri": scaler_uri_value,
        "datafoundation_table_fullname": DATAFOUNDATION_TABLE,
        # The parameter cell defines AUDIENCE_ID as a quoted string, while the
        # pipeline declares audience_id as int; int() accepts both 1 and "1".
        "audience_id": int(AUDIENCE_ID),
        "topic_id": TOPIC_ID,
        "data_gcs_path": TARGET_GCS_LOCATION+f"/custom_lookalike_{TIMESTAMP}.csv"
    },
    enable_caching=False
)
# sync=False: submit the job without blocking the notebook on completion.
job.run(sync=False)

Creating PipelineJob
PipelineJob created. Resource name: projects/731696491468/locations/asia-southeast2/pipelineJobs/custom-lookalike-customer-a-20221220205916
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/731696491468/locations/asia-southeast2/pipelineJobs/custom-lookalike-customer-a-20221220205916')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast2/pipelines/runs/custom-lookalike-customer-a-20221220205916?project=731696491468
PipelineJob projects/731696491468/locations/asia-southeast2/pipelineJobs/custom-lookalike-customer-a-20221220205916 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/731696491468/locations/asia-southeast2/pipelineJobs/custom-lookalike-customer-a-20221220205916 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/731696491468/locations/asia-southeast2/pipelineJobs/custom-lookalike-customer-a-20221220205916 current state:
PipelineStat