# Set up the environment

## Config items
Variables that need to be configured according to user requirements

In [1]:
# GCP Project ID.  If not set `gcloud config get project` will be used
PROJECT_ID=""
# GCP Region
GCP_REGION="us-central1"
# Categories or drawings that we want to recognise or classify
CATEGORIES_TO_PROCESS=["basketball", "cat", "owl", "airplane", "bucket", "diamond"]

In [None]:
# Basic setup
# Import required libraries
import kfp
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
# Environment config
BUCKET_NAME=""
IMAGES_BUCKET="images"
PIPELINE_ROOT=""
SAVED_IMAGES_FILE_NAME_EXT=".txt"
SAVED_IMAGES_FILE_NAME_SUFFIX="saved_images"
SAVED_IMAGES_FILE_NAME=f"{SAVED_IMAGES_FILE_NAME_SUFFIX}{SAVED_IMAGES_FILE_NAME_EXT}"

# Check that the environment has configured a GCP project
def is_project_configured():
    global PROJECT_ID
    # Using gcloud command to get the current project
    system_out = !gcloud config get project
    project_id = system_out[0]
    
    # Check if the project is not set
    check = not (project_id=="" or project_id.strip()=="")
    return check, project_id

# Ensure that a bucket exists.  
def ensure_bucket_exists(bucket_name):
    from google.cloud import storage
    
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    # Check if the GCS bucket already exists
    if bucket.exists():
        print("Bucket exists: " + bucket_name)
    else:
        # Doesn't exist, so create one
        print("Creating bucket: " + bucket_name)
        bucket.create()
    
    return

# Setup environment config
def env_setup():
    from google.cloud import storage
    from datetime import datetime
    import os
    import sys
    
    global BUCKET_NAME, PIPELINE_ROOT
    # Check project ID is set
    projectConfigured, project_id = is_project_configured()
    if projectConfigured:
        # Project ID is set
        print("Using project: " + project_id)
        time_stamp = datetime.now().strftime("%Y%m%d%H%M%S")
        BUCKET_NAME=project_id + "-qd"
        ensure_bucket_exists(BUCKET_NAME)
        PIPELINE_ROOT = f"{BUCKET_NAME}/{time_stamp}/pipeline-root/"
        print("Dataflow pipeline root is: " + PIPELINE_ROOT)
    else:
        # Exit with error that project ID has not been configured
        sys.exit("Set project using `gcloud config set project [project-id]`")
    return

env_setup()

# Enable autocomplete in the notebook
def enable_notebook_autocompletion():
    !pip install jupyter_contrib_nbextensions
    !jupyter contrib nbextension install - user
    from jedi import settings
    settings.case_insensitive_completion = True
    return
# enable_notebook_autocompletion()

## Data Flow Pipeline to import & create image dataset

In [None]:
# Install dataflow components for python
!pip install apache-beam[interactive]

In [203]:
import apache_beam as beam
from io import BytesIO
from tensorflow.python.lib.io import file_io
import numpy as np

# Checks if the file already exists.
# Creates an emtpy file if the file doesn't exist
# file_name is a fully qualified file name, including the protocol
# e.g. file_name = "gs://some-gcs-bucket/folder/temp.txt"
def ensureFileExists(file_name):
        print("checking if the file exists")
        # Check if the file exists
        if not filesystems.FileSystems.exists(file_name):
            print("file doesn't exit. creating one")
            # Create an empty file if it doesn't exist
            original_file = filesystems.FileSystems.create(file_name, mime_type="text/txt")
            original_file.write(bytes("", encoding='UTF-8'))
            original_file.close()

# Gets the numpy_bitmap file for each category
# This file in the GCS bucket holds the images in that category
class ProcessCategory(beam.DoFn):
    def process(self, element):
        file_uri = "gs://scratch-pad-kunall-qd/full/numpy_bitmap/"+element+".npy"
        # print("File URI is: " + file_uri)
        yield {"category": element, "file_name": file_uri}

# Loads a required sample size of image data from the files for each category
class LoadImageData(beam.DoFn):
    def process(self, element):
        image_data_file = element["file_name"]
        # Read image data from the file into a numpy array
        np_file = np.load(BytesIO(file_io.read_file_to_string(image_data_file, binary_mode=True))).astype("float32")
        # Randomly select X number of samples from the file
        idx = np.random.randint(len(np_file), size=20)
        # Resize the numpy array with the expected number of samples
        for row in np_file[idx,:]:
            # Reshape the array as 28 * 28 matrix
            image_matrix = row.reshape(28, 28)
            yield {"category": element["category"], "image": image_matrix}

