### Script to generate summaries using the chunking-based Pegasus approach

In [3]:
# Dataset whose test split is summarised and evaluated below.
# NOTE(review): the previous comment listed the options as
# "IN - IN-Abs, UK-UK-Abs, N2-IN-Ext" -- confirm the accepted values against get_summary_data.
dataset = 'IN-Abs'
# Directory the generated summaries are written to. The trailing slash matters:
# later cells build file paths by plain string concatenation.
output_path = './output/'

In [2]:
import pandas as pd
import numpy as np
import glob
import sys
sys.path.insert(0, '../')
from utilities import *
import os
import nltk

In [4]:
# Create the output directory when it is missing. exist_ok=True makes this a single
# call with no gap between checking for the directory and creating it.
os.makedirs(output_path, exist_ok=True)

In [None]:
# Read the test split of the selected dataset. The three returned sequences are used
# below as output file names, source documents and (presumably) reference summaries.
names, data_source, data_summary = get_summary_data(dataset, "test")
# Print the three lengths, one per line, as a quick sanity check that they line up.
print(len(names), len(data_source), len(data_summary), sep="\n")
# len_dic = dict_names = get_req_len_dict(dataset, "test")  

In [6]:
# Imported explicitly: no earlier cell imports torch by name, so without this line the
# cell only works if the wildcard import from `utilities` happens to expose it.
import torch

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Test sur : {device}")

Test sur : cuda


In [7]:
# Loading Model and tokenizer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import PegasusForConditionalGeneration, PegasusTokenizer, Trainer, TrainingArguments

# One checkpoint id shared by tokenizer and model so the two cannot drift apart.
checkpoint = "nsi319/legal-pegasus"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
# Move the weights to the device selected earlier (GPU when available).
model = model.to(device)


In [8]:
def summerize(text, max_len, min_len):
    '''
    Function to generate the summary of a single chunk using Pegasus
    input:  text    - chunk to summarise (the tokenised input is truncated to 512 tokens)
            max_len - value passed to model.generate as max_length
            min_len - value passed to model.generate as min_length
    output: decoded summary string, or "" when tokenisation/generation fails

    Uses the module-level `tokenizer`, `model` and `device`.
    '''
    try:
        input_tokenized = tokenizer.encode(
            text, return_tensors='pt', max_length=512, truncation=True
        ).to(device)
        summary_ids = model.generate(
            input_tokenized,
            num_beams=9,
            length_penalty=0.1,
            min_length=min_len,
            max_length=max_len,
        )
        # Only the first returned sequence is used as the summary.
        return tokenizer.decode(
            summary_ids[0], skip_special_tokens=True, clean_up_tokenization_spaces=False
        )
    except Exception as err:
        # Still best-effort (one failing chunk must not abort the whole run), but the
        # failure is now reported, and KeyboardInterrupt / SystemExit are no longer
        # swallowed as they were by the previous bare `except:`.
        print(f"summerize: generation failed ({type(err).__name__}: {err}); returning empty summary")
        return ""

In [9]:
def summerize_doc(nested_sentences, p):
    '''
    Function to generate summary using chunking based Pegasus
    input:  nested_sentences - iterable of text chunks (strings)
            p - Number of words in summaries per word in the document
    output: list with one summary string per chunk, in chunk order
    '''
    result = []
    for nested in nested_sentences:
        # Length target for this chunk: its space-separated word count scaled by p.
        target_len = int(p * len(nested.split(" ")))
        # Allow the summary to be up to 5 shorter than the target, but clamp at 0 so
        # model.generate never receives a negative min_length for short chunks.
        min_len = max(target_len - 5, 0)
        result.append(summerize(nested, target_len, min_len))
    return result

In [10]:
# Names (without directory) of the summaries already present in the output directory.
# os.path.basename handles whatever separator glob returns on the current platform,
# unlike slicing after the last "/".
done_files = [os.path.basename(path) for path in glob.glob(output_path + "*.txt")]

