In [None]:
import os

# isColab drives the install/download cells and the base data directory further down.
isColab = True
try:
  # The presence of HOSTNAME in the environment is the signal used for a CoLAB hosted runtime
  print(os.environ['HOSTNAME'])
  print('Running on CoLAB Hosted runtime.')
except KeyError:
  # HOSTNAME is not set -> treated as a local runtime.
  # Only the missing-variable case is handled, so unrelated errors (or a kernel interrupt) are no longer swallowed.
  print('You are running this script on your local machine!')
  isColab = False

You are running this script on your local machine!


In [None]:
'''Installing/Upgrading the Libraries Not Available to CoLAB'''
# The installs below run only when the environment-detection cell has set isColab to True.
if isColab:
  # spacy is the only package pinned to an exact version (3.6.1)
  !pip install -U 'spacy'==3.6.1
  # # Install this if you want to run on Colab GPU
  # !pip install -U spacy[cuda-autodetect]
  # The remaining packages are not pinned, so pip picks whatever version is current at install time
  !pip install -U rdflib
  !pip install -U spaczz
  !pip install nervaluate
  !pip install -U print-dict

In [None]:
'''Downloading Model lg = larger - with word vectors'''
# The models are downloaded only on a Colab runtime; on a local runtime en_core_web_lg
# must already be installed, because it is imported further down in this notebook.
if isColab:
  !python -m spacy download en_core_web_lg
  # NOTE(review): en_core_web_trf is downloaded here but is not loaded in any of the visible cells - confirm it is still needed
  !python -m spacy download en_core_web_trf

  # Adding Word Vectors: https://spacy.io/usage/linguistic-features#adding-vectors
  #!wget https://dl.fbaipublicfiles.com/fasttext/vectors-english/wiki-news-300d-1M-subword.vec.zip
  #!python -m spacy init vectors en wiki-news-300d-1M-subword.vec.zip /content/wiki-news-300d-1M-subword

In [None]:
import re
import os
import shutil
import spacy
from spacy.lang.en import English
from spacy.tokenizer import Tokenizer
from spacy.pipeline import Sentencizer
from nervaluate import Evaluator
import bs4
from print_dict import pd as pdic
import requests
from spacy import displacy
from spacy.matcher import Matcher
from spacy.tokens import Span
import matplotlib.pyplot as plt
from tqdm import tqdm
import csv
from textwrap import wrap
import os.path
import functools
import operator
from collections import deque
from operator import itemgetter
from spacy.symbols import nsubj, VERB, AUX

# spaczz library for Similarity Matching
from spaczz.matcher import SimilarityMatcher

# RDFLib libraries
from rdflib import Graph
import pprint
from rdflib import RDFS
from rdflib import URIRef
from rdflib.namespace import RDF

# Colab specific libraries
if isColab:
  from google.colab import files

# General Libraries
import pandas as pd
import srsly
import json
import os
import re
import time
import warnings

import subprocess
import sys

# Spacy Related Imports
import spacy
from spacy.util import minibatch, compounding, compile_infix_regex, get_words_and_spaces
from spacy.tokens import Span, DocBin, Doc
from spacy.vocab import Vocab
from spacy.tokenizer import Tokenizer
from spacy.lang.en import English
from spacy.scorer import Scorer
from spacy.training import Example
# import spacy_sentence_bert

# Ignore OOV warnings
warnings.filterwarnings("ignore", message=r"\[W008\]", category=UserWarning)

# Downloading Disease A-Z Datasets

In [None]:
# Base directories used by the notebook. The main cell at the bottom builds its
# input/output paths from COLAB_DIR as well, so check it when changing these.
COLAB_DIR = "/content/" if isColab else ""

# SCHEMA_FILE = f"{COLAB_DIR}data/Schema/Disease_KG_Extended.ttl"
STRUCTURED_DATA_DIR = f"{COLAB_DIR}data/Structured_Data"
MATCHER_DIR = f"{COLAB_DIR}data/Matcher"

In [None]:
# Create the dataset folders with os.makedirs instead of the shell `mkdir`, so the cell also
# works on runtimes whose shell rejects the POSIX-style command (the recorded output of this
# cell shows exactly that failure). The folders live under COLAB_DIR, the same base directory
# the rest of the notebook reads its data from ("/content/" on Colab, the working directory locally).
data_path = COLAB_DIR + "data"
if not os.path.exists(data_path):
  # Comment this out when you are not manually providing the dataset
  for sub_dir_parts in (("Annotated_Text", "test"), ("Structured_Data",)):
    # makedirs also creates the missing parents (data/ and data/Annotated_Text/)
    os.makedirs(os.path.join(data_path, *sub_dir_parts), exist_ok=True)

  # !wget -P /content/data/ 'https://raw.githubusercontent.com/dtim-upc/THOR/main/Dataset/Annotated_Text.zip'
  # !wget -P /content/data/ 'https://raw.githubusercontent.com/dtim-upc/THOR/main/Dataset/Structured_Data.zip'
  # !wget -P /content/data/ 'https://raw.githubusercontent.com/dtim-upc/THOR/main/Dataset/Schema.zip'

  # # unzipping the Dataset contents
  # !unzip "/content/data/Annotated_Text.zip" -d "/content/data/Annotated_Text/" && rm "/content/data/Annotated_Text.zip"
  # !unzip "/content/data/Structured_Data.zip" -d "/content/data/Structured_Data/" && rm "/content/data/Structured_Data.zip"
  # !unzip "/content/data/Schema.zip" -d "/content/data/Schema" && rm "/content/data/Schema.zip"

The syntax of the command is incorrect.
The syntax of the command is incorrect.
The syntax of the command is incorrect.
The syntax of the command is incorrect.


