In [None]:
!pip install transformers
!pip install editdistance

import editdistance
import pandas as pd
import transformers
import matplotlib.pyplot as plt
import torch
import torch.cuda
import sklearn
import numpy as np

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import accuracy_score,f1_score
from sklearn.metrics import accuracy_score
from transformers import AdamW, get_linear_schedule_with_warmup
from sklearn import preprocessing
from collections import defaultdict
from google.colab import drive
from transformers import AutoTokenizer, AutoModelForMaskedLM
from transformers import AutoModelForSequenceClassification

# Mount Google Drive so the corpus file is reachable from the Colab runtime.
drive.mount('/content/drive')
# Rows are split on a literal three-space separator; the tab-separated fields inside
# each cell are split in a later cell. A multi-character separator is only handled by
# the Python parser engine, so it is requested explicitly. `on_bad_lines="skip"`
# replaces `error_bad_lines=False`, which was deprecated in pandas 1.3 and removed in
# pandas 2.0 (where passing it raises TypeError).
dataset = pd.read_csv('/content/drive/MyDrive/eng', sep="   ", header=None, engine="python", on_bad_lines="skip")

# Work on a copy so the frame as loaded stays available, then flatten to nested lists.
dataset_copy=dataset.copy()
list_data = dataset_copy.values.tolist()

In [None]:
split_data = list()

def split(corpus, spit_data):
  """Split every cell of every row in `corpus` on tab characters.

  Each resulting list of fields is appended to `spit_data` in place, in
  row-then-cell order. Nothing is returned.
  """
  for row in corpus:
    spit_data.extend(cell.split("\t") for cell in row)

# Fill `split_data` in place with the tab-separated fields of every cell in `list_data`.
split(list_data,split_data)

# Preview the first 100 raw rows and the first 100 split rows.
print(list_data[0:100])
print(split_data[0:100])

In [None]:
# removing duplicates
def remove_duplicates(split_data):
    """Return the rows of `split_data` with exact duplicates removed.

    Rows are compared as tuples of their fields. The first occurrence of each
    row is kept and the original order is preserved, so the result is the same
    on every run (a plain `set` would reorder rows differently between runs
    because string hashing is randomised per process).

    Args:
        split_data: iterable of rows, each an iterable of hashable fields.

    Returns:
        A new list of rows (each a new list), without duplicates.
    """
    # dict keys are unique and keep insertion order, giving an ordered de-duplication.
    unique_rows = dict.fromkeys(tuple(sublist) for sublist in split_data)
    return [list(row) for row in unique_rows]

unique_data = remove_duplicates(split_data)
print(unique_data[:100])

# getting the longest word for padding
# Length of the longest second field over all rows; stays 0 when there are no rows.
len_longest = max((len(row[1]) for row in unique_data), default=0)

In [None]:
# splitting into words and grammar
def split_word_and_grammar(unique_data_array):
    """Separate each row into its leading word pair and its trailing grammar field.

    Returns a tuple `(word_list, last_elements)` where `word_list` holds
    `(row[0], row[1])` tuples and `last_elements` holds `row[-1]`, both in
    input order.
    """
    word_list = []
    last_elements = []
    for row in unique_data_array:
        last_elements.append(row[-1])
        word_list.append((row[0], row[1]))
    return word_list, last_elements
    
# Separate the (first field, second field) pairs from the trailing grammar string of each row.
word_list_eng, last_elements_eng = split_word_and_grammar(unique_data)

# Preview the first ten word pairs.
print(word_list_eng[0:10])

# NOTE(review): the triple-quoted block below is a bare string expression, so the code
# inside it never executes. It appears to be a disabled experiment that interleaves a
# "SEP" token between the elements of each word pair - confirm whether it can be removed.
'''
sep_token = "SEP"
word_list_char_eng = []
for tpl in word_list_eng:
    new_word = []
    for word in tpl:
        new_word += [word] + [sep_token]
    new_word.pop() # remove the extra separator token at the end
    word_list_char_eng.append(new_word)
'''

