# ⚙️ Install Libraries and Download Dataset

After this step, the runtime must be restarted.

In [None]:
# Force the reported preferred encoding to UTF-8 before running shell commands.
# Background (presumably the Colab "!command" failure discussed here — confirm):
# https://stackoverflow.com/questions/56081324/why-are-google-colab-shell-commands-not-working
import locale
def getpreferredencoding(do_setlocale = True):
    # Replacement for locale.getpreferredencoding: always answers "UTF-8";
    # the do_setlocale argument is accepted for signature compatibility and ignored.
    return "UTF-8"
locale.getpreferredencoding = getpreferredencoding

# Install every third-party package used by the cells below (versions are not pinned).
!pip install simplet5 evaluate sacrebleu tqdm spacy matplotlib openai zenodo-get bert_score tensorflow

# Fetch the Zenodo record with this DOI into the working directory
# (presumably provides the JSON files named in the config cell — confirm).
!zenodo_get 10.5281/zenodo.8023142

# 📝 Initialize config

In [None]:
# Central configuration read by the training cells below.
config = dict(
    # Checkpoints fine-tuned one after another by the T5 training cell.
    t5_models=[
        "t5-small",
        "t5-base",
        "google/flan-t5-base",
        "google/t5-efficient-small-el8-dl2",
        "google/t5-efficient-small-el16-dl2",
        "google/t5-efficient-small-dm2000",
        "google/t5-v1_1-base",
        "google/t5-small-lm-adapt",
    ],
    # Checkpoints fine-tuned by the BART training cell.
    bart_models=[
        "facebook/bart-base",
        "facebook/mbart-large-50",
    ],
    # JSON files with the labelled samples.
    trainings_data_path="training_labeled_with_style_samples.json",
    eval_data_path="eval_labeled_with_style_samples.json",
    # Token limits and device switch forwarded to the T5 trainer.
    trainings_params=dict(
        max_source_length=2048,
        max_target_length=512,
        use_gpu=True,
    ),
    # Text fragments the T5 prompts are assembled from.
    prompts=dict(
        intro_samples="Here a example sentences: ",
        intro_input_sentence="Here is a sentence: ",
        intro_output_sentence="Here is a rewrite of this sentence according to the example sentences: ",
    ),
)

print('Done!')

Done!


# 🏋️ Train T5

In [None]:
import json
import pandas as pd
from simplet5 import SimpleT5

# When True, every T5 model is additionally trained on the "reverse" prompt layout
# (input sentence first, style samples second); the default layout is always trained.
TRAIN_REVERSE_PROMPT = False


def load_training_and_eval_data(eval_split=0.8):
    """Load the labelled samples and split them into a training and an evaluation part.

    The file at ``config["trainings_data_path"]`` is parsed as JSON and the list
    stored under its top-level ``"data"`` key is split by position (no shuffling).

    Args:
        eval_split: Fraction of the samples, counted from the start of the list,
            that goes into the *training* part. Despite the name, the remaining
            ``1 - eval_split`` share becomes the evaluation part.

    Returns:
        A ``(training_data, eval_data)`` tuple of lists.
    """
    # The context manager guarantees the handle is closed, even if parsing fails.
    with open(config["trainings_data_path"], encoding="utf-8") as f:
        samples = json.load(f)['data']
    # Truncating split: with 10 samples and 0.8 the first 8 are used for training.
    split_index = int(len(samples) * eval_split)
    return samples[:split_index], samples[split_index:]


