In [1]:
%load_ext lab_black

In [2]:
import kfp
from kfp import dsl
from kfp.components import InputPath, OutputPath
from kfp.components import create_component_from_func

In [3]:
# Container image shared by every Kubeflow component created below.
# NOTE(review): Python 3.8 is end-of-life (October 2024) and current torch /
# transformers releases require a newer interpreter, so pip will resolve to
# older library versions inside this image — consider a newer base image.
BASE_IMAGE = "python:3.8-slim"

In [4]:
def s3_adapter(bucket_name: str, labels: list, output_dir_path: OutputPath()):
    """Create an ``S3Adapter`` and pickle it (with dill) for downstream steps.

    The adapter is pickled without an S3 client (``s3_client`` is ``None``);
    consumers attach one after unpickling, as the ``train`` component does via
    ``setattr(s3_adapter_obj, "s3_client", boto3.client("s3"))``.

    :param bucket_name: S3 bucket name. Not used by this step; kept so the
        component interface stays unchanged.
    :param labels: ordered class names. ``len(labels)`` sets ``num_labels``
        when a fresh ``bert-base-uncased`` model has to be created.
    :param output_dir_path: KFP output directory that receives
        ``s3_adapter.pkl``.
    """

    import os
    import json
    import dill
    import torch
    import transformers
    from io import BytesIO
    from contextlib import contextmanager
    from tempfile import NamedTemporaryFile

    class S3Adapter:
        """Load and store BERT models, tokenizers and data files in S3."""

        def __init__(self, labels):
            self.labels = labels
            # Attached by the consumer after unpickling (see the docstring of
            # the enclosing ``s3_adapter`` function).
            self.s3_client = None

        def folder_exists_and_not_empty(self, bucket_name: str, key: str) -> bool:
            """Return True if at least one object sits directly under ``key/``."""
            if not key.endswith("/"):
                key = key + "/"
            resp = self.s3_client.list_objects(
                Bucket=bucket_name, Prefix=key, Delimiter="/", MaxKeys=1
            )
            # With Delimiter="/", objects in deeper "sub-folders" are reported
            # under CommonPrefixes rather than Contents.
            return "Contents" in resp

        @contextmanager
        def s3_fileobj(self, bucket_name, key):
            """Yield an in-memory file object holding ``{bucket_name}/{key}``.

            Args:
                bucket_name (str): Name of the S3 bucket holding the object.
                key (str): Full object key, including file name and extension.
            """
            obj = self.s3_client.get_object(Bucket=bucket_name, Key=key)
            yield BytesIO(obj["Body"].read())

        def get_data_from_s3(self, bucket_name, prefix, data_file):
            """Return the streaming body of the object ``{prefix}/{data_file}``."""
            return self.s3_client.get_object(
                Bucket=bucket_name, Key=os.path.join(prefix, data_file)
            )["Body"]

        def get_model_tokenizer_from_s3(
            self, bucket_name, prefix, model_name="pytorch_model"
        ):
            """Return ``(model, tokenizer)`` stored under ``{bucket_name}/{prefix}``.

            If nothing is stored there yet, ``bert-base-uncased`` is downloaded,
            saved to that prefix, and returned.
            """
            if not self.folder_exists_and_not_empty(
                bucket_name=bucket_name, key=prefix
            ):
                model = transformers.BertForSequenceClassification.from_pretrained(
                    "bert-base-uncased", num_labels=len(self.labels)
                )
                tokenizer = transformers.BertTokenizer.from_pretrained(
                    "bert-base-uncased"
                )
                self.save_model_tokenizer_into_s3(
                    bucket_name=bucket_name,
                    prefix=prefix,
                    model_name=model_name,
                    model=model,
                    tokenizer=tokenizer,
                )
                return model, tokenizer

            with self.s3_fileobj(
                bucket_name=bucket_name, key=f"{prefix}/config.json"
            ) as f:
                config = transformers.BertConfig.from_dict(json.load(f))
            with self.s3_fileobj(
                bucket_name=bucket_name, key=f"{prefix}/{model_name}.bin"
            ) as f:
                # map_location="cpu" lets weights saved on a GPU load on a
                # CPU-only pod.
                state_dict = torch.load(f, map_location="cpu")
            # None + config + state_dict builds the model directly from the
            # in-memory weights, so no temporary file is needed.
            model = transformers.BertForSequenceClassification.from_pretrained(
                None, config=config, state_dict=state_dict
            )
            tokenizer = self.get_tokenizer_from_s3(bucket_name, prefix)
            return model, tokenizer

        def get_tokenizer_from_s3(self, bucket_name, prefix):
            """Build a ``BertTokenizer`` from ``{prefix}/vocab.txt`` in S3."""
            with self.s3_fileobj(
                bucket_name=bucket_name, key=f"{prefix}/vocab.txt"
            ) as f:
                vocab_bytes = f.read()
            # BertTokenizer reads its vocab from a path, so the bytes must be
            # flushed to disk before the file is reopened by name.
            with NamedTemporaryFile(suffix=".txt") as vocab_file:
                vocab_file.write(vocab_bytes)
                vocab_file.flush()
                return transformers.BertTokenizer(vocab_file=vocab_file.name)

        def save_model_tokenizer_into_s3(
            self, bucket_name, prefix, model, model_name="pytorch_model", tokenizer=None
        ):
            """Write config.json, ``{model_name}.bin`` and (optionally) vocab.txt."""
            self.s3_client.put_object(
                Bucket=bucket_name,
                Key=f"{prefix}/config.json",
                Body=model.config.to_json_string(),
            )
            buffer = BytesIO()
            torch.save(model.state_dict(), buffer)
            self.s3_client.put_object(
                Bucket=bucket_name,
                Key=f"{prefix}/{model_name}.bin",
                Body=buffer.getvalue(),
            )
            if tokenizer is not None:
                # Line i of vocab.txt must hold the token whose id is i, so
                # order explicitly by id instead of relying on dict order.
                ordered_tokens = sorted(tokenizer.vocab, key=tokenizer.vocab.get)
                self.s3_client.put_object(
                    Bucket=bucket_name,
                    Key=f"{prefix}/vocab.txt",
                    Body="\n".join(ordered_tokens),
                )

    os.makedirs(output_dir_path, exist_ok=True)
    s3_adapter_obj = S3Adapter(labels=labels)
    with open(os.path.join(output_dir_path, "s3_adapter.pkl"), "wb") as f:
        dill.dump(s3_adapter_obj, f)

