In [None]:
!pip install datasets transformers accelerate seqeval -U

# TASK 1: Classification (Drug Related/Not Related) using BERT and Clinical BERT

In [None]:
# Model selection: change the key on the last line to switch checkpoints
MODEL_CHECKPOINTS = {
    "bert-base": "google-bert/bert-base-uncased",  # BERT-base
    "biobert": "dmis-lab/biobert-v1.1",  # BioBERT
    "clinicalbert": "medicalai/ClinicalBERT",  # ClinicalBERT
    "bio-clinicalbert": "emilyalsentzer/Bio_ClinicalBERT",  # Bio-ClinicalBERT
}
model_checkpoint = MODEL_CHECKPOINTS["biobert"]

In [None]:
import re
from datasets import load_dataset, DatasetDict
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, EarlyStoppingCallback
import torch
import numpy as np
from sklearn.metrics import accuracy_score

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

def clean_text(text):
    """
    Normalise a string before tokenization.

    The text is lowercased first; afterwards every character that is not
    one of a-z, 0-9, or whitespace is deleted.
    """
    lowered = text.lower()
    return re.sub(r'[^a-z0-9\s]', '', lowered)

def custom_split(dataset, train_frac=0.8, val_frac=0.1, seed=42):
    """
    Shuffle ``dataset`` and split it into train, validation, and test sets.

    Args:
        dataset: Object supporting ``len()`` and ``select(indices)``.
        train_frac: Fraction of rows assigned to the training set.
        val_frac: Fraction of rows assigned to the validation set; whatever
            remains after train and validation becomes the test set.
        seed: Seed of the shuffle, so the split is reproducible.

    Returns:
        DatasetDict with the keys 'train', 'validation', and 'test'.

    Raises:
        ValueError: If a fraction is negative or the two fractions sum to
            more than 1.
    """
    if train_frac < 0 or val_frac < 0 or train_frac + val_frac > 1 + 1e-9:
        raise ValueError(
            f"train_frac and val_frac must be non-negative and sum to at most 1, "
            f"got {train_frac} and {val_frac}"
        )

    # A private RandomState yields the same permutation as seeding the global
    # NumPy generator with `seed`, but leaves the global generator untouched.
    rng = np.random.RandomState(seed)
    total_size = len(dataset)
    indices = rng.permutation(total_size)
    train_size = int(total_size * train_frac)
    val_size = int(total_size * val_frac)

    train_indices = indices[:train_size]
    val_indices = indices[train_size:train_size + val_size]
    test_indices = indices[train_size + val_size:]

    print(f"Train size: {len(train_indices)}")
    print(f"Val size: {len(val_indices)}")
    print(f"Test size: {len(test_indices)}")

    return DatasetDict({
        'train': dataset.select(train_indices),
        'validation': dataset.select(val_indices),
        'test': dataset.select(test_indices)
    })

# Fetch the classification configuration of the ADE corpus
raw_dataset = load_dataset(path="ade_corpus_v2", name="Ade_corpus_v2_classification")

# Shuffle its 'train' split and carve it into train/validation/test
split_datasets = custom_split(dataset=raw_dataset['train'])

# Tokenizer belonging to the checkpoint chosen above
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# Batched map function: clean every text, then tokenize the cleaned batch
def preprocess_data(examples):
    """
    Clean and tokenize one batch of examples.

    The 'text' column of ``examples`` is overwritten with its cleaned form,
    and the tokenizer output for those cleaned texts (padded/truncated to
    256 tokens) is returned.
    """
    cleaned = list(map(clean_text, examples['text']))
    examples['text'] = cleaned
    return tokenizer(cleaned, padding="max_length", truncation=True, max_length=256)

# Run preprocess_data over every split in batches; the result replaces split_datasets
split_datasets = split_datasets.map(preprocess_data, batched=True)


In [None]:
def compute_metrics(eval_pred):
    """
    Score a batch of predictions with plain accuracy.

    ``eval_pred`` unpacks into ``(logits, labels)``; the predicted class of
    each row is the argmax over the last axis of the logits.
    NOTE: a later cell in this notebook defines compute_metrics again.
    """
    logits, labels = eval_pred
    predicted_classes = np.argmax(logits, axis=-1)
    accuracy = accuracy_score(labels, predicted_classes)
    return {"accuracy": accuracy}

