# N2c2 data preprocessing

In [None]:
import os
import json
import re
import tiktoken
import math

In [None]:
def create_pairs(directory):
    '''
    Text and annotation files are separate from each other.
    This function creates filename pairs to ensure each text has a corresponding annotation file.

    directory: folder holding the .txt and .ann files (with or without a trailing path separator).

    Returns a list of (text filename, annotation filename) tuples. Text files without
    a matching .ann file are left out. The list is sorted by filename so the result
    does not depend on the order in which the operating system lists the folder.
    '''
    pairs = []

    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".txt"):
            continue

        if not os.path.isfile(os.path.join(directory, filename)):
            continue

        ann_name = filename[:-4] + ".ann"

        # Joined with os.path.join so a directory without a trailing separator also works
        if os.path.isfile(os.path.join(directory, ann_name)):
            pairs.append((filename, ann_name))

    return pairs

In [None]:
# Shortening the anonymising tokens using regex patterns
def clean_text(text):
    '''
    Normalises whitespace and shortens the anonymisation placeholders of a text.

    text: raw text (a whole document or a single entity value).

    Returns the cleaned text: surrounding whitespace stripped, newlines turned into
    spaces, tabs removed, anonymisation patterns replaced by short tags
    (*DATE*, *NAME*, *HOSPITAL*, *LOC*, *CONTACT*, *YEAR*), runs of spaces collapsed,
    characters repeated four or more times removed, and repeated tags merged.
    '''
    text = text.strip()
    text = text.replace("\n", " ")
    text = text.replace("\t", "")
    text = text.replace(". .", ".")

    # The patterns are removed as they may reveal information about the data
    pattern_date = r''
    pattern_date2 = r''
    pattern_date3 = r''
    pattern_date4 = r''
    pattern_year = r''
    pattern_name = r''
    pattern_hospital = r''
    pattern_location = r''
    pattern_contact = r''
    pattern_space = r' +'
    pattern_repeat = r'(.)\1{3,}'

    # Applied in this order; later patterns see the output of earlier ones
    substitutions = [
        (pattern_date, "*DATE*"),
        (pattern_date2, "*DATE*"),
        (pattern_date3, "*DATE*"),
        (pattern_date4, "*DATE*"),
        (pattern_name, "*NAME*"),
        (pattern_hospital, "*HOSPITAL*"),
        (pattern_location, "*LOC*"),
        (pattern_contact, "*CONTACT*"),
        (pattern_year, "*YEAR*"),
        (pattern_space, " "),
        (pattern_repeat, ""),
    ]

    for pattern, replacement in substitutions:
        # An empty pattern matches between every character, so re.sub would
        # scatter the tag through the whole text; skip patterns left blank.
        if pattern:
            text = re.sub(pattern, replacement, text)

    # Two or more consecutive tags are substituted by one
    for tag in ("DATE", "NAME", "LOC", "HOSPITAL", "CONTACT"):
        text = re.sub(r'(\*' + tag + r'\* )\1{1,}', "*" + tag + "* ", text)

    text = text.replace("[**", "*")
    text = text.replace("**]", "*")

    return text