In [None]:
def create_out_dir(OUTPUT_DIR):
  '''Start every run with an empty output directory.

  An existing OUTPUT_DIR is deleted together with its contents, then the
  directory (including missing parents) is created again.
  '''
  already_present = os.path.exists(OUTPUT_DIR)
  if already_present:
    # wipe the results of the previous run
    shutil.rmtree(OUTPUT_DIR)

  os.makedirs(OUTPUT_DIR)

# Data Conversion Part

In [None]:
def trim_entity_spans(text, spans):
  '''Data Cleaning: Removes leading and trailing white spaces from entity spans.

  Parameters:
    text  -> the annotated text the offsets refer to
    spans -> iterable of (start, end, label) character offsets; `end` is exclusive

  Returns: a list of (start, end, label) tuples with the whitespace trimmed off both ends.
           Spans that are empty or whitespace-only after trimming are dropped.

  Offsets are clamped to the bounds of `text`, so an annotation whose end lies past the
  end of the text no longer raises an IndexError.
  '''
  whitespace = re.compile(r'\s')
  text_len = len(text)

  valid_spans = []
  for start, end, label in spans:
    # keep the offsets inside the text before indexing into it
    valid_start = max(start, 0)
    valid_end = min(end, text_len)

    # move the start forward / the end backward, never past each other
    while valid_start < valid_end and whitespace.match(text[valid_start]):
      valid_start += 1
    while valid_end > valid_start and whitespace.match(text[valid_end - 1]):
      valid_end -= 1

    if valid_start < valid_end:
      valid_spans.append((valid_start, valid_end, label))

  return valid_spans

In [None]:
def clean_entities(entities):
  '''This function will remove overlapping spans.

  Parameters:
    entities -> list of entities whose first two items are the (start, end) character
                offsets; `end` is exclusive, matching how the offsets are later passed
                to doc.char_span

  Returns: the same list object, filtered in place. An entity is dropped when another
           entity overlaps it and is at least as long; two overlapping entities of equal
           length are therefore both dropped.

  Every entity is judged against the unmodified input, and each entity is removed at most
  once, so an entity overlapped by several longer ones no longer raises a ValueError.
  Spans that merely touch (one ends where the next starts) are not treated as overlapping.
  '''
  snapshot = list(entities)

  def is_dominated(index, entity):
    # True when some other entity overlaps `entity` and is at least as long
    e_start, e_end = entity[0], entity[1]
    for other_index, other in enumerate(snapshot):
      if other_index == index:
        continue  # Skip self
      oe_start, oe_end = other[0], other[1]
      overlaps = e_start < oe_end and oe_start < e_end
      if overlaps and (e_end - e_start) <= (oe_end - oe_start):
        return True
    return False

  # slice assignment keeps the caller's list object, as the in-place removal did
  entities[:] = [entity for index, entity in enumerate(snapshot) if not is_dominated(index, entity)]
  return entities

In [None]:
def docanno_to_spacy_ner_db(DATA_DIR):
  '''
  This function takes a directory of docanno annotated datasets for NER/RE
  and converts them into SpaCy DocBin Object which is Trainable via commandline

  Parameters:
    DATA_DIR -> directory that is walked recursively; every file in it is read as JSONL
                with the keys "text", "label" (list of [start, end, label]) and "id"

  Returns: (doc, db) where `db` is the DocBin holding all converted documents and `doc`
           is the last Doc that was built (None when no document could be read).

  Files that cannot be read as JSONL are reported and skipped; documents whose entities
  cannot be assigned are reported and left out of the DocBin.
  '''
  # Blank English pipeline: only its tokenizer and vocab are used here
  nlp = spacy.blank("en")

  Doc.set_extension("rel", default={},force=True)

  word_count = 0
  no_files = 0
  no_doc = 0
  no_entities = 0
  error_cnt = 0

  # Last document built; stays None when nothing is read, so the final return cannot hit an unbound name
  doc = None

  # the DocBin will store the example documents
  db = DocBin()

  for dirname, _, filenames in os.walk(DATA_DIR):
    for filename in filenames:
      file_path = os.path.join(dirname, filename)

      try:
        # Iterate through the Jsonl file to create serialize Docbin object / .spacy IOB File
        for json_line in srsly.read_jsonl(file_path):

          # parsing the docanno JSON data (per-line)
          text = json_line["text"]
          entities = json_line["label"]
          doc_id = json_line["id"]

          entities = clean_entities(entities)
          new_spans = [(span[0], span[1], span[2]) for span in entities]

          # cleaning and validating the leading and trailing spaces from the annotated entities
          valid_spans = trim_entity_spans(text, new_spans)

          # Parsing tokens from Text and rebuilding a Doc from the words/spaces
          tokens = nlp(text)
          spaces = [bool(tok.whitespace_) for tok in tokens]
          words = [t.text for t in tokens]
          doc = Doc(nlp.vocab, words=words, spaces=spaces)

          entities = []
          for start, end, label in valid_spans:
            # The modes should be: strict, contract, and expand
            entity = doc.char_span(start, end, label=label, alignment_mode='contract')

            # Not considering the spans which are Erroneous (char_span returned None)
            if entity is None:
              error_cnt += 1
            else:
              no_entities += 1
              entities.append(entity)

          try:
            doc.ents = entities
            # NOTE(review): 7 is subtracted from every document's token count; the reason is not visible here - confirm
            word_count += len(words)-int(7)
          except Exception:
            print()
            print(entities)
            print(f"Cannot Assign Entities for ID: {doc_id}")
            continue

          db.add(doc)
          no_doc += 1

      except Exception:
        # Best effort: a file that cannot be processed is reported and skipped
        print('Error While Loading JSON Data From Input Directory. Please check if you have other file type...')

      no_files +=1
  print(f"- Files: {no_files} \n- Processed Documents: {no_doc} \n- Total Entities: {no_entities} \n- Erroneous Entities (Ignored): {error_cnt} \n- Total Words: {word_count}")

  return doc, db

