In [None]:
# Specify the Google Cloud region where the services are located
REGION = "asia-southeast2"

# Define the service account email used for Google Cloud services authentication
SERVICE_ACCOUNT = '118208205241-compute@developer.gserviceaccount.com'

# Provide the service account's unique identifier number
SERVICE_ACCOUNT_NUMBER = '118208205241'

# Execute the command to get the current Google Cloud project ID
project = !gcloud config get-value project
PROJECT_ID = project[0]

# Print the retrieved project ID
print("Current Google Cloud Project ID:", PROJECT_ID)

In [None]:
# Client whose data is processed; used to filter BigQuery rows and name artifacts
CLIENT_NAME = "AKULAKU"

# Root Cloud Storage bucket, as a gs:// URI
BUCKET_URI = 'gs://dev-bucket-ds-dashboard'

# Bare bucket name (scheme and any path stripped), e.g. "dev-bucket-ds-dashboard".
# Previously this was just a copy of BUCKET_URI, contradicting its purpose.
BUCKET_NAME = BUCKET_URI.replace("gs://", "", 1).split("/", 1)[0]

# Root directory for pipeline artifacts, scoped per client
PIPELINE_ROOT = f"{BUCKET_URI}/data_science/{CLIENT_NAME}"

# Folder (with trailing slash) for temporary data, built from BUCKET_URI so the
# bucket is defined in exactly one place
GCS_BUCKET_NAME = f"{BUCKET_URI}/data_science/temp_data_vertex/"

# Full path of the dataset CSV inside the temporary-data folder
GCS_PATH = f"{GCS_BUCKET_NAME}dataset.csv"

# Format of the training dataset
TRAINING_DATASET_FORMAT = "csv"

In [None]:
# Import standard libraries
import pandas as pd
from typing import NamedTuple

# Import Google Cloud client libraries
from google.cloud import bigquery, storage
import google.cloud.aiplatform as aiplatform

# Import Kubeflow Pipeline components
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, OutputPath, InputPath

# Import utilities for pipeline components
from google_cloud_pipeline_components.v1.custom_job import utils


In [None]:
# Numeric ID of the Vertex AI model to reference
model_id = '334339495774126080'

# Bind the model through its fully qualified resource path:
# projects/<project number>/locations/<region>/models/<model id>
MODEL = aiplatform.Model(
    model_name="projects/{}/locations/{}/models/{}".format(
        SERVICE_ACCOUNT_NUMBER, REGION, model_id
    )
)

# Keep the canonical resource name around and echo it as a sanity check
MODEL_RESOURCE_NAME = MODEL.resource_name
print("Model Resource Name:", MODEL_RESOURCE_NAME)

In [None]:
# Resource prefix shared by project- and region-scoped Vertex AI calls
PARENT = "projects/{project}/locations/{region}".format(project=PROJECT_ID, region=REGION)

# Vertex AI is reached through a regional endpoint: "<region>-aiplatform.googleapis.com"
API_ENDPOINT = REGION + "-aiplatform.googleapis.com"

# Job Service client pinned to that regional endpoint
client_options = dict(api_endpoint=API_ENDPOINT)
CLIENT = aiplatform.gapic.JobServiceClient(client_options=client_options)

# Echo the endpoint so the configuration can be verified
print("AI Platform API Endpoint:", API_ENDPOINT)