In [None]:
def get_annotations(f):
    '''
    Function for formatting the annotations into a JSON-format (label + beginning and end index).

    f: content of an annotation file as one string. Entity lines start with "T" and
       have three tab-separated fields: id, "label begin end" and the entity text.

    Returns (data, data_index):
      data       maps each label to the list of cleaned entity texts,
      data_index maps each label to the list of (begin, end) offsets as integers.
    Every label in the global `tags` is present in both dictionaries, with an empty
    list when the file has no entity of that label.
    '''
    data = {}
    data_index = {}

    for line in f.split("\n"):
        # Only lines starting with "T" contain an entity annotation
        if not line.startswith("T"):
            continue

        parts = line.split("\t")
        if len(parts) < 3:
            continue  # Malformed entity line without a text field

        info = parts[1].split(" ")  # label followed by the offsets
        if len(info) < 3:
            continue  # Malformed entity line without both offsets

        tag = info[0]  # entity label
        value = parts[2]  # entity
        begin = int(info[1])
        end = int(info[-1])

        # More than two offsets: the entity is made of several fragments.
        # When the first fragment ends where the second begins, the character at
        # that position of the value is dropped (presumably a separator that is
        # not part of the original text - TODO confirm against the data).
        if len(info) > 3:
            split = info[2].split(";")
            if split[0] == split[1]:
                index = int(split[0]) - begin
                value = value[:index] + value[index+1:]

        # Offsets are stored as integers, matching split_example_into_parts
        data.setdefault(tag, []).append(clean_text(value))
        data_index.setdefault(tag, []).append((begin, end))

    # Empty lists for entities not found in text
    for t in tags:
        data.setdefault(t, [])
        data_index.setdefault(t, [])

    return data, data_index

In [None]:
def format_messages(text, ann, ann_i):
    '''
    Builds one example in chat-message format.

    text:  original, unmodified text
    ann:   annotations (label -> entity texts)
    ann_i: annotations by index (label -> offsets)

    The "annotations" and "full_user" messages are only included for testing and
    have to be removed when creating the fine-tuning file.
    '''
    messages = [
        # System prompt
        {"role": "system", "content": system_content},
        # Model prompt followed by the cleaned text
        {"role": "user", "content": instruction + clean_text(text)},
        # Annotations
        {"role": "assistant", "content": ann},
        # Annotations by index
        {"role": "annotations", "content": ann_i},
        # Text without changes so the annotation indexes match the text
        {"role": "full_user", "content": text},
    ]

    return {"messages": messages}

In [None]:
def create_example(pair):
    '''
    Function for creating an example of a data file pair.

    pair: (text filename, annotation filename), both located in the global `directory`.

    Returns the example built by format_messages from the file contents.
    '''
    text_file, ann_file = pair[0], pair[1]

    # os.path.join works whether or not `directory` ends with a path separator
    with open(os.path.join(directory, text_file)) as file0:
        text = file0.read()

    with open(os.path.join(directory, ann_file)) as file1:
        ann, ann_i = get_annotations(file1.read())

    return format_messages(text, ann, ann_i)

In [None]:
def example_within_limit(example):
    '''
    Checks that an example fits into the model's context limit.

    Returns True when the token count of the counted messages is at most `token_limit`.
    '''
    # Excluded as they are not used in fine-tuning
    skipped_roles = ("annotations", "full_user")

    token_count = sum(
        len(encoding.encode(str(message["content"])))
        for message in example["messages"]
        if message["role"] not in skipped_roles
    )

    return token_count <= token_limit

In [None]:
# If an annotation falls exactly on the boundary between two text parts, its start is used as the end point of the previous part and the next example starts from there
def _parse_entities(ann_file):
    '''
    Parses the entity lines of an annotation file once.
    Returns a list of (label, begin, end, cleaned entity text) tuples in file order.
    '''
    entities = []

    for line in ann_file.split("\n"):
        if not line.startswith("T"):
            continue

        parts = line.split("\t")
        if len(parts) < 3:
            continue  # Malformed entity line without a text field

        info = parts[1].split(" ")
        if len(info) < 3:
            continue  # Malformed entity line without both offsets

        tag = info[0]  # entity label
        b = int(info[1])  # entity beginning index
        e = int(info[-1])  # entity end index
        value = parts[2]

        # Entity made of several fragments: when the first fragment ends where the
        # second begins, the character at that position of the value is dropped
        # (presumably a separator that is not in the original text - TODO confirm).
        if len(info) > 3:
            split = info[2].split(";")
            if split[0] == split[1]:
                index = int(split[0]) - b
                value = value[:index] + value[index+1:]

        entities.append((tag, b, e, clean_text(value)))

    return entities