In [None]:
def load_json_from_docbin(file_path: str, nlp):
  '''Read a serialized SpaCy DocBin file and turn every document into a plain dictionary.

  Parameters:
    file_path -> path of the DocBin file on disk
    nlp       -> pipeline whose vocab is used to rebuild the documents

  Returns: (samples, entities_count) where every sample has the form
           {"text": ..., "entities": [(start_char, end_char, label), ...]} and
           entities_count is the number of entities over all samples.
           A warning is issued for each document without entities.
  '''
  samples = []
  entities_count = 0

  stored_docs = DocBin().from_disk(file_path).get_docs(nlp.vocab)
  for doc in stored_docs:
    char_spans = [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]
    if not char_spans:
      warnings.warn("Sample without entities!")

    entities_count += len(char_spans)
    samples.append({"text": doc.text, "entities": char_spans})

  return samples, entities_count

# Reading Data

In [None]:
# Import the model package and add the extra pipeline component needed.
# (en_core_web_md is an alternative model that is currently commented out.)
# # import en_core_web_md
import en_core_web_lg

# `nlp` is the notebook-level pipeline object; the functions defined below use it as a global
nlp = en_core_web_lg.load()

# # sbd = nlp.create_pipe('sentencizer')
# Being the last expression of the cell, the return value of add_pipe is echoed as the cell output
nlp.add_pipe('sentencizer')

<spacy.pipeline.sentencizer.Sentencizer at 0x201c3a93bc0>

In [None]:
def get_rdf_graph(file_name=""):
  '''Parse the RDF file given by the user (.nt/.ttl/.xml etc) into a new rdflib Graph.

  Parameters:
    file_name -> location of the RDF file handed to Graph.parse

  Returns: the populated Graph
  '''
  rdf_graph = Graph()
  rdf_graph.parse(file_name)

  # # Debugging aid: show how many triples were found and the first few of them
  # print('\nTotal Triples Found = {}\n'.format(len(rdf_graph)))
  # print('First 10 Triples:')
  # for triple in list(rdf_graph)[:10]:
  #     print(triple)

  return rdf_graph

In [None]:
def get_templates(g):
  '''Given a RDF graph 'g' this function will return a list of (S, P, O) triples having only names.

  The subject of a predicate comes from its RDFS.domain and the object from its RDFS.range.
  When the subject is a superclass (it appears as the object of an RDFS.subClassOf triple),
  one triple per subclass is produced instead of a triple for the superclass itself.
  Names are the part of each URI that follows the '#'.
  '''

  def local_name(uri):
    # second piece of the URI when it is split on '#'
    return uri.split('#')[1]

  # (predicate, class) pairs for the domain and the range of every property
  domain_pairs = list(g.subject_objects(predicate=RDFS.domain))
  range_pairs = list(g.subject_objects(predicate=RDFS.range))

  # superclass name -> names of its subclasses, e.g. {'Treatment': ['Medicine', 'Precaution', 'Surgery']}
  subclasses_of = {}
  for sub_uri, super_uri in g.subject_objects(predicate=RDFS.subClassOf):
    sub_name = local_name(sub_uri)
    super_name = local_name(super_uri)
    subclasses_of.setdefault(super_name, []).append(sub_name)

  # predicate name -> [subject name, object name, ...]
  slots = {}

  # a later domain declaration of the same predicate replaces the earlier one
  for pred_uri, subj_uri in domain_pairs:
    pred = local_name(pred_uri)
    subj = local_name(subj_uri)
    slots[pred] = [subj]

  # the range is appended behind the subject of the matching predicate
  for pred_uri, obj_uri in range_pairs:
    pred = local_name(pred_uri)
    obj = local_name(obj_uri)
    slots[pred].append(obj)

  # flatten the dictionary into [(S, P, O)], expanding superclass subjects into their subclasses
  triples_name = []
  for pred, slot in slots.items():
    subj, obj = slot[0], slot[1]
    for owner in subclasses_of.get(subj, [subj]):
      triples_name.append((owner, pred, obj))

  # print("Total Templates = {}\n".format(len(triples_name)))

  return triples_name

In [None]:
# # for local runtime - upload the file (first time) in the file upload option (left)
# graph = get_rdf_graph(file_name = SCHEMA_FILE)

# templates = get_templates(graph)
# # Keeping only those templates/triples that has Object Type Property as it's Object
# print(f'Extracted a Tolta of {len(templates)} Relationship Templates Having Only Object Properties from the Knowledge Graph:\n')
# templates = [template for template in templates if template[2] != 'string']
# pdic(templates)

In [None]:
# """ This is just to SHOW the Concepts"""
# unique_entities = set()

# # Extracting unique subjects and objects from the list
# for sub, pred, obj in templates:
#   unique_entities.add(sub)
#   unique_entities.add(obj)

# ENTITY_LABELS = list(unique_entities)
# print(ENTITY_LABELS)

In [None]:
# # ICDE-EX: This is the changes we are doing in order to track down if concept with semantics plays any role or not
# # Mapping from ENTITY_LABELS to ENTITY_ATTR
# entity_to_attr_map = {label.split('_')[0]: f"attr{i+1}" for i, label in enumerate(ENTITY_LABELS)}

# # Mapping from ENTITY_ATTR to ENTITY_LABELS (reverse)
# attr_to_entity_map = {v: k for k, v in entity_to_attr_map.items()}

In [None]:
# Entity types used as tags for the evaluation and as keys of the visualization colors
ENTITY_LABELS = [
  "AWARDS",
  "CERTIFICATION",
  "COLLEGE NAME",
  "COMPANIES WORKED AT",
  "DEGREE",
  "LANGUAGE",
  "LOCATION",
  "NAME",
  "SKILLS",
  "UNIVERSITY",
  "WORKED AS",
  "YEARS OF EXPERIENCE",
]

