In [None]:
import pandas as pd
import os

def read_text_file(file_path):
    """Load a directory of ``.txt`` documents and ``.ann`` annotation files.

    Every ``<name>.txt`` file becomes one row of the texts frame (its content
    is whitespace-stripped). Every line of a ``<name>.ann`` file that starts
    with ``T`` and has exactly three tab-separated fields
    (``id``, ``"CATEGORY start end"``, ``mention``) becomes one row of the
    annotations frame; all other lines are ignored.

    Because the document text is stripped, offsets in the ``.ann`` file can be
    off by a few characters. A mention that is not found at its declared
    offsets is looked for at shifts of up to two characters either side and
    the offsets are corrected when it is found there.

    Parameters
    ----------
    file_path : str
        Directory containing the ``.txt`` and ``.ann`` files.

    Returns
    -------
    (dfAnn, df) : tuple of pandas.DataFrame
        ``dfAnn`` has columns ``textId, category, start, end, annText, text``
        (one row per mention, ``text`` being the full document).
        ``df`` has columns ``id, text`` (one row per document).

    Raises
    ------
    ValueError
        If an annotation has no matching text file, if its offsets fall
        outside the document, or if the span field of a ``T`` line does not
        hold two integer offsets.
    AssertionError
        If at least one mention could not be located at or near its offsets.
    """
    # Largest offset correction (in characters) that is applied automatically.
    max_shift = 2
    # Candidate corrections, nearest first, so the closest occurrence wins.
    shifts = sorted(range(-max_shift, max_shift + 1), key=abs)

    # os.listdir returns entries in arbitrary order; sorting makes the row
    # order (and anything positional derived from it) reproducible.
    file_names = sorted(os.listdir(file_path))

    # --- documents -------------------------------------------------------
    # Records are collected in a list and turned into a frame once, instead
    # of concatenating a one-row frame per file.
    text_records = []
    for file_name in file_names:
        if not file_name.endswith('.txt'):
            continue
        with open(os.path.join(file_path, file_name), 'r', encoding='utf-8') as f:
            # splitext keeps dotted stems intact, matching the .ann ids below.
            text_records.append({'id': os.path.splitext(file_name)[0], 'text': f.read().strip()})
    df = pd.DataFrame(text_records, columns=['id', 'text'])

    print(f"Readed {len(df)} text files.")
    print(f"Length unique textId: {df['id'].nunique()}")

    # --- annotations -----------------------------------------------------
    ann_records = []
    for file_name in file_names:
        if not file_name.endswith('.ann'):
            continue
        text_name = os.path.splitext(file_name)[0]
        with open(os.path.join(file_path, file_name), 'r', encoding='utf-8') as f:
            for line in f:
                if not line.startswith('T'):
                    continue
                parts = line.strip().split('\t')
                if len(parts) != 3:
                    continue
                _, span_info, ann_text = parts
                span_fields = span_info.split()
                # Raises ValueError when the field is not "CATEGORY int int".
                start, end = map(int, span_fields[1:3])
                ann_records.append({
                    'textId': text_name,
                    'category': span_fields[0],
                    'start': start,
                    'end': end,
                    'annText': ann_text,
                })
    dfAnn = pd.DataFrame(ann_records, columns=['textId', 'category', 'start', 'end', 'annText'])

    print(f"Readed {len(dfAnn)} annotations.")
    print(f"Length unique textId: {dfAnn['textId'].nunique()}")

    # Attach the full document to every mention; the duplicated key column
    # coming from the texts frame is dropped right away.
    dfAnn = dfAnn.merge(df, left_on='textId', right_on='id', how='left')
    print(f"After merging, dfAnn has {len(dfAnn)} rows.")
    dfAnn = dfAnn.drop(columns=['id'])

    # --- validation and offset repair -------------------------------------
    unresolved = 0
    for index, row in dfAnn.iterrows():
        text = row['text']
        start = row['start']
        end = row['end']
        ann_text = row['annText']

        # A left merge leaves a non-string (NaN) text when no .txt matched.
        if not isinstance(text, str):
            raise ValueError(f"No text file found for annotations of textId {row['textId']} at index {index}.")
        if start < 0 or end > len(text):
            raise ValueError(f"Start or end index out of bounds for textId {row['textId']} at index {index}.")
        if text[start:end] == ann_text:
            continue

        # Probe each nearby position directly rather than taking the first
        # occurrence in a window: an earlier copy of a short mention must not
        # mask the one sitting next to the declared offset.
        new_start = None
        for shift in shifts:
            candidate = start + shift
            if candidate >= 0 and text[candidate:candidate + len(ann_text)] == ann_text:
                new_start = candidate
                break

        if new_start is not None:
            dfAnn.at[index, 'start'] = new_start
            dfAnn.at[index, 'end'] = new_start + len(ann_text)
        else:
            unresolved += 1
            print(f"Could not adjust indices for textId {row['textId']} at index {index}. Original start {row['start']}, end {row['end']}, annText '{row['annText']}' not found in text.")
            print(f"Start: {row['start']}, End: {row['end']}")

    assert unresolved == 0, "AnnText does not match the text at the specified start and end indices."

    print(f"Length unique textId: {dfAnn['textId'].nunique()}")
    print()
    return dfAnn, df

