In [1]:
# Mount Google Drive at /content/drive so later cells can read files from it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import json

def read_data_from_json(file_path):
  """Load one annotation JSON file and return its classes and annotations.

  Args:
    file_path: Path of the JSON file to read.

  Returns:
    A ``(classes, annotations)`` tuple holding the values stored under the
    top-level ``"classes"`` and ``"annotations"`` keys of the file.

  Raises:
    KeyError: If either of the two keys is missing from the file.
  """
  with open(file_path, 'r') as handle:
    payload = json.load(handle)

  # "classes" is looked up first, then "annotations".
  return payload['classes'], payload['annotations']


In [3]:
import json
import os

def read_data_from_second_json_file(file_path):
  """Collect labelled texts from every JSON file one level below ``file_path``.

  ``file_path`` is expected to contain subfolders, each holding JSON files
  with a top-level ``"annotations"`` list of ``[text, {"entities": [...]}]``
  pairs, where each entity is ``[start, end, label]``.

  For every text, two known label misspellings are rewritten to their
  canonical form and only entities whose (corrected) label is in the
  accepted set are kept.

  Args:
    file_path: Directory whose immediate children are the subfolders to scan.

  Returns:
    A list of ``[text, {"entities": [...]}]`` items, one per annotation, in
    the order the files and annotations were read. A text without any
    accepted entity is returned with an empty entity list.
  """
  valid_entities = {"BUYER NAME", "SELLER NAME", "BUYER ADDRESS", "BUYER NONINDIVIDUAL NAME", "SELLER NONINDIVIDUAL NAME", "PROPERTY ADDRESS", "ASSESSOR PARCEL NUMBER"}

  # Misspelled labels seen in the data, mapped to the accepted spelling.
  label_fixes = {
      'SELLER NON INDIVIDUAL NAMES': 'SELLER NONINDIVIDUAL NAME',
      'BUYER ADDDRESS': 'BUYER ADDRESS',
  }

  labelled_text = []
  for subfolder in os.listdir(file_path):
    subfolder_path = os.path.join(file_path, subfolder)

    for json_file in os.listdir(subfolder_path):
      full_file_path = os.path.join(subfolder_path, json_file)
      with open(full_file_path, 'r') as file:
        deeds_data = json.load(file)

      for text, entities in deeds_data['annotations']:
        # Start from an empty list for every text. Previously the filtered
        # list was only assigned inside the entity loop, so a text with no
        # entities silently inherited the entities of the preceding text
        # (or raised NameError when it was the very first one).
        filtered_entities = []
        for entity in entities["entities"]:
          # Work on a copy so the loaded JSON structure is not mutated.
          fixed_entity = list(entity)
          fixed_entity[2] = label_fixes.get(fixed_entity[2], fixed_entity[2])
          if fixed_entity[2] in valid_entities:
            filtered_entities.append(fixed_entity)

        labelled_text.append([text, {"entities": filtered_entities}])

  return labelled_text



# Folder on the mounted Google Drive (the mount point is /content/drive).
# NOTE(review): the functions defined in this cell take their own `file_path`
# parameter, so this module-level value only matters where a caller passes it
# explicitly — confirm against the cells that call them.
file_path = '/content/drive/My Drive/WD-(4129)/'

def load_test_data_from_json(file_path, second_file_path=None):
  """Load annotations from a flat folder of JSON files, plus an optional second batch.

  Every file directly inside ``file_path`` is parsed as JSON and each
  ``[text, entity]`` pair under its ``"annotations"`` key is collected. When
  ``second_file_path`` is given, the items returned by
  ``read_data_from_second_json_file(second_file_path)`` are appended after
  them. The total number of collected annotations is printed.

  Args:
    file_path: Directory containing the JSON annotation files.
    second_file_path: Optional directory handed to
      ``read_data_from_second_json_file``; ``None`` (the default) skips it.

  Returns:
    A ``(classes, cleaned_annotations)`` tuple. ``classes`` is the fixed list
    of label names defined in this function (it is not read from the files);
    ``cleaned_annotations`` is the list of ``[text, entity]`` items.
  """
  classes = ['ASSESSOR PARCEL NUMBER', 'PROPERTY ADDRESS', 'BUYER ADDRESS', 'BUYER NAME', 'BUYER NONINDIVIDUAL NAME', 'SELLER NAME', 'SELLER NONINDIVIDUAL NAME']

  cleaned_annotations = []
  for json_file in os.listdir(file_path):
    full_file_path = os.path.join(file_path, json_file)

    with open(full_file_path, 'r') as file:
      deeds_data = json.load(file)

    # The file is fully parsed above, so it no longer needs to stay open here.
    for text, entity in deeds_data["annotations"]:
      cleaned_annotations.append([text, entity])

  # Identity check: None is the "no second batch" sentinel.
  if second_file_path is not None:
    cleaned_annotations.extend(read_data_from_second_json_file(second_file_path))

  print(len(cleaned_annotations))
  return classes, cleaned_annotations


