# Dependencies


In [None]:
pip install PyYAML

In [None]:
!sudo apt-get install build-essential

In [None]:
# updated to the latest dependencies on February 2023
!pip install google-cloud-aiplatform==1.21.0 --upgrade
!pip install google-cloud-pipeline-components==1.0.27 --upgrade
!pip install kfp --upgrade
!pip install google-auth google-auth-oauthlib google-auth-httplib2

In [126]:
import kfp
from kfp import dsl
from typing import NamedTuple
from kfp.dsl import pipeline
from kfp.dsl import component
from kfp.dsl import OutputPath
from kfp.dsl import InputPath
from kfp.dsl import Output
from kfp.dsl import Metrics
from kfp import compiler
#from kfp.v2.google import client as kfp_client
# from google.cloud.aiplatform import pipeline_jobs
# from google_cloud_pipeline_components import aiplatform as gcc_aip
# from google_cloud_pipeline_components.aiplatform import ModelUploadOp


# Authentication

In [127]:
import os

# Cloud project id.
PROJECT_ID = "YOUR_PROJECT_ID"  # @param {type:"string"}

# The region you want to launch jobs in.
REGION = "YOUR_REGION"  # @param {type:"string"}

# E-mail address of the service account used with this project.
SERVICE_ACCOUNT = "YOUR_SERVICE_ACCOUNT_EMAIL"  # @param {type:"string"}

# GCS location handed to the pipeline job as its pipeline root.
PIPELINE_ROOT = "YOUR_GCS_BUCKET_PATH_FOR_PIPELINE"

# GCS staging bucket.
STAGING_BUCKET = "YOUR_GCS_STAGING_BUCKET"

# Point Application Default Credentials at the service-account key file.
# Setting os.environ affects the kernel process itself; a `!set ...` shell
# command would only run in a short-lived subshell and have no effect here.
# DO NOT UPLOAD THIS TO GITHUB REPO
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "YOUR_SERVICE_ACCOUNT_KEY_PATH"

# Clients

In [128]:
from google.cloud import aiplatform

# Initialise the Vertex AI SDK from the values in the configuration cell.
# STAGING_BUCKET is reused here instead of a second hard-coded bucket literal,
# so the bucket only has to be filled in once.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=STAGING_BUCKET)

# Pipeline Basics

## Component-1 : The Training Job