# Load the training directory: annotations frame + raw texts frame.
pathTrain = '../PharmacoNER/Data/train/subtrack1'
dfAnnTrain, dfTextsTrain = read_text_file(pathTrain)

# Load the test directory. A previous assignment of
# '../PharmacoNER/Data/test/subtrack1' was overwritten before ever being used,
# so only the path that is actually read is kept.
pathTest = '../PharmacoNER/pharmaconer/test-set_1.1/test/subtrack1'
dfAnnTest, dfTextsTest = read_text_file(pathTest)

def getExampleAnnotated(id, df, format='html'):
    """Return one document with every annotated mention wrapped in a ``<span>``.

    Parameters
    ----------
    id : int or str
        An integer is a positional row index into ``df`` (the document that
        row belongs to is rendered); a string is a ``textId`` value. Any
        integral type is accepted (e.g. numpy integers), booleans are not.
    df : pandas.DataFrame
        Annotations frame with at least the columns ``textId``, ``text``,
        ``category``, ``start`` and ``end``.
    format : str, optional
        Currently unused; the output is always the HTML-like span markup.

    Returns
    -------
    str
        The document text where each mention ``text[start:end]`` is replaced
        by ``<span class="...">mention</span>``. The class is derived from the
        category; unknown categories fall back to ``unclear``.

    Raises
    ------
    ValueError
        If ``id`` is neither an integer nor a string.
    IndexError
        If ``id`` is a string that matches no ``textId`` or an integer outside
        the frame.

    Notes
    -----
    Mentions are inserted from right to left so earlier offsets stay valid.
    This presumes the mentions of a document do not overlap or nest; that is
    not checked here.
    """
    # Local import keeps the function self-contained within its cell.
    import numbers

    # bool is an Integral subclass but was never a valid id, so exclude it.
    if isinstance(id, numbers.Integral) and not isinstance(id, bool):
        anchor_row = df.iloc[id]
        text_id = anchor_row['textId']
        textToAnnotate = anchor_row['text']
        annotations = df[df['textId'] == text_id]
    elif isinstance(id, str):
        annotations = df[df['textId'] == id]
        if len(annotations) == 0:
            raise IndexError(f"No annotations found for id {id}.")
        textToAnnotate = annotations['text'].values[0]
    else:
        raise ValueError("id must be an integer or a string.")

    # Only reachable for a positional id whose textId matches no row
    # (e.g. a missing value); the plain text is returned untouched.
    if len(annotations) == 0:
        print(f"No annotations found for id {id}.")
        return textToAnnotate

    # Category -> CSS class used in the markup.
    entity_class_map = {
        'NORMALIZABLES': 'normalizables',
        'NO_NORMALIZABLES': 'no_normalizables',
        'PROTEINAS': 'proteinas',
        'UNCLEAR': 'unclear'
    }

    # Rightmost mention first: inserting tags never moves text to its left.
    annotations = annotations.sort_values('start', ascending=False)

    html_text = textToAnnotate
    for raw_start, raw_end, entity_type in zip(annotations['start'], annotations['end'], annotations['category']):
        start = int(raw_start)
        end = int(raw_end)
        css_class = entity_class_map.get(entity_type, 'unclear')
        span_tag = f'<span class="{css_class}">{html_text[start:end]}</span>'
        html_text = html_text[:start] + span_tag + html_text[end:]

    return html_text


In [None]:
# Documents without a single annotation row, per split: ids present in the
# texts frame but absent from the matching annotations frame.
dfMissingTextsTrain = dfTextsTrain.loc[~dfTextsTrain['id'].isin(dfAnnTrain['textId'])]
dfMissingTextsTest = dfTextsTest.loc[~dfTextsTest['id'].isin(dfAnnTest['textId'])]

print(f"Number of texts in training set without annotations: {dfMissingTextsTrain.shape[0]}")
print(f"Number of texts in test set without annotations: {dfMissingTextsTest.shape[0]}")