In [5]:
def create_data_loader(
    input_dir_path: InputPath(), labels: list, mode: str, output_dir_path: OutputPath()
):
    """Pickle an empty ``LabelDataset`` and forward the upstream ``S3Adapter``.

    The dataset is created without text, labels or tokenizer; the ``train``
    component assigns those after unpickling.

    :param input_dir_path: directory containing ``s3_adapter.pkl``.
    :param labels: ordered class names; a row's label is encoded as its index
        in this list.
    :param mode: one of ``"train"``, ``"eval"``, ``"train_eval"``, ``"serve"``.
    :param output_dir_path: KFP output directory that receives
        ``s3_adapter.pkl`` and ``dataset.pkl``.
    """

    import os
    import dill
    import torch
    from torch.utils.data import Dataset

    class LabelDataset(Dataset):
        """Map-style dataset yielding padded BERT inputs for label classification.

        :param labels: ordered class names used to encode ``label`` values.
        :param mode: ``"train"``, ``"eval"`` or ``"train_eval"`` (adds a
            ``label`` tensor) or ``"serve"`` (adds the cleaned ``text``).
        :param max_length: token length every example is truncated/padded to.
        :param text: indexable sequence of input texts.
        :param label: indexable sequence of label names (train/eval modes).
        :param tokenizer: Hugging Face tokenizer providing ``encode_plus``.
        """

        def __init__(
            self,
            labels,
            mode="serve",
            max_length=512,
            text=None,
            label=None,
            tokenizer=None,
        ):
            self.mode = mode
            self.labels = labels
            self.max_length = max_length
            self.text = text
            self.label = label
            self.tokenizer = tokenizer

        def _encode(self, text):
            """Tokenize *text* and right-pad every field with 0 to ``max_length``."""
            inputs = self.tokenizer.encode_plus(
                text,
                None,
                add_special_tokens=True,
                max_length=self.max_length,
                truncation=True,
            )
            padding = [0] * (self.max_length - len(inputs["input_ids"]))
            return {
                key: torch.tensor(inputs[key] + padding, dtype=torch.long)
                for key in ("input_ids", "attention_mask", "token_type_ids")
            }

        def __getitem__(self, item):
            """Return the tensor dict for example *item*.

            :raises ValueError: if ``mode`` is not a supported value.
            """
            # Collapse runs of whitespace before tokenizing.
            text = " ".join(str(self.text[item]).split())
            if self.mode in ("train_eval", "train", "eval"):
                sample = self._encode(text)
                sample["label"] = torch.tensor(
                    self.labels.index(self.label[item].strip()), dtype=torch.long
                )
                return sample
            if self.mode == "serve":
                return {"text": text, **self._encode(text)}
            raise ValueError(f"Unsupported LabelDataset mode: {self.mode!r}")

        def __len__(self):
            """Return the number of examples."""
            return len(self.text)

    os.makedirs(output_dir_path, exist_ok=True)
    # Forward the adapter unchanged so the next step receives it too.
    with open(os.path.join(input_dir_path, "s3_adapter.pkl"), "rb") as f:
        s3_adapter_obj = dill.load(f)
    with open(os.path.join(output_dir_path, "s3_adapter.pkl"), "wb") as f:
        dill.dump(s3_adapter_obj, f)

    dataset_object = LabelDataset(mode=mode, labels=labels)
    # Persist the (still empty) LabelDataset as dataset.pkl.
    with open(os.path.join(output_dir_path, "dataset.pkl"), "wb") as f:
        dill.dump(dataset_object, f)

