In [None]:
import json
import re
import string
from collections import Counter

In [None]:

# Answer string treated as "no answer found" when comparing question-answering outputs
# (presumably the special token the QA model emits for unanswerable questions — TODO confirm).
NO_ANS = "[CLS]"
# Sentinel score returned when a generated question is rejected or nothing could be scored.
INVALID_QUESTION = -1


def load_json(path):
    """
    load and parse a JSON file.
    Args:
        path (str) : path of the JSON file to read
    Returns:
        object : decoded JSON content (dict, list, str, ...)
    Raises:
        OSError : if the file cannot be opened
        json.JSONDecodeError : if the content is not valid JSON
    """
    # JSON is UTF-8 by specification: do not rely on the platform default encoding,
    # which breaks non-ASCII content on some systems.
    with open(path, encoding="utf-8") as json_file:
        return json.load(json_file)


def filter_questions(exp_ans, pred_ans):
    """
    tell whether the predicted answer matches the expected answer once both are cleaned.
    Args:
        exp_ans (str) : expected answer
        pred_ans (str) : predicted answer
    Returns:
        str : "VALID" when the cleaned answers are equal, "NO MATCH" otherwise
              (including when the prediction is the no-answer marker)
    """
    # The no-answer marker can never match, so the texts are only cleaned when needed.
    has_prediction = pred_ans != NO_ANS
    if has_prediction and clean_text(exp_ans) == clean_text(pred_ans):
        return "VALID"
    return "NO MATCH"


def clean_text(text):
    """
    normalise a text: lowercase it, drop ASCII punctuation, drop a few English
    stopwords (a, an, the, in, our) and collapse runs of spaces.
    Args:
        text (str) : text to clean
    Returns:
        str : cleaned text, without leading or trailing whitespace
    """
    # TODO: improve
    # TODO: add support to french language
    punctuation_table = str.maketrans("", "", string.punctuation)
    lowered = text.lower()
    without_punctuation = lowered.translate(punctuation_table)
    # Stopwords are replaced by a space so that neighbouring words are not glued together.
    without_stopwords = re.sub(r"\b(a|an|the|in|our)\b", " ", without_punctuation)
    collapsed = re.sub(" +", " ", without_stopwords)
    return collapsed.strip()


def raw_f1_score(a_gold, a_pred):
    """
    token-level F1 score between an expected answer and a predicted answer.
    Both answers are cleaned and split on whitespace before being compared.
    Args:
        a_gold (str) : expected answer
        a_pred (str) : predicted answer
    Returns:
        float : F1 score (0 when the prediction is empty or shares no token with the expected answer)
    """
    if a_pred == "":
        return 0
    reference_tokens = clean_text(a_gold).split()
    candidate_tokens = clean_text(a_pred).split()
    # Multiset intersection: a token counts as many times as it appears in both answers.
    shared = Counter(reference_tokens) & Counter(candidate_tokens)
    overlap = sum(shared.values())
    if not overlap:
        return 0
    precision = overlap / len(candidate_tokens)
    recall = overlap / len(reference_tokens)
    return (2 * precision * recall) / (precision + recall)


def non_personal(question, nlp):
    """
    tell whether a question is free of first/second person references, i.e. it has no
    "I"/"you" subject and no "my"/"your" possessive.
    Args:
        question (str) : question to check
        nlp (spacy.lang) : spacy language model, called on the question to get tokens
    Returns:
        bool : True if the question does not contain such pronouns, False otherwise
    """
    # Dependency label -> lowercase pronouns that make the question personal.
    # TODO: add support to french language
    personal_words = {
        "nsubj": ("i", "you"),
        "poss": ("my", "your"),
    }
    for token in nlp(question):
        if token.text.lower() in personal_words.get(token.dep_, ()):
            return False
    return True


# pylint:disable=invalid-name
class MetadataExtractor:
    """Small facade over a metafeature extractors runner to which regex-based extractors can be added."""

    def __init__(self):
        self.metadata_extractor = MetafeatureExtractorsRunner()

    def add_word_regex_matches_count(self, regex_rule, name=None):
        """
        Registers an extractor that counts the words matching a regex.

        Args:
            regex_rule (str): regex the words are matched against
            name (str, optional): name forwarded to the extractor. Defaults to None.
        """
        extractor = WordRegexMatchesCount(regex=regex_rule, name=name)
        self.metadata_extractor.add_metafeature_extractor(extractor)

    def add_regex_match_count(self, regex_rule, name=None):
        """
        Registers an extractor that counts the matches of a regex in the text.

        Args:
            regex_rule (str): regex searched in the text
            name (str, optional): name forwarded to the extractor. Defaults to None.
        """
        extractor = RegexMatchCount(regex=regex_rule, name=name)
        self.metadata_extractor.add_metafeature_extractor(extractor)

    def compute(self, text):
        """
        Runs every registered extractor (elemeta library) on a text.

        Args:
            text (str): text to extract metadata from

        Returns:
            dict: dictionary of metadata
        """
        metadata = self.metadata_extractor.run(text)
        return metadata


In [None]:

class BERTScore:
    def __init__(self, lan="en", model_type=None):
        """
        BERTScore computes a similarity score for each token in the candidate sentence with each
        token in the reference sentence.
        The final score is the average of the similarity scores of all tokens in the candidate sentence.

        Args:
            lan (str, optional): language to use. Defaults to "en", It may also be "fr". Depending
            on the language, a different model is used by default.
            model_type (str, optional): Model to use. Defaults to None. If None, a default model is
            used depending on the language (see above).

        Raises:
            ValueError: if no model_type is given and the language has no default model.
        """
        default_models = {
            "fr": "distilbert-base-multilingual-cased",  # TODO; find uncased version
            "en": "distilbert-base-uncased",
        }
        if model_type:
            # An explicit model always wins, whatever the language.
            self.model_type = model_type
        elif lan in default_models:
            self.model_type = default_models[lan]
        else:
            # Fail here rather than with an AttributeError on the first compute() call.
            raise ValueError(
                f"Unsupported language {lan!r}: expected one of "
                f"{sorted(default_models)} or an explicit model_type."
            )
        self.metric = load("bertscore")

    def compute(self, references, predictions, **kwargs):
        """
        Args:
            references (list): List of reference sentences.
            predictions (list): List of candidate sentences.
            **kwargs: extra options forwarded to the underlying metric.

        Returns:
            list: List of scores for each candidate sentence. Contains a list of scores for
            precisions, recalls, and F1 scores.
        """
        # Check the types first so that a non-list input gets the dedicated message
        # instead of a failure inside len().
        assert isinstance(references, list), "References must be a list."
        assert isinstance(predictions, list), "Predictions must be a list."
        assert len(references) == len(
            predictions
        ), "Number of references and predictions must be equal."

        return self.metric.compute(
            predictions=predictions,
            references=references,
            model_type=self.model_type,
            **kwargs
        )


