In [1]:
import os
import random
import pandas as pd
from transformers import pipeline
import numpy as np
from tqdm.notebook import tqdm

In [2]:
import_df = pd.read_csv("local_test.csv")
test_files = []
for tf in list(import_df["0"]):
    test_files.append(tf[:-4])

In [3]:
def calc_word_indices(full_text, discourse_start, discourse_end):
    start_index = len(full_text[:discourse_start].split())
    token_len = len(full_text[discourse_start:discourse_end].split())
    output = list(range(start_index, start_index + token_len))
    if output[-1] >= len(full_text.split()):
        output = list(range(start_index, start_index + token_len-1))
    return output

def predict_and_format(fileName, predict_df):
    data = ""
    with open('train/' + fileName + ".txt", 'r') as file:
        data = file.read().replace('\n', '')
        data = data.replace("Â\xa0", '')
    
    predicts = token_classifier(data)

    for p in predicts:
        word_Indices_Array = calc_word_indices(data, p["start"], p["end"])
        word_Indices_String = " ".join(str(x) for x in word_Indices_Array)
        word_class = p["entity_group"]
        word_id = fileName[:-4]
        if len(word_Indices_Array) >= 3:
            predict_df.loc[len(predict_df.index)] = [word_id, word_class, word_Indices_String]

In [4]:
#other evaluation
# source: https://www.kaggle.com/robikscube/student-writing-competition-twitch#Competition-Metric-Code

def calc_overlap(row):
    """
    Calculates the overlap between prediction and
    ground truth and overlap percentages used for determining
    true positives.
    """
    set_pred = set(row.predictionstring_pred.split(" "))
    set_gt = set(row.predictionstring_gt.split(" "))
    # Length of each and intersection
    len_gt = len(set_gt)
    len_pred = len(set_pred)
    inter = len(set_gt.intersection(set_pred))
    overlap_1 = inter / len_gt
    overlap_2 = inter / len_pred
    return [overlap_1, overlap_2]


def score_feedback_comp_micro(pred_df, gt_df):
    """
    A function that scores for the kaggle
        Student Writing Competition

    Uses the steps in the evaluation page here:
        https://www.kaggle.com/c/feedback-prize-2021/overview/evaluation
    """
    gt_df = (
        gt_df[["id", "discourse_type", "predictionstring"]]
        .reset_index(drop=True)
        .copy()
    )
    pred_df = pred_df[["id", "class", "predictionstring"]].reset_index(drop=True).copy()
    pred_df["pred_id"] = pred_df.index
    gt_df["gt_id"] = gt_df.index
    # Step 1. all ground truths and predictions for a given class are compared.
    joined = pred_df.merge(
        gt_df,
        left_on=["id", "class"],
        right_on=["id", "discourse_type"],
        how="outer",
        suffixes=("_pred", "_gt"),
    )
    joined["predictionstring_gt"] = joined["predictionstring_gt"].fillna(" ")
    joined["predictionstring_pred"] = joined["predictionstring_pred"].fillna(" ")

    joined["overlaps"] = joined.apply(calc_overlap, axis=1)

    # 2. If the overlap between the ground truth and prediction is >= 0.5,
    # and the overlap between the prediction and the ground truth >= 0.5,
    # the prediction is a match and considered a true positive.
    # If multiple matches exist, the match with the highest pair of overlaps is taken.
    joined["overlap1"] = joined["overlaps"].apply(lambda x: eval(str(x))[0])
    joined["overlap2"] = joined["overlaps"].apply(lambda x: eval(str(x))[1])

    joined["potential_TP"] = (joined["overlap1"] >= 0.5) & (joined["overlap2"] >= 0.5)
    joined["max_overlap"] = joined[["overlap1", "overlap2"]].max(axis=1)
    tp_pred_ids = (
        joined.query("potential_TP")
        .sort_values("max_overlap", ascending=False)
        .groupby(["id", "predictionstring_gt"])
        .first()["pred_id"]
        .values
    )

    # 3. Any unmatched ground truths are false negatives
    # and any unmatched predictions are false positives.
    fp_pred_ids = [p for p in joined["pred_id"].unique() if p not in tp_pred_ids]

    matched_gt_ids = joined.query("potential_TP")["gt_id"].unique()
    unmatched_gt_ids = [c for c in joined["gt_id"].unique() if c not in matched_gt_ids]

    # Get numbers of each type
    TP = len(tp_pred_ids)
    FP = len(fp_pred_ids)
    FN = len(unmatched_gt_ids)
    # calc microf1
    my_f1_score = TP / (TP + 0.5 * (FP + FN))
    return my_f1_score


def score_feedback_comp(pred_df, gt_df, return_class_scores=False):
    class_scores = {}
    pred_df = pred_df[["id", "class", "predictionstring"]].reset_index(drop=True).copy()
    for discourse_type, gt_subset in gt_df.groupby("discourse_type"):
        pred_subset = (
            pred_df.loc[pred_df["class"] == discourse_type]
            .reset_index(drop=True)
            .copy()
        )
        class_score = score_feedback_comp_micro(pred_subset, gt_subset)
        class_scores[discourse_type] = class_score
    f1 = np.mean([v for v in class_scores.values()])
    if return_class_scores:
        return f1, class_scores
    return f1

In [5]:
def make_predictions(test_files):
    dict = {'id':[],
        'class': [],
        'predictionstring':[]}
    predict_df = pd.DataFrame(dict)
    for file in tqdm(test_files):
        try:
            predict_and_format(file, predict_df)
        except:
            print(file)
    
    return predict_df

In [6]:
def ground_truth(test_files):
    full_df = pd.read_csv("train.csv")
    full_df = full_df.set_index("id")
    train_df = full_df.loc[test_files]
    return train_df
    

In [7]:
model_checkpoint = "brad1141/bert-finetuned-ner"
token_classifier = pipeline(
    "token-classification", model="longformer_v1", aggregation_strategy="simple", tokenizer='longformer_v1'
)

In [8]:
results_df = make_predictions(test_files)
gt_df = ground_truth(test_files)

  0%|          | 0/2329 [00:00<?, ?it/s]

168E5BD0AECE


In [None]:
f1_score = score_feedback_comp(results_df, gt_df)