# splitting grammar
def splitting_grammar(last_elements):
    """Split each ';'-separated grammar string into a list of tags.

    Tags made up only of decimal digits are converted to `int`; every other
    tag is kept as a string.

    Args:
        last_elements: iterable of grammar strings such as "V;3;SG;PRS".

    Returns:
        A list with one list of tags per input string, in input order.
    """
    list_of_lists = []
    for info in last_elements:
        tags = []
        for part in info.split(';'):
            # `isdecimal` (unlike `isdigit`) only accepts characters that `int()` can
            # parse; `isdigit` is also true for e.g. superscript digits, on which
            # `int()` raises ValueError.
            tags.append(int(part) if part.isdecimal() else part)
        list_of_lists.append(tags)
    return list_of_lists

# Turn each grammar string into a list of tags (all-digit tags become ints).
# NOTE(review): the `_mlt` suffix does not match the `_eng` input it is built from -
# presumably a leftover name; confirm the intended naming.
list_of_lists_mlt = splitting_grammar(last_elements_eng)

In [None]:
#splitting data
# NOTE(review): `char_level` is passed through to the tokenizer as an extra keyword;
# presumably it has no effect on a BERT WordPiece tokenizer - confirm.
tokeniser = AutoTokenizer.from_pretrained('bert-base-cased', char_level=True) #,char_level=True
# `model` is rebound later in the notebook by `model = Model(bert)`.
model = AutoModelForSequenceClassification.from_pretrained('bert-base-cased')

from sklearn.model_selection import train_test_split
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# Inputs are the word pairs and labels are the per-word tag lists built in the earlier
# cells. The pairs live in `word_list_eng`; the previously referenced `word_list_mlt`
# is never defined in this notebook and raised NameError on a fresh kernel.
X = word_list_eng
y = list_of_lists_mlt

# 80% of the data is used for training; the remaining 20% is split 80/20 into
# validation and test sets.
X_train, X_rem, y_train, y_rem = train_test_split(X,y, train_size=0.8)
X_valid, X_test, y_valid, y_test = train_test_split(X_rem,y_rem, test_size=0.2)

print(y_train[0:10])
print(len(X_train))

In [None]:
len_longest += 1

# Encode the training pairs into padded id / attention-mask tensors on `device`.
token_indexes = tokeniser(
    X_train,
    return_tensors='pt',
    padding=True,
    truncation=True,
    is_split_into_words=False,
    max_length=512,
)
indexed_train_x = token_indexes['input_ids'].to(device).squeeze()
mask_train_x = token_indexes['attention_mask'].to(device).squeeze()

# For the test pairs the first element is replaced by an empty string before encoding.
new_tuples = [("",) + pair[1:] for pair in X_test]
print(new_tuples[:10])
token_indexes_test = tokeniser(
    new_tuples,
    return_tensors='pt',
    padding=True,
    truncation=True,
    is_split_into_words=False,
    max_length=512,
)
indexed_test_x = token_indexes_test['input_ids'].to(device).squeeze()
mask_test_x = token_indexes_test['attention_mask'].to(device).squeeze()

# padding with '<PAD>' so the grammar info of every word has the same length
def padding_grammar(Y):
    """Right-pad every tag list in `Y` with '<PAD>' to the length of the longest one.

    Args:
        Y: sequence of tag sequences.

    Returns:
        A new list of new lists, all of equal length. An empty `Y` yields an
        empty list instead of raising ValueError from `max()`.
    """
    max_len = max((len(info) for info in Y), default=0)
    return [list(info) + ['<PAD>'] * (max_len - len(info)) for info in Y]
# Pad the train and test tags in a single call so both splits get the same width.
# Padding them separately lets each split use its own longest entry, which gives
# differently shaped targets and can make position-wise code over-index the narrower split.
_padded_all = padding_grammar(list(y_train) + list(y_test))
padded_morph_info_train = _padded_all[:len(y_train)]
padded_morph_info_test = _padded_all[len(y_train):]