class MAUVE:
    def __init__(self, featurize_model_name="gpt2"):
        """
        MAUVE compares the distribution of the candidate sentences with the distribution
        of the reference sentences.
        The bigger the MAUVE score, the better.

        Args:
            featurize_model_name (str, optional): name of the model forwarded to the metric
            as `featurize_model_name`. Defaults to "gpt2".
        """
        self.featurize_model_name = featurize_model_name
        self.metric = load("mauve")

    def compute(self, references, predictions, **kwargs):
        """
        Args:
            references (list): List of reference sentences.
            predictions (list): List of candidate sentences.
            **kwargs: extra options forwarded to the underlying metric.

        Returns:
            Whatever the underlying "mauve" metric returns for these inputs.
        """
        result = self.metric.compute(
            predictions=predictions,
            references=references,
            featurize_model_name=self.featurize_model_name,
            **kwargs
        )
        return result


In [None]:


# pylint:disable=too-many-locals
class BLEURTScore:
    def __init__(self, checkpoint="BLEURT-tiny"):
        """
        BLEURT is a learnt metric that uses BERT to score a candidate sentence against
        a reference sentence.

        Args:
            checkpoint (str, optional): Checkpoint to use. Defaults to BLEURT-tiny if not specified.
        """
        self.checkpoint = checkpoint
        self.metric = load("bleurt", module_type="metric", checkpoint=checkpoint)

    def compute(self, references, predictions, **kwargs):
        """
        Args:
            references (list): List of reference sentences.
            predictions (list): List of candidate sentences.
            **kwargs: extra options forwarded to the underlying metric.

        Returns:
            list: List of scores for each candidate sentence.
        """
        same_size = len(references) == len(predictions)
        assert same_size, "Number of references and predictions must be equal."
        assert isinstance(references, list), "References must be a list."
        assert isinstance(predictions, list), "Predictions must be a list."

        scores = self.metric.compute(
            predictions=predictions, references=references, **kwargs
        )
        return scores


class QSquared:
    def __init__(self, lan="en") -> None:
        """
        Q² is a reference-free metric that aims to evaluate the factual consistency of knowledge-grounded
        dialogue systems. The approach is based on automatic question generation and question answering
        Source: https://github.com/orhonovich/q-squared

        Args:
            lan (str, optional): Language to use. Defaults to "en", It may also be "fr".
        """
        # Validate the language first: an invalid value must fail before the (large)
        # question-answering and question-generation models are loaded.
        assert lan in ["fr", "en"], "Language must be either fr or en"

        qa_checkpoint = "ktrapeznikov/albert-xlarge-v2-squad-v2"
        qg_checkpoint = "mrm8488/t5-base-finetuned-question-generation-ap"

        self.qa_tokenizer = AutoTokenizer.from_pretrained(qa_checkpoint)
        self.qa_model = AutoModelForQuestionAnswering.from_pretrained(qa_checkpoint)
        self.qg_tokenizer = AutoTokenizer.from_pretrained(qg_checkpoint)
        self.qg_model = AutoModelWithLMHead.from_pretrained(qg_checkpoint)
        self.bert_score = BERTScore(lan=lan)

        spacy_models = {"fr": "fr_core_news_sm", "en": "en_core_web_sm"}
        self.nlp = spacy.load(spacy_models[lan])

    def get_answer(
        self, question: str, text: str
    ):  # Code taken from https://huggingface.co/transformers/task_summary.html
        """
        Search for the answer in the text given the question.
        Args:
            question (str) : question to ask
            text (str) : text to search in
        Returns:
            answer (str) : answer to the question
        """
        inputs = self.qa_tokenizer.encode_plus(
            question, text, add_special_tokens=True, return_tensors="pt"
        )
        input_ids = inputs["input_ids"].tolist()[0]

        # Pure inference: no gradient is needed, so do not build the autograd graph.
        with torch.no_grad():
            answer_start_scores, answer_end_scores = self.qa_model(
                **inputs, return_dict=False
            )

        # Most likely beginning of the answer
        answer_start = torch.argmax(answer_start_scores)
        # Most likely end of the answer (+1 because the slice end is exclusive)
        answer_end = torch.argmax(answer_end_scores) + 1

        answer_tokens = self.qa_tokenizer.convert_ids_to_tokens(
            input_ids[answer_start:answer_end]
        )
        return self.qa_tokenizer.convert_tokens_to_string(answer_tokens)

    def get_answer_candidates(self, text: str):
        """
        Look for candidate answers that could be answered by the text: the named entities
        plus the noun chunks that do not duplicate (case-insensitively) an earlier candidate.
        Args:
            text (str) : text to search in
        Returns:
            candidates (list) : candidate answers (str), the pronoun "I" excluded
        """
        doc = self.nlp(text)
        candidates = [ent.text for ent in doc.ents]
        # Lowercased candidates seen so far, for a constant-time duplicate check.
        seen = {cand.lower() for cand in candidates}
        for chunk in doc.noun_chunks:
            lowered = chunk.text.lower()
            if lowered not in seen:
                seen.add(lowered)
                candidates.append(chunk.text)
        return [cand for cand in candidates if cand.lower() != "i"]

    def get_questions_beam(
        self, answer, context, max_length=128, beam_size=5, num_return=5
    ):
        """
        Get the n best questions for a given answer, given the context. "Beam" is the name of the
        approach
        Args:
            answer (str) : answer to the question
            context (str) : context to search in
            max_length (int, optional) : max length of the generated question. Defaults to 128.
            beam_size (int, optional) : beam size. Defaults to 5.
            num_return (int, optional) : number of questions to return. Defaults to 5.
        Returns:
            all_questions (list) : n best questions
        """
        input_text = f"answer: {answer}  context: {context} </s>"
        features = self.qg_tokenizer([input_text], return_tensors="pt")

        beam_outputs = self.qg_model.generate(
            input_ids=features["input_ids"],
            attention_mask=features["attention_mask"],
            max_length=max_length,
            num_beams=beam_size,
            no_repeat_ngram_size=3,
            num_return_sequences=num_return,
            early_stopping=True,
        )

        # The decoded text starts with "question: "; only that first occurrence is removed.
        return [
            self.qg_tokenizer.decode(beam_output, skip_special_tokens=True).replace(
                "question: ", "", 1
            )
            for beam_output in beam_outputs
        ]

    def single_question_score(self, question, answer, response, knowledge):
        """
        Given a candidate pair of question and answer (generated from the candidate text), get the
        score of the answer given by taking as a context the knowledge that the LLM was given.
        The higher the F1-score, the more the model we are trying to evaluate is consistent
        with the knowledge.
        Args:
            question (str) : candidate question (generated from the candidate text)
            answer (str) : candidate answer (generated from the candidate text)
            response (str) : text generated by the LLM
            knowledge (str) : knowledge given as a context to the LLM

        Returns:
            score, answer (tuple) : bert-score of the knowledge answer, knowledge answer.
            (INVALID_QUESTION, INVALID_QUESTION) when answering the question on the response
            does not give back the candidate answer; (0, NO_ANS) when the knowledge has no answer.
        """
        pred_ans = self.get_answer(question, response)

        # The question is only kept if, asked on the response, it gives back the candidate answer.
        if filter_questions(answer, pred_ans) != "VALID":
            return INVALID_QUESTION, INVALID_QUESTION

        knowledge_ans = self.get_answer(question, knowledge)
        if knowledge_ans == NO_ANS:
            return 0, NO_ANS

        score = self.bert_score.compute(
            references=[answer], predictions=[knowledge_ans]
        )
        return score["f1"][0], knowledge_ans

    def compute(self, response, knowledge, single=False, remove_personal=True):
        """
        Compute the Q² score for a given response and knowledge.
        Args:
            response (str) : text generated by the LLM
            knowledge (str) : knowledge given as a context to the LLM
            single (bool) : if True, only one question is generated for each candidate answer.
                            Defaults to False.
            remove_personal (bool) : if True, remove questions that contain personal pronouns.
                                     Defaults to True.
        Returns:
            avg_f1 (float) : average F1-bert-score of the knowledge answers (Q² score),
                             or INVALID_QUESTION when no valid question was found
        """
        f1_bert_score = 0
        num_questions = 0

        for cand in self.get_answer_candidates(response):
            for question in self.get_questions_beam(cand, response):
                if remove_personal and not non_personal(question, self.nlp):
                    continue
                question_score, _ = self.single_question_score(
                    question, cand, response, knowledge
                )
                if question_score == INVALID_QUESTION:
                    continue
                num_questions += 1
                f1_bert_score += question_score
                if single:
                    # One valid question is enough for this candidate; go to the next one.
                    break

        if num_questions:
            return f1_bert_score / num_questions
        return INVALID_QUESTION


