In [None]:
import pandas as pd
import os

# Work from the script's own directory when run as a file; inside a notebook
# kernel `__file__` is undefined, so the current directory is kept.
if '__file__' in globals():
    working_dir = os.path.dirname(os.path.abspath(__file__))
else:
    working_dir = os.getcwd()
os.chdir(working_dir)

# Load the raw export and keep only the four columns used by the later cells.
df = pd.read_csv('llama_raw_data_info_with_metrics.csv')[
    ['doc_id', 'text', 'augmented_annotation', 'labels']
]
df

In [None]:
import re

def process_text_to_labels(text):
    """
    Convert bracket-annotated text into one BIO label per token.

    Annotated spans are written as '[phrase | label1, label2]'. The first
    token of a span gets 'B-<labels>' and the remaining tokens 'I-<labels>',
    where multiple labels are joined with '-'. Every token outside a span
    gets 'O'.

    Malformed markup is treated as plain text instead of corrupting the
    output: a '[' with no closing ']' labels the remainder of the text as
    'O', and a '[...]' group that contains no '|' labels its content as 'O'.

    Args:
        text (str): Input text with annotations in format '[text | label1, label2]'

    Returns:
        list: List of BIO labels corresponding to each token in the text
    """

    def tokenize(fragment):
        """Split on whitespace, with . , ; : ( ) isolated as their own tokens."""
        spaced = re.sub(r'([.,;:\(\)])', r' \1 ', fragment)
        return spaced.split()

    def outside(fragment):
        """Return one 'O' label per token of an unannotated fragment."""
        return ['O'] * len(tokenize(fragment))

    labels = []
    position = 0
    text_length = len(text)

    while position < text_length:
        if text[position] != '[':
            # Plain text runs up to the next opening bracket (or end of text).
            next_bracket = text.find('[', position)
            if next_bracket == -1:
                next_bracket = text_length
            labels.extend(outside(text[position:next_bracket]))
            position = next_bracket
            continue

        closing_bracket = text.find(']', position)
        if closing_bracket == -1:
            # Unterminated '[': without this guard the cursor would be reset
            # to index 0 and the loop would never finish.
            labels.extend(outside(text[position + 1:]))
            break

        # Only accept a separator that belongs to THIS bracket group; an
        # unbounded search could pick up the '|' of a later annotation.
        separator = text.find('|', position, closing_bracket)
        if separator == -1:
            labels.extend(outside(text[position + 1:closing_bracket]))
            position = closing_bracket + 1
            continue

        phrase = text[position + 1:separator].strip()
        label_part = text[separator + 1:closing_bracket].strip()
        # Joining also covers the single-label case ('-'.join(['x']) == 'x').
        joined = '-'.join(label.strip() for label in label_part.split(','))

        span_tokens = tokenize(phrase)
        labels.extend(
            f"{'B' if index == 0 else 'I'}-{joined}"
            for index in range(len(span_tokens))
        )

        # Resume right after the closing bracket.
        position = closing_bracket + 1

    return labels

In [None]:
import ast

def labels_to_tokens(string):
    """
    Parse a Python-literal string and return the first element of each entry.

    Args:
        string (str): Literal representation of a sequence whose entries are
            indexable (presumably (token, label) pairs - confirm against the
            'labels' column).

    Returns:
        list: The element at index 0 of every entry, in order.
    """
    first_items = []
    for entry in ast.literal_eval(string):
        first_items.append(entry[0])
    return first_items


In [None]:
# Derive the per-document token list and BIO tag list from their source columns.
for new_column, (source_column, converter) in {
    'tokens': ('labels', labels_to_tokens),
    'tags': ('augmented_annotation', process_text_to_labels),
}.items():
    df[new_column] = df[source_column].apply(converter)
df.head()

In [None]:
# Quick look at the token list stored under index label 0.
first_tokens = df['tokens'][0]
print(first_tokens)

In [None]:
# Save the current frame to 'llama_data.csv' without writing the index column.
df.to_csv('llama_data.csv', index=False)