# creating a dictionary for mapping
# Every key that is looked up for the first time is assigned the next free integer index.
morph_vocab = defaultdict(lambda: len(morph_vocab))
# Reserve index 0 for the padding symbol.
morph_vocab['<PAD>']
# Register every tag of both splits. The previous `train and test` expression evaluated
# to the test list alone (for a non-empty train list), and iterating over a `set` made
# the index assignment differ between runs; first-occurrence order is deterministic.
for info in list(padded_morph_info_train) + list(padded_morph_info_test):
    for value in info:
        # assigning index (the lookup itself inserts the key)
        morph_vocab[value]
print(morph_vocab)
# map the morphological information to indexes using the vocabulary
def get_targets_one_hot(padded_info, vocab=None, target_device=None):
  """One-hot encode padded tag sequences.

  Args:
    padded_info: list of equally long tag lists.
    vocab: mapping from tag to integer index; defaults to the module-level
      `morph_vocab`.
    target_device: device for the returned tensor; defaults to the
      module-level `device`.

  Returns:
    A float32 tensor of shape (len(padded_info), tags per entry, len(vocab))
    that does not require gradients.
  """
  if vocab is None:
    vocab = morph_vocab
  if target_device is None:
    target_device = device

  # Look every tag up as-is. The former "index 0 -> '<PAD>' index" rewrite was dropped:
  # 0 is a regular vocabulary index, so it silently relabelled one real tag as padding.
  indexes = [[vocab[value] for value in info] for info in padded_info]
  indexes = torch.tensor(indexes, dtype=torch.long).to(target_device)

  # one-hot encode the targets; the size is read after the lookups because a
  # defaultdict vocabulary may have grown while indexing
  num_classes = len(vocab)
  # Targets are labels, not parameters, so they are returned without requires_grad.
  return torch.nn.functional.one_hot(indexes, num_classes=num_classes).to(torch.float32)

# One-hot target tensors for the padded train and test tag sequences.
targets_train = get_targets_one_hot(padded_morph_info_train)
targets_test = get_targets_one_hot(padded_morph_info_test)

In [None]:
class Model(torch.nn.Module):
    """Feed-forward tag classifier on top of a BERT encoder.

    The first-token vector of BERT hidden state 8 is passed through three
    Linear -> BatchNorm -> ReLU -> Dropout stages and projected to one score per
    (tag slot, vocabulary entry).

    Args:
        bert: module called as `bert(x, attention_mask=..., output_hidden_states=True)`
            whose result exposes `hidden_states` with 768-dimensional vectors.
        num_slots: number of tag positions predicted per input (default 4).
        vocab_size: number of tag classes; defaults to `len(morph_vocab)` read
            once at construction time, so later growth of the vocabulary cannot
            break the reshape in `forward`.
    """
    def __init__(self, bert, num_slots=4, vocab_size=None):
        super().__init__()
        if vocab_size is None:
            vocab_size = len(morph_vocab)
        self.bert = bert
        self.num_slots = num_slots
        self.vocab_size = vocab_size
        self.hidden_layer1 = torch.nn.Linear(768, 512)
        self.batchnorm1 = torch.nn.BatchNorm1d(512)
        self.drop1 = torch.nn.Dropout(p=0.5)
        self.hidden_layer2 = torch.nn.Linear(512, 256)
        self.batchnorm2 = torch.nn.BatchNorm1d(256)
        self.drop2 = torch.nn.Dropout(p=0.2)
        self.hidden_layer3 = torch.nn.Linear(256, 128)
        self.batchnorm3 = torch.nn.BatchNorm1d(128)
        self.drop3 = torch.nn.Dropout(p=0.2)
        self.output_layer = torch.nn.Linear(128, num_slots*vocab_size)
    
    def forward(self, x, mask):
        """Return raw scores of shape (batch, num_slots, vocab_size).

        The scores are unnormalised logits. `torch.nn.CrossEntropyLoss` applies
        log-softmax itself, so squashing them through a sigmoid first would confine
        them to (0, 1) and cap how confident the softmax can become. `argmax`
        over logits selects the same class as `argmax` over their sigmoid.
        """
        with torch.cuda.amp.autocast():
            # First-token vector of hidden state 8.
            vecs = self.bert(x, attention_mask=mask, output_hidden_states=True).hidden_states[8][:, 0, :]
            hidden1 = self.hidden_layer1(vecs)
            hidden1 = self.batchnorm1(hidden1)
            hidden1 = torch.relu(hidden1)
            hidden1 = self.drop1(hidden1)

            hidden2 = self.hidden_layer2(hidden1)
            hidden2 = self.batchnorm2(hidden2)
            hidden2 = torch.relu(hidden2)
            hidden2 = self.drop2(hidden2)

            hidden3 = self.hidden_layer3(hidden2)
            hidden3 = self.batchnorm3(hidden3)
            hidden3 = torch.relu(hidden3)
            hidden3 = self.drop3(hidden3)
            
            output = self.output_layer(hidden3).view(-1, self.num_slots, self.vocab_size)
        return output


