In [None]:
!pip install transformers

In [3]:
import nltk
from nltk import word_tokenize, sent_tokenize, pos_tag
from nltk.tokenize import RegexpTokenizer
import pandas as pd
import os
import re
from xml.dom import minidom
from xml.etree import cElementTree as ET
import csv
import random
import ast
from string import punctuation
from transformers import BartTokenizer

In [4]:
# Mount Google Drive at /content/drive (Colab only)
from google.colab import drive
drive.mount('/content/drive')

# Add the project folder on the mounted drive to the module search path
import sys
sys.path.append('/content/drive/MyDrive/Third-Year-Project')

Mounted at /content/drive


In [5]:
# Download the NLTK resources 'punkt' and 'averaged_perceptron_tagger'
# (presumably required by the tokenizers and by pos_tag used below)
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [6]:
def spans(txt):
  """
  Separate text into words and yield each word together with its position.

  Input: txt - text to be tokenized
  Output: a generator of (token, start, end) tuples, where start is the
      character offset of the token in txt and end is start + len(token)

  Note: the tokenizer rewrites a straight double quote as `` or ''. Such a
  token is located at the original quote character; its end is still
  start + len(token), so end - start always equals len(token).
  """

  tokens = nltk.word_tokenize(txt)

  offset = 0
  for token in tokens:
    start = txt.find(token, offset)
    consumed = len(token)

    # A rewritten quote does not occur literally in txt (or only further on),
    # so prefer the nearest straight double quote, which is one character long.
    if token in ('``', "''"):
      quote = txt.find('"', offset)
      if quote != -1 and (start == -1 or quote < start):
        start, consumed = quote, 1

    # Token could not be located at all: stay at the current position instead
    # of rewinding the cursor, which would corrupt every following offset.
    if start == -1:
      start, consumed = offset, 0

    yield token, start, start + len(token)
    offset = start + consumed

def BIO_tagging(tags, text):
  """
  This function tags the tokens of a text with BIO tags and keeps only the
  adverse reaction (ADR) tags.

  Input: tags - a list of mention elements. Each one provides the attributes
      'start' and 'len' (comma-separated character offsets and lengths, one
      pair per fragment of the mention) and 'type' (one of 'DrugClass',
      'Factor', 'Severity', 'AdverseReaction', 'Negation', 'Animal'),
      text - text content the offsets refer to
  Output: A list of (word, tag) tuples, one per token of the text, where tag
      is 'B-ADR', 'I-ADR' or 'O'
  Raises: ValueError if the length of a token does not match its offsets,
      KeyError if a matching mention has a type that is not listed above

  Note: the first mention in tags that covers a token decides its tag. A
  token covered by a non-ADR mention listed before an overlapping ADR mention
  is therefore tagged 'O'.
  """

  tag_codes = {'DrugClass' : 'DUC', 'Factor' : 'FAC', 'Severity' : 'SVT', 'AdverseReaction' :'ADR', 'Negation' : 'NEG', 'Animal' : 'ANM'}

  # Parse the (start, end) character range of every fragment once, instead of
  # once per token
  mentions = []
  for tag in tags:
    starts = tag.attrib['start'].split(',')
    lengths = tag.attrib['len'].split(',')
    fragments = [(int(starts[m]), int(starts[m]) + int(lengths[m])) for m in range(len(starts))]
    mentions.append((tag, fragments))

  tagged_words = []

  for word, start, end in spans(text):

    if len(word) != len(range(start, end)):
      raise ValueError('letter and offset does not match.')

    # Move the start to the first character that is not punctuation, a token
    # made of punctuation only keeps its start
    for index, char in enumerate(word):
      if char not in punctuation:
        start += index
        break

    label = 'O'

    for tag, fragments in mentions:

      for m, (frag_start, frag_end) in enumerate(fragments):

        if start == frag_start:
          # Only the first fragment of a mention opens the entity
          prefix = 'B-' if m == 0 else 'I-'
        elif frag_start < start < frag_end:
          prefix = 'I-'
        else:
          continue

        label = prefix + tag_codes[tag.attrib['type']]
        break

      if label != 'O':
        break

    # Just leave the ADR tag, set all other tags to 'O'
    if label != 'B-ADR' and label != 'I-ADR':
      label = 'O'

    tagged_words.append((word.strip(), label))

  return tagged_words