In [None]:
@component(
    packages_to_install=[
        "google-cloud-bigquery[pandas]==3.10.0",
        "google-cloud-storage",
        "pandas",
        "joblib",
        "scikit-learn",
    ],
    base_image='python:3.9'
)
def get_and_preprocess_dataset(
    project_id: str,
    dataset: Output[Dataset],
    client_name: str
):
    """
    Retrieve one client's rows from BigQuery, turn them into model features,
    and write the result as CSV to the ``dataset`` artifact.

    Preprocessing steps, in order:
      1. clamp ``dpd_number`` to a minimum of 90;
      2. drop rows whose ``outstanding_principal_amount`` is 0;
      3. one-hot encode ``employment_type_name`` and ``gender_name`` into a
         fixed set of columns (categories absent from this batch become 0);
      4. drop identifier, raw categorical and ``Ever_Pay`` columns;
      5. ordinal-encode ``dpd_number`` and ``outstanding_principal_amount``.

    Parameters:
        project_id (str): Google Cloud project used to run the BigQuery job.
        dataset (Output[Dataset]): Artifact whose ``path`` receives the CSV
            (written without the index).
        client_name (str): Value matched against the ``client_name`` column.
    """
    # KFP lightweight components run in their own container, so every import
    # and helper must live inside this function.
    import pandas as pd
    from sklearn.preprocessing import OrdinalEncoder
    from google.cloud import bigquery

    def ensure_min_dpd(df):
        # Clamp dpd_number to a floor of 90 (modifies df in place).
        df.loc[df['dpd_number'] < 90, 'dpd_number'] = 90
        return df

    def remove_zero_principal(df):
        return df[df['outstanding_principal_amount'] != 0]

    def create_employment_dummies(df):
        # Reindex to a fixed category list so the feature columns are the same
        # regardless of which categories happen to appear in this batch.
        employment_types = ['EMPLOYEE', 'ENTREPRENEUR', 'STUDENT', 'OTHERS', 'FREELANCE',
                            'GOVERNMENT EMPLOYEE', 'POLICE/MILITARY', 'UNEMPLOYED', 'UNKNOWN']
        df_employment = pd.get_dummies(df['employment_type_name'])
        df_employment = df_employment.reindex(columns=employment_types, fill_value=0)
        df_employment.columns = [f'employment_type_{col}' for col in df_employment.columns]
        return df_employment

    def create_gender_dummies(df):
        gender = ['M', 'F']
        df_gender = pd.get_dummies(df['gender_name'])
        df_gender = df_gender.reindex(columns=gender, fill_value=0)
        df_gender.columns = [f'gender_name_{col}' for col in df_gender.columns]
        return df_gender

    def concatenate_dataframes(df, df_employment, df_gender):
        # Dummies share df's index, so axis=1 concat aligns row by row.
        return pd.concat([df, df_employment, df_gender], axis=1)

    def drop_unnecessary_columns(df):
        drop_columns = ['external_loan_number', 'ktp_province_name', 'total_payment', 'age',
                        'campaign_valid_from', 'campaign_valid_to', 'campaign_name', 'client_name',
                        'employment_type_name', 'gender_name', 'Ever_Pay']
        return df.drop(columns=drop_columns)

    def apply_ordinal_encoding(df, cols):
        # NOTE(review): the encoder is fit on this batch only, so the codes
        # depend on the values present here. Confirm this matches how the
        # training features were encoded.
        encoder = OrdinalEncoder()
        df[cols] = encoder.fit_transform(df[cols])
        return df

    client = bigquery.Client(project=project_id)

    # client_name is a runtime pipeline parameter. Bind it as a query
    # parameter instead of formatting it into the SQL text, which would allow
    # injection and break on names containing quotes.
    query = """
    SELECT *
    FROM `prj-pav-id-vertexai-dev.data_science.ds_pre_call_predict`
    WHERE client_name = @client_name
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("client_name", "STRING", client_name),
        ]
    )
    df = client.query(query=query, job_config=job_config).result().to_dataframe()

    df = ensure_min_dpd(df)
    df = remove_zero_principal(df)
    df_employment = create_employment_dummies(df)
    df_gender = create_gender_dummies(df)
    df = concatenate_dataframes(df, df_employment, df_gender)
    df = drop_unnecessary_columns(df)
    df = apply_ordinal_encoding(df, ['dpd_number', 'outstanding_principal_amount'])

    df.to_csv(dataset.path, index=False)

In [None]:
# Display-name prefix for batch prediction jobs
BATCH_NAME = "pre_call_predict"

# BigQuery "project.dataset" shared by every table URI below
_BQ_DATASET = "prj-pav-id-vertexai-dev.data_science"

# BigQuery source of the training data
TRAINING_DATASET = f"bq://{_BQ_DATASET}.ds_pre_call"

# Batch-prediction input table and output dataset (BigQuery URIs)
BQ_INPUT_URI = f"bq://{_BQ_DATASET}.temp_pre_call_batch_prediction"
BQ_OUTPUT_URI = f"bq://{_BQ_DATASET}"

# Cloud Storage prefix for batch-prediction output
GS_OUTPUT_URI = 'gs://dev-bucket-ds-dashboard/data_science/batch_pred/'

# Data formats: predictions and instances are JSON Lines, training data is a BigQuery table
PRED_FORMAT = INS_FORMAT = 'jsonl'
TRAIN_DATA_FORMAT = 'bigquery'

# Compute and alerting settings
MACHINE_TYPE = "n1-standard-4"
EMAIL_ADDRESS = "mdzeaulfath@id.pepper-advantage.com"

In [None]:
# Decorator to specify packages and Python version for the environment
@component(
    packages_to_install=["google-cloud-aiplatform==1.25.0"],
    base_image='python:3.9'
)
def create_kfp_batch_prediction_job(
    input_uri: Input[Dataset],
    output_uri: str,
    instances_format: str,
    predictions_format: str,
    job_name_prefix: str,
    model_name: str,
    deploy_compute: str,
    email_address: str,
    project_id: str,
    training_dataset_format: str,
    training_dataset: str,
    location: str = "asia-southeast2",
    api_endpoint: str = "asia-southeast2-aiplatform.googleapis.com",
    drift_threshold: float = 0.3,
):
    """
    Create a Vertex AI batch prediction job with model monitoring
    (prediction-drift detection and e-mail alerts) enabled.

    The component submits the job and prints the API response. It does not
    poll for or wait on the job's completion.

    Parameters:
        input_uri (Input[Dataset]): Artifact whose ``uri`` is used as the GCS
            source of the prediction instances.
        output_uri (str): GCS prefix under which predictions are written.
        instances_format (str): Format of the input instances (e.g. "jsonl").
        predictions_format (str): Format of the output predictions.
        job_name_prefix (str): Prefix of the job display name ("<prefix>_kfp").
        model_name (str): Model resource name to run predictions with.
        deploy_compute (str): Machine type for the prediction workers.
        email_address (str): Recipient of monitoring alerts.
        project_id (str): Google Cloud project ID.
        training_dataset_format (str): Data format of the training dataset.
        training_dataset (str): BigQuery URI ("bq://project.dataset.table")
            of the training data used as the monitoring baseline.
        location (str): Region in which the job is created.
        api_endpoint (str): Job Service endpoint. NOTE(review): this must
            match ``location``; it is not derived from it automatically.
        drift_threshold (float): Default prediction-drift alert threshold
            (0.3 preserves the previous hard-coded value).
    """
    from google.cloud.aiplatform_v1beta1.services.job_service import JobServiceClient
    from google.cloud.aiplatform_v1beta1.types import (
        BatchDedicatedResources,
        BatchPredictionJob,
        BigQuerySource,
        CreateBatchPredictionJobRequest,
        GcsDestination,
        GcsSource,
        MachineSpec,
        ModelMonitoringAlertConfig,
        ModelMonitoringConfig,
        ModelMonitoringObjectiveConfig,
        ThresholdConfig,
    )

    client = JobServiceClient(client_options={"api_endpoint": api_endpoint})

    job_display_name = f"{job_name_prefix}_kfp"

    batch_prediction_job = BatchPredictionJob(
        display_name=job_display_name,
        model=model_name,
        input_config=BatchPredictionJob.InputConfig(
            instances_format=instances_format,
            gcs_source=GcsSource(uris=[input_uri.uri]),
        ),
        output_config=BatchPredictionJob.OutputConfig(
            predictions_format=predictions_format,
            gcs_destination=GcsDestination(output_uri_prefix=output_uri),
        ),
        # A single fixed-size worker: no autoscaling.
        dedicated_resources=BatchDedicatedResources(
            machine_spec=MachineSpec(machine_type=deploy_compute),
            starting_replica_count=1,
            max_replica_count=1,
        ),
        model_monitoring_config=ModelMonitoringConfig(
            alert_config=ModelMonitoringAlertConfig(
                email_alert_config=ModelMonitoringAlertConfig.EmailAlertConfig(
                    user_emails=[email_address],
                ),
            ),
            objective_configs=[
                ModelMonitoringObjectiveConfig(
                    training_dataset=ModelMonitoringObjectiveConfig.TrainingDataset(
                        data_format=training_dataset_format,
                        bigquery_source=BigQuerySource(input_uri=training_dataset),
                    ),
                    prediction_drift_detection_config=(
                        ModelMonitoringObjectiveConfig.PredictionDriftDetectionConfig(
                            default_drift_threshold=ThresholdConfig(value=drift_threshold),
                        )
                    ),
                )
            ],
        ),
    )

    request = CreateBatchPredictionJobRequest(
        parent=f"projects/{project_id}/locations/{location}",
        batch_prediction_job=batch_prediction_job,
    )

    response = client.create_batch_prediction_job(request=request)
    print("response:", response)

In [None]:
# Decorator to specify packages and Python version for the environment
@component(
    packages_to_install=[
        "google-cloud-bigquery[pandas]==3.10.0",
        "google-cloud-storage",
        "pandas",
    ],
    base_image='python:3.9'
)
def get_agent_data(
    project_id: str,
    dataset: Output[Dataset],
):
    """
    Export the distinct ``assigned_agent`` values from BigQuery to CSV.

    Parameters:
        project_id (str): Google Cloud project used to run the BigQuery job.
        dataset (Output[Dataset]): Artifact whose ``path`` receives the CSV
            (a single ``assigned_agent`` column, no index).
    """
    import pandas as pd  # noqa: F401  (to_dataframe below needs pandas available)
    from google.cloud import bigquery

    sql = """
        SELECT DISTINCT assigned_agent
    FROM `prj-pav-id-vertexai-dev.data_science.after_call_agent_handle_pay_probability`
        """

    bq = bigquery.Client(project=project_id)
    result_rows = bq.query(query=sql, job_config=bigquery.QueryJobConfig()).result()

    agents_frame = result_rows.to_dataframe()
    agents_frame.to_csv(dataset.path, index=False)

In [None]:
@component(
    packages_to_install=[
        "pandas",
        "joblib",
        "scikit-learn",
        "google-cloud-bigquery[pandas]==3.10.0",
        "google-cloud-storage",
        "google-cloud-aiplatform==1.25.0",
        "xgboost",
        "gcsfs",
        "pandas-gbq"
    ],
    base_image='python:3.9'
)
def write_predicted_data(
    project_id: str,
    data_input: Input[Dataset],
    data_output: Output[Dataset],
    region: str,
    client_name: str,
):
    """
    Score one client's pre-call rows with that client's registered model and
    assign each loan to an agent.

    Steps:
      1. query ``ds_pre_call_predict`` for ``client_name``;
      2. build features: clamp ``dpd_number`` to >= 90, drop zero-principal
         rows, one-hot employment type and gender, drop unused columns, and
         ordinal-encode ``dpd_number`` / ``outstanding_principal_amount``;
      3. load ``model.joblib`` of the newest Vertex AI model whose display
         name is ``f"{client_name}_precall_model"`` and predict;
      4. re-attach ``external_loan_number`` and assign agents round-robin, so
         all rows of one loan get the same agent;
      5. write the result as CSV to ``data_output``.

    Parameters:
        project_id (str): Google Cloud project for BigQuery and Vertex AI.
        data_input (Input[Dataset]): CSV with an ``assigned_agent`` column
            listing the agents available for assignment.
        data_output (Output[Dataset]): Artifact whose ``path`` receives the
            predictions CSV (no index).
        region (str): Vertex AI region in which the model is registered.
        client_name (str): Client to score; also selects the model.

    Raises:
        ValueError: If no model with the expected display name exists, or the
            agent list is empty.
    """
    # KFP lightweight components run in their own container, so every import
    # and helper must live inside this function.
    import tempfile

    import joblib
    import pandas as pd
    from google.cloud import aiplatform, bigquery, storage
    from sklearn.preprocessing import OrdinalEncoder
    from xgboost import XGBClassifier  # noqa: F401  (xgboost must be importable to unpickle the model)

    def ensure_min_dpd(df):
        # Work on a copy so the caller's frame (df_original) is not mutated.
        df = df.copy()
        df.loc[df['dpd_number'] < 90, 'dpd_number'] = 90
        return df

    def remove_zero_principal(df):
        return df[df['outstanding_principal_amount'] != 0]

    def create_employment_dummies(df):
        # Reindex to a fixed category list so the feature columns are the same
        # regardless of which categories appear in this batch.
        employment_types = ['EMPLOYEE', 'ENTREPRENEUR', 'STUDENT', 'OTHERS', 'FREELANCE',
                            'GOVERNMENT EMPLOYEE', 'POLICE/MILITARY', 'UNEMPLOYED', 'UNKNOWN']
        df_employment = pd.get_dummies(df['employment_type_name'])
        df_employment = df_employment.reindex(columns=employment_types, fill_value=0)
        df_employment.columns = [f'employment_type_{col}' for col in df_employment.columns]
        return df_employment

    def create_gender_dummies(df):
        gender = ['M', 'F']
        df_gender = pd.get_dummies(df['gender_name'])
        df_gender = df_gender.reindex(columns=gender, fill_value=0)
        df_gender.columns = [f'gender_name_{col}' for col in df_gender.columns]
        return df_gender

    def concatenate_dataframes(df, df_employment, df_gender):
        return pd.concat([df, df_employment, df_gender], axis=1)

    def drop_unnecessary_columns(df):
        drop_columns = ['external_loan_number', 'ktp_province_name', 'total_payment', 'age',
                        'campaign_valid_from', 'campaign_valid_to', 'campaign_name', 'client_name',
                        'employment_type_name', 'gender_name', 'Ever_Pay']
        return df.drop(columns=drop_columns)

    def apply_ordinal_encoding(df, cols):
        # NOTE(review): the encoder is fit on this prediction batch only, so
        # the codes can differ from those seen in training. Confirm this is
        # intended.
        encoder = OrdinalEncoder()
        df[cols] = encoder.fit_transform(df[cols])
        return df

    def load_model_from_gcs(model_name, project_id, region):
        # Newest registration first, so re-registering a model under the same
        # display name picks up the latest one.
        models = aiplatform.Model.list(
            filter=f'display_name="{model_name}"',
            order_by="create_time desc",
            project=project_id,
            location=region,
        )
        if not models:
            raise ValueError(
                f"No Vertex AI model named {model_name!r} in {project_id}/{region}"
            )
        model_uri = models[0].uri.rstrip("/") + "/model.joblib"
        bucket_name, blob_name = model_uri.replace("gs://", "", 1).split("/", 1)
        blob = storage.Client().bucket(bucket_name).blob(blob_name)

        # joblib.load unpickles arbitrary objects; only acceptable because the
        # artifact comes from this project's own model registry.
        with tempfile.NamedTemporaryFile(suffix=".joblib") as temp_file:
            blob.download_to_filename(temp_file.name)
            return joblib.load(temp_file.name)

    def make_predictions(df, model):
        predictions = model.predict(df)
        probabilities = model.predict_proba(df)
        # Assumes model.classes_ == [0, 1] with 1 meaning "pay"; verify
        # against the training code.
        df['prediction'] = predictions
        df['probability_not_pay'] = probabilities[:, 0]
        df['probability_pay'] = probabilities[:, 1]
        df['prediction'] = df['prediction'].map({1: 'pay', 0: 'not_pay'})
        return df

    def add_back_columns(df_original, df_modified, columns_to_add):
        # Index-aligned assignment: df_modified keeps df_original's row labels
        # (rows were only filtered, never re-indexed).
        df_modified[columns_to_add] = df_original[columns_to_add]
        return df_modified

    def assign_agents(df, agents):
        if len(agents) == 0:
            raise ValueError("No agents available to assign")
        # ngroup gives one integer per loan, so every row of a loan maps to
        # the same agent; modulo spreads loans round-robin over the agents.
        group_ids = df.groupby('external_loan_number').ngroup().to_numpy()
        df['assigned_agent'] = agents[group_ids % len(agents)]
        return df

    client = bigquery.Client(project=project_id)

    # client_name is a runtime pipeline parameter; bind it instead of
    # formatting it into the SQL text.
    query = """
    SELECT *
    FROM `prj-pav-id-vertexai-dev.data_science.ds_pre_call_predict`
    WHERE client_name = @client_name
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("client_name", "STRING", client_name),
        ]
    )
    df_original = client.query(query=query, job_config=job_config).result().to_dataframe()

    # Use the agent list produced upstream (the data_input artifact) rather
    # than re-querying BigQuery. dtype=str keeps agent names as text.
    agents = pd.read_csv(data_input.path, dtype=str)['assigned_agent'].to_numpy()

    df = ensure_min_dpd(df_original)
    df = remove_zero_principal(df)
    df_employment = create_employment_dummies(df)
    df_gender = create_gender_dummies(df)
    df = concatenate_dataframes(df, df_employment, df_gender)
    df = drop_unnecessary_columns(df)
    df = apply_ordinal_encoding(df, ['dpd_number', 'outstanding_principal_amount'])

    model = load_model_from_gcs(f'{client_name}_precall_model', project_id, region)
    df_pred = make_predictions(df, model)

    df_pred = add_back_columns(
        df_original=df_original, df_modified=df_pred, columns_to_add=['external_loan_number']
    )
    df_result = assign_agents(df_pred, agents)

    df_result.to_csv(data_output.path, index=False)