In [None]:
# print(entity_to_attr_map)
# print(entity_to_attr_map['Symptom'])
# print(attr_to_entity_map['attr1'])

In [None]:
def input_file(file_name=""):
  '''This function uploads a file (when no name is given) and returns its filename.

  Parameters:
    file_name -> name of an already available file; when empty, the Colab upload dialog
                 (files.upload) is used and the first uploaded name is taken

  Returns: the file name

  Side effect: a sibling file named "<stem> (1)<extension>" is deleted when it exists -
  presumably the copy created when the same file is uploaded again; confirm.
  '''
  # if file_name is empty, that means we are using CoLAB Google Hosted Runtime
  if not file_name:
    uploaded = files.upload()
    file_name = list(uploaded.keys())[0]

  # splitext only splits off the last extension, so names without an extension or with
  # several dots are handled as well
  stem, extension = os.path.splitext(file_name)
  duplicate_path = os.path.abspath(f"{stem} (1){extension}")
  if os.path.exists(duplicate_path):
    os.remove(duplicate_path)

  return file_name

In [None]:
def get_data_from_csv(file_name=""):
  '''
  This function will get a CSV from the user having TWO COL i.e., Disease_Name, Affected_Anatomy and
  returns a Dictionary having a structure: {"Tuberculosis": ['lungs', 'brain', 'kidneys', 'spine'], ...}

  Parameters:
    file_name -> path of the CSV file; when empty the file is requested through input_file()

  Returns: (header, structured_data)
    header          -> the first row of the file ([] for an empty file)
    structured_data -> every comma separated instance of the 1st column mapped to the list of
                       comma separated values of the 2nd column ([] when the row has no 2nd column)

  Completely blank rows are skipped.
  '''
  # if file_name is empty, that means we are using CoLAB Google Hosted Runtime
  if not file_name:
    file_name = input_file()

  structured_data = {}

  # newline='' is the way the csv module expects its input files to be opened
  with open(file_name, 'r', encoding='ISO-8859-1', newline='') as file:
    csvreader = csv.reader(file)
    # the header is returned to the caller; an empty file yields an empty header
    header = next(csvreader, [])

    for row in csvreader:
      # csv.reader yields [] for a blank line - nothing to store
      if not row:
        continue

      # values of the 2nd column, stripped; empty for non-relational (single column) rows
      if len(row) > 1:
        value_domain = [x.strip() for x in row[1].split(',')]
      else:
        value_domain = []

      # each of the instances of the first col (subject) will have the same value domain
      for instance in row[0].split(','):
        # every instance gets its own list object
        structured_data[instance.strip()] = list(value_domain)

  return header, structured_data

In [None]:
def accumulate_structured_data(STRUCTURED_DATA_DIR=STRUCTURED_DATA_DIR):
  '''Read every file found under STRUCTURED_DATA_DIR as CSV and merge their contents.

  Each file is loaded with get_data_from_csv and folded into the running result with
  accumulate_unique_data, using the file's header as the keys.

  Returns: the accumulated dictionary (empty when the directory holds no files)
  '''
  accu_data = {}

  for dirname, _, filenames in os.walk(STRUCTURED_DATA_DIR):
    csv_paths = (os.path.join(dirname, filename) for filename in filenames)
    for file_path in csv_paths:
      header, rows = get_data_from_csv(file_path)
      accu_data = accumulate_unique_data(prev_data=accu_data, keys_list=header, data_dic=rows)

  return accu_data

In [None]:
def get_text(file_name=""):
  '''Return the stripped content of a text file.

  Parameters:
    file_name -> path of the text file. When empty, the file is requested through the
                 Colab upload dialog and renamed to COLAB_DIR + 'inference_text.txt'
                 (an existing file of that name is replaced) before it is read.

  Returns: the file content, read as UTF-8, without leading/trailing whitespace
  '''
  # if file_name is empty, that means we are using CoLAB Google Hosted Runtime
  if not file_name:
    uploaded = files.upload()
    uploaded_name = list(uploaded.keys())[0]
    # decoded right after the upload; the value returned below comes from the renamed file
    _uploaded_txt = uploaded[uploaded_name].decode("utf-8").strip()

    source_path = COLAB_DIR + str(uploaded_name)
    file_name = COLAB_DIR + 'inference_text.txt'

    # make room for the rename
    if os.path.exists(file_name):
      os.remove(file_name)

    os.rename(source_path, file_name)

  # local runtime with the filename specified, or the freshly renamed upload
  with open(file_name, 'r', encoding='utf-8') as handle:
    return handle.read().strip()

In [None]:
def sentence_segmentation(input_txt, print_indx = False):
    '''Run the notebook-level `nlp` pipeline on the text and collect its sentences.

    Parameters:
      input_txt  -> the text to segment
      print_indx -> index of a sentence to print as an example; nothing is printed
                    when it is left at False

    Returns: (doc, sentences) - the processed document and the stripped sentence strings
    '''
    doc = nlp(input_txt)

    # sentences are kept as plain strings
    sentences = [sent.text.strip() for sent in doc.sents]

    if print_indx is not False:
        print('\nExample Sentence and Tokens:')
        print(sentences[print_indx])

    return doc, sentences

## Conceptualization

