## 1. Install all the required modules/packages.

In [None]:
# GCP dependencies (GCS, GCS file system, Vertex AI SDK), the Kubeflow Pipelines SDK, and pandas + openpyxl for the local Excel I/O.
# !pip install google-cloud-storage gcsfs google-cloud-aiplatform kfp pandas openpyxl

## 2. Set up GCP-related global variables.

In [None]:
# Fill these in before running the notebook.
PROJECT_ID = ''   # GCP project ID.
REGION = ''       # Vertex AI region used for the pipeline job and endpoint.
BUCKET_NAME = ''  # Existing GCS bucket name (without the gs:// prefix).

# Locations derived from the bucket name.
GCS_BUCKET_URI = 'gs://' + BUCKET_NAME
PIPELINE_ROOT = GCS_BUCKET_URI + '/pipeline_root/'

## 3. Upload data to Google Cloud Storage.

In [None]:
import os
from google.cloud import storage

In [None]:
# Authentication with a service account key.
# Put the key file path below, or leave it empty to fall back to Application
# Default Credentials (e.g. `gcloud auth application-default login`).
# NOTE: exporting an *empty* GOOGLE_APPLICATION_CREDENTIALS makes google-auth
# try to load a key file named '' and fail, so only set it when a path is given.
SERVICE_ACCOUNT_KEY_PATH = r''
if SERVICE_ACCOUNT_KEY_PATH:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_KEY_PATH

### GCP Python SDK.