In [None]:
class SelfCheckGPT:
    def __init__(
        self,
        model,
        eval_model_name_or_path="TheBloke/Llama-2-7b-Chat-GGUF",
        eval_model_basename="llama-2-7b-chat.Q4_K_M.gguf",
    ):
        """
        This class implements the self-check GPT evaluation metric for generative language models.
        It is inspired by the self-check metric proposed in https://arxiv.org/pdf/2303.08896.pdf.
        Args:
            model (callable): Model to evaluate. It is called as `model(prompt, max_tokens=...)`
                and must return a completion dict whose text is at `["choices"][0]["text"]`.
            eval_model_name_or_path (str): Evaluation model name or path. Defaults to "TheBloke/Llama-2-7b-Chat-GGUF".
            eval_model_basename (str): Evaluation model basename. Defaults to "llama-2-7b-chat.Q4_K_M.gguf".
        """
        assert isinstance(
            eval_model_name_or_path, str
        ), "eval_model_name_or_path must be a string."
        assert isinstance(
            eval_model_basename, str
        ), "eval_model_basename must be a string."

        self.model = model
        self.eval_model_path = hf_hub_download(
            repo_id=eval_model_name_or_path, filename=eval_model_basename
        )

        self.eval_model = Llama(
            model_path=self.eval_model_path, n_threads=2, verbose=False  # CPU cores
        )

    def get_prompt(self, pred, sample, question):
        """
        This method returns a prompt template given a candidate sentence, a sample sentence, and a question.
        Args:
            pred (str): Candidate sentence.
            sample (str): Sample sentence.
            question (str): Question asked to the model for which it generated $pred.

        Returns:
            str: Prompt template.
        """
        system_prompt = "You are a helpful, polite and concise assistant. Your task is to check if two texts provide the same answer to a given question. Always answer with a single word. The possible answers are either YES or NO.\n\n"
        question = "###Question:\n" + question
        text1 = "\n###Text 1: " + sample
        text2 = "\n###Text 2: " + pred

        # The leading spaces of the USER/ASSISTANT lines are part of the prompt sent to the
        # evaluator; they are written explicitly so that re-indenting this code cannot change it.
        return (
            f"SYSTEM: {system_prompt}\n"
            f"        USER: {question + text1 + text2}\n"
            "        ASSISTANT (YES or NO):"
        )

    def get_prompts(self, pred, samples, question):
        """
        This method returns a list of prompt templates given a candidate sentence, a list
        of sample sentences, and a question.
        Args:
            pred (str): Candidate sentence.
            samples (list of str): List of sample sentences.
            question (str): Question asked to the model for which it generated $pred.

        Returns:
            list: List of prompt templates.
        """
        print(samples)
        return [self.get_prompt(pred, sample, question) for sample in samples]

    def compute(self, question, pred, n_samples):
        """
        Args:
            question (str): Question asked to the model for which it generated $pred.
            pred (str): Candidate sentence.
            n_samples (int): Number of samples to generate.

        Returns:
            score (float): Score for the candidate sentence: fraction of the samples for which
            the evaluation model answered "yes" (case-insensitive substring match).
        """
        assert isinstance(question, str), "Question must be a string."
        assert isinstance(pred, str), "Prediction must be a string."
        assert isinstance(n_samples, int), "Number of samples must be an integer."
        assert n_samples > 0, "Number of samples must be greater than 0."
        assert question and pred, "Question and prediction must be non-empty."

        # The sampling prompt does not depend on the iteration: build it once.
        # Its leading spaces are part of the prompt and are kept explicit on purpose.
        system_prompt = "You are a helpful, respectful and honest assistant. Always answer as helpfully as possible."
        sampling_prompt = (
            f"SYSTEM: {system_prompt}\n"
            f"            USER: {question}\n"
            "            ASSISTANT:"
        )

        # Generate n_samples samples from the model
        samples = []
        print("Samples:\n")
        for _ in range(n_samples):
            response = self.model(sampling_prompt, max_tokens=200)
            sample = response["choices"][0]["text"]
            print(sample, "\n")
            samples.append(sample)
        print("\n")

        # For each sample, ask evaluator model to evaluate the sample
        prompts = self.get_prompts(pred, samples, question)
        verdicts = []
        print("Prompts:\n")
        for prompt in prompts:
            print(prompt, "\n")
            verdict = self.eval_model(prompt, max_tokens=200)["choices"][0]["text"]
            print(verdict, "\n")
            verdicts.append(verdict)
        print("\n")

        # Compute the score: how often the sentence is supported by the sample
        return np.mean([1 if "yes" in verdict.lower() else 0 for verdict in verdicts])


