In [None]:
import pandas as pd
import numpy as np
import ast
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm
import sacrebleu
from scipy.stats import spearmanr, kendalltau, pearsonr
import nltk
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('omw-1.4')
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score

# BLEU
We code it by our own. It is inspired from https://www.nltk.org/_modules/nltk/translate/bleu_score.html

In [None]:
import math
import sys
import warnings
from collections import Counter
from fractions import Fraction
from nltk.util import ngrams

import nltk
nltk.download('punkt')
from nltk import tokenize

hyp1 = ['It', 'is', 'a', 'guide', 'to', 'action', 'which','ensures', 'that', 'the', 'military', 'always','obeys', 'the', 'commands', 'of', 'the', 'party']
ref1a = ['It', 'is', 'a', 'guide', 'to', 'action', 'that','ensures', 'that', 'the', 'military', 'will', 'forever','heed', 'Party', 'commands']
ref1b = ['It', 'is', 'the', 'guiding', 'principle', 'which','guarantees', 'the', 'military', 'forces', 'always','being', 'under', 'the', 'command', 'of', 'the', 'Party']
ref1c = ['It', 'is', 'the', 'practical', 'guide', 'for', 'the', 'army', 'always', 'to', 'heed', 'the', 'directions','of', 'the', 'party']

hyp2 = ['he', 'read', 'the', 'book', 'because', 'he', 'was','interested', 'in', 'world', 'history']
ref2a = ['he', 'was', 'interested', 'in', 'world', 'history','because', 'he', 'read', 'the', 'book']

list_of_references = [[ref1a, ref1b, ref1c], [ref2a]]
hypotheses = [hyp1, hyp2]

weights=(0.25, 0.25, 0.25, 0.25)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
#To see if there is no error in the input data

if len(list_of_references) != len(hypotheses) :
    print("Error, you need the same number of ref and hyp")
if len(list_of_references) == 0 :
    print("Error, you need references")
if len(hypotheses) == 0 :
    print("Error, you need hypotheses")

#weights = [weights]
weight_length = len(weights)

# We create Counter dictionnaries (count the occurencies) 
numerators_precision = Counter()
denominators_precision = Counter()
hyp_lengths, ref_lengths = 0, 0

# Iterate through each hypothesis and their corresponding references.
for references, hypothesis in zip(list_of_references, hypotheses):
    # compute the numerator and denominator of the corpus-level precision for each order of ngram. 
    for i in range(1, weight_length + 1):
        # Extracts all ngrams in hypothesis
        counts = Counter(ngrams(hypothesis, i))
        # To get the union of counts of hyp and ref ngrams
        max_counts = {}
        for reference in references:
            # Extract all unique ngrams in references
            reference_counts = (Counter(ngrams(reference, i)))
            for ngram in counts:
                max_counts[ngram] = max(max_counts.get(ngram, 0), reference_counts[ngram])

        # Intersection between hypothesis and references' counts for each different ngrams.
        intersection_counts = {ngram: min(count, max_counts[ngram]) for ngram, count in counts.items()}

        numerator = sum(intersection_counts.values())
        # The denominator must be superior to 0 (0 posssible if the ngram order is > len(reference) ).
        denominator = max(1, sum(counts.values()))
        p_i = Fraction(numerator, denominator, _normalize=False)
        numerators_precision[i] += p_i.numerator
        denominators_precision[i] += p_i.denominator

    # Compute the hypothesis length (number of words) and the closest reference length.
    # It is useful to calculate corpus-level brevity penalty
    hyp_len = len(hypothesis)
    hyp_lengths += hyp_len
    #print(hyp_lengths)
    ref_lens = (len(reference) for reference in references)
    ref_lengths += min(ref_lens, key= lambda ref_len: (abs(ref_len - hyp_len), ref_len))
    #print(ref_lengths)

# Calculate corpus-level brevity penalty.
if hyp_lengths > ref_lengths:
    brevity_penalty = 1
else:
    brevity_penalty = math.exp(1 - ref_lengths / hyp_lengths)

# Save the precision values for the different ngram orders (from 1 to weight_length).
p_n = [Fraction(numerators_precision[i], denominators_precision[i], _normalize=False) for i in range(1, weight_length + 1)]

# In the case that precision is equal to 0
if numerators_precision[1] == 0:
        bleu_score = 0

for weight in weights:
    info = (weight * math.log(precision_i) for precision_i in p_n if precision_i > 0)
bleu_score = brevity_penalty * math.exp(math.fsum(info))

In [None]:
bleu_score

0.5920778868801042

# Baryscore

