## Importing libraries and Initializations

In [None]:
# Install huggingface specific libraries
! pip install transformers datasets evaluate --quiet

%env TOKENIZERS_PARALLELISM=true

In [None]:
# Common libraries
import json
import time
import numpy as np
import pandas as pd
import warnings
from time import gmtime, strftime
from random import randrange
# Suppress all Python warnings to keep cell output readable.
# NOTE(review): this also hides library deprecation warnings (e.g. from datasets).
warnings.filterwarnings('ignore')

# Check versions
from platform import python_version
import torch
import sagemaker
import transformers
import datasets

for label, version in (
    ('Pytorch version: ', torch.__version__),
    ('Python version: ', python_version()),
    ('Sagemaker version: ', sagemaker.__version__),
    ('Transformers version: ', transformers.__version__),
    ('Datasets version: ', datasets.__version__),
):
    print(label, version)

In [None]:
# Sagemaker specific imports
import boto3
from sagemaker.pytorch import PyTorch
# from sagemaker.huggingface import HuggingFace, TrainingCompilerConfig

# Huggingface specific imports
from datasets import Dataset, DatasetDict, load_dataset, concatenate_datasets
from datasets.filesystems import S3FileSystem
from transformers import AutoTokenizer

In [None]:
# --- Model and S3 locations ---
model_path = 'google/flan-t5-xxl'     # Hugging Face Hub id (or local path) of the model to fine-tune
workspace_bucket_name = 'gupshup-ml'  # S3 bucket for storing the datasets used for training
s3_prefix = 'smp'                     # S3 prefix under which the train/test datasets are saved, e.g. s3://gupshup-ml/smp/train
# Last path component, so 'org/name' hub ids, plain names and local paths all work.
model_name = model_path.rstrip('/').split('/')[-1]
# S3 path where the model artifacts are stored (used when saving with s5cmd).
save_model_s3_path = f's3://{workspace_bucket_name}/model-artifacts/{model_name}-smp/'
experiment_name = f'qa-smp-{model_name}'

# --- Training hyperparameters ---
epochs = 1                          # number of training epochs
per_device_batch_size = 5           # per-device batch size for training and evaluation
gradient_accumulation_steps = 64    # gradient accumulation steps for training
learning_rate = 1e-4
# optim = 'adamw_torch_xla'

# --- SageMaker model parallel (SMP) / launcher settings ---
pipeline_parallel_degree = 1
# NOTE(review): presumably chosen to equal instance_count * processes_per_host
# (2 x 8 in the estimator cell) — confirm when changing the cluster shape.
sharded_data_parallel_degree = 16
partitions = 1
processes_per_host = 8

In [None]:
# Resolve the S3 bucket for this SageMaker session. When no bucket is configured
# explicitly, fall back to the session's default bucket.
sagemaker_session_bucket = None
sess = sagemaker.Session()
if sess is not None and sagemaker_session_bucket is None:
    sagemaker_session_bucket = sess.default_bucket()

# Execution role of this notebook; it is also handed to the estimator below.
role = sagemaker.get_execution_role()

# Re-create the session pinned to the resolved bucket.
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

# Data

In [None]:
# Load the train/test CSVs as pandas DataFrames and wrap them in a Hugging Face DatasetDict.
train_path = '../others/data/full_data/train/sniper_faq_session_train.csv'
test_path = '../others/data/full_data/test/sniper_faq_session_test.csv'


def _load_split(csv_path):
    """Read one split CSV and keep only the columns used for training.

    Adds an ``id`` column holding each row's position in the CSV. It is assigned before any
    rows are dropped, so ids still point back to the source file. Rows with a missing
    ``id``/``input_text``/``output_text`` value are dropped, because the tokenizer cannot
    handle NaN text. This is applied to every split, not just test.
    """
    frame = pd.read_csv(csv_path)
    frame['id'] = frame.index
    return frame[['id', 'input_text', 'output_text']].dropna()


train = _load_split(train_path)
print('Train:', train.shape)

test = _load_split(test_path)
print('Test:', test.shape)

# Create Dataset objects from the DataFrames. preserve_index=False keeps the pandas index out
# of the dataset. Once dropna() removes rows, the index is no longer a RangeIndex, and
# from_pandas would otherwise store it as an extra `__index_level_0__` column in that split only.
train_dataset = Dataset.from_pandas(train, preserve_index=False)
test_dataset = Dataset.from_pandas(test, preserve_index=False)

dataset = DatasetDict({
    'train': train_dataset,
    'test': test_dataset,
})

dataset

# train = 6673, test = 1668 samples
# train.info(memory_usage='deep') # memory usage: 26.6 MB
# test.info(memory_usage='deep') # memory usage: 6.6 MB

In [None]:
# Load the tokenizer that matches the chosen checkpoint; all preprocessing below reuses it.
tokenizer = AutoTokenizer.from_pretrained(model_path)