In [None]:
def accumulate_unique_data(prev_data, keys_list, data_dic):
  ''' This function creates a set (unique lists) of data for each of the Concepts (e.g., Disease, Symptoms, Anatomy etc.)
      Parameters:
        prev_data -> already created dictionary with other data sources previously (may be empty)
        keys_list -> list of keys (column names) for the new data source
        data_dic  -> a dictionary holding the source {instance: [values, ...]}

      Returns:  a dictionary having the keys_list as key and a list of unique (set) values from the data source.
                The first key receives the instances (keys of data_dic), the second key the
                unique values; empty strings are removed. When prev_data is not empty it is
                updated in place and returned.

      data_dic itself is never modified.
  '''
  # continue with the already accumulated data when there is some
  new_data = prev_data if prev_data else {}

  # get a list from the keys of the data_dic - for example list of disease names
  keys_data_list = list(data_dic.keys())

  # flattening the list of lists and taking only the unique values; the explicit [] start value
  # keeps reduce from extending the caller's first list in place and covers an empty data_dic
  values_data_list = list(set(functools.reduce(operator.iconcat, data_dic.values(), [])))

  # Accumulates the new data into Key and Value while merging prev values if the key is already present
  for key, val in zip(keys_list, (keys_data_list, values_data_list)):
    if key in new_data:
      val = list(set(val + new_data[key]))

    # removing empty string from the instances
    if '' in val:
      val.remove('')

    new_data[key] = val

  return new_data

In [None]:
def initiate_matcher(patterns):
  '''Build a spaczz SimilarityMatcher holding one entry per concept.

  Parameters:
    patterns -> dictionary {concept name: [instance strings, ...]}

  Returns: the matcher; every instance is lower-cased and processed with the notebook-level
           `nlp` pipeline before being registered under its concept name.
  '''
  # For more information of how it works please see: https://github.com/gandersen101/spaczz
  # min_r2 and thresh are both set to 100 here -- using custom vocab doesn't work for now
  matcher = SimilarityMatcher(vocab=nlp.vocab, min_r2=100, thresh=100)

  # UNCOMMENT - this if you set the threshold level to below 50
  # matcher = SimilarityMatcher(vocab=nlp.vocab, min_r2=threshold, min_r1=1)

  # we can add as many patterns as we want with specified names of Keys/Concepts
  for key_concept, instances in patterns.items():
    matcher.add(key_concept, [nlp(instance.lower()) for instance in instances])

  return matcher

In [None]:
def identify_noun_chunks(SENTENCES):
  '''Process every sentence with the notebook-level `nlp` pipeline and collect its noun chunks.

  Parameters:
    SENTENCES -> iterable of sentence strings

  Returns: {sentence index: (processed sentence, [noun chunk, ...])}
  '''
  np_sentences = {}

  for indx, sentence in enumerate(SENTENCES):
    parsed = nlp(sentence)
    # noun chunks as produced by the pipeline, possibly with leading and trailing stop words
    np_sentences[indx] = (parsed, list(parsed.noun_chunks))

  return np_sentences

In [None]:
def remove_dup_match(matches):
  '''Keep one match per (match_id, start, end): the one with the highest confidence score.

    Parameters:
      matches -> list of (match_id, start, end, confidence_score, pattern) tuples

    Returns: the surviving tuples, ordered by the first appearance of their (match_id, start, end).
             On equal scores the earliest tuple is kept.

    Example - of these three only the first survives:
      ('Anatomy', 35, 36, 1.0, 'abdomen')
      ('Anatomy', 35, 36, 0.5, 'abdomen')
      ('Anatomy', 35, 36, 0.3, 'abdomen')
  '''
  best_by_span = {}

  for candidate in matches:
    match_id, start, end, confidence_score, _pattern = candidate
    span_key = (match_id, start, end)

    incumbent = best_by_span.get(span_key)
    # first occurrence, or a strictly better score than the stored tuple
    if incumbent is None or confidence_score > incumbent[3]:
      best_by_span[span_key] = candidate

  return list(best_by_span.values())

In [None]:
def baseline_matcher(np_sentences, matcher, top_k=1):
  '''Run the matcher over every noun chunk of every sentence.

  Parameters:
    np_sentences -> {sentence index: (processed sentence, [noun chunk, ...])}
    matcher      -> callable returning (match_id, start, end, ratio, pattern) tuples for a document
    top_k        -> number of matches kept per noun chunk (after removing duplicates)

  Returns: {sentence index: (sentence, [(np_chunk, matched_sub_chunk, concept, confidence), ...])}
           where confidence is ratio/100 rounded to two decimals and matched_sub_chunk is a
           Span inside the sentence.
  '''
  matched_sentences = {}

  for i, (sent, clean_np_chunks) in np_sentences.items():
    matches_sent = []

    for chunk in clean_np_chunks:
      # the chunk text is processed on its own, so match offsets are relative to the chunk
      phrase = nlp(chunk.text)

      # TODO: we could apply the Matcher for the whole DOCUMENT - to check **speed improvement**
      scored = [
        (match_id, start, end, round(ratio/100, 2), pattern)
        for match_id, start, end, ratio, pattern in matcher(phrase)
      ]

      # Removing duplicate matches having same span as well as same entity type (matching id)
      unique_matches = remove_dup_match(scored)

      # TODO: only the first top_k matching results are taken for now
      for match_id, match_start, match_end, conf, pattern in unique_matches[:top_k]:
        # shift the chunk-relative offsets so they address tokens of the whole sentence
        sent_start = chunk.start + match_start
        sent_end = chunk.start + match_end

        try:
          matched_sub_chunk = Span(sent, sent_start, sent_end, label=chunk.label)
        except:
          print(f'Error Creating Span in Sentence {i}\n{sent}')
          continue

        matches_sent.append((chunk, matched_sub_chunk, match_id, conf))

    # Adding the (sent, matches) to the dictionary with Sentence Index as Key for return purpose
    matched_sentences[i] = (sent, matches_sent)

  return matched_sentences

