# MLOps Coloring Book

This notebook may be used for demonstration of AutoMLOps


# Install

In [1]:
!pip3 install ../dist/AutoMLOps-1.0.0-py2.py3-none-any.whl --user

Processing /Users/srastatter/Documents/2023/MLOps-graduation/AutoMLOps/dist/AutoMLOps-1.0.0-py2.py3-none-any.whl
Installing collected packages: AutoMLOps
Successfully installed AutoMLOps-1.0.0


Restart the kernel after installing the package

# Upload Data

In [2]:
# Runs the data.load_data_to_bq module with the target project and the local CSV file.
# NOTE(review): presumably this creates/populates the BigQuery table
# test_dataset.dry-beans that the pipeline reads later — confirm in data/load_data_to_bq.py.
!python3 -m data.load_data_to_bq --project automlops-sandbox --file data/Dry_Beans_Dataset.csv

Dataset automlops-sandbox.test_dataset already exists
Table test_dataset.dry-beans already exists


## 1. Without Using KFP Spec
This workflow generates a pipeline without using the Kubeflow (KFP) spec. `generate()` creates all the necessary files but does not run them. `go()` creates all the necessary files and resources, and then pushes the code to the source repo to trigger the build. Please see the README for more information.

## Imports

In [3]:
from AutoMLOps import AutoMLOps

In [4]:
%%define_imports
import json
import pandas as pd
from google.cloud import aiplatform
from google.cloud import aiplatform_v1
from google.cloud import bigquery
from google.cloud import storage
import datetime
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_curve
from sklearn.model_selection import train_test_split
from joblib import dump
import pickle
import os

## Data Loading

In [5]:
%%define_component
AutoMLOps.makeComponent(
    name="create_dataset",
    description="Loads data from BQ and writes a dataframe as a csv to GCS.", # optional
    params=[
        {"name": "bq_table", "type": str}, # descriptions are optional
        {"name": "data_path", "type": str, "description": "GS location where the training data is written."},
        {"name": "project_id", "type": str, "description": "Project_id."}
    ]
)
# Component code goes below:
# bq_table, data_path and project_id are not defined in this cell; they match
# the component parameters declared in makeComponent() above.
# BigQuery client bound to the project given by the project_id parameter.
bq_client = bigquery.Client(project=project_id)

def get_query(bq_input_table: str) -> str:
    """Builds the SQL statement that selects every column of a BQ table.

    Args:
    bq_input_table: Fully qualified name of the table to read
    (e.g. <project>.<dataset>.<table>).
    Returns: The query text as a string.
    """
    # The table name is wrapped in backticks inside the statement.
    query = f"""
    SELECT *
    FROM `{bq_input_table}`
    """
    return query

def load_bq_data(query: str, client: bigquery.Client) -> pd.DataFrame:
    """Executes a BQ query and returns its result as a Pandas Dataframe.
    Args:
    query: SQL text to execute.
    client: BQ Client that runs the query.
    Returns:
    pd.DataFrame: The result of the query converted with to_dataframe().
    """
    query_job = client.query(query)
    return query_job.to_dataframe()

# Read the whole table into memory and persist it as a csv at data_path.
# index=False keeps the DataFrame's positional index out of the file; with the
# default (index=True) a later pd.read_csv(data_path) would pick it up as an
# extra "Unnamed: 0" column and treat it as a feature.
dataframe = load_bq_data(get_query(bq_table), bq_client)
dataframe.to_csv(data_path, index=False)

## Model Training

In [6]:
%%define_component
AutoMLOps.makeComponent(
    name="train_model",
    description="Trains a decision tree on the training data.",
    params=[
        {"name": "model_directory", "type": str, "description": "GS location of saved model."},
        {"name": "data_path", "type": str, "description": "GS location where the training data."}
    ]
)
# Component code goes below:
def save_model(model, model_directory):
    """Pickles a model locally and uploads the pickle under model_directory.

    Args:
    model: Object to serialize with pickle.
    model_directory: Destination of the form gs://<bucket>/<prefix>; the
    blob name is os.path.join(<prefix>, 'model.pkl').
    """
    local_file = 'model.pkl'
    # Serialize into the working directory first; the upload below reads this file.
    with open(local_file, 'wb') as handle:
        pickle.dump(model, handle)

    # 'gs://bucket/a/b'.split('/') -> ['gs:', '', 'bucket', 'a', 'b']
    uri_parts = model_directory.split('/')
    bucket_name, object_prefix = uri_parts[2], '/'.join(uri_parts[3:])

    gcs_bucket = storage.Client().get_bucket(bucket_name)
    gcs_bucket.blob(os.path.join(object_prefix, local_file)).upload_from_filename(local_file)

