In [2]:
import os

import google.cloud.aiplatform as aip

from google_cloud_pipeline_components.experimental.custom_job import utils
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component

from datetime import datetime

In [3]:
shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]

In [5]:
# Show the project id picked up from the gcloud configuration.
PROJECT_ID

'groupby-development'

In [4]:
# GCS prefix (a path inside the gbi_ml bucket) used as pipeline root and SDK staging location.
BUCKET_URI: str = "gs://gbi_ml/classification_hackathon/pipelines"

In [9]:
# Show the GCS prefix used for pipeline artifacts.
BUCKET_URI

'gs://gbi_ml/classification_hackathon/pipelines'

In [10]:
! gsutil mb -l $REGION $BUCKET_URI

CommandException: Incorrect option(s) specified. Usage:

  gsutil mb [-b (on|off)] [-c <class>] [-k <key>] [-l <location>] [-p <project>]
            [--autoclass] [--retention <time>] [--pap <setting>]
            [--rpo (ASYNC_TURBO|DEFAULT)] gs://<bucket_name>...

For additional help run:
  gsutil help mb


In [11]:
# List the objects (if any) under the pipeline GCS prefix, with sizes and metadata.
! gsutil ls -al $BUCKET_URI

CommandException: One or more URLs matched no objects.


In [31]:
shell_output = !gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].strip()[8:]
print("Service Account:", SERVICE_ACCOUNT)

Service Account: 937725678441-compute@developer.gserviceaccount.com


In [5]:
REGION: str = "us-central1"  # Google Cloud region for the Vertex AI resources in this notebook

In [6]:
# Compute Engine machine type assembled from family and vCPU count, e.g. "n1-standard-16".
# NOTE(review): custom_training_op below hardcodes 'n1-standard-8' and does not use TRAIN_COMPUTE.
MACHINE_TYPE = "n1-standard"
VCPU = "16"
TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"

In [7]:
PIPELINE_ROOT: str = BUCKET_URI  # pipeline runs write their artifacts under the same GCS prefix

In [8]:
# NOTE: reassigned to a per-run name in the hyper-parameter cell further down.
DISPLAY_NAME: str = "classification_hackathon_model"

In [9]:
# Initialise the Vertex AI SDK for this project; pass REGION explicitly so the SDK
# stays in the same region as the training job instead of relying on the SDK default.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## Model train pipeline

In [11]:
import kfp
from pathlib import Path

from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)