In [None]:
def print_match(matched_sentences):
  '''Print every sentence with its matches, followed by the total number of matches.

  Parameters:
    matched_sentences -> {sentence index: (sentence, [(np_chunk, matched_sub_chunk, match_id, conf), ...])}
  '''
  count = 0

  for S_i, (sent, matches) in matched_sentences.items():
    print("Sentence-{}: '{}'".format(S_i, sent))

    for np_chunk, matched_sub_chunk, match_id, conf in matches:
      count += 1
      print(f'\t\tNP Chunk = {np_chunk}, Matched Sub-Chunk = {matched_sub_chunk}, Match Start = {matched_sub_chunk.start}, Match End = {matched_sub_chunk.end}, Concept = {match_id}, Confidence = {conf}')

    # blank line between sentences
    print()

  print(f'\nTotal Entities Recognized in Sentences = {count}')

In [None]:
def conceptualized_preprocessed_doc(prep_doc, matched_sentences):
  ''' Map the sentence based matching results onto the pre-processed document as a whole.

      The i-th sentence of prep_doc is paired with matched_sentences[i]; the token offsets of
      every NP chunk and matched sub-chunk are shifted by the start of that sentence so that
      the new spans address tokens of prep_doc.

      Returns a list of tuples (np_chunk_doc, matched_sub_chunk_doc, match_id, conf)
      --> [(Abdominal aortic aneurysm, Abdominal aortic aneurysm, 'Disease', 1.0), (Abdominal aortic aneurysm, Abdominal, 'Anatomy', 0.62), ...]
  '''

  def to_doc_span(offset, span):
    # same span, addressed with token indices of the whole document
    return Span(prep_doc, offset + span.start, offset + span.end, label=span.label)

  entities_pre_doc = []
  for i, sent in enumerate(prep_doc.sents):
    _sent_from_matcher, matched_results = matched_sentences[i]

    for np_chunk, matched_sub_chunk, match_id, conf in matched_results:
      np_chunk_doc = to_doc_span(sent.start, np_chunk)
      matched_sub_chunk_doc = to_doc_span(sent.start, matched_sub_chunk)
      entities_pre_doc.append((np_chunk_doc, matched_sub_chunk_doc, match_id, conf))

  return entities_pre_doc

In [None]:
def get_char_span(orig_doc, input_txt, entities_pre_doc, tag_np_chunk_only = False):
  '''Convert token spans into character spans for the evaluation script.

     Parameters:
       orig_doc          -> document that receives the predicted spans under orig_doc.spans["sc"]
       input_txt         -> text stored in the returned prediction
       entities_pre_doc  -> list of (np_chunk_doc, matched_sub_chunk_doc, match_id, conf)
       tag_np_chunk_only -> tag the whole NP chunk when True, the matched sub-chunk otherwise (default)

     Returns: (orig_doc, ner_prediction) with ner_prediction in the format
       {'text': 'Tuberculosis generally damages the lungs', 'entities': [(0, 12, 'Disease_E'), (35, 40, 'Anatomy_E')]}
  '''

  entities_span = []
  doc_span = []

  for np_chunk_doc, matched_sub_chunk_doc, match_id, conf in entities_pre_doc:
    # pick the granularity requested by the caller
    tagged = np_chunk_doc if tag_np_chunk_only else matched_sub_chunk_doc

    # character offsets: used for the SemEval evaluation
    entities_span.append((tagged.start_char, tagged.end_char, match_id))
    # token offsets on the original document: used to visualize the span
    doc_span.append(Span(orig_doc, tagged.start, tagged.end, match_id))

  orig_doc.spans["sc"] = doc_span
  ner_prediction = {'text': input_txt, 'entities': entities_span}

  return orig_doc, ner_prediction

In [None]:
def list_to_spacy_ner_doc(ner_pred, file_name="<unknown>"):
  '''
  This function takes one NER prediction of the form
  {'text': '...', 'entities':[(start, end, tag)]} and converts it into a SpaCy Doc Object

  Parameters:
    ner_pred  -> dictionary with the keys 'text' and 'entities' (character offsets)
    file_name -> optional name of the source, only used in the message printed for spans
                 that cannot be aligned to tokens

  Returns: the Doc with doc.ents set to the spans that could be aligned. Spans that cannot
           be aligned are reported and skipped; when the entities cannot be assigned
           the text is printed and the Doc is returned without entities.
  '''
  # Blank English pipeline: only its tokenizer and vocab are used here
  nlp = spacy.blank("en")

  Doc.set_extension("rel", default={},force=True)

  text = ner_pred["text"]
  spans = ner_pred["entities"]

  # Parsing tokens from Text and rebuilding a Doc from the words/spaces
  tokens = nlp(text)
  spaces = [bool(tok.whitespace_) for tok in tokens]
  words = [t.text for t in tokens]
  doc = Doc(nlp.vocab, words=words, spaces=spaces)

  entities = []
  for start, end, label in spans:
    # The modes should be: strict, contract, and expand
    entity = doc.char_span(start, end, label=label, alignment_mode='contract')

    # Not considering the spans which are Erroneous (char_span returned None)
    if entity is None:
      print(f'No Entity Found in File: {file_name};\n Span = {start}-{end}; Phrase = {doc.text[start:end]}; Label = {label}\n')
      continue

    entities.append(entity)

  try:
    doc.ents = entities
  except Exception:
    print("=>> Error")
    print(text)

  return doc

In [None]:
# Assigns different colors to the Entities during visualization.
# The labels are paired with the colors in order; colors beyond the number of labels stay unused.
color_list = [
  'yellow', 'white', 'orange', '#008000', '#800000', '#0D9CB4',
  '#5813C7', '#0D350E', '#1AA436', '#1AE0F9', '#BADCA1', '#78A2E5',
  '#D845FB', '#54B69E', '#800080', '#FF00FF', '#000080',
]

