# Import Necessary Libraries

In [None]:
!pip install transformers
!pip install datasets
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding
from transformers import TrainingArguments
import torch
import sys
import os
from datasets import Dataset, DatasetDict
from transformers import Trainer
import numpy as np
from sklearn.metrics import f1_score, accuracy_score
import pandas as pd
import csv
import time
import torch.nn.functional as F
import argparse
from transformers import EarlyStoppingCallback
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Lambda, LSTM, Dropout, BatchNormalization, Attention, Input
import tensorflow.keras.backend as K
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report, confusion_matrix

# Generate Summary

In [None]:
# Read the organizers' original train / test / dev CSV files into DataFrames.
train_df, test_df, dev_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/content/original_train.csv",
        "/content/original_test.csv",
        "/content/original_dev.csv",
    )
)

In [None]:
import functools


@functools.lru_cache(maxsize=1)
def _load_t5_summarizer(model_name="t5-base"):
    """Load the T5 tokenizer and model once and return ``(tokenizer, model, device)``.

    The result is cached, so repeated summarisation calls reuse the same
    weights instead of re-reading the checkpoint for every text.
    """
    # Imported here because the notebook's import cell does not bring the T5 classes in.
    import torch
    from transformers import T5ForConditionalGeneration, T5Tokenizer

    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = T5Tokenizer.from_pretrained(model_name)
    model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)
    return tokenizer, model, device


def generate_long_text_summary(long_text, max_length_per_section):
    """Summarise ``long_text`` chunk by chunk with T5 and join the chunk summaries.

    Args:
        long_text: Text to summarise.
        max_length_per_section: Number of characters per chunk; must be positive.

    Returns:
        The chunk summaries joined with single spaces, or ``""`` when
        ``long_text`` is empty (no model is loaded in that case).

    Raises:
        TypeError: If ``long_text`` is not a string (e.g. a NaN cell from pandas).
        ValueError: If ``max_length_per_section`` is not positive.
    """
    if not isinstance(long_text, str):
        raise TypeError(f"long_text must be a str, got {type(long_text).__name__}")
    if max_length_per_section <= 0:
        raise ValueError("max_length_per_section must be a positive integer")
    if not long_text:
        return ""

    tokenizer, model, device = _load_t5_summarizer()

    # Split the text into fixed-size character windows.
    sections = [
        long_text[start:start + max_length_per_section]
        for start in range(0, len(long_text), max_length_per_section)
    ]

    summaries = []
    for section in sections:
        inputs = tokenizer.encode(
            "summarize: " + section,
            return_tensors="pt",
            max_length=1000,
            truncation=True,
            padding=True,
        )

        # Adjust max_length and length_penalty as needed
        summary_ids = model.generate(
            inputs.to(device),
            max_length=100,
            length_penalty=2.0,
            num_beams=4,
            early_stopping=True,
        )
        summaries.append(tokenizer.decode(summary_ids[0], skip_special_tokens=True))

    # Concatenate the summaries for each section
    return " ".join(summaries)

In [None]:
def generate_double_summary(df):
    """Fill ``df['summary']`` with a two-pass summary of ``df['explanation']``.

    Each explanation is summarised in 1000-character chunks, and that summary
    is summarised again in 300-character chunks. Rows whose summarisation
    fails get a single-space placeholder and a warning is emitted.

    Args:
        df: DataFrame with an ``explanation`` column. It is modified in place.

    Returns:
        The same DataFrame object, with a ``summary`` column.

    Raises:
        KeyError: If ``df`` has no ``explanation`` column.
    """
    import warnings

    # Create the column up front so that an empty frame still gets it.
    if 'summary' not in df.columns:
        df['summary'] = " "

    for index, explanation in df['explanation'].items():
        try:
            first_pass = generate_long_text_summary(explanation, 1000)
            # Use the first summary as input for the second summary
            df.at[index, 'summary'] = generate_long_text_summary(first_pass, 300)
        except Exception as exc:  # keep going, but never hide KeyboardInterrupt/SystemExit
            warnings.warn(
                f"Could not summarise row {index!r} ({exc!r}); storing a blank summary."
            )
            df.at[index, 'summary'] = " "
    return df

