In [3]:
# This pipeline is to be run in Vertex Workbench
# Install Kubeflow Pipelines and GCP AI Platform
!pip3 install kfp --user -q
!pip3 install --upgrade google-cloud-aiplatform --user -q
!pip3 install --upgrade google-cloud-pipeline-components --user -q

In [16]:
from datetime import date, datetime
from typing import NamedTuple # for passing data between steps
import google.cloud.aiplatform as aip
from google.cloud import aiplatform

from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.cloud.aiplatform.models import Model
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component, Input, Artifact

In [17]:
# --- Project and storage configuration ---------------------------------------
PROJECT_ID = "iowa-steam"
BUCKET_NAME = "iowa-steam-source-data"
BUCKET_URI = "gs://" + BUCKET_NAME

# Locations inside the bucket: raw training data, held-out test data, and the
# root folder handed to the pipeline as `pipeline_root`.
TRAINING_DATA_URI = BUCKET_URI + "/train_canonical.csv"
TEST_DATA_URI = BUCKET_URI + "/test_held_out.csv"
PIPELINE_ROOT = BUCKET_URI + "/pipeline_root/control"

# Captured once, when this cell runs; embedded in the display names built in
# the pipeline definition and in the batch output folder name below.
TIMESTAMP = datetime.now().strftime("%Y_%m_%d__%H_%M_%S")

TEST_BATCH_OUTPUT_FOLDER = "automl_batch_" + TIMESTAMP

In [18]:
# Initialise the Vertex AI SDK for this session with the project to use and
# the bucket that serves as its staging location.
aip.init(
    project=PROJECT_ID,
    staging_bucket=BUCKET_URI,
)

In [19]:
@component(
    packages_to_install=[
        "pandas",
        "fsspec",
        "gcsfs",
        "google-cloud-storage",
        "google-cloud-aiplatform"
    ]
)
def preprocess_data_op(bucket_name: str, train_data_uri: str) -> NamedTuple(
    'Outputs',
    [('train_path', str), ('test_path', str)]
):
    """Clean the review text of the training and held-out sets and stage both in GCS.

    The training CSV is cleaned and uploaded to
    ``gs://<bucket_name>/data_processed/train_canonical_<timestamp>.csv``.
    The held-out CSV (``gs://<bucket_name>/test_held_out.csv``) is cleaned the
    same way; every review is uploaded as its own ``.txt`` object and a JSONL
    file listing those objects is written next to them.

    Args:
        bucket_name: Name of the GCS bucket (without ``gs://``) that holds the
            held-out CSV and receives all outputs.
        train_data_uri: URI of the raw training CSV; it must contain a
            ``user_review`` column.

    Returns:
        A ``(train_path, test_path)`` tuple: the ``gs://`` URI of the processed
        training CSV and the ``gs://`` URI of the batch-prediction JSONL file.
    """
    # The body of a component runs on its own, outside this notebook, so every
    # import and every value it needs has to be created in here. Notebook-level
    # names (BUCKET_URI, TEST_DATA_URI, TEST_BATCH_OUTPUT_FOLDER, ...) are not
    # available at run time.
    import json
    from datetime import datetime

    import pandas as pd
    from google.cloud import storage

    TIMESTAMP = datetime.now().strftime("%Y_%m_%d__%H_%M_%S")

    # Rebuilt from `bucket_name`; these mirror the values of BUCKET_URI,
    # TEST_DATA_URI and TEST_BATCH_OUTPUT_FOLDER in the notebook's config cell.
    bucket_uri = f"gs://{bucket_name}"
    test_data_uri = f"{bucket_uri}/test_held_out.csv"
    test_batch_output_folder = f"automl_batch_{TIMESTAMP}"

    # The following code is from Andre's preprocessing pipeline,
    # modified to use GCP cloud storage
    def process_data(data, content):
        '''
        Add cleaned versions of a text column to `data` and return `data`.

        data is a DataFrame loaded from csv (it is modified in place)

        content is the column of `data` that you want to process

        Content_Parsed_1..3 hold the intermediate steps;
        Content_Parsed_4 is the final processed output
        '''
        # regex=False everywhere: all patterns are literal text, and "?" / "."
        # must never be interpreted as regular-expression syntax.

        # \r, \n, runs of four spaces, and quotation marks
        parsed = content
        for raw, replacement in (("\r", " "), ("\n", " "), ("    ", " "), ('"', '')):
            parsed = parsed.str.replace(raw, replacement, regex=False)
        data['Content_Parsed_1'] = parsed

        # Lower casing all words so that upper case words (ex: at the beginning
        # of a sentence) are read the same as lower case words
        data['Content_Parsed_2'] = data['Content_Parsed_1'].str.lower()

        # punctuation signs
        stripped = data['Content_Parsed_2']
        for sign in "?:!.,;":
            stripped = stripped.str.replace(sign, '', regex=False)
        data['Content_Parsed_3'] = stripped

        # Possessive "'s"
        data['Content_Parsed_4'] = data['Content_Parsed_3'].str.replace("'s", "", regex=False)

        return data

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # ---- Training data ------------------------------------------------------
    # read raw source data from GCS and clean it
    train = pd.read_csv(train_data_uri)
    train = process_data(train, train['user_review'])

    # Store as ephemeral CSV in the working directory, then upload to GCS
    processed_filename = f"train_canonical_{TIMESTAMP}.csv"
    train.to_csv(processed_filename, index=False, header=True)

    gcs_dest_path = f"data_processed/{processed_filename}"
    bucket.blob(gcs_dest_path).upload_from_filename(processed_filename)
    train_processed_path = f"{bucket_uri}/{gcs_dest_path}"

    # ---- Held-out test data -------------------------------------------------
    # Split into one text file per review and create the jsonl that lists them.
    test = pd.read_csv(test_data_uri)
    test = process_data(test, test['user_review'])

    file_subprefix = "data"
    input_file_blob_name = f"{test_batch_output_folder}/steam_reviews_batch_predict_test.jsonl"

    batch_lines = []
    for review_id, content in zip(test['review_id'], test['Content_Parsed_4']):
        # Structure each file as required by Vertex AI for batch prediction.
        blob_name = f"{test_batch_output_folder}/{file_subprefix}/{review_id}.txt"
        bucket.blob(blob_name).upload_from_string(content)

        # json.dumps (not str) so every line is valid JSON with double quotes.
        instance = {"content": f"{bucket_uri}/{blob_name}", "mimeType": "text/plain"}
        batch_lines.append(json.dumps(instance))

    bucket.blob(input_file_blob_name).upload_from_string('\n'.join(batch_lines))
    test_jsonl_path = f"{bucket_uri}/{input_file_blob_name}"

    return (train_processed_path, test_jsonl_path)

