# Set up environment

In [None]:
# Mount Google Drive under /content/drive (Colab only); the gold validation
# file read by the metric code lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install transformers[sentencepiece]

In [None]:
!pip install tabulate

In [None]:
import json
import torch
from tabulate import tabulate
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from tqdm import tqdm
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.metrics import precision_recall_fscore_support

# Hierarchical Classification Metric Implementation

In [None]:
!pip install sklearn-hierarchical-classification

In [None]:
import pdb
import json
import logging.handlers
import argparse
import os
import numpy as np
from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer
from networkx import DiGraph, relabel_nodes, all_pairs_shortest_path_length
from sklearn_hierarchical_classification.constants import ROOT
from sklearn_hierarchical_classification.metrics import h_fbeta_score, h_recall_score, h_precision_score, \
    fill_ancestors, multi_labeled

In [None]:
# Label hierarchy used by the hierarchical metrics. Edges point from parent to
# child; some techniques (e.g. "Bandwagon", "Whataboutism") have more than one
# parent, so this is a DAG rather than a tree. Edge order is kept stable so the
# node insertion order of the graph does not change.
_HIERARCHY_EDGES = [
    # --- Logos ---
    (ROOT, "Logos"),
    ("Logos", "Repetition"),
    ("Logos", "Obfuscation, Intentional vagueness, Confusion"),
    ("Logos", "Reasoning"),
    ("Logos", "Justification"),
    ("Justification", "Slogans"),
    ("Justification", "Bandwagon"),
    ("Justification", "Appeal to authority"),
    ("Justification", "Flag-waving"),
    ("Justification", "Appeal to fear/prejudice"),
    ("Reasoning", "Simplification"),
    ("Simplification", "Causal Oversimplification"),
    ("Simplification", "Black-and-white Fallacy/Dictatorship"),
    ("Simplification", "Thought-terminating cliché"),
    ("Reasoning", "Distraction"),
    ("Distraction", "Misrepresentation of Someone's Position (Straw Man)"),
    ("Distraction", "Presenting Irrelevant Data (Red Herring)"),
    ("Distraction", "Whataboutism"),
    # --- Ethos ---
    (ROOT, "Ethos"),
    ("Ethos", "Appeal to authority"),
    ("Ethos", "Glittering generalities (Virtue)"),
    ("Ethos", "Bandwagon"),
    ("Ethos", "Ad Hominem"),
    ("Ethos", "Transfer"),
    ("Ad Hominem", "Doubt"),
    ("Ad Hominem", "Name calling/Labeling"),
    ("Ad Hominem", "Smears"),
    ("Ad Hominem", "Reductio ad hitlerum"),
    ("Ad Hominem", "Whataboutism"),
    # --- Pathos ---
    (ROOT, "Pathos"),
    ("Pathos", "Exaggeration/Minimisation"),
    ("Pathos", "Loaded Language"),
    ("Pathos", "Appeal to (Strong) Emotions"),
    ("Pathos", "Appeal to fear/prejudice"),
    ("Pathos", "Flag-waving"),
    ("Pathos", "Transfer"),
]

G = DiGraph()
G.add_edges_from(_HIERARCHY_EDGES)

In [None]:

def _read_gold_and_pred(pred_fpath):
  """
  Read gold and predicted data.
  :param pred_fpath: a json file with predictions,
  :param gold_fpath: the original annotated gold file.
  :return: {id:pred_labels} dict; {id:gold_labels} dict
  """

  gold_labels = {}
  with open('/content/drive/MyDrive/SemEval/data/subtask1/validation.json', encoding='utf-8') as gold_f:
    gold = json.load(gold_f)
    #print(len(gold))
    for obj in gold:
      gold_labels[obj['id']] = obj['labels']


  pred_labels = {}
  with open(pred_fpath, encoding='utf-8') as pred_f:
    pred = json.load(pred_f)
    #print(len(pred))
    for obj in pred:
      pred_labels[obj['id']] = obj['labels']

  if set(gold_labels.keys()) != set(pred_labels.keys()):
      print('There are either missing or added examples to the prediction file. Make sure you only have the gold examples in the prediction file.')

  return pred_labels, gold_labels


In [None]:
def get_all_classes_from_graph(graph):
    """Return every node of `graph` except the ROOT sentinel, in the graph's node order."""
    classes = []
    for candidate in graph.nodes:
        if candidate == ROOT:
            continue
        classes.append(candidate)
    return classes

