In [68]:
# --- Google Cloud project settings ---
PROJECT_ID = "hardy-audio-435612-k2"
REGION = "europe-west4"

# --- Cloud Storage locations ---
BUCKET_NAME = "jads-assignment-1-bucket"
PIPELINE_ROOT = "gs://" + BUCKET_NAME  # Root location handed to the pipeline job.
DATASET_FILE_NAME = "salary.csv"  # Name of the dataset file in the bucket.

# --- Pipeline packaging and execution ---
PIPELINE_FILE_NAME = "salary-predictor-pipeline.yaml"  # Any name works; it is only the compiled pipeline file.
SERVICE_ACCOUNT_NAME = "assignment-1-pipeline@hardy-audio-435612-k2.iam.gserviceaccount.com"  # Email address of the service account.


In [69]:
from typing import NamedTuple
from kfp import dsl, compiler
from kfp.dsl import (Dataset,
                        Input,
                        Model,
                        Output)
import google.cloud.aiplatform as aip

# Components

In [70]:
@dsl.component(
    base_image="python:3.10.11-slim",
    packages_to_install=["google-cloud-storage"],
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    """Download one object from a Cloud Storage bucket into the output dataset.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the bucket that holds the file.
        file_name: Name of the object to download from the bucket.
        dataset: Output artifact; the file is stored at the artifact path
            with a ".csv" suffix appended.
    """
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    # Download the file from the Google Cloud Storage bucket.
    storage_client = storage.Client(project=project_id)
    source_blob = storage_client.bucket(bucket).blob(file_name)

    # The local copy carries a ".csv" suffix on top of the artifact path, so
    # whoever reads this artifact has to append the same suffix.
    source_blob.download_to_filename(dataset.path + ".csv")
    logging.info('Downloaded Data!')

In [71]:
@dsl.component(
    packages_to_install=['pandas', "scikit-learn"],
    base_image="python:3.10.11-slim"
)
def train(features: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', [('metrics', dict)]):
    """Fit a linear regression of Salary on YearsExperience and pickle the model.

    Args:
        features: Input dataset artifact; the CSV is read from the artifact
            path with a ".csv" suffix appended and must contain the columns
            "YearsExperience" and "Salary".
        out_model: Output model artifact; the pickled model is written to the
            artifact path with a ".pkl" suffix appended, and the metadata
            keys "file_type" and "algo" are set on it.

    Returns:
        A named tuple with a single field ``metrics``: a dict holding the
        "r2", "MAE" and "RMSE" scores of the fitted model.
    """
    import logging
    import pickle
    import sys

    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import r2_score, mean_absolute_error, root_mean_squared_error

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df = pd.read_csv(features.path + ".csv")

    logging.info(df.columns)

    X = pd.DataFrame(df["YearsExperience"])
    y = df["Salary"]

    model = LinearRegression().fit(X, y)

    # The scores below are computed on the same rows the model was fitted on.
    predictions = model.predict(X)
    metrics_dict = {
        "r2": r2_score(y, predictions),
        "MAE": mean_absolute_error(y, predictions),
        "RMSE": root_mean_squared_error(y, predictions),
    }
    logging.info(metrics_dict)

    # Record the file suffix and algorithm tag on the artifact metadata.
    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algo"] = "lr"

    # Save the model
    m_file = out_model.path + ".pkl"
    with open(m_file, "wb") as f:
        pickle.dump(model, f)

    # Field-list form of NamedTuple: the keyword form (metrics=dict) is
    # deprecated in newer Python releases; both build the same tuple class.
    outputs = NamedTuple('outputs', [('metrics', dict)])
    return outputs(metrics_dict)

In [72]:
@dsl.component(
    base_image="python:3.10.11-slim",
    packages_to_install=["google-cloud-storage"],
)
def upload_model(project_id: str, model_repo: str, model: Input[Model]):
    """Upload the stored file of a model artifact to a Cloud Storage bucket.

    Args:
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the destination bucket.
        model: Input model artifact. Its "algo" and "file_type" metadata
            entries form the object name, and "file_type" is also the suffix
            appended to the artifact path to locate the local file.
    """
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    # upload the model to GCS
    target_bucket = storage.Client(project=project_id).bucket(model_repo)

    algo = str(model.metadata["algo"])
    file_type = str(model.metadata["file_type"])

    target_blob = target_bucket.blob(algo + '_model' + file_type)
    target_blob.upload_from_filename(model.path + file_type)

    logging.info("Saved the model to GCP bucket : " + model_repo)

# Pipeline

In [73]:
@dsl.pipeline(name="salary-predictor-pipeline")
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str):
    """Chain the download, train and upload components into one pipeline.

    Args:
        project_id: Google Cloud project passed to the download and upload steps.
        data_bucket: Bucket the training file is downloaded from.
        trainset_filename: Name of the training file inside ``data_bucket``.
        model_repo: Bucket the trained model is uploaded to.
    """
    # Step 1: fetch the training data.
    download_task = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename,
    )

    # Step 2: train on the downloaded dataset.
    train_task = train(features=download_task.outputs["dataset"])

    # Step 3: publish the trained model.
    upload_model(
        project_id=project_id,
        model_repo=model_repo,
        model=train_task.outputs['out_model'],
    )

## Compile the pipeline

In [74]:
# Write the compiled pipeline definition to PIPELINE_FILE_NAME.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_FILE_NAME,
)

## Submit the pipeline

In [75]:
%env GOOGLE_APPLICATION_CREDENTIALS=credentials.json

aip.init(
    project=PROJECT_ID,
    location=REGION
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name=PIPELINE_FILE_NAME,
    enable_caching=False,
    template_path=PIPELINE_FILE_NAME,
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID,
        'data_bucket': BUCKET_NAME,
        'model_repo': BUCKET_NAME,
        'trainset_filename': DATASET_FILE_NAME 
    },
)

job.run(service_account="assignment-1-pipeline@hardy-audio-435612-k2.iam.gserviceaccount.com")

env: GOOGLE_APPLICATION_CREDENTIALS=credentials.json
Creating PipelineJob
PipelineJob created. Resource name: projects/910815829910/locations/europe-west4/pipelineJobs/salary-predictor-pipeline-20241015155237
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/910815829910/locations/europe-west4/pipelineJobs/salary-predictor-pipeline-20241015155237')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west4/pipelines/runs/salary-predictor-pipeline-20241015155237?project=910815829910
PipelineJob projects/910815829910/locations/europe-west4/pipelineJobs/salary-predictor-pipeline-20241015155237 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/910815829910/locations/europe-west4/pipelineJobs/salary-predictor-pipeline-20241015155237 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/910815829910/locations/europe-west4/pipelineJobs/salary-predictor-pipeline-20241015155237 curr