In [1]:
# %%capture
# !pip install datasets
# !pip install transformers

## Libraries and Dependencies

In [2]:
import torch
import nltk
import asyncio
from nltk.tokenize import sent_tokenize
from datasets import load_dataset
from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor


# The models the authors used:
from transformers import BertForMaskedLM, BertTokenizer, BertModel
from transformers import AlbertForMaskedLM, AlbertTokenizer

nltk.download('punkt')

  from .autonotebook import tqdm as notebook_tqdm
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\clara\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [3]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
DEVICE

device(type='cuda')

## Algorithm Implementation

In [4]:
def mask_sentence(sentence, mask_token, i, M, L_min):
    return [mask_token
            if (j - i) % M == 0
            and (len(sentence[j]) >= L_min
                 or sentence[j].startswith('##')
                 or sentence[min(j+1, len(sentence)-1)].startswith('##'))
            else sentence[j]
            for j in range(len(sentence))]

def no_copy_guard(sentence, summary):
    sentence = ' '.join(sentence)
    summary = ' '.join(summary)
    return sentence in summary

In [5]:
def BLANC_help(text, summary, model, tokenizer, M=6, L_min=4, sep='[SEP]', device='cpu', word_sim_model = None):
    """
    Calculates BLANC score between a given text and its summary using a specified model.

    Parameters:
    - text (List[List[str]]): List of sentences represented as a list of tokens.
    - summary (List[str]): The tokenized summary of the text.
    - model: BERT-type model
    - tokenizer: The tokenizer associated with the model used.
    - M (int): Parameter M for the algorithm (default is 6).
    - L_min (int): Minimum length requirement for masked words (default is 4).
    - sep (str): Separator between the inference help (filler/summary) and a sentence from the text (default is '[SEP]').

    Returns:
    - float: BLANC score for the given text and its summary.
    """

    filler = ['.'] * len(summary)
    S = [[0, 0], [0, 0]]

    score = 0

    for sentence in text:
        if no_copy_guard(sentence, summary): 
           continue
        for i in range(M):
            masked_sentence = mask_sentence(sentence, tokenizer.mask_token, i, M, L_min)

            input_base = filler + [sep] + masked_sentence
            input_help = summary + [sep] + masked_sentence

            tokenized_input_base = torch.tensor(tokenizer.convert_tokens_to_ids(input_base)).to(device) # Shape: [sequence_length]
            tokenized_input_help = torch.tensor(tokenizer.convert_tokens_to_ids(input_help)).to(device) # Shape: [sequence_length]
            with torch.no_grad():
                input_stacked = torch.stack((tokenized_input_base, tokenized_input_help))
                out_stacked = model(input_ids=input_stacked).logits  # Shape: [1, sequence_length, Bert_vocab_size]
                out_base = out_stacked[0]
                out_help = out_stacked[1]

            out_base = torch.argmax(out_base.squeeze(0), dim=-1)  # Shape: [sequence_length]
            out_help = torch.argmax(out_help.squeeze(0), dim=-1)  # Shape: [sequence_length]

            masked_tokens = [idx for idx, word in enumerate(masked_sentence) if word == tokenizer.mask_token]

            for j in masked_tokens:
                idx = len(summary + [sep]) + j
                predicted_word_base = tokenizer.convert_ids_to_tokens(out_base[idx].item())
                predicted_word_help = tokenizer.convert_ids_to_tokens(out_help[idx].item())

                if word_sim_model is not None:
                    predicted_sentence_base = tokenizer.convert_tokens_to_ids(masked_sentence)
                    predicted_sentence_base[j] = out_base[idx].item()

                    predicted_sentence_help = tokenizer.convert_tokens_to_ids(masked_sentence)
                    predicted_sentence_help[j] = out_help[idx].item()


                    tokenized_sentence = masked_sentence.copy()
                    tokenized_sentence[j] = sentence[j]
                    tokenized_sentence = tokenizer.convert_tokens_to_ids(tokenized_sentence)

                    with torch.no_grad():
                        word_sim_input = torch.stack([torch.tensor(predicted_sentence_base), torch.tensor(predicted_sentence_help), torch.tensor(tokenized_sentence)]).to(device)
                        word_sim_out = word_sim_model(word_sim_input)
                        predicted_sentence_base_embedding = word_sim_out.last_hidden_state[0, j, :]
                        predicted_sentence_help_embedding = word_sim_out.last_hidden_state[1, j, :]
                        correct_embedding = word_sim_out.last_hidden_state[2, j, :]

                    cos_sim = torch.nn.CosineSimilarity(dim=0)
                    
                    sim_base = cos_sim(predicted_sentence_base_embedding, correct_embedding)
                    sim_help = cos_sim(predicted_sentence_help_embedding, correct_embedding)

                    k = int(predicted_word_base == sentence[j])
                    m = int(predicted_word_help == sentence[j])
                    S[k][m] += 1

                    score += sim_help - sim_base
                    """print(tokenizer.convert_ids_to_tokens(predicted_sentence_base_tensor[0]),
                          tokenizer.convert_ids_to_tokens(predicted_sentence_help_tensor[0]),
                          tokenizer.convert_ids_to_tokens(tokenized_sentence_tensor[0]), sep="\n")
                    print(k, m, sim_base.item(), sim_help.item())"""

                else:
                
                    """print(f'predicted_word_base[{idx - len(summary + [sep])}]: {predicted_word_base}')
                    print(f'predicted_word_help[{idx - len(summary + [sep])}]: {predicted_word_help}')
                    print(f'sentence[{j}]: {sentence[j]}')"""

                    k = int(predicted_word_base == sentence[j])
                    m = int(predicted_word_help == sentence[j])
                    S[k][m] += 1


    B = (S[0][1] - S[1][0]) / (S[0][0] + S[1][1] + S[0][1] + S[1][0])
    print(S)

    return B, score / (S[0][0] + S[1][1] + S[0][1] + S[1][0])