def _h_fbeta_score(y_true, y_pred, class_hierarchy, beta=1., root=ROOT):
    """Hierarchical F-beta: weighted harmonic mean of hierarchical precision and recall.

    Returns 0 when both precision and recall are 0, which avoids a zero denominator.
    """
    precision = _h_precision_score(y_true, y_pred, class_hierarchy, root=root)
    recall = _h_recall_score(y_true, y_pred, class_hierarchy, root=root)
    if not (precision != 0 or recall != 0):
      return 0
    numerator = (1. + beta ** 2.) * precision * recall
    denominator = beta ** 2. * precision + recall
    return numerator / denominator

def _fill_ancestors(y, graph, root, copy=True):
    """Propagate every positive label in `y` to all of its ancestors in `graph`.

    :param y: 2-D indicator array indexed as `y[row, node]`; node ids are used
        directly as column indices (assumes the graph was relabelled to integer
        column ids, e.g. by `multi_labeled` — TODO confirm for other callers).
    :param graph: class hierarchy with edges pointing from parent to child.
    :param root: the root node; it has no column in `y` and is never filled in.
    :param copy: when True (default) work on a copy and leave `y` untouched.
    :return: the indicator array with ancestor columns set to 1.
    """
    y_ = y.copy() if copy else y
    # On the reversed graph, the nodes reachable from `target` are `target`
    # itself plus all of its ancestors. `copy=False` yields a view, so `graph`
    # is not modified and nothing needs to be restored afterwards.
    paths = all_pairs_shortest_path_length(graph.reverse(copy=False))
    for target, distances in paths:
        if target == root:
            continue
        ix_rows = np.where(y[:, target] > 0)[0]
        # Use the `root` argument (not the module-level ROOT) so a caller-supplied
        # root is excluded consistently with the `target == root` check above.
        ancestors = [node for node in distances if node != root]
        y_[tuple(np.meshgrid(ix_rows, ancestors))] = 1
    return y_
def _h_recall_score(y_true, y_pred, class_hierarchy, root=ROOT):
    """Hierarchical recall.

    Both label matrices are first extended with ancestor labels; the score is
    the number of cells positive in both, divided by the number of positive
    gold cells. Returns 0 when there are no positive gold cells.
    """
    gold_closed = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    pred_closed = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    shared = (gold_closed != 0) & (pred_closed != 0)
    n_shared = np.count_nonzero(shared)
    n_gold = np.count_nonzero(gold_closed)

    return n_shared / n_gold if n_gold != 0 else 0

def _h_precision_score(y_true, y_pred, class_hierarchy, root=ROOT):
    """Hierarchical precision.

    Both label matrices are first extended with ancestor labels; the score is
    the number of cells positive in both, divided by the number of positive
    predicted cells. Returns 0 when nothing is predicted.
    """
    gold_closed = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    pred_closed = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    shared = (gold_closed != 0) & (pred_closed != 0)
    n_shared = np.count_nonzero(shared)
    n_predicted = np.count_nonzero(pred_closed)

    return n_shared / n_predicted if n_predicted != 0 else 0

In [None]:
def calculate_metrics(graph, pred_file):
  """Compute hierarchical precision, recall and F1 for a prediction file.

  :param graph: class hierarchy (parent -> child edges) used to credit
      partially correct predictions.
  :param pred_file: path to a JSON file of {'id', 'labels'} predictions.
  :return: (hierarchical precision, hierarchical recall, hierarchical F1)
  """
  pred_labels, gold_labels = _read_gold_and_pred(pred_file)

  gold = []
  pred = []
  for example_id, labels in gold_labels.items():
    gold.append(labels)
    # A gold example without a prediction counts as "no labels predicted"
    # rather than aborting with a KeyError after the mismatch warning.
    pred.append(pred_labels.get(example_id, []))

  # Use the hierarchy passed in by the caller instead of the global `G`.
  with multi_labeled(gold, pred, graph) as (gold_, pred_, graph_):
    return (
        _h_precision_score(gold_, pred_, graph_),
        _h_recall_score(gold_, pred_, graph_),
        _h_fbeta_score(gold_, pred_, graph_),
    )


# Data Preparation

In [None]:
def read_classes(file_path):
  """Read label names from a text file, one per line.

  Surrounding whitespace is stripped and blank lines are skipped. The file is
  decoded as UTF-8 because label names may contain non-ASCII characters
  (e.g. "Thought-terminating cliché").

  :param file_path: path to the label file.
  :return: list of label strings in file order.
  """
  with open(file_path, encoding='utf-8') as f:
    stripped_lines = (line.strip() for line in f)
    return [label for label in stripped_lines if label]

In [None]:
# Load the label names from the label file (one label per line).
# NOTE(review): 'label_file_path' looks like a placeholder — point it at the real file.
labelSet = read_classes('label_file_path')