def create_trainings_prompts(data):
    """Build the default T5 prompts (style samples first, then the input sentence).

    Args:
        data: Iterable of sample dicts with the keys ``style_samples``,
            ``input_sentence`` and ``result_sentence``.

    Returns:
        Dict with two parallel lists: ``source_text`` holds the prompts, each
        ending in an opening ``{``; ``target_text`` holds the result sentence
        followed by the closing ``}``.
    """
    sources = []
    targets = []
    for entry in data:
        intros = config["prompts"]
        pieces = [intros["intro_samples"], " "]
        # Every style sample is wrapped in braces and followed by a blank.
        pieces.extend("{" + sample + "} " for sample in entry["style_samples"])
        pieces.extend([intros["intro_input_sentence"], "{", entry['input_sentence'], "} "])
        # The prompt ends on an opening brace that the target text closes.
        pieces.extend([intros["intro_output_sentence"], "{"])
        sources.append("".join(pieces))
        targets.append(entry['result_sentence'] + '}')
    return {'source_text': sources, 'target_text': targets}


def create_trainings_prompts_reverse(data):
    """Build the "reverse" T5 prompts (input sentence first, then the style samples).

    Unlike the default layout, the input and result sentences are not wrapped in
    braces; only the style samples are.

    Args:
        data: Iterable of sample dicts with the keys ``style_samples``,
            ``input_sentence`` and ``result_sentence``.

    Returns:
        Dict with two parallel lists, ``source_text`` (prompts) and
        ``target_text`` (result sentence plus a trailing blank).
    """
    sources = []
    targets = []
    for entry in data:
        intros = config["prompts"]
        pieces = [intros["intro_input_sentence"], " ", entry['input_sentence'], " "]
        pieces.extend([intros["intro_samples"], " "])
        # Style samples keep their brace wrapping in this layout as well.
        pieces.extend("{" + sample + "} " for sample in entry["style_samples"])
        pieces.extend([intros["intro_output_sentence"], " "])
        sources.append("".join(pieces))
        targets.append(entry['result_sentence'] + ' ')
    return {'source_text': sources, 'target_text': targets}


def create_trainings_data():
    """Load the samples and turn both splits into default-layout prompt frames.

    Returns:
        A ``(training_df, eval_df)`` tuple of pandas DataFrames built from the
        dicts returned by ``create_trainings_prompts``.
    """
    splits = load_training_and_eval_data()
    # Both splits go through the same prompt builder; unpacking enforces exactly two.
    training_df, eval_df = (
        pd.DataFrame(data=create_trainings_prompts(split)) for split in splits
    )
    return training_df, eval_df


def create_trainings_data_reverse():
    """Load the samples and turn both splits into reverse-layout prompt frames.

    Training and evaluation prompts are both built with
    ``create_trainings_prompts_reverse`` so that the validation data has the
    same layout the model is trained on.

    Returns:
        A ``(training_df, eval_df)`` tuple of pandas DataFrames built from the
        dicts returned by ``create_trainings_prompts_reverse``.
    """
    training_data, eval_data = load_training_and_eval_data()
    training_prompts = create_trainings_prompts_reverse(training_data)
    # Use the reverse builder here as well; mixing in default-layout prompts
    # would validate the model on a format it never sees during training.
    eval_prompts = create_trainings_prompts_reverse(eval_data)
    training_df = pd.DataFrame(data=training_prompts)
    eval_df = pd.DataFrame(data=eval_prompts)
    return training_df, eval_df


def train_model(training_df, eval_df, model_name_param, save_name, trainings_params):
    """Fine-tune one T5 checkpoint with SimpleT5.

    Args:
        training_df: DataFrame with the columns ``source_text`` and ``target_text``.
        eval_df: DataFrame with the same two columns, used for evaluation.
        model_name_param: Checkpoint name handed to ``SimpleT5.from_pretrained``.
        save_name: Label of the prompt variant; part of the output directory name.
        trainings_params: Dict providing ``max_source_length``,
            ``max_target_length`` and ``use_gpu``.
    """
    print(f"Train {model_name_param} {save_name}")

    t5 = SimpleT5()
    t5.from_pretrained("t5", model_name_param)

    # All trainer settings in one place; batch size, epochs and precision are fixed.
    fit_kwargs = {
        "train_df": training_df,
        "eval_df": eval_df,
        "source_max_token_len": trainings_params['max_source_length'],
        "target_max_token_len": trainings_params['max_target_length'],
        "batch_size": 4,
        "max_epochs": 5,
        "use_gpu": trainings_params['use_gpu'],
        "outputdir": f"prod_{model_name_param}_{save_name}",
        "early_stopping_patience_epochs": 0,
        "precision": 32,
    }
    t5.train(**fit_kwargs)