In [6]:
def train(
    input_dir_path: InputPath(),
    bucket_name: str,
    prefix: str,
    data_file: str,
    labels: list,
    num_epochs: int,
    batch_size: int,
    device: str,
    output_dir_path: OutputPath(),
):
    """Fine-tune the BERT label classifier and checkpoint it to S3.

    Loads the dill-pickled ``S3Adapter`` and ``LabelDataset`` from
    *input_dir_path*, reads the training CSV ``{prefix}/{data_file}`` (columns
    ``text`` and ``label``), trains for *num_epochs* and, after every epoch,
    writes the model and tokenizer to ``{prefix}/finetuned``.

    :param input_dir_path: directory holding ``s3_adapter.pkl`` and ``dataset.pkl``.
    :param bucket_name: S3 bucket for data and model artefacts.
    :param prefix: key prefix; the base model is read from ``{prefix}/pretrained``.
    :param data_file: name of the training CSV under *prefix*.
    :param labels: accepted for interface compatibility; label encoding uses
        the label list stored in the pickled dataset.
    :param num_epochs: number of passes over the training data.
    :param batch_size: mini-batch size.
    :param device: torch device string such as ``"cpu"`` or ``"cuda"``.
    :param output_dir_path: KFP output directory (created, currently left empty).
    :raises ValueError: if the training CSV contains no rows.
    :raises RuntimeError: wrapping RuntimeError/MemoryError/ValueError/TypeError
        raised while training.
    """

    import os
    import dill
    import boto3
    import torch
    import pandas as pd
    import torch.nn as nn
    from tqdm import tqdm
    from torch.utils.data import DataLoader

    os.makedirs(output_dir_path, exist_ok=True)

    class LabelBackbone(nn.Module):
        """Thin ``nn.Module`` wrapper that forwards every keyword to *model*."""

        def __init__(self, model):
            super().__init__()
            self.model = model
            # Not used by forward(); kept so the module layout is unchanged.
            self.drop_out = nn.Dropout(p=0.3)

        def forward(self, **kwargs):
            return self.model(**kwargs)

    with open(os.path.join(input_dir_path, "s3_adapter.pkl"), "rb") as fh:
        s3_adapter_obj = dill.load(fh)
    # The adapter is pickled without a client, so attach one here.
    s3_adapter_obj.s3_client = boto3.client("s3")
    pretrained_model, tokenizer = s3_adapter_obj.get_model_tokenizer_from_s3(
        bucket_name=bucket_name, prefix=f"{prefix}/pretrained"
    )
    backbone_model = LabelBackbone(model=pretrained_model)
    # Same hyper-parameters as the deprecated transformers.AdamW defaults
    # (eps=1e-6, no weight decay).
    optimizer = torch.optim.AdamW(
        backbone_model.parameters(), lr=3e-5, eps=1e-6, weight_decay=0.0
    )

    dataframe = pd.read_csv(
        s3_adapter_obj.get_data_from_s3(
            bucket_name=bucket_name, prefix=prefix, data_file=data_file
        )
    )
    print(dataframe.head())

    with open(os.path.join(input_dir_path, "dataset.pkl"), "rb") as fh:
        train_dataset = dill.load(fh)
    train_dataset.text = dataframe.loc[:, "text"]
    train_dataset.label = dataframe.loc[:, "label"]
    train_dataset.mode = "train"
    train_dataset.tokenizer = tokenizer
    train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    num_samples = len(train_dataset)
    if num_samples == 0:
        raise ValueError(f"No training rows found in {prefix}/{data_file}")
    num_batches = len(train_data_loader)
    epoch_accuracies = []

    try:
        backbone_model.to(device)
        backbone_model.train()  # enable training-mode layers (dropout etc.)
        print("Starting the Training Loop ..")
        for epoch in range(num_epochs):
            epoch_loss = 0.0
            correct = 0
            print(f"[INFO] Epoch {epoch + 1} Started..")
            for index, batch in tqdm(enumerate(train_data_loader), total=num_batches):
                print(
                    f"[INFO] [TRAINING] Epoch {epoch + 1} Iteration {index + 1} Running.."
                )
                optimizer.zero_grad()
                targets = batch["label"].to(device)
                output = backbone_model(
                    input_ids=batch["input_ids"].to(device),
                    attention_mask=batch["attention_mask"].to(device),
                    token_type_ids=batch["token_type_ids"].to(device),
                    labels=targets,
                )
                # Because `labels` is passed, the model computes the loss
                # itself: output[0] is the loss, output[1] the logits.
                loss, logits = output[0], output[1]
                print("loss", loss)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
                correct += (logits.argmax(dim=1) == targets).sum().item()
            # Divide by the true sample count: the last batch may be partial.
            train_accuracy = correct / num_samples
            epoch_accuracies.append(train_accuracy)
            # loss.item() is already a per-batch mean, so average over batches.
            train_loss = epoch_loss / num_batches
            train_st = f"Training Loss: {train_loss} Train Accuracy: {train_accuracy}"
            print(f"Epoch: {epoch+1} {train_st}")
            # Checkpoint every epoch; the tokenizer is saved too so the
            # finetuned prefix can be reloaded like the pretrained one.
            s3_adapter_obj.save_model_tokenizer_into_s3(
                bucket_name=bucket_name,
                prefix=f"{prefix}/finetuned",
                model=backbone_model.model,
                tokenizer=tokenizer,
            )
        print("Model has been successfully built..")

    except (RuntimeError, MemoryError, ValueError, TypeError) as e:
        print("Training Exception Occurred")
        raise RuntimeError("Training Exception Occurred") from e