In [None]:
def base2(n):
    """Return 2 raised to the power ``n`` (used to give each tag its own bit)."""
    return pow(2, n)

# Bit flag per short tag name: category number p gets bit 2*p for its
# 'B-' tag and bit 2*p + 1 for its 'I-' tag; 'O' carries no bit.
tags = {'O': 0}
for position, short_name in enumerate([
    'pathophysiology',
    'epidemiology',
    'etiology',
    'history',
    'physical',
    'exams',
    'differential',
    'therapeutic',
]):
    tags[f"B-{short_name}"] = base2(2 * position)
    tags[f"I-{short_name}"] = base2(2 * position + 1)
tags

In [None]:
# Vocabulary of tag names -> bitmask. Single categories, every pair and every
# triple (in list order) are registered, each as the sum of its members' bits.
ner_tags_aux = {'O': 0}

cats = [
    "Pathophysiology",
    "Epidemiology",
    "Etiology",
    "History",
    "Physical_examination",
    "Complementary_exams",
    "Differential_diagnosis",
    "Therapeutic_plan",
]

for i, first in enumerate(cats):
    ner_tags_aux[f"B-{first}"] = base2(2 * i)
    ner_tags_aux[f"I-{first}"] = base2(2 * i + 1)
    for j in range(i + 1, len(cats)):
        pair = f"{first}-{cats[j]}"
        ner_tags_aux[f"B-{pair}"] = base2(2 * i) + base2(2 * j)
        ner_tags_aux[f"I-{pair}"] = base2(2 * i + 1) + base2(2 * j + 1)
        for k in range(j + 1, len(cats)):
            triple = f"{pair}-{cats[k]}"
            ner_tags_aux[f"B-{triple}"] = base2(2 * i) + base2(2 * j) + base2(2 * k)
            ner_tags_aux[f"I-{triple}"] = base2(2 * i + 1) + base2(2 * j + 1) + base2(2 * k + 1)

# The one four-category combination that is registered: the first four
# categories, i.e. bits 0, 2, 4, 6 (= 85) and bits 1, 3, 5, 7 (= 170).
ner_tags_aux['B-Pathophysiology-Epidemiology-Etiology-History'] = sum(base2(2 * bit) for bit in range(4))
ner_tags_aux['I-Pathophysiology-Epidemiology-Etiology-History'] = sum(base2(2 * bit + 1) for bit in range(4))

# Tag names in insertion order.
tags_ner = list(ner_tags_aux)

In [None]:
# bitmask -> tag name
tags_ner_aux = dict(zip(ner_tags_aux.values(), ner_tags_aux.keys()))
# tag name -> consecutive class id (insertion order of the vocabulary)
ner_tags = {name: class_id for class_id, name in enumerate(ner_tags_aux)}
# class id -> tag name
ner_tags_inverted = dict(enumerate(ner_tags_aux))

In [None]:
# Display the class id -> tag name mapping for inspection.
ner_tags_inverted

In [None]:
def labels_to_id(string):
    """
    Map a BIO tag such as 'B-history-physical' to its class id.

    The tag is split on '-': the first piece is the B/I prefix and every
    following piece is a label name. Each '<prefix>-<label>' is looked up in
    `tags` to get its bit flag; the combined mask is translated to a tag name
    through `tags_ner_aux` and then to a class id through `ner_tags`.

    Flags are combined with bitwise OR rather than addition, so a label that
    is repeated in the tag cannot carry over into a different bit and
    silently select the wrong class.

    Args:
        string (str): 'O' or a '<prefix>-<label>[-<label>...]' tag.

    Returns:
        int: The class id (0 for 'O').

    Raises:
        KeyError: If a label is not present in `tags`, or the combination of
            labels has no entry in `tags_ner_aux`.
    """
    if string == 'O':
        return 0

    prefix, *label_names = string.split('-')
    mask = 0
    for label in label_names:
        tag_key = f"{prefix}-{label}"
        if tag_key not in tags:
            raise KeyError(f"Unknown tag {tag_key!r} in label {string!r}")
        mask |= tags[tag_key]

    if mask not in tags_ner_aux:
        raise KeyError(f"Label combination {string!r} has no entry in the tag vocabulary")
    return ner_tags[tags_ner_aux[mask]]

