In [1]:
# Install or upgrade the packages this notebook imports: kfp and
# google-cloud-pipeline-components (both constrained to versions > 2) and
# google-cloud-aiplatform. The kernel is restarted in the next cell so the
# freshly installed versions are the ones that get imported.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting google-cloud-pipeline-components>2
  Obtaining dependency information for google-cloud-pipeline-components>2 from https://files.pythonhosted.org/packages/29/05/e2b13e7982506e6f63c2973c7a139fd8905dfd25ba5a7e03a0cb4541a76d/google_cloud_pipeline_components-2.4.1-py3-none-any.whl.metadata
  Downloading google_cloud_pipeline_components-2.4.1-py3-none-any.whl.metadata (5.9 kB)
Collecting google-cloud-aiplatform
  Obtaining dependency information for google-cloud-aiplatform from https://files.pythonhosted.org/packages/f6/67/734b8c73b8e708a24301b8a0a072ddfe936816896d12af4884e4f7bbd3b0/google_cloud_aiplatform-1.35.0-py2.py3-none-any.whl.metadata
  Downloading google_cloud_aiplatform-1.35.0-py2.py3-none-any.whl.metadata (27 kB)
Downloading google_cloud_pipeline_components-2.4.1-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m28.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading google_cloud_aiplatform-1.35.0-

In [2]:
import os

# Restart the kernel after the install cell so the new package versions are
# loaded. The restart is skipped when the IS_TESTING environment variable is
# set to a non-empty value.
if not os.environ.get("IS_TESTING"):
    import IPython

    # do_shutdown(True) asks the running kernel to shut down and restart.
    IPython.Application.instance().kernel.do_shutdown(True)

In [1]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component,
                        OutputPath,
                        InputPath)
import google.cloud.aiplatform as aip

### Pipeline config

In [10]:
# The Google Cloud project that this pipeline runs in. Also used as the
# project segment of the container image URIs in the component definitions.
PROJECT_ID = "assignment1-402316"
# The region that this pipeline runs in. Also used as the prefix of the
# "-docker.pkg.dev" image host in the component definitions.
REGION = "us-central1"
# A Cloud Storage URI that the pipeline's service account can access.
# The artifacts of the pipeline runs are stored within this pipeline root.
PIPELINE_ROOT = "gs://temp_de2023_group1"
# Image repository name: the path segment that follows the project ID in the
# container image URIs.
IMAGE_REG = "image-repo-group1"

In [11]:
@dsl.container_component
def toxic_data_ingestion(
    project: str,
    bucket: str,
    data_file_name: str,
    features: Output[Artifact],
):
    # Container component for the data-ingestion step.
    #   project        -> forwarded to the container as --project_id
    #   bucket         -> forwarded as --bucket
    #   data_file_name -> forwarded as --file_name
    #   features       -> output artifact; its path is forwarded as --feature_path
    ingestor_image = f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}/toxic-data-ingestor:0.0.1'
    entrypoint = ['python3', '/pipelines/component/src/component.py']
    cli_args = [
        '--project_id', project,
        '--bucket', bucket,
        '--file_name', data_file_name,
        '--feature_path', features.path,
    ]
    return dsl.ContainerSpec(image=ingestor_image, command=entrypoint, args=cli_args)

In [12]:
@dsl.container_component
def toxic_data_cleaning(
    features: Input[Artifact],
    X_dtm: Output[Artifact],
    y: Output[Artifact],
):
    # Container component for the data-cleaning step.
    #   features -> input artifact; its path is forwarded as --dataset
    #   X_dtm    -> output artifact; its path is forwarded as --X_dtm
    #   y        -> output artifact; its path is forwarded as --y
    cleaner_image = f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}/toxic-data-cleaner:0.0.1'
    entrypoint = ['python3', '/pipelines/component/src/component.py']
    cli_args = [
        '--dataset', features.path,
        '--X_dtm', X_dtm.path,
        '--y', y.path,
    ]
    return dsl.ContainerSpec(image=cleaner_image, command=entrypoint, args=cli_args)