In [6]:
def BLANC_help_optimized(text, summary, model, tokenizer, M=6, L_min=4, sep='[SEP]', device='cpu', word_sim_model=None):
    """
    Optimized version of the BLANC_help function for calculating BLANC score.
    """

    filler = ['.'] * len(summary)
    S = [[0, 0], [0, 0]]
    score = 0

    # Batch preparation for model inference
    base_inputs = []
    help_inputs = []
    attention_masks_base = []
    attention_masks_help = []

    # Precompute and store tokenized summary and separator
    tokenized_summary = tokenizer.convert_tokens_to_ids(summary)
    tokenized_sep = tokenizer.convert_tokens_to_ids([sep])
    tokenized_filler = tokenizer.convert_tokens_to_ids(filler)
    max_length = 0

    batch_sentences = []
    masked_tokens_batch = []

    for sentence in text:
        if no_copy_guard(sentence, summary): 
            continue
        for i in range(M):
            masked_sentence = mask_sentence(sentence, tokenizer.mask_token, i, M, L_min)
            masked_sentence_ids = tokenizer.convert_tokens_to_ids(masked_sentence)

            input_base = tokenized_filler + tokenized_sep + masked_sentence_ids
            input_help = tokenized_summary + tokenized_sep + masked_sentence_ids

            base_inputs.append(input_base)
            help_inputs.append(input_help)

            attention_masks_base.append([1] * len(input_base))
            attention_masks_help.append([1] * len(input_help))

            # Track the maximum length
            max_length = max(max_length, len(input_base), len(input_help))

            masked_tokens_batch.append([idx for idx, word in enumerate(masked_sentence_ids) if word == tokenizer.mask_token_id])

            batch_sentences.append(tokenizer.convert_tokens_to_ids(sentence))

    # Pad the sequences and convert to tensors
    base_inputs_tensor = torch.stack([torch.nn.functional.pad(torch.tensor(seq), (0, max_length - len(seq)), value=tokenizer.pad_token_id) for seq in base_inputs]).to(device)
    help_inputs_tensor = torch.stack([torch.nn.functional.pad(torch.tensor(seq), (0, max_length - len(seq)), value=tokenizer.pad_token_id) for seq in help_inputs]).to(device)
    attention_mask_base_tensor = torch.stack([torch.nn.functional.pad(torch.tensor(mask), (0, max_length - len(mask))) for mask in attention_masks_base]).to(device)
    attention_mask_help_tensor = torch.stack([torch.nn.functional.pad(torch.tensor(mask), (0, max_length - len(mask))) for mask in attention_masks_help]).to(device)

    # Model inference in batches with attention masks
    with torch.no_grad():
        out_base = model(input_ids=base_inputs_tensor, attention_mask=attention_mask_base_tensor).logits
        out_help = model(input_ids=help_inputs_tensor, attention_mask=attention_mask_help_tensor).logits

    out_base = torch.argmax(out_base.squeeze(0), dim=-1)
    out_help = torch.argmax(out_help.squeeze(0), dim=-1)
    print(out_base.shape)

    # Iterate through the batches
    for b_idx in range(len(base_inputs)):
        print(b_idx)
        out_base_batch = out_base[b_idx]
        out_help_batch = out_help[b_idx]
        for j in masked_tokens_batch[b_idx]:
            print("j:", j)
            idx = len(tokenized_summary) + 1 + j
            print(idx)
            predicted_word_base = out_base_batch[idx].item()
            print(predicted_word_base)
            predicted_word_help = out_help_batch[idx].item()
            print(predicted_word_help)

            # Rest of your logic here...
            # Make sure to adjust indexing and access according to batch processing
            """print(f'predicted_word_base[{idx - len(summary + [sep])}]: {tokenizer.convert_ids_to_tokens(predicted_word_base)}')
            print(f'predicted_word_help[{idx - len(summary + [sep])}]: {tokenizer.convert_ids_to_tokens(predicted_word_help)}')
            print(f'sentence[{j}]: {tokenizer.convert_ids_to_tokens(batch_sentences[b_idx][j])}')"""

            k = int(predicted_word_base == batch_sentences[b_idx][j])
            m = int(predicted_word_help == batch_sentences[b_idx][j])
            S[k][m] += 1
            print(S)

    B = (S[0][1] - S[1][0]) / (S[0][0] + S[1][1] + S[0][1] + S[1][0])
    print(S)

    return B, score / (S[0][0] + S[1][1] + S[0][1] + S[1][0])

