# In [None]:
############ TRAINING THE MODEL #################
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader, Dataset
import torch.nn as nn
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
from transformers import BertModel, BertTokenizer
from transformers import BertForSequenceClassification
from transformers import T5Model, T5Tokenizer
from transformers import EarlyStoppingCallback
from transformers import default_data_collator
from transformers import TrainingArguments, Trainer
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, precision_recall_curve, auc
from sklearn.utils import shuffle
import pickle
import bz2
import mgzip
import gc
from functools import reduce
import bisect
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.font_manager as fm
import gc
from datasets import load_metric
from imblearn.under_sampling import RandomUnderSampler


# Run on CUDA when torch can see a GPU, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


# Directory holding the tokenised, pickled and mgzip-compressed datasets
tokenised_data_path = "/content/drive/My Drive/Colab Notebooks/Dissertation Code/AWS/Data/256"


def _unpickle_gz(path):
    """Return the object pickled in the mgzip-compressed file at ``path``."""
    with mgzip.open(path, "rb") as handle:
        return pickle.load(handle)


########## PREPARE DATASETS ##########
# Encoded inputs for the training and validation splits
print("\nLoading training dataset")
x_train = _unpickle_gz(f"{tokenised_data_path}/BEAR_encoded_train.gz")
print("\nLoading validation dataset")
x_val = _unpickle_gz(f"{tokenised_data_path}/BEAR_encoded_val.gz")


# Labels for the same two splits
print("\nLoading training y labels")
y_train = _unpickle_gz(f"{tokenised_data_path}/BEAR_encoded_y_train.gz")
print("\nLoading validation y labels")
y_val = _unpickle_gz(f"{tokenised_data_path}/BEAR_encoded_y_val.gz")



########## LOAD MODEL ##########
# Pretrained ProtBERT-BFD checkpoint with a two-class classification head, moved to the chosen device
print("\nLoading model")
model = BertForSequenceClassification.from_pretrained("Rostlab/prot_bert_bfd", num_labels=2)
model.to(device)


# Dataset class that pairs the encoded fields (e.g. input ids, attention mask) with the labels
class MyDataset(Dataset):
    """Map-style dataset yielding one encoded sequence together with its label.

    Args:
        encoding: mapping from field name to an indexable collection holding
            one entry per sequence.
        labels: indexable collection with one label per sequence; its length
            defines the length of the dataset.
    """

    def __init__(self, encoding, labels):
        self.labels = labels
        self.encoding = encoding

    def __getitem__(self, idx):
        """Return a dict of tensors for sequence ``idx``, including a ``'labels'`` entry."""
        sample = {}
        for field, values in self.encoding.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

    def __len__(self):
        return len(self.labels)
    


########## TRAIN MODEL ##########
# Provide model arguments
print("\nPreparing model")
torch.cuda.empty_cache()
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy='epoch',
    save_strategy='epoch',
    learning_rate=1e-4,
    per_device_train_batch_size=24,
    per_device_eval_batch_size=24,
    warmup_steps=50,
    logging_steps=1,
    num_train_epochs=30,
    weight_decay=0.01,
    save_total_limit=3,
    gradient_accumulation_steps=12,            # accumulate gradients over 12 batches of 24 before each weight update (24 x 12 = 288 sequences per device per update)
    fp16=torch.cuda.is_available(),            # mixed precision needs a GPU; disabled automatically when the script falls back to CPU
    load_best_model_at_end=True,
    prediction_loss_only=False                 # set true to not calculate metrics and save RAM
)

# Not passed to the trainer anywhere in this file; kept so the name stays available
loss_function = nn.CrossEntropyLoss()          # may be redundant