In [130]:
@dsl.component(base_image='YOUR_CUSTOM_CONTAINER_ARTIFACT_REGISTRY_ENDPOINT') #you must have created a new container and pushed it to the Artifact repo while running the cloud build steps
def stable_diffusion_training_job_op(
    gcs_class_data_path: str,
    gcs_instance_data_path: str,
    jobId: str,
    jobName: str,
    model_id: str,
    num_nodes: int = 1,
    machine_type: str = "n1-highmem-8",
    gpu_type: str = "NVIDIA_TESLA_T4",
    num_gpus: int = 1
) -> str:
    """Launch the DreamBooth training job on Vertex AI and track it in Firestore.

    The component creates a ``CustomContainerTrainingJob``, runs it, and writes
    the job status ("Initiated", then "Completed" or "Failed") to the
    ``vertexPipelineJobs`` Firestore collection.

    Args:
        gcs_class_data_path: GCS path of the class images. Accepted for
            interface compatibility; the training arguments below are
            currently hard-coded and do not read it.
        gcs_instance_data_path: GCS path of the instance images. Accepted for
            interface compatibility; currently not read by the body.
        jobId: Upload job id; used as the Firestore document id.
        jobName: Display name of the training job, also stored in Firestore.
        model_id: Base model identifier. Currently not read by the body; the
            base model is fixed in the training arguments.
        num_nodes: Replica count passed to the training job.
        machine_type: Machine type passed to the training job (for example
            "a2-highgpu-1g" when paired with "NVIDIA_TESLA_A100").
        gpu_type: Accelerator type passed to the training job.
        num_gpus: Accelerator count passed to the training job.

    Returns:
        The placeholder string identifying the output artifacts.

    Raises:
        Exception: Any error raised by the training job is re-raised after the
            "Failed" status has been written to Firestore, so the pipeline
            step fails instead of silently continuing to deployment.
    """
    import logging
    import os

    from google.cloud import aiplatform, firestore
    from google.oauth2 import service_account

    REGION = "europe-west4"  # @param {type:"string"}

    # Create three folders under the container's home directory
    os.makedirs("/home/class", exist_ok=True)
    os.makedirs("/home/instance", exist_ok=True)
    os.makedirs("/home/model_output", exist_ok=True)

    def update_firestore(jobId, job_name, training_status):
        """Write the current training status of this job to Firestore."""
        service_account_key_path = 'YOUR_SERVICE_ACCOUNT_PATH'
        credentials = service_account.Credentials.from_service_account_file(service_account_key_path)
        db = firestore.Client(project="YOUR_PROJECT_ID", credentials=credentials)

        doc_ref = db.collection('vertexPipelineJobs').document(jobId)

        # set() replaces the whole document, so the deployment fields are
        # reset to their "not started" values on every training update.
        doc_ref.set({
            "uploadJobId": jobId,
            "vertexPipelineJobId": job_name,
            "trainingStatus": training_status,
            "modelDeploymentStatus": "Not initiated yet",
            "modelEndpoint": "Not available yet"
        })

    job_name = jobName

    # Arguments handed to the training script inside the training container.
    args = [
        "--method=diffuser_dreambooth",
        "--model_name=runwayml/stable-diffusion-v1-5",
        "--input_storage=/gcs/ethlas-customer-1/instance",
        "--output_storage=/gcs/ethlas-customer-1/output_artifacts",
        "--prompt=a 2D ethlas game avatar",
        "--class_prompt=a 2D game avatar",
        "--num_class_images=16",
        "--lr=1e-4",
        "--use_8bit=True",
        "--max_train_steps=100",
        "--text_encoder=True",
        "--set_grads_to_none=True"
    ]
    command = ["python3", "train_wo_nfs.py"] + args

    # The training container itself is expected to be built and pushed beforehand.
    job = aiplatform.CustomContainerTrainingJob(
        display_name=job_name,
        container_uri="YOUR_TRAINING_JOB_CONTAINER_ENDPOINT",
        command=command,
        location=REGION,
        staging_bucket="YOUR_STAGING_BUCKET"
    )

    training_status = "Initiated"
    update_firestore(jobId, jobName, training_status)

    try:
        job.run(
            replica_count=num_nodes,
            machine_type=machine_type,
            accelerator_type=gpu_type,
            accelerator_count=num_gpus
        )
        training_status = "Completed"
    except Exception:
        training_status = "Failed"
        logging.exception("Training job %s failed", job_name)
        # Re-raise so the pipeline step is marked as failed and the deployment
        # step does not run on top of a failed training.
        raise
    finally:
        # Record the final status whether the job completed or failed.
        update_firestore(jobId, jobName, training_status)

    model_output = "YOUR_OUTPUT_ARTIFACTS"
    return model_output


