In [1]:
import os
import re
import json
import shutil

In [2]:
import string

import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')

[nltk_data] Error loading stopwords: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:1131)>


False

In [6]:
data_dir = "../data/annotated_json_data"

In [7]:
entity_to_acronyms = {
    'Activity': 'ACT',
    'Administration': 'ADM',
    'Age': 'AGE',
    'Area': 'ARA',
    'Biological_attribute': 'BAT',
    'Biological_structure': 'BST',
    'Clinical_event': 'CLE',
    'Color': 'COL',
    'Coreference': 'COR',
    'Date': 'DAT',
    'Detailed_description': 'DET',
    'Diagnostic_procedure': 'DIA',
    'Disease_disorder': 'DIS',
    'Distance': 'DIS',
    'Dosage': 'DOS',
    'Duration': 'DUR',
    'Family_history': 'FAM',
    'Frequency': 'FRE',
    'Height': 'HEI',
    'History': 'HIS',
    'Lab_value': 'LAB',
    'Mass': 'MAS',
    'Medication': 'MED',
    'Nonbiological_location': 'NBL',
    'Occupation': 'OCC',
    'Other_entity': 'OTH',
    'Other_event': 'OTE',
    'Outcome': 'OUT',
    'Personal_background': 'PER',
    'Qualitative_concept': 'QUC',
    'Quantitative_concept': 'QUC',
    'Severity': 'SEV',
    'Sex': 'SEX',
    'Shape': 'SHA',
    'Sign_symptom': 'SIG',
    'Subject': 'SUB',
    'Texture': 'TEX',
    'Therapeutic_procedure': 'THP',
    'Time': 'TIM',
    'Volume': 'VOL',
    'Weight': 'WEI'
}


acronyms_to_entities = {v: k for k, v in entity_to_acronyms.items()}

## Step 1 - Load Data

In [8]:
# Open the JSON file for reading
with open(os.path.join(data_dir, "annotated_data.json"), 'r') as f:

    # Load the JSON data into a dictionary
    data = json.load(f)

## Step 2 - Data Cleaning

In [70]:
def remove_trailing_punctuation(token):
    """
    Removes trailing punctuation from a token.

    Args:
        token (str): A string representing the token to be cleaned.

    Returns:
        str: The cleaned token with trailing punctuation removed.
    """
    while token and re.search(r'[^\w\s\']', token[-1]):
        token = token[:-1]
        
    return token

`split_text` function that takes in a text as input and returns three lists:

* tokens: a list of words (with trailing punctuation removed)
* start_end_ranges: a list of tuples representing the start and end indices of each word in the original text
* sentence_breaks: a list of indices indicating the positions in the tokens list where a new sentence begins.

The function first defines a regular expression pattern to match non-space and non-dash characters. It then initializes empty lists for tokens, start_end_ranges, and sentence_breaks.


The function then iterates over each sentence in the input text, finds the words in each sentence using regex matching, removes trailing punctuation from each word using another function, and calculates the start and end indices for each word.


The function updates the start and end indices to account for the sentence's position in the entire text, adds the indices and words to the respective lists, and appends the index of the last word in the sentence to the sentence_breaks list.


Finally, the function returns the three lists containing the processed data.

In [71]:
def split_text(text):

    regex_match = r'[^\s\u200a\-\u2010-\u2015\u2212\uff0d]+'  # r'[^\s\u200a\-\—\–]+'

    tokens = []
    start_end_ranges = []

    sentence_breaks = []

    start_idx = 0

    for sentence in text.split('\n'):
        words = [match.group(0) for match in re.finditer(regex_match, sentence)]
        processed_words = list(map(remove_trailing_punctuation, words))
        sentence_indices = [(match.start(), match.start() + len(token)) for match, token in
                            zip(re.finditer(regex_match, sentence), processed_words)]

        # Update the indices to account for the current sentence's position in the entire text
        sentence_indices = [(start_idx + start, start_idx + end) for start, end in sentence_indices]

        start_end_ranges.extend(sentence_indices)
        tokens.extend(processed_words)

        sentence_breaks.append(len(tokens))

        start_idx += len(sentence) + 1
    return tokens, start_end_ranges, sentence_breaks

In [72]:
for doc_id, doc in data.items():
    print(split_text(doc['text'][:100]))
    break

(['Our', '24', 'year', 'old', 'non', 'smoking', 'male', 'patient', 'presented', 'with', 'repeated', 'hemoptysis', 'in', 'May', '2008', 'with', '4', 'days'], [(0, 3), (4, 6), (7, 11), (12, 15), (16, 19), (20, 27), (28, 32), (33, 40), (41, 50), (51, 55), (56, 64), (65, 75), (76, 78), (79, 82), (83, 87), (88, 92), (93, 94), (95, 99)], [18])