# Calculate metrics
def calculate_metrics(eval_pred):
    """Compute classification metrics from the logits and labels of one evaluation run.

    Args:
        eval_pred: pair ``(logits, labels)``. The predicted class of each
            example is the argmax of its logits over the last axis.

    Returns:
        dict with the keys ``f1_score``, ``precision``, ``recall``,
        ``accuracy`` and ``eval_f1_score`` (the same value as ``f1_score``).
        Precision, recall and F1 are the scores of the positive class
        (label 1); they are 0 when the score is undefined.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    print(f"labels: {labels}")
    print(f"predictions: {predictions}")

    # Micro-averaged precision/recall/F1 on a single-label task are all equal
    # to accuracy, so they carried no extra information. Score the positive
    # class instead; zero_division=0 covers epochs with no positive predictions.
    positive_class = {"average": 'binary', "zero_division": 0}
    f1 = f1_score(labels, predictions, **positive_class)
    return {
        "f1_score": f1,
        "precision": precision_score(labels, predictions, **positive_class),
        "recall": recall_score(labels, predictions, **positive_class),
        "accuracy": accuracy_score(labels, predictions),
        "eval_f1_score": f1,
    }
    

from collections import Counter

# Keep only the two encoded fields that are fed to the model
x_train = {'input_ids': x_train.input_ids, 'attention_mask': x_train.attention_mask}
x_val =  {'input_ids': x_val.input_ids, 'attention_mask': x_val.attention_mask}

# Number of sequences and labels per split. len() of the dicts themselves
# would only report the number of keys (2), so measure the input ids instead.
print(len(x_train['input_ids']))
print(len(y_train))
print(len(x_val['input_ids']))
print(len(y_val))


# Turn input sequences and labels into datasets
train_dataset = MyDataset(x_train, y_train)
val_dataset = MyDataset(x_val, y_val)


# Verify class proportions. Labels are counted directly instead of indexing
# train_dataset, which would build tensors for every encoded sequence just to
# read its label.
class_counts = dict(Counter(int(label) for label in y_train))

print("Class counts:", class_counts)




# Create custom trainer class to record the metrics of every evaluation run
class CustomTrainer(Trainer):
    """Trainer that keeps a copy of the metrics returned by each evaluation.

    ``custom_metrics_history`` holds one dict per call to ``evaluate``, in
    call order. The dicts are whatever the parent ``evaluate`` returns
    (presumably with ``eval_``-prefixed keys - confirm against the installed
    transformers version).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.custom_metrics_history = []

    def evaluate(self, *args, **kwargs):
        """Run the parent evaluation, record its metrics and return them unchanged."""
        # The previous hook, on_evaluate_end, was never invoked by the parent
        # class, so the history stayed empty; evaluate() is the method that
        # actually produces the metrics.
        metrics = super().evaluate(*args, **kwargs)
        self.on_evaluate_end(metrics)
        return metrics

    def on_evaluate_end(self, metrics=None):
        """Append a copy of ``metrics`` to the history; do nothing when it is ``None``."""
        if metrics is not None:
            self.custom_metrics_history.append(dict(metrics))

# Assemble the trainer from the model, its arguments, both datasets and the metric function
trainer_settings = dict(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=calculate_metrics,
)
trainer = CustomTrainer(**trainer_settings)


# Drop the module-level names for the validation inputs and labels
# (val_dataset still holds its own references) and tidy up before training
del x_val
del y_val
torch.cuda.empty_cache()
gc.collect()

trainer.train()
train_history = trainer.state.log_history


# Write the trained model to disk
model_location = "results"                      # results directory will be created by trainer
model.save_pretrained(model_location)





##############################################################################################################################################################
# Experimental post-training analysis: metrics (F1 score, precision, recall, accuracy) on the training set
# and per-epoch metric series for plotting. It runs after training and does not affect the trained model.
# NOTE(review): this section was reported as producing inaccurate metrics and no graphs. The changes below
# fix the predict() call and the per-epoch lookups, but the resulting numbers have not been validated.



## Get the model's predictions on the training set
# predict() needs a dataset of examples, not the raw x_train dict of two fields
train_predictions, train_labels, _ = trainer.predict(train_dataset)
train_prob_estimates = torch.softmax(torch.tensor(train_predictions).float(), dim=-1)[:, 1].numpy()