In [None]:
def upload_to_gcs(bucket_name, detination_blob_name, source_file_path):
    """Upload a local file to an existing GCS bucket.

    Args:
        bucket_name: Name of the destination bucket; it must already exist.
        detination_blob_name: Object path inside the bucket. "Folders" in the
            path are part of the object name, so nothing has to be created
            beforehand. (Name kept as-is, typo included, so keyword callers
            keep working.)
        source_file_path: Path of the local file to upload.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(detination_blob_name)
    target_blob.upload_from_filename(source_file_path)
    print('Upload Successful!')

In [None]:
# Input data: local data/data.csv -> gs://<bucket>/data/data.csv (the pipeline's data_gcs_uri).
upload_to_gcs(BUCKET_NAME, 'data/data.csv', 'data/data.csv')

In [None]:
# Preprocessing script that the data_preprocessing component downloads from GCS and imports at run time.
upload_to_gcs(BUCKET_NAME, 'component_scripts/data_preprocessing.py', 'component_scripts/data_preprocessing.py')

### Pandas & GCSFS.

In [None]:
import pandas as pd

# Local product details; written to GCS in the next cell so the pipeline can read them.
df = pd.read_excel('data/product_details.xlsx')
df  # Display the frame in the notebook.

In [None]:
# Directly from a Pandas DataFrame: pandas resolves the gs:// path through fsspec/gcsfs
# (installed in step 1). This object is the pipeline's product_details_gcs_uri input.
df.to_excel(f'{GCS_BUCKET_URI}/data/product_details.xlsx', index = False)

## 4. Define the Pipeline Components.

### Data preprocessing.

In [None]:
import kfp
# Show the installed KFP SDK version (the cells below use the kfp.dsl component/artifact API).
kfp.__version__

In [None]:
# KFP SDK Domain-Specific Language.
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics, ClassificationMetrics

@component(
    packages_to_install = ['google-cloud-storage', 'pandas', 'gcsfs', 'openpyxl'],
    base_image = 'python',
    output_component_file = 'data_preprocessing.yaml'
)
def data_preprocessing(project_id: str, bucket_name: str, script_path: str, data_gcs_uri: str, product_details_gcs_uri: str, processed_data: Output[Dataset], metrics: Output[Metrics]):
    """Fetch the preprocessing script from GCS, run it, and emit the cleaned dataset.

    The cleaning logic lives in ``script_path`` (a .py object in ``bucket_name``).
    That script must expose ``data_preprocessing(data_uri, product_details_uri)``,
    which returns a pandas DataFrame.

    Outputs:
        processed_data: CSV written to ``processed_data.path + '.csv'``; the
            DataFrame shape is stored in the artifact metadata.
        metrics: row count (``num_of_samples``) and column count (``num_of_features``).
    """
    # Imports stay inside the function body: the component runs this function's
    # source in its own container.
    import importlib.util
    from google.cloud import storage

    local_script = 'data_preprocessing.py'

    # 1. Pull the helper script out of the bucket.
    script_blob = storage.Client(project = project_id).bucket(bucket_name).blob(script_path)
    script_blob.download_to_filename(local_script)

    # 2. Load it as a module. NOTE: this executes code fetched from GCS, so only
    #    point script_path at trusted objects.
    module_spec = importlib.util.spec_from_file_location('data_preprocessing', local_script)
    script_module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(script_module)

    # 3. Run the cleaning step (returns a DataFrame).
    cleaned = script_module.data_preprocessing(data_gcs_uri, product_details_gcs_uri)
    num_rows, num_cols = cleaned.shape

    # Output Artifacts (Dataset & Metrics).
    cleaned.to_csv(processed_data.path + '.csv', index = False)
    processed_data.metadata['shape'] = f'{cleaned.shape}'
    metrics.log_metric('num_of_samples', num_rows)
    metrics.log_metric('num_of_features', num_cols)

### Overall RFM metrics.

In [None]:
@component(
    packages_to_install = ['pandas'],
    base_image = 'python'
)
def overall_rfm_metrics(processed_data: Input[Dataset], rfm: Output[Dataset], metrics: Output[Metrics]):
    """Compute one Recency/Frequency/Monetary row per customer over the whole history.

    Recency   = days between the customer's last invoice and the latest invoice in the data.
    Frequency = number of distinct InvoiceNo values.
    Monetary  = sum of TotalPrice.
    The output CSV has columns CustomerID, Recency, Frequency, Monetary.
    """
    import pandas as pd

    # Input Artifact (Dataset).
    transactions = pd.read_csv(processed_data.path + '.csv')
    transactions['InvoiceDate'] = pd.to_datetime(transactions['InvoiceDate'])

    # Recency is measured against the last invoice date in the whole dataset.
    snapshot = transactions['InvoiceDate'].max()

    per_customer = transactions.groupby('CustomerID')
    rfm_table = per_customer.agg(
        Recency = ('InvoiceDate', lambda dates: (snapshot - dates.max()).days),
        Frequency = ('InvoiceNo', 'nunique'),
        Monetary = ('TotalPrice', 'sum')
    ).reset_index()

    # Output Artifacts (Dataset & Metrics).
    rfm_table.to_csv(rfm.path + '.csv', index = False)
    rfm.metadata['shape'] = f'{rfm_table.shape}'
    metrics.log_metric('num_of_samples', rfm_table.shape[0])
    metrics.log_metric('num_of_features', rfm_table.shape[1])

### RFM scaling.

In [None]:
@component(
    packages_to_install = ['pandas', 'scikit-learn'],
    base_image = 'python'
)
def rfm_scaling(rfm: Input[Dataset], rfm_scaled: Output[Dataset]):
    """Standardize every RFM feature to zero mean and unit variance.

    Every column except ``CustomerID`` is scaled with ``StandardScaler``. The
    scaled columns keep their original names, and ``CustomerID`` is appended
    unchanged as the last column.
    """
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    # Input Artifact (Dataset).
    df = pd.read_csv(rfm.path + '.csv')

    # Take the feature names from the data instead of hard-coding
    # ['Recency', 'Frequency', 'Monetary']. A hard-coded list silently mislabels
    # columns (or fails) if the upstream column order or set ever changes.
    features = df.drop('CustomerID', axis = 1)
    scaled_values = StandardScaler().fit_transform(features)
    _rfm_scaled = pd.DataFrame(scaled_values, columns = features.columns, index = df.index)
    _rfm_scaled['CustomerID'] = df['CustomerID']

    # Output Artifact (Dataset).
    _rfm_scaled.to_csv(rfm_scaled.path + '.csv', index = False)

### KMeans training and prediction.

In [None]:
@component(
    packages_to_install = ['pandas', 'scikit-learn', 'joblib'],
    base_image = 'python'
)
def kmeans_training_and_prediction(rfm_scaled: Input[Dataset], n_clusters: int, model: Output[Model], predictions: Output[Dataset], metrics: Output[Metrics]):
    """Fit K-Means on the scaled RFM features and assign each customer a cluster.

    Args:
        rfm_scaled: CSV with a CustomerID column plus the scaled feature columns.
        n_clusters: Number of clusters for KMeans (random_state fixed at 123).

    Outputs:
        model: the fitted KMeans, dumped with joblib to ``model.path + '.joblib'``.
        predictions: CSV with columns CustomerID, Cluster.
        metrics: ``inertia`` of the fitted model.
    """
    import pandas as pd
    from sklearn.cluster import KMeans
    from joblib import dump

    # Input Artifact (Dataset).
    df = pd.read_csv(rfm_scaled.path + '.csv')
    features = df.drop('CustomerID', axis = 1)

    kmeans = KMeans(n_clusters = n_clusters, random_state = 123)
    cluster_labels = kmeans.fit_predict(features)
    _predictions = pd.DataFrame({'CustomerID': df['CustomerID'], 'Cluster': cluster_labels})

    # Output Artifact (Model, Dataset, and Metrics).
    dump(kmeans, model.path + '.joblib') # 'model.joblib/pkl' name is mandatory for deployment.
    model.metadata['framework'] = 'scikit_learn'
    model.metadata['algorithm'] = 'kmeans'
    _predictions.to_csv(predictions.path + '.csv', index = False)
    metrics.log_metric('inertia', kmeans.inertia_)

### Customer Churn data.

In [None]:
@component(
    packages_to_install = ['pandas'],
    base_image = 'python'
)
def customer_churn_data(processed_data: Input[Dataset], rfm_churn: Output[Dataset], metrics: Output[Metrics]):
    """Build a churn-labelled RFM dataset from 30 random snapshot dates.

    Each snapshot date ``t`` is sampled uniformly, with seed 123, between
    (first invoice + 15 days) and (last invoice - 60 days). For each ``t``:
      * Recency/Frequency/Monetary are computed from invoices on or before ``t``;
      * Churn = 1 unless the customer has an invoice in (t, t + 60 days];
      * Quarter = calendar quarter of ``t``.
    Rows from all snapshots are stacked. CustomerID is not kept as a column, so
    the output columns are Recency, Frequency, Monetary, Churn, Quarter.

    Raises:
        ValueError: if the data spans too few days to fit the 15-day warm-up
            plus the 60-day churn horizon.
    """
    import numpy as np
    import pandas as pd
    import datetime

    warmup = datetime.timedelta(days = 15)
    churn_horizon = datetime.timedelta(days = 60)
    n_snapshots = 30

    # Input Artifact (Dataset).
    df = pd.read_csv(processed_data.path + '.csv')
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])

    # Dataset creation.
    min_reference_date = df['InvoiceDate'].min() + warmup
    max_reference_date = df['InvoiceDate'].max() - churn_horizon
    # Without this check, a too-short history yields an inverted window and
    # snapshot dates outside the data range, silently producing a bogus dataset.
    if max_reference_date < min_reference_date:
        raise ValueError(
            f'InvoiceDate range is too short: at least {(warmup + churn_horizon).days} days '
            f'of data are required to sample churn snapshot dates.'
        )

    np.random.seed(123)
    # n_snapshots random dates between the min and max reference date.
    window = max_reference_date - min_reference_date
    reference_dates = [min_reference_date + window * np.random.random() for _ in range(n_snapshots)]

    snapshots = []
    for reference_date in reference_dates:
        history = df[df['InvoiceDate'] <= reference_date]
        rfm = history.groupby('CustomerID').agg(
            Recency = ('InvoiceDate', lambda s: (reference_date - s.max()).days),
            Frequency = ('InvoiceNo', 'nunique'),
            Monetary = ('TotalPrice', 'sum')
        )

        # Customers with no purchase in (reference_date, reference_date + horizon] churned.
        horizon_end = reference_date + churn_horizon
        in_horizon = (df['InvoiceDate'] > reference_date) & (df['InvoiceDate'] <= horizon_end)
        returning_customers = set(df.loc[in_horizon, 'CustomerID'])
        churn = pd.Series(index = rfm.index, data = 1, name = 'Churn')
        churn[churn.index.isin(returning_customers)] = 0

        snapshot = pd.concat([rfm, churn], axis = 1)
        snapshot['Quarter'] = reference_date.quarter
        snapshots.append(snapshot)

    _customer_churn_data = pd.concat(snapshots, axis = 0, ignore_index = True)

    # Output Artifacts (Dataset & Metrics).
    _customer_churn_data.to_csv(rfm_churn.path + '.csv', index = False)
    rfm_churn.metadata['shape'] = f'{_customer_churn_data.shape}'
    metrics.log_metric('num_of_samples', _customer_churn_data.shape[0])
    metrics.log_metric('num_of_features', _customer_churn_data.shape[1])

### Train test split.

In [None]:
@component(
    packages_to_install = ['pandas', 'scikit-learn'],
    base_image = 'python'
)
def train_test_split(rfm_churn: Input[Dataset], train_data: Output[Dataset], test_data: Output[Dataset], metrics: Output[Metrics]):
    """Randomly split the churn dataset 70/30 into train and test CSVs.

    The split uses random_state=123 and is not stratified on Churn.
    """
    import pandas as pd
    # Aliased so the sklearn helper is not confused with this component's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    # Input Artifact (Dataset).
    dataset = pd.read_csv(rfm_churn.path + '.csv')
    train_df, test_df = sk_train_test_split(dataset, test_size = .3, random_state = 123)

    # Output Artifacts (Datasets & Metrics).
    for frame, artifact in ((train_df, train_data), (test_df, test_data)):
        frame.to_csv(artifact.path + '.csv', index = False)
    metrics.log_metric('train_data_samples', len(train_df))
    metrics.log_metric('test_data_samples', len(test_df))

### RFC (Random Forest Classifier) training.

In [None]:
@component(
    packages_to_install = ['pandas', 'scikit-learn', 'joblib'],
    base_image = 'python'
)
def rfc_training(train_data: Input[Dataset], hyperparams: dict, model: Output[Model]):
    """Fit a RandomForestClassifier that predicts Churn and save it with joblib.

    Args:
        train_data: CSV whose ``Churn`` column is the label; every other column is a feature.
        hyperparams: must contain ``n_estimators``, ``min_samples_split`` and
            ``min_samples_leaf``; any other keys are ignored.

    Outputs:
        model: the fitted forest, dumped to ``model.path + '.joblib'``.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from joblib import dump

    # Input Artifact (Dataset).
    train_df = pd.read_csv(train_data.path + '.csv')
    features = train_df.drop('Churn', axis = 1)
    labels = train_df['Churn']

    forest_kwargs = {
        name: hyperparams[name]
        for name in ('n_estimators', 'min_samples_split', 'min_samples_leaf')
    }
    rfc = RandomForestClassifier(random_state = 123, **forest_kwargs)
    rfc.fit(features, labels)

    # Output Artifact (Model).
    dump(rfc, model.path + '.joblib')
    model.metadata['framework'] = 'scikit_learn'
    model.metadata['algorithm'] = 'random_forest'

