## Create the pipeline template file and submit a pipeline run

## Steps
1. Define your pipeline function
2. Build any custom components you need
3. Use the v2 compiler to compile your code 
4. Initialize the Vertex AI SDK (`aiplatform.init`) with your project and region
5. Create a `PipelineJob` from the compiled template and run it

## Imports

In [1]:
# Example imports
import kfp
import json
import os
import datetime
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

## Set up the environment

In [2]:
# Defaults and environment settings
REGION = 'us-central1'
BUCKET_NAME = '<BUCKET_NAME>'
ARTIFACT_STORE_URI = f'gs://{BUCKET_NAME}'
PROJECT_ID = "<PROJECT_ID>"

%env PROJECT_ID=$PROJECT_ID
%env REGION=$REGION
%env BUCKET_NAME=$BUCKET_NAME
%env ARTIFACT_STORE_URI=$ARTIFACT_STORE_URI

env: ENDPOINT=https://fda9da3634d2db2-dot-us-central2.pipelines.googleusercontent.com
env: PROJECT_ID=mwpmltr
env: REGION=us-central1
env: BUCKET_NAME=rrusson-bucket
env: ARTIFACT_STORE_URI=gs://rrusson-bucket


## Create the Docker images and upload to gcr.io

In [3]:
IMAGE_NAME='nasa-iot-base'
TAG='v1'
BASE_IMAGE='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, TAG)
print(BASE_IMAGE)
%env BASE_IMAGE = $BASE_IMAGE

gcr.io/mwpmltr/nasa-iot-base:v1
env: BASE_IMAGE=gcr.io/mwpmltr/nasa-iot-base:v1


In [4]:
# DON'T RUN THIS IF THE IMAGE EXISTS!
# Uncomment the line below to build the base image from ./base_image and tag it as $BASE_IMAGE.
# !gcloud builds submit --timeout 15m --tag $BASE_IMAGE base_image --async

In [5]:
IMAGE_NAME='nasa-iot-trainer'
TAG='v5'
TRAINER_IMAGE='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, TAG)
print(TRAINER_IMAGE)
%env TRAINER_IMAGE = $TRAINER_IMAGE

gcr.io/mwpmltr/nasa-iot-trainer:v5
env: TRAINER_IMAGE=gcr.io/mwpmltr/nasa-iot-trainer:v5


In [6]:
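# DON'T RUN THIS IF THE IMAGE EXISTS!
# Uncomment the line below to build the trainer image from ./train_image and tag it as $TRAINER_IMAGE.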
# !gcloud builds submit --timeout 15m --tag $TRAINER_IMAGE train_image --async

## Import component functions
NOTE: Run this cell only AFTER the cells above have set the environment variables; these imports must come after them.

In [7]:
from kfp_component.func_components import load_raw_data
from kfp_component.func_components import split_data
from kfp_component.func_components import vertex_custom_job

## Define the Pipeline

In [8]:
# Define the pipeline
@kfp.dsl.pipeline(
    name="demo-bearing-sensor-data-training",
    description="The pipeline for training and deploying an anomaly detector based on an autoencoder",
    pipeline_root="")
def pipeline(project_id: str,
             region: str,
             source_bucket_name: str,
             prefix: str,
             dest_bucket_name: str,
             dest_file_name: str,
             gcs_root: str,
             dataset_location: str = 'US'):
    """Chain the load, split and training steps of the anomaly-detector pipeline.

    Args:
        project_id: Project passed to the custom training job component.
        region: Accepted as a pipeline parameter; not referenced in the body.
        source_bucket_name: First argument of ``load_raw_data``.
        prefix: Second argument of ``load_raw_data``.
        dest_bucket_name: Third argument of ``load_raw_data``.
        dest_file_name: Fourth argument of ``load_raw_data``.
        gcs_root: Accepted as a pipeline parameter; not referenced in the body.
        dataset_location: Accepted as a pipeline parameter; not referenced in
            the body.
    """
    # Read in the raw sensor data from the public dataset and load in the project bucket
    raw_data_op = load_raw_data(source_bucket_name,
                                prefix,
                                dest_bucket_name,
                                dest_file_name)

    # Preprocess and split the raw data by time
    split_data_op = split_data(raw_data_op.outputs['dest_bucket_name'],
                               raw_data_op.outputs['dest_file_name'],
                               '2004-02-15 12:52:39',
                               True)

    # Set up the training args as a JSON-encoded list of CLI flags.
    # NOTE(review): --job_dir is taken from the notebook-level ARTIFACT_STORE_URI
    # when this function body executes, not from the `gcs_root` pipeline
    # parameter. A previously computed per-run "<gcs_root>/jobdir/<run id>" path
    # was never passed anywhere and has been removed as dead code -- confirm
    # whether a per-run job directory was intended.
    train_args = json.dumps(
        ["--bucket", str(split_data_op.outputs['bucket_name']),
         "--train_file", str(split_data_op.outputs['train_dest_file']),
         "--test_file", str(split_data_op.outputs['test_dest_file']),
         "--job_dir", ARTIFACT_STORE_URI,
        ]
    )

    # Launch training through the vertex_custom_job component.
    # The timestamp in display_name is evaluated when this function body
    # executes, so it is fixed at that moment rather than per run.
    train_model = vertex_custom_job(
        project=project_id,
        display_name=f"anomaly-detection-{datetime.datetime.now().strftime('%H%M%S')}",
        container_image_uri=TRAINER_IMAGE,
        train_args=train_args,
    )

In [9]:
# Settings for this pipeline submission
EXPERIMENT_NAME = 'AnomalyDetector'

# The run id is used as the PipelineJob job_id, which has to be unique.
# Include the date as well as the time of day so that two submissions made at
# the same clock time on different days do not produce the same id.
RUN_ID = f"nasa-iot-example-{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

# Values handed to the pipeline as parameters when the run is submitted
SOURCE_BUCKET_NAME = 'amazing-public-data'
PREFIX = 'bearing_sensor_data/bearing_sensor_data/'
DEST_BUCKET_NAME = BUCKET_NAME
DEST_FILE_NAME = 'raw_bearing_data.csv'

## Compile the pipeline

In [10]:
# Compile the pipeline function into the JSON template file
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='nasa_iot_training.json',
)

## Submit a Run

In [11]:
# Point the Vertex AI SDK at the target project and region
aiplatform.init(location=REGION, project=PROJECT_ID)

# Describe the run: compiled template, run id, artifact root and pipeline arguments
pipejob = aiplatform.PipelineJob(
    display_name='nasa_iot_training',
    template_path='nasa_iot_training.json',
    job_id=RUN_ID,
    pipeline_root=ARTIFACT_STORE_URI,
    parameter_values=dict(
        project_id=PROJECT_ID,
        region=REGION,
        source_bucket_name=SOURCE_BUCKET_NAME,
        prefix=PREFIX,
        dest_bucket_name=DEST_BUCKET_NAME,
        dest_file_name=DEST_FILE_NAME,
        gcs_root=ARTIFACT_STORE_URI,
        dataset_location='US',
    ),
)

In [12]:
# Submit the pipeline job; the log output below shows the job being created
# and its state being reported while it runs.
pipejob.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/55590906972/locations/us-central1/pipelineJobs/2750415142742130688
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/55590906972/locations/us-central1/pipelineJobs/2750415142742130688')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/2750415142742130688?project=55590906972
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/55590906972/locations/us-central1/pipelineJobs/2750415142742130688 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/55590906972/locations/us-central1/pipelineJobs/2750415142742130688 current state:
PipelineState.PIPEL