In [None]:
# Mount Google Drive at /content/drive (requires the google.colab package)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback, AutoTokenizer, AutoModelForSequenceClassification

# Re-create the tokenizer and build a two-class sequence classifier on the device
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(
    model_checkpoint,
    num_labels=2,
).to(device)

# Training configuration: evaluate and log every 100 steps, checkpoint every
# 500 steps, and reload the checkpoint with the best validation accuracy at
# the end of training.
# Paths are absolute so they land inside the Drive mount at /content/drive;
# the previous "./content/..." form resolved against the working directory.
# NOTE(review): newer transformers releases spell `evaluation_strategy` as
# `eval_strategy` — confirm which one the installed version accepts.
training_args = TrainingArguments(
    output_dir="/content/drive/My Drive/results",
    evaluation_strategy="steps",
    eval_steps=100,  # Evaluate every 100 steps
    logging_dir="/content/drive/My Drive/logs",  # Save logs in this directory
    logging_steps=100,  # Log metrics every 100 steps
    save_steps=500,  # Save the model every 500 steps
    learning_rate=1e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=2,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
)

# Metrics callback for the Trainer: accuracy plus binary precision/recall/F1
def compute_metrics(eval_pred):
    """
    Compute accuracy, F1, precision, and recall for a batch of predictions.

    ``eval_pred`` unpacks into ``(logits, labels)``; the predicted class is
    the argmax over the last axis. Precision, recall, and F1 use
    ``average='binary'``.
    """
    from sklearn.metrics import accuracy_score, precision_recall_fscore_support
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=-1)
    prf = precision_recall_fscore_support(labels, predicted, average='binary')
    precision, recall, f1 = prf[0], prf[1], prf[2]
    return {
        "accuracy": accuracy_score(labels, predicted),
        "f1": f1,
        "precision": precision,
        "recall": recall,
    }

# Wire model, data, metrics, and an early-stopping callback (patience 3) into a Trainer
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=split_datasets['train'],
    eval_dataset=split_datasets['validation'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)

# Fine-tune on the training split
trainer.train()

# Score the trained model on the held-out test split
results = trainer.evaluate(eval_dataset=split_datasets['test'])
print("Test Evaluation results:", results)


In [None]:
import matplotlib.pyplot as plt

# Training entries of the log carry 'loss' but no 'eval_loss'
train_logs = [log for log in trainer.state.log_history if 'loss' in log and 'eval_loss' not in log]
training_loss = [log['loss'] for log in train_logs]
epochs = [log['epoch'] for log in train_logs]

# The last 'eval_loss' entry is dropped — presumably it is the test-set
# evaluation logged by trainer.evaluate above; confirm if cells are re-run.
validation_loss = [log['eval_loss'] for log in trainer.state.log_history if 'eval_loss' in log][:-1]

# Training and validation loss against the epoch of each training log entry
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(epochs, training_loss, label='Training Loss')
ax.plot(epochs, validation_loss, label='Validation Loss')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid(True)
plt.show()


In [None]:
import matplotlib.pyplot as plt

# Validation metrics read from trainer.state.log_history. The last entry of
# each series is dropped — presumably the test-set evaluation; confirm.
accuracy = [log['eval_accuracy'] for log in trainer.state.log_history if 'eval_accuracy' in log][:-1]
# Stored as f1_scores (not f1_score): this notebook later imports seqeval's
# f1_score function under that name, so the list must not share it.
f1_scores = [log['eval_f1'] for log in trainer.state.log_history if 'eval_f1' in log][:-1]
epochs = [log['epoch'] for log in trainer.state.log_history if 'eval_f1' in log][:-1]

# One panel per metric: (values, legend/y-axis label, colour, title)
fig, axes = plt.subplots(1, 2, figsize=(12, 6))
panels = [
    (accuracy, 'Accuracy', 'blue', 'Validation Accuracy'),
    (f1_scores, 'F1 Score', 'green', 'Validation F1 Score'),
]
for ax, (values, name, color, title) in zip(axes, panels):
    ax.plot(epochs, values, label=name, color=color)
    ax.set_title(title)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(name)
    ax.grid(True)
    ax.legend()

plt.tight_layout()  # Adjust layout to not overlap
plt.show()


In [None]:
import pandas as pd