In [131]:
@dsl.component(base_image='YOUR_CUSTOM_CONTAINER_ARTIFACT_REGISTRY_ENDPOINT_PATH')
def stable_diffusion_model_deployment_job_op(
    model_id: str,
    task: str,
    jobId: str,
    jobName: str,
) -> NamedTuple("Outputs", [('model', str), ("endpoint_name", str), ("endpoint_id", str)]):
    """Upload the model to Vertex AI, deploy it to a new endpoint, and track it in Firestore.

    The component creates an endpoint, uploads a model that uses the serving
    container, deploys the model to the endpoint, and writes the deployment
    status ("Initiated", then "Completed" or "Failed") to the
    ``vertexPipelineJobs`` Firestore collection.

    Args:
        model_id: Value exposed to the serving container as ``MODEL_ID``.
        task: Value exposed to the serving container as ``TASK``; also part of
            the endpoint display name.
        jobId: Upload job id; used as the Firestore document id and as part of
            the endpoint display name.
        jobName: Pipeline job name stored in Firestore.

    Returns:
        A tuple ``(model, endpoint_name, endpoint_id)``: the uploaded model's
        id, the endpoint's full resource name, and the endpoint's id.

    Raises:
        Exception: Any error raised while deploying is re-raised after the
            "Failed" status has been written to Firestore. Previously the
            error was swallowed and the return statement then failed on
            variables that had never been assigned.
    """
    import logging

    from google.cloud import aiplatform, firestore
    from google.oauth2 import service_account

    def update_firestore(jobId, job_name, model_deployment_status, model_deployment_endpoint):
        """Write the current deployment status and endpoint of this job to Firestore."""
        service_account_key_path = 'YOUR_SERVICE_ACCOUNT_PATH'
        credentials = service_account.Credentials.from_service_account_file(service_account_key_path)
        db = firestore.Client(project="YOUR_PROJECT_ID", credentials=credentials)

        doc_ref = db.collection('vertexPipelineJobs').document(jobId)

        # set() replaces the whole document.
        # NOTE(review): the training component writes "Completed" (capitalised)
        # while this writes "completed" - confirm which spelling consumers expect.
        doc_ref.set({
            "uploadJobId": jobId,
            "vertexPipelineJobId": job_name,
            "trainingStatus": "completed",
            "modelDeploymentStatus": model_deployment_status,
            "modelEndpoint": model_deployment_endpoint
        })

    model_name = "YOUR_MODEL_NAME"
    service_account_key_path = 'YOUR_SERVICE_ACCOUNT_KEY_PATH'
    credentials = service_account.Credentials.from_service_account_file(service_account_key_path)
    endpoint = aiplatform.Endpoint.create(display_name=f"{jobId}-{task}-endpoint", credentials=credentials)
    SERVICE_ACCOUNT = "YOUR_SERVICE_ACCOUNT_EMAIL"
    SERVE_DOCKER_URI = "YOUR_SERVING_DOCKER_CONTAINER_URI"

    # Serving environment variables
    serving_env = {
        "MODEL_ID": model_id,
        "TASK": task,
    }

    model = aiplatform.Model.upload(
        display_name=model_name,
        serving_container_image_uri=SERVE_DOCKER_URI,
        serving_container_ports=[7080],
        serving_container_predict_route="/predictions/diffusers_serving",
        serving_container_health_route="/ping",
        serving_container_environment_variables=serving_env,
    )

    # The model id is the last segment of the model's resource name.
    updated_model_id = model.resource_name.split("/")[-1]

    deployment_status = "Initiated"
    deployment_endpoint = "Not available yet"
    update_firestore(jobId, jobName, deployment_status, deployment_endpoint)

    try:
        updated_endpoint = model.deploy(
            endpoint=endpoint,
            machine_type="n1-standard-8",
            accelerator_type="NVIDIA_TESLA_V100",
            accelerator_count=1,
            deploy_request_timeout=1800,
            service_account=SERVICE_ACCOUNT,
        )
        updated_endpoint_name = updated_endpoint.resource_name
        updated_endpoint_id = updated_endpoint_name.split("/")[-1]

        deployment_status = "Completed"
        deployment_endpoint = updated_endpoint.resource_name
    except Exception:
        deployment_status = "Failed"
        deployment_endpoint = "Not available yet"
        logging.exception("Deployment of model %s for job %s failed", updated_model_id, jobName)
        # Re-raise the real error; without this the return below would fail
        # with an unrelated UnboundLocalError and hide the cause.
        raise
    finally:
        # Record the final status whether the deployment completed or failed.
        update_firestore(jobId, jobName, deployment_status, deployment_endpoint)

    # Only reached when the deployment succeeded.
    return (updated_model_id, updated_endpoint_name, updated_endpoint_id)


## Main Pipeline