### RFC (Random Forest Classifier) evaluation.

In [None]:
from typing import NamedTuple

@component(
    packages_to_install = ['joblib', 'pandas', 'scikit-learn'],
    base_image = 'python'
)
def rfc_evaluation(model: Input[Model], test_data: Input[Dataset], classification_metrics: Output[ClassificationMetrics], metrics: Output[Metrics], baseline_precision: float) -> NamedTuple('output', [('deployment', str)]):
    """Evaluate the churn classifier on the test set and decide whether to deploy it.

    Logs the following to ``metrics``:
      * accuracy;
      * per-class precision/recall/F1 (classes 0 and 1);
      * PR-AUC and ROC-AUC.
    Logs a confusion matrix and a down-sampled ROC curve to
    ``classification_metrics``. Class predictions use a fixed 0.5 probability
    threshold.

    Returns:
        ``('yes',)`` if the class-1 (churn) precision is strictly greater than
        ``baseline_precision``, otherwise ``('no',)``.
    """
    from joblib import load
    import pandas as pd
    from sklearn.metrics import accuracy_score, precision_recall_fscore_support, precision_recall_curve, auc, confusion_matrix, roc_curve

    labels = [0, 1] # Churn encoding produced by customer_churn_data.
    roc_points_per_side = 50 # ROC points kept below / at-or-above the 0.5 threshold.

    # Input Artifacts (Model & Dataset).
    rfc = load(model.path + '.joblib')
    df = pd.read_csv(test_data.path + '.csv')

    X_test, y_test = df.drop('Churn', axis = 1), df['Churn']
    y_score = rfc.predict_proba(X_test)[:, 1] # Probability of class 1.
    y_predicted = (y_score >= 0.5).astype(int)

    # Output Artifacts (Metrics & Classification Metrics).
    metrics.log_metric('accuracy', accuracy_score(y_test, y_predicted))
    # labels=[0, 1] keeps both classes in the result even when one is missing
    # from y_test / y_predicted (otherwise indexing [1] below could fail).
    precision, recall, f1_score, _ = precision_recall_fscore_support(y_test, y_predicted, labels = labels, zero_division = 0)
    for cls in labels:
        metrics.log_metric(f'precision:{cls}', precision[cls])
        metrics.log_metric(f'recall:{cls}', recall[cls])
        metrics.log_metric(f'f1_score:{cls}', f1_score[cls])
    model_precision = precision[1]

    pr_precision, pr_recall, _ = precision_recall_curve(y_test, y_score)
    metrics.log_metric('auc_precision_recall', auc(pr_recall, pr_precision))

    # Fixed labels guarantee a 2x2 matrix matching the two category names.
    classification_metrics.log_confusion_matrix(['0', '1'], confusion_matrix(y_test, y_predicted, labels = labels).tolist())

    fpr, tpr, thresholds = roc_curve(y_test, y_score)
    metrics.log_metric('auc_roc', auc(fpr, tpr))
    # Sampled FPR & TPR values (executor_output.json size restrictions).
    # Each side is capped at the number of available points, because sample()
    # raises when asked for more rows than exist. random_state makes runs reproducible.
    roc = pd.DataFrame({
        'fpr': fpr,
        'tpr': tpr,
        'thresholds': thresholds
    })
    below = roc[roc['thresholds'] < 0.5]
    above = roc[roc['thresholds'] >= 0.5]
    roc = pd.concat([
        below.sample(min(roc_points_per_side, len(below)), replace = False, random_state = 123),
        above.sample(min(roc_points_per_side, len(above)), replace = False, random_state = 123)
    ], axis = 0, ignore_index = True).sort_values('thresholds')
    classification_metrics.log_roc_curve(roc['fpr'].tolist(), roc['tpr'].tolist(), roc['thresholds'].tolist())

    # Deployment evaluation.
    return ('yes' if model_precision > baseline_precision else 'no',)

