# Setup


In [None]:
%%capture
 !pip install transformers

In [None]:
import os
import time
import datetime
from google.colab import drive

import pandas as pd
import seaborn as sns
import numpy as np
import random

import matplotlib.pyplot as plt
% matplotlib inline

import torch
from torch.utils.data import Dataset, DataLoader, random_split, RandomSampler, SequentialSampler
torch.manual_seed(42)

from transformers import GPT2LMHeadModel,  GPT2Tokenizer, GPT2Config
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import Trainer, TrainingArguments,AutoModelWithLMHead


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
# Make Google Drive visible inside the Colab VM so the data files can be read.
from google.colab import drive

drive.mount('/content/gdrive')

### TO CHANGE ######
# Folder on the mounted drive that the rest of the notebook reads from and
# writes to; edit it to match your own Drive layout.
DIR = "/content/gdrive/MyDrive/Path/to/Dementiabank/folder"

Mounted at /content/drive


In [None]:
# For reproducibility: seed every random number generator used by the notebook
# (Python's `random`, NumPy, and PyTorch on CPU and on all CUDA devices).
SEED = 42

for _seed_fn in (random.seed,
                 np.random.seed,
                 torch.manual_seed,
                 torch.cuda.manual_seed_all):
    _seed_fn(SEED)

# Loading data

In [None]:
def _read_ids(path):
    """Return the lines of the text file at `path`, one id per line, without the trailing newline."""
    with open(path, "r") as id_file:
        # A plain str method is used instead of `re.sub`: `re` is never
        # imported in this notebook, so the regex version raised NameError.
        return [line.rstrip('\n') for line in id_file]


# Load the ids of AD subjects into a list
ids_ad = _read_ids(f'{DIR}/audio_filenames_dementia.txt')

# Load the ids of Control subjects into a list
ids_hc = _read_ids(f'{DIR}/audio_filenames_control.txt')

# Placeholder locations -- replace with the real paths before running.
path_ad = 'path_to_ad_data'
path_hc = 'path_to_hc_data'
test_path = 'path_to_test_data'
test_id_path = 'path_to_a_txt_file_with_test_sample_labels'

# NOTE(review): `data_to_str` is not defined in the visible part of this
# notebook; it has to be defined or imported before this cell is executed.
# It is expected to return (texts, labels, augmented_dataset) -- confirm.
data_ad, labels_ad, aug_dataset_ad = data_to_str(ids_ad, path_ad, AD_flag=1,
                                                 augment=False)
data_hc, labels_hc, aug_dataset_hc = data_to_str(ids_hc, path_hc, AD_flag=0,
                                                 augment=False)

dataset, labels, aug_dataset = [], [], []

# AD samples first, then controls; `labels` is kept in the same order.
dataset.extend(data_ad)
dataset.extend(data_hc)

labels.extend(labels_ad)
labels.extend(labels_hc)

# GPT2Dataset looks labels up by sample position, so the two lists must align.
assert len(dataset) == len(labels), "every transcript needs exactly one label"


# Finetuning GPT2


In [None]:
class GPT2Dataset(Dataset):
  """Pre-tokenised, label-prefixed texts for language-model fine-tuning.

  Each text is wrapped as ``<|label|>`` + text + ``<|endoftext|>`` and passed
  through `tokenizer` with truncation and padding to `max_length`. The
  resulting token ids and attention masks are stored as tensors.

  Args:
    txt_list: iterable of text strings.
    labels: sequence indexed by the position of each text in `txt_list`.
    tokenizer: callable returning a mapping with 'input_ids' and
      'attention_mask' entries.
    gpt2_type: accepted for interface compatibility; not used by this class.
    max_length: length every encoded sample is truncated/padded to.
  """

  def __init__(self, txt_list,labels, tokenizer, gpt2_type="gpt2", max_length=768):
    self.tokenizer = tokenizer
    self.input_ids = []
    self.attn_masks = []

    for position, text in enumerate(txt_list):
      # Prefix with the class tag and close with the end-of-text marker.
      tagged_text = f'<|{labels[position]}|>' + text + '<|endoftext|>'
      encoded = tokenizer(
          tagged_text,
          truncation=True,
          max_length=max_length,
          padding="max_length",
          return_attention_mask=True,
      )

      self.input_ids.append(torch.tensor(encoded['input_ids']))
      self.attn_masks.append(torch.tensor(encoded['attention_mask']))

  def __len__(self):
    """Number of encoded samples."""
    return len(self.input_ids)

  def __getitem__(self, idx):
    """Return the ``(input_ids, attention_mask)`` pair stored at `idx`."""
    return (self.input_ids[idx], self.attn_masks[idx])