In [None]:
#Get a text that contains annotations from all the categories
def getExampleAnnotatedAllCategories(df):
    """Return the ids of the texts annotated with every entity category.

    Parameters
    ----------
    df : pandas.DataFrame
        Annotations frame with the columns ``textId`` and ``category``.

    Returns
    -------
    list
        ``textId`` values, in order of first appearance in ``df``, whose
        annotations include all of ``NORMALIZABLES``, ``NO_NORMALIZABLES``,
        ``PROTEINAS`` and ``UNCLEAR``. Empty when no text qualifies.
    """
    required = {'NORMALIZABLES', 'NO_NORMALIZABLES', 'PROTEINAS', 'UNCLEAR'}

    # Compare the actual labels, not just how many distinct ones there are:
    # four arbitrary labels must not qualify, and an extra label must not
    # disqualify a text that does contain all required ones.
    # sort=False keeps the texts in order of first appearance.
    allTexts = []
    for text_id, text_categories in df.groupby('textId', sort=False)['category']:
        if required.issubset(text_categories.unique()):
            allTexts.append(text_id)

    return allTexts
    
# Text ids selected by getExampleAnnotatedAllCategories from the training
# annotations. Despite the `df` prefix this is a plain list, not a DataFrame.
dfAllCats = getExampleAnnotatedAllCategories(dfAnnTrain)

In [None]:
# The two in-context examples are the first two training texts listed in
# dfAllCats. Fail with an explicit message instead of a bare IndexError when
# fewer than two texts are available.
if len(dfAllCats) < 2:
    raise ValueError(
        f"Need at least two candidate example texts, found {len(dfAllCats)}."
    )

exampleIds = dfAllCats[:2]

# Raw text of each example (every annotation row of a text carries the same
# full document, so the first matching row is enough) and its tagged version.
EXAMPLE_1_TEXT = dfAnnTrain.loc[dfAnnTrain['textId'] == exampleIds[0], 'text'].iloc[0]
EXAMPLE_1_HTML = getExampleAnnotated(exampleIds[0], dfAnnTrain, format='html')
EXAMPLE_2_TEXT = dfAnnTrain.loc[dfAnnTrain['textId'] == exampleIds[1], 'text'].iloc[0]
EXAMPLE_2_HTML = getExampleAnnotated(exampleIds[1], dfAnnTrain, format='html')

In [50]:
# Document ids of each split, in order of first appearance in the texts frame.
textsTrain = dfTextsTrain['id'].unique()
textsTest  = dfTextsTest['id'].unique()

# Leakage check: report every test id that also occurs in the training split.
# A set gives constant-time membership instead of scanning the array each time.
trainIdSet = set(textsTrain)
for text in textsTest:
    if text in trainIdSet:
        print(f"Text {text} is in both train and test sets.")

# Read the prompt template from prompt2.txt and drop its first line.
with open('prompt2.txt', 'r', encoding='utf-8') as f:
    prompt1 = f.read()
prompt1 = '\n'.join(prompt1.split('\n')[1:])


def _build_prompts(text_ids, df_texts, template):
    """Format ``template`` once per id in ``text_ids``.

    ``df_texts`` must have the columns ``id`` and ``text``. Returns a list of
    dicts with the keys ``text`` (the document id), ``text_to_annotate`` (the
    document) and ``prompt`` (the filled-in template), in ``text_ids`` order.
    """
    # One id -> text lookup built up front instead of filtering the frame for
    # every document; the first row wins if an id were ever duplicated.
    first_rows = df_texts.drop_duplicates('id')
    text_by_id = dict(zip(first_rows['id'], first_rows['text']))

    prompts = []
    for text_id in text_ids:
        target_text = text_by_id[text_id]
        prompt_formatted = template.format(
            EXAMPLE_1_TEXT=EXAMPLE_1_TEXT,
            EXAMPLE_1_HTML=EXAMPLE_1_HTML,
            EXAMPLE_2_TEXT=EXAMPLE_2_TEXT,
            EXAMPLE_2_HTML=EXAMPLE_2_HTML,
            TEXT=target_text
        )
        prompts.append({'text': text_id, 'text_to_annotate': target_text, 'prompt': prompt_formatted})
    return prompts


formattedPromptsTrain = _build_prompts(textsTrain, dfTextsTrain, prompt1)
formattedPromptsTest = _build_prompts(textsTest, dfTextsTest, prompt1)