### RFC (Random Forest Classifier) deployment.

In [None]:
@component(
    packages_to_install = ['google-cloud-aiplatform'],
    base_image = 'python'
)
def rfc_deployment(project_id: str, region: str, model: Input[Model], serving_container_image_uri: str, vertex_model: Output[Model]):
    """Upload the trained model to Vertex AI and deploy it to a reusable endpoint.

    The most recently created endpoint named ``ENDPOINT_NAME`` is reused;
    if none exists, one is created. The uploaded artifact is the directory
    containing the ``model`` artifact, i.e. the folder where rfc_training
    wrote ``model.joblib``.

    Outputs:
        vertex_model: ``uri`` is set to the ``resource_name`` of the object
            returned by ``Model.deploy``. NOTE(review): per the Vertex AI SDK
            that object is the Endpoint, so this is the endpoint's resource
            name — confirm before relying on it as a model name.
    """
    from google.cloud import aiplatform

    # Authentication.
    aiplatform.init(project = project_id, location = region)

    DISPLAY_NAME = 'e-commerce_analysis'
    MODEL_NAME = 'e-commerce_analysis-rfc'
    ENDPOINT_NAME = 'e-commerce_analysis_endpoint'

    def get_or_create_endpoint():
        """Return the newest endpoint named ENDPOINT_NAME, creating one if none exists."""
        endpoints = aiplatform.Endpoint.list(
            project = project_id,
            location = region,
            # String values are quoted in the Vertex AI list-filter syntax.
            filter = f'display_name="{ENDPOINT_NAME}"',
            order_by = 'create_time desc'
        )
        if endpoints:
            return endpoints[0] # Most recent.
        return aiplatform.Endpoint.create(
            display_name = ENDPOINT_NAME,
            project = project_id,
            location = region
        )

    endpoint = get_or_create_endpoint()

    # Input Artifact (Model).
    # Vertex AI needs the *directory* holding model.joblib, so strip only the last
    # path segment. The previous `model.uri.replace('model', '')` also removed any
    # other 'model' substring in the URI (bucket name, pipeline root, ...).
    artifact_dir = model.uri[:model.uri.rfind('/') + 1]
    model_upload = aiplatform.Model.upload(
        display_name = DISPLAY_NAME,
        artifact_uri = artifact_dir, # Directory path (with model.joblib/pkl).
        serving_container_image_uri = serving_container_image_uri,
        serving_container_health_route = f'/v1/models/{MODEL_NAME}',
        serving_container_predict_route = f'/v1/models/{MODEL_NAME}:predict',
        serving_container_environment_variables = {
            'MODEL_NAME': MODEL_NAME,
        },
    )
    model_deploy = model_upload.deploy(
        machine_type = 'e2-standard-2',
        endpoint = endpoint,
        traffic_split = {
            '0': 100 # '0' = the model being deployed now; it receives all traffic.
        },
        deployed_model_display_name = DISPLAY_NAME
    )

    # Output Artifact (Model)
    vertex_model.uri = model_deploy.resource_name

