In [0]:
from typing import Callable, List, Set, Tuple, TypeVar, Optional
import warnings
import pandas as pd
import os
import re

from google.colab import drive
import gzip
# Mount Google Drive into the Colab runtime so the project files are reachable
# under /content/drive/.
drive.mount('/content/drive/', force_remount=True)
# Root folder of the project on the mounted drive; every data path below is
# built from it.
# NOTE(review): `__dir__` is a dunder-style name; a plain constant such as
# PROJECT_DIR would be clearer — confirm no other cell relies on this name.
__dir__ = "/content/drive/My Drive/W266_Project/"

# Input files consumed by the conversion cells further down.
test_file = os.path.join(__dir__, 'data/onto.test.ner')
dev_file = os.path.join(__dir__, 'data/onto.development.ner')
train_file = os.path.join(__dir__, 'data/onto.train.ner')
domain_file = os.path.join(__dir__, 'data/bios-tagged-final-flat.csv')


Mounted at /content/drive/


In [0]:
class InvalidTagSequence(Exception):
    """
    Raised when a tag sequence cannot be interpreted under the requested
    encoding.

    Parameters
    ----------
    tag_sequence : iterable of tags, optional
        The offending sequence. Kept on the instance as ``tag_sequence`` so
        that callers can inspect it.
    """

    def __init__(self, tag_sequence=None):
        super().__init__()
        self.tag_sequence = tag_sequence

    def __str__(self):
        # The sequence is optional; joining ``None`` would raise a TypeError
        # while the exception is being formatted and hide the real error.
        if self.tag_sequence is None:
            return ''
        # ``str(tag)`` keeps formatting safe even if a non-string tag slipped in.
        return ' '.join(str(tag) for tag in self.tag_sequence)

def to_bioul(tag_sequence: List[str], encoding: str = "IOB1") -> List[str]:
    """
    Given a tag sequence encoded with IOB1 or BIO labels, recode to BIOUL.
    In the IOB1 scheme, I is a token inside a span, O is a token outside
    a span and B is the beginning of span immediately following another
    span of the same type.
    In the BIO scheme, I is a token inside a span, O is a token outside
    a span and B is the beginning of a span.
    Parameters
    ----------
    tag_sequence : ``List[str]``, required.
        The tag sequence encoded in IOB1, e.g. ["I-PER", "I-PER", "O"].
    encoding : `str`, optional, (default = ``IOB1``).
        The encoding type to convert from. Must be either "IOB1" or "BIO".
    Returns
    -------
    bioul_sequence: ``List[str]``
        The tag sequence encoded in BIOUL, e.g. ["B-PER", "L-PER", "O"].
    Raises
    ------
    ValueError
        If ``encoding`` is neither "IOB1" nor "BIO".
    InvalidTagSequence
        If a label starts with none of O/I/B (including an empty label), or
        if, under "BIO", an I- label does not continue a span of the same type.
    """

    if encoding not in {"IOB1", "BIO"}:
        # The original raised ``ConfigurationError``, which is not defined or
        # imported in this file and therefore surfaced as a NameError.
        raise ValueError(f"Invalid encoding {encoding} passed to 'to_bioul'.")
    # pylint: disable=len-as-condition

    def replace_label(full_label, new_label):
        # example: full_label = 'I-PER', new_label = 'U', returns 'U-PER'
        parts = list(full_label.partition('-'))
        parts[0] = new_label
        return ''.join(parts)

    def pop_replace_append(in_stack, out_stack, new_label):
        # pop the last element from in_stack, replace the label, append
        # to out_stack
        tag = in_stack.pop()
        new_tag = replace_label(tag, new_label)
        out_stack.append(new_tag)

    def process_stack(stack, out_stack):
        # process a stack of labels, add them to out_stack; the stack is
        # emptied as a side effect
        if len(stack) == 1:
            # just a U token
            pop_replace_append(stack, out_stack, 'U')
        else:
            # need to code as BIL; tags are popped from the end, so the
            # recoded span is built backwards and reversed afterwards
            recoded_stack = []
            pop_replace_append(stack, recoded_stack, 'L')
            while len(stack) >= 2:
                pop_replace_append(stack, recoded_stack, 'I')
            pop_replace_append(stack, recoded_stack, 'B')
            recoded_stack.reverse()
            out_stack.extend(recoded_stack)

    # Process the tag_sequence one tag at a time, adding spans to a stack,
    # then recode them.
    bioul_sequence = []
    stack: List[str] = []

    for label in tag_sequence:
        if label == 'O' and len(stack) == 0:
            bioul_sequence.append(label)
        elif label == 'O' and len(stack) > 0:
            # the open span ends here: flush it, then emit the O
            process_stack(stack, bioul_sequence)
            bioul_sequence.append(label)
        elif label.startswith('I'):
            # ``startswith`` (rather than ``label[0]``) lets an empty label
            # fall through to the InvalidTagSequence branch instead of
            # raising IndexError.
            if len(stack) == 0:
                # an I- tag may open a span in IOB1 but never in BIO
                if encoding == "BIO":
                    raise InvalidTagSequence(tag_sequence)
                stack.append(label)
            else:
                # check if the previous type is the same as this one
                this_type = label.partition('-')[2]
                prev_type = stack[-1].partition('-')[2]
                if this_type == prev_type:
                    stack.append(label)
                else:
                    if encoding == "BIO":
                        raise InvalidTagSequence(tag_sequence)
                    # a new entity of a different type starts here
                    process_stack(stack, bioul_sequence)
                    stack.append(label)
        elif label.startswith('B'):
            # B always starts a new span; flush whatever span was open
            if len(stack) > 0:
                process_stack(stack, bioul_sequence)
            stack.append(label)
        else:
            raise InvalidTagSequence(tag_sequence)

    # flush the span that is still open at the end of the sequence
    if len(stack) > 0:
        process_stack(stack, bioul_sequence)

    return bioul_sequence