class GEval:
    def __init__(
        self,
        model_name_or_path="TheBloke/Llama-2-7b-Chat-GGUF",
        model_basename="llama-2-7b-chat.Q4_K_M.gguf",
    ):
        """
        This class implements the GEval evaluation metric for generative language models.
        It is inspired by the GEval metric proposed in https://arxiv.org/pdf/2303.16634.pdf.
        Args:
            model_name_or_path (str): Model name or path. Defaults to "TheBloke/Llama-2-7b-Chat-GGUF".
            model_basename (str): Model basename. Defaults to "llama-2-7b-chat.Q4_K_M.gguf".
        """
        assert isinstance(
            model_name_or_path, str
        ), "model_name_or_path must be a string."
        assert isinstance(model_basename, str), "model_basename must be a string."

        self.model_path = hf_hub_download(
            repo_id=model_name_or_path, filename=model_basename
        )

        self.lcpp_llm = Llama(
            model_path=self.model_path,
            n_threads=2,  # CPU cores
            logits_all=True,
            n_ctx=1000,
        )

        # Built-in task definitions, indexed by task code.
        self.tasks = {
            "summ": "You will be given one summary written for a news article. Your task is to rate the summary on one metric. Please make sure you read and understand these instructions carefully. Please keep this document open while reviewing, and refer to it as needed.",
            "diag": "You will be given a conversation between two individuals. You will then be given one potential response for the next turn in the conversation. The response concerns an interesting fact, which will be provided as well. Your task is to rate the responses on one metric. Please make sure you read and understand these instructions carefully. Please keep this document open while reviewing, and refer to it as needed.",
        }
        # Built-in evaluation criteria, indexed by aspect code.
        self.aspects = {
            "COH": {
                "name": "Coherence",
                "prompt": "Coherence (1-5) - the collective quality of all sentences. We align this dimension with the DUC quality question of structure and coherence whereby ”the summary should be well-structured and well-organized. The summary should not just be a heap of related information, but should build from sentence to sentence to a coherent body of information about a topic.”",
            },
            "CON": {
                "name": "Consistency",
                "prompt": "Consistency (1-5) - the factual alignment between the summary and the summarized source. A factually consistent summary contains only statements that are entailed by the source document. Annotators were also asked to penalize summaries that contained hallucinated facts. ",
            },
            "ENG": {
                "name": "Engagingness",
                "prompt": "Engagingness (1-5) - Is the response dull/interesting? - A score of 1 indicates that the response is dull and uninteresting. A score of 5 indicates that the response is interesting and engaging.",
            },
            "FLU": {
                "name": "Fluency",
                "prompt": "Fluency (1-5) - the quality of the summary in terms of grammar, spelling, punctuation, word choice, and sentence structure. - 1: Poor. The summary is difficult to read and understand. It contains many grammatical errors, spelling mistakes, and/or punctuation errors. - 2: Fair. The summary is somewhat difficult to read and understand. It contains some grammatical errors, spelling mistakes, and/or punctuation errors. - 3: Good. The summary is easy to read and understand. It contains few grammatical errors, spelling mistakes, and/or punctuation errors. - 4: Very Good. The summary is easy to read and understand. It contains no grammatical errors, spelling mistakes, and/or punctuation errors. - 5: Excellent. The summary is easy to read and understand. It contains no grammatical errors, spelling mistakes, and/or punctuation errors.",
            },
            "REL": {
                "name": "Relevance",
                "prompt": "Relevance (1-5) - selection of important content from the source. The summary should include only important information from the source document. Annotators were instructed to penalize summaries which contained redundancies and excess information.",
            },
            "POL": {
                "name": "Politeness",
                "prompt": "Politeness (1-5) - the degree to which the response is polite. - 1: Very impolite. The response is very impolite. - 2: Somewhat impolite. The response is somewhat impolite. - 3: Neutral. The response is neutral. - 4: Somewhat polite. The response is somewhat polite. - 5: Very polite. The response is very polite.",
            },
        }

    def get_prediction(self, prompt):
        """
        This method returns a prediction given a prompt template.
        Args:
            prompt (str): Prompt template.

        Returns:
            response (dict): Response from the model.
        """
        response = self.lcpp_llm.create_completion(
            prompt=prompt,
            max_tokens=250,
            temperature=0.5,
            top_p=0.95,
            logprobs=5,
            repeat_penalty=1.2,
            top_k=50,
            echo=True,
        )
        return response

    def get_cot(self, prompt):
        """
        This method returns a chain of thoughts given a prompt template.
        Args:
            prompt (str): Prompt template.

        Returns:
            cot (str): Chain of thoughts.
        """
        title = "\nEvaluation steps:\n"
        # NOTE(review): get_prediction() requests echo=True, so the returned text presumably
        # starts with the prompt itself — confirm whether that duplication is intended.
        cot = self.get_prediction(prompt + title)["choices"][0]["text"]
        return cot

    def get_prompt(self, src, pred, task, aspect, custom_prompt):
        """
        Build the full evaluation prompt: task definition, evaluation criteria, generated
        chain of thoughts, then the example to rate and the evaluation form.
        Args:
            src (str): Source text.
            pred (str): Candidate sentence to evaluate.
            task (str): Definition of the task.
            aspect (str): Evaluation criterion code.
            custom_prompt (dict): Custom prompt template.
                Must contain the following keys: "task", "aspect", "name".
                Only read for a task or an aspect that is not built in.

        Returns:
            str: Prompt ending with the criterion name, ready to be scored.
        """
        # The section headers are only prepended to the built-in texts; custom texts are
        # used as given.
        if task in self.tasks:
            definition = "\n Task definition:\n" + self.tasks[task]
        else:
            definition = custom_prompt["task"]

        if aspect in self.aspects:
            crit = "\n Evaluation criteria:\n" + self.aspects[aspect]["prompt"]
            name = self.aspects[aspect]["name"]
        else:
            crit = custom_prompt["aspect"]
            name = custom_prompt["name"]

        prompt = f"{definition} {crit}"

        # Chain of thoughts, set of intermediate instructions generated by llm detailing evaluation steps
        auto_cot = self.get_cot(prompt)

        return (
            prompt
            + auto_cot
            + "\n Example:\n Source Text:\n"
            + src
            + "\n Generated text:\n"
            + pred
            + "\n Evaluation Form (scores ONLY):\n"
            + name
            + ": "
        )

    def get_score(self, prompt):
        """
        Args:
            prompt (str): Prompt template.

        Returns:
            score (float): Score for the candidate sentence.

        Raises:
            ValueError: if the evaluation form marker or a numeric score token cannot be
            found in the tokens returned by the model.
        """
        response = self.get_prediction(prompt)
        tokens = response["choices"][0]["logprobs"]["tokens"]
        top_logprobs = response["choices"][0]["logprobs"]["top_logprobs"]

        # The evaluation form ends with "(scores ONLY):", whose last token is expected to
        # be "):"; the score is searched after it.
        start_index = tokens.index("):") + 1

        # Position of the first numeric token AFTER the evaluation form. The position must
        # be taken in the full token list from start_index on: looking the token up from
        # the beginning would return an earlier occurrence of the same digit in the prompt.
        number_index = next(
            (
                index
                for index in range(start_index, len(tokens))
                if tokens[index].isdigit()
            ),
            None,
        )
        if number_index is None:
            raise ValueError("No numeric score found after the evaluation form.")

        # Get logprobs associated with number
        logprobs = top_logprobs[number_index]

        # Compute score
        # Get only keys that are numbers
        number_keys = [int(key) for key in logprobs if key.isdigit()]
        number_logprobs = [logprobs[str(key)] for key in number_keys]
        number_probs = [np.exp(logprob) for logprob in number_logprobs]

        # NOTE(review): the probability-weighted sum is divided by the number of numeric
        # candidates, not by the sum of their probabilities — confirm this is intended.
        score = np.sum(np.multiply(number_keys, number_probs)) / len(number_keys)

        return score

    def compute(self, source, pred, task, aspect, custom_prompt=None):
        """
        This method computes the GEval score for a candidate sentence given a source text,
        a prompt template, an aspect to evaluate, and a task description.
        Args:
            source (str): Source text.
            pred (str): Candidate sentence to evaluate.
            task (str): Definition of the task.
            aspect (str): Evaluation criterion code.
            custom_prompt (dict, optional): Custom prompt template. Defaults to None.
                Required when the task or the aspect is not built in; it must then contain
                "task" (for a custom task) and "aspect" and "name" (for a custom aspect).

        Returns:
            score (float): Score for the candidate sentence.
        """
        assert isinstance(source, str), "Source must be a string."
        assert isinstance(pred, str), "Pred must be a string."
        assert isinstance(task, str), "Definition must be a string."
        assert isinstance(aspect, str), "Criterion must be a string."

        # Keys that will be read from the custom prompt when building the final prompt.
        required_keys = []
        if task not in self.tasks:
            required_keys.append("task")
        if aspect not in self.aspects:
            required_keys.extend(["aspect", "name"])

        if required_keys:
            # The custom prompt is indexed by key, so it has to be a dict.
            assert isinstance(
                custom_prompt, dict
            ), "A custom prompt (dict) must be given if the task or the aspect is not built in."
            missing_keys = [key for key in required_keys if key not in custom_prompt]
            assert not missing_keys, f"Custom prompt is missing the keys: {missing_keys}"

        prompt = self.get_prompt(source, pred, task, aspect, custom_prompt)
        return self.get_score(prompt)


