In [1]:
from datasets import load_dataset
from transformers import AutoTokenizer
import datasets
from transformers import DataCollatorForTokenClassification
from transformers import TFAutoModelForTokenClassification
from transformers import create_optimizer
import tensorflow as tf
import evaluate
import numpy as np
from seqeval.metrics import classification_report as seqeval_classification_report
import pandas as pd
from collections import Counter
import random
import os
import re

model_checkpoint = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)



In [46]:
def parse_annotations(file1_path, file2_path):
    entities = []

    # Parse the second file into a dictionary for fast lookups
    id_map = {}
    with open(file2_path, "r") as f2:
        for line in f2:
            if line.startswith("TT"):
                parts = line.strip().split("\t")
                tag_number, id_and_offsets, text = parts
                meddra_ids, *offset_ranges = id_and_offsets.split(" ")

                # Split multiple meddra IDs if present
                meddra_ids = meddra_ids.split("+")

                # Handle multiple offset ranges
                for offset_range in " ".join(offset_ranges).split(";"):
                    try:
                        # Filter out invalid entries
                        offsets = list(map(int, offset_range.split(" ")))
                        if len(offsets) == 2:
                            start_offset, end_offset = offsets
                            id_map[(start_offset, end_offset)] = "+".join(meddra_ids)
                    except ValueError:
                        # Skip invalid ranges
                        continue

    # Parse the first file and match with the second file
    with open(file1_path, "r") as f1:
        for line in f1:
            if line.startswith("T"):  # Entity line
                parts = line.strip().split("\t")
                tag_number, entity_info, text = parts
                entity_type, *offsets = entity_info.split(" ")

                # Handle multiple offset ranges
                offset_ranges = " ".join(offsets).split(";")
                for offset_range in offset_ranges:
                    try:
                        start_offset, end_offset = map(int, offset_range.split(" "))
                        meddra_id = id_map.get((start_offset, end_offset))
                        entities.append((start_offset, end_offset, entity_type, meddra_id))
                    except ValueError:
                        # Skip invalid ranges
                        continue

    return entities

# File paths
file1_path = "cadecv2/original/ARTHROTEC.105.ann"
file2_path = "cadecv2/meddra/ARTHROTEC.105.ann"

# Parse the files and print the results
entities = parse_annotations(file1_path, file2_path)
print(entities)


[(55, 59, 'ADR', '10042674'), (60, 68, 'ADR', '10011301'), (152, 179, 'ADR', None), (282, 294, 'ADR', '10024840'), (397, 406, 'Drug', None), (121, 129, 'ADR', '10022437'), (223, 246, 'ADR', 'CONCEPT_LESS'), (365, 385, 'Drug', None), (60, 68, 'ADR', '10011301'), (70, 74, 'ADR', '10016065'), (60, 68, 'ADR', '10011301'), (76, 82, 'ADR', '10042707'), (60, 68, 'ADR', '10011301'), (84, 91, 'ADR', '10042679'), (60, 68, 'ADR', '10011301'), (93, 99, 'ADR', '10011301')]


In [30]:
def parse_annotations(file1_path, file2_path):
    entities = []

    # Parse the first file
    with open(file1_path, "r") as f1:
        for line in f1:
            if line.startswith("T"):  # Entity line
                parts = line.strip().split("\t")
                tag_number, entity_info, text = parts
                entity_type, *offsets = entity_info.split(" ")

                # Handle multiple offset ranges
                offset_ranges = " ".join(offsets).split(";")
                for offset_range in offset_ranges:
                    try:
                        start_offset, end_offset = map(int, offset_range.split(" "))
                        entities.append([start_offset, end_offset, entity_type])
                    except ValueError:
                        # Skip invalid ranges
                        continue

    # Parse the second file and add mappings
    with open(file2_path, "r") as f2:
        for line in f2:
            if line.startswith("TT"):
                parts = re.split(r'[\t;\+" "]', line)
                for entry in parts:
                    if entry.isdigit():
                        entry = int(entry)
                        for count, tag in enumerate(entities):
                            if entry == tag[0]:
                                entities[count].append(parts[1])
        print(parts)

    # Remove duplicates
    entities = list(map(list, set(map(tuple, entities))))

    return entities

