<a target="_blank" href="https://colab.research.google.com/github/Deanis/MLEngineering_Capstone_Group3/blob/main/notebooks/pipeline.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

## Setup

In [12]:
!pip install kfp pandas transformers google-cloud-storage google-cloud-aiplatform




In [13]:
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
import pandas as pd
from google.cloud import storage
from transformers import DistilBertTokenizer, TFDistilBertForSequenceClassification
from sklearn.model_selection import train_test_split
import tensorflow as tf
from google.cloud import aiplatform
import os
from typing import NamedTuple


# Constants
# NOTE(review): none of these are referenced by the cells visible here; they
# share names with the `training_pipeline` parameters, so presumably they are
# passed as pipeline arguments when a run is submitted -- confirm.
bucket_name = 'blank-to-bard'  # GCS bucket name
file_name = 'english-dataset.csv'  # CSV object name (presumably inside the bucket above)
project_id = 'ml-class-group-3-capstone'  # Google Cloud project ID
model_name = 'blank-to-bard'  # model name (presumably the Vertex AI display name)


## Components

In [14]:
@dsl.component(packages_to_install=['pandas', 'numpy', 'google-cloud-storage', 'transformers', 'scikit-learn', 'tensorflow'])
def train_model(bucket_name: str, file_name: str):
    """Fine-tune DistilBERT on a CSV held in GCS and upload the saved model.

    The CSV is read from ``gs://<bucket_name>/<file_name>`` and must contain a
    ``text`` column (model input) and a ``label`` column (integer class ids).
    The fine-tuned model is written locally with ``save_pretrained(...,
    saved_model=True)`` and every produced file is uploaded back to the same
    bucket under the ``pipeline-model/`` prefix.

    Args:
        bucket_name: Name of the GCS bucket holding the dataset; also the
            destination bucket for the exported model files.
        file_name: Object name of the CSV dataset inside the bucket.
    """
    # All imports are function-local: the component body is what gets shipped
    # to the pipeline step, so it cannot rely on notebook-level imports.
    import os
    from io import BytesIO

    import pandas as pd
    import tensorflow as tf
    from google.cloud import storage
    from sklearn.model_selection import train_test_split
    from transformers import DistilBertTokenizer, TFDistilBertForSequenceClassification

    max_length = 128
    model_dir = './pipeline-model'
    remote_prefix = 'pipeline-model'

    # Fetch the dataset. Parsing the raw bytes directly avoids the original
    # decode-to-text / re-encode-to-bytes round trip.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    data = pd.read_csv(BytesIO(bucket.blob(file_name).download_as_bytes()))

    tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
    # NOTE(review): no `num_labels` is passed, so the classification head uses
    # the library default -- confirm it matches the classes in `label`.
    model = TFDistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased')

    # Tokenize the whole column in one call. `padding='max_length'` replaces
    # the deprecated `pad_to_max_length=True`; `truncation=True` makes explicit
    # that every row is cut to `max_length`, giving equal-length rows.
    encodings = tokenizer(
        data['text'].tolist(),
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='np',
    )
    input_ids = encodings['input_ids']
    attention_masks = encodings['attention_mask']
    labels = data['label'].to_numpy()

    # Split ids, masks and labels together so each training row keeps its own
    # attention mask. Previously only ids and labels were split (and shuffled),
    # and the first N masks were reused, pairing rows with the wrong masks.
    (train_inputs, validation_inputs,
     train_masks, validation_masks,
     train_labels, validation_labels) = train_test_split(
        input_ids, attention_masks, labels, random_state=2023, test_size=0.2)

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    # Validate on the held-out 20% split, which was previously computed but
    # never used, rather than carving a second split out of the training data.
    model.fit(
        [train_inputs, train_masks],
        train_labels,
        validation_data=([validation_inputs, validation_masks], validation_labels),
        batch_size=32,
        epochs=4,
    )

    # Export the Hugging Face weights plus a TensorFlow SavedModel.
    model.save_pretrained(model_dir, saved_model=True)

    # Mirror the export directory into the bucket, keeping relative paths.
    for root, _dirs, files in os.walk(model_dir):
        for local_name in files:
            local_path = os.path.join(root, local_name)
            # Object names always use forward slashes, whatever the local OS.
            relative_path = os.path.relpath(local_path, model_dir).replace(os.sep, '/')
            bucket.blob(f'{remote_prefix}/{relative_path}').upload_from_filename(local_path)



@dsl.component(packages_to_install=['google-cloud-aiplatform'])
def upload_model_to_vertex_ai_op(
    project_id: str,
    bucket_name: str,
    model_name: str,
):
    # Registers the SavedModel stored under
    # gs://<bucket_name>/pipeline-model/saved_model/1/ as a Vertex AI model
    # named `model_name` in project `project_id`, then waits for completion.
    from google.cloud import aiplatform

    saved_model_uri = f"gs://{bucket_name}/pipeline-model/saved_model/1/"
    serving_image = 'gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest'

    aiplatform.init(project=project_id)

    uploaded = aiplatform.Model.upload(
        display_name=model_name,
        artifact_uri=saved_model_uri,
        serving_container_image_uri=serving_image,
    )
    uploaded.wait()

## Pipeline

In [17]:
@dsl.pipeline(
    name='Training pipeline',
    description='A pipeline that downloads, pre-processes data, trains a model, saves it and deploys it.'
)
def training_pipeline(bucket_name: str, file_name: str, project_id: str, model_name: str):
    # Step 1: train the model and push the exported files to the bucket.
    training_task = train_model(bucket_name=bucket_name, file_name=file_name)

    # Step 2: register the exported model with Vertex AI. The two steps share
    # no data edge, so the ordering is declared explicitly with `.after`.
    upload_task = upload_model_to_vertex_ai_op(
        project_id=project_id,
        bucket_name=bucket_name,
        model_name=model_name,
    )
    upload_task.after(training_task)



## Compile

In [18]:
# Compile `training_pipeline` and write the result to 'training_pipeline.json'
# in the current working directory.
compiler.Compiler().compile(pipeline_func=training_pipeline, package_path='training_pipeline.json')