class GPTScore:
    """
    GPTScore-style evaluation metric backed by a local llama.cpp model.

    A prompt is built from a task/aspect template (or supplied by the caller), the
    model is asked to complete it, and the score is the average negative
    log-probability of the tokens that follow the prompt.
    """

    # pylint: disable=f-string-without-interpolation
    def __init__(
        self,
        model_name_or_path="TheBloke/Llama-2-7b-Chat-GGUF",
        model_basename="llama-2-7b-chat.Q4_K_M.gguf",
    ):
        """
        This class implements the GPTScore evaluation metric for generative language models.
        It is inspired by the GPTScore metric proposed in https://arxiv.org/pdf/2302.04166.pdf.
        Args:
            model_name_or_path (str): Model name or path. Defaults to "TheBloke/Llama-2-7b-Chat-GGUF".
            model_basename (str): Model basename. Defaults to "llama-2-7b-chat.Q4_K_M.gguf".
        """
        assert isinstance(
            model_name_or_path, str
        ), "model_name_or_path must be a string."
        assert isinstance(model_basename, str), "model_basename must be a string."

        # Prompt templates indexed by task, then by aspect. "{src}" and "{pred}" are
        # literal placeholders (the doubled braces escape them inside the f-strings)
        # that get_prompt fills in.
        self.templates = {
            "summ": {
                "FAC": f"Generate a summary with consistent facts for the following text: {{src}}\n\nTl;dr{{pred}}",
                "COV": f"Generate a summary with as much semantic coverage as possible for the following text: {{src}}\n\nTl;dr{{pred}}",
                "CON": f"Generate factually consistent summary for the following text: {{src}}\n\nTl;dr{{pred}}",
                "INF": f"Generate an informative summary that captures the key points of the following text:{{src}}\n\nTl;dr{{pred}}",
                "COH": f"Generate a coherent summary for the following text: {{src}}\n\nTl;dr{{pred}}",
                "REL": f"Generate a relevant summary with consistent details for the following text: {{src}}\n\nTl;dr{{pred}}",
                "FLU": f"Generate a fluent and grammatical summary for the following text: {{src}}\n\nTl;dr{{pred}}",
            },
            "MT": {
                "ACC": f"Rewrite the following text with its core information and consistent facts:{{src}} In other words, {{pred}}",
                "FLU": f"Rewrite the following text to make it more grammatical and well-written:{{src}} In other words,{{pred}}",
                "MQM": f"Rewrite the following text into high-quality text with its core information:{{src}} In other words,{{pred}}",
            },
            "D2T": {
                "INF": f"Convert the following text to another expression that preserves key information:\n\n{{src}} In other words, {{pred}}",
                "NAT": f"Convert the following text into another expression that is human-like and natural:\n\n{{src}} In other words, {{pred}}",
                "FLU": f"Convert the following text into another expression that preserves key information and is human-like and natural:\n\n{{src}} In other words, {{pred}}",
            },
            "diag": {
                "COH": f"Answer the question based on the conversation between a human and AI.\nQuestion: Is the AI coherent and maintains a good conversation flow throughout the conversation? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "DIV": f"Answer the question based on the conversation between a human and AI.\nQuestion: Is there diversity in the AI responses? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "FLE": f"Answer the question based on the conversation between a human and AI.\nQuestion: Is the AI flexible and adaptable to human and their interests? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "UND": f"Answer the question based on the conversation between a human and AI.\nQuestion: Does the AI seem to understand the human? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "INQ": f"Answer the question based on the conversation between a human and AI.\nQuestion: Is the AI inquisitive throughout the conversation? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "CON": f"Answer the question based on the conversation between a human and AI.\nQuestion: Are the responses of AI consistent in the information it provides throughout the conversation? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "INF": f"Answer the question based on the conversation between a human and AI.\nQuestion: Are the responses of AI informative throughout the conversation? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "LIK": f"Answer the question based on the conversation between a human and AI.\nQuestion: Does the AI display a likeable personality? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "DEP": f"Answer the question based on the conversation between a human and AI.\nQuestion: Does the AI discuss topics in depth? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
                "ERR": f"Answer the question based on the conversation between a human and AI.\nQuestion: Is the AI able to recover from errors that it makes? (a) Yes. (b) No.\nConversation:\nUser: {{src}}\nAI: {{pred}}\nAnswer:",
            },
        }

        self.tasks = self.templates.keys()
        # Union of the aspect codes of all tasks; sorted so that the order (and the
        # error messages that print it) is the same on every run.
        self.aspects = sorted(
            {aspect for task in self.tasks for aspect in self.templates[task]}
        )

        self.model_path = hf_hub_download(
            repo_id=model_name_or_path, filename=model_basename
        )

        self.lcpp_llm = Llama(
            model_path=self.model_path,
            n_threads=2,  # CPU cores
            logits_all=True,
        )

    def get_prompts(self, aspect, task, sources, preds):
        """
        This method returns one filled-in prompt per (source, candidate) pair for a given
        task and aspect. Pairs are formed with zip, so extra items of the longer list are ignored.
        Args:
            aspect (str): Aspect to evaluate.
            task (str): Task description.
            sources (list of str): Source texts.
            preds (list of str): Candidate sentences.
        Returns:
            list: List of prompts.
        """
        return [
            self.get_prompt(aspect, task, src, pred)
            for (src, pred) in zip(sources, preds)
        ]

    def get_prompt(self, aspect, task, src, pred):
        """
        This method returns the prompt template of a task and aspect with its placeholders
        replaced by the source text and the candidate sentence.
        Args:
            aspect (str): Aspect to evaluate.
            task (str): Task description.
            src (str): Source text.
            pred (str): Candidate sentence.
        Returns:
            str: Filled-in prompt.
        Raises:
            AssertionError: If the aspect is not available for the task or its template is empty.
            KeyError: If the task has no entry in `self.templates`.
        """
        # Check that the corresponding entry exists in the prompt template
        assert (
            aspect in self.templates[task]
        ), f"Aspect {aspect} is not available for task {task}."
        # Check that the prompt template is not empty
        assert self.templates[task][
            aspect
        ], f"Prompt template for aspect {aspect} and task {task} is non-existent. Please specify a prompt template."

        template = self.templates[task][aspect]

        # Replace placeholders with source and candidate sentence in a single pass:
        # only the template's own "{src}"/"{pred}" markers are substituted, so a
        # literal "{pred}" inside the source text (or "{src}" inside the candidate)
        # is left untouched.
        segments = template.split("{src}")
        return src.join(segment.replace("{pred}", pred) for segment in segments)

    def compute(self, source, pred, prompt=None, aspect=None, task=None):
        """
        This method computes the GPTScore for a candidate sentence given a source text,
        a prompt template, an aspect to evaluate, and a task description.
        Either a ready-made prompt, or both an aspect and a task, must be given. A given
        prompt is sent to the model as is (source and pred are then only type-checked).
        Args:
            source (str): Source text.
            pred (str): Candidate sentence.
            prompt (str, optional): Prompt template. Defaults to None.
            aspect (str, optional): Aspect to evaluate. Defaults to None.
            task (str, optional): Task description. Defaults to None.
        Returns:
            score (float): Average negative log-probability of the tokens that follow
                the prompt in the model response (the last token is left out).
        Raises:
            AssertionError: If the arguments are invalid or inconsistent.
            ValueError: If the response contains no token after the prompt, or not
                enough of them to compute an average.
        """
        assert isinstance(source, str), "Source must be a string."
        assert isinstance(pred, str), "Pred must be a string."

        # If prompt is given, check that it is a string
        if prompt:
            assert isinstance(prompt, str), "Prompt must be a string."
            assert not aspect, "Aspect must not be given if prompt is given."
            assert not task, "Task must not be given if prompt is given."
        else:
            # If prompt is not given, check that task and aspect are given
            assert aspect, "Aspect must be given if prompt is not given."
            assert task, "Task must be given if prompt is not given."

        # If aspect is given, check that it is a string
        if aspect:
            assert isinstance(aspect, str), "Aspect must be a string."
            assert aspect in self.aspects, f"Aspect must be one of {self.aspects}."

        # If task is given, check that it is a string
        if task:
            assert isinstance(task, str), "Task must be a string."
            assert task in self.tasks, f"Task must be one of {self.tasks}."

        # Generative LLM is given a prompt template and some context information
        if not prompt:
            prompt = self.get_prompt(aspect, task, source, pred)

        response = self.lcpp_llm.create_completion(
            prompt=prompt,
            max_tokens=500,
            temperature=0.5,
            top_p=0.95,
            logprobs=1,
            repeat_penalty=1.2,
            top_k=50,
            echo=True,
        )

        logprobs = response["choices"][0]["logprobs"]
        text_offsets = logprobs["text_offset"]
        token_logprobs = logprobs["token_logprobs"]

        # Find the end position of the input: the first token that starts at (or,
        # when no token boundary falls exactly on the end of the prompt, right
        # after) len(prompt).
        # NOTE(review): the fallback to the next token is an approximation for
        # responses whose offsets do not line up with the prompt length - confirm
        # against the offsets llama-cpp actually returns.
        prompt_length = len(prompt)
        start = next(
            (
                index
                for index, offset in enumerate(text_offsets)
                if offset >= prompt_length
            ),
            None,
        )
        if start is None:
            raise ValueError("The model response contains no token after the prompt.")
        if start == 0:
            start = 1

        # The last token is ignored (the original intent: skip the final '.').
        n_scored = len(text_offsets) - start - 1
        if n_scored <= 0:
            raise ValueError(
                "The model response contains too few tokens after the prompt to compute a score."
            )

        loss = -sum(token_logprobs[start:-1])
        return loss / n_scored


