### Raw Data Processing

In [1]:
# import libraries

import os
import io
import re
import csv
from itertools import chain
from string import punctuation
from time import time

#### Helper Functions

In [2]:
def process_ann(ann_file):
    r"""Read a brat .ann file and derive the offsets used to label its .txt file.

       Only entity lines (first field starting with 'T') are used; relation
       lines and blank lines are skipped.

       Input:
       ann_file = tab-delimited brat annotation file with the following format
                  NER: [entity_ID]\t[label start_offset end_offset]\t[entity]
                  RE:  [relation_ID]\t[relation_type argument1 argument2]

       Outputs:
       cleaned_offsets = list of tuples for labeling corresponding .txt file
                         format: (offset, label, entity ID)
                         spans between entities are emitted as (offset, 'O', 'X');
                         the list is empty when the file has no entity lines
       corrections = dictionary of entity ID mappings for overlapping offsets
                     format: {skipped entity ID: ID of the entity that was open}"""

    ann = []
    with io.open(ann_file, 'r', encoding='utf-8', errors='ignore') as text:
        for line in text:
            stripped = line.strip()
            # startswith() is safe on an empty string, so blank lines (for
            # example a trailing empty line) are skipped instead of raising
            # IndexError.
            if stripped.startswith('T'):
                ann.append(stripped.split('\t'))

    # One 'S' (start) and one 'E' (end) boundary per entity.
    offsets = []
    for fields in ann:
        entity_id = fields[0]
        span = fields[1].split()
        label = span[0]
        start = int(span[1])
        end = int(span[2])

        offsets.append((start, 'S', label, entity_id))
        offsets.append((end, 'E', label, entity_id))

    # Stable sort on the offset only: boundaries sharing an offset keep the
    # order in which they were read from the file.
    sorted_offsets = sorted(offsets, key=lambda x: x[0])

    cleaned_offsets = []
    corrections = {}

    # hold      = the tuple waiting to be emitted
    # indicator = kind of the previously handled boundary ('S' or 'E'),
    #             '*' right after a start seen while an entity was still open,
    #             None before the first boundary
    hold = None
    indicator = None

    for tup in sorted_offsets:

        if indicator == 'S':
            if tup[1] == 'E':
                # The open entity closes: emit it and start an 'O' span here.
                cleaned_offsets.append(hold)
                hold = (tup[0], 'O', 'X')
                indicator = tup[1]
            elif tup[1] == 'S':
                # A second entity starts before the open one has ended. It is
                # not emitted; its ID is mapped to the open entity's ID.
                corrections.update({tup[3]: hold[2]})
                indicator = '*'

        elif indicator == 'E':
            # Emit the pending 'O' span and hold the boundary just read.
            cleaned_offsets.append(hold)
            hold = (tup[0], tup[2], tup[3])
            indicator = tup[1]

        elif indicator == '*':
            # The boundary following an overlapping start is skipped.
            # NOTE(review): presumably this is the end of the overlapping
            # entity, i.e. it is assumed to close before the open one does.
            indicator = 'S'

        else:
            # First boundary of the file.
            hold = (tup[0], tup[2], tup[3])
            indicator = tup[1]

    # Without any entity line nothing was ever held; return an empty list
    # rather than a list containing None.
    if hold is not None:
        cleaned_offsets.append(hold)

    return cleaned_offsets, corrections

In [3]:
def _split_on_spaces(span):
    """Split a text span on single spaces (newlines count as spaces), dropping empty pieces."""
    return [piece for piece in span.replace('\n', ' ').split(' ') if piece]


def _outside_rows(span, label, entity_id):
    """Build [token, label, entity_id] rows for a span, stripping punctuation from each token."""
    rows = []
    for piece in _split_on_spaces(span):
        token = piece.strip(punctuation)
        # Tokens made only of punctuation disappear entirely.
        if token:
            rows.append([token, label, entity_id])
    return rows


def process_txt(txt_file, offsets):
    """Helper function that reads in a .txt file as one string,
       and labels it with BIO tags based on cleaned offsets
       from its .ann file

       Inputs:
       txt_file = file that contains all the patent text
                  considered as one sentence in this task
       offsets = list of tuples for labeling corresponding .txt file
                 format: (offset, label, entity ID)
                 an empty list labels the whole text as 'O'

       Output:
       sentence = list of [token, tag, entity ID] rows
                  entity tokens are kept verbatim and tagged 'B-<label>' /
                  'I-<label>'; all other tokens have surrounding punctuation
                  stripped and carry the label of their offset tuple"""

    with io.open(txt_file, 'r', encoding='utf-8', errors='ignore') as text:
        full_text = text.read()

    # No annotations: every token is outside any entity.
    if not offsets:
        return _outside_rows(full_text, 'O', 'X')

    # Text before the first offset is always outside any entity.
    sentence = _outside_rows(full_text[:offsets[0][0]], 'O', 'X')

    last_index = len(offsets) - 1
    for index, entry in enumerate(offsets):
        start = entry[0]
        label = entry[1]
        entity_id = entry[2]

        if index == last_index:
            # The final tuple covers the rest of the text and its label is
            # used as-is (no B-/I- prefix); with offsets from process_ann this
            # is normally the trailing 'O' span.
            sentence.extend(_outside_rows(full_text[start:], label, entity_id))
            continue

        end = offsets[index + 1][0]

        if label == 'O':
            sentence.extend(_outside_rows(full_text[start:end], label, entity_id))
        else:
            # Entity tokens keep their punctuation. A span without any token
            # (zero-width or whitespace only) contributes no rows instead of
            # raising IndexError.
            for position, term in enumerate(_split_on_spaces(full_text[start:end])):
                prefix = 'B-' if position == 0 else 'I-'
                sentence.append([term, prefix + label, entity_id])

    return sentence