In [None]:
# Sanity check: how many labels were read, and which ones.
print(len(labelSet))
print(labelSet)

In [None]:
# Label inventory used for one-hot encoding and for decoding predictions.
# The position in this list is the column index of the label.
all_labels = [
    'Repetition',
    'Obfuscation, Intentional vagueness, Confusion',
    'Slogans',
    'Bandwagon',
    'Appeal to authority',
    'Flag-waving',
    'Appeal to fear/prejudice',
    'Causal Oversimplification',
    'Black-and-white Fallacy/Dictatorship',
    'Thought-terminating cliché',
    "Misrepresentation of Someone's Position (Straw Man)",
    'Presenting Irrelevant Data (Red Herring)',
    'Whataboutism',
    'Glittering generalities (Virtue)',
    'Doubt',
    'Name calling/Labeling',
    'Smears',
    'Reductio ad hitlerum',
    'Exaggeration/Minimisation',
    'Loaded Language',
]
print(len(all_labels))

In [None]:
# Fix the PyTorch RNG so weight initialisation and shuffling are repeatable.
torch.manual_seed(1)

# Device that the data-preparation, training and inference cells move tensors
# and models to. It is referenced throughout the notebook, so it must exist
# before those cells run on a fresh kernel.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Load the data from JSON files. The files are read as UTF-8 explicitly (as the
# gold-file reader does) so the result does not depend on the platform default.
with open('train_file_path', 'r', encoding='utf-8') as train_file:
    train_data = json.load(train_file)

with open('valid_file_path', 'r', encoding='utf-8') as valid_file:
    valid_data = json.load(valid_file)

# Validation ids in file order; prediction rows are zipped against this list later.
valid_ids = [instance['id'] for instance in valid_data]

with open('test_file_path', 'r', encoding='utf-8') as test_file:
    test_data = json.load(test_file)

In [None]:
# Sanity check: number of examples in the train / validation / test splits.
print(len(train_data))
print(len(valid_data))
print(len(test_data))

In [None]:
def one_hot_encode_labels(labels, label_list):
    """Encode `labels` as a multi-hot float tensor over `label_list`.

    :param labels: iterable of label names; each must occur in `label_list`
        (an unknown name raises KeyError).
    :param label_list: ordered label inventory; position i is element i of the output.
    :return: 1-D tensor of length len(label_list) holding 1 at the listed labels, 0 elsewhere.
    """
    index_of = dict(zip(label_list, range(len(label_list))))
    encoded = torch.zeros(len(label_list))
    for name in labels:
        position = index_of[name]
        encoded[position] = 1
    return encoded

In [None]:
def tokenize_and_encode_text(instance, model_name, tokenizer):
    """Tokenize the 'text' field of one instance.

    The text is padded/truncated to exactly 128 tokens and returned as PyTorch
    tensors. `model_name` is accepted for signature compatibility and not used.
    """
    tokenizer_options = {
        'padding': 'max_length',
        'truncation': True,
        'max_length': 128,
        'return_tensors': 'pt',
    }
    return tokenizer(instance['text'], **tokenizer_options)

In [None]:
def prepare_data(train_data, valid_data, test_data, model_name):
  """Tokenize the three splits and move the resulting tensors to `device`.

  Train and validation instances must carry 'text' and 'labels'; test
  instances need 'text' and 'id'. Labels are multi-hot encoded against the
  notebook-level `all_labels` list.

  :return: train ids, train mask, train labels, valid ids, valid mask,
      valid labels, test ids tensor, test mask, list of test example ids.
  """
  tokenizer = AutoTokenizer.from_pretrained(model_name)

  def _tokenize_split(instances):
    # One encoding per instance, concatenated along the batch dimension.
    encodings = [tokenize_and_encode_text(item, model_name, tokenizer) for item in instances]
    ids = torch.cat([enc['input_ids'] for enc in encodings], dim=0)
    mask = torch.cat([enc['attention_mask'] for enc in encodings], dim=0)
    return ids, mask

  def _multi_hot(instances):
    # Stack the per-instance label vectors into one (n_instances, n_labels) tensor.
    return torch.stack([one_hot_encode_labels(item['labels'], all_labels) for item in instances])

  train_input_ids, train_attention_mask = _tokenize_split(train_data)
  valid_input_ids, valid_attention_mask = _tokenize_split(valid_data)
  test_input_ids, test_attention_mask = _tokenize_split(test_data)

  train_labels = _multi_hot(train_data)
  valid_labels = _multi_hot(valid_data)

  # Test ids are kept in file order so predictions can be zipped back onto them.
  test_ids = [item['id'] for item in test_data]
  print("Test shape: ",test_input_ids.shape)

  return (
      train_input_ids.to(device),
      train_attention_mask.to(device),
      train_labels.to(device),
      valid_input_ids.to(device),
      valid_attention_mask.to(device),
      valid_labels.to(device),
      test_input_ids.to(device),
      test_attention_mask.to(device),
      test_ids,
  )


