In [None]:
import os

from google.colab import drive

# Mount Google Drive, then switch the working directory to the project folder
# so that the relative paths used by later cells resolve inside it.
drive.mount('/content/drive')
os.chdir("/content/drive/MyDrive/KLTN/NER-medical-text/")

Mounted at /content/drive


In [None]:
import os
import re
import json
import shutil

In [None]:
import string

import nltk
from nltk.corpus import stopwords

# Fetch the NLTK stop-word corpus; tag_token() reads it via stopwords.words('english').
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
# Directory (relative to the working directory) that holds annotated_data.json.
data_dir = "data/annotated_json_data"

In [None]:
# Maps each annotation entity label to the three-letter acronym used as the
# suffix of its BIO tag (e.g. 'Age' -> 'B-AGE' / 'I-AGE').
# Every acronym must be unique: 'Distance' and 'Quantitative_concept' previously
# reused 'DIS' and 'QUC', which made their tags indistinguishable from
# 'Disease_disorder' and 'Qualitative_concept' and dropped two entries from the
# reverse mapping below.
entity_to_acronyms = {
    'Activity': 'ACT',
    'Administration': 'ADM',
    'Age': 'AGE',
    'Area': 'ARA',
    'Biological_attribute': 'BAT',
    'Biological_structure': 'BST',
    'Clinical_event': 'CLE',
    'Color': 'COL',
    'Coreference': 'COR',
    'Date': 'DAT',
    'Detailed_description': 'DET',
    'Diagnostic_procedure': 'DIA',
    'Disease_disorder': 'DIS',
    'Distance': 'DST',
    'Dosage': 'DOS',
    'Duration': 'DUR',
    'Family_history': 'FAM',
    'Frequency': 'FRE',
    'Height': 'HEI',
    'History': 'HIS',
    'Lab_value': 'LAB',
    'Mass': 'MAS',
    'Medication': 'MED',
    'Nonbiological_location': 'NBL',
    'Occupation': 'OCC',
    'Other_entity': 'OTH',
    'Other_event': 'OTE',
    'Outcome': 'OUT',
    'Personal_background': 'PER',
    'Qualitative_concept': 'QUC',
    'Quantitative_concept': 'QNC',
    'Severity': 'SEV',
    'Sex': 'SEX',
    'Shape': 'SHA',
    'Sign_symptom': 'SIG',
    'Subject': 'SUB',
    'Texture': 'TEX',
    'Therapeutic_procedure': 'THP',
    'Time': 'TIM',
    'Volume': 'VOL',
    'Weight': 'WEI'
}

# Fail fast if a future edit reintroduces a duplicate acronym.
assert len(set(entity_to_acronyms.values())) == len(entity_to_acronyms), \
    "entity acronyms must be unique"

# Reverse lookup: acronym -> entity label.
acronyms_to_entities = {v: k for k, v in entity_to_acronyms.items()}

## Load Data

In [None]:
# Open the JSON file for reading. The encoding is pinned to UTF-8 so that any
# non-ASCII characters decode the same way regardless of the platform locale.
with open(os.path.join(data_dir, "annotated_data.json"), 'r', encoding='utf-8') as f:

    # Load the JSON data into a dictionary
    data = json.load(f)

## Data Cleaning

In [None]:
def remove_trailing_punctuation(token):
    """
    Strip punctuation from the end of a token.

    Characters are dropped from the right for as long as the last one is
    neither a word character, whitespace, nor an apostrophe.

    Args:
        token (str): The token to clean.

    Returns:
        str: The token without its trailing punctuation (possibly empty).
    """
    keep = len(token)

    # Walk backwards over the trailing run of punctuation characters.
    while keep > 0 and re.search(r'[^\w\s\']', token[keep - 1]):
        keep -= 1

    return token[:keep]