In [7]:
# Wrap each step function as a KFP component. Every component runs in a
# BASE_IMAGE container and pip-installs its packages_to_install at start-up.
s3_adapter_comp = create_component_from_func(
    s3_adapter,
    base_image=BASE_IMAGE,
    packages_to_install=["boto3", "dill", "pandas", "transformers", "torch"],
)
dataset_comp = create_component_from_func(
    create_data_loader,
    base_image=BASE_IMAGE,
    packages_to_install=["torch", "dill", "transformers"],
)
# `train` imports tqdm directly, so declare it explicitly instead of relying on
# it arriving as a transitive dependency of transformers.
train_comp = create_component_from_func(
    train,
    base_image=BASE_IMAGE,
    packages_to_install=[
        "torch",
        "pandas",
        "transformers",
        "dill",
        "boto3",
        "numpy",
        "tqdm",
    ],
)

In [8]:
@kfp.dsl.pipeline(name="label-classifier-training-pipeline")
def model_pipeline(labels: list):
    # Pipeline graph: s3_adapter -> create_data_loader -> train.
    # NOTE(review): the `labels` pipeline parameter is never used; every
    # component receives the hard-coded `class_labels` list below instead.
    bucket_name = "mlops-kubeflow"
    prefix = "kf-label-classifier"
    mode = "train"
    class_labels = [
        "Action with Deadline",
        "Announcement",
        "Appreciation",
        "Action",
        "Others",
    ]

    adapter_step = s3_adapter_comp(bucket_name=bucket_name, labels=class_labels)
    # `input_dir` is the KFP name for the components' `input_dir_path` input.
    dataset_step = dataset_comp(
        input_dir=adapter_step.output, labels=class_labels, mode=mode
    )
    train_comp(
        input_dir=dataset_step.output,
        bucket_name=bucket_name,
        prefix=prefix,
        data_file="label.csv",
        labels=class_labels,
        num_epochs=10,
        batch_size=2,
        device="cpu",
    )