## Step 3 - Convert to BIO format

`tag_token` function adds a tag label to a token at a given position in a sequence of tags, based on the position of the previous token and whether the current token has the same tag label as the previous token. It takes in a list of tag labels, a position in the list, and the tag label to add. If the current token is not the first in the sequence and the previous token has the same tag label as the current token, then the tag label is added as an "I-" tag. Otherwise, the tag label is added as a "B-" tag. The function modifies the original list and does not return any value.

In [83]:
def tag_token(tokens, tags, token_pos, entity):
    """
    Modifies a list of tags by adding a tag label to a token at a given position in the list, based on the position of the 
    previous token and whether the current token has the same tag label as the previous token.

    Args:
    - tokens (list): A list of tokens in a sequence.
    - tags (list): A list of tag labels corresponding to the tokens in a sequence.
    - token_pos (int): The position of the token to tag.
    - entity (str): The tag label to add to the token.

    Returns:
    - tags (list): The modified list of tag labels.
    """
    
    stop_words = stopwords.words('english')
    
    tag = entity_to_acronyms[entity]
    
    if token_pos > 0 and f'{tag}' in tags[token_pos - 1]:        
            tags[token_pos] = f'I-{tag}'
    elif tokens[token_pos] not in stop_words:
            tags[token_pos] = f'B-{tag}'
            
    return tags


In [84]:
def write_bio_files(output_file_path, tokens, tags, sentence_breaks):

    # Write the tags to a .bio file
    with open(output_file_path, 'w') as f:
        for i in range(len(tokens)):
            token = tokens[i].strip()
            if token:
                if i in sentence_breaks:
                    f.write("\n")
                f.write(f"{tokens[i]}\t{tags[i]}\n")


In [88]:
def convert_ann_to_bio(data, output_dir, filtered_entities=[]):
    
    """
    Convert annotations from a dictionary of text files to a BIO-tagged sequence.

    Args:
        data (dict): A dictionary of text files where keys are file IDs and values are dictionaries containing 'text' and
            'annotations' keys.
        filtered_entities (list): A list of entity labels to include. If provided, only annotations with these labels will
            be converted to the BIO format. Defaults to an empty list.

    Returns:
        A tuple of two lists: tokens and tags.
        - tokens (list): A list of tokens in a sequence.
        - tags (list): A list of corresponding tags for each token in the sequence. Tags are BIO formatted.

    """
    
    if os.path.exists(output_dir):
        # Delete the contents of the directory
        shutil.rmtree(output_dir)
    # Recreate the directory
    os.makedirs(output_dir)
    
    
    for file_id in data:
        text = data[file_id]['text']
        annotations = data[file_id]['annotations']
        
        # Tokenizing
        tokens, token2text, sentence_breaks = split_text(text)

        # Initialize the tags
        tags = ['O'] * len(tokens)

        ann_pos = 0
        token_pos = 0

        while ann_pos < len(annotations) and token_pos < len(tokens):

            label = annotations[ann_pos]['label']
            start = annotations[ann_pos]['start']
            end = annotations[ann_pos]['end']

            if filtered_entities:
                if label not in filtered_entities:
                    # increment to access next annotation
                    ann_pos += 1
                    continue
            
            ann_word = text[start:end]

            # find the next word that fall between the annotation start and end
            while token_pos < len(tokens) and token2text[token_pos][0] < start:
                
                token_pos += 1

            if tokens[token_pos] == ann_word or \
                ann_word in tokens[token_pos] or \
                re.sub(r'\W+', '', ann_word) in re.sub(r'\W+', '', tokens[token_pos]):
                tag_token(tokens, tags, token_pos, label)
            elif ann_word in tokens[token_pos - 1] or \
                ann_word in tokens[token_pos - 1] or \
                re.sub(r'\W+', '', ann_word) in re.sub(r'\W+', '', tokens[token_pos - 1]):
                tag_token(tokens, tags, token_pos - 1, label)
            else:
                print(tokens[token_pos], tokens[token_pos - 1], ann_word, label)

            # increment to access next annotation
            ann_pos += 1

        # write to bio file
        write_bio_files(os.path.join(output_dir, f"{file_id}.bio"), tokens, tags, sentence_breaks)
    print("Conversion complete")

In [90]:
convert_ann_to_bio(data, data_dir)

Conversion complete
