In [1]:
import pandas as pd
import json
from pprint import pprint

## Load SQAD predictions

In [4]:
def load_sqad(dataset, model):
    with open(f"dataset/{dataset}_dev.json") as f:
        dev = json.load(f)

    counts = []
    questions = {}

    for doc in dev["data"]:
        for par in doc["paragraphs"]:
            counts.append(len(par["qas"]))
            for qq in par["qas"]:
                if not qq["answers"]:
                    continue
                q = {
                    "question": qq["question"],
                    "answer": qq["answers"][0]["text"],
                    "context": par["context"]
                }
                questions[qq["id"]] = q

#     pprint(next(iter(questions.items())))

    with open(f"models/{model}_{dataset}/nbest_predictions.json") as f:
        pred = json.load(f)
        
    return questions, pred

## Longest common sequence getter
find the overlap between answer and prediction

In [5]:
def lcs(X, Y, split=True):
    if split:
        X = X.split(" ")
        Y = Y.split(" ")
    m = len(X)
    n = len(Y)
    
    L = [[0 for x in range(n+1)] for x in range(m+1)] 
  
    # Following steps build L[m+1][n+1] in bottom up fashion. Note 
    # that L[i][j] contains length of LCS of X[0..i-1] and Y[0..j-1]  
    for i in range(m+1): 
        for j in range(n+1): 
            if i == 0 or j == 0: 
                L[i][j] = 0
            elif X[i-1] == Y[j-1]: 
                L[i][j] = L[i-1][j-1] + 1
            else: 
                L[i][j] = max(L[i-1][j], L[i][j-1]) 
  
    # Following code is used to print LCS 
    index = L[m][n] 
  
    # Create a character array to store the lcs string 
    lcs = [""] * (index+1) 
    lcs[index] = "" 
  
    # Start from the right-most-bottom-most corner and 
    # one by one store characters in lcs[] 
    i = m 
    j = n 
    while i > 0 and j > 0: 
  
        # If current character in X[] and Y are same, then 
        # current character is part of LCS 
        if X[i-1] == Y[j-1]: 
            lcs[index-1] = X[i-1] 
            i-=1
            j-=1
            index-=1
  
        # If not same, then find the larger of two and 
        # go in the direction of larger value 
        elif L[i-1][j] > L[i][j-1]: 
            i-=1
        else: 
            j-=1
  
    if split:
        return " ".join(lcs[:-1])
    return "".join(lcs[:-1])


## Interactive question/answer viewer 

In [27]:
import ipywidgets as widgets
from IPython.display import HTML


def highlight_answer(text, answer):
    sub = lcs(text, answer, False)
    if sub:
        l = text.split(sub)
        return f'<span style="background-color: #CCCC00">{sub}</span>'.join(l).replace("\n", "<br>")
    return text

def prediction_viewer(questions, pred):
    keys = list(questions.keys())
    slider = widgets.IntSlider(
        value=0,
        min=0,
        max=len(keys),
        step=1,
        description='Question:',
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='d'
    )

    left = widgets.Output(layout={"width": "50%"})
    right = widgets.Output(layout={"width": "50%"})


    def f(val):
        key = keys[val]
        q = questions[key]
        
        left.clear_output()
        with left:
            display(HTML(
                key + "<hr>" + q["question"] + "<hr>" + q["answer"] + "<hr>" + highlight_answer(q["context"], q["answer"])
            ))
        
        right.clear_output()
        with right:
            display(HTML(
                "<hr>".join(
                    highlight_answer(a["text"], q["answer"]).replace("\n", "<br>") + "<br>" + ""
                    f"prob: {a['probability']:5.2f} | slp: {a['start_log_prob']:5.2f} | elp: {a['end_log_prob']:5.2f}"
                        for a in pred[key]
                )
            ))
        
        
    out = widgets.interactive_output(f, {'val': slider})
    display(widgets.VBox([
        slider,
        widgets.HBox([left, right])
    ]))
    


## Evaluation metrics 

In [38]:
def get_match_num(row):
    try:
        i = row.guesses.index(row.answer)
        return i
    except ValueError:
        try:
            i = row.guesses.index(row.answer+",")
            return i
        except ValueError:
            pass

        return 11

    
def coverage(answer, prediction):
    common = lcs(answer, prediction, split=False)
    return len(common) / max(len(answer), len(prediction))

def coverage_list(answer, predictions):
    return max(coverage(answer, pred) for pred in predictions)

def csformat(num):
    return f"{num:.2f}".replace(".", ",")
    
def metrics(adf, count):
    vc = adf.answer_index.value_counts()
    match = vc.loc[range(count)].sum() / len(adf)
    
    fcn = lambda row: coverage_list(row.answer, row.guesses[:count])
    cov = adf.apply(fcn, axis=1).mean()
    
    return csformat(match), csformat(cov)

def evaluate_results(questions, pred):
    for i, key in enumerate(questions.keys()):
        guesses = pred[key]
        questions[key]["guesses"] = [g["text"] for g in guesses]
        questions[key]["index"] = i

    adf = pd.DataFrame.from_dict(questions, orient="index")
    adf["answer_index"] = adf.apply(get_match_num, axis=1)

    return pd.DataFrame({
        "exact": metrics(adf, 1),
        "top5": metrics(adf, 5),
        "top10": metrics(adf, 9),
    }, index=[f"match", f"coverage"])

## Run evaluation

In [36]:
questions, predictions = load_sqad("sqad_extract", "csbase3")
evaluate_results(questions, predictions)

Unnamed: 0,exact,top5,top10
match,18,37,42
coverage,34,64,70


## View questions 

In [37]:
prediction_viewer(questions, predictions)

VBox(children=(IntSlider(value=0, continuous_update=False, description='Question:', max=558), HBox(children=(O…