# Display name -> key in the dict returned by trainer.evaluate
metric_keys = {
    "Accuracy": "eval_accuracy",
    "F1 Score": "eval_f1",
    "Precision": "eval_precision",
    "Recall": "eval_recall",
}
metrics = {name: results[key] for name, key in metric_keys.items()}

# One-row summary table of the test metrics
df = pd.DataFrame([metrics], columns=['Accuracy', 'F1 Score', 'Precision', 'Recall'])

df

In [None]:
# Run the trained model on the test split. Later cells read the result's
# `.predictions` (logits) and `.label_ids` (gold labels).
predictions = trainer.predict(split_datasets['test'])
predictions

In [None]:
import numpy as np

# Softmax function to convert logits to probabilities
def softmax(logits):
    """
    Convert logits to probabilities along the last axis.

    The maximum of each row (not of the whole array) is subtracted before
    exponentiating. This keeps every row numerically stable: with a single
    global maximum, rows far below it underflow to all zeros and divide 0/0.

    Args:
        logits: Array-like whose last axis indexes the classes.

    Returns:
        Array of the same shape whose last axis sums to 1.
    """
    shifted = logits - np.max(logits, axis=-1, keepdims=True)
    e = np.exp(shifted)
    return e / e.sum(axis=-1, keepdims=True)

# Turn the test-set logits into class probabilities
probabilities = softmax(predictions.predictions)

# The predicted class of each row is its most probable column
predicted_labels = probabilities.argmax(axis=1)


In [None]:
# How many test examples to print
num_examples = 10

print("Prediction | True Label | Text")
print("--------------------------------")
for i in range(num_examples):
    # Decode the stored token ids back to text (this is the cleaned, tokenized form)
    text = tokenizer.decode(split_datasets['test'][i]['input_ids'], skip_special_tokens=True)
    pred_label, true_label = predicted_labels[i], predictions.label_ids[i]
    print(f"{pred_label}        | {true_label}         | {text}")


# TASK 2: NER for ADR Relation Extraction using Clinical BERT


In [None]:
# Model selection: change the key on the last line to switch checkpoints
MODEL_CHECKPOINTS = {
    "bert-base": "google-bert/bert-base-uncased",  # BERT-base
    "biobert": "dmis-lab/biobert-v1.1",  # BioBERT
    "clinicalbert": "medicalai/ClinicalBERT",  # ClinicalBERT
    "bio-clinicalbert": "emilyalsentzer/Bio_ClinicalBERT",  # Bio-ClinicalBERT
}
model_checkpoint = MODEL_CHECKPOINTS["biobert"]

In [None]:
import pandas as pd
from transformers import AutoTokenizer

# Fetch the drug-ADE relation configuration of the ADE corpus
raw_dataset = load_dataset(path="ade_corpus_v2", name="Ade_corpus_v2_drug_ade_relation")

# Shuffle and split its 'train' split, then load the tokenizer of the chosen checkpoint
split_datasets = custom_split(dataset=raw_dataset['train'])
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

def preprocess_for_ner(texts, drugs, effects, tokenizer):
    """
    Tokenize ``texts`` and derive one BIO tag string per token position.

    For each text, its drug and effect mentions are tokenized on their own
    (without special tokens). Every occurrence of that token-id sequence in
    the text's token ids is tagged ``B-<TYPE>`` on the first position and
    ``I-<TYPE>`` on the following ones. All other positions keep ``"O"``.
    Effect tags are written after drug tags, so they win where both match.

    Args:
        texts: List of input sentences.
        drugs: Drug mention for each sentence (same length as ``texts``).
        effects: Effect mention for each sentence (same length as ``texts``).
        tokenizer: Callable tokenizer; called on the whole batch with padding
            and truncation to 512 tokens, and on each mention separately.

    Returns:
        Tuple ``(tokenized_inputs, labels)`` where ``labels[i]`` is a list of
        tag strings as long as row ``i`` of ``tokenized_inputs.input_ids``.
    """
    tokenized_inputs = tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors='pt')

    def tag_entity(sequence, label, entity_tokens, label_prefix):
        """Tag every occurrence of ``entity_tokens`` in ``sequence`` in place."""
        span = len(entity_tokens)
        if span == 0:
            # An empty mention would "match" at every position; tag nothing instead.
            return
        for start in range(len(sequence) - span + 1):
            if sequence[start:start + span] == entity_tokens:
                label[start] = f'B-{label_prefix}'
                label[start + 1:start + span] = [f'I-{label_prefix}'] * (span - 1)

    labels = []
    for i in range(len(texts)):
        sequence = tokenized_inputs.input_ids[i].tolist()  # Token ids of this row
        label = ["O"] * len(sequence)  # Start with 'O' on every position

        # Tokenize entities
        drug_tokens = tokenizer(drugs[i], add_special_tokens=False)['input_ids']
        effect_tokens = tokenizer(effects[i], add_special_tokens=False)['input_ids']

        # Assign labels for drug and effect
        tag_entity(sequence, label, drug_tokens, 'DRUG')
        tag_entity(sequence, label, effect_tokens, 'EFFECT')

        labels.append(label)

    return tokenized_inputs, labels