colors = {label: color for label, color in zip(ENTITY_LABELS, color_list)}
options = {"colors": colors}

In [None]:
def render_pred_span_cat(doc):
  '''Render the predicted spans of the document in the notebook (displaCy "span" style).

  Desired Format: doc.spans["sc"] = [Span(doc, 3, 6, "ORG"), Span(doc, 5, 6, "GPE")]
  '''
  render_settings = {"style": "span", "options": options, "jupyter": True}
  spacy.displacy.render(doc, **render_settings)

In [None]:
def render_groud_truth_entities(doc):
  '''Render the entities (doc.ents) of the document in the notebook (displaCy "ent" style).'''
  render_settings = {"style": "ent", "options": options, "jupyter": True}
  spacy.displacy.render(doc, **render_settings)

In [None]:
def save_predictions(ner_predictions, filename, semeval_format=True):
  '''Write the predictions to OUTPUT_DIR/filename as JSON.

  Parameters:
    ner_predictions -> iterable of {'text': ..., 'entities': [(start, end, label), ...]}
    filename        -> name of the file created inside the notebook-level OUTPUT_DIR
    semeval_format  -> True: one JSON list holding, per prediction, a list of
                             {"label": ..., "start": ..., "end": ...} dictionaries
                       False: the regular spacy format, one prediction dictionary per line

  Returns: the list written in the semeval format (empty when semeval_format is False)
  '''
  semeval_ent = []
  target_path = OUTPUT_DIR+'/'+filename

  with open(target_path, 'w', encoding='UTF-8') as json_file:
    for pred in ner_predictions:
      if not semeval_format:
        # regular spacy format, can be used for spacy's default evaluation later
        json_file.write(json.dumps(pred, ensure_ascii=False))
        json_file.write('\n')
        continue

      # prodigy format to work with nereval library - for SemEval 2013 - 9.1 task.
      # saved in this format: [{"label": "PER", "start": 2, "end": 4}, ... ]
      semeval_ent.append([
        {"label": ent[2], "start": ent[0], "end": ent[1]}
        for ent in pred['entities']
      ])

    if semeval_format:
      # the whole collection goes into the file as a single JSON document
      json_file.write(json.dumps(semeval_ent, ensure_ascii=False))

  return semeval_ent
  # # This is single line JSON Dump of the entile list of dictionary - parser cannot parse it directly
  # with open(OUTPUT_DIR+'/predition.jsonl', 'w') as fout:
  #     json.dump(ner_predictions, fout)

In [None]:
def process_results(results_by_tag):
  '''Builds one score table per entity label so they can be combined later.

  For every label in ENTITY_LABELS, results_by_tag[label] is turned into a DataFrame,
  rounded to two decimals, and given a leading 'Entity' column holding the label.

  Returns:
    A list of DataFrames, ordered like ENTITY_LABELS.
  '''
  def _tagged_frame(entity):
    frame = pd.DataFrame(results_by_tag[entity]).round(decimals = 2)
    frame.insert(0, 'Entity', entity)
    return frame

  return [_tagged_frame(entity) for entity in ENTITY_LABELS]

In [None]:
def semeval_evaluation(true, pred):
  '''Scores predictions against the ground truth and exports both benchmark tables.

  Args:
    true: ground-truth entities, handed to nervaluate's Evaluator unchanged.
    pred: predicted entities, handed to nervaluate's Evaluator unchanged.

  Returns:
    (results, results_by_entity): the overall scores as a DataFrame and the
    per-entity tables (built by process_results) concatenated into one DataFrame.

  Side effects:
    Writes 'overall_benchmark.xlsx' and 'entity_benchmark.xlsx' into OUTPUT_DIR.
  '''
  evaluator = Evaluator(true, pred, tags=ENTITY_LABELS)

  # Older nervaluate releases return (results, results_by_tag); newer ones append
  # further index structures after these two. Taking the first two items by position
  # works with both, whereas a strict two-name unpacking raises ValueError on the latter.
  evaluation = evaluator.evaluate()
  results, results_by_tag = evaluation[0], evaluation[1]

  results = pd.DataFrame(results)
  results.to_excel(OUTPUT_DIR+'/'+'overall_benchmark.xlsx')

  results_by_entity = pd.concat(process_results(results_by_tag))
  results_by_entity.to_excel(OUTPUT_DIR+'/'+'entity_benchmark.xlsx')

  return results, results_by_entity

In [None]:
import pickle
import glob

def save_matcher(matcher_file, matcher_obj=None):
  '''Pickles a matcher to `matcher_file`.

  Args:
    matcher_file: path of the pickle file to create (overwritten if present).
    matcher_obj: object to serialize. When omitted, the module-level `matcher`
      is used, which keeps existing single-argument calls working.
  '''
  if matcher_obj is None:
    # backward-compatible fallback to the notebook-global matcher
    matcher_obj = matcher

  with open(matcher_file, 'wb') as pickle_file:
    pickle.dump(matcher_obj, pickle_file, pickle.HIGHEST_PROTOCOL)

def load_matcher(matcher_dir):
  '''Loads a pickled matcher from `matcher_dir`.

  The `*.pkl` files of the directory are sorted by name and the first one is
  loaded, so the choice is deterministic when several files are present.

  Args:
    matcher_dir: directory that holds the pickled matcher.

  Returns:
    The unpickled object.

  Raises:
    FileNotFoundError: if `matcher_dir` contains no `*.pkl` file (e.g. the
      directory was created but the matcher was never saved).
  '''
  candidates = sorted(glob.glob(os.path.join(matcher_dir, '*.pkl')))
  if not candidates:
    raise FileNotFoundError(f"No '*.pkl' matcher file found in '{matcher_dir}'.")

  # NOTE: pickle.load can execute arbitrary code - only load files written by save_matcher.
  with open(candidates[0], 'rb') as pickle_file:
    return pickle.load(pickle_file)