In [51]:
#Build, for every document of each split, its text plus the list of gold
#annotations as {text, type, start, end} dicts
def _collect_annotations(text_ids, df_texts, df_ann):
    """Gather the annotations of every document listed in ``text_ids``.

    ``df_texts`` must have the columns ``id`` and ``text``; ``df_ann`` the
    columns ``textId``, ``category``, ``start``, ``end`` and ``annText``.
    Returns one dict per id, in ``text_ids`` order, with the keys ``textId``,
    ``text`` and ``annotations``. Documents without annotation rows get an
    empty list. A mention that does not match the document at its offsets is
    reported on stdout but still included.
    """
    # Lookups built once instead of filtering both frames for every document.
    first_rows = df_texts.drop_duplicates('id')
    text_by_id = dict(zip(first_rows['id'], first_rows['text']))
    rows_by_text = {text_id: rows for text_id, rows in df_ann.groupby('textId', sort=False)}

    collected = []
    for textId in text_ids:
        textToAppend = text_by_id[textId]
        annotationsToAdd = []

        rows = rows_by_text.get(textId)
        if rows is not None:
            for index, row in rows.iterrows():
                if textToAppend[row['start']:row['end']] != row['annText']:
                    print(f"Mismatch for textId {row['textId']} at index {index}: expected '{row['annText']}', found '{textToAppend[row['start']:row['end']]}'.")
                annotationsToAdd.append({'text': row['annText'], 'type': row['category'], 'start': int(row['start']), 'end': int(row['end'])})

        collected.append({'textId': textId, 'text': textToAppend, 'annotations': annotationsToAdd})
    return collected


allAnnotationsTrain = _collect_annotations(textsTrain, dfTextsTrain, dfAnnTrain)
allAnnotationsTest = _collect_annotations(textsTest, dfTextsTest, dfAnnTest)

In [52]:
#Create the JSONL files (train / validation / test).
#Each JSON object contains:
#  - INPUT_TEXT: the original text to annotate
#  - assistant:  the expected output, a list of
#                {"text": "entity text", "type": "entity type", "start": start, "end": end}
#The first 80% of the training documents go to the train file and the
#remaining 20% to the validation file (positional split, no shuffling).
import json


def _write_jsonl(path, prompts, annotated):
    """Write one JSON line per (prompt, gold annotations) pair to ``path``.

    ``prompts`` items need the keys ``text`` and ``text_to_annotate``;
    ``annotated`` items the keys ``textId`` and ``annotations``. The two lists
    are paired by position, so they are checked for equal length and matching
    document ids before anything is written.

    Raises ValueError when the lists are not aligned.
    """
    if len(prompts) != len(annotated):
        raise ValueError(f"Cannot write {path}: {len(prompts)} prompts but {len(annotated)} annotated texts.")
    for position, (prompt, gold) in enumerate(zip(prompts, annotated)):
        if prompt['text'] != gold['textId']:
            raise ValueError(
                f"Cannot write {path}: position {position} pairs prompt {prompt['text']} "
                f"with annotations of {gold['textId']}."
            )

    with open(path, 'w', encoding='utf-8') as f:
        for prompt, gold in zip(prompts, annotated):
            json_object = {
                "INPUT_TEXT": prompt['text_to_annotate'],
                "assistant": gold['annotations']
            }
            f.write(json.dumps(json_object, ensure_ascii=False) + '\n')


# Index of the first validation document.
splitAt = len(formattedPromptsTrain) * 80 // 100

_write_jsonl('instructed_prompts_train_p0.jsonl', formattedPromptsTrain[:splitAt], allAnnotationsTrain[:splitAt])
_write_jsonl('instructed_prompts_valid_p0.jsonl', formattedPromptsTrain[splitAt:], allAnnotationsTrain[splitAt:])
_write_jsonl('instructed_prompts_test_p0.jsonl', formattedPromptsTest, allAnnotationsTest)

In [53]:
#Report how many examples (lines) each generated JSONL file holds
def _load_lines(path):
    """Return every line of the UTF-8 text file at ``path`` as a list."""
    with open(path, 'r', encoding='utf-8') as handle:
        return handle.readlines()


train_lines = _load_lines('instructed_prompts_train_p0.jsonl')
print(f"Number of training examples: {len(train_lines)}")

valid_lines = _load_lines('instructed_prompts_valid_p0.jsonl')
print(f"Number of validation examples: {len(valid_lines)}")

test_lines = _load_lines('instructed_prompts_test_p0.jsonl')
print(f"Number of test examples: {len(test_lines)}")

Number of training examples: 400
Number of validation examples: 100
Number of test examples: 250