In [None]:
# Fully qualified BigQuery table ("project.dataset.table") that the pipeline's
# upload step appends the pre-call prediction results to.
TABLE_ID= "prj-pav-id-vertexai-dev.data_science.pre_call_result"

In [None]:
@component(
    packages_to_install=[
        "pandas",
        "pandas_gbq",
        "google-cloud-bigquery[pandas]==3.10.0",
        "fsspec",
        "gcsfs"
    ],
    base_image='python:3.9'
)
def write_to_bigquery(
    project_id: str,
    table_id: str,
    input_uri: Input[Dataset],
):
    """
    Append a CSV artifact to a BigQuery table after checking that its header
    has no duplicate column names.

    Parameters:
        project_id (str): Google Cloud project that owns the BigQuery table.
        table_id (str): Destination table ("dataset.table" or
            "project.dataset.table"); rows are appended.
        input_uri (Input[Dataset]): Artifact whose ``uri`` points to the CSV.

    Raises:
        ValueError: If the CSV header repeats a column name. Previously this
            case was only printed and the step reported success without
            uploading anything.
    """
    import pandas as pd
    import pandas_gbq as pd_gbq

    def find_duplicate_columns(columns):
        names = pd.Series(columns)
        return list(names[names.duplicated()].unique())

    # pd.read_csv renames repeated headers ("a", "a.1", ...), so a parsed
    # frame never shows duplicates. Inspect the raw header row instead.
    header = pd.read_csv(input_uri.uri, header=None, nrows=1, dtype=str).iloc[0].tolist()
    duplicates = find_duplicate_columns(header)
    if duplicates:
        raise ValueError(f"Duplicated columns, please fix it first: {duplicates}")

    # Explicit dtypes keep nullable integer flags as Int64 and text as object.
    df = pd.read_csv(input_uri.uri, index_col=None, dtype={
        'external_loan_number': 'Int64',
        'dpd_number': 'Int64',
        'client_name': 'object',
        'campaign_name': 'object',
        'outstanding_principal_amount': 'float64',
        'age': 'object',
        'has_age': 'Int64',
        'gender_name': 'object',
        'has_gender': 'Int64',
        'has_email': 'Int64',
        'ktp_province_name': 'object',
        'has_ktp_province_name': 'Int64',
        'has_address': 'Int64',
        'mobile_phone_number_exist': 'Int64',
        'emergency_phone_number_exist': 'Int64',
        'good_contact': 'Int64',
        'complete_demography': 'Int64',
        'complete_information': 'Int64',
        'Ever_Pay': 'Int64',
        'total_payment': 'float64',
        'prediction': 'object',
        'probability_not_pay': 'float64',
        'probability_pay': 'float64',
        'employment_type_name': 'object',
        'has_employment_type_name': 'Int64',
        'assigned_agent': 'object'
    })

    pd_gbq.to_gbq(df, table_id, project_id=project_id, if_exists='append')
    print("Data uploaded successfully to BigQuery.")