In [132]:
# Main pipeline: a training step followed by a model deployment step.
@dsl.pipeline(
    name='Ethlas Stable Diffusion Training Pipeline',
    description='A pipeline for custom training of the Stable Diffusion model'
)
def stable_diffusion_training_pipeline(
    gcs_class_data_path: str = 'gs://<YOUR_BUCKET_NAME>/class',
    gcs_instance_data_path: str = 'gs://<YOUR_BUCKET_NAME>/instance',
    jobId: str = "defaultname",
):
    # Parameters:
    #   gcs_class_data_path    - forwarded to the training component.
    #   gcs_instance_data_path - forwarded to the training component.
    #   jobId                  - forwarded to both components and embedded in the job name.
    from datetime import datetime

    # NOTE: this body executes when the pipeline is compiled, so the timestamp
    # below is taken at that moment rather than when a run starts.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    job_name = f"ethlas-sd-{jobId}-{timestamp}"

    # Step 1: training, with the accelerator and resource settings for the step.
    training_task = stable_diffusion_training_job_op(
        gcs_class_data_path=gcs_class_data_path,
        gcs_instance_data_path=gcs_instance_data_path,
        jobId=jobId,
        jobName=job_name,
        model_id="runwayml/stable-diffusion-v1-5",
    )
    training_task.add_node_selector_constraint('NVIDIA_TESLA_T4')
    training_task.set_gpu_limit(1)
    training_task.set_cpu_request('16')
    training_task.set_memory_request('64Gi')

    # Step 2: deployment; consuming the training output makes it run after training.
    deployment_task = stable_diffusion_model_deployment_job_op(
        model_id=training_task.output,
        task="image-to-image",
        jobId=jobId,
        jobName=job_name,
    )

    # Handles to the deployment outputs (not consumed by any further step here).
    deployed_model_id = deployment_task.outputs["model"]
    deployed_endpoint_name = deployment_task.outputs["endpoint_name"]
    deployed_endpoint_id = deployment_task.outputs["endpoint_id"]
    
   

## Compile

The compiler takes our pipeline function and compiles it into a pipeline specification file (YAML in this notebook). We can use this file to create our pipeline in Vertex AI Pipelines.

In [133]:


# Compile the pipeline (running it happens in the cells below)
if __name__ == '__main__':
    # Write the pipeline definition out as a YAML specification file.
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        package_path='stable_diffusion_pipeline.yaml',
        pipeline_func=stable_diffusion_training_pipeline,
    )

## Run

Create the run job using the API.
You can also upload the compiled pipeline file directly in the Vertex AI UI.

In [114]:

# Describe the run: the compiled template, where run artifacts go, and caching off.
# No parameter values are supplied, so the pipeline's declared defaults apply.
pipeline_job_options = {
    "display_name": "YOUR_PIPELINE_NAME",
    "template_path": "stable_diffusion_pipeline.yaml",
    "pipeline_root": PIPELINE_ROOT,
    "enable_caching": False,
}
job = aiplatform.PipelineJob(**pipeline_job_options)

In [115]:
# Submit the pipeline job. sync=False asks the SDK not to block this cell
# until the run finishes (see the PipelineJob.run documentation); state
# updates are still logged in the cell output.
job.run(sync=False)
# Optional cleanup of the compiled pipeline file, left disabled:
#! rm stable_diffusion_pipeline.yaml

Creating PipelineJob
PipelineJob created. Resource name: projects/846468749543/locations/europe-west4/pipelineJobs/ethlas-stable-diffusion-training-pipeline-20230730040738
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/846468749543/locations/europe-west4/pipelineJobs/ethlas-stable-diffusion-training-pipeline-20230730040738')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west4/pipelines/runs/ethlas-stable-diffusion-training-pipeline-20230730040738?project=846468749543
PipelineJob projects/846468749543/locations/europe-west4/pipelineJobs/ethlas-stable-diffusion-training-pipeline-20230730040738 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/846468749543/locations/europe-west4/pipelineJobs/ethlas-stable-diffusion-training-pipeline-20230730040738 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/846468749543/locations/europe-west4/pipelineJobs/ethlas-stable-diff