In [None]:
def _tags_to_ids(tag_sequence):
    """Translate every tag of one document into its class id."""
    return list(map(labels_to_id, tag_sequence))


df['ner_ids'] = df['tags'].apply(_tags_to_ids)
df.head()

In [None]:
# Flag rows whose number of tokens equals their number of tags.
# The result is stored as a plain boolean (not a one-element list) so the
# column is hashable and can be tallied with `value_counts()`.
df['check'] = df['tokens'].map(len) == df['tags'].map(len)
df.head()

In [None]:
# Count how often each distinct value occurs in the `check` column.
df['check'].value_counts()

In [None]:
# Remove the intermediate columns that are no longer needed in the final frame.
helper_columns = ['augmented_annotation', 'check', 'labels', 'tags']
df = df.drop(helper_columns, axis=1)

In [None]:
# Read the three split files (test, train, validation) in that order.
test, treino, validacao = (
    pd.read_csv(split_file)
    for split_file in ('teste.csv', 'treino.csv', 'validacao.csv')
)

In [None]:
def _select_split(split_ids):
    """Return the rows of `df` whose doc_id appears in the given split frame."""
    return df[df['doc_id'].isin(split_ids['doc_id'])]


test_df = _select_split(test)
train_df = _select_split(treino)
validation_df = _select_split(validacao)

shape_report = (
    f"Train shape: {train_df.shape}\n"
    f"Test shape: {test_df.shape}\n"
    f"Validation shape: {validation_df.shape}"
)
print(shape_report)

In [None]:
from datasets import load_dataset, Dataset, DatasetDict, ClassLabel, Sequence

# Wrap each split frame in a `datasets.Dataset`.
train_dataset, validation_dataset, test_dataset = (
    Dataset.from_pandas(split_frame)
    for split_frame in (train_df, validation_df, test_df)
)

In [None]:
# The tag vocabulary (`cats`, `ner_tags_aux`) is already built earlier in this
# notebook, and `ner_ids` were computed from `ner_tags`, whose insertion order
# defines every class id. Reuse that mapping instead of rebuilding a hand-copied
# duplicate, so the ClassLabel names can never drift from the ids stored in
# `ner_ids`.
tags_ner = list(ner_tags)

# Position in `tags_ner` must equal the class id used when encoding `ner_ids`.
assert tags_ner == list(ner_tags_aux), "tag vocabulary and class-id order diverged"

In [None]:
# One class per tag name; the class id is the position of the name in `tags_ner`.
ner_class_labels = ClassLabel(names=tags_ner, num_classes=len(tags_ner))


def _cast_ner_ids(split):
    """Cast the 'ner_ids' column of one split to a sequence of class labels."""
    return split.cast_column("ner_ids", Sequence(ner_class_labels))


train_dataset = _cast_ner_ids(train_dataset)
validation_dataset = _cast_ner_ids(validation_dataset)
test_dataset = _cast_ner_ids(test_dataset)

In [None]:
# Bundle the three splits under their conventional names.
splits = {
    'train': train_dataset,
    'validation': validation_dataset,
    'test': test_dataset,
}
dataset = DatasetDict(splits)

dataset

In [None]:
from datasets import DatasetDict

# Drop the leftover pandas index column from every split of `dataset`.
# `remove_columns` raises when the column does not exist, so the column is
# removed only where it is actually present; splits without it pass through
# unchanged.
index_column = "__index_level_0__"
dataset = DatasetDict({
    split_name: (
        split.remove_columns([index_column])
        if index_column in split.column_names
        else split
    )
    for split_name, split in dataset.items()
})

# Check the modified dataset
dataset

In [None]:
# Upload `dataset` to the Hub under the given repository id.
# NOTE(review): presumably requires an authenticated Hugging Face session - confirm.
dataset.push_to_hub("harena-lab/bioberpt-llama-dpoc-is-multiple")