## Simple example of a Kubeflow pipeline to predict flight delays
Code is borrowed from https://aiinpractice.com/gcp-mlops-vertex-ai-pipeline-scikit-learn/

It takes around 10 minutes to run the pipeline. The end of the notebook shows how to use the prediction endpoint.

#### Notes:
- Project works, but there are weird bugs with the bucket and data file. For some reason, only mpg3-temp-data works for now. 
- There are often bugs when trying to create an endpoint with the same name as a previously created and deleted endpoint in the same region.
- The slowest part of the pipeline is deploying the model to an endpoint. Using a more powerful instance for the endpoint seems to speed up this step, but beyond n1-standard-8, more powerful instances seem to deploy more slowly.
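One possible workaround for the endpoint naming issue, untested here and assuming the bug is tied to reusing a display name, is to generate a fresh display name on every run instead of bumping the suffix by hand (`flight-delay-endpoint9`). The helper `unique_display_name` is hypothetical and not part of the Vertex AI SDK.

```python
from datetime import datetime, timezone
from typing import Optional


def unique_display_name(base: str, now: Optional[datetime] = None) -> str:
    """Hypothetical helper: append a UTC timestamp so each run gets a new endpoint name."""
    now = now or datetime.now(timezone.utc)
    return f"{base}-{now.strftime('%Y%m%d-%H%M%S')}"


# Fixed time so the output is reproducible; in the notebook you would omit `now`.
fixed = datetime(2023, 4, 23, 17, 50, 31, tzinfo=timezone.utc)
print(unique_display_name("flight-delay-endpoint", fixed))  # flight-delay-endpoint-20230423-175031
```

If passed as `EndpointCreateOp(display_name=...)` inside `pipeline()`, the name is computed when the pipeline is compiled, so it only changes if the pipeline is recompiled before each run.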

#### Next steps:
1. Figure out how to use any bucket and any data file (done).
2. Use more powerful instances to speed up all steps.
3. Move on to the next part and add a preprocessing pipeline.


In [64]:
USER_NAME="oo00011760@gmail.com" 
PROJECT_ID = "polished-vault-379315"  
REGION = "us-central1"
# REGION = "us-east1"
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [65]:
SERVICE_ACCOUNT = 'vertex-ai-service-account@polished-vault-379315.iam.gserviceaccount.com'
print(f'Service Account: {SERVICE_ACCOUNT}')

Service Account: vertex-ai-service-account@polished-vault-379315.iam.gserviceaccount.com


In [66]:
# BUCKET_NAME = 'training_data_' + PROJECT_ID
BUCKET_NAME = 'mpg3-testflights-polished-vault-379315'
# BUCKET_NAME = 'mpg3-temp-data'

BUCKET_URI = "gs://" + BUCKET_NAME
! gsutil mb -l $REGION $BUCKET_URI
! gsutil ls -al $BUCKET_URI

Creating gs://mpg3-testflights-polished-vault-379315/...
ServiceException: 409 A Cloud Storage bucket named 'mpg3-testflights-polished-vault-379315' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
                                 gs://mpg3-testflights-polished-vault-379315/data/
                                 gs://mpg3-testflights-polished-vault-379315/pipeline-output/


In [67]:
! gsutil ls -al $BUCKET_URI

                                 gs://mpg3-testflights-polished-vault-379315/data/
                                 gs://mpg3-testflights-polished-vault-379315/pipeline-output/


In [68]:
# Most of the commands from setup.sh are still missing; the bash code still needs to be translated into Python.

In [69]:
from google.cloud import aiplatform as aip
from kfp.v2.dsl import (
    Artifact,
    Dataset,
    Input,
    Model,
    Output,
    ClassificationMetrics,
    component,
    pipeline,
)
from kfp.v2 import compiler

from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp


# BUCKET = f"training_data_{PROJECT_ID}"
BUCKET = BUCKET_NAME
pipeline_root_path = f"gs://{BUCKET}/pipeline-output/"
print(BUCKET)

mpg3-testflights-polished-vault-379315


In [70]:
pipeline_root_path

'gs://mpg3-testflights-polished-vault-379315/pipeline-output/'

In [71]:
@component(
    packages_to_install=['gcsfs', 'fsspec'],
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
)
def data_download(
    data_url: str,
    split_date: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    import pandas as pd
    import logging

    logging.warning("Import file: %s", data_url)

    data = pd.read_csv(data_url, nrows=5000)

    cancelled = (data["Cancelled"] > 0) | (data["Diverted"] > 0)
    completed_flights = data[~cancelled]

    # .copy() so that adding the target column modifies a new frame, not a view of `data`
    training_data = completed_flights[["DepDelay", "TaxiOut", "Distance"]].copy()
    # Consider flights that arrive more than 15 min late as delayed
    training_data["target"] = completed_flights["ArrDelay"] > 15

    test_data = training_data[completed_flights["FlightDate"] >= split_date]
    training_data = training_data[completed_flights["FlightDate"] < split_date]

    training_data.to_csv(dataset_train.path, index=False)
    test_data.to_csv(dataset_test.path, index=False)
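Because the component body is plain pandas, its filtering, labelling, and date split can be checked locally before paying for a pipeline run. The sketch below mirrors the body of `data_download` on a few made-up rows (it assumes pandas is installed locally). It relies on `FlightDate` being an ISO `YYYY-MM-DD` string, so string comparison against `split_date` orders dates correctly.

```python
import pandas as pd

# Toy rows with the same column names the component reads (values are made up).
data = pd.DataFrame(
    {
        "FlightDate": ["2020-05-01", "2020-05-10", "2020-05-20", "2020-05-25"],
        "Cancelled": [0, 1, 0, 0],
        "Diverted": [0, 0, 0, 0],
        "DepDelay": [-4.0, 30.0, 20.0, 1.0],
        "TaxiOut": [16.0, 10.0, 25.0, 12.0],
        "Distance": [153.0, 500.0, 800.0, 400.0],
        "ArrDelay": [-10.0, None, 40.0, 0.0],
    }
)

# Same logic as data_download: drop cancelled/diverted flights, label delays > 15 min.
cancelled = (data["Cancelled"] > 0) | (data["Diverted"] > 0)
completed_flights = data[~cancelled]
training_data = completed_flights[["DepDelay", "TaxiOut", "Distance"]].copy()
training_data["target"] = completed_flights["ArrDelay"] > 15

# Split on the date string; ISO dates compare correctly as strings.
is_test = completed_flights["FlightDate"] >= "2020-05-20"
test_data = training_data[is_test]
train_data = training_data[~is_test]

print(len(train_data), len(test_data))  # 1 2
print(test_data["target"].tolist())  # [True, False]
```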

In [72]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
)
def model_train(
    dataset: Input[Dataset],
    model: Output[Artifact],
):
    import pandas as pd
    import pickle
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression

    data = pd.read_csv(dataset.path)
    X = data.drop(columns=["target"])
    y = data["target"]

    model_pipeline = Pipeline(
        [
            ("imputer", SimpleImputer(strategy="mean")),
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(random_state=42, tol=0.0001, max_iter=100)),
        ]
    )

    model_pipeline.fit(X, y)

    model.metadata["framework"] = "scikit-learn"
    model.metadata["containerSpec"] = {
        "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
    }

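    # The prebuilt sklearn serving container loads model.pkl from the model directory
    import pathlib

    pathlib.Path(model.path).mkdir(parents=True, exist_ok=True)
    file_name = model.path + "/model.pkl"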
    with open(file_name, "wb") as file:
        pickle.dump(model_pipeline, file)
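The training step can likewise be exercised locally: fit the same scikit-learn `Pipeline` on synthetic data, write it as `model.pkl` inside a directory (as the component does with `model.path`), and reload it. This is a sketch assuming numpy, pandas, and scikit-learn are installed; the label rule and data are made up. Since the pickle is loaded by the serving container, training and serving should use compatible scikit-learn versions, which the notebook gets by using the same `sklearn-cpu.1-0` image for both.

```python
import pathlib
import pickle
import tempfile

import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic features in the training column order; the label rule is made up.
rng = np.random.default_rng(0)
X = pd.DataFrame(
    {
        "DepDelay": rng.normal(5, 20, 200),
        "TaxiOut": rng.normal(15, 5, 200),
        "Distance": rng.uniform(100, 2000, 200),
    }
)
X.loc[::10, "TaxiOut"] = np.nan  # some missing values for the imputer to fill
y = X["DepDelay"] > 15

model_pipeline = Pipeline(
    [
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler()),
        ("clf", LogisticRegression(random_state=42, tol=0.0001, max_iter=100)),
    ]
)
model_pipeline.fit(X, y)

# Write model.pkl into a directory, as the component does with model.path, then reload it.
with tempfile.TemporaryDirectory() as tmp:
    model_dir = pathlib.Path(tmp) / "model"
    model_dir.mkdir(parents=True, exist_ok=True)
    with open(model_dir / "model.pkl", "wb") as f:
        pickle.dump(model_pipeline, f)
    with open(model_dir / "model.pkl", "rb") as f:
        restored = pickle.load(f)

# The reloaded pipeline should give the same predictions as the original.
print(restored.predict(X.head(3)).tolist() == model_pipeline.predict(X.head(3)).tolist())  # True
```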

In [73]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
)
def model_evaluate(
    test_set: Input[Dataset],
    model: Input[Model],
    metrics: Output[ClassificationMetrics],
):
    import pandas as pd
    import pickle
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score

    data = pd.read_csv(test_set.path)[:1000]
    file_name = model.path + "/model.pkl"
    with open(file_name, "rb") as file:
        model_pipeline = pickle.load(file)

    X = data.drop(columns=["target"])
    y = data.target
    y_pred = model_pipeline.predict(X)

    y_scores = model_pipeline.predict_proba(X)[:, 1]
    fpr, tpr, thresholds = roc_curve(y_true=y, y_score=y_scores, pos_label=True)
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    metrics.log_confusion_matrix(
        ["False", "True"],
        confusion_matrix(y, y_pred).tolist(),
    )
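To see what `model_evaluate` logs, the same scikit-learn calls can be run on a handful of made-up labels and scores (standing in for `y` and `predict_proba(X)[:, 1]`). The sketch also computes accuracy, which the component imports (`accuracy_score`) but never logs. It assumes scikit-learn is installed locally.

```python
from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve

# Made-up labels and positive-class scores.
y = [False, False, True, True, False, True]
y_scores = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]
y_pred = [s >= 0.5 for s in y_scores]  # default 0.5 cut-off, like predict()

fpr, tpr, thresholds = roc_curve(y_true=y, y_score=y_scores, pos_label=True)
cm = confusion_matrix(y, y_pred)

# Rows are true labels [False, True], columns are predicted labels, matching
# the ["False", "True"] categories passed to log_confusion_matrix.
print(cm.tolist())  # [[3, 0], [1, 2]]
print(accuracy_score(y, y_pred))
```

One caveat worth checking if the base image is upgraded: in newer scikit-learn releases (1.3 and later) the first ROC threshold is `inf`, which may need to be dropped or clipped before passing `thresholds.tolist()` to `log_roc_curve`.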

In [74]:
# Define the workflow of the pipeline.
@pipeline(name="gcp-mlops-v0", pipeline_root=pipeline_root_path)
def pipeline(
    training_data_url: str = f"gs://{BUCKET}/data/processed/2020/2020-05.csv",
    test_split_date: str = "2020-05-20",
):
    data_op = data_download(
        data_url=training_data_url,
        split_date=test_split_date
    )

    from google_cloud_pipeline_components.experimental.custom_job.utils import (
        create_custom_training_job_op_from_component,
    )

    custom_job_distributed_training_op = create_custom_training_job_op_from_component(
        model_train, 
        replica_count=1, 
        machine_type = 'n1-standard-8'
    )

    model_train_op = custom_job_distributed_training_op(
        dataset=data_op.outputs["dataset_train"],
        project=PROJECT_ID,
        location=REGION,
    )

    model_evaluate_op = model_evaluate(
        test_set=data_op.outputs["dataset_test"],
        model=model_train_op.outputs["model"],
    )

    model_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="flight-delay-model",
        unmanaged_container_model=model_train_op.outputs["model"],
    ).after(model_evaluate_op)

    endpoint_create_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="flight-delay-endpoint9",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name="flight-delay-model",
        dedicated_resources_machine_type="n1-standard-8",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=3,
    )
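The step order in this pipeline follows its data dependencies (plus the explicit `.after(model_evaluate_op)`). As an illustration only, not how KFP or Vertex AI schedule steps internally, the stdlib `graphlib` module can group the steps into "waves" that could run concurrently. It shows that `EndpointCreateOp` has no inputs, so it can start alongside `data_download`, and the slow deploy step waits only on the upload and the endpoint.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps it waits for, read off the pipeline definition above.
dependencies = {
    "data_download": set(),
    "model_train": {"data_download"},
    "model_evaluate": {"data_download", "model_train"},
    "model_upload": {"model_train", "model_evaluate"},  # explicit .after(model_evaluate_op)
    "endpoint_create": set(),  # no inputs, so it can start right away
    "model_deploy": {"model_upload", "endpoint_create"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # all steps whose dependencies are finished
    waves.append(ready)
    sorter.done(*ready)

for i, wave in enumerate(waves):
    print(i, wave)
```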


In [None]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path="gcp-mlops-v0.json")

aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION)

job = aip.PipelineJob(
    display_name="gcp-mlops-v0",
    template_path="gcp-mlops-v0.json",
    pipeline_root=pipeline_root_path,
)

job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/662390005506/locations/us-central1/pipelineJobs/gcp-mlops-v0-20230423175031
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/662390005506/locations/us-central1/pipelineJobs/gcp-mlops-v0-20230423175031')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/gcp-mlops-v0-20230423175031?project=662390005506
PipelineJob projects/662390005506/locations/us-central1/pipelineJobs/gcp-mlops-v0-20230423175031 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/662390005506/locations/us-central1/pipelineJobs/gcp-mlops-v0-20230423175031 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/662390005506/locations/us-central1/pipelineJobs/gcp-mlops-v0-20230423175031 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/662390005506/locations/us-central1/pipelineJobs/gcp-mlops-v0-2023042

In [39]:
# predictions from Python

ENDPOINT_ID = '2557670754392997888'
# Get the endpoint ID from `gcloud ai endpoints list --region=us-central1` (after `gcloud config set project polished-vault-379315`).

from google.cloud import aiplatform as aip

aip.init(project=PROJECT_ID, location=REGION)
endpoint = aip.Endpoint(ENDPOINT_ID)
prediction = endpoint.predict(instances=[[-4.0, 16.0, 153.0]])
print(f'Prediction is: {prediction}')

Prediction is: Prediction(predictions=[False], deployed_model_id='2932338137650692096', model_version_id='1', model_resource_name='projects/662390005506/locations/us-central1/models/4506214266021347328', explanations=None)
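Each instance sent to the endpoint is a positional list of features. Assuming the prebuilt scikit-learn container passes the instances as rows to `predict()`, the values must follow the training column order from `data_download` (`DepDelay`, `TaxiOut`, `Distance`), so `[-4.0, 16.0, 153.0]` above is read as a 4-minute-early departure, 16 minutes of taxi-out, and 153 miles. The helper `to_instances` below is hypothetical and builds those lists from named values, failing loudly if a feature is missing.

```python
FEATURES = ["DepDelay", "TaxiOut", "Distance"]  # column order used in data_download


def to_instances(records):
    """Hypothetical helper: turn dicts of named features into positional lists.

    Raises KeyError if a record lacks one of the features.
    """
    return [[rec[name] for name in FEATURES] for rec in records]


flights = [{"Distance": 153.0, "DepDelay": -4.0, "TaxiOut": 16.0}]
print(to_instances(flights))  # [[-4.0, 16.0, 153.0]]
# endpoint.predict(instances=to_instances(flights))
```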


In [14]:
%%bash
# Test the endpoint from a shell (the same commands also work in Cloud Shell).
# If needed, authenticate first: gcloud auth application-default login

# Request body: each instance is [DepDelay, TaxiOut, Distance] as a JSON array
cat > INPUT.json <<'EOF'
{
  "instances": [[1, 15, 400]]
}
EOF

ENDPOINT_ID="3891580669024796672"
PROJECT_ID="polished-vault-379315"
INPUT_DATA_FILE="INPUT.json"

curl \
-X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://us-central1-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/us-central1/endpoints/${ENDPOINT_ID}:predict \
-d "@${INPUT_DATA_FILE}"
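The same REST call can be assembled in Python with only the standard library, which makes the request format explicit: a POST to the regional `:predict` URL whose JSON body has an `instances` array of arrays (`{1, 15, 400}` is not valid JSON). The sketch builds the request without sending it; `build_predict_request` is a hypothetical helper and `PLACEHOLDER_TOKEN` stands in for the output of `gcloud auth print-access-token`.

```python
import json
import urllib.request


def build_predict_request(project_id, region, endpoint_id, instances, token):
    """Hypothetical helper: build (but do not send) the same POST request as the curl command."""
    url = (
        f"https://{region}-aiplatform.googleapis.com/v1/projects/{project_id}"
        f"/locations/{region}/endpoints/{endpoint_id}:predict"
    )
    body = json.dumps({"instances": instances}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


req = build_predict_request(
    "polished-vault-379315", "us-central1", "3891580669024796672",
    [[1, 15, 400]], token="PLACEHOLDER_TOKEN",
)
print(req.full_url)
print(req.data.decode())  # {"instances": [[1, 15, 400]]}
# urllib.request.urlopen(req) would send it; that needs a real token and network access.
```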

In [None]:
training_data_url: str = f"gs://{BUCKET}/data/processed/2020/2020-05.csv"
training_data_url

In [None]:
import pandas as pd
data = pd.read_csv(training_data_url, nrows=2000)
data.head(2)

In [None]:
training_data = data[["DepDelay", "TaxiOut", "Distance"]]
training_data.head()

In [None]:
BUCKET

In [None]:
data_url: str = f"gs://{BUCKET}/data/2021/2021-12raw.csv"
data_url

In [None]:
df = pd.read_csv('gs://mpg3-temp-data/data/processed/2021/2021-12.csv', nrows=10000)
df.head(1)
# The file itself is the problem: if I download it directly via datapull, everything works,
# but loading the file after preprocessing it locally with R fails with an error.
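The notebook does not show the error, so what differs between the datapull file and the R-processed file is unknown. A cheap first check is to compare the header rows of the two files. The sketch below does that with the stdlib `csv` module on text samples; the samples are made up, and the second one only mimics one known R behaviour (`write.csv` writes row names as an unnamed first column by default), which may or may not be the actual cause here. `compare_headers` is a hypothetical helper.

```python
import csv
import io


def compare_headers(good_sample: str, bad_sample: str) -> dict:
    """Hypothetical helper: compare the header rows of two CSV text samples."""
    good = next(csv.reader(io.StringIO(good_sample)))
    bad = next(csv.reader(io.StringIO(bad_sample)))
    return {
        "only_in_good": [c for c in good if c not in bad],
        "only_in_bad": [c for c in bad if c not in good],
        "same_order": good == bad,
    }


# Made-up samples: the second mimics R's write.csv default of an unnamed row-name column.
good = "FlightDate,Cancelled,DepDelay\n2021-12-01,0.00,-4.00\n"
bad = '"","FlightDate","Cancelled","DepDelay"\n"1","2021-12-01",0,-4\n'
print(compare_headers(good, bad))
# {'only_in_good': [], 'only_in_bad': [''], 'same_order': False}
```

With `gcsfs` installed, the first few kilobytes of each file on GCS could be read as text with `fsspec.open(url, "r")` and passed in as the samples.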

In [None]:
df = pd.read_csv('gs://training-data-polished-vault-379315/data/2021/2021-12.csv', nrows=10)
df = pd.read_csv('gs://training-data-polished-vault-379315/data/2021/2021-12.csv')
df.head(2)
# df = pd.read_csv('gs://mpg3-temp-data/data/2021/2021-12.csv')