In [13]:
@dsl.container_component
def multilabel_classifier(project: str, X_dtm: Input[Artifact], y: Input[Artifact], model_bucket: str,  metrics: OutputPath(str)):
    # Container component for the training step.
    #   project      -> forwarded to the container as --project_id
    #   X_dtm        -> input artifact; its path is forwarded as --X_dtm
    #   y            -> input artifact; its path is forwarded as --y
    #   model_bucket -> forwarded as --model_repo
    #   metrics      -> output path; forwarded as --metrics_path
    return dsl.ContainerSpec(
        image=f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}/toxic-multilabel-trainer:0.0.1',
        command=[
            'python3', '/pipelines/component/src/component.py'
        ],
        args=[
            '--project_id', project,
            '--X_dtm', X_dtm.path,
            # Fix: this flag was the bare token 'y' (missing the leading
            # dashes), unlike every other flag here and unlike the '--y'
            # flag used by the cleaning component.
            # NOTE(review): assumes the trainer's component.py declares a
            # '--y' option -- confirm against the container source.
            '--y', y.path,
            '--model_repo', model_bucket,
            '--metrics_path', metrics,
        ])

### Define the pipeline

In [15]:
# Workflow definition: ingestion -> cleaning -> training, chained through the
# output artifacts of each step.
@kfp.dsl.pipeline(name="toxic-predictor-mlp")
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str):
    # Step 1: ingest the training file from the data bucket.
    ingest_task = toxic_data_ingestion(
        project=project_id,
        bucket=data_bucket,
        data_file_name=trainset_filename,
    )

    # Step 2: clean the ingested features; produces the 'X_dtm' and 'y' artifacts.
    clean_task = toxic_data_cleaning(features=ingest_task.outputs['features'])
    cleaned = clean_task.outputs

    # Step 3: train the multilabel classifier on the cleaned artifacts.
    multilabel_classifier(
        project=project_id,
        X_dtm=cleaned['X_dtm'],
        y=cleaned['y'],
        model_bucket=model_repo,
    )

### Compile the Pipeline

In [16]:
from kfp import compiler

# Compile the pipeline function into a YAML package file; the run cell loads
# this same file name as its template.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='toxic_predictor_mlp.yaml',
)

### Run the Pipeline

In [23]:
import google.cloud.aiplatform as aip
import os

# NOTE(review): open question carried over from the original note (translated
# from Dutch): "Roman, help: what is this / did we set this up?" -- i.e. it is
# unconfirmed whether GOOGLE_APPLICATION_CREDENTIALS is configured for this
# environment. Confirm before relying on the fallback path below.
#
# Before initializing, GOOGLE_APPLICATION_CREDENTIALS must point to the
# service account key file. setdefault() is used so that a value already
# configured in the environment is respected; the hard-coded path is only a
# fallback when the variable is not set at all.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/data_de2023_group1/assignment1-402316-dc4baf177723.json",
)

aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Create the job from the compiled pipeline package. The parameter_values keys
# match the parameter names of the `pipeline` function.
job = aip.PipelineJob(
    display_name="toxic-predictor-mlp-pipeline",
    template_path="toxic_predictor_mlp.yaml",
    enable_caching=False,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'project_id': PROJECT_ID,
        'data_bucket': 'data_de2023_group1',
        'trainset_filename': 'train.csv',
        'model_repo': 'models_de2023_group1',
    }
)

# Submits the job and reports its state while it runs; a failed job surfaces
# as a RuntimeError (see the recorded output of this cell).
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-mlp-20231019211151
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-mlp-20231019211151')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/toxic-predictor-mlp-20231019211151?project=791882449847
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-mlp-20231019211151 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-mlp-20231019211151 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-mlp-20231019211151 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/791882449847/locations/us-

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [multilabel-classifier].; Job (project_id = assignment1-402316, job_id = 8606198018242248704) is failed due to the above error.; Failed to handle the job: {project_number = 791882449847, job_id = 8606198018242248704}"
