In [1]:
import os
import spacy
import pickle
from collections import namedtuple
import random
from cassis import load_typesystem, load_cas_from_xmi
import pandas as pd
import numpy as np

In [2]:
# A candidate number: `tokens` is the slice of the spaCy doc (or the entity) covering it,
# `span` is the AnnotationSpan with its character offsets.
NumberAnnotation = namedtuple("NumberAnnotation", ["tokens", "span"])
# Character offsets in the source text: `begin` is the offset of the first character,
# `end` is the offset right after the last one (token.idx + len(token.text)).
AnnotationSpan = namedtuple("AnnotationSpan", ["begin", "end"])
# A matched pair of relations: `ground` and `predicted` are (number, unit) tuples and
# `distance` is the value stored by custom_intersection_for_docs_sets for their units.
SharedElement = namedtuple("SharedElement", ["ground", "predicted", "distance"])

In [3]:
def get_paths(test_set_metadata_path, rounds_path="../data/annotations_and_sources"):
  """Build the paths of the files needed for every excerpt of the test set.

  The pickle at `test_set_metadata_path` is read as a mapping from excerpt name
  to the annotator whose annotations are used for that excerpt.

  Returns a dict mapping each excerpt name to a dict with the keys
  "annotations" (XMI file), "typesystem" (TypeSystem.xml) and "source" (text file).
  """
  # NOTE: pickle.load can execute arbitrary code; only load trusted metadata files.
  with open(test_set_metadata_path, "rb") as metadata_file:
    annotator_by_fname = pickle.load(metadata_file)

  return {
    fname: {
      "annotations": f"{rounds_path}/annotation/{fname}.txt/{annotator}/{annotator}.xmi",
      "typesystem": f"{rounds_path}/annotation/{fname}.txt/{annotator}/TypeSystem.xml",
      "source": f"{rounds_path}/source/{fname}.txt",
    }
    for fname, annotator in annotator_by_fname.items()
  }

In [4]:
def get_source(fname, paths_data):
  """Return the whole text of the source file registered for `fname` in `paths_data`."""
  source_path = paths_data[fname]["source"]

  with open(source_path, "r") as source_file:
    return source_file.read()

In [5]:
def get_cas(paths):
  """Load the CAS of one excerpt.

  `paths` must provide the "typesystem" (XML) and "annotations" (XMI) file paths;
  the type system is loaded first because it is needed to parse the XMI file.
  """
  with open(paths["typesystem"], "rb") as typesystem_file:
    cas_typesystem = load_typesystem(typesystem_file)

  with open(paths["annotations"], "rb") as xmi_file:
    return load_cas_from_xmi(xmi_file, typesystem=cas_typesystem)

