# ChatPI

In [1]:
# !git clone https://github.com/Manan-dev/ChatPI.git

!!pip3 install -r ./ChatPi/requirements.txt

%load_ext autoreload
%autoreload 2


Cloning into 'ChatPI'...
remote: Enumerating objects: 548, done.[K
remote: Counting objects: 100% (93/93), done.[K
remote: Compressing objects: 100% (73/73), done.[K
remote: Total 548 (delta 38), reused 42 (delta 20), pack-reused 455[K
Receiving objects: 100% (548/548), 7.95 MiB | 13.38 MiB/s, done.
Resolving deltas: 100% (313/313), done.


## Utility functions

The functions in this section are general-purpose helpers (file loading, scoring, and plotting) that are used throughout the rest of the notebook.

In [None]:
## These are our utility functions, used for various tasks throughout the project
import os
import glob
import evaluate
from termcolor import colored, cprint
import numpy as np
from termcolor import cprint, colored
from matplotlib import pyplot as plt
from tabulate import tabulate


def read_context(fname, basepath="./content/sections"):
    """Yield the numbered context sections stored for one context name.

    Globs ``<basepath>/<fname>.*.md`` and yields, in sorted path order, every
    match whose second-to-last dot-separated component is a number
    (e.g. ``protagonist.1.md``). Other matches such as ``protagonist.qa.md``
    are skipped.

    Args:
        fname: Context name used as the filename prefix.
        basepath: Directory that holds the section files.

    Yields:
        ``(path, text)`` tuples, where ``text`` is the stripped file content.
    """
    globpath = os.path.join(basepath, f"{fname}.*.md")
    print("GLOBPATH: ", globpath)
    files = glob.glob(globpath)
    print("FILES: ", files)
    files = sorted(files)

    fnames = [os.path.basename(f) for f in files]
    print(f"Found: {fnames}")

    # A distinct loop name keeps the `fname` argument from being shadowed.
    for path in files:
        parts = path.split(".")
        if not parts[-2].isdigit():
            continue
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default locale encoding.
        with open(path, "r", encoding="utf-8") as f:
            text = f.read().strip()
        # Yield after the file is closed so no handle stays open while the
        # generator is suspended.
        yield path, text


def read_qa(fname, basepath="../sections"):
    """Yield ``(question, answer)`` pairs from ``<basepath>/<fname>.qa.md``.

    The file is split on ``---``; each segment must contain exactly two
    non-blank lines: the question followed by its expected answer. Segments
    with no content at all (e.g. caused by a trailing ``---`` or an empty
    file) are skipped.

    Args:
        fname: Context name used as the filename prefix.
        basepath: Directory that holds the QA file.

    Yields:
        ``(question, answer)`` tuples of stripped strings.

    Raises:
        ValueError: If a non-empty segment has fewer or more than two
            non-blank lines.
    """
    path = os.path.join(basepath, f"{fname}.qa.md")
    # Decode explicitly as UTF-8 so the result does not depend on the locale.
    with open(path, "r", encoding="utf-8") as f:
        text = f.read().strip()

    for qa_text in text.split("---"):
        qa = [line for line in qa_text.strip().split("\n") if line.strip()]

        # A leading, trailing or doubled separator yields an empty segment;
        # that is not a malformed QA pair, so skip it instead of raising.
        if not qa:
            continue

        if len(qa) > 2:
            raise ValueError(f"Too many lines in QA:\n{qa}")

        if len(qa) < 2:
            raise ValueError(f"Too few lines in QA:\n{qa}")

        yield qa[0].strip(), qa[1].strip()


def cscore(score: float):
    """Format `score` (rounded to 4 decimals) as a termcolor-colored string.

    The color is chosen from the first band whose lower bound the score
    strictly exceeds; anything at or below the lowest bound is "red".
    """
    # (exclusive lower bound, color), checked from the highest band down.
    bands = (
        (0.9, "green"),
        (0.8, "light_green"),
        (0.65, "light_yellow"),
        (0.5, "yellow"),
        (0.25, "light_red"),
    )
    color = next((name for bound, name in bands if score > bound), "red")
    return colored(f"{round(score, 4)}", color)


# Process-wide cache of loaded spaCy pipelines, keyed by package name.
_SPACY_MODELS: dict = {}


def _get_spacy_model(name: str):
    """Return the spaCy pipeline `name`, downloading and loading it only once."""
    import spacy
    from spacy.cli import download

    if name not in _SPACY_MODELS:
        if not spacy.util.is_package(name):
            download(name)
        _SPACY_MODELS[name] = spacy.load(name)
    return _SPACY_MODELS[name]


def get_similarity_score(prediction: str, reference: str):
    """Score how similar two texts are using spaCy's ``Doc.similarity``.

    Both texts are tokenized with the ``en_core_web_lg`` pipeline, stop words
    and punctuation are dropped, and the remaining tokens are re-joined and
    re-processed before the two resulting docs are compared.

    The pipeline is loaded once and reused on later calls, because loading it
    for every prediction/reference pair dominates the run time.

    Args:
        prediction: Text produced by a model.
        reference: Text to compare against.

    Returns:
        The value returned by ``Doc.similarity`` for the two filtered docs.
    """
    nlp = _get_spacy_model("en_core_web_lg")

    def _content_doc(text: str):
        # Keep only tokens that are neither stop words nor punctuation, then
        # rebuild a doc from their surface forms.
        kept = [t.text for t in nlp(text) if not t.is_stop and not t.is_punct]
        return nlp(" ".join(kept))

    doc1 = _content_doc(prediction)
    doc2 = _content_doc(reference)

    return doc1.similarity(doc2)


def get_eval_score(
    prediction: str,
    reference: str,
    metric: str,
    **kwargs,
):
    """Evaluate one prediction against one reference with a named metric.

    Both texts are stripped and lower-cased first. The special metric
    ``"spacy_sim"`` is computed by ``get_similarity_score``; any other name is
    loaded through ``evaluate.load`` and computed on the single pair, with
    ``kwargs`` forwarded to ``compute``.

    Returns:
        A dict of scores. Keys that do not already start with the metric name
        are prefixed with ``"<metric>_"``, and values that are one-element
        lists are replaced by their single element.
    """
    pred_norm = prediction.strip().lower()
    ref_norm = reference.strip().lower()

    if metric == "spacy_sim":
        return {"spacy_sim": get_similarity_score(pred_norm, ref_norm)}

    raw_scores: dict = evaluate.load(metric).compute(
        predictions=[pred_norm],
        references=[ref_norm],
        **kwargs,
    )

    scores = {}
    for key, value in raw_scores.items():
        # e.g. "f1" -> "bertscore_f1"; keys already carrying the prefix stay.
        name = key if key.startswith(metric) else f"{metric}_{key}"
        # Only one pair was scored, so unwrap single-element result lists.
        if isinstance(value, list) and len(value) == 1:
            value = value[0]
        scores[name] = value

    assert len(scores) > 0, f"Metric: {metric} returned empty dict"

    return scores


def create_plots(
    ctx_name: str,
    scores_by_model: dict[str, list],
    scores_by_answer: dict[str, dict[str, list]],
    scores_by_question: dict[str, list[dict]],
    tablefmt="double_grid",
    savedir="./plots",
):
    """Plot and tabulate evaluation scores for one context.

    Three views are produced, each as a figure (shown and saved as a PNG under
    `savedir`) followed by a printed table:

    1. Scores by question: one bar chart per model plus an average chart.
    2. Scores by expected answer: one boxplot per model.
    3. Scores by model: a single boxplot across models.

    Args:
        ctx_name: Context name, used in figure titles and output filenames.
        scores_by_model: Maps model name -> list of scores.
        scores_by_answer: Maps model name -> {expected answer -> list of scores}.
        scores_by_question: Maps model name -> list of per-question dicts with
            the keys "score", "question", "answer" and "expected_answer".
            The per-model lists are averaged element-wise, so they are
            expected to have the same length.
        tablefmt: Table format name passed to ``tabulate``.
        savedir: Directory the PNG files are written to (created if missing).
    """
    print("#" * 80)
    print("Plotting")

    os.makedirs(savedir, exist_ok=True)

    models = list(scores_by_model.keys())
    num_models = len(models)
    num_questions = len(scores_by_question[models[0]])

    print(f"Models: {models}")
    print(f"Questions: {num_questions}")

    #############################################################################
    # scores_by_question

    # Plot Version
    # First subplots are individual models
    fig = plt.figure(figsize=((num_models + 1) * 5, 5))
    all_scores = []
    for i, m in enumerate(models):
        ax = fig.add_subplot(1, num_models + 1, i + 1)
        scores = [d["score"] for d in scores_by_question[m]]
        all_scores.append(scores)
        ax.bar(range(len(scores)), scores)
        ax.set_ylim(-0.05, 1.05)
        ax.set_title(m)
        ax.set_ylabel("Evaluation Score")
        ax.set_xlabel("Question Index")

    # Last subplot is average across all models
    ax = fig.add_subplot(1, num_models + 1, num_models + 1)
    # convert to numpy array and average across axis 0 (i.e. across models)
    all_scores = np.array(all_scores)
    avg_scores = np.mean(all_scores, axis=0)
    ax.bar(range(len(avg_scores)), avg_scores)
    ax.set_ylim(-0.05, 1.05)
    ax.set_title("Average")
    ax.set_ylabel("Evaluation Score")
    ax.set_xlabel("Question Index")

    fig.suptitle(f"QA Score by Question - (CTX: {ctx_name})")
    plt.show()
    fig.savefig(os.path.join(savedir, f"{ctx_name}.scores_by_question.png"))
    plt.close(fig)

    # Table Version
    headers = ["Q Idx", "Model", "Score", "Question", "Answer", "Expected Answer"]
    table = []
    scores = []
    for model, questions in scores_by_question.items():
        for i, data in enumerate(questions):
            score = data["score"]
            scores.append(score)
            question = data["question"]
            answer = data["answer"]
            expected_answer = data["expected_answer"]
            table.append([i, model, cscore(score), question, answer, expected_answer])

    # sort by question index
    table = sorted(table, key=lambda x: x[0])

    # last row for average across the scores; one cell per header column
    avg_score = np.mean(scores)
    table.append(["Avg", "-", cscore(avg_score), "-", "-", "-"])

    print(
        tabulate(
            table,
            headers=headers,
            tablefmt=tablefmt,
        )
    )

    #############################################################################
    # scores_by_answer
    fig = plt.figure(figsize=(num_models * 5, 5))

    # Boxplot Version
    expected_answers = list(scores_by_answer[models[0]].keys())
    for i, m in enumerate(models):
        ax = fig.add_subplot(1, num_models, i + 1)
        # materialize the dict view into a plain list of score lists
        ax.boxplot(list(scores_by_answer[m].values()))
        ax.set_xticklabels(list(range(len(expected_answers))))
        ax.set_ylim(-0.05, 1.05)
        ax.set_title(m)
        ax.set_ylabel("Evaluation Score")
        ax.set_xlabel("Expected Answer Group")

    fig.suptitle(
        f"QA Score by Expected Answer - {num_questions} Q's each (CTX: {ctx_name})"
    )
    plt.show()
    fig.savefig(os.path.join(savedir, f"{ctx_name}.scores_by_answer.png"))
    plt.close(fig)

    # Table Version
    headers = ["Model", "A Idx", "Expected Answer", "Min", "Mean", "Max"]
    table = []
    min_scores, mean_scores, max_scores = [], [], []
    for model, answers in scores_by_answer.items():
        for i, a in enumerate(answers.keys()):
            scores = answers[a]
            smin = min(scores)
            smean = sum(scores) / len(scores)
            smax = max(scores)
            min_scores.append(smin)
            mean_scores.append(smean)
            max_scores.append(smax)
            table.append(
                [
                    model,
                    i,
                    a,
                    cscore(smin),
                    cscore(smean),
                    cscore(smax),
                ]
            )

    # sort by answer index
    table = sorted(table, key=lambda x: x[1])

    # average across the scores (min, mean, max)
    min_avg, mean_avg, max_avg = (
        np.mean(min_scores),
        np.mean(mean_scores),
        np.mean(max_scores),
    )
    table.append(
        [
            "Avg",
            "-",
            "-",
            cscore(min_avg),
            cscore(mean_avg),
            cscore(max_avg),
        ]
    )

    print(
        tabulate(
            table,
            headers=headers,
            tablefmt=tablefmt,
        )
    )

    #############################################################################
    # scores_by_model

    # Boxplot Version
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.boxplot(list(scores_by_model.values()))
    ax.set_xticklabels(list(scores_by_model.keys()), rotation=10, ha="right")
    ax.set_ylim(-0.05, 1.05)
    ax.set_title(f"QA Score by Model - {num_questions} Q's each (CTX: {ctx_name})")
    ax.set_ylabel("Evaluation Score")
    ax.set_xlabel("Model ID")
    plt.show()
    fig.savefig(os.path.join(savedir, f"{ctx_name}.scores_by_model.png"))
    plt.close(fig)

    # Table Version
    headers = ["Model", "Min", "Mean", "Max"]
    table = []
    min_scores, mean_scores, max_scores = [], [], []
    for m, scores in scores_by_model.items():
        smin = min(scores)
        smean = sum(scores) / len(scores)
        smax = max(scores)
        min_scores.append(smin)
        mean_scores.append(smean)
        max_scores.append(smax)
        table.append(
            [
                m,
                cscore(smin),
                cscore(smean),
                cscore(smax),
            ]
        )

    # average across the scores (min, mean, max)
    min_avg, mean_avg, max_avg = (
        np.mean(min_scores),
        np.mean(mean_scores),
        np.mean(max_scores),
    )
    table.append(["Avg", cscore(min_avg), cscore(mean_avg), cscore(max_avg)])

    print(
        tabulate(
            table,
            headers=headers,
            tablefmt=tablefmt,
        )
    )

## Part 1 - Question-Answering pipeline

**Objective** - Implement a prompt interface that takes in a question, runs it through the question-answering pipeline and returns the answer

Tasks:
- Find five 300-word sections from a book that introduce the following:
  - Protagonist
  - Antagonist
  - Crime and crime scene
  - Any significant evidence
  - Resolution of the crime / a narrative that presents the case against the perpetrator

- Ask the model questions and return the answers
- Document the results
- Use a different question-answering model to do the same tasks mentioned above and document the differences in the results

In [None]:
from typing import Optional
from transformers import pipeline
import torch
from pprint import pprint
try:
    from .utils import get_similarity_score, get_eval_score
except ImportError:
    # A relative import has no parent package when this code runs as a
    # notebook cell; there the two helpers come from the "Utility functions"
    # cell above, which must have been executed first.
    pass


def run_qa(
    question: str,
    context: str,
    model: Optional[str] = None,
    verbosity: int = 1,
    **kwargs,
):
    """Answer `question` from `context` with a question-answering pipeline.

    A new ``transformers`` "question-answering" pipeline is built for `model`
    on every call (on CUDA when available, otherwise on CPU) and `kwargs` are
    forwarded to the pipeline call.

    Args:
        question: Question text (stripped before use).
        context: Passage the answer is extracted from (stripped before use).
        model: Model identifier handed to ``pipeline``.
        verbosity: 0 prints only the answer line, 1 also prints the model and
            the question, 2 additionally prints `kwargs` and the context.

    Returns:
        The stripped answer string, or "idk" when the pipeline result is not
        a non-empty dict.
    """
    print("-" * 80)
    show_basic = verbosity == 1 or verbosity == 2
    show_all = verbosity == 2

    if show_basic:
        print(f"model: {model}")
    if show_all:
        for name, value in kwargs.items():
            print(f"{name}: {value}")
        print("~" * 80)

    # Build the pipeline
    qa_pipe = pipeline(
        "question-answering",
        model=model,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    # Run the pipeline
    question, context = question.strip(), context.strip()

    if show_all:
        print(f"C: {context}")
    if show_basic:
        print(f"Q: {question}")

    res = qa_pipe(question=question, context=context, **kwargs)

    # Extract the result, falling back to placeholders for any other shape
    if res and isinstance(res, dict):
        answer = res.get("answer", "idk")
        score = res.get("score", 1.0)
    else:
        answer, score = "idk", 1.0

    answer = answer.strip()
    score = round(score, 3)

    print(f"A: {answer} (model confidence score: {score})")

    return answer


def run_qa_models(
    question: str,
    context: str,
    models: list[str],
    answer_true: str,
    **kwargs,
):
    """Run `question` through several QA models and score each answer.

    Each model's answer is compared to `answer_true` with three metrics
    ("spacy_sim", "bertscore" and "rouge"); the three result dicts are merged
    into one per model and pretty-printed.

    Args:
        question: Question text.
        context: Passage to answer from.
        models: Model identifiers, each passed to ``run_qa``.
        answer_true: Reference answer; ``None`` is treated as an empty string.
        **kwargs: Forwarded to ``run_qa``.

    Returns:
        ``(answer_preds, score_dicts)``: two lists aligned with `models`.
    """
    reference = "" if answer_true is None else answer_true

    # (metric name, extra keyword arguments), evaluated in this order; later
    # metrics win if two of them ever produce the same key.
    metric_specs = (
        ("spacy_sim", {}),
        ("bertscore", {"lang": "en", "model_type": "microsoft/deberta-xlarge-mnli"}),
        ("rouge", {}),
    )

    answer_preds, score_dicts = [], []

    for model_name in models:
        prediction = run_qa(question, context, model=model_name, **kwargs)

        # compare the predicted answer to the true answer
        merged_scores = {}
        for metric_name, metric_kwargs in metric_specs:
            merged_scores.update(
                get_eval_score(
                    prediction,
                    reference,
                    metric=metric_name,
                    **metric_kwargs,
                )
            )
        pprint(merged_scores)

        answer_preds.append(prediction)
        score_dicts.append(merged_scores)

    return answer_preds, score_dicts


## Part 2 - Translation pipeline (French)

**Objective** - Utilize a translation pipeline that translates the answers found in Part 1 into French and back to English.

Tasks:

```
> Question
> Answer in English
> Answer in French
> Answer in English, translated from French
```

- Document the results
- Use a different translation model to do the same tasks mentioned above and document the differences in the results


In [None]:
from typing import Optional
from transformers import pipeline
import torch
from pprint import pprint


def run_tr(
    text: str,
    model: Optional[str] = None,
    pipeline_name: str = "translation_en_to_fr",
    verbosity: int = 1,
    **kwargs,
):
    """Translate `text` with a ``transformers`` translation pipeline.

    A new pipeline named `pipeline_name` is built for `model` on every call
    (on CUDA when available, otherwise on CPU) and `kwargs` are forwarded to
    the pipeline call.

    Args:
        text: Text to translate (stripped before use).
        model: Model identifier handed to ``pipeline``.
        pipeline_name: Pipeline task name, e.g. "translation_en_to_fr".
        verbosity: 1 prints the model; 2 additionally prints `kwargs`.

    Returns:
        The stripped translated text, or "idk" when the pipeline result is
        not a non-empty list.
    """
    print("-" * 100)
    if verbosity == 1 or verbosity == 2:
        print(f"model: {model}")
    if verbosity == 2:
        for name, value in kwargs.items():
            print(f"{name}: {value}")
        print("~" * 80)

    # Build the pipeline
    translator = pipeline(
        pipeline_name,
        model=model,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    # Run the pipeline
    res = translator(text.strip(), **kwargs)

    # Extract the result; a single input should give a single output
    if res and isinstance(res, list):
        assert len(res) == 1, "Expected only 1 result"
        translation_text = res[0].get("translation_text", "idk")
    else:
        translation_text = "idk"

    translation_text = translation_text.strip()

    print(f"> {translation_text}")
    return translation_text


def run_tr_models(
    text: str,
    models: list[tuple[str, str]],
    **kwargs,
):
    """Round-trip `text` through English->French->English model pairs.

    For every pair the text is translated to French, translated back to
    English, and the round-tripped English is scored against the original
    with three metrics ("spacy_sim", "bertscore" and "rouge"), merged into one
    dict per pair and pretty-printed.

    Args:
        text: Original English text.
        models: ``(en_to_fr_model, fr_to_en_model)`` pairs. Each entry is
            unpacked into exactly two model identifiers.
        **kwargs: Forwarded to both ``run_tr`` calls.

    Returns:
        ``(translation_preds, score_dicts)``: two lists aligned with `models`;
        each prediction is a ``(french_text, round_tripped_english)`` tuple.
    """
    # (metric name, extra keyword arguments), evaluated in this order.
    metric_specs = (
        ("spacy_sim", {}),
        ("bertscore", {"lang": "en", "model_type": "microsoft/deberta-xlarge-mnli"}),
        ("rouge", {}),
    )

    translation_preds = []
    score_dicts = []

    for model_en_to_fr, model_fr_to_en in models:
        # translate original text from english to french
        text_fr = run_tr(
            text, model=model_en_to_fr, pipeline_name="translation_en_to_fr", **kwargs
        )
        # translate the french text back to english
        text_en = run_tr(
            text_fr,
            model=model_fr_to_en,
            pipeline_name="translation_fr_to_en",
            **kwargs,
        )

        # evaluate by comparing the round-tripped english to the original text
        score_dict = {}
        for metric_name, metric_kwargs in metric_specs:
            score_dict.update(
                get_eval_score(text_en, text, metric=metric_name, **metric_kwargs)
            )
        pprint(score_dict)

        translation_preds.append((text_fr, text_en))
        score_dicts.append(score_dict)

    return translation_preds, score_dicts


## Part 3 - Summarization pipeline

**Objective** - Utilize a text summarization pipeline that summarizes the 300-word sections found in Part 1.

Tasks:

- Run the five 300-words sections through pipeline
- Document the results
- Use a different text summarization model to do the same tasks mentioned above and document the differences in the results

In [None]:
from typing import Optional
from transformers import pipeline
import torch
from pprint import pprint

def run_sum(
    text: str,
    model: Optional[str] = None,
    verbosity: int = 1,
    **kwargs,
):
    """Summarize `text` with a ``transformers`` summarization pipeline.

    A new "summarization" pipeline is built for `model` on every call (on
    CUDA when available, otherwise on CPU) and `kwargs` are forwarded to the
    pipeline call.

    Args:
        text: Text to summarize (stripped before use).
        model: Model identifier handed to ``pipeline``.
        verbosity: 1 prints the input text; 2 additionally prints the model
            and `kwargs`.

    Returns:
        The stripped summary, or "idk" when the pipeline result is not a
        non-empty list.
    """
    # TODO: set some good defaults here?
    # kwargs.setdefault("min_length", 5)
    # kwargs.setdefault("max_length", 20)

    print("-" * 80)
    if verbosity == 2:
        print(f"model: {model}")
        for name, value in kwargs.items():
            print(f"{name}: {value}")
        print("~" * 80)

    # Build the pipeline
    summarizer = pipeline(
        "summarization",
        model=model,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    # Run the pipeline
    text = text.strip()

    if verbosity == 1 or verbosity == 2:
        print(f"> {text}")

    res = summarizer(text, **kwargs)

    # Extract the result; a single input should give a single output
    if res and isinstance(res, list):
        assert len(res) == 1, "Expected only 1 result"
        summary_text = res[0].get("summary_text", "idk")
    else:
        summary_text = "idk"

    summary_text = summary_text.strip()

    print(f"> {summary_text}")

    return summary_text


def run_sum_models(
    text: str,
    models: list[str],
    expected_answer: str,
    metric: str = "spacy_sim",
    **kwargs,
):
    """Summarize `text` with several models and score each summary.

    Args:
        text: Text to summarize.
        models: Model identifiers, each passed to ``run_sum``.
        expected_answer: Reference summary; ``None`` is treated as an empty
            string.
        metric: Metric name passed to ``get_eval_score``.
        **kwargs: Forwarded to ``get_eval_score``.

    Returns:
        ``(answers, scores)``: two lists aligned with `models`.
    """
    if expected_answer is None:
        expected_answer = ""

    answers = []
    scores = []

    for model in models:
        # The summarizer in this file is `run_sum`; the original call to the
        # undefined name `run` raised NameError.
        summary = run_sum(text, model=model)
        score = get_eval_score(summary, expected_answer, metric, **kwargs)

        answers.append(summary)
        scores.append(score)

    return answers, scores


## Part 4: ChatBot Implementation

Implement the ChatBot functionality

In [None]:
# QA model identifiers used by the chatbot (a list, despite the singular name).
model = [
    "distilbert-base-uncased-distilled-squad",
]


def QA_Func(ctx_name, question):
    """Ask `question` against every section of the context `ctx_name`.

    Each section returned by ``read_context(ctx_name)`` is printed and then
    passed, together with `question`, to ``run_qa_models`` using the
    module-level `model` list.

    Args:
        ctx_name: Context name understood by ``read_context``.
        question: Question to ask about each section.

    Returns:
        A list with one ``(section_filename, answers, score_dicts)`` tuple per
        section, where the last two items are what ``run_qa_models`` returned.
    """
    results = []
    for ctx_fname, ctx_text in read_context(ctx_name):
        ctx_fname = os.path.basename(ctx_fname)
        print("#" * 80)
        print("#" * 80)
        print(ctx_text)

        # Use the caller's question (the original referenced the undefined
        # names `q_text` / `q_answer_true`). There is no reference answer for
        # a free-form question, so pass None; run_qa_models treats that as an
        # empty reference string.
        q_answers_preds, q_answers_scores = run_qa_models(
            question,
            ctx_text,
            model,
            None,
        )
        results.append((ctx_fname, q_answers_preds, q_answers_scores))

    return results

In [None]:
# translation

In [None]:
# summarization

In [None]:
# function to call for qa
# function to call for translation
# function to call for summarization
import sys

logo = """
=================================
  ____ _           _   ____ ___ 
 / ___| |__   __ _| |_|  _ \_ _|
| |   | '_ \ / _` | __| |_) | | 
| |___| | | | (_| | |_|  __/| | 
 \____|_| |_|\__,_|\__|_|  |___|
 
=================================
    """
print(logo)
print("Welcome! Please select a context!")
print("[1] Protagonist\n[2] Antagonist\n[3] Crime\n[4] Evidence\n[5] Resolution\n")
print("Type 'help' for more commands!")

context_dictionary = {
    1: "protagonist",
    2: "antagonist",
    3: "crime",
    4: "evidence",
    5: "resolution"
}

# Pick context
# Give example questions to use for said context
# When asked, run the pipeline and return 

while True:  
    user_input = input("> ")

    match user_input.lower():
      case 'protagonist' | '1':
        print(" Loading protagonist\n")
        # handle qa and translation stuff first
        ctx_name = 'protagonist'
        QA_Func(ctx_name, "Who is the main character that the story revolves around?")

      case 'antagonist' | '2':
        print(" Loading antagonist\n")

      case 'crime' | '3':
        print(" Loading crime\n")

      case 'evidence' | '4':
        print(" Loading evidence\n")

      case 'resolution' | '5':
        print(" Loading resolution\n")

      case 'help':
        print("Select a context: \n[1] Protagonist\n[2] Antagonist\n[3] Crime\n[4] Evidence\n[5] Resolution\n\nOr type\nHelp -- list commands\nQuit -- exit the program\n")

      case 'quit':
        print("exiting program...\n")
        sys.exit()

      case _:
        print("please enter a command\n")
    