def format_time(elapsed):
    """Return `elapsed` (seconds) rounded to whole seconds, rendered via ``str(datetime.timedelta)``."""
    whole_seconds = int(round(elapsed))
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)

In [None]:

# `batch_size` was used below without ever being defined in the notebook.
# NOTE(review): 2 is a conservative choice for 768-token sequences on a single
# Colab GPU -- adjust to the memory that is actually available.
batch_size = 2

# GPT-2 has no padding token of its own, so a dedicated '<|pad|>' is registered.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', eos_token='<|endoftext|>', pad_token='<|pad|>')

train_dataset = GPT2Dataset(dataset, labels, tokenizer)

# Draw the training samples in random order.
train_dataloader = DataLoader(
    train_dataset,
    sampler=RandomSampler(train_dataset),
    batch_size=batch_size,
)

In [None]:
# `device` was referenced but never defined: use the GPU when one is
# available and fall back to the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pretrained GPT-2 weights once (the original cell loaded them twice,
# the second time through the deprecated `AutoModelWithLMHead`).
configuration = GPT2Config.from_pretrained('gpt2', output_hidden_states=False)
model = GPT2LMHeadModel.from_pretrained("gpt2", config=configuration)

# The tokenizer was extended with '<|pad|>', so the embedding matrix must be
# resized to the new vocabulary size.
model.resize_token_embeddings(len(tokenizer))

# Assigning the result keeps the cell from echoing the full model repr.
model = model.to(device)


In [None]:
## model params
epochs = 15            # full passes over the training data
learning_rate = 5e-4   # peak learning rate reached after warm-up
warmup_steps = 1e2     # scheduler steps spent ramping the learning rate up
epsilon = 1e-8         # AdamW numerical-stability term

# Every `sample_step` batches the training loop prints a generated sample.
sample_step = 100

optimizer = AdamW(model.parameters(), lr=learning_rate, eps=epsilon)

# One scheduler step is taken per batch, for every epoch.
total_steps = epochs * len(train_dataloader)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps,
)

In [None]:

## TRAINING

# Send every batch to the device the model's weights already live on, so the
# loop does not depend on a separately defined `device` variable.
train_device = next(model.parameters()).device

for epoch_i in range(epochs):
    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))

    # Start of this epoch; used for the "Elapsed" figure in the progress line
    # (`t0` was read below without ever being set).
    t0 = time.time()

    total_train_loss = 0
    model.train()
    for step, batch in enumerate(train_dataloader):

        b_input_ids = batch[0].to(train_device)
        b_masks = batch[1].to(train_device)
        # Language-model targets are the inputs themselves, but padded
        # positions are set to -100 so the loss ignores them instead of
        # training the model to predict '<|pad|>'.
        b_labels = b_input_ids.masked_fill(b_masks == 0, -100)

        model.zero_grad()

        outputs = model(b_input_ids,
                        labels=b_labels,
                        attention_mask=b_masks,
                        token_type_ids=None)

        # With `labels` supplied, the first output is the LM loss.
        loss = outputs[0]

        batch_loss = loss.item()
        total_train_loss += batch_loss

        # Periodically report progress and print one sampled continuation.
        if step % sample_step == 0 and step != 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}. Loss: {:>5,}.   Elapsed: {:}.'.format(step, len(train_dataloader), batch_loss, elapsed))

            model.eval()
            sample_outputs = model.generate(
                bos_token_id=random.randint(0,2),
                do_sample=True,
                top_k=50,
                max_length=150,
                top_p=0.95,
                num_return_sequences=1,
            )
            for i, sample_output in enumerate(sample_outputs):
                print("{}: {}".format(i, tokenizer.decode(sample_output, skip_special_tokens=True)))

            # Back to training mode before the optimisation step.
            model.train()

        loss.backward()
        optimizer.step()
        scheduler.step()

    avg_train_loss = total_train_loss / len(train_dataloader)

    print("")
    print("  Average training loss: {0:.2f}".format(avg_train_loss))

