In [1]:
import json
import random
import time
from pathlib import Path
import numpy as np
from collections import defaultdict, Counter
from pprint import pprint
import tqdm
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer, util
import torch


def read_event_dataset(path):
    """
    Read a tab-separated event dataset into a list of dicts.

    The first line of the file is treated as a header and skipped. Every
    other non-blank line must hold exactly three tab-separated fields, in
    the order: id, text, label.

    * path: location of the TSV file (anything accepted by `open`)

    Returns a list of {"id": ..., "text": ..., "label": ...} dicts in file
    order. Raises ValueError naming the offending line when a row does not
    have exactly three fields.
    """
    dataset = []
    # Explicit encoding so the result does not depend on the platform default.
    with open(path, encoding="utf-8") as f:
        next(f, None)  # skip the header row; no-op on an empty file
        for line_no, line in enumerate(f, start=2):
            line = line.strip()
            if not line:
                # Tolerate blank lines, e.g. an extra newline at the end of the file.
                continue
            fields = line.split("\t")
            if len(fields) != 3:
                raise ValueError(
                    f"{path}: line {line_no}: expected 3 tab-separated fields, "
                    f"got {len(fields)}"
                )
            item_id, text, label = fields
            dataset.append({"id": item_id, "text": text, "label": label})
    return dataset

## Prepare Dataset and Labels

In [2]:
# Folder holding the dataset and the label metadata.
DIR = Path("data")

# Load the labelled test set, then split it into two parallel lists:
# the raw texts to classify and their gold labels.
dataset = read_event_dataset(DIR.joinpath("test_set_final_release_with_labels.tsv"))
texts = [record["text"] for record in dataset]
y_true = [record["label"] for record in dataset]

In [3]:
# Load the label -> text mapping; the texts are what gets embedded to represent each label.
# Explicit UTF-8 so decoding does not depend on the platform default encoding.
with open(DIR / "acled_label_to_name.json", encoding="utf-8") as f:
    label_to_text = json.load(f)

# Sort the label ids so label_names and label_texts are index-aligned in a fixed order.
label_names = sorted(label_to_text)
label_texts = [label_to_text[l] for l in label_names]

In [4]:
# Subset of labels that the evaluation cells report separately under "Zero shot labels only".
ZS_LABELS = ["ORG_CRIME", "NATURAL_DISASTER", "MAN_MADE_DISASTER", "DIPLO", "ATTRIB"]

## Default Label-Similarity Baseline

A super simple zero-shot approach:
* encode the label names and the texts we want to classify in the same embedding space
* for a given text, assign the label whose embedding is closest to the text's embedding under cosine similarity

In [5]:
def classify_with_cosine(
        model,
        label_names,
        label_texts=None,
        input_texts=None,
        input_embeddings=None,
        label_embeddings=None
    ):
    """
    Assign each input the label whose embedding has the highest cosine
    similarity to the input's embedding.

    * model: encoder exposing `encode(texts)`; only used for whichever
      embeddings are not passed in precomputed
    * label_names: official names/ids of labels that are outputted as predictions
    * label_texts: texts to build label representations from
    * input_texts: texts to classify
    * input_embeddings: precomputed input embeddings (skips encoding input_texts)
    * label_embeddings: precomputed label embeddings (skips encoding
      label_texts); must be in the same order as label_names

    Returns a list with one predicted label name per input.

    Raises ValueError when neither the texts nor the embeddings are given for
    the labels or for the inputs, or when the number of label embeddings does
    not match len(label_names).
    """
    if label_embeddings is None:
        if label_texts is None:
            raise ValueError("Provide either label_texts or label_embeddings.")
        label_embeddings = model.encode(label_texts)

    if input_embeddings is None:
        if input_texts is None:
            raise ValueError("Provide either input_texts or input_embeddings.")
        input_embeddings = model.encode(input_texts)

    if len(input_embeddings) == 0:
        # Nothing to classify.
        return []

    label_names = list(label_names)

    # S[i, j] = cosine similarity between input i and label j.
    S = util.pytorch_cos_sim(input_embeddings, label_embeddings)
    if S.shape[1] != len(label_names):
        # A mismatch would silently pair scores with the wrong names.
        raise ValueError(
            f"Got {S.shape[1]} label embeddings for {len(label_names)} label names."
        )

    # One vectorized argmax per row instead of sorting every row in Python.
    best_label_indices = S.argmax(dim=1).tolist()
    return [label_names[j] for j in best_label_indices]