In [None]:
# Pull the three columns needed for NER out of the training split
train_rows = split_datasets['train']
texts = [row['text'] for row in train_rows]
drugs = [row['drug'] for row in train_rows]
effects = [row['effect'] for row in train_rows]

# Sanity check: number of training texts and the first one
print(len(texts))
print(texts[0])

# Tokenize the texts and tag the drug/effect mentions
tokenized_inputs, labels = preprocess_for_ner(texts, drugs, effects, tokenizer)

In [None]:
# Inspect the first five training examples: tokens, tags, and the source mentions
for i in range(5):
    tokens = tokenizer.convert_ids_to_tokens(tokenized_inputs.input_ids[i])
    print(f"Text: {texts[i]}")
    print("Tokens:", tokens)
    print("Labels:", labels[i])
    print("actual drug:", drugs[i])
    print("actual effect:", effects[i])
    print("\n")

In [None]:
from torch.utils.data import Dataset

class NERDataset(Dataset):
    """
    Torch dataset pairing tokenizer encodings with per-token label ids.

    Args:
        encodings: Mapping from field name (e.g. 'input_ids') to an indexable
            batch; rows may be tensors or plain lists.
        labels: Sequence with one list of label ids per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_tensor(value):
        """Return an independent tensor copy of ``value``."""
        if isinstance(value, torch.Tensor):
            # torch.tensor(existing_tensor) warns; clone().detach() is the
            # equivalent copy that PyTorch recommends for tensors.
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of tensors, including 'labels'."""
        item = {key: self._as_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """Number of examples, taken from the label list."""
        return len(self.labels)


In [None]:
# Mapping between BIO tag strings and integer class ids
label_dict = {'O': 0, 'B-DRUG': 1, 'I-DRUG': 2, 'B-EFFECT': 3, 'I-EFFECT': 4}
# Create reverse label dictionary
index_to_label = {idx: label for label, idx in label_dict.items()}
# Encode the tag strings into a new variable. `labels` is left untouched so
# this cell can be re-run; overwriting it with ids made a second run fail.
train_labels = [[label_dict[label] for label in doc] for doc in labels]
# Creating the dataset
train_dataset = NERDataset(tokenized_inputs, train_labels)


In [None]:
# Peek at the first five entries of the tokenized training batch
tokenized_inputs[:5]

In [None]:
from transformers import BertForTokenClassification, Trainer, TrainingArguments

# Token-classification model with one output class per tag in label_dict
model = BertForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_dict))

In [None]:
# Print the model's text representation
print(model)

In [None]:
def prepare_dataset(texts, drugs, effects, tokenizer):
    """
    Tokenize texts, tag the drug/effect mentions, and wrap both in an NERDataset.

    Tokenization and BIO tagging are delegated to ``preprocess_for_ner`` so
    the tagging logic lives in one place. Tags are then mapped to class ids.
    Positions that belong to no word (``word_ids()`` is None there, i.e.
    special and padding tokens) get the ignore index of
    ``torch.nn.CrossEntropyLoss`` instead of the 'O' class. That is the value
    ``align_predictions`` filters on, so those positions are excluded from
    the metrics.

    Args:
        texts: List of input sentences.
        drugs: Drug mention for each sentence.
        effects: Effect mention for each sentence.
        tokenizer: Tokenizer whose batch output supports ``word_ids()``.

    Returns:
        NERDataset over the tokenized inputs and the integer labels.
    """
    tokenized_inputs, tag_sequences = preprocess_for_ner(texts, drugs, effects, tokenizer)

    # Convert labels from text to indices
    label_dict = {'O': 0, 'B-DRUG': 1, 'I-DRUG': 2, 'B-EFFECT': 3, 'I-EFFECT': 4}
    ignore_index = torch.nn.CrossEntropyLoss().ignore_index

    labels = []
    for i, tags in enumerate(tag_sequences):
        word_ids = tokenized_inputs.word_ids(batch_index=i)  # None marks non-word positions
        labels.append([
            ignore_index if word_id is None else label_dict[tag]
            for word_id, tag in zip(word_ids, tags)
        ])

    return NERDataset(tokenized_inputs, labels)