In [0]:
def create_onto_data_vectors(src_file):
    """
    Read a tab-separated token/tag file and return sentences with BIOUL tags.

    Each non-blank line is split on tabs: the first field is taken as the
    token and the last field as its tag. Whitespace-only lines end the current
    sentence. A line matching ``^token.*label$`` is treated as a header and
    skipped. Tags are converted with ``to_bioul(..., 'BIO')``.

    Parameters
    ----------
    src_file : str
        Path of the file to read.

    Returns
    -------
    (X, y) : tuple of two lists
        ``X[i]`` is the list of tokens of sentence ``i`` and ``y[i]`` the
        matching list of BIOUL tags.

    Raises
    ------
    ValueError
        If a line has fewer than two tab-separated fields, or if the tags of
        a sentence cannot be converted. The original error is chained as
        ``__cause__``.
    """
    header_pattern = re.compile(r'^token.*label$')
    blank_pattern = re.compile(r'^\s+$')

    def convert_tags(words, tags):
        # Convert one sentence's tags, reporting the sentence on failure
        # while keeping the underlying error attached.
        try:
            return to_bioul(tags, 'BIO')
        except Exception as err:
            raise ValueError(
                f"Could not convert tags for sentence {words!r} in {src_file}"
            ) from err

    X, y = [], []
    sentence = []
    ner_sent = []

    with open(src_file, 'r') as f:
        # Iterate lazily instead of materialising the file with readlines().
        for line_no, line in enumerate(f, start=1):
            if header_pattern.search(line):
                # this is the domain file, it has headers
                continue

            if blank_pattern.search(line):
                # A blank line closes the current sentence, if there is one.
                if sentence:
                    X.append(sentence)
                    y.append(convert_tags(sentence, ner_sent))
                    sentence = []
                    ner_sent = []
                continue

            try:
                word, *_, ner = line.split('\t')
            except ValueError as err:
                raise ValueError(
                    f"Could not parse line {line_no} of {src_file}: {line!r}"
                ) from err
            sentence.append(word)
            ner_sent.append(ner.rstrip())

    # The file may end without a trailing blank line.
    if sentence:
        X.append(sentence)
        y.append(convert_tags(sentence, ner_sent))

    return X, y

In [0]:
def df_and_csv(X, y, src_file):
    """
    Write the sentences and their tags to ``<src_file>_bilou.csv``.

    Parameters
    ----------
    X : list
        One entry per sentence (the token lists).
    y : list
        One entry per sentence (the tag lists), aligned with ``X``.
    src_file : str
        Path of the source file. A trailing ``.csv`` extension is dropped
        before ``_bilou.csv`` is appended; any other name is kept whole.
    """
    # Strip only a trailing ".csv"; str.replace would also remove ".csv"
    # from directory names or from the middle of the file name.
    suffix = '.csv'
    base_name = src_file[:-len(suffix)] if src_file.endswith(suffix) else src_file
    out_name = base_name + '_bilou.csv'

    # Two rows (X and y) transposed into one row per sentence with two columns.
    pd_bilou = pd.DataFrame([X, y]).T
    pd_bilou.columns = ['x', 'y']
    pd_bilou.to_csv(out_name)

In [0]:
# Convert the training split (train_file) to BIOUL and write it out as CSV.
# NOTE(review): the variables are named X_dev/y_dev although they hold the
# training data here.
X_dev, y_dev = create_onto_data_vectors(train_file)
  
df_and_csv(X_dev, y_dev, train_file)

In [0]:
# Convert the development split (dev_file) to BIOUL and write it out as CSV.
X_dev, y_dev = create_onto_data_vectors(dev_file)

  
df_and_csv(X_dev, y_dev, dev_file)

In [0]:
# Convert the test split (test_file) to BIOUL and write it out as CSV.
# NOTE(review): the variables are named X_dev/y_dev although they hold the
# test data here.
X_dev, y_dev = create_onto_data_vectors(test_file)

  
df_and_csv(X_dev, y_dev, test_file)

In [0]:
# Convert the domain file (domain_file) to BIOUL and write it out as CSV.
# NOTE(review): the variables are named X_dev/y_dev although they hold the
# domain data here.
X_dev, y_dev = create_onto_data_vectors(domain_file)
  
df_and_csv(X_dev, y_dev, domain_file)