def evaluate(true_labels, pred_labels, label_set=None):
    """
    Print precision, recall and F-score under micro, macro and weighted averaging.

    * true_labels: gold labels
    * pred_labels: predicted labels, aligned with true_labels
    * label_set: optional labels forwarded as the `labels` argument of
      precision_recall_fscore_support
    """
    for averaging in ("micro", "macro", "weighted"):
        precision, recall, fscore, _support = precision_recall_fscore_support(
            true_labels,
            pred_labels,
            labels=label_set,
            average=averaging,
        )
        # Left-align the averaging name in a 9-character column so the rows line up.
        print(f"{averaging:<9}precision: {precision:.3f}, recall: {recall:.3f}, f-score: {fscore:.3f}")

## Pseudo-Exemplars + KNN Approach

1. Use above label-similarity approach to heuristically mine k exemplars for each label. 

2. Use these exemplars to build a KNN classifier

In this experiment we use the same test set for all three steps: getting the initial predictions to mine exemplars, applying KNN, and evaluating the final predictions. <br> Note that we do not involve the real labels until the final evaluation.

In [6]:
def get_top_k_exemplars(
        model,
        label_names,
        label_texts=None,
        input_texts=None,
        input_embeddings=None,
        label_embeddings=None,
        k=10,
    ):
    """
    For each label, collect k examples closest to the label embedding.

    * model: encoder exposing `encode(texts)`; only used for whichever
      embeddings are not passed in precomputed
    * label_names: label names/ids, in the same order as the label embeddings
    * label_texts: texts to build label representations from
    * input_texts: candidate texts to mine exemplars from
    * input_embeddings: precomputed embeddings of the candidates
    * label_embeddings: precomputed embeddings of the labels
    * k: number of exemplars to keep per label

    Returns a defaultdict(list) mapping label name -> list of exemplar dicts,
    best first. Each exemplar has:
      "index": position of the example among the inputs,
      "text":  input_texts[index], or None when input_texts was not given,
      "score": cosine similarity to the label embedding (0-d tensor).

    Raises ValueError when neither the texts nor the embeddings are given for
    the inputs or for the labels.
    """
    if input_embeddings is None:
        if input_texts is None:
            raise ValueError("Provide either input_texts or input_embeddings.")
        input_embeddings = model.encode(input_texts)
    if label_embeddings is None:
        if label_texts is None:
            raise ValueError("Provide either label_texts or label_embeddings.")
        label_embeddings = model.encode(label_texts)

    # S[i, j] = cosine similarity between label i and input j.
    S = util.pytorch_cos_sim(label_embeddings, input_embeddings)
    label_to_exemplars = defaultdict(list)

    n_labels, n_examples = S.shape

    for i in range(n_labels):
        label = label_names[i]
        scores = S[i]
        # Rank on plain Python floats: same ordering (stable, descending) as
        # comparing the tensor elements, without per-comparison tensor overhead.
        plain_scores = scores.tolist()
        ranked_indices = sorted(
            range(n_examples),
            key=plain_scores.__getitem__,
            reverse=True
        )
        for j in ranked_indices[:k]:
            exemplar = {
                "index": j,
                # Texts are optional when only embeddings were supplied.
                "text": input_texts[j] if input_texts is not None else None,
                "score": scores[j]
            }
            label_to_exemplars[label].append(exemplar)
    return label_to_exemplars


def classify_with_exemplars(
        model, 
        label_names,
        input_texts=None,        
        input_embeddings=None,
        label_to_exemplars=None,
        knn=5,
    ):
    """
    Classify using weighted KNN based on a fixed small set of exemplars for 
    each class.

    For every input, the knn most similar exemplars (cosine similarity, across
    all labels) vote for their label with their similarity as weight; the
    label with the highest summed weight is predicted.

    * model: encoder exposing `encode(texts)`; used to embed the exemplar
      texts, and the inputs when input_embeddings is not given
    * label_names: labels that may be predicted; labels without any exemplar
      are skipped
    * input_texts: texts to classify
    * input_embeddings: precomputed input embeddings (skips encoding input_texts)
    * label_to_exemplars: maps label name -> list of exemplar dicts with a
      "text" entry
    * knn: number of nearest exemplars that vote

    Returns a list with one predicted label name per input.

    Raises ValueError when no label in label_names has exemplars, when knn is
    smaller than 1, or when neither input_texts nor input_embeddings is given.
    """
    usable_labels = [
        label for label in label_names
        if label_to_exemplars and label_to_exemplars.get(label)
    ]
    if not usable_labels:
        raise ValueError(
            "label_to_exemplars holds no exemplars for any label in label_names."
        )
    if knn < 1:
        raise ValueError("knn must be at least 1.")

    if input_embeddings is None:
        if input_texts is None:
            raise ValueError("Provide either input_texts or input_embeddings.")
        input_embeddings = model.encode(input_texts)

    # Build one (n_inputs, n_exemplars) similarity matrix; column_labels[c] is
    # the label owning exemplar column c. Columns follow label_names order.
    similarity_blocks = []
    column_labels = []
    for label in usable_labels:
        exemplar_texts = [x["text"] for x in label_to_exemplars[label]]
        exemplar_embeddings = model.encode(exemplar_texts)
        similarity_blocks.append(
            cosine_similarity(input_embeddings, exemplar_embeddings)
        )
        column_labels.extend([label] * len(exemplar_texts))
    similarities = np.hstack(similarity_blocks)

    # Stable sort on the negated scores = descending order with ties kept in
    # column order, i.e. the same ranking as a stable reverse sort in Python.
    nearest = np.argsort(-similarities, axis=1, kind="stable")[:, :knn]

    predicted_labels = []
    for row, top_columns in zip(similarities, nearest):
        label_to_score = defaultdict(float)
        for column in top_columns:
            label_to_score[column_labels[column]] += row[column]
        top_label = max(label_to_score, key=label_to_score.get)
        predicted_labels.append(top_label)

    return predicted_labels