In [None]:

class LLMScorer:
    """
    Bundles the evaluation metrics of this file (BERTScore, MAUVE, BLEURT, Q-squared,
    SelfCheckGPT, GEval, GPTScore) and a metadata extractor behind one `score` call.
    """

    def __init__(
        self,
        model,
        lan="en",
        bleurt_model="BLEURT-tiny",
        mauve_model="gpt2",
        eval_model_name_or_path="TheBloke/Llama-2-7b-Chat-GGUF",
        eval_model_basename="llama-2-7b-chat.Q4_K_M.gguf",
        model_name_or_path="TheBloke/Llama-2-7b-Chat-GGUF",
        model_basename="llama-2-7b-chat.Q4_K_M.gguf",
    ) -> None:
        """
        Validate the configuration and instantiate every metric.
        Args:
            model: Model handed to SelfCheckGPT (not validated here).
            lan (str): Language code passed to BERTScore and QSquared. Defaults to "en".
            bleurt_model (str): BLEURT checkpoint. Defaults to "BLEURT-tiny".
            mauve_model (str): Featurizer model name for MAUVE. Defaults to "gpt2".
            eval_model_name_or_path (str): Evaluation model name or path for SelfCheckGPT.
            eval_model_basename (str): Evaluation model basename for SelfCheckGPT.
            model_name_or_path (str): Model name or path for GEval and GPTScore.
            model_basename (str): Model basename for GEval and GPTScore.
        """
        assert isinstance(lan, str), "lan must be a string."
        assert isinstance(bleurt_model, str), "bleurt_model must be a string."
        assert isinstance(mauve_model, str), "mauve_model must be a string."
        assert isinstance(eval_model_name_or_path, str), (
            "eval_model_name_or_path must be a string."
        )
        assert isinstance(eval_model_basename, str), (
            "eval_model_basename must be a string."
        )
        assert isinstance(model_name_or_path, str), (
            "model_name_or_path must be a string."
        )
        assert isinstance(model_basename, str), "model_basename must be a string."

        # Metrics
        self.bert_score = BERTScore(lan=lan)
        self.mauve = MAUVE(featurize_model_name=mauve_model)
        self.bleurt_score = BLEURTScore(checkpoint=bleurt_model)
        self.q_squared = QSquared(lan=lan)
        self.selfcheckgpt = SelfCheckGPT(
            model,
            eval_model_name_or_path=eval_model_name_or_path,
            eval_model_basename=eval_model_basename,
        )
        self.geval = GEval(
            model_name_or_path=model_name_or_path, model_basename=model_basename
        )
        self.gptscore = GPTScore(
            model_name_or_path=model_name_or_path, model_basename=model_basename
        )

        # Metadata
        self.metadata_extractor = MetadataExtractor()

    def score(
        self,
        input: str,
        prompt: str,
        prediction: str,
        context: str = None,
        reference: str = None,
        n_samples: int = 5,
        task: str = None,
        aspects: list = None,
        custom_prompt: str = None,
    ):
        """
        Compute every available metric and the text metadata for one prediction.

        Metrics that need a missing optional argument are reported as None:
        BERTScore, MAUVE and BLEURT need a reference, Q-squared needs a context,
        GEval and GPTScore need either a custom prompt or both a task and aspects.
        Args:
            input (str): Input to the model.
            prompt (str): Prompt to the model. Comprises the context and the input.
            prediction (str): Prediction of the model.
            context (str, optional): Context of the prediction. Defaults to None.
            reference (str, optional): Reference of the prediction. Defaults to None.
            n_samples (int, optional): Number of samples to generate. Defaults to 5.
            task (str, optional): Task definition. Defaults to None.
            aspects (list, optional): Aspects to evaluate. Defaults to None.
            custom_prompt (str, optional): Custom prompt. Defaults to None.
        Returns:
            dict: {"metadata": {...}, "metrics": {...}}. With a custom prompt,
                "g_eval" and "gpt_score" hold a single score each; with a task and
                aspects they hold one score per aspect; otherwise they are None.
        """
        assert isinstance(prompt, str), "prompt must be a string."
        assert isinstance(input, str), "input must be a string."
        assert (
            isinstance(context, str) or context is None
        ), "context must be a string or None."
        assert isinstance(prediction, str), "prediction must be a string."
        assert (
            isinstance(reference, str) or reference is None
        ), "Reference must be a string or None."
        assert isinstance(n_samples, int), "n_samples must be an integer."
        assert n_samples > 0, "n_samples must be greater than 0."
        assert (
            isinstance(task, str) or task is None
        ), "task must be a string or None."
        assert (
            isinstance(aspects, list) or aspects is None
        ), "aspects must be a list or None."
        assert (
            isinstance(custom_prompt, str) or custom_prompt is None
        ), "custom_prompt must be a string or None."

        if custom_prompt:
            # A custom prompt takes precedence over task/aspects. GEval.compute wants
            # task and aspect to be strings and accepts empty ones together with a
            # custom prompt; GPTScore.compute rejects aspect/task next to a prompt.
            # NOTE(review): confirm GEval.get_prompt handles an empty aspect when a
            # custom prompt is supplied.
            geval_scores = self.geval.compute(
                prompt, prediction, task or "", "", custom_prompt
            )
            gpt_scores = self.gptscore.compute(prompt, prediction, custom_prompt)
        elif aspects and task:
            geval_scores = {}
            gpt_scores = {}
            for aspect in aspects:
                geval_scores[aspect] = self.geval.compute(
                    prompt, prediction, task, aspect
                )
                gpt_scores[aspect] = self.gptscore.compute(
                    prompt, prediction, aspect=aspect, task=task
                )
        else:
            geval_scores = None
            gpt_scores = None

        metadata_dict = {
            "prompt": self.metadata_extractor.compute(prompt),
            "input": self.metadata_extractor.compute(input),
            "context": self.metadata_extractor.compute(context)
            if context is not None
            else None,
            "prediction": self.metadata_extractor.compute(prediction),
            "reference": self.metadata_extractor.compute(reference)
            if reference
            else None,
        }

        metrics_dict = {
            "bert_score": self.bert_score.compute([reference], [prediction])
            if reference
            else None,
            "mauve": self.mauve.compute([reference], [prediction])
            if reference
            else None,
            "bleurt_score": self.bleurt_score.compute([reference], [prediction])
            if reference
            else None,
            "q_squared": self.q_squared.compute(prediction, context)
            if context is not None
            else None,
            "selfcheck_gpt": self.selfcheckgpt.compute(prompt, prediction, n_samples),
            "g_eval": geval_scores,
            "gpt_score": gpt_scores,
        }

        output = {
            "metadata": metadata_dict,
            "metrics": metrics_dict,
        }

        return output