def  preprocess(string):
  """
  Preprocessing for all datasets

  Applies a fixed sequence of regular expression substitutions: unsupported
  characters become spaces, English contractions are expanded, punctuation
  that follows (or, for an opening bracket, precedes) a word character is
  surrounded by spaces, and runs of whitespace are collapsed.

  Input: A sentence from original text
  Output: Preprocessed sentence
  """

  # The rules are order dependent, e.g. "..." must be handled before "."
  rewrite_rules = (
      (r"[^\w(),|!?\'\`\:\-\.;\$%#]", " "),
      (r"\'s", " is"),
      (r"\'ve", " have"),
      (r"n\'t", " not"),
      (r"\'re", " are"),
      (r"\'d", " would"),
      (r"\'ll", " will"),
      (r"(?<=\w)\.\.\.", " ... "),
      (r"(?<=\w)\.", " . "),
      (r"(?<=\w),", " , "),
      (r"(?<=\w);", " ; "),
      (r"(?<=\w)!", " ! "),
      (r"\((?=\w)", " ( "),
      (r"(?<=\w)\)", " ) "),
      (r"(?<=\w)\?", " ? "),
      (r"\s{2,}", " "),
  )

  for pattern, replacement in rewrite_rules:
    string = re.sub(pattern, replacement, string)

  return string




def POS_tagging(text):
  """
  This function attaches a part-of-speech tag to every word of a text

  Input: text - text to be tokenized
  Output: the result of pos_tag for the tokens of the text, which consists of
      words and their POS tags
  """

  # Tokenize first, then hand the tokens straight to the tagger
  return pos_tag(word_tokenize(text))