## Label Propagation

In [7]:
import numpy as np
from sklearn import datasets
from sklearn.semi_supervised import LabelPropagation, LabelSpreading


def classify_with_label_propagation(
        model,
        label_names,
        input_texts=None,
        input_embeddings=None,
        label_to_exemplars=None,
    ):
    """
    Classify the inputs by fitting LabelSpreading on their embeddings, with
    the exemplars as the only labelled points.

    * model: encoder exposing `encode(texts)`; only used when
      input_embeddings is not given
    * label_names: label names; a label's position here is its numeric class id
    * input_texts: texts to classify
    * input_embeddings: precomputed input embeddings (skips encoding input_texts)
    * label_to_exemplars: maps label name -> list of exemplar dicts whose
      "index" entry points into the inputs

    Returns a list with one predicted label name per input.
    """
    if input_embeddings is not None:
        n_inputs = len(input_embeddings)
    else:
        n_inputs = len(input_texts)
        input_embeddings = model.encode(input_texts)

    # Two-way mapping between label names and the integer ids the estimator sees.
    id_to_name = dict(enumerate(label_names))
    name_to_id = {name: class_id for class_id, name in id_to_name.items()}

    # Start with every input unlabelled (-1 is scikit-learn's marker for that),
    # then overwrite the positions of the exemplars with their pseudo-label.
    # An index mined for several labels keeps the last one written.
    seed_targets = [-1 for _ in range(n_inputs)]
    for name, exemplars in label_to_exemplars.items():
        for exemplar in exemplars:
            seed_targets[exemplar["index"]] = name_to_id[name]

    spreader = LabelSpreading()
    spreader.fit(input_embeddings, seed_targets)
    return [id_to_name[class_id] for class_id in spreader.predict(input_embeddings)]

## Run Classification

In [8]:
# Sentence encoder shared by all approaches below; runs on CPU.
model = SentenceTransformer("paraphrase-mpnet-base-v2", device="cpu")

You try to use a model that was created with version 1.2.0, however, your version is 1.1.0. This might cause unexpected behavior or errors. In that case, try to update to the latest version.





In [9]:
# Encode the label texts once; the embeddings are reused by the cells below.
label_embeddings = model.encode(label_texts)

In [10]:
# Encode all test texts once; every classifier below reuses these embeddings.
input_embeddings = model.encode(texts)

In [11]:
# Baseline: nearest label by cosine similarity, using the precomputed embeddings.
predicted_labels = classify_with_cosine(
    model,
    label_names,
    label_embeddings=label_embeddings,
    input_embeddings=input_embeddings,
)

In [12]:
# Mine the 20 test texts closest to each label embedding as pseudo-exemplars.
label_to_exemplars = get_top_k_exemplars(
    model,
    label_names,
    input_texts=texts,
    label_embeddings=label_embeddings,
    input_embeddings=input_embeddings,
    k=20,
)

In [13]:
# Second pass: weighted KNN over the mined exemplars, with the 10 nearest exemplars voting.
predicted_labels_ex = classify_with_exemplars(
    model, 
    label_names,
    input_embeddings=input_embeddings,
    label_to_exemplars=label_to_exemplars,
    knn=10,
)

In [14]:
# Alternative second pass: label spreading seeded with the same mined exemplars.
predicted_labels_lp = classify_with_label_propagation(
    model, 
    label_names,
    input_embeddings=input_embeddings,
    label_to_exemplars=label_to_exemplars,
)

  probabilities /= normalizer


## Evaluation

In [15]:
# Label-similarity baseline: scores over all labels, then restricted to ZS_LABELS.
evaluate(y_true, predicted_labels)
print("\nZero shot labels only:\n")
evaluate(y_true, predicted_labels, label_set=ZS_LABELS)

micro    precision: 0.520, recall: 0.520, f-score: 0.520
macro    precision: 0.528, recall: 0.495, f-score: 0.461
weighted precision: 0.569, recall: 0.520, f-score: 0.489