# Prompt frames for both layouts are built once and shared by all models.
training_dataframe, eval_dataframe = create_trainings_data()
training_dataframe_reverse, eval_dataframe_reverse = create_trainings_data_reverse()

for model_name in config['t5_models']:
    # The default prompt layout is trained for every checkpoint.
    train_model(training_dataframe, eval_dataframe, model_name, 'default', config['trainings_params'])
    if not TRAIN_REVERSE_PROMPT:
        continue
    # Optional second run on the reverse prompt layout.
    train_model(training_dataframe_reverse, eval_dataframe_reverse, model_name, 'reverse', config['trainings_params'])


# 🏋️ Train BART

In [None]:
import json
import torch
from tqdm import tqdm
from transformers import BartTokenizer, BartForConditionalGeneration, AutoTokenizer

# Load the labelled samples; the context manager closes the file handle
# as soon as the JSON has been parsed.
with open(config['trainings_data_path'], encoding="utf-8") as trainings_data_file:
    training_data = json.load(trainings_data_file)['data']

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Special tokens of the bart-base tokenizer, used to assemble the training strings.
tokenizer = AutoTokenizer.from_pretrained("facebook/bart-base")
sep_token = tokenizer.sep_token
bos_token = tokenizer.bos_token
eos_token = tokenizer.eos_token


def create_trainings_for_bert(array):
    """Assemble (source, target) training strings for the BART models.

    Each style sample is framed as ``bos + sample + sep``; the input sentence and
    the result sentence are framed as ``bos + text + eos``. The tokens come from
    the module-level ``bos_token``, ``sep_token`` and ``eos_token``.

    Args:
        array: Iterable of sample dicts with the keys ``style_samples``,
            ``input_sentence`` and ``result_sentence``.

    Returns:
        List of ``(source_text, target_text)`` tuples, one per sample.
    """
    pairs = []
    for entry in array:
        style_part = "".join(bos_token + sample + sep_token for sample in entry['style_samples'])
        source = style_part + bos_token + entry['input_sentence'] + eos_token
        target = bos_token + entry['result_sentence'] + eos_token
        pairs.append((source, target))
    return pairs


def train_model(model_name, training_dataset):
    """Fine-tune a BART checkpoint on (source, target) text pairs.

    Runs 5 epochs of teacher-forced training with Adam (lr=1e-5), one pair per
    step, and saves the model after every epoch.

    Args:
        model_name: Checkpoint name passed to
            ``BartForConditionalGeneration.from_pretrained``; also used in the
            name of the save directory.
        training_dataset: Iterable of ``(input_text, output_text)`` string pairs.

    Side effects:
        Writes ``prod_{model_name}_epoch{epoch}`` via ``save_pretrained`` after
        each epoch.
    """
    # NOTE(review): the tokenizer is always facebook/bart-base, even when
    # model_name names a different checkpoint — confirm the vocabularies match.
    tokenizer = BartTokenizer.from_pretrained("facebook/bart-base")
    model = BartForConditionalGeneration.from_pretrained(model_name).to(device)
    # from_pretrained hands the model back in evaluation mode; switch to
    # training mode so dropout is active while fine-tuning.
    model.train()

    encoded_pairs = [[tokenizer.encode(input_text, return_tensors='pt').to(device),
                      tokenizer.encode(output_text, return_tensors='pt').to(device)]
                     for input_text, output_text in training_dataset]

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

    for epoch in range(5):
        epoch_loss = 0.0

        with tqdm(total=len(encoded_pairs), desc=f"Epoch {epoch + 1}") as pbar:
            for step, (input_ids, output_ids) in enumerate(encoded_pairs, start=1):
                optimizer.zero_grad()

                # Teacher forcing: the decoder sees the target without its last
                # token and is scored against the target without its first token.
                logits = model(input_ids=input_ids, decoder_input_ids=output_ids[:, :-1]).logits

                loss = torch.nn.functional.cross_entropy(logits.reshape(-1, logits.size(-1)),
                                                         output_ids[:, 1:].reshape(-1))

                # Backpropagate and update the model
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

                # Running mean over the steps done so far in this epoch.
                pbar.set_postfix({'loss': epoch_loss / step})
                pbar.update()

        model.save_pretrained(f"prod_{model_name}_epoch{epoch}")