In [89]:
# Custom Training
@dsl.component(
    base_image='huggingface/transformers-pytorch-gpu:4.23.1',
    packages_to_install=['fsspec==2022.11.0',
                         'gcsfs==2022.11.0',
                         'google-cloud-secret-manager',
                         'google-cloud-bigquery',
                         'google-cloud-bigquery-storage',
                         'evaluate',
                         'pandas==1.5.2']
)
def custom_training(
    pretrained_model: str,
    model_dir: str,
    learning_rate: float,
    epochs: int,
    batch_size: int,
    max_length: int = 256,
):
    """Fine-tune a Huggingface sequence-classification model and upload it to GCS.

    Based on the Huggingface ``Trainer`` class, so any pretrained checkpoint supported
    by ``AutoModelForSequenceClassification`` can be used. The label mappings and the
    train/validation CSVs are read from the fixed GCS paths defined below.

    Args:
        pretrained_model: Huggingface model name, e.g. ``facebook/bart-large``.
        model_dir: GCS path the final model and tokenizer are uploaded to.
        learning_rate: Learning rate for training.
        epochs: Number of epochs for training.
        batch_size: Per-device batch size for training and evaluation.
        max_length: Maximum number of tokens kept per text; longer texts are
            truncated. Bounds the GPU memory used per batch.
    """
    # A lightweight component only ships this function's source to the container,
    # so all imports and helpers must live inside it.
    import logging
    import pickle

    import evaluate
    import gcsfs
    import pandas as pd
    from datasets import Dataset, DatasetDict
    from transformers import (AutoModelForSequenceClassification, AutoTokenizer,
                              DataCollatorWithPadding, EarlyStoppingCallback,
                              Trainer, TrainingArguments, set_seed)

    logging.basicConfig(level=logging.INFO)

    PROJECT_ID = 'groupby-development'
    SEED = 42
    LABEL2ID_PATH = 'gs://gbi_ml/classification_hackathon/label2id.pickle'
    ID2LABEL_PATH = 'gs://gbi_ml/classification_hackathon/id2label.pickle'
    TRAIN_PATH = 'gs://gbi_ml/classification_hackathon/bbby_train_new.csv'
    TEST_PATH = 'gs://gbi_ml/classification_hackathon/bbby_test_new.csv'
    CHECKPOINT_DIR = './model_output'   # Trainer checkpoints (weights + optimizer state)
    FINAL_MODEL_DIR = './final_model'   # only the final model + tokenizer; this is uploaded

    # Use accuracy as the metric for now; compute_metrics can report more later.
    target_metric = evaluate.load("accuracy")

    def tokenize_function(examples, tokenizer):
        """Tokenize a batch of texts, truncated to ``max_length`` tokens.

        Padding is left to DataCollatorWithPadding, which pads each batch only to
        its longest example instead of padding every text to the model maximum
        (padding to the maximum ran out of GPU memory).
        """
        return tokenizer(examples['text'], truncation=True, max_length=max_length)

    def create_datasets(ds, tokenizer):
        """Tokenize a DatasetDict and return its (train, validation) datasets."""
        tokenized = ds.map(lambda examples: tokenize_function(examples, tokenizer), batched=True)
        return tokenized['train'], tokenized['validation']

    def preprocess_logits_for_metrics(logits, labels):
        """Reduce each evaluation batch's model output to predicted class ids.

        Encoder-decoder models such as BART return a tuple whose first element is
        the logits (the rest are hidden states). Keeping only the arg-max stops the
        Trainer from accumulating the extra tensors on the GPU during evaluation.
        """
        if isinstance(logits, tuple):
            logits = logits[0]
        return logits.argmax(dim=-1)

    def compute_metrics(eval_pred):
        """Accuracy of the predicted class ids against the reference labels."""
        predictions, labels = eval_pred
        return target_metric.compute(predictions=predictions, references=labels)

    def load_pickle(fs, path):
        """Load a pickled object from GCS.

        NOTE: unpickling can execute arbitrary code; these paths must be trusted.
        """
        with fs.open(path, 'rb') as handle:
            return pickle.load(handle)

    def load_split(fs, path, label_column, label2id, drop_unknown):
        """Read a CSV split as a DataFrame with ``text`` and integer ``label`` columns.

        Rows without a description are dropped. Rows whose label is missing from
        ``label2id`` are dropped with a warning when ``drop_unknown`` is true and
        raise ``ValueError`` otherwise, rather than being given an arbitrary class id.
        """
        with fs.open(path, 'r') as f:
            df = pd.read_csv(f)[['raw_product_description', label_column]]
        df = (df.rename(columns={'raw_product_description': 'text'})
                .dropna(subset=['text']))
        labels = df[label_column].map(label2id)
        unknown = labels.isna()
        if unknown.any():
            missing = sorted(df.loc[unknown, label_column].astype(str).unique())
            if not drop_unknown:
                raise ValueError(f'{path}: labels missing from label2id: {missing}')
            logging.warning('%s: dropping %d rows whose label is missing from label2id: %s',
                            path, int(unknown.sum()), missing)
        # Build a new frame instead of assigning into a filtered slice.
        return df.loc[~unknown].assign(label=labels[~unknown].astype(int))

    def main():
        fs = gcsfs.GCSFileSystem(project=PROJECT_ID)

        # Prepare label mapping
        logging.info(f'Label mapping path: {LABEL2ID_PATH}')
        label2id = load_pickle(fs, LABEL2ID_PATH)
        id2label = load_pickle(fs, ID2LABEL_PATH)

        # Seed before building the model so the freshly initialised head is reproducible.
        set_seed(SEED)
        model = AutoModelForSequenceClassification.from_pretrained(
            pretrained_model, id2label=id2label, label2id=label2id)
        tokenizer = AutoTokenizer.from_pretrained(pretrained_model)

        # Prepare datasets. Training labels must all be known; unknown validation
        # labels are dropped.
        logging.info(f'Train set path: {TRAIN_PATH}')
        train_df = load_split(fs, TRAIN_PATH, 'bucket_name', label2id, drop_unknown=False)
        logging.info(f'Validation set path: {TEST_PATH}')
        val_df = load_split(fs, TEST_PATH, 'Manual Classification Bucket', label2id, drop_unknown=True)
        logging.info("Train set: {}".format(train_df.shape))
        logging.info("Validation set: {}".format(val_df.shape))

        # preserve_index=False: the filtered index would otherwise become a column.
        ds = DatasetDict(
            train=Dataset.from_pandas(train_df, preserve_index=False),
            validation=Dataset.from_pandas(val_df, preserve_index=False),
        )
        train_ds, val_ds = create_datasets(ds, tokenizer)
        data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

        # Model training configuration
        training_args = TrainingArguments(
            output_dir=CHECKPOINT_DIR,
            learning_rate=learning_rate,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            num_train_epochs=epochs,
            weight_decay=0.01,
            evaluation_strategy="steps",
            lr_scheduler_type="linear",
            log_level="debug",
            logging_strategy="steps",
            save_total_limit=3,
            eval_steps=100,
            save_steps=500,  # must be a multiple of eval_steps for load_best_model_at_end
            seed=SEED,
            save_strategy="steps",
            metric_for_best_model="accuracy",
            load_best_model_at_end=True
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_ds,
            eval_dataset=val_ds,
            tokenizer=tokenizer,
            data_collator=data_collator,
            compute_metrics=compute_metrics,
            preprocess_logits_for_metrics=preprocess_logits_for_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=5, early_stopping_threshold=0.02)]
        )

        trainer.train()

        # Save the final model (the best checkpoint, since load_best_model_at_end=True)
        # plus tokenizer into its own directory so intermediate checkpoints are not uploaded.
        trainer.save_model(FINAL_MODEL_DIR)
        fs.put(FINAL_MODEL_DIR + '/', model_dir, recursive=True)
        test_metric = trainer.evaluate(val_ds)
        logging.info(f'Validation metrics: {test_metric}')

    main()

