In [1]:
import nltk
import copy
import sys, math, re
from collections import defaultdict

import six
from six.moves import xrange as range

bleu and rouge


# Model res

In [5]:
import torch
from transformers import BartTokenizer, BartForConditionalGeneration
from KG_trainer_w_comp import get_KG_trainer
from KG_trainer_w_comp import BartGraphAwareForConditionalGeneration
from transformers import BartTokenizer
import torch

def generate_explanation(model, tokenizer, sentence: str) -> list:
    # Tokenize the input text
    inputs = tokenizer(sentence, return_tensors="pt").to(device)
    # Generate output using beam search:
    output_ids = model.generate(
        **inputs,
        max_length=60,
        num_beams=3,
        num_return_sequences=3,
        early_stopping=True
    )
    # Decode each of the generated token sequences
    explanations = [
        tokenizer.decode(ids, skip_special_tokens=True)
        for ids in output_ids
    ]
    return explanations

model_path = "KG_finetuned_out2/checkpoint-30"
tokenizer = BartTokenizer.from_pretrained(model_path)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BartGraphAwareForConditionalGeneration.from_pretrained(model_path)
model.to(device)
model.eval() 

test_sentence = "I  can count  stars."
explanation = generate_explanation(model, tokenizer, test_sentence)
print("Input Sentence:", test_sentence)
print("Generated Explanation:", explanation)

Input Sentence: I  can count  stars.
Generated Explanation: ['There is no way to count stars.', 'There are no stars in the sky.', 'There are no stars.']


In [6]:
test_sentence = "We use book to know the time."
explanation = generate_explanation(model, tokenizer, test_sentence)
ground_truths = ['A book is used to study.'	,'A book does not have the ability to show what time it is.',	"Books don't tell the time."]
print("Input Sentence:", test_sentence)
print("Generated Explanation:", explanation)

Input Sentence: We use book to know the time.
Generated Explanation: ['Book is used to know the time.', 'Book is a book.', 'Book is used to know time.']


# BLEU

In [7]:
def precook(s, n=4, out=False):
    """Takes a string as input and returns an object that can be given to
    either cook_refs or cook_test. This is optional: cook_refs and cook_test
    can take string arguments as well."""
    words = s.split()
    counts = defaultdict(int)
    for k in range(1,n+1):
        for i in range(len(words)-k+1):
            ngram = tuple(words[i:i+k])
            counts[ngram] += 1
    return (len(words), counts)

def cook_refs(refs, eff=None, n=4): ## lhuang: oracle will call with "average"
    '''Takes a list of reference sentences for a single segment
    and returns an object that encapsulates everything that BLEU
    needs to know about them.'''

    reflen = []
    maxcounts = {}
    for ref in refs:
        rl, counts = precook(ref, n)
        reflen.append(rl)
        for (ngram,count) in six.iteritems(counts):
            maxcounts[ngram] = max(maxcounts.get(ngram,0), count)

    # Calculate effective reference sentence length.
    if eff == "shortest":
        reflen = min(reflen)
    elif eff == "average":
        reflen = float(sum(reflen))/len(reflen)

    ## lhuang: N.B.: leave reflen computaiton to the very end!!

    ## lhuang: N.B.: in case of "closest", keep a list of reflens!! (bad design)

    return (reflen, maxcounts)

def cook_test(test, reflen_refmaxcounts, eff=None, n=4):
    '''Takes a test sentence and returns an object that
    encapsulates everything that BLEU needs to know about it.'''

    reflen, refmaxcounts = reflen_refmaxcounts
    testlen, counts = precook(test, n, True)

    result = {}

    # Calculate effective reference sentence length.

    if eff == "closest":
        result["reflen"] = min((abs(l-testlen), l) for l in reflen)[1]
    else: ## i.e., "average" or "shortest" or None
        result["reflen"] = reflen

    result["testlen"] = testlen

    result["guess"] = [max(0,testlen-k+1) for k in range(1,n+1)]

    result['correct'] = [0]*n
    for (ngram, count) in six.iteritems(counts):
        result["correct"][len(ngram)-1] += min(refmaxcounts.get(ngram,0), count)

    return result