In [9]:
import os

EXPERIMENT_NAME = "label_classifier"
# Host and namespace can be overridden per user/environment.
HOST = os.environ.get("KFP_HOST", "https://kubeflow-workos-slvr.anthem.com")
namespace = os.environ.get("KFP_NAMESPACE", "chaluvadi-avinash")
# The authservice session cookie is a credential and must never be committed.
# Read it from the environment instead. The value previously hard-coded here
# should be treated as compromised.
session_cookie = os.environ.get("KFP_SESSION_COOKIE")
if not session_cookie:
    raise RuntimeError(
        "Set the KFP_SESSION_COOKIE environment variable to your Kubeflow "
        "'authservice_session' cookie before submitting the pipeline."
    )
client = kfp.Client(
    host=f"{HOST}/pipeline",
    cookies=f"authservice_session={session_cookie}",
    namespace=namespace,
    ssl_ca_cert="./root.pem",
)
experiment = client.create_experiment(name=EXPERIMENT_NAME, namespace=namespace)
client.create_run_from_pipeline_func(
    pipeline_func=model_pipeline,
    arguments={},
    experiment_name=EXPERIMENT_NAME,
    namespace=namespace,
)

RunPipelineResult(run_id=bc731be4-c0da-4e75-af17-cb3df61184cd)

In [28]:
import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("mlops-kubeflow")
# Uncomment to wipe every object under the prefix before re-uploading:
# bucket.objects.filter(Prefix="kf-label-classifier").delete()

# Push the local training CSV to the key the pipeline reads
# ({prefix}/{data_file} -> "kf-label-classifier/label.csv").
bucket.upload_file("label.csv", "kf-label-classifier/label.csv")