In [100]:
# Wrap the training component as a Vertex AI custom job so it runs on a GPU host:
# 2 x NVIDIA Tesla P100 attached to an n1-standard-8 machine.
# NOTE(review): TRAIN_COMPUTE (n1-standard-16) defined earlier is not used here.
custom_training_op = utils.create_custom_training_job_op_from_component(
    custom_training,
    accelerator_count=2,
    accelerator_type="NVIDIA_TESLA_P100",
    machine_type='n1-standard-8',
)

In [101]:
# Local-time run stamp (YYYYmmddHHMMSS) used to make run names unique.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [102]:
# Base checkpoint and training hyper-parameters for this run.
PRETRAINED_MODEL = "facebook/bart-large"
LEARNING_RATE = 1e-5
EPOCHS = 10
BATCH_SIZE = 32

# Per-run name (base model + TIMESTAMP); used as the pipeline-job display name
# and as the default `model_name` pipeline parameter.
MODEL_NAME = f"huggingface-sequence-classification-{PRETRAINED_MODEL}-{TIMESTAMP}"
DISPLAY_NAME = MODEL_NAME

# NOTE(review): MODEL_DIR has no per-run suffix, so every run uploads to the same location.
MODEL_DIR = f"{PIPELINE_ROOT}/model_artifacts"
MODEL_ARTIFACTS_LOCAL = "./model_artifacts"