# Turn the raw samples into (source, target) string pairs once for all models.
training_dataset = create_trainings_for_bert(training_data)
# Fine-tune every checkpoint listed under config["bart_models"] on the same pairs.
for model_name in config["bart_models"]:
    train_model(model_name=model_name, training_dataset=training_dataset)


# 🏋️ Train GPT

In [None]:
import json
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, GPTNeoForCausalLM

# Load the labelled samples; the context manager closes the file handle
# as soon as the JSON has been parsed.
with open(config['trainings_data_path'], encoding="utf-8") as trainings_data_file:
    training_data = json.load(trainings_data_file)['data']

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Only the end-of-sequence token of the GPT-Neo tokenizer is needed to build the strings.
tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")
eos_token = tokenizer.eos_token

def create_trainings_for_gpt_neo(array):
    """Assemble (prompt, target) training strings for GPT-Neo.

    The prompt lists every style sample on its own line, followed by the input
    sentence and the module-level ``eos_token``; the target is the result
    sentence followed by ``eos_token``.

    Args:
        array: Iterable of sample dicts with the keys ``style_samples``,
            ``input_sentence`` and ``result_sentence``.

    Returns:
        List of ``(prompt_text, target_text)`` tuples, one per sample.
    """
    pairs = []
    for entry in array:
        sample_lines = "".join(sample + '\n' for sample in entry['style_samples'])
        prompt = sample_lines + entry['input_sentence'] + eos_token
        target = entry['result_sentence'] + eos_token
        pairs.append((prompt, target))
    return pairs

def train_model(training_dataset):
    """Fine-tune GPT-Neo 2.7B as a causal LM on (prompt, target) text pairs.

    For every pair the prompt and target token ids are concatenated into one
    sequence. Prompt positions are masked out of the loss with -100, so only the
    target tokens are learned, each predicted from everything before it. The
    model's own loss is used, which shifts the labels internally.

    Runs 5 epochs with Adam (lr=1e-5), one pair per step, and saves the model
    after every epoch.

    Args:
        training_dataset: Iterable of ``(prompt_text, target_text)`` string pairs.

    Side effects:
        Writes ``prod_gpt-neo_epoch{epoch}`` via ``save_pretrained`` after each epoch.
    """
    tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")
    model = GPTNeoForCausalLM.from_pretrained("EleutherAI/gpt-neo-2.7B").to(device)
    # from_pretrained hands the model back in evaluation mode; switch to
    # training mode so dropout is active while fine-tuning.
    model.train()

    # NOTE(review): sequences are not truncated — confirm prompt + target always
    # fits the model's maximum context length.
    encoded_pairs = [[tokenizer.encode(input_text, return_tensors='pt').to(device),
                      tokenizer.encode(output_text, return_tensors='pt').to(device)]
                     for input_text, output_text in training_dataset]

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

    for epoch in range(5):
        epoch_loss = 0.0

        with tqdm(total=len(encoded_pairs), desc=f"Epoch {epoch + 1}") as pbar:
            for step, (prompt_ids, target_ids) in enumerate(encoded_pairs, start=1):
                optimizer.zero_grad()

                # One causal sequence: prompt tokens followed by target tokens.
                input_ids = torch.cat([prompt_ids, target_ids], dim=1)
                # -100 is the ignore index of the LM loss: the prompt is context
                # only, the loss is taken over the target tokens.
                labels = input_ids.clone()
                labels[:, :prompt_ids.size(1)] = -100

                loss = model(input_ids=input_ids, labels=labels).loss

                # Backpropagate and update the model
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

                # Running mean over the steps done so far in this epoch.
                pbar.set_postfix({'loss': epoch_loss / step})
                pbar.update()

        model.save_pretrained(f"prod_gpt-neo_epoch{epoch}")