In [None]:
# main loop to generate and save summaries of each document in the test dataset
# Documents whose output file already exists are skipped, so an interrupted run can be
# resumed. The check is a single path lookup per document instead of re-listing the
# whole output directory on every iteration.
for i in range(len(data_source)):
    name = names[i]
    path = output_path + name
    if os.path.exists(path):
        continue
    doc = data_source[i]
    input_len = len(doc.split(" "))
    # Word budget for the final summary of this document.
    req_len = 512
    print(req_len)
    print(str(i) + ": " + name +  " - " + str(input_len) + " : " + str(req_len), end = ", ")

    # Split the document into chunks and summarise each one in proportion to its size.
    nested = nest_sentences(doc,512)
    p = float(req_len/input_len)
    print(p)
    abs_summ = " ".join(summerize_doc(nested,p))
    print(len((abs_summ.split(" "))))

    # Keep at most req_len space-separated words; slicing leaves shorter summaries as they are.
    abs_summ = " ".join(abs_summ.split(" ")[:req_len])
    print(len((abs_summ.split(" "))))

    # Written as UTF-8 because the evaluation cell reads these files back as UTF-8;
    # the context manager closes the file even if the write fails.
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(abs_summ)

In [19]:
from rouge_score import rouge_scorer
from bert_score import BERTScorer
from transformers import logging

import pandas as pd

def rouge_evaluations(text, ref, f1_only=True):
    """Compute ROUGE-1, ROUGE-2 and ROUGE-L for a generated text against a reference.

    Args:
        text: generated summary (the prediction).
        ref: reference summary (the target).
        f1_only: when True, return only the F-measure of each metric.

    Returns:
        dict keyed by 'rouge1', 'rouge2' and 'rougeL'. Values are F-measure floats when
        f1_only is True, otherwise the scorer's score objects exposing `precision`,
        `recall` and `fmeasure`.
    """
    metric_names = ('rouge1', 'rouge2', 'rougeL')
    scorer = rouge_scorer.RougeScorer(list(metric_names), use_stemmer=True)
    # RougeScorer.score takes (target, prediction). Passing them the other way round
    # leaves the F-measure unchanged but swaps precision and recall.
    scores = scorer.score(ref, text)

    if f1_only:
        return {metric: scores[metric].fmeasure for metric in metric_names}
    return {metric: scores[metric] for metric in metric_names}

# Cache of BERTScorer instances keyed by language, so the scorer is built once and
# reused for every file instead of being re-created on each call.
_BERT_SCORERS = {}


def _get_bert_scorer(lang="en"):
    """Return the cached BERTScorer for `lang`, creating it on first use."""
    if lang not in _BERT_SCORERS:
        _BERT_SCORERS[lang] = BERTScorer(lang=lang)
    return _BERT_SCORERS[lang]


def bert_evaluation(summary, ref, f1_only=True):
    """Compute the BERTScore of a generated summary against a reference.

    Args:
        summary: generated summary (candidate).
        ref: reference summary.
        f1_only: when True, return only the F1 score.

    Returns:
        The F1 score as a float when f1_only is True, otherwise the list
        [precision, recall, f1] of floats.
    """
    # Silence transformers warnings while scoring, then restore whatever verbosity was
    # active before the call instead of forcing it to WARNING.
    previous_verbosity = logging.get_verbosity()
    logging.set_verbosity_error()

    try:
        scorer = _get_bert_scorer()
        precision, recall, f1 = scorer.score([summary], [ref])

        if f1_only:
            return f1.mean().item()
        return [precision.mean().item(), recall.mean().item(), f1.mean().item()]
    finally:
        logging.set_verbosity(previous_verbosity)

