In [93]:
# Notebook-level Google Cloud settings: target project and region.
project_id, region = "dna-verizonpoc", "us-central1"

In [94]:
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component)

from kfp.v2 import compiler
from google_cloud_pipeline_components import aiplatform as gcc_aip
from joblib import load
from typing import NamedTuple

In [95]:
@component(base_image="gcr.io/dna-verizonpoc/movie_reviews_base_image:latest")
def get_data(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    """Split the IMDB movie-review CSV into train and test dataset artifacts.

    Reads the raw CSV from Cloud Storage, holds out 30% of the rows for
    testing, and writes each split as CSV to its output artifact path.

    Args:
        dataset_train: Output artifact that receives the training split.
        dataset_test: Output artifact that receives the test split.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    data_raw = pd.read_csv(
        "gs://verexai_automl_text_data/pipeline_root/custommodel/imdb_reviews/data/imdb_movie_reviews_data.csv"
    )

    # A fixed seed keeps the split identical across reruns, so train and eval
    # results stay comparable from one pipeline run to the next.
    train, test = train_test_split(data_raw, test_size=0.3, random_state=42)

    # index=False: the shuffled row index carries no information and would
    # otherwise come back as an extra "Unnamed: 0" column when the CSV is read.
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)

In [96]:
@component(base_image="gcr.io/dna-verizonpoc/movie_reviews_base_image:latest")
def train_movie_reviews_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model]
):
    """Train a bag-of-words Naive Bayes sentiment classifier and publish it.

    Fits a CountVectorizer -> TfidfTransformer -> MultinomialNB pipeline on the
    ``text`` and ``label`` columns of the training CSV, saves the fitted
    pipeline with joblib, copies it to a fixed Cloud Storage location with
    ``gsutil``, and records that location on the output artifact.

    Args:
        dataset: Input artifact whose path is a CSV with ``text`` and
            ``label`` columns.
        model_artifact: Output artifact that receives the serialized pipeline.
            Its metadata gets ``model`` (GCS URI of the joblib file),
            ``train_score`` (accuracy on the training data) and ``framework``.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil cp`` upload fails.
    """
    import os
    import subprocess
    import sys

    import pandas as pd
    from joblib import dump  # used for saving sklearn objects
    from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.pipeline import Pipeline

    gcs_model_path = "gs://verexai_automl_text_data/pipeline_root/custommodel/imdb_reviews/model/model.joblib"
    local_model_path = 'model/model.joblib'

    imdb_train = pd.read_csv(dataset.path)

    text_clf = Pipeline([
        ('vect', CountVectorizer()),
        ('tfidf', TfidfTransformer()),
        ('clf', MultinomialNB()),
    ])
    text_clf.fit(imdb_train.text, imdb_train.label)
    # Accuracy on the data the model was fitted on (not a held-out estimate).
    score = text_clf.score(imdb_train.text, imdb_train.label)

    # Create the staging directory without shelling out; no-op if it exists.
    os.makedirs('model', exist_ok=True)
    dump(text_clf, local_model_path)

    # Store the fitted pipeline itself in the artifact. Previously the local
    # path *string* was dumped here, leaving the artifact without a model.
    dump(text_clf, model_artifact.path)

    subprocess.check_call(
        ['gsutil', 'cp', local_model_path, gcs_model_path],
        stderr=sys.stdout,
    )

    model_artifact.metadata["model"] = gcs_model_path
    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "Scikit-learn"


In [97]:
@component(base_image="gcr.io/dna-verizonpoc/movie_reviews_base_image:latest")
def eval_model(
    test_set: Input[Dataset],
    movie_review_model: Input[Model],
    threshold : float,
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
) -> NamedTuple("Outputs", [("dep_decision", str)]):
    """Score the trained model on the test split and decide whether to deploy.

    Downloads the joblib file referenced by the model artifact's ``model``
    metadata entry, predicts on the ``text`` column of the test CSV, logs a
    confusion matrix and the accuracy, and compares the accuracy against
    ``threshold``.

    Args:
        test_set: Input artifact whose path is a CSV with ``text`` and
            ``label`` columns.
        movie_review_model: Input model artifact; ``metadata["model"]`` must
            hold the GCS URI of the serialized model.
        threshold: Accuracy that must be strictly exceeded to deploy.
        metrics: Output artifact that receives the confusion matrix.
        smetrics: Output artifact that receives the scalar ``score`` metric.

    Returns:
        A one-element tuple ``(dep_decision,)`` where ``dep_decision`` is
        ``"true"`` if the accuracy is greater than ``threshold`` and
        ``"false"`` otherwise.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil cp`` download fails.
    """
    import os
    import subprocess
    import sys

    import pandas as pd
    from joblib import load
    from sklearn.metrics import accuracy_score, confusion_matrix

    # Create the staging directory without shelling out; no-op if it exists.
    os.makedirs('model', exist_ok=True)

    imdb_test = pd.read_csv(test_set.path)

    model_uri = movie_review_model.metadata["model"]
    print(model_uri)
    subprocess.check_call(
        ['gsutil', 'cp', model_uri, 'model/model.joblib'],
        stderr=sys.stdout,
    )

    # NOTE(review): joblib.load unpickles the file; only point this at model
    # files produced by a trusted training step.
    model = load('model/model.joblib')
    print(type(model))

    # Predict once and compare like with like: both ground truth and
    # predictions are cast to str, so the accuracy and the confusion matrix are
    # computed from the same arrays regardless of the label dtype pandas infers.
    y_true = imdb_test['label'].values.astype(str)
    y_pred = model.predict(imdb_test['text'].values).astype(str)

    score = float(accuracy_score(y_true, y_pred))
    print("score = {} ".format(score))

    # Take the category names from the fitted model so the number of names
    # always matches the matrix dimensions, even if a class is absent from
    # the test split.
    class_names = [str(label) for label in model.classes_]
    metrics.log_confusion_matrix(
        class_names,
        confusion_matrix(y_true, y_pred, labels=class_names).tolist(),  # np array -> list
    )

    smetrics.log_metric("score", score)

    dep_decision = "true" if score > threshold else "false"
    return (dep_decision,)

In [98]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root="gs://verexai_automl_text_data/pipeline_root/custommodel/imdb_reviews/",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="custom-pipeline-text-sentiment-anlaysis",
)
def pipeline(
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
    threshold: float = 0.85,
):
    """Train, evaluate and conditionally deploy the movie-review classifier.

    Steps: split the data, train the model, evaluate it on the test split,
    create an endpoint, upload the model, and deploy the uploaded model to the
    endpoint only when the evaluation step returns ``dep_decision == "true"``.

    Args:
        serving_container_image_uri: Prediction container image used for the
            uploaded model.
        threshold: Test accuracy the model must exceed for deployment; passed
            through to ``eval_model``. Defaults to the previously hard-coded
            value of 0.85.
    """
    dataset_op = get_data()
    train_op = train_movie_reviews_model(dataset_op.outputs["dataset_train"])
    eval_op = eval_model(
        test_set=dataset_op.outputs["dataset_test"],
        movie_review_model=train_op.outputs["model_artifact"],
        threshold=threshold,
    )

    # NOTE(review): the endpoint is created and the model uploaded whether or
    # not the model passes the threshold; only the deploy step is gated.
    # Confirm this is intended before moving them into the condition.
    endpoint_create_op = gcc_aip.EndpointCreateOp(
        project=project_id,
        display_name="pipelines-created-endpoint-textsa",
    )

    # artifact_uri is the directory that train_movie_reviews_model copies
    # model.joblib into; the two paths must be kept in sync.
    model_upload_op = gcc_aip.ModelUploadOp(
        project=project_id,
        display_name="movie_reviews_model",
        artifact_uri="gs://verexai_automl_text_data/pipeline_root/custommodel/imdb_reviews/model",
        serving_container_image_uri=serving_container_image_uri,
        serving_container_environment_variables={"NOT_USED": "NO_VALUE"},
    )
    # The upload consumes no output of the evaluation step, so order it explicitly.
    model_upload_op.after(eval_op)

    with dsl.Condition(
        eval_op.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        gcc_aip.ModelDeployOp(
            project=project_id,
            endpoint=endpoint_create_op.outputs["endpoint"],
            model=model_upload_op.outputs["model"],
            deployed_model_display_name="movie_reviews_model",
            machine_type="n1-standard-4",
        )
    
    
# Compile the pipeline function into a JSON job spec in the working directory.
compiler.Compiler().compile(
    package_path='movie_review_pipeline.json',
    pipeline_func=pipeline,
)

In [99]:
from kfp.v2.google.client import AIPlatformClient

# Reuse the project/region settings from the first cell instead of repeating
# the literals, so the client cannot drift from the config used by the pipeline.
api_client = AIPlatformClient(
    project_id=project_id,
    region=region,
)

# Submit the compiled job spec as a pipeline run, with step caching enabled.
response = api_client.create_run_from_job_spec(
    'movie_review_pipeline.json',
    enable_caching=True,
)