In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m595.6/595.6 kB[0m [31m32.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
  Preparing metadata (setup.py) ... [?25ldone


In [25]:
import kfp
from kfp.v2.dsl import Dataset, Model, Metrics, component
from google.cloud import aiplatform
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import pandas as pd

In [27]:
#The Google Cloud project that this pipeline runs in.
PROJECT_ID = "data-engineering-labs-guido"
# The region that this pipeline runs in
REGION = "us-central1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = "gs://temp_de_2024_1645722"   # e.g., gs://temp_de2024

In [56]:
# Preprocessing Component (TF-IDF vectorization)
@component
def preprocess_data(dataset: Input[Dataset], output_data: Output[Dataset]):
    # Load data
    import logging 
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    data = pd.read_csv(dataset.path, index_col=None)

    messages, labels = [], []
    for line in data:
        label, message = line.split('\t', 1)
        labels.append(label.strip())
        messages.append(message.strip())

    df = pd.DataFrame({'label': labels, 'message': messages})
    df['label'] = df['label'].map({'ham': 0, 'spam': 1})

    # Save preprocessed data for train-test split
    df.to_csv(output_data + ".csv", index=False)

In [51]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''train_test_split'''
    import pandas as pd
    import logging 
    import sys
    from sklearn.model_selection import train_test_split as tts
    
    # Load preprocessed data
    df = pd.read_csv(preprocessed_data.path + ".csv")
    
    train, test = tts(alldata, test_size=0.3)
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')


In [60]:
# Model Training Component (Generalized for Multiple Models)
@component
def train_model(model_type: str, dataset: Input[Dataset], model: Output[Model]):

    data = pd.read_csv(dataset.path+".csv")
    # Train the appropriate model
    if model_type == "Naive Bayes":
        model = MultinomialNB()
    elif model_type == "SVM":
        model = SVC(kernel='linear', probability=True)
    elif model_type == "Random Forest":
        model = RandomForestClassifier(n_estimators=100, random_state=42)
    else:
        raise ValueError("Unsupported model type")
       
    # Train the model
    model_lr.fit(data.drop('class',axis=1), data['class'])
    model.metadata["framework"] = model_type

    file_name = model.path + f".pkl"
    with open(file_name, 'wb') as file:  
        pickle.dump(model_lr, file)  

In [69]:
@component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def evaluate_model(model: Input[Model], test_data: Input[Dataset],  metrics_output: Output[Metrics]):

    data = pd.read_csv(test_set.path+".csv")
    # Load the trained model
      #Loading the saved model with joblib
    m_filename = model_lr.path + ".pkl"
    model = pickle.load(open(m_filename, 'rb'))

    # Evaluate the model
    y_test = data.drop(columns=["class"])
    y_target = data['class']
    y_pred = model.predict(y_test)    
    report = classification_report(y_test, y_pred, output_dict=True)

    # Save evaluation metrics
    metrics_output.log_metric("accuracy", report["accuracy"])
    metrics_output.log_metric("precision", report["weighted avg"]["precision"])
    metrics_output.log_metric("recall", report["weighted avg"]["recall"])
    metrics_output.log_metric("f1_score", report["weighted avg"]["f1-score"])

In [70]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    from google.cloud import storage
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)    
    
    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)
    blob = bucket.blob('spam_models.pkl')
    blob.upload_from_filename(model.path + ".pkl")
    print(f"Model uploaded to GCS bucket: {model_repo}")


In [78]:
# Pipeline definitie
@dsl.pipeline(
    name="spam-classifier-training-pipeline"
)
def spam_classifier_pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, thresholds_dict_str: str = "{'accuracy': 0.9, 'precision': 0.9, 'recall': 0.9}"):
    
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )    
    
    # Preprocessing stap
    preprocess_op = preprocess_data(dataset=dataset_op.output)
    
    # Train-test split stap
    train_test_split_op = train_test_split(dataset=preprocess_op.output)
    
    # Train verschillende modellen
    train_nb_run_op = train_model(dataset=train_test_split_op.outputs["dataset_train"], model_type = 'Naive Bayes')

    train_svm_run_op = train_model(dataset=train_test_split_op.outputs["dataset_train"], model_type = 'SVM')
    
    train_rf_run_op = train_model(dataset=train_test_split_op.outputs["dataset_train"], model_type = 'Random Forrest')
    
    
    model_evaluation_nb_op = evaluate_model(
        test_data=train_test_split_op.outputs["dataset_test"],
        model=train_nb_run_op.outputs["model"]
    )    
    
    model_evaluation_svm_op = evaluate_model(
        test_data=train_test_split_op.outputs["dataset_test"],
        model=train_svm_run_op.outputs["model"]
    )     
    model_evaluation_rf_op = evaluate_model(
        test_data=train_test_split_op.outputs["dataset_test"],
        model=train_rf_run_op.outputs["model"]
    )         
    upload_model_task = upload_model_to_gcs(
        project_id=project_id,
        model_repo=model_repo,
        model=train_nb_run_op.outputs['model']
    )
    
    upload_model_task = upload_model_to_gcs(
        project_id=project_id,
        model_repo=model_repo,
        model=train_svm_run_op.outputs['model']
    )

    upload_model_task = upload_model_to_gcs(
        project_id=project_id,
        model_repo=model_repo,
        model=train_rf_run_op.outputs['model']
    )


In [79]:
from kfp import compiler
compiler.Compiler().compile(spam_classifier_pipeline, 'spam_pipeline.yaml')

import google.cloud.aiplatform as aip
aip.init(project=PROJECT_ID, location=REGION)

job = aip.PipelineJob(
    display_name="spam_classification",
    template_path="spam_pipeline.yaml",
    parameter_values={
        'project_id': PROJECT_ID,
        'model_repo': PIPELINE_ROOT
    }
)
job.run()


Creating PipelineJob
PipelineJob created. Resource name: projects/371925011722/locations/us-central1/pipelineJobs/spam-classifier-training-pipeline-20241023112254
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/371925011722/locations/us-central1/pipelineJobs/spam-classifier-training-pipeline-20241023112254')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/spam-classifier-training-pipeline-20241023112254?project=371925011722


RuntimeError: Job failed with:
code: 3
message: "The task is missing input parameter spec data_bucket required by the component. Task: Project number: 371925011722, Job id: 6720776644530798592, Task id: -2682534662097600512, Task name: spam-classifier-training-pipeline-20241023112254, Task state: QUOTA_READY, Execution name: projects/371925011722/locations/us-central1/metadataStores/default/executions/17872320001161171753"