In [20]:
# Builds the JSONL import files for the AutoML classification and the AutoML
# sentiment datasets from the preprocessed data, and uploads both to GCS.
@component(
    packages_to_install=[
        "pandas",
        "google-cloud-storage",
        "fsspec",
        "gcsfs",
    ])
def create_automl_import_files_op(bucket_name: str, input_path: str) -> NamedTuple(
    'Outputs',
    [('classification_path', str), ('sentiment_path', str)]
):
    """Turn the preprocessed CSV into two JSONL import files stored in GCS.

    Each row of the CSV yields one line in each file: a classification record
    labelled "Y"/"N" and a sentiment record labelled 1/0 (with sentimentMax 1).
    A row counts as positive when its ``user_suggestion`` equals 1; the text is
    taken from ``Content_Parsed_4``.

    Args:
        bucket_name: Name of the GCS bucket (without ``gs://``) to upload to.
        input_path: Path or URI of the preprocessed CSV.

    Returns:
        A ``(classification_path, sentiment_path)`` tuple of ``gs://`` URIs
        under the ``import_files/`` prefix of the bucket.
    """
    import pandas as pd
    from google.cloud import storage
    import json
    from datetime import datetime

    run_stamp = datetime.now().strftime("%Y_%m_%d__%H_%M_%S")

    text_column = "Content_Parsed_4"
    label_column = "user_suggestion"
    gcs_prefix = "import_files"

    def dump_jsonl(filename, records):
        # One JSON document per line.
        with open(filename, "w") as handle:
            for record in records:
                handle.write(json.dumps(record) + "\n")

    reviews = pd.read_csv(input_path)

    # Build both record lists in a single pass over the rows.
    classification_records = []
    sentiment_records = []
    for _, review in reviews.iterrows():
        recommended = review[label_column] == 1

        # 0/1 becomes N/Y for the classification labels to improve readability.
        classification_records.append({
            "classificationAnnotation": {"displayName": "Y" if recommended else "N"},
            "textContent": review[text_column]
        })
        sentiment_records.append({
            "sentimentAnnotation": {"sentiment": 1 if recommended else 0, "sentimentMax": 1},
            "textContent": review[text_column]
        })

    # Local import files: AutoML Classification first, then AutoML Sentiment.
    classification_filename = f"automl_import_train_{run_stamp}.jsonl"
    sentiment_filename = f"automl_sent_import_train_{run_stamp}.jsonl"
    dump_jsonl(classification_filename, classification_records)
    dump_jsonl(sentiment_filename, sentiment_records)

    # Upload both files and collect their GCS URIs in the same order.
    bucket = storage.Client().bucket(bucket_name)
    uploaded_uris = []
    for filename in (classification_filename, sentiment_filename):
        bucket.blob(f"{gcs_prefix}/{filename}").upload_from_filename(filename)
        uploaded_uris.append(f"gs://{bucket_name}/{gcs_prefix}/{filename}")

    return tuple(uploaded_uris)