# Load pretrained cased BERT; `Model.forward` only reads its `hidden_states` output.
bert = transformers.BertForMaskedLM.from_pretrained('bert-base-cased')
# Rebinds `model`, replacing the AutoModelForSequenceClassification instance created earlier.
model = Model(bert)
# Move the parameters to the selected device (`device` is 'cuda:0' when available, else 'cpu').
model.to(device)


In [None]:
accuracy_count = []

def get_acc(matches):
    """Return the mean of `matches` and record it in `accuracy_count`.

    An empty `matches` returns 0 and records nothing.
    """
    total = len(matches)
    if total == 0:
        return 0
    ratio = sum(matches) / total
    accuracy_count.append(ratio)
    return ratio

In [None]:
batch_size = 128

optimizer = AdamW(model.parameters(), lr=1e-5, weight_decay=0.1) #, correct_bias=True
# The loss module holds no state, so it is built once instead of on every step.
loss_fn = torch.nn.CrossEntropyLoss()

print('step', 'error', 'accuracy')
train_errors = []
train_accs = []
all_true=[]
all_pred=[]
accumulation_steps = 2 

# Number of full mini-batches in the training set (at least one so the modulo is defined).
num_train_batches = max(1, len(indexed_train_x) // batch_size)

for step in range(1, 4000+1):
    optimizer.zero_grad()
    model.train(True)
    step_error = 0.0
    for i in range(accumulation_steps):
        # Walk through the whole training set and wrap around at the end. Indexing by
        # `i` alone would train every step on the same first `accumulation_steps` batches.
        batch_index = ((step - 1) * accumulation_steps + i) % num_train_batches
        start = batch_index * batch_size
        batch_x = indexed_train_x[start:start + batch_size]
        batch_mask = mask_train_x[start:start + batch_size]
        batch_targets = targets_train[start:start + batch_size]

        output = model(batch_x, batch_mask)

        # `output` is (batch, slot, vocab) and the targets are one-hot over the last axis.
        # CrossEntropyLoss expects the class axis at dim 1, so the scores are permuted to
        # (batch, vocab, slot) and the targets reduced to class indexes of shape
        # (batch, slot); otherwise the softmax would run over the slot axis.
        true_indexes = batch_targets.argmax(dim=-1)
        error = loss_fn(output.float().permute(0, 2, 1), true_indexes)
        # Scale so the accumulated gradient equals the mean over the micro-batches.
        error = error/accumulation_steps
        error.backward()
        step_error += error.item()

    optimizer.step()
    model.train(False)

    # Mean loss over the micro-batches of this step.
    train_errors.append(step_error)

    # Accuracy is measured on the last micro-batch of the step.
    targets_batch = true_indexes.cpu().tolist()
    predictions_batch = output.detach().argmax(dim=-1).cpu().tolist()

    train_accs = []
    for targets, predicted_labels in zip(targets_batch, predictions_batch):
        all_true.append(targets)
        all_pred.append(predicted_labels)
        # An example counts as correct when prediction and target hold the same labels
        # regardless of order. Counting every (prediction, target) pair that matches
        # over-counts repeated labels such as padding and can mark wrong outputs correct.
        train_accs.append(1 if sorted(predicted_labels) == sorted(targets) else 0)
    accuracy=get_acc(train_accs)

    if step % 10 == 0:
        print(step, train_errors[-1], accuracy)


In [None]:
# Binarise the per-example label lists once and reuse the matrices for both scores.
m = MultiLabelBinarizer().fit(all_true)
true_binarised = m.transform(all_true)
pred_binarised = m.transform(all_pred)

f1_weighted = f1_score(true_binarised, pred_binarised, average='weighted')
f1_macro = f1_score(true_binarised, pred_binarised, average='macro')

print("F1 score weighted:", f1_weighted)
print("F1 score macro:", f1_macro)


# Position-wise labels flattened over all recorded examples.
all_true_flat = [label for sublist in all_true for label in sublist]
all_pred_flat = [label for sublist in all_pred for label in sublist]

# The value reported as "micro" has to be computed with average='micro'; it was
# previously computed with average='weighted' under the micro label.
f1_micro = f1_score(all_true_flat, all_pred_flat, average='micro')
print("F1 score micro:", f1_micro)

# problem with lev distance, uses order, order doesnt matter a lot here
reverse_dict = {v: k for k, v in morph_vocab.items()} 
distances = []
for i in range(len(all_true)):
    predicted_labels = [reverse_dict[j] for j in all_pred[i]]
    true_labels = [reverse_dict[j] for j in all_true[i]]
    distance = editdistance.eval(predicted_labels, true_labels)
    distances.append(distance)

# Guard the averages so an empty run reports 0.0 instead of raising ZeroDivisionError.
average_distance = sum(distances) / len(distances) if distances else 0.0
print("Levenshtein distances:", average_distance)

avg = sum(accuracy_count) / len(accuracy_count) if accuracy_count else 0.0
print("the average is: ", avg)

In [None]:
# Make evaluation mode explicit (dropout off, BatchNorm running statistics) instead of
# relying on the state left behind by the training cell.
model.eval()

# reversing the dict to get the grammar from the index (built once, not per batch)
reverse_dict = {v: k for k, v in morph_vocab.items()}

with torch.no_grad():
    print('sent', 'prediction')
    # Step through the test set in strides of `batch_size`; the final, possibly shorter,
    # batch is included. Floor division on the batch count skipped it and printed
    # nothing at all for test sets smaller than one batch.
    for start in range(0, len(indexed_test_x), batch_size):
        batch_x = indexed_test_x[start:start + batch_size]
        batch_mask = mask_test_x[start:start + batch_size]
        outputs = model(batch_x, batch_mask)

        for j, x in enumerate(outputs):
            predicted_labels = [reverse_dict[index] for index in x.argmax(dim=1).tolist()]

            # changing from char to word
            input_word = ''.join(tokeniser.decode(batch_x[j]).split())

            print(input_word, predicted_labels)
            print("actual")
            print(y_test[start + j])


In [None]:
def _plot_curve(values, y_label):
    """Draw `values` against 1-based step numbers on a fresh figure and return (fig, ax)."""
    figure, axes = plt.subplots(1, 1)
    axes.set_xlabel('step')
    axes.set_ylabel(y_label)
    steps = range(1, len(values) + 1)
    axes.plot(steps, values, color='blue', linestyle='-', linewidth=3)
    axes.grid()
    return figure, axes

# Training error per step.
(fig, ax) = _plot_curve(train_errors, '$E$')

# Recorded accuracy per step.
(fig, ax) = _plot_curve(accuracy_count, '$A$')