In [103]:
@dsl.pipeline(name="huggingface-classification-training-pipeline")
def pipeline(
    project_id: str = PROJECT_ID,
    root_dir: str = PIPELINE_ROOT,
    pretrained_model: str = PRETRAINED_MODEL,
    learning_rate: float = LEARNING_RATE,
    epochs: int = EPOCHS,
    batch_size: int = BATCH_SIZE,
    model_name: str = MODEL_NAME,
    model_dir: str = MODEL_DIR,
):
    # Single-step pipeline: run the training component as a Vertex AI custom job.
    # NOTE(review): root_dir and model_name are accepted but not passed to any step.
    training_task = custom_training_op(
        # Inputs of the custom_training component.
        pretrained_model=pretrained_model,
        model_dir=model_dir,
        learning_rate=learning_rate,
        epochs=epochs,
        batch_size=batch_size,
        # Custom-job settings; location and service account come from notebook globals.
        project=project_id,
        location=REGION,
        service_account=SERVICE_ACCOUNT,
    )

In [104]:
# Compile the pipeline function into a job spec file.
compiler.Compiler().compile(
    package_path="classification_training_spec.json",
    pipeline_func=pipeline,
)

# Describe the run from the compiled spec; caching is disabled so every step re-runs.
job = aip.PipelineJob(
    enable_caching=False,
    pipeline_root=PIPELINE_ROOT,
    template_path="classification_training_spec.json",
    display_name=DISPLAY_NAME,
)

# Submit and wait; the call polls the run state and raises RuntimeError if the run fails.
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob




PipelineJob created. Resource name: projects/937725678441/locations/us-central1/pipelineJobs/huggingface-classification-training-pipeline-20230323095028
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/937725678441/locations/us-central1/pipelineJobs/huggingface-classification-training-pipeline-20230323095028')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/huggingface-classification-training-pipeline-20230323095028?project=937725678441
PipelineJob projects/937725678441/locations/us-central1/pipelineJobs/huggingface-classification-training-pipeline-20230323095028 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/937725678441/locations/us-central1/pipelineJobs/huggingface-classification-training-pipeline-20230323095028 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/937725678441/locations/us-central1/pipelineJobs/huggingface-classification-tra

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [custom-training].; Job (project_id = groupby-development, job_id = 4272698337246838784) is failed due to the above error.; Failed to handle the job: {project_number = 937725678441, job_id = 4272698337246838784}"


## Training failed: CUDA out of memory

`RuntimeError: CUDA out of memory. Tried to allocate 2.00 GiB (GPU 0; 15.90 GiB total capacity; 11.62 GiB already allocated; 1.84 GiB free; 13.36 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF`

We need to either train on several GPUs or use a GPU with more memory (or reduce per-step memory, e.g. with a smaller batch size or a shorter maximum sequence length).

In [None]:
# Prediction example: classify raw product descriptions with the fine-tuned model.
import torch


def predict_labels(texts, model, tokenizer, max_length=256):
    """Return the predicted label name for each text.

    The model is trained as a single-label classifier (one integer label per text),
    so the prediction is the arg-max class, decoded with ``model.config.id2label``.

    Args:
        texts: Sequence of raw product descriptions.
        model: Fine-tuned Huggingface sequence-classification model, e.g. loaded with
            ``AutoModelForSequenceClassification.from_pretrained`` from a local copy
            of the uploaded model artifacts.
        tokenizer: Tokenizer matching ``model``.
        max_length: Truncation length; keep it equal to the one used in training.

    Returns:
        List of label names, one per input text (an empty list for no texts).
    """
    texts = list(texts)
    if not texts:
        return []
    encoded = tokenizer(texts, truncation=True, padding=True, max_length=max_length,
                        return_tensors="pt").to(model.device)
    with torch.no_grad():
        logits = model(**encoded).logits
    pred_ids = logits.argmax(dim=-1).tolist()
    return [model.config.id2label[i] for i in pred_ids]