# Turn the raw samples into (prompt, target) string pairs.
training_dataset = create_trainings_for_gpt_neo(training_data)
# Fine-tune GPT-Neo on those pairs (the checkpoint name is fixed inside train_model).
train_model(training_dataset=training_dataset)


# 🏋️ Train BERT

In [None]:
import json
import torch
from transformers import BertTokenizer, BertLMHeadModel, AdamW

# Load the labelled samples; the context manager closes the file handle
# as soon as the JSON has been parsed.
with open(config['trainings_data_path'], encoding="utf-8") as trainings_data_file:
    training_data = json.load(trainings_data_file)['data']

# Initialize BERT model and tokenizer
model = BertLMHeadModel.from_pretrained('bert-base-uncased')
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Separator token used to frame every sentence in the training strings.
sep_token = tokenizer.sep_token

# Set up optimizer
# NOTE(review): AdamW imported from transformers is deprecated in recent
# releases in favour of torch.optim.AdamW — confirm against the installed version.
optimizer = AdamW(model.parameters(), lr=2e-5)

# Set up training data (input sequences and target output sequences)
def create_trainings_for_bert(array):
    """Assemble (source, target) training strings for the BERT model.

    Every style sample, the input sentence and the result sentence are each
    framed by the module-level ``sep_token`` on both sides.

    Args:
        array: Iterable of sample dicts with the keys ``style_samples``,
            ``input_sentence`` and ``result_sentence``.

    Returns:
        List of ``(source_text, target_text)`` tuples, one per sample.
    """
    pairs = []
    for entry in array:
        style_part = "".join(sep_token + sample + sep_token for sample in entry['style_samples'])
        source = style_part + sep_token + entry['input_sentence'] + sep_token
        target = sep_token + entry['result_sentence'] + sep_token
        pairs.append((source, target))
    return pairs


# Turn the raw samples into (source, target) string pairs.
trainings_data = create_trainings_for_bert(training_data)

# Set up device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

num_epochs = 4
# Training loop
model.train()
for epoch in range(num_epochs):
    total_loss = 0
    for input_seq, target_seq in trainings_data:
        optimizer.zero_grad()

        # Tokenize input sequence and target sequence; both are padded (and
        # truncated) to the tokenizer's maximum length so their shapes match.
        input_tokens = tokenizer.encode(input_seq, add_special_tokens=True, truncation=True, padding='max_length')
        target_tokens = tokenizer.encode(target_seq, add_special_tokens=True, truncation=True, padding='max_length')

        # Convert tokenized sequences to tensors
        input_tensors = torch.tensor([input_tokens]).to(device)
        target_tensors = torch.tensor([target_tokens]).to(device)

        # Padding must neither be attended to nor be learned as a target:
        # mask it out of the attention and mark it with the loss ignore index.
        attention_mask = (input_tensors != tokenizer.pad_token_id).long()
        label_tensors = target_tensors.masked_fill(target_tensors == tokenizer.pad_token_id, -100)

        # Forward pass
        # NOTE(review): the labels come from a different sequence than the
        # inputs — confirm this objective is intended for BertLMHeadModel.
        outputs = model(input_tensors, attention_mask=attention_mask, labels=label_tensors)
        loss = outputs.loss

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Average over the pairs actually iterated; the guard avoids a
    # ZeroDivisionError when the data set is empty.
    average_loss = total_loss / max(len(trainings_data), 1)
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {average_loss:.4f}")

    # Save the fine-tuned model
    model.save_pretrained(f'bert_epoch{epoch}')