# Prepare training, validation, and testing datasets
def _split_columns(split):
    """Return the text, drug, and effect columns of one split as three lists."""
    rows = split_datasets[split]
    return (
        [row['text'] for row in rows],
        [row['drug'] for row in rows],
        [row['effect'] for row in rows],
    )

train_texts, train_drugs, train_effects = _split_columns('train')
validation_texts, validation_drugs, validation_effects = _split_columns('validation')
test_texts, test_drugs, test_effects = _split_columns('test')

# Tokenize and tag each split independently
train_dataset = prepare_dataset(train_texts, train_drugs, train_effects, tokenizer)
validation_dataset = prepare_dataset(validation_texts, validation_drugs, validation_effects, tokenizer)
test_dataset = prepare_dataset(test_texts, test_drugs, test_effects, tokenizer)

# Report the number of examples per split
print(f"Train Dataset: {len(train_dataset)}")
print(f"Validation Dataset: {len(validation_dataset)}")
print(f"Test Dataset: {len(test_dataset)}")

In [None]:
from seqeval.metrics import precision_score, recall_score, f1_score, classification_report
from seqeval.scheme import IOB2

def align_predictions(predictions, label_ids):
    """
    Convert logits and gold label ids into per-sentence lists of tag strings.

    Positions whose gold id equals the ignore index of
    ``torch.nn.CrossEntropyLoss`` are dropped from both outputs. Ids missing
    from ``index_to_label`` are rendered as 'O'.

    Args:
        predictions: Array of logits indexed as (sentence, position, class).
        label_ids: 2-D array of gold label ids, indexed as (sentence, position).

    Returns:
        Tuple ``(preds_list, out_label_list)``: predicted and gold tag
        strings, one list per sentence.
    """
    # Read the constant once instead of building a loss module per token.
    ignore_index = torch.nn.CrossEntropyLoss().ignore_index
    preds = np.argmax(predictions, axis=2)
    label_ids = np.asarray(label_ids)
    keep = label_ids != ignore_index

    preds_list, out_label_list = [], []
    for row_preds, row_labels, row_keep in zip(preds, label_ids, keep):
        out_label_list.append([index_to_label.get(int(t), 'O') for t in row_labels[row_keep]])
        preds_list.append([index_to_label.get(int(p), 'O') for p in row_preds[row_keep]])

    return preds_list, out_label_list  # Return lists of lists, one per sentence


def compute_metrics(p):
    """
    Compute precision, recall, and F1 for token-classification predictions.

    ``p`` unpacks into ``(logits, label_ids)``. Both are converted to tag
    strings by ``align_predictions`` and then scored with the imported
    ``precision_score``, ``recall_score``, and ``f1_score`` functions.
    """
    logits, gold_ids = p
    pred_tags, gold_tags = align_predictions(logits, gold_ids)
    scorers = {"precision": precision_score, "recall": recall_score, "f1": f1_score}
    return {name: scorer(gold_tags, pred_tags) for name, scorer in scorers.items()}

In [None]:
# Training configuration for the NER model. Paths are absolute so they land
# inside the Drive mount at /content/drive; the previous "./content/..." form
# resolved against the working directory.
# NOTE(review): newer transformers releases spell `evaluation_strategy` as
# `eval_strategy` — confirm which one the installed version accepts.
# NOTE(review): if training runs for fewer than save_steps (1000) steps, no
# intermediate checkpoint is written and load_best_model_at_end has nothing
# to restore — confirm against the dataset size.
training_args = TrainingArguments(
    output_dir="/content/drive/My Drive/ner_results",
    evaluation_strategy="steps",
    eval_steps=20,  # Evaluate every 20 steps
    logging_dir="/content/drive/My Drive/ner_logs",  # Save logs in this directory
    logging_steps=20,  # Log metrics every 20 steps
    save_steps=1000,  # Save the model every 1000 steps
    learning_rate=1e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=3,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="f1"
)