# Load the csv at data_path; "Class" is the label column and every remaining
# column is used as a feature.
df = pd.read_csv(data_path)
labels = df.pop("Class").tolist()
data = df.values.tolist()

# train_test_split is called with its defaults and no random_state, so the
# split (and therefore the printed accuracy) can differ between runs.
x_train, x_test, y_train, y_test = train_test_split(data, labels)
skmodel = DecisionTreeClassifier()
skmodel.fit(x_train,y_train)
score = skmodel.score(x_test,y_test)
print('accuracy is:',score)

# save_model() derives the upload location from model_directory itself, so no
# separate output URI is computed here.
save_model(skmodel, model_directory)

## Uploading & Deploying the Model

In [7]:
%%define_component
AutoMLOps.makeComponent(
    name="deploy_model",
    description="Trains a decision tree on the training data.",
    params=[
        {"name": "model_directory", "type": str, "description": "GS location of saved model."},
        {"name": "project_id", "type": str, "description": "Project_id."},
        {"name": "region", "type": str, "description": "Region."}
    ]
)
# Component code goes below:
aiplatform.init(project=project_id, location=region)
deployed_model = aiplatform.Model.upload(
    display_name="beans-model-pipeline",
    artifact_uri = model_directory,
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
)
endpoint = deployed_model.deploy(machine_type="n1-standard-4")

## Define and Run the Pipeline

In [8]:
AutoMLOps.makePipeline(
    name="training-pipeline",
    description="description",  # optional
    # Pipeline-level parameters; the "description" key is optional.
    params=[
        {"name": "bq_table", "type": str},
        {"name": "model_directory", "type": str, "description": "Description."},
        {"name": "data_path", "type": str, "description": "Description."},
        {"name": "project_id", "type": str, "description": "Description."},
        {"name": "region", "type": str, "description": "Description."},
    ],
    # Steps are listed as create_dataset -> train_model -> deploy_model.
    # Every param_mapping entry is a (component_param, pipeline_param) pair.
    pipeline=[
        {
            "component_name": "create_dataset",
            "param_mapping": [
                ("bq_table", "bq_table"),
                ("data_path", "data_path"),
                ("project_id", "project_id"),
            ],
        },
        {
            "component_name": "train_model",
            "param_mapping": [
                ("model_directory", "model_directory"),
                ("data_path", "data_path"),
            ],
        },
        {
            "component_name": "deploy_model",
            "param_mapping": [
                ("model_directory", "model_directory"),
                ("project_id", "project_id"),
                ("region", "region"),
            ],
        },
    ],
)

In [9]:
# Project that hosts the BigQuery table and the "<project>-bucket" GCS bucket used below.
PROJECT_ID = "automlops-sandbox"

# Runtime values for the pipeline parameters. model_directory embeds the
# current timestamp, so it differs every time this cell is executed.
pipeline_params = dict(
    bq_table=f"{PROJECT_ID}.test_dataset.dry-beans",
    model_directory=f"gs://{PROJECT_ID}-bucket/trained_models/{datetime.datetime.now()}",
    data_path=f"gs://{PROJECT_ID}-bucket/data",
    project_id=PROJECT_ID,
    region="us-central1",
)

In [10]:
# generate() only writes out the pipeline files; it does not run them.
AutoMLOps.generate(
    project_id=PROJECT_ID,
    pipeline_params=pipeline_params,
    use_kfp_spec=False,
    run_local=False,
    schedule_pattern='0 */12 * * *',  # cron expression: minute 0 of every 12th hour
)

INFO: Successfully saved requirements file in AutoMLOps/components/component_base/requirements.txt


In [11]:
# go() first does everything generate() does and then runs the generated code.
AutoMLOps.go(
    project_id=PROJECT_ID,
    pipeline_params=pipeline_params,
    use_kfp_spec=False,
    run_local=False,
    schedule_pattern='0 */12 * * *',  # cron expression: minute 0 of every 12th hour
)