for label, value in (
    ('Model input names', tokenizer.model_input_names),
    ('Model max length', tokenizer.model_max_length),
):
    print(f'{label}: {value}')

In [None]:
%%time
# Data processing

# The maximum total input sequence length after tokenization.
# Sequences longer than this will be truncated, sequences shorter will be padded.
tokenized_inputs = concatenate_datasets([dataset['train'], dataset['test']]).map(lambda x: tokenizer(x['input_text'], truncation=True), batched=True, remove_columns=['input_text', 'output_text'])
input_lenghts = [len(x) for x in tokenized_inputs['input_ids']]
# take 85 percentile of max length for better utilization
max_source_length = int(np.percentile(input_lenghts, 85))
print(f'Max source length: {max_source_length}')

# The maximum total sequence length for target text after tokenization.
# Sequences longer than this will be truncated, sequences shorter will be padded.'
tokenized_targets = concatenate_datasets([dataset['train'], dataset['test']]).map(lambda x: tokenizer(x['output_text'], truncation=True), batched=True, remove_columns=['input_text', 'output_text'])
target_lenghts = [len(x) for x in tokenized_targets['input_ids']]
# take 90 percentile of max length for better utilization
max_target_length = int(np.percentile(target_lenghts, 90))
print(f'Max target length: {max_target_length}')

def preprocess_function(sample, padding='max_length', prefix=''):
    """Tokenize a batch of QA samples into model inputs and labels.

    Used with ``Dataset.map(..., batched=True)``, so ``sample`` is a batch whose
    ``'input_text'`` and ``'output_text'`` entries are lists of texts.

    Args:
        sample: Batch with ``'input_text'`` (source) and ``'output_text'`` (target) columns.
        padding: Passed to the tokenizer. With the default ``'max_length'``, inputs are padded
            to ``max_source_length`` and targets to ``max_target_length``. Label padding is
            then replaced by -100 so that it is ignored in the loss.
        prefix: Optional task prefix prepended to every input text (T5-style). It is empty by
            default, i.e. inputs are tokenized unchanged.

    Returns:
        The tokenizer output for the inputs (the keys in ``tokenizer.model_input_names``) plus
        a ``'labels'`` entry holding the tokenized targets.
    """
    inputs = sample['input_text']
    if prefix:
        inputs = [prefix + text for text in inputs]

    # Tokenize inputs (truncated/padded to the data-derived source length).
    model_inputs = tokenizer(inputs, max_length=max_source_length, padding=padding, truncation=True)

    # Tokenize targets with the `text_target` keyword argument.
    labels = tokenizer(text_target=sample['output_text'], max_length=max_target_length, padding=padding, truncation=True)

    # Only when padding to a fixed length here: replace pad tokens in the labels with -100 so
    # the padding is ignored in the loss. Other padding modes return the labels unmasked.
    if padding == 'max_length':
        pad_id = tokenizer.pad_token_id
        labels['input_ids'] = [
            [-100 if token_id == pad_id else token_id for token_id in label_ids]
            for label_ids in labels['input_ids']
        ]

    model_inputs['labels'] = labels['input_ids']
    return model_inputs

# Tokenize both splits. The id and raw-text columns are dropped, leaving only the tokenizer
# outputs and the labels produced by preprocess_function.
tokenized_dataset = dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=['id', 'input_text', 'output_text'],
)
train_feature_names = list(tokenized_dataset['train'].features)
print('Keys of tokenized dataset:', train_feature_names)

In [None]:
# Save each tokenized split to S3; these paths become the training job's input channels.
s3 = S3FileSystem()

training_input_path = f's3://{workspace_bucket_name}/{s3_prefix}/train'
test_input_path = f's3://{workspace_bucket_name}/{s3_prefix}/test'

for label, split_name, split_path in (
    ('Training', 'train', training_input_path),
    ('Test', 'test', test_input_path),
):
    print(f'{label} input path: {split_path}')
    tokenized_dataset[split_name].save_to_disk(split_path, fs=s3)

## Fine Tune

In [None]:
# Checkpoint locations, passed to the estimator as checkpoint_local_path (inside the
# training container) and checkpoint_s3_uri (in the workspace bucket).
checkpoint_dir = "/opt/ml/checkpoints"
checkpoint_s3_path = f"s3://{workspace_bucket_name}/flant5-checkpoints"
print(checkpoint_s3_path)

In [None]:
# !aws s3 rm --recursive $checkpoint_s3_path  # Skip on the first run; uncomment on later runs to delete the checkpoints a previous job left in S3.