# Save the individual image to GCS bucket
class SaveImage(beam.DoFn):
        def process(self, element):
            category = element["category"]
            image = element["image"]
            outdir = "gs://scratch-pad-kunall-qd/images/" + category + "/"
            file_name = uuid.uuid4().hex + '.png'
            writer = filesystems.FileSystems.create(outdir + file_name )
            writer.write(image)
            writer.close()
            yield outdir + file_name + "," + category

# Create an image dataset that can be used to train image classification ML models
def create_image_dataset():
    saved_images_filename = "gs://scratch-pad-kunall-qd/images/" + SAVED_IMAGES_FILE_NAME
    ensureFileExists(saved_images_filename)    
    with beam.Pipeline() as p:
        categories = (
                p 
                | "Read categories" >> beam.Create(CATEGORIES_TO_PROCESS)
                | "Get file to category map" >> beam.ParDo(ProcessCategory())
                | "Load Image Data File" >> beam.ParDo(LoadImageData())
                | "Save Image" >> beam.ParDo(SaveImage())
                
            )
        original_file = (
            p | "Read existing saved images file" >> beam.io.ReadFromText(saved_images_filename)
        )
        merged = (
            (categories, original_file) | beam.Flatten() |
             beam.io.WriteToText("gs://scratch-pad-kunall-qd/images/" + SAVED_IMAGES_FILE_NAME_SUFFIX, 
                                 file_name_suffix=SAVED_IMAGES_FILE_NAME_EXT, 
                                 append_trailing_newlines=True, 
                                 shard_name_template='')
        )
        result = p.run()


if __name__ == '__main__':
    create_image_dataset()



In [121]:
# @component(
#     packages_to_install=[
#         "google-cloud-storage",
#         "pandas",
#     ],
#     base_image="python:3.9",
#     output_component_file="preprocess_drawings.yaml"
# )
def preprocess_drawings(
    project=project_id,
    display_name="categories",
    categories=CATEGORIES_TO_PROCESS,
    categories_dataset = Output[Dataset]        
    ):
    import json
    import pandas as pd
    global BUCKET_NAME, IMAGES_BUCKET
    ensure_bucket_exists(BUCKET_NAME)
    for category in categories:
        print(f"Read from: gs://scratch-pad-kunall-qd/full/simplified/{category}.ndjson")
        with open(f'gs://scratch-pad-kunall-qd/full/simplified/{category}.ndjson', 'r') as f:
            data = json.load(f)
        pd.DataFrame(data).head
        print(f"gs://{BUCKET_NAME}/{IMAGES_BUCKET}/{category}/key.png,{category}")
            
    return
preprocess_drawings()

NameError: name 'project_id' is not defined

In [None]:
import kfp
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip


project_id = "scratch-pad-kunall"
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET_NAME="gs://" + project_id + "-quickdraw-" + TIMESTAMP
PIPELINE_ROOT = f"{BUCKET_NAME}/qd-pipeline-root/"
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path,)
def pipeline(project_id: str):
    # The first step of your workflow is a dataset generator.
    # This step takes a Google Cloud pipeline component, providing the necessary
    # input arguments, and uses the Python variable `ds_op` to define its
    # output. Note that here the `ds_op` only stores the definition of the
    # output but not the actual returned object from the execution. The value
    # of the object is not accessible at the dsl.pipeline level, and can only be
    # retrieved by providing it as the input to a downstream component.
    ds_op = gcc_aip.ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # The second step is a model training component. It takes the dataset
    # outputted from the first step, supplies it as an input argument to the
    # component (see `dataset=ds_op.outputs["dataset"]`), and will put its
    # outputs into `training_job_run_op`.
    training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        #base_model=None,
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    # The third and fourth step are for deploying the model.
    create_endpoint_op = gcc_aip.EndpointCreateOp(
        project=project_id,
        display_name = "create-endpoint",
    )

    model_deploy_op = gcc_aip.ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    )

In [None]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='image_classif_pipeline.json')

In [None]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id
    }
)

job.submit()