In [None]:
def split_text(text):
    """
    Tokenize a document line by line and record where each token sits in it.

    Each line (split on '\\n') is cut into runs of characters that are neither
    whitespace nor hyphen/dash characters; trailing punctuation is then
    removed from every run with remove_trailing_punctuation().

    Args:
        text (str): The full document text.

    Returns:
        tuple: (tokens, start_end_ranges, sentence_breaks) where
            tokens is the list of cleaned tokens,
            start_end_ranges holds one (start, end) character offset pair per
            token, measured in ``text`` and covering the cleaned token only,
            sentence_breaks holds, for every line, the token count reached at
            the end of that line.
    """
    token_pattern = r'[^\s\u200a\-\u2010-\u2015\u2212\uff0d]+'  # r'[^\s\u200a\-\—\–]+'

    tokens, start_end_ranges, sentence_breaks = [], [], []

    # Offset of the current line's first character within the whole text.
    line_offset = 0

    for line in text.split('\n'):
        for found in re.finditer(token_pattern, line):
            cleaned = remove_trailing_punctuation(found.group(0))
            begin = line_offset + found.start()

            tokens.append(cleaned)
            # The end offset follows the cleaned token, not the raw match.
            start_end_ranges.append((begin, begin + len(cleaned)))

        sentence_breaks.append(len(tokens))

        # +1 accounts for the '\n' consumed by split().
        line_offset += len(line) + 1

    return tokens, start_end_ranges, sentence_breaks

In [None]:
# Sanity check: tokenize the first 100 characters of the first document.
doc = next(iter(data.values()), None)
if doc is not None:
    preview = doc['text'][:100]
    print(split_text(preview))

(['CASE', 'A', '28', 'year', 'old', 'previously', 'healthy', 'man', 'presented', 'with', 'a', '6', 'week', 'history', 'of', 'palpitations', 'The', 'symp'], [(0, 4), (6, 7), (8, 10), (11, 15), (16, 19), (20, 30), (31, 38), (39, 42), (43, 52), (53, 57), (58, 59), (60, 61), (62, 66), (67, 74), (75, 77), (78, 90), (92, 95), (96, 100)], [16, 18])


## Convert to BIO format

In [None]:
_STOP_WORD_CACHE = {}


def _english_stop_words():
    """Return NLTK's English stop words as a frozenset, reading the corpus only once."""
    if 'english' not in _STOP_WORD_CACHE:
        _STOP_WORD_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOP_WORD_CACHE['english']


def tag_token(tokens, tags, token_pos, entity):
    """
    Assign a BIO tag for ``entity`` to the token at ``token_pos``.

    The token becomes ``I-<acronym>`` when the preceding token already carries
    a tag of the same entity; otherwise it becomes ``B-<acronym>`` unless the
    token is an English stop word, in which case its tag is left untouched.

    Args:
        tokens (list[str]): All tokens of the document.
        tags (list[str]): Tags parallel to ``tokens``; modified in place.
        token_pos (int): Index of the token to tag.
        entity (str): Entity label; must be a key of ``entity_to_acronyms``.

    Returns:
        list[str]: The same ``tags`` list that was passed in.

    Raises:
        KeyError: If ``entity`` has no acronym in ``entity_to_acronyms``.
    """
    tag = entity_to_acronyms[entity]

    # Compare the whole previous tag rather than testing for a substring, so
    # the continuation check cannot be fooled by a partially matching tag.
    if token_pos > 0 and tags[token_pos - 1] in (f'B-{tag}', f'I-{tag}'):
        tags[token_pos] = f'I-{tag}'
    # Stop-word matching is case-sensitive, as the corpus entries are used as-is.
    elif tokens[token_pos] not in _english_stop_words():
        tags[token_pos] = f'B-{tag}'

    return tags