In [None]:
class TestLLMScorer(unittest.TestCase):
    def test_init(self):
        """Every string-typed constructor argument must reject a non-string value."""
        model = "TheBloke/Llama-2-7b-Chat-GGUF"
        string_arguments = (
            "lan",
            "bleurt_model",
            "mauve_model",
            "eval_model_name_or_path",
            "eval_model_basename",
            "model_name_or_path",
            "model_basename",
        )
        for argument in string_arguments:
            # One assertRaises per call: inside a shared block only the first call
            # would run, because its exception ends the block.
            with self.subTest(argument=argument):
                with self.assertRaises(AssertionError):
                    LLMScorer(model=model, **{argument: False})

    def test_score_bad_arguments(self):
        """Each argument of `score` must be rejected when it is set to False."""
        model = "TheBloke/Llama-2-7b-Chat-GGUF"
        scorer = LLMScorer(model=model)

        user_input = "I am a dog."
        # Keyword arguments that are valid on their own; each case below replaces
        # exactly one of them (or adds one optional argument) with False.
        valid_arguments = {
            "input": user_input,
            "prompt": f"System: You are a cat. You don't like dogs. User: {user_input}",
            "prediction": "I am a cat, I don't like dogs.",
            "context": "System: You are a cat. You don't like dogs.",
            "reference": "I am a cat, I don't like dogs, miau.",
        }
        # `criterion_name` is not a parameter of `score`, so it is not exercised here:
        # passing it would raise TypeError rather than AssertionError.
        checked_arguments = (
            "input",
            "prompt",
            "prediction",
            "context",
            "reference",
            "n_samples",
            "task",
            "aspects",
            "custom_prompt",
        )

        for argument in checked_arguments:
            # One assertRaises per call: inside a shared block only the first call
            # would run, because its exception ends the block.
            with self.subTest(argument=argument):
                with self.assertRaises(AssertionError):
                    scorer.score(**{**valid_arguments, argument: False})

    def test_score(self):
        model_name_or_path = "TheBloke/Llama-2-7b-Chat-GGUF"
        model_basename = "llama-2-7b-chat.Q2_K.gguf"  # the model is in bin format

        model_path = hf_hub_download(
            repo_id=model_name_or_path, filename=model_basename
        )
        model = Llama(model_path=model_path, n_threads=2, verbose=False)  # CPU cores

        scorer = LLMScorer(model=model, eval_model_name_or_path=model_name_or_path, eval_model_basename=model_basename, model_name_or_path=model_name_or_path, model_basename=model_basename)

        input = "I am a dog."
        prompt = f"System: You are a cat. You don't like dogs. User: {input}"
        context = "Examples: Eww, I hate dogs."
        prediction = "I am a cat, I don't like dogs."
        reference = "I am a cat, I don't like dogs, miau."
        task = "diag"
        aspect = ["FLU"]
        custom_prompt = {"name": "Fluency", "task": "Dialog", "aspect": "Evaluate the fluency of the following dialog."}

        scores = scorer.score(input, prompt, context, prediction, reference)
        self.assertTrue(isinstance(scores, dict))
        self.assertTrue("scores" in scores)
        self.assertTrue("metadata" in scores)

        # All default
        print("All default")
        scores = scorer.score(
            input,
            prompt,
            prediction,
        )
        self.assertTrue(isinstance(scores, dict))
        self.assertTrue("scores" in scores)
        self.assertTrue("metadata" in scores)

        # All default, but with context
        print("All default, but with context")
        scores = scorer.score(
            input,
            prompt,
            prediction,
            context=context
        )
        self.assertTrue(isinstance(scores, dict))
        self.assertTrue("scores" in scores)
        self.assertTrue("metadata" in scores)

        # All default, but with reference
        print("All default, but with reference")
        scores = scorer.score(
            input,
            prompt,
            prediction,
            reference=reference,
        )
        self.assertTrue(isinstance(scores, dict))
        self.assertTrue("scores" in scores)
        self.assertTrue("metadata" in scores)

        # Precise task and aspect
        print("Precise task and aspect")
        scores = scorer.score(
            input,
            prompt,
            prediction,
            task=task,
            aspects=aspect,
        )
        self.assertTrue(isinstance(scores, dict))
        self.assertTrue("scores" in scores)
        self.assertTrue("metadata" in scores)

        # Precise custom prompt
        print("Precise custom prompt")
        scores = scorer.score(
            input,
            prompt,
            context,
            prediction,
            reference,
            custom_prompt=custom_prompt,
        )