In [4]:
import spacy # comment originally said 3.3.0; the run recorded below printed 3.8.4
print(spacy.__version__)
from sklearn.utils import shuffle
from tqdm import tqdm

# Shared English pipeline object; later cells call nlp.make_doc(...) on it.
nlp = spacy.blank("en")

3.8.4


In [5]:
def locate_entity_in_text(annotations, idx):
  """Print every entity label of one record together with the text it covers.

  Args:
    annotations: Sequence of ``[text, {"entities": [[start, end, label], ...]}]``
      records.
    idx: Index of the record to inspect.

  Returns:
    None. For each entity the label line, the sliced text and a blank
    separator are written to stdout.
  """
  selected = annotations[idx]
  text, spans = selected[0], selected[1]["entities"]

  for span_start, span_end, label in spans:
    print("entity: ", label)
    print(text[span_start:span_end])
    print("\n")



In [6]:
import re
import itertools


def get_overlapping_tokens(entity, token_matches):
    """Return the matches that straddle one edge of an entity span.

    A match is kept when exactly one of its two endpoints (``start()`` /
    ``end()``) lies within the closed interval ``[start_idx, end_idx]`` of
    the entity. Matches with both endpoints inside, or both outside, are
    not returned.

    Args:
        entity: A 3-item ``(start_idx, end_idx, name)`` sequence.
        token_matches: Iterable of regex match objects (any iterable; it is
            materialised into a list first).

    Returns:
        List of the match objects that satisfy the rule above, in input order.
    """
    span_start, span_end, _label = entity

    def within_span(position):
        return span_start <= position <= span_end

    straddling = []
    for match in list(token_matches):
        # Exactly one endpoint inside the span <=> the two flags differ.
        if within_span(match.start()) != within_span(match.end()):
            straddling.append(match)

    return straddling

def adjust_entity_boundaries(patterns, annotations):
    """Trim entity spans so they no longer straddle any pattern match.

    For each text, all matches of every pattern are collected. Each entity is
    then checked with ``get_overlapping_tokens``; for every match returned,
    the entity start is moved to the match end when
    ``end_idx - token.start() > token.end() - start_idx``, otherwise the
    entity end is moved to the match start. An entity that was trimmed is
    kept only if it is still non-empty (``start_idx < end_idx``); an entity
    with no straddling match is copied through unchanged.

    Args:
        patterns: Iterable of regex patterns to search for in each text.
        annotations: Iterable of ``[text, {"entities": [[start, end, name], ...]}]``.

    Returns:
        A list of ``[text, annotation]`` items. Texts with at least one
        pattern match get a new ``{"entities": [...]}`` dict; texts with no
        match are passed through with their original annotation object.
    """
    result = []

    for text, annotation in annotations:
        found = []
        for pattern in patterns:
            found.extend(re.finditer(pattern, text))

        if not found:
            # Nothing to trim against: keep the record exactly as it came in.
            result.append([text, annotation])
            continue

        kept = []
        for entity in annotation["entities"]:
            start_idx, end_idx, name = entity
            straddlers = get_overlapping_tokens(entity, found)

            for token in straddlers:
                if end_idx - token.start() > token.end() - start_idx:
                    start_idx = token.end()
                else:
                    end_idx = token.start()

            # Untouched entities are always kept; trimmed ones only if a
            # non-empty span remains.
            if not straddlers or start_idx < end_idx:
                kept.append([start_idx, end_idx, name])

        result.append([text, {"entities": kept}])

    return result


# The literal marker "<laysep@@##$$>", escaped so it is matched verbatim.
pattern1 = re.escape("<laysep@@##$$>")
# The literal marker "<pagesep@@##$$>", escaped so it is matched verbatim.
pattern2 = re.escape("<pagesep@@##$$>")
# Three consecutive literal sequences of: backslash, "u", two digits, two lowercase letters.
pattern3 = r"(?:\\u[0-9]{2}[a-z]{2}){3}"
# All patterns, in the order they are searched for / removed.
patterns = [pattern1, pattern2, pattern3]


In [7]:

def clean_text(raw_text, patterns):
  """Remove every match of the given regex patterns from ``raw_text``.

  The patterns are applied one after another, in the order given, each on
  the result of the previous removal.

  Args:
    raw_text: The string to clean.
    patterns: Iterable of regex patterns whose matches are deleted. An empty
      iterable leaves the text unchanged.

  Returns:
    The text with all matches replaced by the empty string.
  """
  # Use the patterns that were passed in; the parameter was previously
  # ignored in favour of three module-level globals.
  cleaned = raw_text
  for pattern in patterns:
    cleaned = re.sub(pattern, "", cleaned)

  return cleaned