In [71]:
def train(
    bucket_name: str,
    prefix: str,
    data_file: str,
    labels: list,
    num_epochs: int,
    batch_size: int,
    device: str,
    input_dir_path: str = None,
    max_length: int = 512,
):
    """Run label-classifier fine-tuning locally (outside Kubeflow).

    Mirrors the pipeline's ``train`` component but creates its own S3 client.
    Unless *input_dir_path* points at a directory holding the pipeline's
    ``dataset.pkl``, it also builds its own in-memory dataset.

    :param bucket_name: S3 bucket holding data and model artefacts.
    :param prefix: key prefix. Data is read from ``{prefix}/{data_file}`` and
        the base model from ``{prefix}/pretrained``. The fine-tuned model and
        tokenizer are written to ``{prefix}/finetuned``.
    :param data_file: CSV with ``text`` and ``label`` columns.
    :param labels: ordered class names; a row's label is encoded as its index
        in this list and ``len(labels)`` sizes a freshly created model.
    :param num_epochs: number of passes over the data.
    :param batch_size: mini-batch size.
    :param device: torch device string such as ``"cpu"`` or ``"cuda"``.
    :param input_dir_path: optional directory containing a dill-pickled
        ``dataset.pkl`` from ``create_data_loader``; when omitted, a local
        dataset is used.
    :param max_length: token length examples are truncated/padded to (local
        dataset only).
    :returns: mean training accuracy over all epochs, or ``None`` if
        ``num_epochs`` is 0.
    :raises ValueError: if the CSV has no rows (raised before training).
    :raises RuntimeError: wrapping RuntimeError/MemoryError/ValueError/TypeError
        raised during training.
    """

    import io
    import os
    import json
    import boto3
    import torch
    import pandas as pd
    import transformers
    import torch.nn as nn
    from tqdm import tqdm
    from contextlib import contextmanager
    from tempfile import NamedTemporaryFile
    from torch.utils.data import DataLoader, Dataset

    class LabelBackbone(nn.Module):
        """Thin ``nn.Module`` wrapper that forwards every keyword to *model*."""

        def __init__(self, model):
            super().__init__()
            self.model = model
            # Not used by forward(); kept so the module layout is unchanged.
            self.drop_out = nn.Dropout(p=0.3)

        def forward(self, **kwargs):
            return self.model(**kwargs)

    class S3Adapter:
        """Load and store BERT models, tokenizers and data files in S3."""

        def __init__(self, labels, client):
            self.s3_client = client
            self.labels = labels

        def folder_exists_and_not_empty(self, bucket_name: str, key: str) -> bool:
            """Return True if at least one object sits directly under ``key/``."""
            if not key.endswith("/"):
                key = key + "/"
            resp = self.s3_client.list_objects(
                Bucket=bucket_name, Prefix=key, Delimiter="/", MaxKeys=1
            )
            return "Contents" in resp

        @contextmanager
        def s3_fileobj(self, bucket_name, key):
            """Yield an in-memory file object holding ``{bucket_name}/{key}``."""
            obj = self.s3_client.get_object(Bucket=bucket_name, Key=key)
            yield io.BytesIO(obj["Body"].read())

        def get_data_from_s3(self, bucket_name, prefix, data_file):
            """Return the streaming body of the object ``{prefix}/{data_file}``."""
            return self.s3_client.get_object(
                Bucket=bucket_name, Key=os.path.join(prefix, data_file)
            )["Body"]

        # "pytorch_model" matches the key the pipeline components use, so this
        # local run reads and writes the same S3 objects.
        def get_model_tokenizer_from_s3(
            self, bucket_name, prefix, model_name="pytorch_model"
        ):
            """Return ``(model, tokenizer)`` stored under ``{bucket_name}/{prefix}``.

            If nothing is stored there yet, ``bert-base-uncased`` is downloaded,
            saved to that prefix, and returned.
            """
            if not self.folder_exists_and_not_empty(
                bucket_name=bucket_name, key=prefix
            ):
                model = transformers.BertForSequenceClassification.from_pretrained(
                    "bert-base-uncased", num_labels=len(self.labels)
                )
                tokenizer = transformers.BertTokenizer.from_pretrained(
                    "bert-base-uncased"
                )
                self.save_model_tokenizer_into_s3(
                    bucket_name=bucket_name,
                    prefix=prefix,
                    model_name=model_name,
                    model=model,
                    tokenizer=tokenizer,
                )
                return model, tokenizer

            with self.s3_fileobj(
                bucket_name=bucket_name, key=f"{prefix}/config.json"
            ) as f:
                config = transformers.BertConfig.from_dict(json.load(f))
            with self.s3_fileobj(
                bucket_name=bucket_name, key=f"{prefix}/{model_name}.bin"
            ) as f:
                state_dict = torch.load(f, map_location="cpu")
            # None + config + state_dict builds the model from in-memory weights.
            model = transformers.BertForSequenceClassification.from_pretrained(
                None, config=config, state_dict=state_dict
            )
            tokenizer = self.get_tokenizer_from_s3(bucket_name, prefix)
            return model, tokenizer

        def get_tokenizer_from_s3(self, bucket_name, prefix):
            """Build a ``BertTokenizer`` from ``{prefix}/vocab.txt`` in S3."""
            with self.s3_fileobj(
                bucket_name=bucket_name, key=f"{prefix}/vocab.txt"
            ) as f:
                vocab_bytes = f.read()
            # Flush so the tokenizer sees the full file when it opens it by name.
            with NamedTemporaryFile(suffix=".txt") as vocab_file:
                vocab_file.write(vocab_bytes)
                vocab_file.flush()
                return transformers.BertTokenizer(vocab_file=vocab_file.name)

        def save_model_tokenizer_into_s3(
            self, bucket_name, prefix, model, model_name="pytorch_model", tokenizer=None
        ):
            """Write config.json, ``{model_name}.bin`` and (optionally) vocab.txt."""
            self.s3_client.put_object(
                Bucket=bucket_name,
                Key=f"{prefix}/config.json",
                Body=model.config.to_json_string(),
            )
            buffer = io.BytesIO()
            torch.save(model.state_dict(), buffer)
            self.s3_client.put_object(
                Bucket=bucket_name,
                Key=f"{prefix}/{model_name}.bin",
                Body=buffer.getvalue(),
            )
            if tokenizer is not None:
                # Line i of vocab.txt must be the token whose id is i.
                ordered_tokens = sorted(tokenizer.vocab, key=tokenizer.vocab.get)
                self.s3_client.put_object(
                    Bucket=bucket_name,
                    Key=f"{prefix}/vocab.txt",
                    Body="\n".join(ordered_tokens),
                )

    class LocalLabelDataset(Dataset):
        """In-memory (text, label) dataset yielding padded BERT inputs."""

        def __init__(self, text, label, tokenizer, labels, max_length):
            # Convert to lists so indexing is positional, not by pandas label.
            self.text = list(text)
            self.label = list(label)
            self.tokenizer = tokenizer
            self.labels = labels
            self.max_length = max_length

        def __len__(self):
            return len(self.text)

        def __getitem__(self, item):
            processed_text = " ".join(str(self.text[item]).split())
            inputs = self.tokenizer.encode_plus(
                processed_text,
                None,
                add_special_tokens=True,
                max_length=self.max_length,
                truncation=True,
            )
            padding = [0] * (self.max_length - len(inputs["input_ids"]))
            sample = {
                key: torch.tensor(inputs[key] + padding, dtype=torch.long)
                for key in ("input_ids", "attention_mask", "token_type_ids")
            }
            sample["label"] = torch.tensor(
                self.labels.index(str(self.label[item]).strip()), dtype=torch.long
            )
            return sample

    s3_adapter_obj = S3Adapter(labels=labels, client=boto3.client("s3"))
    pretrained_model, tokenizer = s3_adapter_obj.get_model_tokenizer_from_s3(
        bucket_name=bucket_name, prefix=f"{prefix}/pretrained"
    )
    backbone_model = LabelBackbone(model=pretrained_model)
    # Same hyper-parameters as the deprecated transformers.AdamW defaults.
    optimizer = torch.optim.AdamW(
        backbone_model.parameters(), lr=3e-5, eps=1e-6, weight_decay=0.0
    )

    dataframe = pd.read_csv(
        s3_adapter_obj.get_data_from_s3(
            bucket_name=bucket_name, prefix=prefix, data_file=data_file
        )
    )
    print(dataframe.head())
    text = dataframe.loc[:, "text"]
    label = dataframe.loc[:, "label"]

    if input_dir_path is not None:
        import dill

        with open(os.path.join(input_dir_path, "dataset.pkl"), "rb") as fh:
            train_dataset = dill.load(fh)
        train_dataset.text = text
        train_dataset.label = label
        train_dataset.mode = "train"
        train_dataset.tokenizer = tokenizer
    else:
        train_dataset = LocalLabelDataset(text, label, tokenizer, labels, max_length)
    train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    num_samples = len(train_dataset)
    if num_samples == 0:
        raise ValueError(f"No training rows found in {prefix}/{data_file}")
    num_batches = len(train_data_loader)
    epoch_accuracies = []

    try:
        backbone_model.to(device)
        backbone_model.train()  # enable training-mode layers (dropout etc.)
        print("Starting the Training Loop ..")
        for epoch in range(num_epochs):
            epoch_loss = 0.0
            correct = 0
            print(f"[INFO] Epoch {epoch + 1} Started..")
            for index, batch in tqdm(enumerate(train_data_loader), total=num_batches):
                print(
                    f"[INFO] [TRAINING] Epoch {epoch + 1} Iteration {index + 1} Running.."
                )
                optimizer.zero_grad()
                targets = batch["label"].to(device)
                output = backbone_model(
                    input_ids=batch["input_ids"].to(device),
                    attention_mask=batch["attention_mask"].to(device),
                    token_type_ids=batch["token_type_ids"].to(device),
                    labels=targets,
                )
                # With `labels` passed: output[0] is the loss, output[1] logits.
                loss, logits = output[0], output[1]
                print("loss", loss.item())
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
                correct += (logits.argmax(dim=1) == targets).sum().item()
            train_accuracy = correct / num_samples
            epoch_accuracies.append(train_accuracy)
            train_loss = epoch_loss / num_batches
            train_st = f"Training Loss: {train_loss} Train Accuracy: {train_accuracy}"
            print(f"Epoch: {epoch+1} {train_st}")

        print("Model has been successfully built..")
        s3_adapter_obj.save_model_tokenizer_into_s3(
            bucket_name=bucket_name,
            prefix=f"{prefix}/finetuned",
            model=backbone_model.model,
            tokenizer=tokenizer,
        )

    except (RuntimeError, MemoryError, ValueError, TypeError) as e:
        print("Training Exception Occurred")
        raise RuntimeError("Training Exception Occurred") from e

    if not epoch_accuracies:
        return None
    return sum(epoch_accuracies) / len(epoch_accuracies)

In [None]:
# Local (non-Kubeflow) training run against the same S3 layout the pipeline uses.
bucket_name = "mlops-kubeflow"
prefix = "kf-label-classifier"
mode = "train"  # mirrors the pipeline setting; not passed to train() here
labels = [
    "Action with Deadline",
    "Announcement",
    "Appreciation",
    "Action",
    "Others",
]

train(
    device="cpu",
    num_epochs=10,
    batch_size=2,
    labels=labels,
    data_file="label.csv",
    prefix=prefix,
    bucket_name=bucket_name,
)