In [None]:
# Define pipeline
@dsl.pipeline(
    name="precall-batchpred-pipeline",
    pipeline_root=PIPELINE_ROOT
)
def pre_call_pipeline(
    project_id: str = PROJECT_ID,
    bucket_name: str = BUCKET_NAME,
    bucket_uri: str = BUCKET_URI,
    client_name: str = CLIENT_NAME,
    region: str = REGION,
    table_id: str = TABLE_ID,
):
    """
    Export the agent list, score the client's pre-call rows, and append the
    scored rows to BigQuery.

    Parameters:
        project_id (str): GCP project ID.
        bucket_name (str): Name of the GCP storage bucket (currently unused
            by the tasks; kept for compatibility).
        bucket_uri (str): URI of the GCP storage bucket (currently unused by
            the tasks; kept for compatibility).
        client_name (str): Client whose data is scored.
        region (str): Vertex AI region in which the model is registered.
        table_id (str): BigQuery table the results are appended to.
    """
    get_agent_data_task = get_agent_data(
        project_id=project_id,
    ).set_display_name('Export Agent Dataset')

    # Scoring consumes the exported agent list, which also orders it after
    # the export task. Pass the pipeline's own region parameter so runtime
    # overrides take effect.
    write_predicted_data_task = write_predicted_data(
        project_id=project_id,
        data_input=get_agent_data_task.outputs['dataset'],
        region=region,
        client_name=client_name,
    ).set_display_name('Prediction Result')

    write_to_bigquery(
        project_id=project_id,
        table_id=table_id,
        input_uri=write_predicted_data_task.outputs['data_output'],
    ).set_display_name('Upload to BigQuery')