In [16]:
class Data_generation():
  """
  This class preprocesses one file of the original ADR sample corpus
  and transforms the data for training with the format: sentence + template.

  Usage: create the object, call set_text_content_tags() and then
  data_preprocess(). Afterwards template, BIO_tags and POS_tags are filled.
  """

  def __init__(self, path):
    """
    Parse the XML file and create the empty result containers

    Input: path - path of the XML file to parse
    """
    self.path = path
    self.text_content_tags = {} # Format: {text: [tags]}
    self.tree = ET.parse(path)
    self.root = self.tree.getroot()
    # The three dictionaries below share their keys: each key is a
    # (sentence, start, end) tuple as produced by sentence_divide
    self.POS_tags = {}
    self.BIO_tags = {}
    self.template = {}


  def set_text_content_tags(self):
    """
    This function extracts the text content and tags from the original corpus

    Every 'Section' below the first child of the root is stored in
    text_content_tags under its text, together with the 'Mention' elements
    (below the second child of the root) that refer to the id of the section.
    """

    mentions = self.root[1].findall('Mention')

    for section in self.root[0].findall('Section'):
      section_id = section.attrib['id']
      # Stored once per section, also when no mention refers to it
      self.text_content_tags[section.text] = [
          mention for mention in mentions if mention.attrib['section'] == section_id]



  def sentence_divide(self,text):
    """
    This function divides the text into sentences

    Input: Text
    Output: A generator of (sentence, start, end) tuples, where start and end
        are the character offsets of the sentence in the text
    """

    offset = 0
    for sentence in nltk.sent_tokenize(text):
      offset = text.find(sentence, offset)
      end = offset + len(sentence)
      yield sentence, offset, end
      offset = end



  def data_preprocess(self):
    """
    This function preprocesses the original corpus and transform data for training

    For every sentence that contains at least one complete adverse reaction
    mention, template holds the templates of the sentence and BIO_tags and
    POS_tags hold the tags of its words. Sentences without such a mention
    are removed from the three dictionaries.
    """

    for text, tags in self.text_content_tags.items():

      # Generate BIO tags and POS tags for the text
      BIO_tags = BIO_tagging(tags, text)
      POS_tags = POS_tagging(text)

      # Character range of every adverse reaction mention: from the start of
      # its first fragment to the end of its last fragment (the fragments of
      # a mention are not necessarily connected in the sentence)
      adr_mentions = []
      for tag in tags:
        if tag.attrib['type'] != 'AdverseReaction':
          continue
        starts = tag.attrib['start'].split(',')
        lengths = tag.attrib['len'].split(',')
        adr_mentions.append((int(starts[0]), int(starts[-1]) + int(lengths[-1]), tag.attrib['str']))

      word_count = 0

      # Divide text into sentences
      for sentence in self.sentence_divide(text):

        words = nltk.word_tokenize(sentence[0])

        # Map the BIO tags and POS tags to the specific sentence
        self.BIO_tags[sentence] = BIO_tags[word_count:word_count+len(words)]
        self.POS_tags[sentence] = POS_tags[word_count:word_count+len(words)]

        # Get the start and end index of the sentence
        sen_start = sentence[1]
        sen_end = sentence[2]

        # Generate a template for each mention that lies inside the sentence
        self.template[sentence] = [
            phrase + " is an adverse reaction entity"
            for first, last, phrase in adr_mentions
            if sen_start <= first and sen_end >= last]

        word_count += len(words)

    # Delete the sentences that do not have any entity
    for sentence in list(self.template.keys()):
      if len(self.template[sentence]) == 0:
        del self.template[sentence]
        del self.BIO_tags[sentence]
        del self.POS_tags[sentence]



  @staticmethod
  def _starts_with_entity(BIO_tag, start, size):
    """
    True if the n-gram opens with a 'B-ADR' word and none of its remaining
    words is tagged 'O' or 'B-ADR'
    """
    if BIO_tag[start][1] != 'B-ADR':
      return False
    rest = [BIO_tag[i][1] for i in range(start+1, start+size)]
    return 'O' not in rest and 'B-ADR' not in rest


  @staticmethod
  def _n_gram_text(BIO_tag, start, size):
    """
    The words of the n-gram joined by single spaces, without line breaks
    """
    text = ' '.join(BIO_tag[i][0] for i in range(start, start+size))
    return text.replace('\n', ' ').strip()


  def no_entity_generation(self, sample, num_template):
    """
    This function generates negative samples for training

    Input: sample - key of the sentence in BIO_tags,
        num_template - number of negative samples wanted (the number of
        positive samples of the sentence)
    Output: A list of distinct strings "<n-gram> is not a named entity".
        The list is shorter than num_template if the sentence does not offer
        enough distinct n-grams, and empty for sentences of at most one word.
    """

    BIO_tag = self.BIO_tags[sample]

    def start_positions(size):
      # NOTE(review): as in the original sampling range, the n-gram that ends
      # with the last word of the sentence is never chosen
      return range(0, len(BIO_tag)-size)

    # All phrases that the random search below is able to produce. Knowing
    # them guarantees that the search stops once they are used up.
    candidates = set()
    for size in range(1, 4):
      for start in start_positions(size):
        if not self._starts_with_entity(BIO_tag, start, size):
          candidates.add(self._n_gram_text(BIO_tag, start, size))

    num_no = min(num_template, len(candidates))

    no_entity = []
    used = set()

    while len(no_entity) < num_no:

      # Randomly generate n-grams, n is between 1 and 3
      num_n_grams = random.randint(1,3)

      positions = start_positions(num_n_grams)
      if len(positions) == 0:
        # The sentence is too short for an n-gram of this size
        continue

      # Randomly generate the start position of the n-grams
      start_of_n_grams = random.choice(positions)

      if self._starts_with_entity(BIO_tag, start_of_n_grams, num_n_grams):
        continue

      phrase = self._n_gram_text(BIO_tag, start_of_n_grams, num_n_grams)

      # Every negative sample is generated only once
      if phrase in used:
        continue

      used.add(phrase)
      no_entity.append(phrase+" is not a named entity")

    return no_entity