In [24]:
@dsl.pipeline(
    name="iowa-steam-sentiment-pipeline",
    pipeline_root=PIPELINE_ROOT
)
def pipeline():
    """Preprocess the reviews, then train and deploy two AutoML text models.

    Steps: preprocess the raw data, build the AutoML import files, create a
    classification and a sentiment text dataset, train one AutoML model on
    each, create one endpoint per model and deploy the model to it. The two
    branches (classification / sentiment) are independent after the import
    files have been created.
    """
    # Component inputs are passed by keyword so the wiring is explicit.
    preprocess_op = preprocess_data_op(
        bucket_name=BUCKET_NAME,
        train_data_uri=TRAINING_DATA_URI,
    )

    import_op = create_automl_import_files_op(
        bucket_name=BUCKET_NAME,
        input_path=preprocess_op.outputs['train_path'],
    )

    # ---- Datasets -----------------------------------------------------------
    text_ds_op = gcc_aip.TextDatasetCreateOp(
        project=PROJECT_ID,
        display_name=f"iowa-steam-reviews-processed-{TIMESTAMP}",
        gcs_source=import_op.outputs['classification_path'],
        import_schema_uri=aiplatform.schema.dataset.ioformat.text.single_label_classification
    ).set_display_name("create-text-dataset")

    sentiment_ds_op = gcc_aip.TextDatasetCreateOp(
        project=PROJECT_ID,
        display_name=f"iowa-steam-reviews-processed-sentiment-{TIMESTAMP}",
        gcs_source=import_op.outputs['sentiment_path'],
        import_schema_uri=aiplatform.schema.dataset.ioformat.text.sentiment
    ).set_display_name("create-sentiment-dataset")

    # ---- Training -----------------------------------------------------------
    # NOTE(review): "automl_classifcation_" is misspelled in the model display
    # name; it is kept unchanged in case anything looks models up by that prefix.
    automl_class_training_op = gcc_aip.AutoMLTextTrainingJobRunOp(
        dataset=text_ds_op.outputs["dataset"],
        display_name=f"automl_classification_{TIMESTAMP}",
        prediction_type="classification",
        multi_label=False,
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        model_display_name=f"automl_classifcation_{TIMESTAMP}",
        project=PROJECT_ID
    ).set_display_name("train_automl_classification").after(text_ds_op)

    # The sentiment job and model get their own display names; previously they
    # reused the classification names, which made the two indistinguishable.
    automl_sentiment_training_op = gcc_aip.AutoMLTextTrainingJobRunOp(
        dataset=sentiment_ds_op.outputs["dataset"],
        display_name=f"automl_sentiment_{TIMESTAMP}",
        prediction_type="sentiment",
        sentiment_max=1,
        multi_label=False,
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        model_display_name=f"automl_sentiment_{TIMESTAMP}",
        project=PROJECT_ID
    ).set_display_name("train_automl_sentiment").after(sentiment_ds_op)

    # ---- Endpoints ----------------------------------------------------------
    # Each endpoint is only created after the corresponding training step.
    automl_class_endpoint_op = gcc_aip.EndpointCreateOp(
        project=PROJECT_ID,
        display_name="automl_endpoint_classification"
    ).set_display_name("create_classification_endpoint").after(automl_class_training_op)

    automl_sentiment_endpoint_op = gcc_aip.EndpointCreateOp(
        project=PROJECT_ID,
        display_name="automl_endpoint_sentiment"
    ).set_display_name("create_sentiment_endpoint").after(automl_sentiment_training_op)

    # ---- Deployment ---------------------------------------------------------
    # One replica per model (min == max == 1).
    automl_class_deploy_op = gcc_aip.ModelDeployOp(
        model=automl_class_training_op.outputs["model"],
        endpoint=automl_class_endpoint_op.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    ).set_display_name("deploy_classification_model").after(automl_class_endpoint_op)

    automl_sentiment_deploy_op = gcc_aip.ModelDeployOp(
        model=automl_sentiment_training_op.outputs["model"],
        endpoint=automl_sentiment_endpoint_op.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    ).set_display_name("deploy_sentiment_model").after(automl_sentiment_endpoint_op)
        

In [25]:
# Compile the `pipeline` function into a JSON pipeline spec written to the
# current working directory.
compiler.Compiler().compile(
    package_path="iowa-steam-sentiment-pipeline.json",
    pipeline_func=pipeline,
)

In [15]:
# Create a Vertex AI pipeline job from the compiled JSON spec, storing its
# artifacts under PIPELINE_ROOT.
job = aip.PipelineJob(
    template_path="iowa-steam-sentiment-pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    display_name="iowa-steam-pipeline",
)

# Submit the job so that it runs as the given service account.
job.submit(
    service_account="47224200977-compute@developer.gserviceaccount.com",
)

Creating PipelineJob
PipelineJob created. Resource name: projects/47224200977/locations/us-central1/pipelineJobs/iowa-steam-sentiment-pipeline-20221125205306
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/47224200977/locations/us-central1/pipelineJobs/iowa-steam-sentiment-pipeline-20221125205306')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/iowa-steam-sentiment-pipeline-20221125205306?project=47224200977