# Compile the pipeline function into a YAML definition
PIPELINE_PACKAGE_PATH = "pre_call_batch_pred_pipeline.yaml"
compiler.Compiler().compile(
    pipeline_func=pre_call_pipeline,
    package_path=PIPELINE_PACKAGE_PATH
)
print("Pipeline has been compiled successfully.")

# Upload the compiled YAML to <bucket>/yaml/ in the bucket named by BUCKET_URI
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_URI.replace("gs://", "", 1).split("/", 1)[0])
blob = bucket.blob(f"yaml/{PIPELINE_PACKAGE_PATH}")
blob.upload_from_filename(PIPELINE_PACKAGE_PATH)
print("Pipeline YAML has been uploaded successfully.")

In [None]:
# Configure a Vertex AI pipeline run from the compiled YAML. The project is
# passed explicitly so the run uses the same project as PROJECT_ID rather
# than whatever default credentials resolve to.
job = aiplatform.PipelineJob(
    display_name="pre-call-batchpred-pipeline",
    template_path="pre_call_batch_pred_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    project=PROJECT_ID,
    location=REGION,
)

# With the default sync=True, run() submits the job and blocks until the
# pipeline finishes. The job is therefore complete, not merely initiated,
# when the next line runs.
job.run()
print("Pipeline job has completed.")