In [None]:
def train_model(classifier_model, train_dataloader, num_epochs=10, lr=2e-5, save_path='custom_classifier_model.pth'):
  """Fine-tune `classifier_model` with binary cross-entropy on multi-hot labels.

  :param classifier_model: module called as `classifier_model(input_ids, attention_mask)`
      and returning one logit per label.
  :param train_dataloader: yields `(input_ids, attention_mask, labels)` batches.
  :param num_epochs: number of passes over `train_dataloader` (default 10).
  :param lr: Adam learning rate (default 2e-5).
  :param save_path: where the trained state dict is written after training.
  :return: the trained model (the same object that was passed in).
  """
  # BCEWithLogitsLoss applies the sigmoid itself, so the model outputs raw logits.
  criterion = nn.BCEWithLogitsLoss()
  optimizer = optim.Adam(classifier_model.parameters(), lr=lr)

  # Guard the average against an empty dataloader.
  num_batches = max(len(train_dataloader), 1)

  for epoch in range(num_epochs):
      classifier_model.train()
      total_loss = 0
      for batch in tqdm(train_dataloader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
          input_ids, attention_mask, labels = batch
          # Move every tensor of the batch (labels included) to the model's device.
          input_ids = input_ids.to(device)
          attention_mask = attention_mask.to(device)
          labels = labels.to(device)

          optimizer.zero_grad()
          logits = classifier_model(input_ids, attention_mask)
          loss = criterion(logits, labels.float())
          total_loss += loss.item()
          loss.backward()
          optimizer.step()

      average_loss = total_loss / num_batches
      print(f'Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss}')

  # Save the trained model
  torch.save(classifier_model.state_dict(), save_path)
  return classifier_model


In [None]:
def validate_model(classifier_model, valid_dataloader, model):
  """Score `classifier_model` on the validation set and report flat and hierarchical metrics.

  Writes the thresholded predictions as JSON (one {'id', 'labels'} object per
  validation example), prints hierarchical precision/recall/F1 computed from
  that file, and prints a per-class table of F1, precision, recall and support.

  Relies on the notebook globals `valid_ids`, `all_labels`, `G` and `device`.
  `valid_dataloader` must not be shuffled, because prediction rows are zipped
  against `valid_ids` in order.

  :param classifier_model: module called as `classifier_model(input_ids, attention_mask)`.
  :param valid_dataloader: yields `(input_ids, attention_mask, labels)` batches.
  :param model: short tag inserted into the output file name.
  :return: (per-example sigmoid probabilities, per-example gold label vectors),
      both as lists of NumPy arrays.
  """
  # Disable dropout for inference; torch.no_grad() alone does not do that.
  classifier_model.eval()

  true_labels = []
  predicted_labels = []

  with torch.no_grad():
      for input_ids, attention_mask, labels in valid_dataloader:
          # Same device handling as the training and test loops.
          input_ids = input_ids.to(device)
          attention_mask = attention_mask.to(device)
          logits = classifier_model(input_ids, attention_mask)

          # Sigmoid turns each logit into an independent per-label probability.
          predicted_probs = torch.sigmoid(logits)

          true_labels.extend(labels.cpu().numpy())
          predicted_labels.extend(predicted_probs.cpu().numpy())

  # Keep the raw probabilities / gold vectors for the ensemble cells.
  p_labels = predicted_labels
  t_labels = true_labels
  true_labels = np.array(true_labels)
  predicted_labels = np.array(predicted_labels)

  # Single global cut-off for the per-model evaluation.
  threshold = 0.25  # You can adjust this threshold based on your requirements
  predicted_labels = (predicted_labels > threshold).astype(int)

  # Prepare the results in the desired format (id and labels) and save to a JSON file
  results = [{'id': valid_id, 'labels': [all_labels[i] for i, value in enumerate(instance) if value == 1]} for valid_id, instance in zip(valid_ids, predicted_labels)]

  output_file_path = "path_to_val_results" + model + "_val_results.json"
  with open(output_file_path, 'w') as output_file:
      json.dump(results, output_file, indent=2)

  precision, recall, f1, support = precision_recall_fscore_support(true_labels, predicted_labels, average=None, zero_division=0)

  precision_h, recall_h, f1_h = calculate_metrics(G, output_file_path)
  print("HP",precision_h)
  print("HR",recall_h)
  print("HF",f1_h)

  # Class-wise rows; each row has one cell per header, including the support count.
  headers = ["Class", "F1-Score", "Precision", "Recall", "Support"]
  table_rows = [
      [class_name, f"{f1[i]:.2f}", f"{precision[i]:.2f}", f"{recall[i]:.2f}", int(support[i])]
      for i, class_name in enumerate(all_labels)
  ]
  print(tabulate(table_rows, headers=headers, tablefmt="grid"))

  return p_labels, t_labels


In [None]:
def test_model(classifier_model, test_data, model):
  """Predict on the test set and save thresholded predictions as JSON.

  Relies on the notebook globals `test_ids`, `all_labels` and `device`. The
  loader must not be shuffled, because prediction rows are zipped against
  `test_ids` in order.

  :param classifier_model: module called as `classifier_model(input_ids, attention_mask)`.
  :param test_data: iterable of `(input_ids, attention_mask)` batches (the test DataLoader).
  :param model: short tag inserted into the output file name.
  :return: list of per-example sigmoid probability vectors (NumPy arrays).
  """
  # Disable dropout for inference; torch.no_grad() alone does not do that.
  classifier_model.eval()

  predicted_labels = []

  with torch.no_grad():
      # Iterate over the loader that was passed in, not a notebook-level global.
      for input_ids, attention_mask in test_data:
          input_ids = input_ids.to(device)
          attention_mask = attention_mask.to(device)
          logits = classifier_model(input_ids, attention_mask)

          # Sigmoid turns each logit into an independent per-label probability.
          predicted_probs = torch.sigmoid(logits)
          predicted_labels.extend(predicted_probs.cpu().numpy())

  # Keep the raw probabilities for the ensemble cells.
  p_labels = predicted_labels

  # Single global cut-off for the per-model predictions.
  threshold = 0.25  # You can adjust this threshold based on your requirements
  predicted_labels = (np.array(predicted_labels) > threshold).astype(int)

  # Prepare the results in the desired format (id and labels) and save to a JSON file
  results = [{'id': test_id, 'labels': [all_labels[i] for i, value in enumerate(instance) if value == 1]} for test_id, instance in zip(test_ids, predicted_labels)]

  output_file_path = "path_to_test_results" + "_" + model + ".json"
  with open(output_file_path, 'w') as output_file:
      json.dump(results, output_file, indent=2)

  print(f"Predictions saved to {output_file_path}")

  return p_labels


# Models

## BERT

In [None]:
# Checkpoint name for the first model; both the encoder and the tokenizer are
# loaded from this name in later cells (no tokenizer is created here).
model_name_1 = "bert-base-uncased"  # You can replace this with your preferred model

In [None]:
# Pretrained encoder; the classifier below wraps this exact instance.
bert_model = AutoModel.from_pretrained(model_name_1)


class CustomClassifierBERT(nn.Module):
    """Multi-label classification head on top of the pretrained BERT encoder.

    A single linear layer maps the hidden state at sequence position 0 to
    `num_labels` raw logits; no sigmoid is applied here.
    """

    def __init__(self, num_labels):
        super().__init__()
        self.num_labels = num_labels
        self.bert = bert_model.to(device)
        hidden_size = self.bert.config.hidden_size
        self.classifier = nn.Linear(hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        encoder_output = self.bert(input_ids, attention_mask=attention_mask)
        # Position 0 is the [CLS] token for this tokenizer.
        first_token_state = encoder_output.last_hidden_state[:, 0, :]
        return self.classifier(first_token_state)


# One output logit per label.
num_labels = len(all_labels)
classifier_model_1 = CustomClassifierBERT(num_labels).to(device) # or you can load a saved model

# Tokenize all three splits with this checkpoint's tokenizer.
(
    train_input_ids, train_attention_mask, train_labels,
    valid_input_ids, valid_attention_mask, valid_labels,
    test_input_ids, test_attention_mask, test_ids,
) = prepare_data(train_data, valid_data, test_data, model_name_1)

# Only the training loader is shuffled; validation and test keep file order
# because their predictions are matched to ids by position.
train_dataset = TensorDataset(train_input_ids, train_attention_mask, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)

valid_dataset = TensorDataset(valid_input_ids, valid_attention_mask, valid_labels)
valid_dataloader = DataLoader(valid_dataset, batch_size=128, shuffle=False)

test_dataset = TensorDataset(test_input_ids, test_attention_mask)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [None]:
# Sanity check: number of training examples.
print(len(train_data))

In [None]:
# Train the model. train_model returns the same object it was given, so
# c_model_1 and classifier_model_1 refer to one model.
c_model_1 = train_model(classifier_model_1,train_dataloader)

In [None]:
# Validate the model. eval() switches off dropout; the returned values are the
# raw validation probabilities and the gold label vectors, reused by the ensemble cells.
c_model_1.eval()
bert_val_labels, true_labels = validate_model(c_model_1, valid_dataloader,"bert")

In [None]:
# Test the model. bert_labels holds the raw test-set probabilities, reused by the ensemble cell.
c_model_1.eval()
bert_labels = test_model(c_model_1, test_dataloader, "bert")

In [None]:
# Sanity check: number of test predictions and the overall shape of the probability list.
print(len(bert_labels))
print(np.shape(bert_labels))

In [None]:
# Persist the fine-tuned BERT classifier weights.
# NOTE(review): the RoBERTa and mBERT cells save to the same literal path; if it
# is not a placeholder, give each model its own file so they do not overwrite each other.
torch.save(c_model_1.state_dict(), 'path_to_model.pth')

## RoBERTa (XLM-RoBERTa base)

In [None]:
# Checkpoint name for the second model; both the encoder and the tokenizer are
# loaded from this name in later cells (no tokenizer is created here).
model_name_2 = "xlm-roberta-base"  # You can replace this with your preferred model

In [None]:
# Pretrained encoder; the classifier below wraps this exact instance.
roberta_model = AutoModel.from_pretrained(model_name_2)


class CustomClassifierRoBERTa(nn.Module):
    """Multi-label classification head on top of the pretrained XLM-RoBERTa encoder.

    A single linear layer maps the hidden state at sequence position 0 to
    `num_labels` raw logits; no sigmoid is applied here.
    """

    def __init__(self, num_labels):
        super().__init__()
        self.num_labels = num_labels
        self.roberta = roberta_model.to(device)
        hidden_size = self.roberta.config.hidden_size
        self.classifier = nn.Linear(hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        encoder_output = self.roberta(input_ids, attention_mask=attention_mask)
        first_token_state = encoder_output.last_hidden_state[:, 0, :]
        return self.classifier(first_token_state)


# One output logit per label.
num_labels = len(all_labels)
classifier_model_2 = CustomClassifierRoBERTa(num_labels).to(device)

# Re-tokenize all three splits with this checkpoint's tokenizer; this rebinds
# the notebook-level tensors and loaders created for the previous model.
(
    train_input_ids, train_attention_mask, train_labels,
    valid_input_ids, valid_attention_mask, valid_labels,
    test_input_ids, test_attention_mask, test_ids,
) = prepare_data(train_data, valid_data, test_data, model_name_2)

# Only the training loader is shuffled; validation and test keep file order
# because their predictions are matched to ids by position.
train_dataset = TensorDataset(train_input_ids, train_attention_mask, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

valid_dataset = TensorDataset(valid_input_ids, valid_attention_mask, valid_labels)
valid_dataloader = DataLoader(valid_dataset, batch_size=64, shuffle=False)

test_dataset = TensorDataset(test_input_ids, test_attention_mask)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# Sanity check: number of training examples.
print(len(train_data))

In [None]:
# Train the model. train_model returns the same object it was given, so
# c_model_2 and classifier_model_2 refer to one model.
c_model_2 = train_model(classifier_model_2,train_dataloader)

In [None]:
# Validate the model. eval() switches off dropout; the returned probabilities
# are reused by the ensemble cells. true_labels is rebound to this run's gold vectors.
c_model_2.eval()
roberta_val_labels, true_labels = validate_model(c_model_2, valid_dataloader,"roberta")

In [None]:
# Test the model. roberta_labels holds the raw test-set probabilities, reused by the ensemble cell.
c_model_2.eval()
roberta_labels = test_model(c_model_2, test_dataloader, "roberta")

In [None]:
# Persist the fine-tuned XLM-RoBERTa classifier weights.
# NOTE(review): the BERT and mBERT cells save to the same literal path; if it
# is not a placeholder, give each model its own file so they do not overwrite each other.
torch.save(c_model_2.state_dict(), 'path_to_model.pth')

## mBERT

In [None]:
# Checkpoint name for the third model (multilingual BERT); the encoder and the
# tokenizer are loaded from this name in later cells.
model_name_3 = "bert-base-multilingual-uncased"

In [None]:
# Pretrained encoder; the classifier below wraps this exact instance.
mbert_model = AutoModel.from_pretrained(model_name_3)


class CustomClassifiermBERT(nn.Module):
    """Multi-label classification head on top of the pretrained multilingual BERT encoder.

    A single linear layer maps the hidden state at sequence position 0 to
    `num_labels` raw logits; no sigmoid is applied here.
    """

    def __init__(self, num_labels):
        super().__init__()
        self.num_labels = num_labels
        self.mbert = mbert_model.to(device)
        hidden_size = self.mbert.config.hidden_size
        self.classifier = nn.Linear(hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        encoder_output = self.mbert(input_ids, attention_mask=attention_mask)
        # Position 0 is the [CLS] token for this tokenizer.
        first_token_state = encoder_output.last_hidden_state[:, 0, :]
        return self.classifier(first_token_state)


# One output logit per label.
num_labels = len(all_labels)
classifier_model_3 = CustomClassifiermBERT(num_labels).to(device)

# Re-tokenize all three splits with this checkpoint's tokenizer; this rebinds
# the notebook-level tensors and loaders created for the previous model.
(
    train_input_ids, train_attention_mask, train_labels,
    valid_input_ids, valid_attention_mask, valid_labels,
    test_input_ids, test_attention_mask, test_ids,
) = prepare_data(train_data, valid_data, test_data, model_name_3)

# Only the training loader is shuffled; validation and test keep file order
# because their predictions are matched to ids by position.
train_dataset = TensorDataset(train_input_ids, train_attention_mask, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

valid_dataset = TensorDataset(valid_input_ids, valid_attention_mask, valid_labels)
valid_dataloader = DataLoader(valid_dataset, batch_size=64, shuffle=False)

test_dataset = TensorDataset(test_input_ids, test_attention_mask)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# Train the model. train_model returns the same object it was given, so
# c_model_3 and classifier_model_3 refer to one model.
c_model_3 = train_model(classifier_model_3,train_dataloader)

In [None]:
# Validate the model. eval() switches off dropout; the returned probabilities
# are reused by the ensemble cells. true_labels is rebound to this run's gold vectors.
c_model_3.eval()
mbert_val_labels, true_labels = validate_model(c_model_3, valid_dataloader,"mbert")

In [None]:
# Test the model. m_bert_labels holds the raw test-set probabilities, reused by the ensemble cell.
c_model_3.eval()
m_bert_labels = test_model(c_model_3, test_dataloader, "mbert")

In [None]:
# Persist the fine-tuned mBERT classifier weights.
# NOTE(review): the BERT and RoBERTa cells save to the same literal path; if it
# is not a placeholder, give each model its own file so they do not overwrite each other.
torch.save(c_model_3.state_dict(), 'path_to_model.pth')

# Ensemble-Validation Data

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
def calc_h_metrics(bert_val_labels, roberta_val_labels, mbert_val_labels,model):
  """Average three models' validation probabilities and print hierarchical P/R/F1.

  The averaged probabilities are binarised with per-class thresholds, written
  as {'id', 'labels'} JSON (ids come from the notebook-level `valid_ids`), and
  that file is scored against the gold validation file.

  :param bert_val_labels: per-example probability vectors from the first model.
  :param roberta_val_labels: per-example probability vectors from the second model.
  :param mbert_val_labels: per-example probability vectors from the third model.
  :param model: tag inserted into the output file name.
  """
  probs_first = np.array(bert_val_labels)
  probs_second = np.array(roberta_val_labels)
  probs_third = np.array(mbert_val_labels)

  # Element-wise mean of the three probability matrices.
  ensemble_predictions = (probs_first + probs_second + probs_third) / 3

  # One cut-off per label, in `all_labels` order.
  class_threshold = [0.4, 0.3, 0.2, 0.01, 0.7, 0.5, 0.4, 0.4, 0.2, 0.1, 0.01, 0.01, 0.2, 0.1, 0.7, 0.5, 0.4, 0.4, 0.2, 0.1]
  final_predicted_labels = (ensemble_predictions > class_threshold).astype(int)

  # Decode each binary row back into label names.
  results = []
  for valid_id, row in zip(valid_ids, final_predicted_labels):
    label_names = [all_labels[i] for i, value in enumerate(row) if value == 1]
    results.append({'id': valid_id, 'labels': label_names})

  output_file_path = "path_to_val_results" + model + "_.json"
  with open(output_file_path, 'w') as output_file:
      json.dump(results, output_file, indent=2)

  precision_h, recall_h, f1_h = calculate_metrics(G, output_file_path)
  for tag, score in (("HP", precision_h), ("HR", recall_h), ("HF", f1_h)):
    print(tag, score)


In [None]:
# Hierarchical metrics of the three-model ensemble on the validation set.
calc_h_metrics(bert_val_labels, roberta_val_labels, mbert_val_labels,"ensemble")

In [None]:
def calculate_metrics_ensemble_val(bert_val_labels, roberta_val_labels, mbert_val_labels, true_labels, save_path, save_path_metrics):
  """Report per-class F1 / precision / recall of the three-model ensemble.

  The three probability matrices are averaged, binarised with per-class
  thresholds and compared with `true_labels`. The table is printed, written
  to `save_path` as a text grid, and written to `save_path_metrics` as CSV.

  :param bert_val_labels: per-example probability vectors from the first model.
  :param roberta_val_labels: per-example probability vectors from the second model.
  :param mbert_val_labels: per-example probability vectors from the third model.
  :param true_labels: per-example gold multi-hot vectors.
  :param save_path: output path for the human-readable grid table.
  :param save_path_metrics: output path for the CSV version of the table.
  """
  predicted_labels_model1 = np.array(bert_val_labels)
  predicted_labels_model2 = np.array(roberta_val_labels)
  predicted_labels_model3 = np.array(mbert_val_labels)

  # Combine the predictions by element-wise averaging
  ensemble_predictions = (predicted_labels_model1 + predicted_labels_model2 + predicted_labels_model3) / 3

  # Binarise with the same per-class cut-offs the other ensemble functions use,
  # so this table describes the same decisions as the saved predictions.
  class_threshold = [0.4, 0.3, 0.2, 0.01, 0.7, 0.5, 0.4, 0.4, 0.2, 0.1, 0.01, 0.01, 0.2, 0.1, 0.7, 0.5, 0.4, 0.4, 0.2, 0.1]
  final_predicted_labels = (ensemble_predictions > class_threshold).astype(int)

  precision, recall, f1, _ = precision_recall_fscore_support(true_labels, final_predicted_labels, average=None, zero_division=0)

  # Class-wise metrics, one row per label in `all_labels` order.
  results = [
      [class_name, f"{f1[i]:.2f}", f"{precision[i]:.2f}", f"{recall[i]:.2f}"]
      for i, class_name in enumerate(all_labels)
  ]

  # Render the table once and reuse it for both printing and saving.
  headers = ["Class", "F1-Score", "Precision", "Recall"]
  table = tabulate(results, headers=headers, tablefmt="grid")
  print(table)

  # Save the table in human pretty format (UTF-8: label names are not pure ASCII).
  with open(save_path, 'w', encoding='utf-8') as file:
    file.write(table)
  print(f"Table saved to {save_path}")

  # Save the same rows as CSV via pandas.
  df = pd.DataFrame(results, columns=headers)
  df.to_csv(save_path_metrics, index=False)
  print(f"Table saved to {save_path_metrics}")


In [None]:
# Per-class ensemble metrics on the validation set. The first path receives the
# plain-text grid table (despite its .csv extension), the second the actual CSV.
calculate_metrics_ensemble_val(bert_val_labels, roberta_val_labels, mbert_val_labels, true_labels,
                               'file_1.csv',
                               'file_2.csv')

# Ensemble-Test Data

In [None]:
def predict_test(bert_labels, roberta_labels, m_bert_labels, model):
  """Average three models' test probabilities and save the ensemble predictions.

  The averaged probabilities are binarised with per-class thresholds and
  written as UTF-8 JSON, one {'id', 'labels'} object per test example (ids
  come from the notebook-level `test_ids`).

  :param bert_labels: per-example probability vectors from the first model.
  :param roberta_labels: per-example probability vectors from the second model.
  :param m_bert_labels: per-example probability vectors from the third model.
  :param model: tag inserted into the output file name.
  """
  probs_first = np.array(bert_labels)
  probs_second = np.array(roberta_labels)
  probs_third = np.array(m_bert_labels)

  # Element-wise mean of the three probability matrices.
  ensemble_predictions = (probs_first + probs_second + probs_third) / 3

  # One cut-off per label, in `all_labels` order.
  class_threshold = [0.4, 0.3, 0.2, 0.01, 0.7, 0.5, 0.4, 0.4, 0.2, 0.1, 0.01, 0.01, 0.2, 0.1, 0.7, 0.5, 0.4, 0.4, 0.2, 0.1]
  final_predicted_labels = (ensemble_predictions > class_threshold).astype(int)

  # Decode each binary row back into label names.
  results = []
  for test_id, row in zip(test_ids, final_predicted_labels):
    label_names = [all_labels[i] for i, value in enumerate(row) if value == 1]
    results.append({'id': test_id, 'labels': label_names})

  output_file_path = "path_to_test_results" + model +  "_test_ensemble.json"
  with open(output_file_path, 'w',encoding='utf-8') as output_file:
    json.dump(results, output_file, ensure_ascii = False, indent=2)

  print(f"Predictions saved to {output_file_path}")

In [None]:
# Write the three-model ensemble predictions for the test set.
predict_test(bert_labels, roberta_labels, m_bert_labels, "ensemble")