# Score against the labels returned by predict(), which are aligned with the predictions
train_precision, train_recall, _ = precision_recall_curve(train_labels, train_prob_estimates)
train_auc_score = auc(train_recall, train_precision)

train_predicted_classes = np.argmax(train_predictions, axis=-1)
train_f1_score = f1_score(train_labels, train_predicted_classes)
train_accuracy = accuracy_score(train_labels, train_predicted_classes)




# One record per evaluation run, taken from the trainer's own log history
evaluation_records = [
    entry for entry in trainer.state.log_history
    if any(key.startswith('eval_') for key in entry)
]


def _metric_series(metric_key):
    """Return the value of ``metric_key`` in each evaluation record (NaN where it is missing)."""
    return [record.get(metric_key, float('nan')) for record in evaluation_records]


# NOTE(review): nothing in this file computes per-epoch metrics on the training
# set, so the un-prefixed series are expected to be NaN placeholders. Only the
# eval_* series are expected to hold real values - confirm the key names
# against the evaluation records.
f1_scores = _metric_series('f1_score')
eval_f1_scores = _metric_series('eval_f1_score')
precision = _metric_series('precision')
eval_precision = _metric_series('eval_precision')
recall = _metric_series('recall')
eval_recall = _metric_series('eval_recall')
accuracy = _metric_series('accuracy')
eval_accuracy = _metric_series('eval_accuracy')

print(f"f1_scores: {f1_scores}")
print(f"eval_f1_scores: {eval_f1_scores}")
print(f"accuracy: {accuracy}")
print(f"eval_accuracy: {eval_accuracy}")



def plot_metrics(train_metric_values, val_metric_values, metric_name, x_label, y_label):
    """Display the training and validation values of one metric against the epoch number.

    Training values are drawn as blue dots and validation values as a red
    line. Epochs are numbered from 1 up to the number of training values.
    """
    epoch_numbers = range(1, len(train_metric_values) + 1)

    axes = plt.gca()
    axes.plot(epoch_numbers, train_metric_values, 'bo', label=f'Training {metric_name}')
    axes.plot(epoch_numbers, val_metric_values, 'r', label=f'Validation {metric_name}')
    axes.set_title(f'Training and Validation {metric_name}')
    axes.set_xlabel(x_label)
    axes.set_ylabel(y_label)
    axes.legend()

    plt.show()


# Show one figure per metric; the metric's display name is also used as the y-axis label
for train_values, val_values, display_name in (
    (f1_scores, eval_f1_scores, 'F1 Score'),
    (precision, eval_precision, 'Precision'),
    (recall, eval_recall, 'Recall'),
    (accuracy, eval_accuracy, 'Accuracy'),
):
    plot_metrics(train_values, val_values, display_name, 'Epochs', display_name)


def plot_metrics(train_metric_values, val_metric_values, metric_name, x_label, y_label):
    """Plot one metric per epoch for the training and validation sets and show the figure.

    Re-declares ``plot_metrics`` with the same signature as the definition
    earlier in this cell.
    """
    n_epochs = len(train_metric_values)
    x_values = range(1, n_epochs + 1)
    train_label = f'Training {metric_name}'
    val_label = f'Validation {metric_name}'

    # Blue dots for training, red line for validation
    plt.plot(x_values, train_metric_values, 'bo', label=train_label)
    plt.plot(x_values, val_metric_values, 'r', label=val_label)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(f'Training and Validation {metric_name}')
    plt.legend()

    plt.show()

# Probability estimates for resisting heat (positive class)
eval_outputs = trainer.predict(val_dataset)
logits = eval_outputs.predictions
# Softmax over the class logits, computed the same way as for the training set above
prob_estimates = torch.softmax(torch.tensor(logits).float(), dim=-1)[:, 1].numpy()