class BleuScorer(object):
    """Bleu scorer."""

    __slots__ = "n", "crefs", "ctest", "_score", "_ratio", "_testlen", "_reflen", "special_reflen"
    # special_reflen is used in oracle (proportional effective ref len for a node).

    def copy(self):
        ''' copy the refs.'''
        new = BleuScorer(n=self.n)
        new.ctest = copy.copy(self.ctest)
        new.crefs = copy.copy(self.crefs)
        new._score = None
        return new

    def __init__(self, test=None, refs=None, n=4, special_reflen=None):
        ''' singular instance '''

        self.n = n
        self.crefs = []
        self.ctest = []
        self.cook_append(test, refs)
        self.special_reflen = special_reflen

    def cook_append(self, test, refs):
        '''called by constructor and __iadd__ to avoid creating new instances.'''

        if refs is not None:
            self.crefs.append(cook_refs(refs))
            if test is not None:
                cooked_test = cook_test(test, self.crefs[-1])
                self.ctest.append(cooked_test) ## N.B.: -1
            else:
                self.ctest.append(None) # lens of crefs and ctest have to match

        self._score = None ## need to recompute

    def ratio(self, option=None):
        self.compute_score(option=option)
        return self._ratio

    def score_ratio(self, option=None):
        '''return (bleu, len_ratio) pair'''
        return (self.fscore(option=option), self.ratio(option=option))

    def score_ratio_str(self, option=None):
        return "%.4f (%.2f)" % self.score_ratio(option)

    def reflen(self, option=None):
        self.compute_score(option=option)
        return self._reflen

    def testlen(self, option=None):
        self.compute_score(option=option)
        return self._testlen

    def retest(self, new_test):
        if type(new_test) is str:
            new_test = [new_test]
        assert len(new_test) == len(self.crefs), new_test
        self.ctest = []
        for t, rs in zip(new_test, self.crefs):
            self.ctest.append(cook_test(t, rs))
        self._score = None

        return self

    def rescore(self, new_test):
        ''' replace test(s) with new test(s), and returns the new score.'''

        return self.retest(new_test).compute_score()

    def size(self):
        assert len(self.crefs) == len(self.ctest), "refs/test mismatch! %d<>%d" % (len(self.crefs), len(self.ctest))
        return len(self.crefs)

    def __iadd__(self, other):
        '''add an instance (e.g., from another sentence).'''

        if type(other) is tuple:
            ## avoid creating new BleuScorer instances
            self.cook_append(other[0], other[1])
        else:
            assert self.compatible(other), "incompatible BLEUs."
            self.ctest.extend(other.ctest)
            self.crefs.extend(other.crefs)
            self._score = None ## need to recompute

        return self

    def compatible(self, other):
        return isinstance(other, BleuScorer) and self.n == other.n

    def single_reflen(self, option="average"):
        return self._single_reflen(self.crefs[0][0], option)

    def _single_reflen(self, reflens, option=None, testlen=None):

        if option == "shortest":
            reflen = min(reflens)
        elif option == "average":
            reflen = float(sum(reflens))/len(reflens)
        elif option == "closest":
            reflen = min((abs(l-testlen), l) for l in reflens)[1]
        else:
            assert False, "unsupported reflen option %s" % option

        return reflen

    def recompute_score(self, option=None, verbose=0):
        self._score = None
        return self.compute_score(option, verbose)

    def compute_score(self, option=None, verbose=0):
        n = self.n
        small = 1e-9
        tiny = 1e-15 ## so that if guess is 0 still return 0
        bleu_list = [[] for _ in range(n)]

        if self._score is not None:
            return self._score

        if option is None:
            option = "average" if len(self.crefs) == 1 else "closest"

        self._testlen = 0
        self._reflen = 0
        totalcomps = {'testlen':0, 'reflen':0, 'guess':[0]*n, 'correct':[0]*n}

        # for each sentence
        for comps in self.ctest:
            testlen = comps['testlen']
            self._testlen += testlen

            if self.special_reflen is None: ## need computation
                reflen = self._single_reflen(comps['reflen'], option, testlen)
            else:
                reflen = self.special_reflen

            self._reflen += reflen

            for key in ['guess','correct']:
                for k in range(n):
                    totalcomps[key][k] += comps[key][k]

            # append per image bleu score
            bleu = 1.
            for k in range(n):
                bleu *= (float(comps['correct'][k]) + tiny) \
                        /(float(comps['guess'][k]) + small)
                bleu_list[k].append(bleu ** (1./(k+1)))
            ratio = (testlen + tiny) / (reflen + small) ## N.B.: avoid zero division
            if ratio < 1:
                for k in range(n):
                    bleu_list[k][-1] *= math.exp(1 - 1/ratio)

            if verbose > 1:
                print(comps, reflen)

        totalcomps['reflen'] = self._reflen
        totalcomps['testlen'] = self._testlen

        bleus = []
        bleu = 1.
        for k in range(n):
            bleu *= float(totalcomps['correct'][k] + tiny) \
                    / (totalcomps['guess'][k] + small)
            bleus.append(bleu ** (1./(k+1)))
        ratio = (self._testlen + tiny) / (self._reflen + small) ## N.B.: avoid zero division
        if ratio < 1:
            for k in range(n):
                bleus[k] *= math.exp(1 - 1/ratio)

        if verbose > 0:
            print(totalcomps)
            print("ratio:", ratio)

        self._score = bleus
        return self._score, bleu_list

In [8]:

class Bleu:
    def __init__(self, n=4):
        # default compute Blue score up to 4
        self._n = n
        self._hypo_for_image = {}
        self.ref_for_image = {}

    def compute_score(self, gts, res):

        assert(gts.keys() == res.keys())
        imgIds = gts.keys()

        bleu_scorer = BleuScorer(n=self._n)
        for id in imgIds:
            hypo = res[id]
            ref = gts[id]

            # Sanity check.
            assert(type(hypo) is list)
            assert(len(hypo) == 1)
            assert(type(ref) is list)
            assert(len(ref) >= 1)

            bleu_scorer += (hypo[0], ref)

        score, scores = bleu_scorer.compute_score(option='closest', verbose=0)
        # print("closest:", score)
        # print("shortest:", bleu_scorer.compute_score(option='shortest'))
        # print("average:", bleu_scorer.compute_score(option='average', verbose=1))
        # return (bleu, bleu_info)
        return score, scores

    def method(self):
        return "Bleu"

In [9]:
test_sentence = "We use book to know the time."
explanation = generate_explanation(model, tokenizer, test_sentence)
ground_truths = {test_sentence: ['A book is used to study.'	,'A book does not have the ability to show what time it is.',	"Books don't tell the time."]}
print("Input Sentence:", test_sentence)
print("Generated Explanation:", explanation)

Input Sentence: We use book to know the time.
Generated Explanation: ['Book is used to know the time.', 'Book is a book.', 'Book is used to know time.']


In [10]:
bleu = Bleu()
print('[BLEU-1, BLEU-2, BLEU-3, BLEU-4]\n')
for e in explanation:
    print(e)
    res = {test_sentence: [e]}
    print(bleu.compute_score(gts = ground_truths, res = res)[0])
    print()

[BLEU-1, BLEU-2, BLEU-3, BLEU-4]

Book is used to know the time.
[0.7142857141836736, 0.597614304574709, 0.41491326661265254, 6.500593259109356e-05]

Book is a book.
[0.19470019567050137, 7.109445940997322e-09, 2.6999515154283422e-11, 1.97867090930997e-12]

Book is used to know time.
[0.6666666664444446, 0.5163977793135832, 0.4054801328872982, 6.865890476915431e-05]



# ROUGE

In [11]:
import numpy as np
import pdb

def my_lcs(string, sub):
    """
    Calculates longest common subsequence for a pair of tokenized strings
    :param string : list of str : tokens from a string split using whitespace
    :param sub : list of str : shorter string, also split using whitespace
    :returns: length (list of int): length of the longest common subsequence between the two strings

    Note: my_lcs only gives length of the longest common subsequence, not the actual LCS
    """
    if(len(string)< len(sub)):
        sub, string = string, sub

    lengths = [[0 for i in range(0,len(sub)+1)] for j in range(0,len(string)+1)]

    for j in range(1,len(sub)+1): 
        for i in range(1,len(string)+1):
            if(string[i-1] == sub[j-1]):
                lengths[i][j] = lengths[i-1][j-1] + 1
            else:
                lengths[i][j] = max(lengths[i-1][j] , lengths[i][j-1])

    return lengths[len(string)][len(sub)]

class Rouge():
    '''
    Class for computing ROUGE-L score for a set of candidate sentences for the MS COCO test set

    '''
    def __init__(self):
        # vrama91: updated the value below based on discussion with Hovey
        self.beta = 1.2

    def calc_score(self, candidate, refs):
        """
        Compute ROUGE-L score given one candidate and references for an image
        :param candidate: str : candidate sentence to be evaluated
        :param refs: list of str : COCO reference sentences for the particular image to be evaluated
        :returns score: int (ROUGE-L score for the candidate evaluated against references)
        """
        assert(len(candidate)==1)	
        assert(len(refs)>0)         
        prec = []
        rec = []

        # split into tokens
        token_c = candidate[0].split(" ")
    	
        for reference in refs:
            # split into tokens
            token_r = reference.split(" ")
            # compute the longest common subsequence
            lcs = my_lcs(token_r, token_c)
            prec.append(lcs/float(len(token_c)))
            rec.append(lcs/float(len(token_r)))

        prec_max = max(prec)
        rec_max = max(rec)

        if(prec_max!=0 and rec_max !=0):
            score = ((1 + self.beta**2)*prec_max*rec_max)/float(rec_max + self.beta**2*prec_max)
        else:
            score = 0.0
        return score

    def compute_score(self, gts, res):
        """
        Computes Rouge-L score given a set of reference and candidate sentences for the dataset
        Invoked by evaluate_captions.py 
        :param hypo_for_image: dict : candidate / test sentences with "image name" key and "tokenized sentences" as values 
        :param ref_for_image: dict : reference MS-COCO sentences with "image name" key and "tokenized sentences" as values
        :returns: average_score: float (mean ROUGE-L score computed by averaging scores for all the images)
        """
        assert(gts.keys() == res.keys())
        imgIds = gts.keys()

        score = []
        for id in imgIds:
            hypo = res[id]
            ref  = gts[id]

            score.append(self.calc_score(hypo, ref))

            # Sanity check.
            assert(type(hypo) is list)
            assert(len(hypo) == 1)
            assert(type(ref) is list)
            assert(len(ref) > 0)

        average_score = np.mean(np.array(score))
        return average_score, np.array(score)

    def method(self):
        return "Rouge"