In [None]:
# Generate the two-pass summaries for all three splits.
# generate_double_summary updates each frame in place and returns it.
df_train = generate_double_summary(train_df)
df_test = generate_double_summary(test_df)
df_dev = generate_double_summary(dev_df)

# Save all three frames. index=False keeps the row index out of the files, so
# reading them back does not add a spurious "Unnamed: 0" column.
df_train.to_csv("/content/summary_train.csv", index=False)
df_test.to_csv("/content/summary_test.csv", index=False)
df_dev.to_csv("/content/summary_dev.csv", index=False)

In [None]:
# Read the summarised dev / train / test splits back from disk.
df_dev, df_train, df_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/content/summary_dev.csv",
        "/content/summary_train.csv",
        "/content/summary_test.csv",
    )
)

# Get Legal-Bert Embeddings

In [None]:
# get sentence embeddings using Legal-BERT
def get_embeddings(sentence):
    """Return a mean-pooled embedding of ``sentence`` as a NumPy array.

    The sentence is tokenised with the module-level ``tokenizer``, passed
    through the module-level ``bert_model``, and ``last_hidden_state`` is
    averaged over dim 1 and squeezed.

    NOTE(review): ``tokenizer`` and ``bert_model`` are not defined in any
    visible cell of this notebook; they must exist before this is called
    (presumably a Legal-BERT checkpoint -- confirm).
    """
    # Follow the model's device instead of assuming a GPU is present.
    device = next(bert_model.parameters()).device
    inputs = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True)
    inputs = inputs.to(device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputs = bert_model(**inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().detach().cpu().numpy()
    return embeddings

In [None]:
# Embed the question, answer and summary text of every train row.
for source_col, target_col in (
    ('question', 'question_embeddings'),
    ('answer', 'answer_embeddings'),
    ('summary', 'summary_embeddings'),
):
    df_train[target_col] = df_train[source_col].apply(get_embeddings)

In [None]:
# Embed the question, answer and summary text of every dev row.
for source_col, target_col in (
    ('question', 'question_embeddings'),
    ('answer', 'answer_embeddings'),
    ('summary', 'summary_embeddings'),
):
    df_dev[target_col] = df_dev[source_col].apply(get_embeddings)

In [None]:
# Embed the question, answer and summary text of every test row.
for source_col, target_col in (
    ('question', 'question_embeddings'),
    ('answer', 'answer_embeddings'),
    ('summary', 'summary_embeddings'),
):
    df_test[target_col] = df_test[source_col].apply(get_embeddings)

In [None]:
# Pull the three train embedding columns out of the frame as Python lists.
qe_train, ae_train, se_train = (
    df_train[column].tolist()
    for column in ('question_embeddings', 'answer_embeddings', 'summary_embeddings')
)

In [None]:
# Pull the three dev embedding columns out of the frame as Python lists.
qe_dev, ae_dev, se_dev = (
    df_dev[column].tolist()
    for column in ('question_embeddings', 'answer_embeddings', 'summary_embeddings')
)

In [None]:
# Pull the three test embedding columns out of the frame as Python lists.
qe_test, ae_test, se_test = (
    df_test[column].tolist()
    for column in ('question_embeddings', 'answer_embeddings', 'summary_embeddings')
)

In [None]:
# Convert the embedding lists to PyTorch tensors, one per text field and split.
# np.stack first builds a single array, which avoids the slow conversion of a
# list of separate ndarrays. The dev and test tensors are created here too
# because the feature-extraction cells below use them.
qe_tensor_train = torch.from_numpy(np.stack(qe_train))
ae_tensor_train = torch.from_numpy(np.stack(ae_train))
se_tensor_train = torch.from_numpy(np.stack(se_train))

qe_tensor_dev = torch.from_numpy(np.stack(qe_dev))
ae_tensor_dev = torch.from_numpy(np.stack(ae_dev))
se_tensor_dev = torch.from_numpy(np.stack(se_dev))

qe_tensor_test = torch.from_numpy(np.stack(qe_test))
ae_tensor_test = torch.from_numpy(np.stack(ae_test))
se_tensor_test = torch.from_numpy(np.stack(se_test))

# CNN feature extraction

In [None]:
import torch
import torch.nn as nn

class CNNModel(nn.Module):
    """Width-1 convolution, ReLU, global max pooling and a linear projection.

    Args:
        input_size: Number of features per input vector; used as the
            convolution's input channel count.
        embedding_size: Size of the vector produced for each example.
    """

    def __init__(self, input_size, embedding_size):
        super().__init__()
        self.conv1 = nn.Conv1d(input_size, 100, kernel_size=1)
        self.relu = nn.ReLU()
        self.global_max_pooling = nn.AdaptiveMaxPool1d(output_size=1)
        self.dense = nn.Linear(in_features=100, out_features=embedding_size)

    def forward(self, x):
        """Map ``x`` of shape (batch, input_size) to (batch, embedding_size)."""
        # Conv1d expects (batch, channels, length): append a length-1 axis.
        sequence = x.unsqueeze(2)
        activated = self.relu(self.conv1(sequence))
        pooled = self.global_max_pooling(activated).squeeze(-1)
        return self.dense(pooled)

def get_cnn(input_tensor, model):
    """Call ``model`` on ``input_tensor`` and return its output unchanged."""
    return model(input_tensor)

In [None]:
# question cnn features
# get_cnn requires the model to apply. One CNN instance is shared by the three
# splits so that train, dev and test question features come from the same weights.
q_cnn_model = CNNModel(input_size=qe_tensor_train.size(1), embedding_size=100)
q_cnn_train = get_cnn(qe_tensor_train, q_cnn_model)
q_cnn_dev = get_cnn(qe_tensor_dev, q_cnn_model)
q_cnn_test = get_cnn(qe_tensor_test, q_cnn_model)

In [None]:
# answer cnn features
# get_cnn requires the model to apply. One CNN instance is shared by the three
# splits so that train, dev and test answer features come from the same weights.
a_cnn_model = CNNModel(input_size=ae_tensor_train.size(1), embedding_size=100)
a_cnn_train = get_cnn(ae_tensor_train, a_cnn_model)
a_cnn_dev = get_cnn(ae_tensor_dev, a_cnn_model)
a_cnn_test = get_cnn(ae_tensor_test, a_cnn_model)

In [None]:
# summary cnn features
# get_cnn requires the model to apply. One CNN instance is shared by the three
# splits so that train, dev and test summary features come from the same weights.
s_cnn_model = CNNModel(input_size=se_tensor_train.size(1), embedding_size=100)
s_cnn_train = get_cnn(se_tensor_train, s_cnn_model)
s_cnn_dev = get_cnn(se_tensor_dev, s_cnn_model)
s_cnn_test = get_cnn(se_tensor_test, s_cnn_model)

# Multi-level CNN Feature Fusion Approach

In [None]:
# First fusion-level CNN (kernel size 3, padding 1)
class CNNModel(nn.Module):
    """Kernel-3 convolution, ReLU, global max pooling and a linear projection.

    Args:
        input_size: Number of features per input vector; used as the
            convolution's input channel count.
        embedding_size: Size of the vector produced for each example.
    """

    def __init__(self, input_size, embedding_size):
        super().__init__()
        self.conv1 = nn.Conv1d(input_size, 100, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.global_max_pooling = nn.AdaptiveMaxPool1d(output_size=1)
        self.dense = nn.Linear(in_features=100, out_features=embedding_size)

    def forward(self, x):
        """Map ``x`` of shape (batch, input_size) to (batch, embedding_size)."""
        # Conv1d expects (batch, channels, length): append a length-1 axis.
        sequence = x.unsqueeze(2)
        activated = self.relu(self.conv1(sequence))
        pooled = self.global_max_pooling(activated).squeeze(-1)
        return self.dense(pooled)

num_examples, bert_embedding_size = 666, 768
# Instantiate the first-level CNN and run the train question embeddings through it.
model = CNNModel(embedding_size=100, input_size=bert_embedding_size)
q_cnn_train_fus = model(qe_tensor_train)
print("Output Embeddings Shape:", q_cnn_train_fus.shape)

In [None]:
class CNNModel(nn.Module):
    """Kernel-4 convolution, ReLU, global max pooling and a linear projection.

    Args:
        input_size: Number of features per input vector; used as the
            convolution's input channel count.
        embedding_size: Size of the vector produced for each example.
    """

    def __init__(self, input_size, embedding_size):
        super().__init__()
        self.conv1 = nn.Conv1d(input_size, 100, kernel_size=4, padding=2)
        self.relu = nn.ReLU()
        self.global_max_pooling = nn.AdaptiveMaxPool1d(output_size=1)
        self.dense = nn.Linear(in_features=100, out_features=embedding_size)

    def forward(self, x):
        """Map ``x`` of shape (batch, input_size) to (batch, embedding_size)."""
        # Conv1d expects (batch, channels, length): append a length-1 axis.
        sequence = x.unsqueeze(2)
        activated = self.relu(self.conv1(sequence))
        pooled = self.global_max_pooling(activated).squeeze(-1)
        return self.dense(pooled)

num_examples, bert_embedding_size1 = 666, 100

# Instantiate the second-level CNN; its input is the first level's output.
model1 = CNNModel(embedding_size=100, input_size=bert_embedding_size1)
q_cnn_train_fus1 = model1(q_cnn_train_fus)
print("Output Embeddings Shape:", q_cnn_train_fus1.shape)

In [None]:
class CNNModel(nn.Module):
    """Kernel-5 convolution, ReLU, global max pooling and a linear projection.

    Args:
        input_size: Number of features per input vector; used as the
            convolution's input channel count.
        embedding_size: Size of the vector produced for each example.
    """

    def __init__(self, input_size, embedding_size):
        super().__init__()
        self.conv1 = nn.Conv1d(input_size, 100, kernel_size=5, padding=3)
        self.relu = nn.ReLU()
        self.global_max_pooling = nn.AdaptiveMaxPool1d(output_size=1)
        self.dense = nn.Linear(in_features=100, out_features=embedding_size)

    def forward(self, x):
        """Map ``x`` of shape (batch, input_size) to (batch, embedding_size)."""
        # Conv1d expects (batch, channels, length): append a length-1 axis.
        sequence = x.unsqueeze(2)
        activated = self.relu(self.conv1(sequence))
        pooled = self.global_max_pooling(activated).squeeze(-1)
        return self.dense(pooled)

num_examples, bert_embedding_size2 = 666, 100

# Instantiate the third-level CNN; its input is the second level's output.
model2 = CNNModel(embedding_size=100, input_size=bert_embedding_size2)
q_cnn_train_fus2 = model2(q_cnn_train_fus1)
print("Output Embeddings Shape:", q_cnn_train_fus2.shape)

In [None]:
# The multi-level fusion code above can be reused unchanged to obtain the answer and summary embeddings, and the embeddings for the dev and test splits.

# GRU

In [None]:
import functools


@functools.lru_cache(maxsize=None)
def _shared_bi_gru(input_size, hidden_size):
    """Create, once per size pair, the bidirectional GRU that get_bi_gru reuses."""
    return nn.GRU(input_size=input_size, hidden_size=hidden_size, bidirectional=True)


def get_bi_gru(input_tensor, bi_gru=None):
    """Encode each row of ``input_tensor`` with a bidirectional GRU.

    Args:
        input_tensor: 2-D tensor of shape (num_examples, feature_size).
        bi_gru: Optional bidirectional ``nn.GRU`` (``batch_first=False``) to
            use. When omitted, a cached layer with hidden size 100 is used, so
            every call with the same feature size shares the same weights and
            train/dev/test features stay comparable.

    Returns:
        Tensor of shape (num_examples, hidden_size): the element-wise mean of
        the forward and backward hidden states.
    """
    if bi_gru is None:
        bi_gru = _shared_bi_gru(input_tensor.size(-1), 100)

    # batch_first is False, so unsqueeze(0) feeds the rows as a batch of
    # length-1 sequences: (1, num_examples, feature_size).
    gru_output, _ = bi_gru(input_tensor.unsqueeze(0))

    # The last axis is [forward units | backward units]; split it as
    # (direction, unit) so the mean pairs each unit with its counterpart in
    # the other direction rather than with its neighbour.
    per_direction = gru_output.view(input_tensor.size(0), 2, bi_gru.hidden_size)

    # Take the average of the hidden states from both directions
    avg_embeddings = torch.mean(per_direction, dim=1)

    return avg_embeddings


In [None]:
# Bi-GRU features of the question embeddings for train / dev / test.
q_gru_train, q_gru_dev, q_gru_test = (
    get_bi_gru(split_tensor)
    for split_tensor in (qe_tensor_train, qe_tensor_dev, qe_tensor_test)
)

In [None]:
# Bi-GRU features of the answer embeddings for train / dev / test.
a_gru_train, a_gru_dev, a_gru_test = (
    get_bi_gru(split_tensor)
    for split_tensor in (ae_tensor_train, ae_tensor_dev, ae_tensor_test)
)

In [None]:
# Bi-GRU features of the summary embeddings for train / dev / test.
s_gru_train, s_gru_dev, s_gru_test = (
    get_bi_gru(split_tensor)
    for split_tensor in (se_tensor_train, se_tensor_dev, se_tensor_test)
)

# LSTM

In [None]:
import functools


@functools.lru_cache(maxsize=None)
def _shared_bi_lstm(input_size, hidden_size):
    """Create, once per size pair, the bidirectional LSTM that get_lstm reuses."""
    return nn.LSTM(input_size=input_size, hidden_size=hidden_size, bidirectional=True)


def get_lstm(input_tensor, bi_lstm=None):
    """Encode each row of ``input_tensor`` with a bidirectional LSTM.

    Args:
        input_tensor: 2-D tensor of shape (num_examples, feature_size).
        bi_lstm: Optional bidirectional ``nn.LSTM`` (``batch_first=False``) to
            use. When omitted, a cached layer with hidden size 100 is used, so
            every call with the same feature size shares the same weights and
            train/dev/test features stay comparable.

    Returns:
        Tensor of shape (num_examples, hidden_size): the element-wise mean of
        the forward and backward hidden states.
    """
    if bi_lstm is None:
        bi_lstm = _shared_bi_lstm(input_tensor.size(-1), 100)

    # batch_first is False, so unsqueeze(0) feeds the rows as a batch of
    # length-1 sequences: (1, num_examples, feature_size).
    lstm_output, _ = bi_lstm(input_tensor.unsqueeze(0))

    # The last axis is [forward units | backward units]; split it as
    # (direction, unit) so the mean pairs each unit with its counterpart in
    # the other direction rather than with its neighbour.
    per_direction = lstm_output.view(input_tensor.size(0), 2, bi_lstm.hidden_size)

    # Take the average of the hidden states from both directions
    avg_embeddings = torch.mean(per_direction, dim=1)

    return avg_embeddings


In [None]:
# Bi-LSTM features of the question embeddings for train / dev / test.
q_lstm_train, q_lstm_dev, q_lstm_test = (
    get_lstm(split_tensor)
    for split_tensor in (qe_tensor_train, qe_tensor_dev, qe_tensor_test)
)

In [None]:
# Bi-LSTM features of the answer embeddings for train / dev / test.
a_lstm_train, a_lstm_dev, a_lstm_test = (
    get_lstm(split_tensor)
    for split_tensor in (ae_tensor_train, ae_tensor_dev, ae_tensor_test)
)

In [None]:
# Bi-LSTM features of the summary embeddings for train / dev / test.
s_lstm_train, s_lstm_dev, s_lstm_test = (
    get_lstm(split_tensor)
    for split_tensor in (se_tensor_train, se_tensor_dev, se_tensor_test)
)

# CONCAT

In [None]:
# Concatenate the CNN, GRU and LSTM features of each text field along dim 1 (train split).
c_q_train = torch.cat((q_cnn_train, q_gru_train, q_lstm_train), dim=1)
c_a_train = torch.cat((a_cnn_train, a_gru_train, a_lstm_train), dim=1)
c_s_train = torch.cat((s_cnn_train, s_gru_train, s_lstm_train), dim=1)


In [None]:
# Concatenate the CNN, GRU and LSTM features of each text field along dim 1 (dev split).
c_q_dev = torch.cat((q_cnn_dev, q_gru_dev, q_lstm_dev), dim=1)
c_a_dev = torch.cat((a_cnn_dev, a_gru_dev, a_lstm_dev), dim=1)
c_s_dev = torch.cat((s_cnn_dev, s_gru_dev, s_lstm_dev), dim=1)

In [None]:
# Concatenate the CNN, GRU and LSTM features of each text field along dim 1 (test split).
c_q_test = torch.cat((q_cnn_test, q_gru_test, q_lstm_test), dim=1)
c_a_test = torch.cat((a_cnn_test, a_gru_test, a_lstm_test), dim=1)
c_s_test = torch.cat((s_cnn_test, s_gru_test, s_lstm_test), dim=1)

In [None]:
# Labels of the train and dev splits, first as Python lists and then as tensors.
label_list_train, label_list_dev = df_train['label'].tolist(), df_dev['label'].tolist()
label_tensor_train, label_tensor_dev = (
    torch.tensor(label_list_train),
    torch.tensor(label_list_dev),
)

In [None]:
# Wrap the fused train features (question, answer, summary) in DataFrames.
df1, df2, df3 = (
    pd.DataFrame(fused.detach().numpy())
    for fused in (c_q_train, c_a_train, c_s_train)
)

In [None]:
# Wrap the fused dev features (question, answer, summary) in DataFrames.
df4, df5, df6 = (
    pd.DataFrame(fused.detach().numpy())
    for fused in (c_q_dev, c_a_dev, c_s_dev)
)

In [None]:
# Wrap the fused test features (question, answer, summary) in DataFrames.
df7, df8, df9 = (
    pd.DataFrame(fused.detach().numpy())
    for fused in (c_q_test, c_a_test, c_s_test)
)

In [None]:
# Join the three feature frames of each split side by side; train and dev also get the label column.
# NOTE(review): the three frames carry the same integer column names, so the
# result has repeated column labels -- confirm nothing selects features by name.
result_df_train = pd.concat((df1, df2, df3, df_train['label']), axis='columns')
result_df_dev = pd.concat((df4, df5, df6, df_dev['label']), axis='columns')
result_df_test = pd.concat((df7, df8, df9), axis='columns')

In [None]:
# Turn every column label into a string on all three result frames.
for feature_frame in (result_df_train, result_df_dev, result_df_test):
    feature_frame.columns = feature_frame.columns.astype(str)

In [None]:
# Write the fused feature frames to CSV without the row index.
# NOTE(review): these relative names match the summary CSVs saved under
# /content earlier; if the working directory is /content they are overwritten -- confirm intended.
for feature_frame, csv_name in (
    (result_df_train, 'summary_train.csv'),
    (result_df_dev, 'summary_dev.csv'),
    (result_df_test, 'summary_test.csv'),
):
    feature_frame.to_csv(csv_name, index=False)

# MULTI-LEVEL CONCAT APPROACH

In [None]:
# Level 1 and level 2 CNN outputs side by side.
mf_train_q_1 = torch.cat([q_cnn_train_fus, q_cnn_train_fus1], 1)
# Append the level 3 CNN output to the previous concatenation.
mf_train_q_2 = torch.cat([mf_train_q_1, q_cnn_train_fus2], 1)
# Finally append the LSTM and GRU features to the multi-level CNN features.
final_q_train = torch.cat([mf_train_q_2, q_lstm_train, q_gru_train], 1)

In [None]:
# The code above can likewise be used to obtain the multi-level features of the answers and summaries, and of the dev and test splits; it is omitted here to avoid redundancy.

# 1D CNN Model

In [None]:
# Build the 1D CNN classifier as a single layer stack.
model = Sequential([
    # Convolution over the 900-long, single-channel feature sequence
    Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=(900, 1)),
    # Halve the sequence length
    MaxPooling1D(pool_size=2),
    # Collapse to one vector per example
    Flatten(),
    # Hidden dense layer
    Dense(128, activation='relu'),
    # Single linear output unit
    Dense(1, activation='linear'),
    # Sigmoid of the output minus the mean of the tensor passed to the layer
    # (this layer has no trainable weights)
    Lambda(lambda x: K.sigmoid(x - K.mean(x)), output_shape=(1,)),
])

# Binary cross-entropy loss, Adam optimizer, accuracy metric
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print the layer table
model.summary()

# TRAINING

In [None]:
# Split the train frame into the label column and the feature columns.
y_train = result_df_train['label']
X_train = result_df_train.drop(columns='label')

In [None]:
# Train for 15 epochs with mini-batches of 32 rows.
model.fit(x=X_train, y=y_train, batch_size=32, epochs=15)

In [None]:
# Split the dev frame into the label column and the feature columns.
y_dev = result_df_dev['label']
X_dev = result_df_dev.drop(columns='label')

In [None]:
# Model scores for the dev features.
y_pred_dev = model.predict(x=X_dev)

# MANUAL GRID SEARCH FOR THRESHOLD

In [None]:
# Candidate thresholds 0.01, 0.02, ... (step 0.01, stopping before 1.0); best values start at 0.
best_threshold = best_macro_f1 = 0
thresholds = np.arange(0.01, 1.0, 0.01)

In [None]:
# Try every candidate threshold on the dev predictions and keep the one with
# the highest macro F1.
# Flatten the model output so it lines up with the 1-D label vector.
dev_scores = np.asarray(y_pred_dev).ravel()
for threshold in thresholds:
    # Same ">=" rule that is used when the final predictions are produced.
    y_predi = (dev_scores >= threshold).astype(int)
    current_macro_f1 = f1_score(y_dev, y_predi, average='macro')

    print(f"Threshold: {threshold:.2f}, Macro F1: {current_macro_f1:.4f}")

    if current_macro_f1 > best_macro_f1:
        best_macro_f1 = current_macro_f1
        best_threshold = threshold

print(f"Best Threshold: {best_threshold}")
print(f"Best Macro F1 Score: {best_macro_f1}")

In [None]:
# Binarise the dev scores: 1 where the score reaches the best threshold, else 0.
y_final_dev = np.greater_equal(y_pred_dev, best_threshold).astype(int)

In [None]:
# Classification report on the dev data.
# classification_report expects (y_true, y_pred): gold labels first, predictions second.
report_dev = classification_report(y_dev, np.ravel(y_final_dev))
print("Classification Report:\n", report_dev)

In [None]:
# Score the test features and binarise them with the threshold chosen on dev.
X_test = result_df_test
y_pred_test = model.predict(x=X_test)
y_final_test = np.greater_equal(y_pred_test, best_threshold).astype(int)