# y_val was deleted before training, so use the labels returned by predict().
# The curve gets its own names so the per-epoch `precision` / `recall` lists
# stay intact for the per-metric plots further down.
pr_precision, pr_recall, _ = precision_recall_curve(eval_outputs.label_ids, prob_estimates)
auc_score = auc(pr_recall, pr_precision)

plt.plot(pr_recall, pr_precision, 'b', label=f'AUC = {auc_score:.2f}')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.legend()
plt.show()


def plot_save_metrics(train_metric_values, val_metric_values, metric_name, x_label, y_label, output_file):
    """Plot one metric per epoch for training and validation, save it to ``output_file`` and clear the figure."""
    epochs = range(1, len(train_metric_values) + 1)

    plt.plot(epochs, train_metric_values, 'bo', label=f'Training {metric_name}')
    plt.plot(epochs, val_metric_values, 'r', label=f'Validation {metric_name}')
    plt.title(f'Training and Validation {metric_name}')
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.legend()
    plt.savefig(output_file)
    plt.clf()  # start the next plot from an empty figure

def plot_save_precision_recall_curve(precision, recall, auc_score, output_file):
    """Plot precision against recall, labelled with the AUC, save it to ``output_file`` and clear the figure."""
    plt.plot(recall, precision, 'b', label=f'AUC = {auc_score:.2f}')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.legend()
    plt.savefig(output_file)
    plt.clf()

# Save the per-epoch metric plots and the precision-recall curve to image files
plot_save_metrics(f1_scores, eval_f1_scores, 'F1 Score', 'Epochs', 'F1 Score', 'f1.png')
plot_save_metrics(precision, eval_precision, 'Precision', 'Epochs', 'Precision', 'precision.png')
plot_save_metrics(recall, eval_recall, 'Recall', 'Epochs', 'Recall', 'recall.png')
plot_save_metrics(accuracy, eval_accuracy, 'Accuracy', 'Epochs', 'Accuracy', 'accuracy.png')

plot_save_precision_recall_curve(pr_precision, pr_recall, auc_score, 'precision_recall_curve.png')

# In [None]:
############ CALCULATING METRICS FOR LABELLED DATASETS ##################

import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader, Dataset
import torch.nn as nn
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
from transformers import AutoModel, AutoConfig
from transformers import BertModel, BertTokenizer#, DistilProtBert
from transformers import BertForSequenceClassification
from transformers import T5Model, T5Tokenizer
from transformers import EarlyStoppingCallback
from transformers import default_data_collator
from transformers import TrainingArguments, Trainer#, default_device
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, precision_recall_curve, auc
from sklearn.utils import shuffle
import pickle
import bz2
import mgzip
import gc
from functools import reduce
import bisect
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.font_manager as fm
import gc
from datasets import load_metric
from imblearn.under_sampling import RandomUnderSampler


# Locations of the saved model and the tokenised data, and which split to evaluate
model_path = "/content/drive/MyDrive/Colab Notebooks/Dissertation Code/AWS/results"
tokenised_data_path = "/content/drive/My Drive/Colab Notebooks/Dissertation Code/AWS/Data/256"
dataset = "val" # train, val or test


def _unpickle_gz(path):
    """Return the object pickled in the mgzip-compressed file at ``path``."""
    with mgzip.open(path, "rb") as handle:
        return pickle.load(handle)


# Rebuild the trained classifier from its saved config and weights
config = AutoConfig.from_pretrained(f'{model_path}/config.json')
model = BertForSequenceClassification.from_pretrained(f'{model_path}/pytorch_model.bin', config=config)


# Run on the GPU when torch can see one
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
model = model.to(device)


# Encoded inputs of the chosen split
print("\nLoading dataset")
x = _unpickle_gz(f"{tokenised_data_path}/BEAR_encoded_{dataset}.gz")

# Matching labels, converted to a tensor straight after loading
print("\nLoading labels")
y = torch.tensor(_unpickle_gz(f"{tokenised_data_path}/BEAR_encoded_y_{dataset}.gz"))