# File paths
file1_path = "cadecv2/original/ARTHROTEC.105.ann"
file2_path = "cadecv2/meddra/ARTHROTEC.105.ann"

# Parse the files and print the results
entities = parse_annotations(file1_path, file2_path)
print(entities)



['TT3', '10024840', '282', '294', 'loose', 'stools\n']
[[282, 294, 'ADR', '10024840'], [152, 179, 'ADR', '10017060'], [93, 99, 'ADR', '10011301'], [397, 406, 'Drug'], [121, 129, 'ADR', '10022437'], [70, 74, 'ADR', '10016065'], [55, 59, 'ADR', '10042674'], [223, 246, 'ADR', 'CONCEPT_LESS'], [76, 82, 'ADR', '10042707'], [365, 385, 'Drug'], [60, 68, 'ADR', '10042674', '10016065', '10042707', '10042679', '10011301'], [84, 91, 'ADR', '10042679']]


In [25]:
# Define paths
base_path = "cadecv2"
original_path = os.path.join(base_path, "original")
meddra_path = os.path.join(base_path, "meddra")
text_path = os.path.join(base_path, "text")
output_path = os.path.join("OutputFolder", "train.txt")

def parse_annotations(file1_path, file2_path):
    entities = []
    
    # Parse the second file into a dictionary for fast lookups
    id_map = {}
    with open(file2_path, "r") as f2:
        for line_num, line in enumerate(f2, start=1):
            if line.startswith("TT"):
                parts = line.strip().split("\t")
                if len(parts) < 3:
                    print(f"Skipping malformed line {line_num} in {file2_path}: {line.strip()}")
                    continue

                tag_number, id_and_offsets, text = parts
                # Extract IDs and offsets
                try:
                    ids_and_offsets = id_and_offsets.rsplit(" ", 2)  # Split into IDs and offsets
                    ids = ids_and_offsets[0].split("+")  # Handle multiple IDs
                    start_offset = int(ids_and_offsets[1])  # Second-to-last part is the start offset
                    end_offset = int(ids_and_offsets[2])  # Last part is the end offset

                    for meddra_id in ids:
                        id_map.setdefault((start_offset, end_offset), []).append(meddra_id)

                except (IndexError, ValueError) as e:
                    print(f"Error parsing line {line_num} in {file2_path}: {line.strip()}")
                    print(f"Details: {e}")
                    continue

    # Parse the first file and match with the second file
    with open(file1_path, "r") as f1:
        for line in f1:
            if line.startswith("T"):  # Entity line
                parts = line.strip().split("\t")
                if len(parts) < 3:
                    continue
                tag_number, entity_info, text = parts
                entity_type, *offsets = entity_info.split(" ")
                offset_ranges = " ".join(offsets).split(";")

                for offset_range in offset_ranges:
                    try:
                        start_offset, end_offset = map(int, offset_range.split(" "))
                        meddra_ids = id_map.get((start_offset, end_offset), [])
                        for meddra_id in meddra_ids:
                            entities.append((start_offset, end_offset, entity_type, meddra_id))
                    except ValueError as e:
                        print(f"Error parsing offsets: {offset_range}")
                        print(f"Details: {e}")
                        continue

    return entities



# Function to create IOB labeling
def create_iob_labels(text, entities):
    labels = ["O"] * len(text)  # Initialize all tokens with "O"
    for start, end, entity_type, meddra_id in entities:
        label_prefix = f"-{meddra_id}" if meddra_id else ""
        labels[start] = f"B-{entity_type}{label_prefix}"
        for i in range(start + 1, end):
            labels[i] = f"I-{entity_type}{label_prefix}"
    return labels

# Function to tokenize text and align labels
def tokenize_and_label(text, labels):
    tokens = text.split()
    token_labels = []
    text_index = 0
    for token in tokens:
        token_length = len(token)
        if any(char.isalnum() for char in token):  # Skip punctuation
            token_label = labels[text_index : text_index + token_length]
            label = token_label[0] if token_label else "O"
            token_labels.append((token, label))
        else:
            token_labels.append((token, "O"))
        text_index += token_length + 1  # Move index past the token and space
    return token_labels