INFO: Successfully saved requirements file in AutoMLOps/components/component_base/requirements.txt
[0;32m Updating required API services in project automlops-sandbox [0m
Operation "operations/acat.p2-45373616427-957dc83a-db0c-47c8-8d9c-5c344058019f" finished successfully.
[0;32m Checking for Artifact Registry: vertex-mlops-af in project automlops-sandbox [0m
Listing items under project automlops-sandbox, location us-central1.

vertex-mlops-af  DOCKER  STANDARD_REPOSITORY  Artifact Registry vertex-mlops-af in us-central1.  us-central1          Google-managed key  2023-01-11T17:12:26  2023-01-13T12:00:34  2760.085
Artifact Registry: vertex-mlops-af already exists in project automlops-sandbox
[0;32m Checking for GS Bucket: automlops-sandbox-bucket in project automlops-sandbox [0m
gs://automlops-sandbox-bucket/
GS Bucket: automlops-sandbox-bucket already exists in project automlops-sandbox
[0;32m Checking for Service Account: vertex-pipelines in project automlops-sandbox [0m
Pipeli

remote: It seems you're using Apple Git (git/2.37.1 (Apple Git-137.1),gzip(gfe),gzip(gfe)). Apple Git is not frequently updated and often has known vulnerabilities. Please follow the instructions at go/old-git-client#gmac to use a more current version of Git.        
To https://source.developers.google.com/p/automlops-sandbox/r/AutoMLOps-repo
   45f1b45..8609f82  automlops -> automlops


Pushing code to automlops branch, triggering cloudbuild...
Waiting for cloudbuild job to complete.............Submitting PipelineJob...
[0;32m Submitting training job to Cloud Runner Service https://run-pipeline-q5owjmymra-uc.a.run.app using @pipelines/runtime_parameters/pipeline_parameter_values.json [0m
Note: Unnecessary use of -X or --request, POST is already inferred.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0== Info:   Trying 216.239.36.53:443...
== Info: Connected to run-pipeline-q5owjmymra-uc.a.run.app (216.239.36.53) port 443 (#0)
== Info: ALPN: offers http/1.1
== Info:  CAfile: /etc/ssl/cert.pem
== Info:  CApath: none
== Info: (304) (OUT), TLS handshake, Client hello (1):
=> Send SSL data, 338 bytes (0x152)
0000: ...N...(. 1.w.NS.-,.v;]..ts.C...$f.... O.5=B7.M8.t.+......4.).%$
0040: 5

## Default Run Settings

In [12]:
# go() with each optional setting written out explicitly; arguments tagged
# "default" carry the value that applies when the argument is omitted.
AutoMLOps.go(
    # Required arguments.
    project_id=PROJECT_ID,  # required
    pipeline_params=pipeline_params,  # required
    # af_registry_*: artifact registry settings.
    af_registry_location='us-central1',  # default
    af_registry_name='vertex-mlops-af',  # default
    # cb_trigger_*: build trigger settings.
    cb_trigger_location='us-central1',  # default
    cb_trigger_name='automlops-trigger',  # default
    # cloud_run_*: pipeline runner service settings.
    cloud_run_location='us-central1',  # default
    cloud_run_name='run-pipeline',  # default
    # csr_*: source repository settings.
    csr_branch_name='automlops',  # default
    csr_name='AutoMLOps-repo',  # default
    # gs_bucket_*: storage bucket settings.
    gs_bucket_location='us-central1',  # default
    gs_bucket_name=None,  # default
    # Locations of the generated parameter and job-spec files.
    parameter_values_path='pipelines/runtime_parameters/pipeline_parameter_values.json',  # default
    pipeline_job_spec_path='scripts/pipeline_spec/pipeline_job.json',  # default
    # Execution settings.
    pipeline_runner_sa=None,  # default
    run_local=True,  # default
    # schedule_*: scheduling settings.
    schedule_location='us-central1',  # default
    schedule_name='AutoMLOps-schedule',  # default
    schedule_pattern='No Schedule Specified',  # default
    use_kfp_spec=False,  # default
)

INFO: Successfully saved requirements file in AutoMLOps/components/component_base/requirements.txt
[0;32m Updating required API services in project automlops-sandbox [0m
Operation "operations/acat.p2-45373616427-f7b2644e-1b69-4faf-b983-42e5e0c4202c" finished successfully.
[0;32m Checking for Artifact Registry: vertex-mlops-af in project automlops-sandbox [0m
Listing items under project automlops-sandbox, location us-central1.

vertex-mlops-af  DOCKER  STANDARD_REPOSITORY  Artifact Registry vertex-mlops-af in us-central1.  us-central1          Google-managed key  2023-01-11T17:12:26  2023-01-13T12:05:31  2951.969
Artifact Registry: vertex-mlops-af already exists in project automlops-sandbox
[0;32m Checking for GS Bucket: automlops-sandbox-bucket in project automlops-sandbox [0m
gs://automlops-sandbox-bucket/
GS Bucket: automlops-sandbox-bucket already exists in project automlops-sandbox
[0;32m Checking for Service Account: vertex-pipelines in project automlops-sandbox [0m
Pipeli

## 2. Using KFP Spec

## Imports

In [13]:
import json
import pandas as pd
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath
from google.cloud import aiplatform
from google.cloud import aiplatform_v1
import datetime

from AutoMLOps import AutoMLOps

## Data Loading

In [14]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "pandas",
        "pyarrow",
        "db_dtypes"
    ],
    base_image="python:3.9",
    output_component_file=f"{AutoMLOps.OUTPUT_DIR}/create_dataset.yaml"
)
def create_dataset(
    bq_table: str,
    output_data_path: OutputPath("Dataset"),
    project: str
):
    """Loads a BQ table into a dataframe and writes it as a csv.

    Args:
        bq_table: Full name of the table to read
            (e.g. <project>.<dataset>.<table>).
        output_data_path: Path the csv is written to.
        project: Project used to create the BQ client.
    """
    # A KFP python component runs as a standalone function, so its imports
    # live inside the function body.
    from google.cloud import bigquery
    import pandas as pd
    bq_client = bigquery.Client(project=project)


    def get_query(bq_input_table: str) -> str:
        """Generates BQ Query to read data.

        Args:
        bq_input_table: The full name of the bq input table to be read into
        the dataframe (e.g. <project>.<dataset>.<table>)
        Returns: A BQ query string.
        """
        return f"""
        SELECT *
        FROM `{bq_input_table}`
        """

    def load_bq_data(query: str, client: bigquery.Client) -> pd.DataFrame:
        """Loads data from bq into a Pandas Dataframe for EDA.
        Args:
        query: BQ Query to generate data.
        client: BQ Client used to execute query.
        Returns:
        pd.DataFrame: A dataframe with the requested data.
        """
        df = client.query(query).to_dataframe()
        return df

    dataframe = load_bq_data(get_query(bq_table), bq_client)
    # index=False keeps the DataFrame's positional index out of the file; with
    # the default (index=True) a later pd.read_csv() of this file would pick it
    # up as an extra "Unnamed: 0" column and treat it as a feature.
    dataframe.to_csv(output_data_path, index=False)

## Model Training

In [15]:
@component(
    packages_to_install=[
        "scikit-learn",
        "pandas",
        "joblib",
        "tensorflow"
    ],
    base_image="python:3.9",
    output_component_file=f"{AutoMLOps.OUTPUT_DIR}/train_model.yaml",
)
def train_model(
    output_model_directory: str,
    dataset: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model]
):
    """Trains a decision tree on the dataset and pickles it to output_model_directory.

    Args:
        output_model_directory: Directory the pickled model (model.pkl) is
            written to.
        dataset: Input csv; its "Class" column is the label and all other
            columns are features.
        metrics: Receives the accuracy (in percent), the framework name and
            the number of rows in the dataset.
        model: Output model artifact; its path is set to output_model_directory.
    """
    # A KFP python component runs as a standalone function, so its imports
    # live inside the function body.
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.model_selection import train_test_split
    import pandas as pd
    import tensorflow as tf
    import pickle
    import os

    def save_model(model, uri):
        """Pickles a model to uri."""
        # pickle emits bytes, so the file is opened in binary mode.
        with tf.io.gfile.GFile(uri, 'wb') as f:
            pickle.dump(model, f)

    df = pd.read_csv(dataset.path)
    labels = df.pop("Class").tolist()
    data = df.values.tolist()
    # train_test_split is called with its defaults and no random_state, so the
    # split (and therefore the logged accuracy) can differ between runs.
    x_train, x_test, y_train, y_test = train_test_split(data, labels)
    skmodel = DecisionTreeClassifier()
    skmodel.fit(x_train,y_train)
    score = skmodel.score(x_test,y_test)
    print('accuracy is:',score)
    metrics.log_metric("accuracy",(score * 100.0))
    metrics.log_metric("framework", "Scikit Learn")
    metrics.log_metric("dataset_size", len(df))

    output_uri = os.path.join(output_model_directory, 'model.pkl')
    save_model(skmodel, output_uri)
    # Point the output artifact at the directory holding model.pkl.
    model.path = output_model_directory

## Uploading & Deploying the Model

In [16]:
@component(
    packages_to_install=[
        "google-cloud-aiplatform"
    ],
    base_image="python:3.9",
    output_component_file=f"{AutoMLOps.OUTPUT_DIR}/deploy_model.yaml",
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Uploads the model artifact to Vertex AI and deploys it.

    Args:
        model: Model artifact whose uri is uploaded.
        project: Project passed to aiplatform.init.
        region: Location passed to aiplatform.init.
        vertex_endpoint: Output artifact; its uri is set to the resource name
            of the endpoint returned by deploy().
        vertex_model: Output artifact; its uri is set to the resource name of
            the uploaded model.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    upload_settings = dict(
        display_name="beans-model-pipeline",
        artifact_uri=model.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
    )
    uploaded_model = aiplatform.Model.upload(**upload_settings)
    serving_endpoint = uploaded_model.deploy(machine_type="n1-standard-4")

    # Record the created resources on the output artifacts.
    vertex_endpoint.uri = serving_endpoint.resource_name
    vertex_model.uri = uploaded_model.resource_name

## Define and Run the Pipeline

In [17]:
%%define_kfp_pipeline

@dsl.pipeline(name='training-pipeline')
def pipeline(bq_table: str,
             output_model_directory: str,
             project: str,
             region: str,
            ):
    # Chains the three components through their outputs:
    # create_dataset -> train_model -> deploy_model.

    # Step 1: read the table named by bq_table; output_data_path is not passed
    # here — it is the component's OutputPath parameter.
    dataset_task = create_dataset(
        bq_table=bq_table, 
        project=project)

    # Step 2: train on the dataset produced by step 1.
    model_task = train_model(
        output_model_directory=output_model_directory,
        dataset=dataset_task.output)

    # Step 3: deploy the "model" output of the training step.
    deploy_task = deploy_model(
        model=model_task.outputs["model"],
        project=project,
        region=region)


In [22]:
# Runtime values for the parameters of the KFP pipeline function.
# PROJECT_ID is not defined in this cell; it must already exist in the kernel.
# output_model_directory embeds the current timestamp, so it differs every
# time this cell is executed.
pipeline_params = dict(
    bq_table=f"{PROJECT_ID}.test_dataset.dry-beans",
    output_model_directory=f"gs://{PROJECT_ID}-bucket/trained_models/{datetime.datetime.now()}",
    project=f"{PROJECT_ID}",
    region="us-central1",
)

In [23]:
# Same go() call as in the non-KFP workflow, but with use_kfp_spec=True.
AutoMLOps.go(
    project_id=PROJECT_ID,
    pipeline_params=pipeline_params,
    use_kfp_spec=True,
    run_local=False,
    schedule_pattern='0 */12 * * *',  # cron expression: minute 0 of every 12th hour
)

[0;32m Updating required API services in project automlops-sandbox [0m
Operation "operations/acat.p2-45373616427-ce04bc73-22b4-429d-80c5-5546406b4d5a" finished successfully.
[0;32m Checking for Artifact Registry: vertex-mlops-af in project automlops-sandbox [0m
Listing items under project automlops-sandbox, location us-central1.

vertex-mlops-af  DOCKER  STANDARD_REPOSITORY  Artifact Registry vertex-mlops-af in us-central1.  us-central1          Google-managed key  2023-01-11T17:12:26  2023-01-13T12:30:41  3905.859
Artifact Registry: vertex-mlops-af already exists in project automlops-sandbox
[0;32m Checking for GS Bucket: automlops-sandbox-bucket in project automlops-sandbox [0m
gs://automlops-sandbox-bucket/
GS Bucket: automlops-sandbox-bucket already exists in project automlops-sandbox
[0;32m Checking for Service Account: vertex-pipelines in project automlops-sandbox [0m
Pipeline Runner Service Account         vertex-pipelines@automlops-sandbox.iam.gserviceaccount.com  False

remote: It seems you're using Apple Git (git/2.37.1 (Apple Git-137.1),gzip(gfe),gzip(gfe)). Apple Git is not frequently updated and often has known vulnerabilities. Please follow the instructions at go/old-git-client#gmac to use a more current version of Git.        
To https://source.developers.google.com/p/automlops-sandbox/r/AutoMLOps-repo
   91dfb2c..af9ca0a  automlops -> automlops


Pushing code to automlops branch, triggering cloudbuild...
Waiting for cloudbuild job to complete...............................Submitting PipelineJob...
[0;32m Submitting training job to Cloud Runner Service https://run-pipeline-q5owjmymra-uc.a.run.app using @pipelines/runtime_parameters/pipeline_parameter_values.json [0m
Note: Unnecessary use of -X or --request, POST is already inferred.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0== Info:   Trying 216.239.36.53:443...
== Info: Connected to run-pipeline-q5owjmymra-uc.a.run.app (216.239.36.53) port 443 (#0)
== Info: ALPN: offers http/1.1
== Info:  CAfile: /etc/ssl/cert.pem
== Info:  CApath: none
== Info: (304) (OUT), TLS handshake, Client hello (1):
=> Send SSL data, 338 bytes (0x152)
0000: ...N...G..s.X.....p(#..n.(..d.H5..3.]. &.E..O....l$A..