class EvalDataset(Dataset):
    """Map-style dataset combining encoded fields with their labels for evaluation.

    Args:
        encodings: mapping from field name to an indexable collection with one
            entry per sequence.
        labels: indexable collection of labels; its length is the dataset length.
    """

    def __init__(self, encodings, labels):
        self.encodings, self.labels = encodings, labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        """Return every encoded field of sequence ``idx`` as a tensor, plus its label under ``'labels'``."""
        item = dict((name, torch.tensor(column[idx])) for name, column in self.encodings.items())
        item.update(labels=torch.tensor(self.labels[idx]))
        return item


from collections import Counter

# Wrap the encoded inputs and labels, and serve them in batches of 16
test_dataset = EvalDataset(x, y)

test_loader = DataLoader(test_dataset, batch_size=16)


# Class proportions of the split. Labels are counted directly instead of
# indexing test_dataset, which would build tensors for every encoded sequence
# just to read its label.
class_counts = dict(Counter(torch.as_tensor(y).tolist()))

print("Class counts:", class_counts)



from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score

# Put the model in evaluation mode and collect true and predicted labels batch by batch
model.eval()
true_labels = []
pred_labels = []

with torch.no_grad():
    for batch in test_loader:
        labels = batch['labels'].to(device)
        outputs = model(
            batch['input_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device),
        )
        # Predicted class = index of the largest logit
        predictions = torch.argmax(outputs.logits, dim=-1).cpu().numpy()

        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(predictions)

# Precision, recall and F1 are averaged over classes with 'weighted' averaging
weighted = {'average': 'weighted'}
f1 = f1_score(true_labels, pred_labels, **weighted)
accuracy = accuracy_score(true_labels, pred_labels)
precision = precision_score(true_labels, pred_labels, **weighted)
recall = recall_score(true_labels, pred_labels, **weighted)

for caption, value in (
    ("F1 Score:", f1),
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
):
    print(caption, value)

# In [None]:
############ INFERENCE WITH UNLABELLED DATASET ##################
from tqdm import tqdm
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
from transformers import AutoModel, AutoConfig
from transformers import BertModel, BertTokenizer#, DistilProtBert
from transformers import BertForSequenceClassification
from transformers import T5Model, T5Tokenizer
from transformers import EarlyStoppingCallback
from transformers import default_data_collator
from transformers import TrainingArguments, Trainer#, default_device
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, precision_recall_curve, auc
from sklearn.utils import shuffle
import pickle
import bz2
import mgzip
import gc
from functools import reduce
import bisect
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.font_manager as fm
import gc
from datasets import load_metric
from imblearn.under_sampling import RandomUnderSampler


# Specify filepaths
model_path = "/content/drive/MyDrive/Colab Notebooks/Dissertation Code/AWS/results"
tokenised_data_path = "/content/drive/My Drive/Colab Notebooks/Dissertation Code/AWS/Data/256"
dataset = "val" # cauris, train, val or test

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Import model and switch it to evaluation mode, as the labelled-evaluation cell does
config = AutoConfig.from_pretrained(f'{model_path}/config.json')
model = BertForSequenceClassification.from_pretrained(f'{model_path}/pytorch_model.bin', config=config)
model.to(device)
model.eval()

# Load unlabelled dataset
print("\nLoading unlabelled dataset")
with mgzip.open(f"{tokenised_data_path}/BEAR_encoded_{dataset}.gz", "rb") as f:
    unlabelled_data = pickle.load(f)

# Convert input_ids and attention_mask lists to tensors
input_ids = torch.tensor(unlabelled_data['input_ids'])
attention_mask = torch.tensor(unlabelled_data['attention_mask'])

# Create DataLoader for the unlabeled dataset
unlabeled_dataset = TensorDataset(input_ids, attention_mask)
unlabeled_dataloader = DataLoader(unlabeled_dataset, batch_size=16)


print(len(unlabelled_data))
print(len(input_ids))


# Perform inference on the unlabeled dataset
predictions = []

with torch.no_grad():
    for batch in tqdm(unlabeled_dataloader, desc="Inference"):
        # Batch-level names, so the full input tensors above are not overwritten
        batch_input_ids = batch[0].to(device)
        batch_attention_mask = batch[1].to(device)

        outputs = model(batch_input_ids, attention_mask=batch_attention_mask)
        logits = outputs.logits  # was `outputs.logits1`, which raised AttributeError
        batch_predictions = torch.argmax(logits, dim=-1).cpu().numpy()
        predictions.extend(batch_predictions)

print("Predictions:", predictions)

# Convert predictions to a pandas DataFrame
predictions_df = pd.DataFrame(predictions, columns=['prediction'])

# Save predictions as a CSV file
predictions_df.to_csv('predictions.csv', index=False)


# Save real labels as a CSV file
print("\nLoading evaluation labels")
with mgzip.open(f"{tokenised_data_path}/BEAR_encoded_y_{dataset}.gz", "rb") as f:
    y = pickle.load(f)

# The baseline cell calls .count() on these same label files, which suggests a
# plain list; wrap anything without to_csv so the export works either way.
ground_truth_labels = y if hasattr(y, 'to_csv') else pd.Series(y)
ground_truth_labels.to_csv(f'{dataset}_groundtruth.csv', index=False)

# In [None]:
################ Baseline Metric Calculations ###############
import csv
import random
import mgzip
import pickle

# Specify filepaths
# predictions_path: directory from which predictions_<dataset>.csv is read further down
predictions_path = "/content/drive/MyDrive/Colab Notebooks/Dissertation Code/AWS/results"
# tokenised_data_path: directory holding the mgzip-compressed, pickled label files
tokenised_data_path = "/content/drive/My Drive/Colab Notebooks/Dissertation Code/AWS/Data/256"
# dataset: name of the split, inserted into both file names
dataset = "test"    # can also use train or val to see baselines for these

def read_csv(file_name):
    """Read integer labels from the first column of a CSV file.

    The first row is treated as a header and skipped. Blank lines are ignored.

    Args:
        file_name: path of the CSV file to read.

    Returns:
        list of int, one per non-empty data row, in file order. The list is
        empty when the file has no data rows or is completely empty.

    Raises:
        ValueError: if a first-column value is not an integer.
    """
    # newline='' lets the csv module handle line endings itself
    with open(file_name, 'r', newline='') as file:
        rows = csv.reader(file)
        next(rows, None)  # Skip the header row; an empty file simply has none
        # Blank lines are returned as empty rows and carry no label
        return [int(row[0]) for row in rows if row]


def calculate_confusion_matrix(predictions, ground_truth):
    """Count true/false positives and negatives for paired predictions and labels.

    A prediction counts as positive when it equals 1 and as correct when it
    equals its ground-truth label. Pairs are formed with ``zip``, so extra
    items in the longer input are ignored.

    Returns:
        tuple ``(true_pos, true_neg, false_pos, false_neg)``.
    """
    # Tally keyed by (prediction is correct, prediction is positive)
    tally = {
        (True, True): 0,
        (True, False): 0,
        (False, True): 0,
        (False, False): 0,
    }

    for pred, gt in zip(predictions, ground_truth):
        tally[(bool(pred == gt), bool(pred == 1))] += 1

    return (
        tally[(True, True)],
        tally[(True, False)],
        tally[(False, True)],
        tally[(False, False)],
    )


def calculate_metrics(predictions, ground_truth):
    """Compute accuracy, precision, recall and F1 score of the positive class.

    Args:
        predictions: iterable of predicted labels.
        ground_truth: iterable of true labels, paired with ``predictions``.

    Returns:
        tuple ``(accuracy, precision, recall, f1_score)``. Any metric whose
        denominator is zero is reported as 0, including accuracy on empty input.
    """
    true_pos, true_neg, false_pos, false_neg = calculate_confusion_matrix(predictions, ground_truth)

    # Guard accuracy like the other metrics so empty input does not divide by zero
    total = true_pos + true_neg + false_pos + false_neg
    accuracy = (true_pos + true_neg) / total if total > 0 else 0
    precision = true_pos / (true_pos + false_pos) if true_pos + false_pos > 0 else 0
    recall = true_pos / (true_pos + false_neg) if true_pos + false_neg > 0 else 0
    f1_score = 2 * ((precision * recall) / (precision + recall)) if precision + recall > 0 else 0

    return accuracy, precision, recall, f1_score


def baseline_50(ground_truth):
    """Return a copy of ``ground_truth`` with half of each class flipped.

    The first ``count // 2`` labels equal to 0 become 1 and the first
    ``count // 2`` labels equal to 1 become 0, in order of appearance. All
    other labels are copied unchanged.

    Args:
        ground_truth: list of labels (must support ``.count``).

    Returns:
        new list with the same length as ``ground_truth``.
    """
    # Flips still to be made per class: the integer half of the class size
    flips_left = {0: ground_truth.count(0) // 2, 1: ground_truth.count(1) // 2}
    opposite = {0: 1, 1: 0}

    flipped_labels = []
    for gt in ground_truth:
        if flips_left.get(gt, 0) > 0:
            flips_left[gt] -= 1
            flipped_labels.append(opposite[gt])
        else:
            flipped_labels.append(gt)

    return flipped_labels


def baseline_proportionate(ground_truth, zero_flip_rate=0.0116, one_flip_rate=0.9884):
    """Return a copy of ``ground_truth`` with a fixed share of each class flipped.

    The first ``int(count_of_zeros * zero_flip_rate)`` labels equal to 0
    become 1 and the first ``int(count_of_ones * one_flip_rate)`` labels equal
    to 1 become 0, in order of appearance. All other labels are copied
    unchanged. With the defaults, 0 is predicted for about 98.84% of both
    classes.

    Args:
        ground_truth: list of labels (must support ``.count``).
        zero_flip_rate: share of the 0 labels to turn into 1.
        one_flip_rate: share of the 1 labels to turn into 0.

    Returns:
        new list with the same length as ``ground_truth``.
    """
    # Flips still to be made per class (rounded down)
    flips_left = {
        0: int(ground_truth.count(0) * zero_flip_rate),
        1: int(ground_truth.count(1) * one_flip_rate),
    }
    opposite = {0: 1, 1: 0}

    flipped_labels = []
    for gt in ground_truth:
        if flips_left.get(gt, 0) > 0:
            flips_left[gt] -= 1
            flipped_labels.append(opposite[gt])
        else:
            flipped_labels.append(gt)

    return flipped_labels



# Load the ground-truth labels and the saved model predictions for the chosen split
with mgzip.open(f"{tokenised_data_path}/BEAR_encoded_y_{dataset}.gz", "rb") as labels_file:
    ground_truth = pickle.load(labels_file)

predictions = read_csv(f'{predictions_path}/predictions_{dataset}.csv')

# Baseline predictors, each producing one label per ground-truth entry
n_labels = len(ground_truth)
baseline_majority = [0] * n_labels                                         # always predict 0 (the negative class)
baseline_true_random = [random.choice([0, 1]) for _ in range(n_labels)]    # independent 0/1 draw for every entry
baseline_50_50_random = baseline_50(ground_truth)                          # half of each class flipped
baseline_proportionate_random = baseline_proportionate(ground_truth)       # 1.16% of the 0 labels and 98.84% of the 1 labels flipped


# Report the metrics of the proportionate baseline
accuracy, precision, recall, f1_score = calculate_metrics(baseline_proportionate_random, ground_truth)

for report_line in (
    f'Accuracy: {accuracy:.5f}',
    f'F1 Score: {f1_score:.5f}',
    f'Precision: {precision:.5f}',
    f'Recall: {recall:.5f}',
):
    print(report_line)