## Datasets

In [7]:
# cnn_dailymail_ds = load_dataset("cnn_dailymail", '3.0.0', split='test')
# print(cnn_dailymail_ds)

In [8]:
DailyNews_ds = load_dataset('json', data_files='../datasets/DailyNews_300.json', split='train')
DailyNews_ds

Dataset({
    features: ['summary', 'text', 'scores', 'annotators_ids'],
    num_rows: 300
})

## Model and Tokenizer

In [9]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case = True)
model = BertForMaskedLM.from_pretrained('bert-base-uncased').to(DEVICE)
word_sim_model = BertModel.from_pretrained('bert-base-uncased').to(DEVICE)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'bert.pooler.dense.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


## Preprocessing

In [10]:
summaries = DailyNews_ds['summary'] # (List[str])
texts = DailyNews_ds['text']  # (List[str]) each string is a paragraph made of a few sentences

In [11]:
# each text in texts is a list of sentences (each sentence is a string)
texts = [sent_tokenize(text.strip()) for text in texts] # List[List[str]]
assert len(texts) == len(summaries) == 300

In [12]:
tokenized_texts = [[tokenizer.tokenize(sentence) for sentence in text] for text in texts]
tokenized_summaries = [tokenizer.tokenize(summary) for summary in summaries]

## Running the Program

In [13]:
BLANC_help(tokenized_texts[0], tokenized_summaries[0], model, tokenizer, device=DEVICE, word_sim_model = word_sim_model)

[[251, 78], [19, 185]]


(0.11069418386491557, tensor(0.0267, device='cuda:0'))

In [14]:
BLANC_help_optimized(tokenized_texts[0], tokenized_summaries[0], model, tokenizer, device=DEVICE)

torch.Size([216, 263])
0
j: 0
121
3594
7986
[[0, 1], [0, 0]]
j: 6
127
29033
17457
[[0, 2], [0, 0]]
j: 12
133
2012
2013
[[1, 2], [0, 0]]
j: 18
139
2563
2563
[[2, 2], [0, 0]]
1
j: 1
122
2158
2158
[[2, 2], [0, 1]]
j: 7
128
2094
2015
[[2, 3], [0, 1]]
j: 13
134
4901
4901
[[3, 3], [0, 1]]
j: 19
140
2046
2046
[[3, 3], [0, 2]]
2
j: 2
123
2094
2094
[[3, 3], [0, 3]]
j: 14
135
12954
10272
[[4, 3], [0, 3]]
3
j: 3
124
3900
24015
[[4, 4], [0, 3]]
j: 9
130
3413
2448
[[5, 4], [0, 3]]
j: 15
136
6132
6132
[[6, 4], [0, 3]]
j: 21
142
2088
2088
[[6, 4], [0, 4]]
4
j: 4
125
2278
2278
[[6, 4], [0, 5]]
5
j: 5
126
13433
17738
[[6, 4], [1, 5]]
j: 11
132
3608
3608
[[6, 4], [1, 6]]
j: 23
144
1012
2345
[[6, 5], [1, 6]]
6
j: 0
121
3120
3120
[[7, 5], [1, 6]]
j: 6
127
1012
1012
[[8, 5], [1, 6]]
j: 12
133
2008
20243
[[8, 6], [1, 6]]
j: 24
145
2042
2042
[[8, 6], [1, 7]]
j: 30
151
2088
2088
[[8, 6], [1, 8]]
j: 36
157
1997
1997
[[9, 6], [1, 8]]
j: 42
163
2000
2000
[[10, 6], [1, 8]]
7
j: 7
128
2563
2563
[[10, 6], [1, 9]]
j

(0.11069418386491557, 0.0)

In [15]:
scores = [BLANC_help(text, summary, model, tokenizer, device=DEVICE)
          for summary, text in tqdm(zip(tokenized_summaries, tokenized_texts))]
scores

ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html