# Process files
output_lines = []
for text_file in os.listdir(text_path):
    text_file_path = os.path.join(text_path, text_file)
    annotation_file_path = os.path.join(original_path, text_file.replace(".txt", ".ann"))
    meddra_file_path = os.path.join(meddra_path, text_file.replace(".txt", ".ann"))
    
    if os.path.exists(annotation_file_path) and os.path.exists(meddra_file_path):
        # Read text
        with open(text_file_path, "r") as f:
            text = f.read()
        
        # Parse annotations and create labels
        entities = parse_annotations(annotation_file_path, meddra_file_path)
        labels = create_iob_labels(text, entities)
        token_labels = tokenize_and_label(text, labels)
        
        # Write to output
        for token, label in token_labels:
            output_lines.append(f"{token}\t{label}")
            if token.endswith("."):  # Add a blank line after sentences
                output_lines.append("")

# Write the output to train.txt
with open(output_path, "w") as f:
    f.write("\n".join(output_lines))



Error parsing line 1 in cadecv2\meddra\ARTHROTEC.105.ann: TT1	10042674 55 59;60 68	body swelling
Details: invalid literal for int() with base 10: '59;60'
Error parsing line 2 in cadecv2\meddra\ARTHROTEC.105.ann: TT8	10016065 60 68;70 74	swelling face
Details: invalid literal for int() with base 10: '68;70'
Error parsing line 3 in cadecv2\meddra\ARTHROTEC.105.ann: TT9	10042707 60 68;76 82	swelling wrists
Details: invalid literal for int() with base 10: '68;76'
Error parsing line 4 in cadecv2\meddra\ARTHROTEC.105.ann: TT10	10042679 60 68;84 91	swelling abdomen
Details: invalid literal for int() with base 10: '68;84'
Error parsing line 5 in cadecv2\meddra\ARTHROTEC.105.ann: TT11	10011301 60 68;93 99	swelling thighs
Details: invalid literal for int() with base 10: '68;93'
Error parsing line 3 in cadecv2\meddra\ARTHROTEC.112.ann: TT3	10046823 58 73;98 123	uterine, cramps (menopausal for 20 years)
Details: invalid literal for int() with base 10: '73;98'
Error parsing line 4 in cadecv2\meddra

In [40]:
import os

# Define paths
base_path = "cadecv2"
original_path = os.path.join(base_path, "original")
meddra_path = os.path.join(base_path, "meddra")
text_path = os.path.join(base_path, "text")
output_path = os.path.join("OutputFolder", "train.txt")

def parse_annotations(file1_path, file2_path):
    entities = []

    # Parse the second file into a dictionary for fast lookups
    id_map = {}
    with open(file2_path, "r") as f2:
        for line_num, line in enumerate(f2, start=1):
            if line.startswith("TT"):
                parts = line.strip().split("\t")
                if len(parts) < 3:
                    print(f"Skipping malformed line {line_num} in {file2_path}: {line.strip()}")
                    continue

                tag_number, id_and_offsets, text = parts
                try:
                    # Separate IDs and offsets
                    ids_and_offsets = id_and_offsets.rsplit(" ", 2)
                    ids = ids_and_offsets[0].split("+")  # Handle multiple IDs
                    offsets = ids_and_offsets[1:]  # Remaining parts are offsets

                    # Split offsets by `;` and process each range
                    offset_ranges = " ".join(offsets).split(";")
                    for offset_range in offset_ranges:
                        start_end = offset_range.strip().split()
                        if len(start_end) == 2:  # Ensure valid offset pair
                            start_offset, end_offset = map(int, start_end)
                            for meddra_id in ids:
                                id_map.setdefault((start_offset, end_offset), []).append(meddra_id)
                        else:
                            print(f"Skipping invalid offset range in line {line_num}: {offset_range}")

                except (IndexError, ValueError) as e:
                    print(f"Error parsing line {line_num} in {file2_path}: {line.strip()}")
                    print(f"Details: {e}")
                    continue

    # Parse the first file and match with the second file
    with open(file1_path, "r") as f1:
        for line in f1:
            if line.startswith("T"):  # Entity line
                parts = line.strip().split("\t")
                if len(parts) < 3:
                    continue
                tag_number, entity_info, text = parts
                entity_type, *offsets = entity_info.split(" ")
                offset_ranges = " ".join(offsets).split(";")

                for offset_range in offset_ranges:
                    try:
                        start_end = offset_range.strip().split()
                        if len(start_end) == 2:  # Ensure valid offset pair
                            start_offset, end_offset = map(int, start_end)
                            meddra_ids = id_map.get((start_offset, end_offset), [])
                            for meddra_id in meddra_ids:
                                entities.append((start_offset, end_offset, entity_type, meddra_id))
                        else:
                            print(f"Skipping invalid offset range in original file: {offset_range}")
                    except ValueError as e:
                        print(f"Error parsing offsets: {offset_range}")
                        print(f"Details: {e}")
                        continue

    return entities