In [None]:
def write_bio_files(output_file_path, tokens, tags, sentence_breaks):
    """
    Write tokens and their tags to a .bio file, one "token<TAB>tag" per line.

    Tokens that are empty after stripping whitespace are skipped. A blank line
    is written before the first non-empty token at or after every index listed
    in ``sentence_breaks``, so a sentence boundary is kept even when the token
    sitting exactly on the boundary is empty.

    Args:
        output_file_path (str): Destination file; overwritten if it exists.
        tokens (list[str]): Tokens to write.
        tags (list[str]): Tags parallel to ``tokens``.
        sentence_breaks (list[int]): Token indices at which a new sentence starts.
    """
    # A set gives constant-time membership tests inside the loop.
    break_positions = set(sentence_breaks)
    pending_break = False

    # UTF-8 is set explicitly so non-ASCII tokens are written the same way
    # regardless of the platform's default encoding.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for i in range(len(tokens)):
            if i in break_positions:
                # Remember the boundary until a token is actually written.
                pending_break = True

            if not tokens[i].strip():
                continue

            if pending_break:
                f.write("\n")
                pending_break = False
            f.write(f"{tokens[i]}\t{tags[i]}\n")


In [None]:
def _annotation_matches_token(ann_word, token):
    """Return True when the annotated text occurs in ``token``, with or without non-word characters."""
    if ann_word in token:
        return True
    return re.sub(r'\W+', '', ann_word) in re.sub(r'\W+', '', token)


def convert_ann_to_bio(data, output_dir, filtered_entities=None):
    """
    Convert annotated documents to BIO files, one ``<file_id>.bio`` per document.

    WARNING: ``output_dir`` is deleted together with everything in it and then
    recreated, so it must not be a directory that holds files worth keeping.

    Each annotation is matched against the first token starting at or after the
    annotation's start offset and, failing that, against the token just before
    it (which covers annotations that begin inside a token). Annotations that
    match neither token are printed and left untagged.

    Args:
        data (dict): Maps a file id to a dict with a 'text' string and an
            'annotations' list whose items have 'label', 'start' and 'end'.
        output_dir (str): Directory that receives the .bio files.
        filtered_entities (list[str] | None): When non-empty, only annotations
            whose label is in this collection are converted.
    """
    if os.path.exists(output_dir):
        # Delete the contents of the directory
        shutil.rmtree(output_dir)
    # Recreate the directory
    os.makedirs(output_dir)

    for file_id in data:
        text = data[file_id]['text']
        annotations = data[file_id]['annotations']

        # Tokenizing
        tokens, token2text, sentence_breaks = split_text(text)

        # Initialize the tags
        tags = ['O'] * len(tokens)

        # token_pos only moves forward.
        # NOTE(review): this assumes annotations are ordered by 'start' - confirm.
        token_pos = 0

        # A document without tokens has nothing to tag.
        for annotation in (annotations if tokens else []):
            label = annotation['label']
            start = annotation['start']
            end = annotation['end']

            if filtered_entities and label not in filtered_entities:
                continue

            ann_word = text[start:end]

            # Find the first token that starts at or after the annotation start
            while token_pos < len(tokens) and token2text[token_pos][0] < start:
                token_pos += 1

            # token_pos may equal len(tokens) (annotation starts after the last
            # token start) or 0 (no previous token); only index tokens that exist.
            current = tokens[token_pos] if token_pos < len(tokens) else None
            previous = tokens[token_pos - 1] if token_pos > 0 else None

            if current is not None and _annotation_matches_token(ann_word, current):
                tag_token(tokens, tags, token_pos, label)
            elif previous is not None and _annotation_matches_token(ann_word, previous):
                tag_token(tokens, tags, token_pos - 1, label)
            else:
                # Report annotations that could not be aligned with a token
                print(current, previous, ann_word, label)

        # write to bio file
        write_bio_files(os.path.join(output_dir, f"{file_id}.bio"), tokens, tags, sentence_breaks)
    print("Conversion complete")

In [None]:
# Write the .bio files to a dedicated directory: convert_ann_to_bio() deletes
# and recreates its output directory, so passing data_dir would erase
# annotated_data.json, the file this notebook loads its input from.
bio_output_dir = os.path.join("data", "bio_data")
convert_ann_to_bio(data, bio_output_dir)

Conversion complete