# Trainer for the token-classification model. No tokenizer or collator is
# passed; each split was already padded to a common length by the tokenizer call.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    compute_metrics=compute_metrics  # Set the compute_metrics function
)


In [None]:
# Fine-tune the token-classification model with the settings in training_args
trainer.train()

In [None]:
# Score the trained NER model on the held-out test dataset
test_results = trainer.evaluate(eval_dataset=test_dataset)
print("Test Results:", test_results)

In [None]:
import matplotlib.pyplot as plt

# Training entries of the log carry 'loss' but no 'eval_loss'
train_logs = [log for log in trainer.state.log_history if 'loss' in log and 'eval_loss' not in log]
training_loss = [log['loss'] for log in train_logs]
epochs = [log['epoch'] for log in train_logs]

# The last 'eval_loss' entry is dropped — presumably it is the test-set
# evaluation logged by trainer.evaluate; confirm if cells are re-run.
validation_loss = [log['eval_loss'] for log in trainer.state.log_history if 'eval_loss' in log][:-1]

# Training and validation loss against the epoch of each training log entry
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(epochs, training_loss, label='Training Loss')
ax.plot(epochs, validation_loss, label='Validation Loss')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid(True)
plt.show()


In [None]:
import matplotlib.pyplot as plt

# Validation metrics read from trainer.state.log_history. The last entry of
# each series is dropped — presumably the test-set evaluation; confirm.
precision = [log['eval_precision'] for log in trainer.state.log_history if 'eval_precision' in log][:-1]
recall = [log['eval_recall'] for log in trainer.state.log_history if 'eval_recall' in log][:-1]
# Stored as f1_scores, NOT f1_score: compute_metrics calls the seqeval
# function named f1_score, and rebinding that name to a list makes the next
# trainer.evaluate/predict call fail with "'list' object is not callable".
f1_scores = [log['eval_f1'] for log in trainer.state.log_history if 'eval_f1' in log][:-1]
epochs = [log['epoch'] for log in trainer.state.log_history if 'eval_f1' in log][:-1]

# One panel per metric: (values, legend/y-axis label, colour, title)
fig, axes = plt.subplots(1, 3, figsize=(12, 6))
panels = [
    (precision, 'precision', 'blue', 'Validation precision'),
    (recall, 'recall', 'red', 'Validation recall'),
    (f1_scores, 'F1 Score', 'green', 'Validation F1 Score'),
]
for ax, (values, name, color, title) in zip(axes, panels):
    ax.plot(epochs, values, label=name, color=color)
    ax.set_title(title)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(name)
    ax.grid(True)
    ax.legend()

plt.tight_layout()  # Adjust layout to not overlap
plt.show()


In [None]:
import pandas as pd

# Display name -> key in the dict returned by trainer.evaluate
metric_keys = {
    "Precision": "eval_precision",
    "Recall": "eval_recall",
    "F1 Score": "eval_f1",
}
metrics = {name: test_results[key] for name, key in metric_keys.items()}

# One-row summary table of the test metrics
df = pd.DataFrame([metrics], columns=['Precision', 'Recall', 'F1 Score'])

df

In [None]:
# Run the trained model on the test dataset. The result unpacks into three
# values; the first two (predictions, gold label ids) are kept, the third is discarded.
predictions, labels, _ = trainer.predict(test_dataset)

In [None]:
# Convert model outputs and gold ids to tag strings. NOTE: this rebinds
# `predictions` to the tag lists, so the cell is not safe to run twice in a row.
predictions, true_labels = align_predictions(predictions, labels)

In [None]:
# Token strings for every row of the test dataset's input ids
token_sequences = list(map(tokenizer.convert_ids_to_tokens, test_dataset.encodings['input_ids']))

In [None]:
# Show the first five test examples with their gold and predicted tags
for i in range(5):
    tokens, gold, pred = token_sequences[i], true_labels[i], predictions[i]
    print(f"Text: {tokens}")
    print(f"True Labels: {gold}")
    print(f"Predicted Labels: {pred}\n")