def _find_cut(entities, beginning, end):
    '''
    Returns the largest position <= end that does not fall inside an entity starting after `beginning`.
    '''
    cut = end
    moved = True

    # Moving the cut may expose another entity that now straddles it, so repeat
    # until stable. The cut only ever decreases and always stays above `beginning`.
    while moved:
        moved = False
        for _, b, e, _ in entities:
            if beginning < b < cut < e:
                cut = b
                moved = True

    return cut


def split_example_into_parts(pair, num_parts):

    '''
    Function that splits text and annotations into multiple parts (num_parts).

    pair:      (text filename, annotation filename), both located in the global `directory`.
    num_parts: number of parts the text is divided into; it sets the maximum part length.

    An entity that would be split between two parts is moved to the second part: the
    first part ends where that entity begins. Because such parts are shorter than the
    maximum length, more than num_parts parts may be produced so that the end of the
    text is not lost. An entity that begins exactly at the start of a part and is
    longer than a whole part cannot be kept intact and is left out.

    Returns the list of examples that are within the token limit and whose entities
    are all present in their text part.
    '''

    examples_after_split = []

    with open(os.path.join(directory, pair[1])) as file1:
        ann_file = file1.read()

    with open(os.path.join(directory, pair[0])) as file0:
        text_file = file0.read()

    # The annotations are in a random order, so they are parsed once and the
    # whole list is scanned for each part.
    entities = _parse_entities(ann_file)

    # The maximum length of text parts
    len_texts = math.ceil(len(text_file) / num_parts)
    if len_texts <= 0:
        return examples_after_split

    beginning = 0

    while beginning < len(text_file):

        end = _find_cut(entities, beginning, beginning + len_texts)
        data = {}
        data_index = {}

        for tag, b, e, value in entities:
            # Entity is in the current text part; indexes are relative to the part
            if b >= beginning and e <= end:
                data.setdefault(tag, []).append(value)
                data_index.setdefault(tag, []).append((b - beginning, e - beginning))

        # Empty lists for entities not found in this part
        for t in tags:
            data.setdefault(t, [])
            data_index.setdefault(t, [])

        text = text_file[beginning:end]
        beginning = end

        example = format_messages(text, data, data_index)

        if example_within_limit(example) and no_mistakes_in_example(example):
            examples_after_split.append(example)

    return examples_after_split

In [None]:
def calculate_split_number(example):
    '''
    Function for calculating the number of parts a text should be split into.

    Each text part, together with ALL annotations of the example, has to stay within
    `available_tokens`, as all entities might be contained within a single part.

    example: an example built by format_messages (message 1 is the prompt with the
             text, message 2 holds the annotations).

    Returns the number of parts, at least 2.
    '''

    text_tokens = len(encoding.encode(str(example["messages"][1]["content"]))) - len(encoding.encode(instruction))
    tag_tokens = len(encoding.encode(str(example["messages"][2]["content"]))) - len(encoding.encode(assistant_base))

    if tag_tokens > available_tokens:
        # The annotations alone exceed the budget, so no number of parts can satisfy
        # the worst-case check below and the search would never end. Fall back to
        # assuming the annotations are spread over the parts.
        return max(2, math.ceil((text_tokens + tag_tokens) / available_tokens))

    num_parts = max(2, math.ceil(text_tokens / available_tokens))

    # Increase the number of parts until one part plus all annotations fits
    while math.floor(text_tokens / num_parts) + tag_tokens > available_tokens:
        num_parts += 1

    return num_parts

In [None]:
def no_mistakes_in_example(example):
    '''
    Checks that every annotated entity occurs in the text of the example.

    The prompt text is read from message 1 and the annotations (label -> entity
    texts) from message 2. Returns True only if each entity, with surrounding
    whitespace stripped, is a substring of the prompt text.
    '''
    messages = example["messages"]
    prompt_text = messages[1]["content"]
    annotations = messages[2]["content"]

    return all(
        entity.strip() in prompt_text
        for entities in annotations.values()
        for entity in entities
    )