In [4]:
def generate_ner_files(filepaths, output_path):
    """Helper function that reads .txt and corresponding .ann files from a path
       and generates csv file for use in NER task

       Inputs:
       filepaths = filepaths (folder + filename, but no extension) for .txt and .ann files
       output_path = filepath (folder + filename, but no extension) for output file

       Outputs (files, nothing is returned):
       <output_path>.csv = tab-delimited rows; each document starts with a
                           'SENTENCE: <last 4 characters of its filepath>' row
                           followed by its token rows from process_txt
       <output_path>_corrections.csv = comma-delimited; per document a
                           'SENTENCE: ...' row followed by one row built from
                           the corrections dictionary returned by process_ann

       Prints the elapsed wall-clock time when done."""

    started = time()

    sentences = []
    corrections = []

    for file in filepaths:
        # The document marker uses the last four characters of the path.
        sentences.append([f'SENTENCE: {file[-4:]}'])
        corrections.append([f'SENTENCE: {file[-4:]}'])

        cleaned_offsets, file_corrections = process_ann(f'{file}.ann')
        # csv.writer iterates a dict row by its keys, so only the remapped
        # entity IDs reach the file, not the IDs they were mapped to.
        # NOTE(review): confirm that dropping the mapping targets is intended.
        corrections.append(file_corrections)

        sentences.extend(process_txt(f'{file}.txt', cleaned_offsets))

    # newline='' lets the csv module control line endings (avoids '\r\r\n'
    # on Windows); utf-8 matches the encoding used to read these files back.
    with open(f'{output_path}.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, delimiter='\t')
        writer.writerows(sentences)

    with open(f'{output_path}_corrections.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerows(corrections)

    elapsed = time() - started
    print(f'Finished in {elapsed:.3f} seconds')

#### Test the Helper Functions

In [5]:
# Path stem (folder + file ID, no extension) of the single document used to try out the helpers.
test_path = 'raw_data/EE/ee_train/1029'

In [6]:
# Load the sample document's raw text as one string and display it.
with io.open(f'{test_path}.txt', 'r', encoding='utf-8', errors='ignore') as text:
    full_text = text.read()
full_text

'Example 5: Synthesis of racemic 3-aminoquinuclidine.mono-p-toluenesulfonate\n2-propanol solution (12.12 g) containing racemic 3-aminoquinuclidine (3.02 g: 23.9 mmol) was charged into a 50 mL-flask equipped with a mechanical stirrer, a thermometer, and a cooling line, and 2-propanol solution (19.48 g) containing p-toluenesulfonic acid monohydrate (4.53 g: 23.8 mmol) was added thereto while stirring at room temperature. Precipitated crystals were filtered, and then dried under reduced pressure at 60° C., to obtain racemic 3-aminoquinuclidine.mono-p-toluenesulfonate (6.76 g: 22.7 mmol, yield=95%) as a white crystal.\nEndothermic peak top temperature in DSC: 198° C.\nElemental analysis: C7H14N2.C7H8O3S\nTheoretical value: C, 56.35%, H, 7.43%, N, 9.39%, S 10.74%\nActual measured value: C, 56.4%, H, 7.5%, N, 9.3%, S 10.2%'

In [7]:
# Load every line of the sample annotation file (entities and relations alike)
# as a list of tab-separated fields, then show the first five.
with io.open(f'{test_path}.ann', 'r', encoding='utf-8', errors='ignore') as text:
    ann = [raw_line.strip().split('\t') for raw_line in text]
ann[:5]

[['T0', 'EXAMPLE_LABEL 8 9', '5'],
 ['T1', 'STARTING_MATERIAL 312 346', 'p-toluenesulfonic acid monohydrate'],
 ['T2',
  'REACTION_PRODUCT 517 568',
  'racemic 3-aminoquinuclidine.mono-p-toluenesulfonate'],
 ['T3', 'TEMPERATURE 499 504', '60° C'],
 ['T12', 'REACTION_STEP 169 176', 'charged']]

In [8]:
# Parse the sample annotations into labeling offsets and overlap corrections.
test_offsets, test_corrections = process_ann(f'{test_path}.ann')

In [9]:
# First five (offset, label, entity ID) tuples; 'O' / 'X' marks text outside any entity.
test_offsets[:5]

[(8, 'EXAMPLE_LABEL', 'T0'),
 (9, 'O', 'X'),
 (24, 'REACTION_PRODUCT', 'T10'),
 (75, 'O', 'X'),
 (76, 'SOLVENT', 'T7')]

In [10]:
# Entity IDs skipped because they started while another entity was still open (empty if none).
test_corrections

{}

In [11]:
# Tag the sample text with the offsets parsed above.
trial_sentence = process_txt(f'{test_path}.txt', test_offsets)

In [12]:
# First five [token, tag, entity ID] rows of the tagged sample.
trial_sentence[:5]

[['Example', 'O', 'X'],
 ['5', 'B-EXAMPLE_LABEL', 'T0'],
 ['Synthesis', 'O', 'X'],
 ['of', 'O', 'X'],
 ['racemic', 'B-REACTION_PRODUCT', 'T10']]

In [13]:
# End-to-end check on the single sample document: write raw_data/test.csv
# (and raw_data/test_corrections.csv), then read the main file back and
# show its first five lines.
generate_ner_files([test_path], 'raw_data/test')
with io.open('raw_data/test.csv', 'r', encoding='utf-8', errors='ignore') as sample:
    output = sample.readlines()
output[:5]

Finished in 0.087 seconds


['SENTENCE: 1029\n',
 'Example\tO\tX\n',
 '5\tB-EXAMPLE_LABEL\tT0\n',
 'Synthesis\tO\tX\n',
 'of\tO\tX\n']

#### Process the Raw Data

In [108]:
# generate filename list for train, dev, and test sets
path_train = 'raw_data/EE/ee_train'
filenames_train = list({x[:4] for x in os.listdir(path_train) if x[0] != '.'})
print(f'Number of train files: {len(filenames_train)}')

path_dev = 'raw_data/EE/ee_dev'
filenames_dev = list({x[:4] for x in os.listdir(path_dev) if x[0] != '.'})
print(f'Number of dev files: {len(filenames_dev)}')

path_test = 'raw_data/EE/ee_test'
filenames_test = list({x[:4] for x in os.listdir(path_test) if x[0] != '.'})
print(f'Number of test files: {len(filenames_test)}')

path_test_ann = 'raw_data/EE/ee_test_ann'
filenames_test_ann = list({x[:4] for x in os.listdir(path_test_ann) if x[0] != '.'})
print(f'Number of test .ann files: {len(filenames_test_ann)}')

Number of train files: 900
Number of dev files: 225
Number of test files: 9999
Number of test .ann files: 375


In [109]:
# check how many test .txt files match the .ann files
intersect = list(set(filenames_test) & set(filenames_test_ann))
len(intersect)

375

In [174]:
# generate train set
output_train = 'data/ee_ner_train'
filepath_train = [f'{path_train}/{file_id}' for file_id in filenames_train]
generate_ner_files(filepath_train, output_train)

# Sanity check: count the 'SENTENCE:' marker rows in the file just written;
# there should be one per input document.
with io.open(f'{output_train}.csv', 'r', encoding='utf-8', errors='ignore') as sample:
    output = sample.readlines()
check = [1 for row in output if row.startswith('SENTENCE:')]
print(f'Number of train sentences: {len(check)}')

Finished in 7.175 seconds
Number of train sentences: 900


In [175]:
# generate dev set
filepath_dev = [f'{path_dev}/{x}' for x in filenames_dev]

output_dev = 'data/ee_ner_dev'
generate_ner_files(filepath_dev, output_dev)

# Sanity check: count the 'SENTENCE:' marker rows in the file just written;
# there should be one per input document.
with io.open(f'{output_dev}.csv', 'r', encoding='utf-8', errors='ignore') as sample:
    output = sample.readlines()
check = [1 for x in output if x[:9] == 'SENTENCE:']
# The message names the dev split (it previously said "train").
print(f'Number of dev sentences: {len(check)}')

Finished in 5.112 seconds
Number of train sentences: 225


In [176]:
# generate test set (only documents that have both a .txt and an .ann file)
filepath_test = [f'{path_test}/{x}' for x in intersect]

output_test = 'data/ee_ner_test'
generate_ner_files(filepath_test, output_test)

# Sanity check: count the 'SENTENCE:' marker rows in the file just written;
# there should be one per input document.
with io.open(f'{output_test}.csv', 'r', encoding='utf-8', errors='ignore') as sample:
    output = sample.readlines()
check = [1 for x in output if x[:9] == 'SENTENCE:']
# The message names the test split (it previously said "train").
print(f'Number of test sentences: {len(check)}')

Finished in 8.986 seconds
Number of train sentences: 375