This code is by COLOMBO et STAERMAN (https://github.com/PierreColombo/nlg_eval_via_simi_measures/blob/main/nlg_eval_via_simi_measures/bary_score.py) 


In [19]:
from __future__ import absolute_import, division, print_function
import numpy as np
import torch
from tqdm import tqdm
import ot
from math import log
from collections import defaultdict, Counter
from transformers import AutoModelForMaskedLM, AutoTokenizer


class BaryScoreMetric:
    def __init__(self, model_name="bert-base-uncased", last_layers=5, use_idfs=True, sinkhorn_ref=0.01):
        """
        BaryScore metric
        :param model_name: model name or path from HuggingFace Librairy
        :param last_layers: last layer to use in the pretrained model
        :param use_idfs: if true use idf costs else use uniform weights
        :param sinkhorn_ref:  weight of the KL in the SD
        """

        self.model_name = model_name
        self.load_tokenizer_and_model()
        n = self.model.config.num_hidden_layers + 1
        assert n - last_layers > 0
        self.layers_to_consider = range(n - last_layers, n)
        self.use_idfs = use_idfs
        self.sinkhorn_ref = sinkhorn_ref
        self.idfs = []
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def prepare_idfs(self, hyps, refs):
        """
        :param hyps: hypothesis list of string sentences has to be computed at corpus level
        :param refs:reference list of string sentences has to be computed at corpus level
        """
        t_hyps = self.tokenizer(hyps)['input_ids']
        t_refs = self.tokenizer(refs)['input_ids']
        idf_dict_ref = self.ref_list_to_idf(t_refs)
        idf_dict_hyp = self.ref_list_to_idf(t_hyps)
        idfs_tokenizer = (idf_dict_ref, idf_dict_hyp)
        self.model_ids = idfs_tokenizer
        return idf_dict_hyp, idf_dict_ref

    def ref_list_to_idf(self, input_refs):
        """
        :param input_refs: list of input reference
        :return: idf dictionnary
        """
        idf_count = Counter()
        num_docs = len(input_refs)

        idf_count.update(sum([list(set(i)) for i in input_refs], []))

        idf_dict = defaultdict(lambda: log((num_docs + 1) / (1)))
        idf_dict.update({idx: log((num_docs + 1) / (c + 1)) for (idx, c) in idf_count.items()})
        return idf_dict

    def load_tokenizer_and_model(self):
        """
        Loading and initializing the chosen model and tokenizer
        """
        tokenizer = AutoTokenizer.from_pretrained('{}'.format(self.model_name))
        model = AutoModelForMaskedLM.from_pretrained('{}'.format(self.model_name))
        model.config.output_hidden_states = True
        model.eval()
        self.tokenizer = tokenizer
        self.model = model

    def evaluate_batch(self, batch_hyps, batch_refs, idf_hyps=None, idf_ref=None):
        """
        :param batch_hyps: hypothesis list of string sentences
        :param batch_refs: reference list of string sentences
        :param idf_hyps: idfs of hypothesis computed at corpus level
        :param idf_ref: idfs of references computed at corpus level
        :return: dictionnary of scores
        """
        ###############################################
        ## Extract Embeddings From Pretrained Models ##
        ###############################################
        if isinstance(batch_hyps, str):
            batch_hyps = [batch_hyps]
        if isinstance(batch_refs, str):
            batch_refs = [batch_refs]
        nb_sentences = len(batch_refs)
        baryscores = []
        assert len(batch_hyps) == len(batch_refs)

        if (idf_hyps is None) and (idf_ref is None):
            idf_hyps, idf_ref = self.model_ids

        model = self.model.to(self.device)

        with torch.no_grad():
            ###############################################
            ## Extract Embeddings From Pretrained Models ##
            ###############################################
            batch_refs = self.tokenizer(batch_refs, return_tensors='pt', padding=True, truncation=True).to(self.device)
            batch_refs_embeddings_ = model(**batch_refs)[-1]

            batch_hyps = self.tokenizer(batch_hyps, return_tensors='pt', padding=True, truncation=True).to(self.device)
            batch_hyps_embeddings_ = model(**batch_hyps)[-1]

            batch_refs_embeddings = [batch_refs_embeddings_[i] for i in list(self.layers_to_consider)]
            batch_hyps_embeddings = [batch_hyps_embeddings_[i] for i in list(self.layers_to_consider)]

            batch_refs_embeddings = torch.cat([i.unsqueeze(0) for i in batch_refs_embeddings])
            batch_refs_embeddings.div_(torch.norm(batch_refs_embeddings, dim=-1).unsqueeze(-1))
            batch_hyps_embeddings = torch.cat([i.unsqueeze(0) for i in batch_hyps_embeddings])
            batch_hyps_embeddings.div_(torch.norm(batch_hyps_embeddings, dim=-1).unsqueeze(-1))

            ref_tokens_id = batch_refs['input_ids'].cpu().tolist()
            hyp_tokens_id = batch_hyps['input_ids'].cpu().tolist()

            ####################################
            ## Unbatched BaryScore Prediction ##
            ####################################
            for index_sentence in tqdm(range(nb_sentences), 'BaryScore Progress'):
                dict_score = {}
                ref_ids_idf = batch_refs['input_ids'][index_sentence]
                hyp_idf_ids = batch_hyps['input_ids'][index_sentence]

                ref_tokens = [i for i in self.tokenizer.convert_ids_to_tokens(ref_tokens_id[index_sentence],
                                                                              skip_special_tokens=False) if
                              i != self.tokenizer.pad_token]
                hyp_tokens = [i for i in self.tokenizer.convert_ids_to_tokens(hyp_tokens_id[index_sentence],
                                                                              skip_special_tokens=False) if
                              i != self.tokenizer.pad_token]

                ref_ids = [k for k, w in enumerate(ref_tokens)]
                hyp_ids = [k for k, w in enumerate(hyp_tokens)]

                # With stop words
                ref_idf_i = [idf_ref[i] for i in ref_ids_idf[ref_ids]]
                hyp_idf_i = [idf_hyps[i] for i in hyp_idf_ids[hyp_ids]]

                ref_embedding_i = batch_refs_embeddings[:, index_sentence, ref_ids, :]
                hyp_embedding_i = batch_hyps_embeddings[:, index_sentence, hyp_ids, :]
                measures_locations_ref = ref_embedding_i.permute(1, 0, 2).cpu().numpy().tolist()
                measures_locations_ref = [np.array(i) for i in measures_locations_ref]
                measures_locations_hyps = hyp_embedding_i.permute(1, 0, 2).cpu().numpy().tolist()
                measures_locations_hyps = [np.array(i) for i in measures_locations_hyps]

                # ADDED
                measures_locations_ref = [np.array(i) for i in
                                          np.array(measures_locations_ref).transpose(1, 0, 2).tolist()]
                measures_locations_hyps = [np.array(i) for i in
                                           np.array(measures_locations_hyps).transpose(1, 0,
                                                                                       2).tolist()]

                if self.use_idfs:
                    #########################
                    ## Use TF-IDF weights  ##
                    #########################
                    baryscore = self.baryscore(measures_locations_ref, measures_locations_hyps, ref_idf_i,
                                               hyp_idf_i)
                else:
                    #####################
                    ## Uniform Weights ##
                    #####################
                    baryscore = self.baryscore(measures_locations_ref, measures_locations_hyps, None, None)

                for key, value in baryscore.items():
                    dict_score['baryscore_{}'.format(key)] = value
                baryscores.append(dict_score)
            baryscores_dic = {}
            for k in dict_score.keys():
                baryscores_dic[k] = []
                for score in baryscores:
                    baryscores_dic[k].append(score[k])

        return baryscores_dic

    def baryscore(self, measures_locations_ref, measures_locations_hyps, weights_refs, weights_hyps):
        """
        :param measures_locations_ref: input measure reference locations
        :param measures_locations_hyps: input measure hypothesis locations
        :param weights_refs: references weights in the Wasserstein Barycenters
        :param weights_hyps: hypothesis weights in the Wasserstein Barycenters
        :return:
        """
        if weights_hyps is not None or weights_refs is not None:
            assert weights_refs is not None
            assert weights_hyps is not None
            weights_hyps = np.array([i / sum(weights_hyps) for i in weights_hyps]).astype(np.float64)
            weights_refs = np.array([i / sum(weights_refs) for i in weights_refs]).astype(np.float64)

        self.n_layers = len(measures_locations_ref)
        self.d_bert = measures_locations_ref[0].shape[1]
        ####################################
        ## Compute Wasserstein Barycenter ##
        ####################################
        bary_ref = self.w_barycenter(measures_locations_ref, weights_refs)
        bary_hyp = self.w_barycenter(measures_locations_hyps, weights_hyps)

        #################################################
        ## Compute Wasserstein and Sinkhorn Divergence ##
        #################################################

        C = ot.dist(bary_ref, bary_hyp)
        weights_first_barycenter = np.zeros((C.shape[0])) + 1 / C.shape[0]
        weights_second_barycenter = np.zeros((C.shape[1])) + 1 / C.shape[1]
        wasserstein_distance = ot.emd2(weights_first_barycenter, weights_second_barycenter, C,
                                       log=True)[0]
        dic_results = {
            "W": wasserstein_distance,

        }
        for reg in [10, 1, 5, 1, 0.1, 0.5, 0.01, 0.001]:
            wasserstein_sinkhorn = ot.bregman.sinkhorn2(weights_first_barycenter, weights_second_barycenter, C,
                                                        reg=reg, numItermax=10000).tolist()
            if isinstance(wasserstein_sinkhorn, list):
                wasserstein_sinkhorn = wasserstein_sinkhorn[0]  # for POT==0.7.0
            dic_results['SD_{}'.format(reg)] = wasserstein_sinkhorn
        return dic_results

    def w_barycenter(self, measures_locations, weights):
        """
        :param measures_locations: location of the discrete input measures
        :param weights: weights of the input measures
        :return: barycentrique distribution
        """
        X_init = np.zeros((measures_locations[0].shape[0], self.d_bert)).astype(np.float64)
        if weights is None:
            measures_weights = [np.array(
                [1 / measures_locations[0].shape[0]] * measures_locations[0].shape[0])] * self.n_layers
        else:
            measures_weights = [weights / sum(weights)] * self.n_layers
        b = np.array([1 / measures_locations[0].shape[0]] * measures_locations[0].shape[0]).astype(np.float64)
        mesure_bary = ot.lp.free_support_barycenter(measures_locations, measures_weights, X_init,
                                                    b=b, numItermax=1000, verbose=False)
        return mesure_bary

    @property
    def supports_multi_ref(self):
        """
        :return: BaryScore does not support multi ref
        """
        return False

In [18]:
metric_call = BaryScoreMetric(use_idfs=False)

ref = [
        'I like my cakes very much',
        'I hate these cakes!']
hypothesis = ['I like my cakes very much',
                  'I like my cakes very much']

metric_call.prepare_idfs(ref, hypothesis)
final_preds = metric_call.evaluate_batch(ref, hypothesis)
print(final_preds)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


TypeError: ignored

# InfoLM

This code is by COLOMBO et STAERMAN (https://github.com/PierreColombo/nlg_eval_via_simi_measures/blob/main/nlg_eval_via_simi_measures/infolm.py) 

In [36]:
from __future__ import absolute_import, division, print_function

from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForMaskedLM
import torch
import os
from math import log
from collections import defaultdict, Counter


class InfoLM:
    def __init__(self, model_name="bert-base-uncased", temperature=0.25, measure_to_use='fisher_rao',
                 use_idf_weights=True, alpha=None, beta=None):
        """
        :param model_name: model name or path from HuggingFace Librairy
        :param temperature: temperature to calibrate the LM
        :param measure_to_use: which measure of information to use
        :param use_idf_weights: if true use tf-idf weights
        :param alpha: alpha parameter in the ab, alpha or renyi div
        :param beta: beta parameter in the ab or beta div
        """
        self.model_name = model_name
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.load_tokenizer_and_model()
        self.use_idf_weights = use_idf_weights
        self.temperature = temperature
        self.alpha = alpha
        self.beta = beta
        self.measure_to_use = measure_to_use
        assert self.measure_to_use in ['kl', 'alpha', 'renyi', 'beta', 'ab', 'l1', "l2", "linf", 'fisher_rao']
        if self.measure_to_use in ['alpha', 'ab', 'renyi']: assert alpha is not None
        if self.measure_to_use in ['beta', 'ab']:  assert beta is not None

    def prepare_idfs(self, hyps, refs):
        """
        :param hyps: hypothesis list of string sentences has to be computed at corpus level
        :param refs:reference list of string sentences has to be computed at corpus level
        """
        t_hyps = self.tokenizer(hyps)['input_ids']
        t_refs = self.tokenizer(refs)['input_ids']
        idf_dict_ref = self.ref_list_to_idf(t_refs)
        idf_dict_hyp = self.ref_list_to_idf(t_hyps)
        self.idf_dict_hyp, self.idf_dict_ref = idf_dict_hyp, idf_dict_ref
        return idf_dict_hyp, idf_dict_ref

    def ref_list_to_idf(self, input_refs):
        """
        :param input_refs: list of input reference
        :return: idf dictionnary
        """
        idf_count = Counter()
        num_docs = len(input_refs)

        idf_count.update(sum([list(set(i)) for i in input_refs], []))

        idf_dict = defaultdict(lambda: log((num_docs + 1) / (1)))
        idf_dict.update({idx: log((num_docs + 1) / (c + 1)) for (idx, c) in idf_count.items()})
        return idf_dict

    def load_tokenizer_and_model(self):
        """
        Loading and initializing the chosen model and tokenizer
        """
        tokenizer = AutoTokenizer.from_pretrained('{}'.format(self.model_name))
        model = AutoModelForMaskedLM.from_pretrained('{}'.format(self.model_name))
        model.config.output_hidden_states = True
        model.eval()
        self.tokenizer = tokenizer
        self.model = model.to(self.device)

    def _safe_divide(self, numerator, denominator):
        """
        :param numerator: quotient numerator
        :param denominator: quotient denominator
        :return: safe divide of numerator/denominator
        """
        return numerator / (denominator + 1e-30)

    def nan_to_num(self, tensor):
        """
        :param tensor: input tensor
        :return: tensor without nan
        """
        tensor[tensor != tensor] = 0
        return tensor

    def alpha_div(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :param alpha: alpha parameter of the divergence
        :return: alpha divergence between the reference and hypothesis distribution
        """
        alpha = self.alpha
        assert alpha != 1 and alpha != 0
        return 1 / (alpha * (alpha - 1)) - torch.sum(ref_dist ** alpha * hypo_dist ** (1 - alpha), dim=-1) / (
                alpha * (alpha - 1))

    def linfinity(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :return: l infinity norm between the reference and hypothesis distribution
        """
        return torch.norm(ref_dist - hypo_dist, p=float("inf"), dim=-1)

    def l2(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :return: l2 norm between the reference and hypothesis distribution
        """
        return torch.norm(ref_dist - hypo_dist, p=2, dim=-1)

    def l1(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :return: l1 norm between the reference and hypothesis distribution
        """
        return torch.norm(ref_dist - hypo_dist, p=1, dim=-1)

    def fisher_rao(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :return: fisher rao distance between the reference and hypothesis distribution
        """
        rao_fi = torch.clamp(
            torch.sum(torch.sqrt(ref_dist) * torch.sqrt(
                hypo_dist),
                      dim=-1), 0, 1)
        rao = 2 * torch.acos(rao_fi)
        return rao

    def kl_div(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :return: kl divergence between the reference and hypothesis distribution
        """
        kl = torch.sum(ref_dist * torch.log(hypo_dist / ref_dist), dim=-1)
        return kl

    def renyi_div(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :param alpha: alpha parameter of the divergence
        :return: renyi divergence between the reference and hypothesis distribution
        """
        alpha = self.alpha
        assert alpha != 1
        return torch.log(torch.sum(ref_dist ** alpha * hypo_dist ** (1 - alpha), dim=-1)) / (alpha - 1)

    def beta_div(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :param beta: beta parameter of the divergence
        :return: beta divergence between the reference and hypothesis distribution
        """
        beta = self.beta
        assert beta != -1
        assert beta != 0
        first_term = torch.log(torch.sum(ref_dist ** (beta + 1), dim=-1)) / (beta * (beta + 1))
        second_term = torch.log(torch.sum(hypo_dist ** (beta + 1), dim=-1)) / (beta + 1)
        third_term = torch.log(torch.sum(ref_dist * hypo_dist ** (beta), dim=-1)) / (beta)
        return first_term + second_term - third_term

    def ab_div(self, ref_dist, hypo_dist):
        """
        :param ref_dist: discreate input reference distribution over the vocabulary
        :param hypo_dist: discreate hypothesis reference distribution over the vocabulary
        :param alpha: alpha parameter of the divergence
        :param beta: beta parameter of the divergence
        :return: ab divergence between the reference and hypothesis distribution
        """
        beta = self.beta
        alpha = self.alpha
        assert alpha != 0
        assert beta != 0
        assert beta + alpha != 0
        first_term = torch.log(torch.sum(ref_dist ** (beta + alpha), dim=-1)) / (beta * (beta + alpha))
        second_term = torch.log(torch.sum(hypo_dist ** (beta + alpha), dim=-1)) / (alpha * (beta + alpha))
        third_term = torch.log(torch.sum((ref_dist ** (alpha)) * (hypo_dist ** (beta)), dim=-1)) / (beta * alpha)
        return first_term + second_term - third_term

    def compute_infolm(self, ref_distribution, hyp_distribution):
        """
        :param ref_distribution: aggregated reference distribution (weighted or not / calibrated of not)
        :param hyp_distribution: : aggregated hypothesis distribution (weighted or not  / calibrated of not)
        :return: infoLM score
        """
        if self.measure_to_use == 'kl':
            measure = self.kl_div
        elif self.measure_to_use == 'alpha':
            measure = self.alpha_div
        elif self.measure_to_use == 'renyi':
            measure = self.renyi_div
        elif self.measure_to_use == 'beta':
            measure = self.beta_div
        elif self.measure_to_use == 'ab':
            measure = self.ab_div
        elif self.measure_to_use == 'l1':
            measure = self.l1
        elif self.measure_to_use == 'l2':
            measure = self.l2
        elif self.measure_to_use == 'linf':
            measure = self.linfinity
        elif self.measure_to_use == 'fisher_rao':
            measure = self.fisher_rao
        else:
            raise NotImplementedError
        normal_div = self.nan_to_num(measure(ref_distribution, hyp_distribution))
        reversed_div = self.nan_to_num(measure(hyp_distribution, ref_distribution))
        return {
            "{}".format(self.measure_to_use): normal_div.tolist(),
            "r_{}".format(self.measure_to_use): reversed_div.tolist(),
            "sim_{}".format(self.measure_to_use): ((normal_div + reversed_div) / 2).tolist(),
        }

    def get_distribution(self, tokenizer_output, idf_dic):
        """
        :param tokenizer_output:
        :param idf_dic:
        :return:
        """
        final_distribution = []
        idfs = []
        max_length = self.tokenizer(tokenizer_output, return_tensors="pt", padding=True, truncation=True).to(
            self.device)['input_ids'].size()[-1]
        for index_to_mask in range(max_length):
            unmasked_data = self.tokenizer(tokenizer_output, return_tensors="pt", padding=True, truncation=True).to(
                self.device)
            if self.use_idf_weights:
                ids_masked_list = unmasked_data['input_ids'][:, index_to_mask].tolist()
                tf_idf_term = torch.tensor([idf_dic[id] for id in ids_masked_list]).unsqueeze(-1)
                idfs.append(tf_idf_term)
            labels = unmasked_data['input_ids'].clone()
            masked_indices = torch.zeros_like(labels).to(self.device).bool()
            masked_indices[:, index_to_mask] = 1
            labels[~masked_indices] = -100
            masked_input_ids = unmasked_data['input_ids']
            masked_input_ids[:, index_to_mask] = self.tokenizer.mask_token_id
            unmasked_data['input_ids'] = masked_input_ids
            outputs = self.model(**unmasked_data, labels=labels)
            logits_distribution = outputs[1][:, index_to_mask, :].cpu()
            dict_logits_distribution = {}
            pad_token_mask = ((labels.eq(self.tokenizer.pad_token_id)[:, index_to_mask] |
                               labels.eq(self.tokenizer.cls_token_id)[:,
                               index_to_mask]) |
                              labels.eq(self.tokenizer.sep_token_id)[:, index_to_mask])
            pad_token_mask = pad_token_mask.unsqueeze(1).repeat(1, logits_distribution.size(-1))

            dict_logits_distribution[str(self.temperature)] = torch.nn.Softmax()(
                logits_distribution / self.temperature)
            if self.use_idf_weights:
                dict_logits_distribution[str(self.temperature)] = dict_logits_distribution[
                                                                      str(self.temperature)] * tf_idf_term

            dict_logits_distribution[str(self.temperature)][pad_token_mask] = torch.ones_like(
                dict_logits_distribution[str(self.temperature)][pad_token_mask]) * 10000
            del masked_input_ids
            del labels
            del unmasked_data
            del outputs
            final_distribution.append(dict_logits_distribution)
        return final_distribution, idfs

    def evaluate_batch(self, batch_hyps, batch_refs, idf_hyps=None, idf_ref=None):
        """
        :param batch_hyps: hypothesis list of string sentences
        :param batch_refs: reference list of string sentences
        :param idf_hyps: idfs of hypothesis computed at corpus level
        :param idf_ref: idfs of references computed at corpus level
        :return: dictionary of scores
        """
        if self.use_idf_weights:
            if (idf_hyps is None) and (idf_ref is None):
                idf_hyps, idf_ref = self.idf_dict_hyp, self.idf_dict_ref
            idf_hyps[self.model.config.pad_token_id] = 0  # for padding
            idf_ref[self.model.config.pad_token_id] = 0
        with torch.no_grad():
            dict_final_distribution_batch_refs, idfs_ref = self.get_distribution(batch_refs,
                                                                                 idf_ref if self.use_idf_weights else None)
            dict_final_distribution_batch_hypothesis, idfs_hyp = self.get_distribution(batch_hyps,
                                                                                       idf_hyps if self.use_idf_weights else None)
        mask_ref = self.tokenizer(batch_refs, return_tensors="pt", padding=True, truncation=True)['input_ids']
        mask_hyps = self.tokenizer(batch_hyps, return_tensors="pt", padding=True, truncation=True)['input_ids']
        mask_ref = ((mask_ref.eq(self.tokenizer.sep_token_id) |
                     mask_ref.eq(self.tokenizer.cls_token_id)) |
                    mask_ref.eq(self.tokenizer.pad_token_id))
        mask_hyps = ((mask_hyps.eq(self.tokenizer.sep_token_id) |
                      mask_hyps.eq(self.tokenizer.cls_token_id)) |
                     mask_hyps.eq(self.tokenizer.pad_token_id))

        mask_words_hyps = torch.sum(~mask_hyps, dim=1)
        mask_words_refs = torch.sum(~mask_ref, dim=1)
        mask_ref = mask_ref.unsqueeze(-1).repeat(1, 1,
                                                 dict_final_distribution_batch_hypothesis[0][
                                                     str(self.temperature)].size(
                                                     -1))
        mask_hyps = mask_hyps.unsqueeze(-1).repeat(1, 1,
                                                   dict_final_distribution_batch_hypothesis[0][
                                                       str(self.temperature)].size(
                                                       -1))

        final_distribution_batch_refs = torch.cat(
            [i[str(self.temperature)].unsqueeze(1) for i in dict_final_distribution_batch_refs],
            dim=1)
        final_distribution_batch_refs[mask_ref] = 0
        final_distribution_batch_hypothesis = torch.cat(
            [i[str(self.temperature)].unsqueeze(1) for i in dict_final_distribution_batch_hypothesis], dim=1)
        final_distribution_batch_hypothesis[mask_hyps] = 0
        if self.use_idf_weights:
            sum_distribution_refs = torch.sum(final_distribution_batch_refs, dim=1) / torch.sum(
                torch.cat(idfs_ref, dim=-1),
                dim=-1).unsqueeze(-1)
            sum_distribution_hypothesis = torch.sum(final_distribution_batch_hypothesis,
                                                    dim=1) / torch.sum(torch.cat(idfs_hyp, dim=-1),
                                                                       dim=-1).unsqueeze(-1)
        else:
            sum_distribution_hypothesis = torch.sum(final_distribution_batch_hypothesis,
                                                    dim=1) / mask_words_hyps.unsqueeze(-1).repeat(1,
                                                                                                  final_distribution_batch_hypothesis[
                                                                                                      0].size(
                                                                                                      -1))
            sum_distribution_refs = torch.sum(final_distribution_batch_refs, dim=1) / mask_words_refs.unsqueeze(
                -1).repeat(1, final_distribution_batch_hypothesis[0].size(-1))

        info_dic = self.compute_infolm(sum_distribution_hypothesis, sum_distribution_refs)
        return info_dic

In [None]:
for measure in ['kl', 'alpha', 'renyi', 'beta', 'ab', 'l1', "l2", "linf", 'fisher_rao']:
      metric = InfoLM(measure_to_use=measure, alpha=0.25, beta=0.25, temperature=1, use_idf_weights=False)

      ref = ['I like my cakes very much', 'I like my cakes very much']
      hypothesis = ['I like my cakes very much', 'I hate these cakes very much']

      idf_ref, idf_hypot = metric.prepare_idfs(ref, hypothesis)

      final_preds = metric.evaluate_batch(ref, hypothesis)
      print(final_preds)

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  dict_logits_distribution[str(self.temperature)] = torch.nn.Softmax()(


{'kl': [0.0, -1.148772954940796], 'r_kl': [0.0, -0.7877662777900696], 'sim_kl': [0.0, -0.9682695865631104]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'alpha': [6.723403930664062e-05, -0.6946702003479004], 'r_alpha': [6.723403930664062e-05, -0.8292465209960938], 'sim_alpha': [6.723403930664062e-05, -0.7619583606719971]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'renyi': [-1.684814014879521e-05, 0.18606692552566528], 'r_renyi': [-1.684814014879521e-05, 0.2253216952085495], 'sim_renyi': [-1.684814014879521e-05, 0.205694317817688]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'beta': [0.0, 0.5876803398132324], 'r_beta': [0.0, 0.4290280342102051], 'sim_beta': [0.0, 0.5083541870117188]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'ab': [0.0, 1.581939697265625], 'r_ab': [0.0, 1.581939697265625], 'sim_ab': [0.0, 1.581939697265625]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'l1': [0.0, 0.6655765175819397], 'r_l1': [0.0, 0.6655765175819397], 'sim_l1': [0.0, 0.6655765175819397]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'l2': [0.0, 0.1671038120985031], 'r_l2': [0.0, 0.1671038120985031], 'sim_l2': [0.0, 0.1671038120985031]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'linf': [0.0, 0.11335118114948273], 'r_linf': [0.0, 0.11335118114948273], 'sim_linf': [0.0, 0.11335118114948273]}


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


{'fisher_rao': [0.0, 1.2079099416732788], 'r_fisher_rao': [0.0, 1.2079099416732788], 'sim_fisher_rao': [0.0, 1.2079099416732788]}


# Depthscore

This code is by COLOMBO et STAERMAN (https://github.com/PierreColombo/nlg_eval_via_simi_measures/blob/main/nlg_eval_via_simi_measures/depth_score.py)

In [25]:
pip install geomloss

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting geomloss
  Downloading geomloss-0.2.5.tar.gz (26 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: geomloss
  Building wheel for geomloss (setup.py) ... [?25l[?25hdone
  Created wheel for geomloss: filename=geomloss-0.2.5-py3-none-any.whl size=32068 sha256=fdc244fe229ece0c9ea979466c8db1fe369de6d9dffb0e5907012f1469967fb3
  Stored in directory: /root/.cache/pip/wheels/91/5f/d7/0dbc6074929fc09f1280db596bfe0e2b59e5790bdfbaefe017
Successfully built geomloss
Installing collected packages: geomloss
Successfully installed geomloss-0.2.5


In [26]:
from __future__ import absolute_import, division, print_function
import torch
from tqdm import tqdm
from transformers import AutoModelForMaskedLM, AutoTokenizer
from sklearn.preprocessing import normalize
from sklearn.covariance import MinCovDet as MCD
from sklearn.decomposition import PCA
import logging

import ot
import geomloss

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s -   %(message)s", datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO)


class DepthScoreMetric:
    def __init__(self, model_name="bert-base-uncased", layers_to_consider=9, considered_measure='irw', p=None, eps=None,
                 n_alpha=None):

        """
        DepthScore metric
        :param model_name: model name or path from HuggingFace Librairy
        :param layers_to_consider: layer to use in the pretrained model
        :param considered_measure: measure of similarity to use should be in ["irw", "ai_irw", "wasserstein", "sliced", "mmd"]
        :param p: the power of the ground cost.
        :param eps:   the highest level set.
        :param n_alpha: The Monte-Carlo parameter for the approximation of the integral
        over alpha.
        """
        self.n_alpha = 5 if n_alpha is None else n_alpha
        self.eps = 0.3 if eps is None else eps
        self.p = 5 if p is None else p
        self.model_name = model_name
        self.load_tokenizer_and_model()
        self.considered_measure = considered_measure
        assert considered_measure in ["irw", "ai_irw", "wasserstein", "sliced", "mmd"]
        self.layers_to_consider = layers_to_consider
        assert layers_to_consider < self.model.config.num_hidden_layers + 1
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def load_tokenizer_and_model(self):
        """
        Loading and initializing the chosen model and tokenizer
        """
        tokenizer = AutoTokenizer.from_pretrained('{}'.format(self.model_name))
        model = AutoModelForMaskedLM.from_pretrained('{}'.format(self.model_name))
        model.config.output_hidden_states = True
        model.eval()
        self.tokenizer = tokenizer
        self.model = model

    def evaluate_batch(self, batch_hyps, batch_refs, idf_hyps=None, idf_ref=None):
        """
        :param batch_hyps: hypothesis list of string sentences
        :param batch_refs: reference list of string sentences
        :return: dictionnary of scores
        """
        ###############################################
        ## Extract Embeddings From Pretrained Models ##
        ###############################################
        if isinstance(batch_hyps, str):
            batch_hyps = [batch_hyps]
        if isinstance(batch_refs, str):
            batch_refs = [batch_refs]
        nb_sentences = len(batch_refs)
        depth_scores = []
        assert len(batch_hyps) == len(batch_refs)

        model = self.model.to(self.device)

        with torch.no_grad():
            ###############################################
            ## Extract Embeddings From Pretrained Models ##
            ###############################################
            batch_refs = self.tokenizer(batch_refs, return_tensors='pt', padding=True, truncation=True).to(self.device)
            batch_refs_embeddings_ = model(**batch_refs)[-1]

            batch_hyps = self.tokenizer(batch_hyps, return_tensors='pt', padding=True, truncation=True).to(self.device)
            batch_hyps_embeddings_ = model(**batch_hyps)[-1]

            batch_refs_embeddings = [batch_refs_embeddings_[i] for i in [self.layers_to_consider]]
            batch_hyps_embeddings = [batch_hyps_embeddings_[i] for i in [self.layers_to_consider]]

            batch_refs_embeddings = torch.cat([i.unsqueeze(0) for i in batch_refs_embeddings])
            batch_refs_embeddings.div_(torch.norm(batch_refs_embeddings, dim=-1).unsqueeze(-1))
            batch_hyps_embeddings = torch.cat([i.unsqueeze(0) for i in batch_hyps_embeddings])
            batch_hyps_embeddings.div_(torch.norm(batch_hyps_embeddings, dim=-1).unsqueeze(-1))

            ref_tokens_id = batch_refs['input_ids'].cpu().tolist()
            hyp_tokens_id = batch_hyps['input_ids'].cpu().tolist()

            ######################################
            ## Unbatched Depth Score Prediction ##
            ######################################
            for index_sentence in tqdm(range(nb_sentences), 'Depth Score Progress'):
                ref_tokens = [i for i in self.tokenizer.convert_ids_to_tokens(ref_tokens_id[index_sentence],
                                                                              skip_special_tokens=False) if
                              i != self.tokenizer.pad_token]
                hyp_tokens = [i for i in self.tokenizer.convert_ids_to_tokens(hyp_tokens_id[index_sentence],
                                                                              skip_special_tokens=False) if
                              i != self.tokenizer.pad_token]

                ref_ids = [k for k, w in enumerate(ref_tokens) if True]
                hyp_ids = [k for k, w in enumerate(hyp_tokens) if True]

                ref_embedding_i = batch_refs_embeddings[:, index_sentence, ref_ids, :]
                hyp_embedding_i = batch_hyps_embeddings[:, index_sentence, hyp_ids, :]
                measures_locations_ref = ref_embedding_i.permute(1, 0, 2).cpu().numpy().tolist()
                measures_locations_ref = [np.array(i) for i in measures_locations_ref]
                measures_locations_hyps = hyp_embedding_i.permute(1, 0, 2).cpu().numpy().tolist()
                measures_locations_hyps = [np.array(i) for i in measures_locations_hyps]

                dict_score = self.depth_score(measures_locations_ref, measures_locations_hyps)
                depth_scores.append(dict_score)
        depth_scores_dic = {}
        for k in dict_score.keys():
            depth_scores_dic[k] = []
            for score in depth_scores:
                depth_scores_dic[k].append(score[k])
        return depth_scores_dic

    def prepare_idfs(self, hyps, refs):
        """
        Depth Score does not use idfs
        """
        return None, None

    def depth_score(self, measures_locations_ref, measures_locations_hyps):
        """
        :param measures_locations_ref: discrete input measures of one reference
        :param measures_locations_hyps: discrete input measures of one hypothesis
        :return:
        """
        ##################################################################
        ## Compute Score between the location and hypothesis reference ##
        ##################################################################
        measures_locations_ref = np.array(measures_locations_ref).squeeze(1)
        measures_locations_hyps = np.array(measures_locations_hyps).squeeze(1)
        depth_score = dr_distance(measures_locations_ref, measures_locations_hyps, n_alpha=self.n_alpha,
                                  n_dirs=10000, data_depth=self.considered_measure, eps_min=self.eps, eps_max=1,
                                  p=self.p)
        return {'depth_score': depth_score}

    @property
    def supports_multi_ref(self):
        """
        :return: BaryScore does not support multi ref
        """
        return False


########################################################
#################### Sampled distribution ########################
########################################################


########################################################
#################### Some useful functions ########################
########################################################


def cov_matrix(X, robust=False):
    """
    :param X: input matrix
    :param robust: if true compute a robust estimate
    :return: covariance matrix of X
    """

    if robust:
        cov = MCD().fit(X)
        sigma = cov.covariance_
    else:
        sigma = np.cov(X.T)

    return sigma


def standardize(X, robust=False):
    """
    :param X:  input matrix
    :param robust: if true compute a robust estimate of the covariance matrix
    :return: square inverse f the covariance matrix of X.
    """

    sigma = cov_matrix(X, robust)
    n_samples, n_features = X.shape
    rank = np.linalg.matrix_rank(X)

    if (rank < n_features):
        pca = PCA(rank)
        pca.fit(X)
        X_transf = pca.fit_transform(X)
        sigma = cov_matrix(X_transf)
    else:
        X_transf = X.copy()

    u, s, _ = np.linalg.svd(sigma)
    square_inv_matrix = u / np.sqrt(s)

    return X_transf @ square_inv_matrix


########################################################
#################### Sampled distributions ########################
########################################################

def sampled_sphere(n_dirs, d):
    """
    :param n_dirs: number of direction to consider
    :param d: dimension of the unite sphere
    :return: ndirs samples of d-dimensional uniform distribution on the
        unit sphere
    """

    mean = np.zeros(d)
    identity = np.identity(d)
    U = np.random.multivariate_normal(mean=mean, cov=identity, size=n_dirs)

    return normalize(U)


def Wasserstein(X, Y):
    """
    :param X: input distribution X
    :param Y: input distribution Y
    :return: wasserstein distance between X and Y
    """
    M = ot.dist(X, Y)
    n = len(X)
    m = len(Y)
    w_X = np.zeros(n) + 1 / n
    w_Y = np.zeros(m) + 1 / m

    return ot.emd2(w_X, w_Y, M)


def SW(X, Y, ndirs, p=2, max_sliced=False):
    """
    :param X: input distribution X
    :param Y: input distribution Y
    :param ndirs: number of direction to consider when slicing
    :param p: order of the Sliced wasserstein distance
    :param max_sliced: if true take the maximum, if false the mean is applied
    :return: Sliced-Wasserstein distance between X and Y
    """
    n, d = X.shape
    U = sampled_sphere(ndirs, d)
    Z = np.matmul(X, U.T)
    Z2 = np.matmul(Y, U.T)
    Sliced = np.zeros(ndirs)
    for k in range(ndirs):
        Sliced[k] = ot.emd2_1d(Z[:, k], Z2[:, k], p=2)
    if (max_sliced == True):
        return (np.max(Sliced)) ** (1 / p)
    else:
        return (np.mean(Sliced)) ** (1 / p)


def MMD(X, Y):
    """
    :param X: input distribution X
    :param Y: input distribution Y
    :return:  MMD cost between X and Y
    """
    return geomloss.SamplesLoss("gaussian")(torch.tensor(X), torch.tensor(Y)).item()


########################################################
#################### Data Depths ########################
########################################################

def ai_irw(X, AI=True, robust=False, n_dirs=None, random_state=None):
    """
    :param X: Array of shape (n_samples, n_features)
            The training set.
    :param AI: bool
        if True, the affine-invariant version of irw is computed.
        If False, the original irw is computed.
    :param robust:  if robust is true, the MCD estimator of the covariance matrix
        is performed.
    :param n_dirs:   The number of random directions needed to approximate
        the integral over the unit sphere.
        If None, n_dirs is set as 100* n_features.
    :param random_state:  The random state.
    :return:   Depth score of each element in X_test, where the considered depth is (Affine-invariant-) integrated rank
        weighted depth of X_test w.r.t. X
    """

    if random_state is None:
        random_state = 0

    np.random.seed(random_state)

    if AI:
        X_reduced = standardize(X, robust)
    else:
        X_reduced = X.copy()

    n_samples, n_features = X_reduced.shape

    if n_dirs is None:
        n_dirs = n_features * 100

    # Simulated random directions on the unit sphere.
    U = sampled_sphere(n_dirs, n_features)

    sequence = np.arange(1, n_samples + 1)
    depth = np.zeros((n_samples, n_dirs))

    proj = np.matmul(X_reduced, U.T)
    rank_matrix = np.matrix.argsort(proj, axis=0)

    for k in range(n_dirs):
        depth[rank_matrix[:, k], k] = sequence

    depth = depth / (n_samples * 1.)
    depth_score = np.minimum(depth, 1 - depth)
    ai_irw_score = np.mean(depth_score, axis=1)

    return ai_irw_score


import numpy as np


def dr_distance(X, Y, n_alpha=10, n_dirs=100, data_depth='tukey', eps_min=0,
                eps_max=1, p=2, random_state=None):
    """
    :param X: array of shape (n_samples, n_features)
        The first sample.
    :param Y: array of shape (n_samples, n_features)
        The second sample.
    :param n_alpha: The Monte-Carlo parameter for the approximation of the integral
        over alpha.
    :param n_dirs: The number of directions for approximating the supremum over
        the unit sphere.
    :param data_depth: depth to consider in  {'tukey', 'projection', 'irw', 'ai_irw'}
    :param eps_min: float in [0,eps_max]
        the lowest level set.
    :param eps_max: float in [eps_min,1]
        the highest level set.
    :param p:    the power of the ground cost.
    :param random_state:  The random state.
    :return: the computed pseudo-metric score.
    """

    if random_state is None:
        random_state = 0

    np.random.seed(random_state)

    if data_depth not in {'tukey', 'projection', 'irw', 'ai_irw', 'wasserstein', 'mmd', 'sliced'}:
        raise NotImplementedError('This data depth is not implemented')

    if eps_min > eps_max:
        raise ValueError('eps_min must be lower than eps_max')

    if eps_min < 0 or eps_min > 1:
        raise ValueError('eps_min must be in [0,eps_max]')

    if eps_max < 0 or eps_max > 1:
        raise ValueError('eps_min must be in [eps_min,1]')

    _, n_features = X.shape
    if data_depth == "irw":
        depth_X = ai_irw(X, AI=False, n_dirs=n_dirs)
        depth_Y = ai_irw(Y, AI=False, n_dirs=n_dirs)
    elif data_depth == "ai_irw":
        depth_X = ai_irw(X, AI=True, n_dirs=n_dirs)
        depth_Y = ai_irw(Y, AI=True, n_dirs=n_dirs)
    elif data_depth == 'wasserstein':
        return Wasserstein(X, Y)
    elif data_depth == 'sliced':
        return SW(X, Y, ndirs=10000)
    elif data_depth == 'mmd':
        return MMD(X, Y)

        # draw n_dirs vectors of the unit sphere in dimension n_features.
    U = sampled_sphere(n_dirs, n_features)
    proj_X = np.matmul(X, U.T)
    proj_Y = np.matmul(Y, U.T)

    liste_alpha = np.linspace(int(eps_min * 100), int(eps_max * 100), n_alpha)
    quantiles_DX = [np.percentile(depth_X, j) for j in liste_alpha]
    quantiles_DY = [np.percentile(depth_Y, j) for j in liste_alpha]

    dr_score = 0
    for i in range(n_alpha):
        d_alpha_X = np.where(depth_X >= quantiles_DX[i])[0]
        d_alpha_Y = np.where(depth_Y >= quantiles_DY[i])[0]
        supp_X = np.max(proj_X[d_alpha_X], axis=0)
        supp_Y = np.max(proj_Y[d_alpha_Y], axis=0)
        dr_score += np.max((supp_X - supp_Y) ** p)

    return (dr_score / n_alpha) ** (1 / p)

In [None]:
model_name = 'distilbert-base-uncased'  # we consider distillbert for speed concerns
metric_call = DepthScoreMetric(model_name, layers_to_consider=4)

ref = ['I like my cakes very much', 'I hate these cakes so much']
hypothesis = ['I like my cakes very much', 'I like my cakes very much']

final_preds = metric_call.evaluate_batch(ref, hypothesis)
print(final_preds)

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/268M [00:00<?, ?B/s]

Depth Score Progress: 100%|██████████| 2/2 [00:15<00:00,  7.95s/it]

{'depth_score': [0.0, 0.09306903874347265]}





# Dataset and Human judgement

In [38]:
import pandas as pd
import os

cmd_string = 'git clone https://github.com/PierreColombo/nlg_eval_via_simi_measures.git'

os.system(cmd_string)

32768

In [39]:
import json

# Mettre le nom des fichiers
with open('.//nlg_eval_via_simi_measures//raw_score//15_fi-en_formated.json') as mon_fichier:
    data = json.load(mon_fichier)

data = pd.DataFrame(data)

data = data.transpose()

def get_gene(dico):
    return dico['wmt15']['generated_sentence']

data['system'][0]

{'wmt15': {'generated_sentence': 'Seppo Sir'}}['wmt15']['generated_sentence']

data['generated_sentence'] = data['system'].apply(get_gene)

def get_human(dico):
    return dico['wmt15']['scores']['human']

data['human_score'] = data['system'].apply(get_human)

data = data.reset_index(drop=True)

In [40]:
data['generated_sentence'] = data['generated_sentence'].apply(str)
data['references_sentences'] = data['references_sentences'].apply(str)

In [None]:
measure = 'kl'
metric = InfoLM(measure_to_use=measure, alpha=0.25, beta=0.25, temperature=1, use_idf_weights=False)
#data['infoLM_'+measure]={}
list_infoLM = []
for i in data.index[139:] : 
  ref = [data.loc[i,'references_sentences']]
  hyp = [data.loc[i,'generated_sentence']]
  idf_ref, idf_hypot = metric.prepare_idfs(ref, hyp)
  final_preds = metric.evaluate_batch(ref, hyp)
  print(final_preds)
  list_infoLM.append(final_preds)
  print(i)

In [20]:
metric_call = BaryScoreMetric(use_idfs=False)
list_BaryScore = []
for i in tqdm(data.index): 
  ref = [data.loc[i,'references_sentences']]
  hypothesis = [data.loc[i,'generated_sentence']]
  metric_call.prepare_idfs(ref, hypothesis)
  final_preds = metric_call.evaluate_batch(ref, hypothesis)
  list_BaryScore.append(final_preds)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  0%|          | 0/500 [00:00<?, ?it/s]
  u = 1. / nx.dot(Kp, v)

BaryScore Progress: 100%|██████████| 1/1 [00:00<00:00,  1.26it/s]
  0%|          | 1/500 [00:01<16:04,  1.93s/it]
  v = b / KtransposeU

BaryScore Progress: 100%|██████████| 1/1 [00:02<00:00,  2.39s/it]
  0%|          | 2/500 [00:05<22:58,  2.77s/it]
  v = b / KtransposeU

BaryScore Progress: 100%|██

In [31]:
model_name = 'distilbert-base-uncased'  # we consider distillbert for speed concerns
metric_call = DepthScoreMetric(model_name, layers_to_consider=4)
list_depthscore = []
for i in tqdm(data.index): 
  ref = [data.loc[i,'references_sentences']]
  hypothesis = [data.loc[i,'generated_sentence']]
  metric_call.prepare_idfs(ref, hypothesis)
  final_preds = metric_call.evaluate_batch(ref, hypothesis)
  list_depthscore.append(final_preds)

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/268M [00:00<?, ?B/s]

  0%|          | 0/500 [00:00<?, ?it/s]
Depth Score Progress:   0%|          | 0/1 [00:00<?, ?it/s][A
Depth Score Progress: 100%|██████████| 1/1 [00:05<00:00,  5.93s/it]
  0%|          | 1/500 [00:06<52:12,  6.28s/it]
Depth Score Progress:   0%|          | 0/1 [00:00<?, ?it/s][A
Depth Score Progress: 100%|██████████| 1/1 [00:03<00:00,  3.76s/it]
  0%|          | 2/500 [00:10<40:51,  4.92s/it]
Depth Score Progress:   0%|          | 0/1 [00:00<?, ?it/s][A
Depth Score Progress: 100%|██████████| 1/1 [00:03<00:00,  3.11s/it]
  1%|          | 3/500 [00:13<34:42,  4.19s/it]
Depth Score Progress:   0%|          | 0/1 [00:00<?, ?it/s][A
Depth Score Progress: 100%|██████████| 1/1 [00:04<00:00,  4.29s/it]
  1%|          | 4/500 [00:18<35:34,  4.30s/it]
Depth Score Progress:   0%|          | 0/1 [00:00<?, ?it/s][A
Depth Score Progress: 100%|██████████| 1/1 [00:03<00:00,  3.24s/it]
  1%|          | 5/500 [00:21<33:33,  4.07s/it]
Depth Score Progress:   0%|          | 0/1 [00:00<?, ?it/s][A
De

In [32]:
data['DepthScore'] = list_depthscore

In [None]:
data['Baryscore'] = data['Baryscore'].apply(ast.literal_eval)

def baryscore_W(dico): 
    return dico.get('baryscore_W')[0]

data['Baryscore'] = data['Baryscore'].apply(baryscore_W)

data['DepthScore'] = data['DepthScore'].apply(ast.literal_eval)

def depth_score(dico): 
    return dico.get('depth_score')[0]

data['DepthScore'] = data['DepthScore'].apply(depth_score)

In [None]:
list_TER = []
for i in tqdm(data.index): 
    ref = word_tokenize(data.loc[i,'references_sentences'])
    #print(ref)
    hypothesis = word_tokenize(data.loc[i,'generated_sentence'])
    #print(hypothesis)
    ter_score = sacrebleu.corpus_ter(hypotheses=hypothesis, references=[ref])
    list_TER.append(ter_score.score/100)
    # WARNING : when TER = 1.0, the sentence are TOTALLY DIFFERENT

data['TER'] = list_TER

In [None]:
list_METEOR = []
for i in tqdm(data.index): 
    ref = word_tokenize(data.loc[i,'references_sentences'])
    #print(ref)
    hypothesis = word_tokenize(data.loc[i,'generated_sentence'])
    #print(hypothesis)
    score = meteor_score([ref], hypothesis)
    list_METEOR.append(score)

data['METEOR'] = list_METEOR

In [None]:
list_bleu = []
for i in tqdm(data.index): 
    ref = word_tokenize(data.loc[i,'references_sentences'])
    #print(ref)
    hypothesis = word_tokenize(data.loc[i,'generated_sentence'])
    #print(hypothesis)
    score = sentence_bleu([ref], hypothesis)
    list_bleu.append(score)
    # score = 0 if no 4-grams match each other

data['BLEU_nltk'] = list_bleu

# Text-level correlation  : 

$C_{t,f} = \frac{1}{N} \sum_{i=1}^{N} K([f(h_i, r_i^1), ... , f(h_i ,r_i^S)], [g(h_i, r_i^1), ... , g(h_i ,r_i^S)]$



## Spearman

In [None]:
def C_spearman_text_level(df, autometric, humanmetric):
    #calculate Spearman Rank correlation and corresponding p-value
    rho, p = spearmanr(list(df[autometric]), list(df[humanmetric]))
    #print Spearman rank correlation and p-value
    print("rho =", rho)
    print("p-value", p)
    return abs(rho), p

In [None]:
df = sns.load_dataset('penguins')
matrix = df.corr().round(2)

liste_metric = ['human_score','BLEU_nltk', 'Baryscore', 'DepthScore']
matrix.columns=liste_metric
matrix.index=liste_metric
matrix['TER'] = 0
matrix.loc['TER',:] = 0
matrix['METEOR'] = 0
matrix.loc['METEOR',:] = 0
matrix['infoML_kl'] = 0
matrix.loc['infoML_kl',:] = 0

for col in matrix.columns:
    for index in matrix.index:
        matrix.loc[index, col] = np.float64(C_spearman_text_level(data, index, col)[0].round(2))
        print(type(matrix.loc[index, col]))

In [None]:
(matrix.style.background_gradient(cmap='coolwarm', axis=None, vmin=0.0, vmax=1.0).format(precision=2))

In [None]:
mask = np.zeros(matrix.shape, dtype=bool)
np.fill_diagonal(mask, False)
for i in range(matrix.shape[0]):
    for j in range(i):
        mask[j][i] = True

#mask = np.triu(np.ones_like(matrix, dtype=bool))
sns.heatmap(matrix, annot=True, vmax=1, vmin=0, center=0.5,cmap='coolwarm', mask=mask)
plt.show()

In [None]:
# For p-values : 

df = sns.load_dataset('penguins')
matrix = df.corr().round(2)

liste_metric = ['human_score','BLEU_nltk', 'Baryscore', 'DepthScore']
matrix.columns=liste_metric
matrix.index=liste_metric
matrix['TER'] = 0
matrix.loc['TER',:] = 0
matrix['METEOR'] = 0
matrix.loc['METEOR',:] = 0
"""matrix['infoML_kl'] = 0
matrix.loc['infoML_kl',:] = 0"""

for col in matrix.columns:
    for index in matrix.index:
        matrix.loc[index, col] = round(np.float64(C_spearman_text_level(data, index, col)[1]),2)
        #print(type(matrix.loc[index, col]))

matrix

## Pearson

In [None]:
def C_pearson_le(df, autometric, humanmetric):
    rho, p = pearsonr(list(df[autometric]), list(df[humanmetric]))
    print("rho =", rho)
    print("p-value", p)
    return abs(rho), p

In [None]:
# A changer

df = sns.load_dataset('penguins')
matrix = df.corr().round(2)

liste_metric = ['human_score','BLEU_nltk', 'Baryscore', 'DepthScore']
matrix.columns=liste_metric
matrix.index=liste_metric
matrix['TER'] = 0
matrix.loc['TER',:] = 0
matrix['METEOR'] = 0
matrix.loc['METEOR',:] = 0
"""matrix['infoML_kl'] = 0
matrix.loc['infoML_kl',:] = 0"""

for col in matrix.columns:
    for index in matrix.index:
        matrix.loc[index, col] = round(np.float64(C_pearson_le(data, index, col)[0]),2)
        print(type(matrix.loc[index, col]))

In [None]:
(matrix.style.background_gradient(cmap='coolwarm', axis=None, vmin=0.0, vmax=1.0).format(precision=2))

In [None]:
mask = np.zeros(matrix.shape, dtype=bool)
np.fill_diagonal(mask, False)
for i in range(matrix.shape[0]):
    for j in range(i):
        mask[j][i] = True

#mask = np.triu(np.ones_like(matrix, dtype=bool))
sns.heatmap(matrix, annot=True, vmax=1, vmin=0, center=0.5,cmap='coolwarm', mask=mask)
plt.show()

In [None]:
# For p-values : 

df = sns.load_dataset('penguins')
matrix = df.corr().round(2)

liste_metric = ['human_score','BLEU_nltk', 'Baryscore', 'DepthScore']
matrix.columns=liste_metric
matrix.index=liste_metric
matrix['TER'] = 0
matrix.loc['TER',:] = 0
matrix['METEOR'] = 0
matrix.loc['METEOR',:] = 0
"""matrix['infoML_kl'] = 0
matrix.loc['infoML_kl',:] = 0"""

for col in matrix.columns:
    for index in matrix.index:
        matrix.loc[index, col] = round(np.float64(C_pearson_le(data, index, col)[1]),2)
        #print(type(matrix.loc[index, col]))

matrix

## Kendall

In [None]:
def C_kendall_le(df, autometric, humanmetric):
    rho, p = kendalltau(list(df[autometric]), list(df[humanmetric]))
    return abs(rho), p

In [None]:
df = sns.load_dataset('penguins')
matrix = df.corr().round(2)

liste_metric = ['human_score','BLEU_nltk', 'Baryscore', 'DepthScore']
matrix.columns=liste_metric
matrix.index=liste_metric
matrix['TER'] = 0
matrix.loc['TER',:] = 0
matrix['METEOR'] = 0
matrix.loc['METEOR',:] = 0
"""matrix['infoML_kl'] = 0
matrix.loc['infoML_kl',:] = 0"""

for col in matrix.columns:
    for index in matrix.index:
        matrix.loc[index, col] = round(np.float64(C_kendall_le(data, index, col)[0]),2)
        #print(type(matrix.loc[index, col]))

In [None]:
(matrix.style.background_gradient(cmap='coolwarm', axis=None, vmin=0.0, vmax=1.0).format(precision=2))

In [None]:
mask = np.zeros(matrix.shape, dtype=bool)
np.fill_diagonal(mask, False)
for i in range(matrix.shape[0]):
    for j in range(i):
        mask[j][i] = True

#mask = np.triu(np.ones_like(matrix, dtype=bool))
sns.heatmap(matrix, annot=True, vmax=1, vmin=0, center=0.5,cmap='coolwarm', mask=mask)
plt.show()

In [None]:
# For p-values : 

df = sns.load_dataset('penguins')
matrix = df.corr().round(2)

liste_metric = ['human_score','BLEU_nltk', 'Baryscore', 'DepthScore']
matrix.columns=liste_metric
matrix.index=liste_metric
matrix['TER'] = 0
matrix.loc['TER',:] = 0
matrix['METEOR'] = 0
matrix.loc['METEOR',:] = 0
"""matrix['infoML_kl'] = 0
matrix.loc['infoML_kl',:] = 0"""

for col in matrix.columns:
    for index in matrix.index:
        matrix.loc[index, col] = round(np.float64(C_kendall_le(data, index, col)[1]),2)
        #print(type(matrix.loc[index, col]))

matrix