In [None]:
# Hyperparameters passed to the estimator (and from there to the training script).
hyperparameters = {
    'model_id': model_path,
    'learning_rate': learning_rate,
    'per_device_train_batch_size': per_device_batch_size,
    'gradient_accumulation_steps': gradient_accumulation_steps,
    'per_device_eval_batch_size': per_device_batch_size,
    'epochs': epochs,
    'save_model_s3_path': save_model_s3_path,
    # Reuse checkpoint_dir so the script writes exactly where the estimator's
    # checkpoint_local_path points, instead of a second copy of the literal path.
    'checkpoint_dir': checkpoint_dir,
    # NOTE(review): presumably caps the number of training steps regardless of `epochs` —
    # confirm against the training script.
    'max_train_steps': 500,

    'pipeline_parallel_degree': pipeline_parallel_degree,
    'sharded_data_parallel_degree': sharded_data_parallel_degree,
    'partitions': partitions, # NOTE: Sharded data parallelism currently is not compatible with pipeline parallelism or optimizer state sharding. To activate sharded data parallelism, turn off optimizer state sharding and set the pipeline parallel degree to 1.
    # 'tensor_parallel_degree': 1,
    'processes_per_host': processes_per_host,
}
print('Hyperparameters: \n', json.dumps(hyperparameters, indent=2, default=str))

In [None]:
# SageMaker model parallel (smdistributed.modelparallel) settings. The values are read back
# from `hyperparameters` so the launcher configuration and the script's arguments agree.
smp_parameters = dict(
    pipeline_parallel_degree=hyperparameters['pipeline_parallel_degree'],
    ddp=True,
    sharded_data_parallel_degree=hyperparameters['sharded_data_parallel_degree'],
    partitions=hyperparameters['partitions'],
    # NOTE(review): bf16 requires GPUs with bfloat16 support (Ampere or newer, e.g. ml.p4d / ml.g5),
    # but the estimator cell currently targets ml.p3dn.24xlarge (V100) — confirm this combination.
    bf16=True,
    skip_tracing=True,
    # To enable tensor parallelism, also pass:
    #   tensor_parallel_degree=hyperparameters['tensor_parallel_degree'],
    #   prescaled_batch=True,
)
smp_options = {"enabled": True, "parameters": smp_parameters}

# MPI launcher settings: processes started on each instance.
mpi_options = dict(
    enabled=True,
    processes_per_host=hyperparameters['processes_per_host'],
    # To debug the parallel Open MPI processes (TODO: test the impact on logging), add:
    #   custom_mpi_options="-verbose --mca orte_base_help_aggregate 0 ",
)

In [None]:
# Cluster shape for the training job. Configurations that have been tried
# (base_job_name / instance_count x instance_type):
#   peft-smp-p4         1 x ml.p4d.24xlarge
#   peft-sdp-p3-24xl    1 x ml.p3dn.24xlarge
#   peft-smp-p3-16xl    1 x ml.p3.16xlarge   (Don't have SL)
#   peft-smp-g5-48xl    1 x ml.g5.48xlarge
#   peft-smp-g5-12xl    2 x ml.g5.12xlarge
#   peft-smp-g5-16xl    4 x ml.g5.16xlarge
#   peft-smp-g4dn-12xl  4 x ml.g4dn.12xlarge
base_job_name = 'peft-smp-p3-24xl'
instance_count = 2
instance_type = "ml.p3dn.24xlarge"

# Fail fast, before a billable job is launched: sharded data parallelism splits the
# instance_count * processes_per_host ranks into groups of sharded_data_parallel_degree.
world_size = instance_count * hyperparameters['processes_per_host']
if world_size % hyperparameters['sharded_data_parallel_degree'] != 0:
    raise ValueError(
        f"sharded_data_parallel_degree={hyperparameters['sharded_data_parallel_degree']} must divide "
        f"the number of processes ({instance_count} instances x "
        f"{hyperparameters['processes_per_host']} processes per host = {world_size})"
    )

estimator = PyTorch(
    entry_point="./scripts/qa-peft-smp.py",
    source_dir=".",
    role=role,
    framework_version="1.13.1",
    py_version="py39",
    base_job_name=base_job_name,
    instance_count=instance_count,
    instance_type=instance_type,
    hyperparameters=hyperparameters,
    checkpoint_local_path=checkpoint_dir,
    checkpoint_s3_uri=checkpoint_s3_path,
    disable_profiler=True,
    # TODO: Change to a reasonable value; the keep-alive period adds to the total billing cost.
    keep_alive_period_in_seconds=1800,
    debugger_hook_config=False,
    # Alternative: SageMaker distributed data parallel instead of model parallel:
    # distribution={'smdistributed': {'dataparallel': {'enabled': True}}},
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options,
    },
)

# Input channels for the training job, each pointing at a split saved with save_to_disk.
data = {
    'train': training_input_path,
    'test': test_input_path,
}

In [None]:
# Launch the training job with the train/test channels and block until it finishes.
estimator.fit(inputs=data, wait=True)