print("")
print("Training complete!")

In [None]:
output_dir = './model_save/'

# `exist_ok=True` creates the folder when missing and is a no-op otherwise.
os.makedirs(output_dir, exist_ok=True)

print("Saving model to %s" % output_dir)

# Unwrap the model if it is wrapped in a container exposing `.module`
# (e.g. DataParallel) so the underlying model is what gets saved.
model_to_save = model.module if hasattr(model, 'module') else model
model_to_save.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

# The original cell saved an undefined `args` object (NameError). Persist the
# hyperparameters that were actually used for this run instead.
training_args = {
    'epochs': epochs,
    'learning_rate': learning_rate,
    'warmup_steps': warmup_steps,
    'epsilon': epsilon,
}
torch.save(training_args, os.path.join(output_dir, 'training_args.bin'))

# Generate

In [None]:
def gen(label, num_return_sequences=220):
  """Sample continuations of `label` from the global `model`.

  Args:
    label: prompt string; the notebook passes a class tag such as '<|0|>'.
    num_return_sequences: how many sequences to sample for the prompt.

  Returns:
    The value returned by ``model.generate`` (one row of token ids per
    sampled sequence, prompt included).
  """
  prompt = label

  # Put the prompt on the same device as the model's weights; this keeps the
  # function working after the model is reloaded and does not rely on a
  # separately defined global `device`.
  model_device = next(model.parameters()).device
  generated = torch.tensor(tokenizer.encode(prompt)).unsqueeze(0).to(model_device)

  # No `bos_token_id` is passed: generation starts from the supplied prompt,
  # so the random value the original supplied had no role to play.
  sample_outputs = model.generate(
      generated,
      do_sample=True,
      top_k=50,
      max_length=150,
      top_p=0.75,
      num_return_sequences=num_return_sequences,
  )
  return sample_outputs
  

In [None]:
output_dir = './model_save/'

# Reload the fine-tuned model and its tokenizer from disk. `from_pretrained`
# only needs the directory: extra positional arguments are forwarded to the
# model / tokenizer constructors, so the stray '' and '/tokenizer_config.json'
# arguments of the original call have been dropped.
model = GPT2LMHeadModel.from_pretrained(output_dir)
tokenizer = GPT2Tokenizer.from_pretrained(output_dir)

# A freshly loaded model lives on the CPU; move it to the GPU when available.
model = model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

In [None]:
# Inference only: switch the model to evaluation mode.
model.eval()

lbs = ['0', '1']
rows = []

# For each class tag, sample a batch of texts and record them with the tag
# stripped from the decoded text.
for label in lbs:
  label_token = f'<|{label}|>'
  sample_outputs = gen(label_token)
  for i, sample_output in enumerate(sample_outputs):
    decoded = tokenizer.decode(sample_output, skip_special_tokens=True)
    print("{}: {}\n\n".format(i, decoded))
    cleaned_text = decoded.replace(label_token, '')
    rows.append([cleaned_text, label])

# One row per generated sample: the text and the label it was prompted with.
df = pd.DataFrame(rows, columns=['Text', "Intent"])

In [None]:
# Write the generated samples to a CSV file inside DIR (row index omitted).
synth_csv_path = f'{DIR}/synth_aug_dataset.csv'
df.to_csv(synth_csv_path, index=False)