In [6]:
def get_spacy_pipeline(enable_first_rule=True):
  """Load `en_core_web_sm` and add an entity ruler with number-related patterns.

  The ruler is configured with `overwrite_ents=True` and receives, in this order:
  - (only if `enable_first_rule`) a CARDINAL pattern matching one or more
    consecutive number-like tokens;
  - PERCENT patterns (a number followed by "%", "percent", "per cent", ...);
  - DATE patterns (numbers combined with a month name or abbreviation).
  """
  nlp = spacy.load("en_core_web_sm")
  entity_ruler = nlp.add_pipe("entity_ruler", config={"overwrite_ents": True})

  if enable_first_rule:
    # Merge consecutive number tokens
    entity_ruler.add_patterns([{"label": "CARDINAL", "pattern": [{"LIKE_NUM": True, "OP": "+"}]}])

  MONTHS = ('january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december', 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec')

  # Token patterns grouped by the label they assign; registered one by one, in this order
  token_patterns_by_label = (
    ("PERCENT", [
      [{'LIKE_NUM': True}, {'LOWER': {'IN': ['%', 'percent', 'percentage', 'percentages']}}],
      [{'LIKE_NUM': True}, {'LOWER': 'per'}, {'LOWER': 'cent'}],
    ]),
    ("DATE", [
      [{'LIKE_NUM': True}, {'LOWER': {'IN': MONTHS}}, {'LIKE_NUM': True}],
      [{'LIKE_NUM': True}, {'LOWER': 'of'}, {'LOWER': {'IN': MONTHS}}, {'LIKE_NUM': True}],
      [{'LOWER': {'IN': MONTHS}}, {'LIKE_NUM': True}],
      [{'LIKE_NUM': True}, {'LOWER': {'IN': MONTHS}}],
    ]),
  )

  for label, token_patterns in token_patterns_by_label:
    for token_pattern in token_patterns:
      entity_ruler.add_patterns([{"label": label, "pattern": token_pattern}])

  return nlp

In [7]:
# Baseline in which we use all the NUM tagged tokens as numbers
def extract_numbers_baseline(doc):
  """Return every NUM-tagged token of `doc` as a number of its own.

  The result is a list of single-element groups (lists), each holding one
  NumberAnnotation, so it has the same shape as the output of `extract_numbers`.
  """
  def as_annotation(token):
    char_span = AnnotationSpan(token.idx, token.idx + len(token.text))
    return NumberAnnotation(doc[token.i:token.i + 1], char_span)

  return [[as_annotation(token)] for token in doc if token.pos_ == "NUM"]

In [8]:
def extract_numbers(doc):
  """Extract the candidate numbers of `doc` from its named entities.

  Returns a list of groups (lists) of NumberAnnotation; every group is one
  candidate number. Two strategies are used, depending on the entity:

  - PERCENT entities, and MONEY/QUANTITY/CARDINAL entities that are one token
    long or made only of NUM-tagged tokens, are candidates as a whole. The
    candidate is kept only if the label is not MONEY and the entity is preceded
    by a token that is not tagged PROPN.
  - For any other entity whose label is not ORDINAL, DATE or TIME, only its
    NUM-tagged tokens preceded by a non-PROPN token are candidates; consecutive
    ones are grouped into a single candidate number.
  """
  WHOLE_ENTITY_LABELS = ["MONEY", "QUANTITY", "CARDINAL"]
  SKIPPED_LABELS = ["ORDINAL", "DATE", "TIME"]

  def follows_non_propn(token_index):
    # False when there is no previous token or when the previous token is tagged PROPN
    return token_index - 1 >= 0 and doc[token_index - 1].pos_ != "PROPN"

  def is_compact_numeral(ent):
    # One token long, or made only of NUM-tagged tokens
    return len(ent) == 1 or all(token.pos_ == "NUM" for token in ent)

  numbers = []

  for ent in doc.ents:
    label = ent.label_

    if label == "PERCENT" or (label in WHOLE_ENTITY_LABELS and is_compact_numeral(ent)):
      # The whole entity is the candidate number
      if label != "MONEY" and follows_non_propn(ent.start):
        char_span = AnnotationSpan(ent[0].idx, ent[-1].idx + len(ent[-1].text))
        numbers.append([NumberAnnotation(ent, char_span)])
      continue

    if label in SKIPPED_LABELS:
      continue

    # Only the NUM-tagged tokens of the entity are candidates; runs of consecutive
    # accepted tokens form a single candidate number
    current_group = []
    for token in ent:
      if token.pos_ == "NUM" and follows_non_propn(token.i):
        token_span = AnnotationSpan(token.idx, token.idx + len(token.text))
        current_group.append(NumberAnnotation(doc[token.i:token.i + 1], token_span))
      elif len(current_group) > 0:
        numbers.append(current_group)
        current_group = []

    if len(current_group) > 0:
      numbers.append(current_group)

  return numbers

## Predict units

In [9]:
def predict_units(numbers, algorithm="humquant"):
  """Predict, for every extracted number, the tokens that express its unit.

  Args:
    numbers: list of groups of NumberAnnotation; each group is handled as one
      number spanning from the begin of its first annotation to the end of its
      last annotation.
    algorithm: one of
      - "humquant": the first currency symbol/code found before the number in
        the subtree of the head of the number, otherwise the first run of
        consecutive NOUN/PROPN/ADJ/SYM tokens found after the number in that
        subtree;
      - "b1": every token of that subtree that is not part of the number;
      - "b2": the token right after the number.

  Returns:
    A dict mapping (number text, begin char, end char) to a list of
    (token, begin char, end char) tuples; the list is empty when no unit is found.

  Raises:
    ValueError: if `algorithm` is not one of the supported names (previously the
      units were silently set to None, which broke the callers later on).
  """
  SUPPORTED_ALGORITHMS = ("humquant", "b1", "b2")
  if algorithm not in SUPPORTED_ALGORITHMS:
    raise ValueError(f"Unknown algorithm {algorithm!r}: expected one of {SUPPORTED_ALGORITHMS}")

  currencies_cache = []

  def get_currencies_symbols():
    # The table is read lazily (only "humquant" needs it) and only once per call of
    # predict_units, instead of once per number.
    # Currencies symbols are taken from https://github.com/vmasek/CurrencyConverter/blob/master/symbols.csv
    if len(currencies_cache) == 0:
      symbols_table = pd.read_csv("../data/currencies_symbols.csv", sep="\t")
      currencies_cache.append(set(symbols_table[["Code ISO 4217", "Symbol"]].values.ravel()))

    return currencies_cache[0]

  def search_for_tags(span, tags):
    tokens = []
    i = 0

    # Skip everything up to the first token with a relevant tag (percent markers are skipped too)
    while i < len(span) and (span[i].pos_ not in tags or span[i].text in ["%", "percent"]):
      i += 1

    # Collect the run of consecutive tokens with a relevant tag
    while i < len(span) and span[i].pos_ in tags:
      tokens.append((span[i], span[i].idx, span[i].idx + len(span[i].text)))
      i += 1

    return tokens

  # Get closest consecutive tokens that match specific constraints
  def get_cct(number_ann, algorithm=algorithm):
    if algorithm == "humquant":
      head_subtree = list(number_ann.root.head.subtree)
      tokens_before_number = [elem for elem in head_subtree if elem.i < number_ann.start]
      tokens_after_number = [elem for elem in head_subtree if elem.i >= number_ann.end]

      currencies_symbols = get_currencies_symbols()
      relevant_tokens = []
      for token in tokens_before_number:
        if token.text.upper() in currencies_symbols:
          # Only the first currency symbol/code is used
          relevant_tokens.append((token, token.idx, token.idx + len(token.text)))
          break

      if len(relevant_tokens) == 0:
        relevant_tokens = search_for_tags(tokens_after_number, ["NOUN", "PROPN", "ADJ", "SYM"])

      return relevant_tokens
    elif algorithm == "b1":
      return [(elem, elem.idx, elem.idx + len(elem.text)) for elem in number_ann.root.head.subtree if not number_ann.start <= elem.i < number_ann.end]
    elif algorithm == "b2":
      return [(elem, elem.idx, elem.idx + len(elem.text)) for elem in number_ann.doc[number_ann.end:number_ann.end + 1]]

    raise ValueError(f"Unknown algorithm {algorithm!r}: expected one of {SUPPORTED_ALGORITHMS}")

  units_data = {}

  for number_ann in numbers:
    # Rebuild a single span of the doc covering the whole group of annotations
    begin, end = number_ann[0].span.begin, number_ann[-1].span.end
    pred_number = number_ann[0].tokens.doc.char_span(begin, end)

    units_data[(pred_number.text, begin, end)] = get_cct(pred_number)

  return units_data

## Get annotated units

In [10]:
def get_annotated_units(paths_data):
  """Collect the annotated numbers of every excerpt together with their units.

  Returns a dict mapping each excerpt name to a dict whose keys are the
  annotated numbers and whose values are the lists of units linked to them
  (empty list when a number has no unit). Numbers and units are both
  represented as (covered text, left-closed pd.Interval of character offsets).
  """
  def describe(annotation):
    return (annotation.get_covered_text(), pd.Interval(annotation.begin, annotation.end, closed="left"))

  ground_truth = {}

  for fname, paths in paths_data.items():
    cas = get_cas(paths)

    number_spans = [span for span in cas.select("custom.Span") if span.label == "Number"]
    # Only the relations going from a Number (governor) to a Unit (dependent) are relevant
    number_unit_pairs = [
      (relation.Governor, relation.Dependent)
      for relation in cas.select("custom.Relation")
      if relation.Governor.label == "Number" and relation.Dependent.label == "Unit"
    ]

    # Every annotated number is registered, even if no unit is linked to it
    units_by_number = {describe(number_span): [] for number_span in number_spans}

    for number_span, unit_span in number_unit_pairs:
      number_key = describe(number_span)
      unit_value = describe(unit_span)
      units_by_number[number_key].append(unit_value)

    ground_truth[fname] = units_by_number

  return ground_truth

## Get predicted units

In [11]:
def get_shared_extracted_numbers(extracted_numbers, annotated_numbers):
  """Pair every extracted number with each annotated number it overlaps.

  `extracted_numbers` are groups of NumberAnnotation (a group spans from the
  begin of its first element to the end of its last one); `annotated_numbers`
  are (text, pd.Interval) tuples. Returns a list of (extracted, annotated)
  pairs, ordered by extracted number and then by annotated number.
  """
  def char_interval(group):
    return pd.Interval(group[0].span.begin, group[-1].span.end, closed="left")

  return [
    (group, annotated)
    for group in extracted_numbers
    for annotated in annotated_numbers
    if char_interval(group).overlaps(annotated[1])
  ]

In [12]:
def get_predicted_units(paths_data, en_pipeline, annotated_numbers_by_fname, extract_numbers_alg=extract_numbers, consider_only_matching_numbers=True, save_numbers_wo_units=False, algorithm="humquant"):
  """Run number extraction and unit prediction on every excerpt.

  Args:
    paths_data: excerpt name -> paths, as returned by `get_paths`.
    en_pipeline: spaCy pipeline applied to the source text of each excerpt.
    annotated_numbers_by_fname: excerpt name -> annotated numbers, as (text, pd.Interval) tuples.
    extract_numbers_alg: function extracting the candidate numbers from a doc.
    consider_only_matching_numbers: keep only the extracted numbers that overlap
      an annotated number.
    save_numbers_wo_units: also report the numbers for which no unit is predicted
      (with an empty list of units).
    algorithm: name of the unit prediction algorithm, forwarded to `predict_units`.

  Returns:
    A tuple (pred_units, shared_numbers_ground): the former maps excerpt name ->
    predicted number -> list of predicted units, both as (text, left-closed
    pd.Interval); the latter maps excerpt name -> annotated numbers overlapped by
    an extracted number (filled only when `consider_only_matching_numbers` is set).
  """
  pred_units = {}
  shared_numbers_ground = {}

  for fname in paths_data:
    source_doc = en_pipeline(get_source(fname, paths_data))
    extracted_numbers = extract_numbers_alg(source_doc)

    if consider_only_matching_numbers:
      shared_numbers = get_shared_extracted_numbers(extracted_numbers, annotated_numbers_by_fname[fname])
      # Predicted side of each shared number on one hand, annotated side on the other
      extracted_numbers = [predicted for predicted, _ in shared_numbers]
      shared_numbers_ground[fname] = [ground for _, ground in shared_numbers]

    pred_units_data = predict_units(extracted_numbers, algorithm=algorithm)

    units_by_number = {}
    for (num_text, num_begin, num_end), unit_tokens in pred_units_data.items():
      pred_num = (num_text, pd.Interval(num_begin, num_end, closed="left"))

      if len(unit_tokens) > 0:
        # The unit goes from the begin of its first token to the end of its last token
        unit_begin, unit_end = unit_tokens[0][1], unit_tokens[-1][2]
        unit_text = unit_tokens[0][0].doc.text[unit_begin:unit_end]
        pred_unit = (unit_text, pd.Interval(unit_begin, unit_end, closed="left"))
        units_by_number.setdefault(pred_num, []).append(pred_unit)
      elif save_numbers_wo_units:
        units_by_number[pred_num] = []

    pred_units[fname] = units_by_number

  return pred_units, shared_numbers_ground

In [13]:
def get_rels_by_fname(full_raw_data):
  """Flatten, for every excerpt, the number -> units mapping into (number, unit) relations.

  A number linked to several units yields one relation per unit; a number
  without units yields the single relation (number, None).
  """
  def flatten(anns):
    relations = []

    for num_ann, unit_anns in anns.items():
      if len(unit_anns) > 0:
        relations.extend((num_ann, unit_ann) for unit_ann in unit_anns)
      else:
        relations.append((num_ann, None))

    return relations

  return {fname: flatten(anns) for fname, anns in full_raw_data.items()}

In [14]:
def get_distance(pred, ground):
  """Return how many characters the two annotations do not have in common.

  `pred` and `ground` are (text, interval) tuples; only the interval bounds
  (`left`, `right`) are used. The result is the sum of the two lengths minus
  twice the length of their overlap, i.e. 0 for identical intervals.

  Intervals that do not overlap have an overlap of 0 characters, so the
  distance is the sum of their lengths (this case used to raise an
  AttributeError because the intersection was None).
  """
  pred_span, ground_span = pred[1], ground[1]

  # The overlap is clamped at 0 so that disjoint intervals are handled too
  overlap = max(0, min(pred_span.right, ground_span.right) - max(pred_span.left, ground_span.left))

  return (pred_span.right - pred_span.left) + (ground_span.right - ground_span.left) - 2 * overlap

In [15]:
def custom_intersection_for_docs_sets(ground_rels, pred_rels):
  """Match the predicted (number, unit) relations against the ground ones.

  A predicted relation is matched with the first ground relation (in iteration
  order) such that:
  - the two numbers overlap;
  - the ground number has not been matched yet (each one is matched at most once);
  - the units are compatible: both missing, or both present and overlapping.

  Returns a set of SharedElement; the distance is 0 when both units are
  missing, otherwise it is computed by `get_distance` on the two units.
  """
  intersection = set()
  already_matched_ground_numbers = set()

  for pred_rel_num, pred_rel_unit in pred_rels:
    for ground_rel_num, ground_rel_unit in ground_rels:
      if not pred_rel_num[1].overlaps(ground_rel_num[1]):
        continue

      if ground_rel_num in already_matched_ground_numbers:
        continue

      if ground_rel_unit is None and pred_rel_unit is None:
        distance = 0
      elif pred_rel_unit is not None and ground_rel_unit is not None and pred_rel_unit[1].overlaps(ground_rel_unit[1]):
        distance = get_distance(pred_rel_unit, ground_rel_unit)
      else:
        # Only one of the units is missing, or the two units do not overlap
        continue

      intersection.add(SharedElement((ground_rel_num, ground_rel_unit), (pred_rel_num, pred_rel_unit), distance))
      already_matched_ground_numbers.add(ground_rel_num)
      # The predicted relation is consumed by its first match
      break

  return intersection

## Evaluation over the entire set

In [16]:
def pretty_print(to_print):
  """Format relations, or matched relations, as a human-readable report.

  `to_print` may contain (number, unit) relations and/or SharedElement
  instances. A relation is printed as the number followed by its unit (or
  NO_UNIT); a SharedElement shows the ground (G) and predicted (P) versions of
  the number, the distance and the two units. Every entry ends with an empty line.
  """
  chunks = []

  for elem in to_print:
    if isinstance(elem, SharedElement):
      g_num_data, g_unit_data = elem.ground
      p_num_data, p_unit_data = elem.predicted

      chunks.append(f"G: {g_num_data[0]} [B: {g_num_data[1].left} - E: {g_num_data[1].right}]\n")
      chunks.append(f"P: {p_num_data[0]} [B: {p_num_data[1].left} - E: {p_num_data[1].right}]\n")
      chunks.append(f"  === distance: {elem.distance} ===\n")

      # IMPORTANT: this code should be refactored, as it is full of needless repetitions
      chunks.append("  NO_UNIT\n" if g_unit_data is None else f"  G: {g_unit_data[0]} [B: {g_unit_data[1].left} - E: {g_unit_data[1].right}]\n")
      chunks.append("  NO_UNIT\n" if p_unit_data is None else f"  P: {p_unit_data[0]} [B: {p_unit_data[1].left} - E: {p_unit_data[1].right}]\n")
    else:
      num_data, unit_data = elem

      chunks.append(f"{num_data[0]} [B: {num_data[1].left} - E: {num_data[1].right}]\n")
      chunks.append("  NO_UNIT\n" if unit_data is None else f"  {unit_data[0]} [B: {unit_data[1].left} - E: {unit_data[1].right}]\n")

    chunks.append("\n")

  return "".join(chunks)

# Evaluation

Variables:

In [17]:
# Pickle file mapping each excerpt of the test set to the annotator whose annotations are used
TEST_SET_METADATA_PATH = "../data/test_set_metadata.pkl"
ALGORITHM = "humquant" # One of "humquant" (our approach), "b1" (use the entire subtree of the head of each extracted number as predicted unit), "b2" (use the next consecutive token as predicted unit)
ENABLE_FIRST_RULE = True # Enable the rule that merges consecutive number-like tokens into a single CARDINAL entity
CONSIDER_ONLY_MATCHING_NUMBERS = False # When evaluating, consider only the numbers that were both extracted and annotated (overlapping spans), together with their units
SAVE_NUMBERS_WO_UNITS = False # Keep the predicted numbers that are not linked to any predicted unit
USE_NUMBER_EXTRACTOR_BASELINE = False # Use the baseline (every NUM-tagged token) for the extraction of numbers

Code:

In [18]:
# Ground truth: annotated numbers and units of every excerpt of the test set
paths = get_paths(TEST_SET_METADATA_PATH)
en_pipeline = get_spacy_pipeline(enable_first_rule=ENABLE_FIRST_RULE)
annotated_units = get_annotated_units(paths)
annotated_numbers_by_fname = {fname: list(anns) for fname, anns in annotated_units.items()}

# Predictions: numbers are extracted with the selected extractor, units with the selected algorithm
extract_numbers_alg = extract_numbers_baseline if USE_NUMBER_EXTRACTOR_BASELINE else extract_numbers

predicted_units, shared_numbers_ground = get_predicted_units(
  paths,
  en_pipeline,
  annotated_numbers_by_fname,
  extract_numbers_alg=extract_numbers_alg,
  consider_only_matching_numbers=CONSIDER_ONLY_MATCHING_NUMBERS,
  save_numbers_wo_units=SAVE_NUMBERS_WO_UNITS,
  algorithm=ALGORITHM,
)

# Both sides are flattened into lists of (number, unit) relations, excerpt by excerpt
ground_anns_reshaped = get_rels_by_fname(annotated_units)
pred_anns_reshaped = get_rels_by_fname(predicted_units)

assert ground_anns_reshaped.keys() == pred_anns_reshaped.keys()

# Per-excerpt precision, recall and F1, plus a textual report of the relations
avg_ps, avg_rs, avg_f1s = [], [], []
diffs = {}
report_chunks = []

for fname in ground_anns_reshaped.keys():
  relevant_docs = ground_anns_reshaped[fname]
  if CONSIDER_ONLY_MATCHING_NUMBERS:
    relevant_docs = [elem for elem in relevant_docs if elem[0] in shared_numbers_ground[fname]]

  retrieved_docs = pred_anns_reshaped[fname]

  shared_docs = custom_intersection_for_docs_sets(relevant_docs, retrieved_docs)

  if len(relevant_docs) == 0 and len(retrieved_docs) == 0:
    # Nothing to find and nothing found: perfect score
    precision = recall = f1_score = 1.
  else:
    n_retrieved = len(set(retrieved_docs))
    n_relevant = len(set(relevant_docs))

    precision = len(shared_docs) / n_retrieved if n_retrieved > 0 else 0.
    recall = len(shared_docs) / n_relevant if n_relevant > 0 else 0.
    f1_score = (2 * precision * recall) / (precision + recall) if precision + recall != 0 else 0.

  report_chunks.append(f"Excerpt: {fname}\n\n")
  report_chunks.append(f"=== RELEVANT DOCS ({len(relevant_docs)}) ===\n")
  report_chunks.append(pretty_print(relevant_docs))
  report_chunks.append(f"=== RETURNED DOCS ({len(retrieved_docs)}) ===\n")
  report_chunks.append(pretty_print(retrieved_docs))
  report_chunks.append(f"=== SHARED DOCS ({len(shared_docs)}) ===\n")
  report_chunks.append(pretty_print(shared_docs))
  report_chunks.append(f"P: {precision}, R: {recall}, F1: {f1_score}\n")
  report_chunks.append("#" * 50 + "\n\n")

  # Average of differences for each excerpt (by considering only instances that partially match)
  partial_matches_dists = [elem.distance for elem in shared_docs if elem.distance > 0]
  if len(partial_matches_dists) > 0:
    diffs[fname] = sum(partial_matches_dists) / len(partial_matches_dists)

  avg_ps.append(precision)
  avg_rs.append(recall)
  avg_f1s.append(f1_score)

final_str = "".join(report_chunks)

# Mean and standard deviation, over the excerpts, of the per-excerpt metrics
avg_p, std_p = np.mean(avg_ps), np.std(avg_ps)
avg_r, std_r = np.mean(avg_rs), np.std(avg_rs)
avg_f1, std_f1 = np.mean(avg_f1s), np.std(avg_f1s)

# Mean, over the excerpts that have at least one partial match, of their average distance.
# When no excerpt has partial matches there is nothing to average: NaN is reported
# instead of raising a ZeroDivisionError.
avg_diff = sum(diffs.values()) / len(diffs) if len(diffs) > 0 else float("nan")

print(f"Avg P: {avg_p} (std: {std_p})", f"Avg R: {avg_r} (std: {std_r})", f"Avg F1: {avg_f1} (std: {std_f1})", sep="\n")
print(f"Average difference: {avg_diff} chars")

Avg P: 0.630811087061087 (std: 0.31507495045474854)
Avg R: 0.6233820937207225 (std: 0.3308537470149872)
Avg F1: 0.6037998871117858 (std: 0.30457789068400304)
Average difference: 14.266600529100531 chars


In [19]:
# Show the detailed per-excerpt report (relevant, returned and shared relations) built by the evaluation cell
print(final_str)

Excerpt: a2_6051

=== RELEVANT DOCS (3) ===
1.1 million [B: 270 - E: 281]
  people [B: 282 - E: 288]

378,000 [B: 299 - E: 306]
  children [B: 311 - E: 319]

307,000 [B: 324 - E: 331]
  women [B: 336 - E: 341]

=== RETURNED DOCS (3) ===
two [B: 95 - E: 98]
  household multi-sectoral needs assessments [B: 99 - E: 141]

1.1 million [B: 270 - E: 281]
  people [B: 282 - E: 288]

378,000 [B: 299 - E: 306]
  children [B: 311 - E: 319]

=== SHARED DOCS (2) ===
G: 378,000 [B: 299 - E: 306]
P: 378,000 [B: 299 - E: 306]
  === distance: 0 ===
  G: children [B: 311 - E: 319]
  P: children [B: 311 - E: 319]

G: 1.1 million [B: 270 - E: 281]
P: 1.1 million [B: 270 - E: 281]
  === distance: 0 ===
  G: people [B: 282 - E: 288]
  P: people [B: 282 - E: 288]

P: 0.6666666666666666, R: 0.6666666666666666, F1: 0.6666666666666666
##################################################

Excerpt: a2_186485

=== RELEVANT DOCS (4) ===
84% [B: 104 - E: 107]
  women [B: 111 - E: 116]

5% [B: 170 - E: 172]
  NO_UNIT