## 5. Build the Pipeline with Components.

In [None]:
from kfp import dsl

@dsl.pipeline(name = 'e-commerce_analysis')
def ecommerce_analysis():
    """End-to-end pipeline: shared preprocessing feeding two branches.

    Segmentation branch: overall RFM -> scaling -> K-Means (4 clusters).
    Churn branch: churn dataset -> 70/30 split -> random forest -> evaluation.
    The model is deployed only when the evaluation's ``deployment`` output is 'yes'.
    """
    # Shared preprocessing.
    preprocess = data_preprocessing(
        project_id = PROJECT_ID,
        bucket_name = BUCKET_NAME,
        script_path = 'component_scripts/data_preprocessing.py',
        data_gcs_uri = f'{GCS_BUCKET_URI}/data/data.csv',
        product_details_gcs_uri = f'{GCS_BUCKET_URI}/data/product_details.xlsx'
    )
    cleaned_data = preprocess.outputs['processed_data']

    # Segmentation branch.
    rfm_task = overall_rfm_metrics(processed_data = cleaned_data)
    scaling_task = rfm_scaling(rfm = rfm_task.outputs['rfm'])
    kmeans_training_and_prediction(
        rfm_scaled = scaling_task.outputs['rfm_scaled'],
        n_clusters = 4
    )

    # Churn branch.
    churn_task = customer_churn_data(processed_data = cleaned_data)
    split_task = train_test_split(rfm_churn = churn_task.outputs['rfm_churn'])
    training_task = rfc_training(
        train_data = split_task.outputs['train_data'],
        hyperparams = {
            'n_estimators': 50,
            'min_samples_split': 4,
            'min_samples_leaf': 2
        }
    )
    trained_model = training_task.outputs['model']
    evaluation_task = rfc_evaluation(
        model = trained_model,
        test_data = split_task.outputs['test_data'],
        baseline_precision = 0.75
    )

    # Conditional deployment.
    with dsl.Condition(
        evaluation_task.outputs['deployment'] == 'yes',
        name = 'rfc_quality'
    ):
        rfc_deployment(
            project_id = PROJECT_ID,
            region = REGION,
            model = trained_model,
            serving_container_image_uri = '',
        )