# Main Function

In [None]:
if __name__=='__main__':
  # Driver cell: selects the dataset (or a single text for inference), prepares the matcher,
  # conceptualizes every document and then either visualizes the prediction (inference)
  # or saves the predictions and runs the SemEval 2013 style evaluation.

  # Prompting for the evaluation set - validation/test/training - or the inference mode
  EVAL_CHOICES = ['valid', 'test', 'train', 'inf']
  choice = int(input('Chose Dataset for Evaluation:\n[0] Validation\n[1] Test\n[2] Training\n[3] Inferencing\nEnter your choice:'))
  # A negative number would silently index from the end of the list, so the range is checked explicitly.
  if not 0 <= choice < len(EVAL_CHOICES):
    raise ValueError(f'Invalid choice: {choice}. Expected a number between 0 and {len(EVAL_CHOICES) - 1}.')
  EVAL_SET = EVAL_CHOICES[choice]

  # Input/Output Directories - they stay empty in inference mode, where this cell writes no files
  INPUT_DIR = ""
  OUTPUT_DIR = ""

  if EVAL_SET == 'inf':
    # get_text is defined elsewhere in the notebook; without a path it presumably
    # asks for / uploads the text on Colab - TODO confirm.
    if isColab:
      input_text = get_text()
    else:
      input_text = get_text("data/Annotated_Text/single.txt")

    samples = [{'text':input_text}]
  else:
    # Input/Output Directories
    INPUT_DIR = f'{COLAB_DIR}data/Annotated_Text/{EVAL_SET}'
    OUTPUT_DIR = f'{COLAB_DIR}output/{EVAL_SET}'
    create_out_dir(OUTPUT_DIR)

    print('\nPreparing Evaluation Dataset:')
    doc_valid, db_valid = docanno_to_spacy_ner_db(INPUT_DIR)
    docbin_path = OUTPUT_DIR + f"/disease_A-Z_{EVAL_SET}.spacy"
    db_valid.to_disk(docbin_path)

    # reading the ground truth entities with text file from spacy docbin
    samples, ground_entities_count = load_json_from_docbin(docbin_path, nlp)

  # reading and accumulating (integrated) structured data
  accu_data = accumulate_structured_data()

  # Initializing the matcher. The cache test looks for an actual pickle file instead of the
  # bare directory: a build that was interrupted after the directory had been created would
  # otherwise leave an empty folder behind and make every later run fail while loading.
  cached_matchers = glob.glob(os.path.join(MATCHER_DIR, '*.pkl'))
  if not cached_matchers:
    if not os.path.exists(MATCHER_DIR):
      create_out_dir(MATCHER_DIR)
    print('Building the patterns...')
    matcher = initiate_matcher(patterns=accu_data)

    # saving the matcher for future use cases... reducing time
    save_matcher(f'{MATCHER_DIR}/matcher_baseline.pkl')

  else:
    print('Loading the matcher from directory...')
    matcher = load_matcher(MATCHER_DIR)

  ner_predictions = []
  all_docs = []
  total_doc = len(samples)

  print('\nConceptualization Process Started...')
  # each sample represents one document..
  for count, ground in enumerate(samples, start=1):
    input_txt = ground['text']

    # Segmenting the document into sentences - for Conceptualization we can also process the whole document
    doc, sentences = sentence_segmentation(input_txt, print_indx=False)

    # Detecting the NP Chunks - also removes leading and trailing stop words in the NP chunks
    np_sentences = identify_noun_chunks(sentences)

    # Getting the initial Top-k matching results
    matched_sentences = baseline_matcher(np_sentences, matcher=matcher, top_k=1)

    # Conceptualizing the Text Document - token based indices
    entities_pre_doc = conceptualized_preprocessed_doc(doc, matched_sentences)

    # Organizing indexes of those entities according to the original text, also creating span categories for visualization
    doc, ner_pred_tmp = get_char_span(doc, input_txt, entities_pre_doc, tag_np_chunk_only = False)
    ner_predictions.append(ner_pred_tmp)
    all_docs.append(doc)

    # progress report after every 10th document
    if count % 10 == 0:
      print(f'{count}/{total_doc} Document Processed...')

  if EVAL_SET == 'inf':
    # Visualizing the Predictions for the Given Input Text
    print('\n########### Prediction ###########\n')
    render_pred_span_cat(all_docs[0])

  else:
    # saving the ground truth and the predictions into JSONL files for later evaluation.
    semeval_ground = save_predictions(samples, filename='ground.jsonl')
    semeval_pred = save_predictions(ner_predictions, filename='predition.jsonl')

    # doing the evaluation following SemEval 2013 metrics
    results, results_by_entity = semeval_evaluation(true=semeval_ground, pred=semeval_pred)

    # Saving this for Future Experiments... Spacy Format
    _ = save_predictions(ner_predictions, filename='predition_Baseline_spacy.jsonl', semeval_format=False)

    print('\n########### Overall Results ###########\n')
    print(f"Precision: {results['partial']['precision']}\nRecall: {results['partial']['recall']}\nF1: {results['partial']['f1']}\n")

Chose Dataset for Evaluation:
[0] Validation
[1] Test
[2] Training
[3] Inferencing
Enter your choice:1

Preparing Evaluation Dataset:
- Files: 1 
- Processed Documents: 20 
- Total Entities: 2140 
- Erroneous Entities (Ignored): 107 
- Total Words: 38319
Building the patterns...

Conceptualization Process Started...
10/20 Document Processed...
20/20 Document Processed...

########### Overall Results ###########

Precision: 0.14519056261343014
Recall: 0.07476635514018691
F1: 0.09870450339296731