def _chars_removed_before(raw_text, position):
  """Return how many characters cleaning removes from ``raw_text[:position]``."""
  prefix = raw_text[:position]
  return len(prefix) - len(clean_text(prefix, patterns))


def clean_data(correct_annotations):
  """Strip the marker patterns from each text and re-index its entities.

  Each text is cleaned with ``clean_text`` using the module-level
  ``patterns``. Entity offsets are then shifted left by the number of
  characters that were removed before them.

  Args:
    correct_annotations: Iterable of
      ``[text, {"entities": [[start, end, label], ...]}]`` items whose
      offsets refer to the uncleaned text.

  Returns:
    A list of ``[cleaned_text, {"entities": [[start, end, label], ...]}]``
    items whose offsets refer to the cleaned text.
  """
  cleaned_data = []
  for raw_text, annotation in correct_annotations:

    cleaned_text = clean_text(raw_text, patterns)

    values = []
    for start_idx, end_idx, ent in annotation["entities"]:
      # Start and end are shifted independently. Reusing the start offset
      # for the end made the span overshoot whenever a marker that gets
      # removed sits inside the entity itself.
      new_start = start_idx - _chars_removed_before(raw_text, start_idx)
      new_end = end_idx - _chars_removed_before(raw_text, end_idx)
      values.append([new_start, new_end, ent])

    cleaned_data.append([cleaned_text, {"entities": values}])

  return cleaned_data




In [8]:
from spacy.util import filter_spans
from tqdm import tqdm
from spacy.training import Example
from spacy.tokens import DocBin


def align_data(cleaned_data):
  """Snap entity offsets to the token boundaries of the shared ``nlp`` pipeline.

  For each text a ``Doc`` is built with ``nlp.make_doc`` and every entity is
  converted with ``doc.char_span(..., alignment_mode="expand")``. Entities
  for which ``char_span`` returns ``None`` are dropped; the others are
  re-emitted with the span's own character offsets and label.

  Args:
    cleaned_data: Iterable of
      ``[text, {"entities": [[start, end, label], ...]}]`` items.

  Returns:
    A list of ``[text, {"entities": [[start_char, end_char, label], ...]}]``.
  """
  aligned = []
  # Per-label tally of the entities seen; kept local and not returned.
  label_counts = {}

  for text, annotation in cleaned_data:
    spans_in = annotation["entities"]
    doc = nlp.make_doc(text)

    snapped = []
    for start, end, name in spans_in:
      label_counts[name] = label_counts.get(name, 0) + 1

      span = doc.char_span(start, end, label=name, alignment_mode="expand")
      if span is None:
        continue
      snapped.append([span.start_char, span.end_char, span.label_])

    aligned.append([text, {"entities": snapped}])

  return aligned



In [None]:
from transformers import AutoTokenizer
import pandas as pd


def label_data(aligned_data):
  """Tokenize each text with ``bert-base-uncased`` and tag tokens with B-/I-/O labels.

  Every token starts as ``'O'``. A token whose character offsets lie fully
  inside an entity span is labelled ``B-<name>`` when it starts exactly at
  the entity start and ``I-<name>`` otherwise. Tokens whose offset mapping
  equals ``(0, 0)`` are never relabelled (presumably the tokenizer's special
  tokens — confirm). When several entities contain the same token, the last
  one in the entity list determines the label.

  Args:
    aligned_data: Iterable of
      ``[text, {"entities": [[start, end, name], ...]}]`` items.

  Returns:
    A ``(annotated_bert, tokenizer)`` tuple, where ``annotated_bert`` is a
    list of ``[input_ids, labels]`` pairs (one per text) and ``tokenizer``
    is the tokenizer instance that produced them.
  """
  tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
  annotated_bert = []

  for text, annotation in aligned_data:

    encoded = tokenizer(text, truncation=False, return_offsets_mapping=True)
    offsets = encoded["offset_mapping"]
    token_ids = encoded["input_ids"]

    tags = ['O' for _ in range(len(token_ids))]
    spans = annotation["entities"]

    for position, offset in enumerate(offsets):
      for start_idx, end_idx, name in spans:

        if offset == (0,0):
          continue

        # Skip tokens that stick out of the entity on either side.
        if offset[0] < start_idx or offset[1] > end_idx:
          continue

        prefix = "B" if offset[0] == start_idx else "I"
        tags[position] = f"{prefix}-{name}"

    annotated_bert.append([token_ids, tags])

  return annotated_bert, tokenizer