In [None]:
def create_examples_in_limit(pairs):
    '''
    Function that creates a formatted example and splits the examples that are over the context limit.

    pairs: list of (text filename, annotation filename) tuples.

    Returns the list of examples. An example within the limit is kept only if all of
    its entities are present in its text; an example over the limit is replaced by
    the parts returned by split_example_into_parts.
    '''

    examples = []

    for pair in pairs:
        example = create_example(pair)

        if not example_within_limit(example):
            num_parts = calculate_split_number(example)
            examples.extend(split_example_into_parts(pair, num_parts))
            continue

        if no_mistakes_in_example(example):
            examples.append(example)

    return examples

In [None]:
def write_examples_to_file(filename, examples):
    '''
    Writes the examples into a JSONL-file, one JSON object per line,
    and records the filename in the global `files` list.
    '''
    with open(filename, "w") as out:
        for record in examples:
            out.write(json.dumps(record) + '\n')

    files.append(filename)

# Data formatting and saving

In [None]:
# Location of the data; both are left empty here and have to be filled in
path = ""
folder = ""

# Path to original data (plain concatenation of path and folder)
directory = path + folder

# Test-train/val split: share of the file pairs used as test data
test_size = 0.15

# Maximum number of tokens allowed for one example
# NOTE(review): 4063 is presumably the model context size minus some overhead - confirm
token_limit = 4063
# Tokenizer used for all token counts
encoding = tiktoken.get_encoding("cl100k_base")

# Entity labels; every example gets a (possibly empty) list for each of them
tags = ["Drug", "Strength", "Dosage", "Duration", "Frequency", "Form", "Route", "Reason", "ADE"]

# Fixed parts of every example: system prompt, user instruction placed before the text,
# and an approximation of an empty annotation object used for token counting
system_content = "This model extracts entities from text, returning JSON-formatted output for tags " + ", ".join(tags) + "."
instruction = "Extract entities " + ", ".join(tags) + " from the following text and return the output in JSON format. "
assistant_base = "{" + ":[], ".join(tags) + ":[]}"

# Tokens left for the text and its annotations once the fixed parts are subtracted
available_tokens = token_limit - (len(encoding.encode(system_content)) + len(encoding.encode(instruction)) + len(encoding.encode(assistant_base)))

# Names of the files written by write_examples_to_file
files = []

In [None]:
# Text and annotation file pairs: (text filename, annotation filename) for every
# .txt file in `directory` that has a matching .ann file
pairs = create_pairs(directory)

In [None]:
# Test and train split: the first `test_size` share of the pairs becomes the test set,
# the remaining pairs the training set
test_train_split = round(test_size * len(pairs))

test_pairs, train_pairs = pairs[:test_train_split], pairs[test_train_split:]

In [None]:
# Creating the examples; texts over the token limit are split into several examples
train_data = create_examples_in_limit(train_pairs)
test_data = create_examples_in_limit(test_pairs)

In [None]:
# Reformatting the test examples to match synthetic data examples:
# the cleaned user message is replaced by the unmodified full text (relabelled as "user")
# and the "full_user" message is left out

full_test_data = []

for example in test_data:
    parts = example["messages"]

    # Copy the full-text message before relabelling it, so the examples in
    # test_data are not modified in place
    user_text = dict(parts[-1])
    user_text["role"] = "user"

    # system, full text as user, assistant annotations, annotations by index
    new_messages = [parts[0], user_text, parts[2], parts[3]]

    fixed_example = {}
    fixed_example["messages"] = new_messages

    full_test_data.append(fixed_example)

In [None]:
# Writing the examples into files
# The output filenames are left empty here and have to be filled in before running

write_examples_to_file("", train_data)
write_examples_to_file("", full_test_data)