In [12]:
rouge = Rouge()

print('Returns ROUGE-L Score... how good of match is it with the references?\n')

for e in explanation:
    print(e)
    res = {test_sentence: [e]}
    print(rouge.compute_score(gts = ground_truths, res = res))
    print()

Returns ROUGE-L Score... how good of match is it with the references?

Book is used to know the time.
(0.4680306905370844, array([0.46803069]))

Book is a book.
(0.1930379746835443, array([0.19303797]))

Book is used to know time.
(0.5, array([0.5]))



# Self-BLEU

In [13]:
test_sentence = "We use book to know the time."
explanations = generate_explanation(model, tokenizer, test_sentence)
ground_truths = {test_sentence: ['A book is used to study.'	,'A book does not have the ability to show what time it is.',	"Books don't tell the time."]}
print("Input Sentence:", test_sentence)
print("Generated Explanation:", explanation)

Input Sentence: We use book to know the time.
Generated Explanation: ['Book is used to know the time.', 'Book is a book.', 'Book is used to know time.']


In [14]:
bleu = Bleu()

print('[BLEU-1, BLEU-2, BLEU-3, BLEU-4]\n')
for i, e in enumerate(explanations):
    print(e)
    res = {test_sentence: [e]}
    gts = { test_sentence: explanations[:i] + explanations[i+1:]}
    print(gts)
    print(bleu.compute_score(gts = gts, res = res)[0])
    print()

[BLEU-1, BLEU-2, BLEU-3, BLEU-4]

Book is used to know the time.
{'We use book to know the time.': ['Book is a book.', 'Book is used to know time.']}
[0.8571428570204083, 0.7559289459014655, 0.6999028046563295, 0.6434588840385813]

Book is a book.
{'We use book to know the time.': ['Book is used to know the time.', 'Book is used to know time.']}
[0.30326532970468434, 0.2476151048074771, 2.6492666763238897e-06, 1.030522425913718e-08]

Book is used to know time.
{'We use book to know the time.': ['Book is used to know the time.', 'Book is a book.']}
[0.8464817246084536, 0.757116271161685, 0.7139503370879647, 0.6731821379696712]



# CORPUS DIVERSITY

In [15]:
import numpy as np
from collections import defaultdict

def eval_entropy_distinct(generated_sentences):
    """
    Computes Entropy-k and Distinct-k for corpus diversity.

    :param generated_sentences: List of generated sentences (e.g., model outputs)
    :return: Dictionary with entropy and distinct scores for n-grams (1 to 4)
    """
    diversity_metrics = {}
    counter = [defaultdict(int) for _ in range(4)]  # Stores n-gram counts

    for sentence in generated_sentences:
        words = sentence.strip().split()  # Tokenize sentence
        for n in range(4):  # n-gram size (1 to 4)
            for i in range(len(words) - n):
                ngram = ' '.join(words[i:i + n + 1])
                counter[n][ngram] += 1

    for n in range(4):
        total = sum(counter[n].values()) + 1e-10  # Avoid division by zero
        
        # Entropy-k: Measures evenness of n-gram distribution
        entropy_score = -sum((v / total) * np.log(v / total) for v in counter[n].values())
        diversity_metrics[f'entropy_{n+1}'] = entropy_score

        # Distinct-k: Measures uniqueness of n-grams
        diversity_metrics[f'distinct_{n+1}'] = len(counter[n]) / total

    return diversity_metrics


In [17]:
# Generate explanations
test_sentence = "I can count stars."
explanation = generate_explanation(model, tokenizer, test_sentence)

# Compute corpus diversity
diversity_scores = eval_entropy_distinct(explanation)

# Print results
print("Generated Explanations:", explanation)
print("Corpus Diversity Scores:", diversity_scores)


Generated Explanations: ["Stars can't count stars.", 'Stars do not count stars.', "Stars can't be counted."]
Corpus Diversity Scores: {'entropy_1': 1.9915093613489865, 'distinct_1': 0.6153846153798816, 'entropy_2': 2.0253262207598146, 'distinct_2': 0.799999999992, 'entropy_3': 1.9459101490418007, 'distinct_3': 0.9999999999857143, 'entropy_4': 1.3862943611102332, 'distinct_4': 0.999999999975}