# Function to create IOB labeling
def create_iob_labels(text, entities):
    labels = ["O"] * len(text)  # Initialize all tokens with "O"
    for start, end, entity_type, meddra_id in entities:
        labels[start] = f"B-{entity_type}"  # IOB tag only
        for i in range(start + 1, end):
            labels[i] = f"I-{entity_type}"  # IOB tag only
    return labels, {start: meddra_id for start, _, _, meddra_id in entities}  # Map offsets to IDs

# Function to tokenize text and align labels
def tokenize_and_label(text, labels):
    tokens = text.split()
    token_labels = []
    text_index = 0
    for token in tokens:
        token_length = len(token)
        if any(char.isalnum() for char in token):  # Skip punctuation
            token_label = labels[text_index : text_index + token_length]
            label = token_label[0] if token_label else "O"
            token_labels.append((token, label))
        else:
            token_labels.append((token, "O"))
        text_index += token_length + 1  # Move index past the token and space
    return token_labels

# Process files
output_lines = []
for text_file in os.listdir(text_path):
    text_file_path = os.path.join(text_path, text_file)
    annotation_file_path = os.path.join(original_path, text_file.replace(".txt", ".ann"))
    meddra_file_path = os.path.join(meddra_path, text_file.replace(".txt", ".ann"))
    
    if os.path.exists(annotation_file_path) and os.path.exists(meddra_file_path):
        # Read text
        with open(text_file_path, "r") as f:
            text = f.read()
        
        # Parse annotations and create labels
        entities = parse_annotations(annotation_file_path, meddra_file_path)
        labels = create_iob_labels(text, entities)
        token_labels = tokenize_and_label(text, labels)
        
        # Write to output
        for token, label in token_labels:
            output_lines.append(f"{token}\t{label}")
            if token.endswith("."):  # Add a blank line after sentences
                output_lines.append("")

# Write the output to train.txt
with open(output_path, "w") as f:
    f.write("\n".join(output_lines))


Skipping invalid offset range in line 1: 59
Skipping invalid offset range in line 2: 68
Skipping invalid offset range in line 3: 68
Skipping invalid offset range in line 4: 68
Skipping invalid offset range in line 5: 68
Skipping invalid offset range in line 3: 73
Skipping invalid offset range in line 4: 97
Skipping invalid offset range in line 2: 178
Skipping invalid offset range in line 3: 178
Skipping invalid offset range in line 5: 73
Skipping invalid offset range in line 6: 73
Skipping invalid offset range in line 7: 392
Skipping invalid offset range in line 8: 392
Skipping invalid offset range in line 4: 322
Skipping invalid offset range in line 5: 328
Skipping invalid offset range in line 7: 197
Skipping invalid offset range in line 8: 208
Skipping invalid offset range in line 1: 49
Skipping invalid offset range in line 3: 132
Skipping invalid offset range in line 7: 515
Skipping invalid offset range in line 8: 521
Skipping invalid offset range in line 1: 129
Skipping invalid off