In [17]:
path = 'drive/MyDrive/Third-Year-Project/data/train_xml'# Path which contains xml files

train_path = 'drive/MyDrive/Third-Year-Project/data/train.csv'
dev_path = 'drive/MyDrive/Third-Year-Project/data/dev.csv'
test_path = 'drive/MyDrive/Third-Year-Project/data/test.csv'

# Only sentences whose length (in characters) lies in this range are used
MIN_SENTENCE_LENGTH = 80
MAX_SENTENCE_LENGTH = 800

# Fixed seed, so that the split and the negative samples are reproducible
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

sen_tem_train = []
sen_tem_test = []
sen_tem_valid = []

# Sorted, so that the result does not depend on the order of the directory listing
for file in sorted(os.listdir(path)):

  file_path = os.path.join(path, file)

  # Skip hidden entries and sub-directories (e.g. notebook checkpoints)
  if file.startswith('.') or not os.path.isfile(file_path):
    continue

  example = Data_generation(file_path)
  example.set_text_content_tags()
  example.data_preprocess()

  # Get sentence that length <= 800 and >= 80.
  sen_tem = [(key, value) for key, value in example.template.items()
             if len(value) != 0 and MIN_SENTENCE_LENGTH <= len(key[0]) <= MAX_SENTENCE_LENGTH]

  # Per file: 90% train + dev and 10% test, then 75% train and 25% dev.
  # The sentence keys are unique per file, so they identify the chosen items.
  temp_train_valid = random.sample(sen_tem, round(len(sen_tem)*0.9))
  train_valid_keys = {key for key, _ in temp_train_valid}
  temp_test = [item for item in sen_tem if item[0] not in train_valid_keys]

  temp_train = random.sample(temp_train_valid, round(len(temp_train_valid)*0.75))
  train_keys = {key for key, _ in temp_train}
  temp_valid = [item for item in temp_train_valid if item[0] not in train_keys]

  # Test rows also carry the BIO tags of the sentence
  for key, templates in temp_test:
    text = key[0].replace('\n', ' ').strip()
    for template in templates:
      sen_tem_test.append((text, template, example.BIO_tags[key]))

  for key, templates in temp_valid:
    text = key[0].replace('\n', ' ').strip()
    for template in templates:
      sen_tem_valid.append((text, template))

  for key, templates in temp_train:
    text = key[0].replace('\n', ' ').strip()

    batch_set = [(text, template) for template in templates]

    # Generate negative examples for training set, with ratio: pos:neg = 1:1
    for negative in example.no_entity_generation(key, len(templates)):
      batch_set.append((text, negative))

    # disrupt the order of pos and neg examples
    random.shuffle(batch_set)

    sen_tem_train.extend(batch_set)


# Mode 'w' replaces files of an earlier run
with open(train_path, 'w', newline='', encoding='utf-8') as csvfile1, \
     open(dev_path, 'w', newline='', encoding='utf-8') as csvfile2, \
     open(test_path, 'w', newline='', encoding='utf-8') as csvfile3:

  csv.writer(csvfile1).writerows([text, template] for text, template in sen_tem_train)
  csv.writer(csvfile2).writerows([text, template] for text, template in sen_tem_valid)
  csv.writer(csvfile3).writerows([text, template, tags] for text, template, tags in sen_tem_test)

# Number of rows generated for all three files
count = len(sen_tem_train) + len(sen_tem_valid) + len(sen_tem_test)


df1 = pd.read_csv(train_path, header=None)
df2 = pd.read_csv(dev_path, header=None)
df3 = pd.read_csv(test_path, header=None)
print('Train data generated: ', len(df1))      
print('Dev data generated: ', len(df2))   
print('Test data generated: ', len(df3))  

print('Generated :' , count)


15236Train data generated:  12184
Dev data generated:  2129
Test data generated:  923
Generated : 15236