def evaluations(text, ref, f1_only=True):
    """Score `text` against `ref` with ROUGE and BERTScore; return a one-row DataFrame.

    With f1_only=True the columns are rouge1, rouge2, rougeL and bert_score.
    Otherwise every metric is expanded into `_P`, `_R` and `_F1` columns
    (precision, recall, F1), ROUGE metrics first and BERTScore last.
    """
    rouge_scores = rouge_evaluations(text, ref, f1_only)
    bert_scores = bert_evaluation(text, ref, f1_only)

    rouge_names = ('rouge1', 'rouge2', 'rougeL')
    row = {}

    if f1_only:
        # One column per metric, each holding the single F1 value.
        for metric in rouge_names:
            row[metric] = [rouge_scores[metric]]
        row['bert_score'] = [bert_scores]
        return pd.DataFrame(row)

    # Detailed mode: three columns per metric.
    for metric in rouge_names:
        score = rouge_scores[metric]
        row[metric + '_P'] = [score.precision]
        row[metric + '_R'] = [score.recall]
        row[metric + '_F1'] = [score.fmeasure]

    # bert_scores is indexed positionally as precision, recall, F1.
    for position, suffix in enumerate(('P', 'R', 'F1')):
        row['bert_score_' + suffix] = [bert_scores[position]]

    return pd.DataFrame(row)

In [None]:
import os

# Folder holding the reference summaries of the selected dataset's test split
ref_folder = f"../../dataset/{dataset}/test-data/summary"
# Folder the generated summaries are read from for evaluation
output_folder = "./output/judgement"

# Function to evaluate all files
def evaluate_all(ref_folder, output_folder, f1_only=True):
    """Evaluate every generated summary in `output_folder` against its reference.

    Both folders must contain exactly the same file names; files are paired by name.

    Args:
        ref_folder: directory with the reference summaries.
        output_folder: directory with the generated summaries.
        f1_only: forwarded to `evaluations`; selects F1-only or detailed columns.

    Returns:
        DataFrame with one row per file (metric columns plus `file_name`), or an empty
        DataFrame when the folders contain no files.

    Raises:
        AssertionError: if the folders differ in file count or file names.
    """
    # Get the list of files in the folders
    ref_files = sorted(os.listdir(ref_folder))
    output_files = sorted(os.listdir(output_folder))

    # Ensure both folders have the same number of files and matching names
    assert len(ref_files) == len(output_files), "Folders have a different number of files"
    assert ref_files == output_files, "File names do not match between folders"

    # Collect the per-file frames and concatenate once at the end; concatenating
    # inside the loop re-copies all earlier rows on every iteration.
    frames = []

    for file_name in ref_files:
        ref_path = os.path.join(ref_folder, file_name)
        generated_path = os.path.join(output_folder, file_name)

        # Read the contents of the files
        with open(ref_path, 'r', encoding='utf-8') as ref_file:
            ref_text = ref_file.read()
        with open(generated_path, 'r', encoding='utf-8') as output_file:
            output_text = output_file.read()

        # Score the generated text against its reference and tag the row with its file
        result = evaluations(output_text, ref_text, f1_only=f1_only)
        result["file_name"] = file_name
        frames.append(result)

    # pd.concat rejects an empty list, so empty folders yield an empty frame directly.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

# Score every generated summary against its reference (one row per file).
results = evaluate_all(ref_folder, output_folder)

# Average only the known metric columns that are actually present, in frame order.
known_metrics = {'rouge1', 'rouge2', 'rougeL', 'bert_score', 'Execution time'}
metrics = [col for col in results.columns if col in known_metrics]
means = results[metrics].mean()

print("Means :")
print(means)

Means :
rouge1        0.499684
rouge2        0.255940
rougeL        0.262419
bert_score    0.842430
dtype: float64


In [25]:
results.head()

Unnamed: 0,rouge1,rouge2,rougeL,bert_score,file_name
0,0.618557,0.263566,0.301546,0.85958,1181.txt
1,0.522541,0.205339,0.245902,0.829442,1195.txt
2,0.542714,0.256927,0.248744,0.844111,1329.txt
3,0.393893,0.140888,0.192366,0.831265,1378.txt
4,0.612059,0.342075,0.323094,0.862031,1406.txt