## 6. Compile and Run the Pipeline.

In [None]:
from kfp import compiler
from google.cloud import aiplatform

import os

# Local path of the compiled pipeline spec, shared by the compiler and the job.
PIPELINE_PACKAGE_PATH = 'kubeflow_pipeline/ecommerce_analysis.json'

# Make sure the output folder exists before compiling, rather than assuming the
# notebook's working directory already has it.
os.makedirs(os.path.dirname(PIPELINE_PACKAGE_PATH), exist_ok = True)

# The compiler generates a JSON file containing the pipeline structure, and the PipelineJob uses that JSON file to run the pipeline on GCP.
compiler.Compiler().compile(
    pipeline_func = ecommerce_analysis,
    package_path = PIPELINE_PACKAGE_PATH
)

# The pipeline_root is the GCS location where pipeline artifacts (logs, metadata, and output data) are stored during and after the pipeline run.
pipeline_job = aiplatform.PipelineJob(
    display_name = 'e-commerce_analysis',
    template_path = PIPELINE_PACKAGE_PATH,
    pipeline_root = PIPELINE_ROOT,
    project = PROJECT_ID, # Project ID.
    location = REGION, # Region.
    enable_caching = False # Re-run every step on each submission.
)
pipeline_job

In [None]:
# Submit the compiled pipeline to Vertex AI Pipelines with the job settings configured above.
pipeline_job.run()

## 7. Make Predictions.

In [None]:
aiplatform.init(project = PROJECT_ID, location = REGION)

def endpoint_predict(endpoint_id: str, instances: list):
    """Send ``instances`` to a deployed Vertex AI endpoint and return its response.

    Args:
        endpoint_id: Identifier of the deployed endpoint.
        instances: Feature rows to score.

    Returns:
        The response object from ``Endpoint.predict``; the raw predictions are
        in its ``.predictions`` attribute.
    """
    target_endpoint = aiplatform.Endpoint(endpoint_id)
    return target_endpoint.predict(instances = instances)

In [None]:
endpoint_id = '' # Deployed Model Endpoint ID (the endpoint used by rfc_deployment).
# One row per customer snapshot, in the training feature order of the churn
# dataset with Churn removed: [Recency, Frequency, Monetary, Quarter].
instances = [
    [3,12,1569.45,3],
    [29,1,316.61,2],
    [26,3,154.58,1],
    [7,1,115.1,1]
]

predictions = endpoint_predict(endpoint_id, instances)
predictions # Full response object.

In [None]:
# Raw predictions returned by the endpoint.
predictions.predictions