Zero shot labels only:

micro    precision: 0.782, recall: 0.358, f-score: 0.491
macro    precision: 0.871, recall: 0.383, f-score: 0.467
weighted precision: 0.870, recall: 0.358, f-score: 0.431


  _warn_prf(average, modifier, msg_start, len(result))


In [16]:
# Pseudo-exemplar KNN predictions: scores over all labels, then restricted to ZS_LABELS.
evaluate(y_true, predicted_labels_ex)
print("\nZero shot labels only:\n")
evaluate(y_true, predicted_labels_ex, label_set=ZS_LABELS)

micro    precision: 0.600, recall: 0.600, f-score: 0.600
macro    precision: 0.579, recall: 0.590, f-score: 0.557
weighted precision: 0.631, recall: 0.600, f-score: 0.585

Zero shot labels only:

micro    precision: 0.840, recall: 0.663, f-score: 0.741
macro    precision: 0.854, recall: 0.684, f-score: 0.742
weighted precision: 0.846, recall: 0.663, f-score: 0.723


In [17]:
# Label-propagation predictions: scores over all labels, then restricted to ZS_LABELS.
evaluate(y_true, predicted_labels_lp)
print("\nZero shot labels only:\n")
evaluate(y_true, predicted_labels_lp, label_set=ZS_LABELS)

micro    precision: 0.310, recall: 0.310, f-score: 0.310
macro    precision: 0.576, recall: 0.335, f-score: 0.381
weighted precision: 0.623, recall: 0.310, f-score: 0.378

Zero shot labels only:

micro    precision: 0.816, recall: 0.326, f-score: 0.466
macro    precision: 0.774, recall: 0.371, f-score: 0.485
weighted precision: 0.741, recall: 0.326, f-score: 0.435


### Evaluate Exemplars

In [18]:
def evaluate_exemplars(label_to_exemplars, dataset):
    """
    Print, per label, the share of mined exemplars whose gold label equals
    the label they were mined for.

    * label_to_exemplars: maps label name -> list of exemplar dicts with an
      "index" entry pointing into dataset
    * dataset: list of items carrying the gold label under "label"
    """
    for label in sorted(label_to_exemplars):
        # 1 for an exemplar whose gold label matches, 0 otherwise.
        hits = [
            1 if label == dataset[exemplar["index"]]["label"] else 0
            for exemplar in label_to_exemplars[label]
        ]
        accuracy = np.mean(hits)
        print(f"Label: {label}, Accuracy: {accuracy:.3f}")

In [19]:
# Check the mined exemplars against the gold labels, one accuracy line per label.
evaluate_exemplars(label_to_exemplars, dataset)

Label: ABDUCT_DISSAP, Accuracy: 0.750
Label: AGREEMENT, Accuracy: 0.600
Label: AIR_STRIKE, Accuracy: 0.600
Label: ARMED_CLASH, Accuracy: 0.700
Label: ARREST, Accuracy: 0.500
Label: ART_MISS_ATTACK, Accuracy: 0.650
Label: ATTACK, Accuracy: 0.050
Label: ATTRIB, Accuracy: 0.900
Label: CHANGE_TO_GROUP_ACT, Accuracy: 0.200
Label: CHEM_WEAP, Accuracy: 0.850
Label: DIPLO, Accuracy: 0.300
Label: DISR_WEAP, Accuracy: 0.550
Label: FORCE_AGAINST_PROTEST, Accuracy: 0.300
Label: GOV_REGAINS_TERIT, Accuracy: 0.600
Label: GRENADE, Accuracy: 0.850
Label: HQ_ESTABLISHED, Accuracy: 0.550
Label: MAN_MADE_DISASTER, Accuracy: 0.300
Label: MOB_VIOL, Accuracy: 0.350
Label: NATURAL_DISASTER, Accuracy: 0.650
Label: NON_STATE_ACTOR_OVERTAKES_TER, Accuracy: 0.350
Label: NON_VIOL_TERRIT_TRANSFER, Accuracy: 0.250
Label: ORG_CRIME, Accuracy: 0.750
Label: OTHER, Accuracy: 0.050
Label: PEACE_PROTEST, Accuracy: 0.900
Label: PROPERTY_DISTRUCT, Accuracy: 0.650
Label: PROTEST_WITH_INTER, Accuracy: 0.050
Label: REM_EXPLOS

## Conclusions

* Two-pass classification with pseudo-exemplars clearly outperforms the default zero-shot baseline (micro F-score 0.600 vs. 0.520 on all labels, 0.741 vs. 0.491 on the zero-shot labels)
* Need to sanity-check this on other folds/datasets
* While it adds running time within this test set, it is still a fast method since we can keep a fixed KNN classifier for future predictions 
* Actual label propagation algorithms from sklearn don't work great so far
