In [1]:
!pip install datasets
!pip install transformers
!pip install sentence-transformers
!pip install accelerate -U
!pip install nltk



In [2]:
from datasets import load_dataset
from transformers import AutoTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
import torch
import torch.nn.functional as F
import re

from sentence_transformers import SentenceTransformer
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import cosine_similarity

import numpy as np
from nltk.translate.bleu_score import sentence_bleu
import json, os

In [3]:
data = load_dataset("lavita/ChatDoctor-HealthCareMagic-100k")

data

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


DatasetDict({
    train: Dataset({
        features: ['instruction', 'input', 'output'],
        num_rows: 112165
    })
})

In [4]:
df = data['train'].select(range(15000))

In [5]:
train_test_valid = df.train_test_split(test_size=0.2, seed=42)
test_valid = train_test_valid['test'].train_test_split(test_size=0.5, seed=42)

train_df = train_test_valid['train']
val_df = test_valid['train']
test_df = test_valid['test']

In [6]:
# unique labels
unique_labels = list(set(train_df['output'] + val_df['output'] + test_df['output']))

# label mapping
label_mapping = {label: idx for idx, label in enumerate(unique_labels)}

# saving the label mapping to a json file
label_mapping_path = "./label_mapping.json"
with open(label_mapping_path, 'w') as file:
    json.dump(label_mapping, file)

In [7]:
# Load a pre-trained sentence transformer model
sent_transf = SentenceTransformer('all-MiniLM-L6-v2')

# Encode all possible responses
response = sent_transf.encode(list(label_mapping.keys()))

# Save the response embeddings
response_path = "./response_embeddings.npy"
np.save(response_path, response)

# Save the semantic model
sent_transf_path = "./sent_transf"
os.makedirs(sent_transf_path, exist_ok=True)
sent_transf.save(sent_transf_path)


def semantic_search(query, top_k=5):
    # Encode
    query_embedding = sent_transf.encode([query])

    # Cosine similarity
    similarities = cosine_similarity(query_embedding, response)[0]

    # Get top-k similar responses
    top_indices = similarities.argsort()[-top_k:][::-1]

    return [(list(label_mapping.keys())[i], similarities[i]) for i in top_indices]

In [12]:
def map_label(example):
    example['label'] = label_mapping[example['output']]
    return example

train_df = train_df.map(map_label)
val_df = val_df.map(map_label)
test_df = test_df.map(map_label)

Map:   0%|          | 0/1500 [00:00<?, ? examples/s]

Map:   0%|          | 0/1500 [00:00<?, ? examples/s]

In [14]:
# loading the BERT tokenizer
tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")

In [15]:

def clean_text(text):
    # Lowercase
    text = text.lower()
    # removing special characters
    text = re.sub(r'[^a-z\s]', '', text)
    # removing extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    return text

In [16]:
# tokenization
def tokenize(examples):

    cleaning = [clean_text(inst) for inst in examples['instruction']]
    cleaned_inputs = [clean_text(inp) for inp in examples['input']]

    inputs = [f"{inst} {inp}" for inst, inp in zip(cleaning, cleaned_inputs)]

    tokenized_inputs = tokenizer(inputs, truncation=True, padding="max_length", max_length=512)
    tokenized_inputs["labels"] = examples["label"]
    return tokenized_inputs


# tokenize datasets
tokenized_train = train_df.map(tokenize, batched=True)
tokenized_val = val_df.map(tokenize, batched=True)
tokenized_test = test_df.map(tokenize, batched=True)

tokenized_train.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
tokenized_val.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
tokenized_test.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

Map:   0%|          | 0/12000 [00:00<?, ? examples/s]

Map:   0%|          | 0/1500 [00:00<?, ? examples/s]

Map:   0%|          | 0/1500 [00:00<?, ? examples/s]

In [17]:
#Model
num_labels = len(label_mapping)
model = BertForSequenceClassification.from_pretrained("emilyalsentzer/Bio_ClinicalBERT", num_labels=num_labels)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at emilyalsentzer/Bio_ClinicalBERT and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [18]:

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir="./logs",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
)



In [20]:
# class CustomTrainer(Trainer):
#     def compute_loss(self, model, inputs, return_outputs=False):
#         labels = inputs.get("labels")
#         if labels is None:
#             raise ValueError("Labels should not be None")
#         outputs = model(**inputs)
#         logits = outputs.get("logits")
#         loss = F.cross_entropy(logits, labels)
#         return (loss, outputs) if return_outputs else loss


# trainer = CustomTrainer(
#     model=model,
#     args=training_args,
#     train_dataset=tokenized_train,
#     eval_dataset=tokenized_val,
# )

class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        labels = inputs.get("labels")
        if labels is None:
            raise ValueError("Labels should not be None")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        logits = logits.contiguous()
        labels = labels.contiguous()

        loss = F.cross_entropy(logits, labels)

        return (loss, outputs) if return_outputs else loss

trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
)

In [21]:
trainer.train()

Epoch,Training Loss,Validation Loss


Epoch,Training Loss,Validation Loss
1,9.6006,9.440289
2,9.6278,10.097114
3,9.4962,11.388518


TrainOutput(global_step=4500, training_loss=9.576681857638889, metrics={'train_runtime': 3595.3945, 'train_samples_per_second': 10.013, 'train_steps_per_second': 1.252, 'total_flos': 1.0729306939392e+16, 'train_loss': 9.576681857638889, 'epoch': 3.0})

In [22]:
# saving the model
model_path = "./saved_model"
os.makedirs(model_path, exist_ok=True)
model.save_pretrained(model_path)
tokenizer.save_pretrained(model_path)

('./saved_model/tokenizer_config.json',
 './saved_model/special_tokens_map.json',
 './saved_model/vocab.txt',
 './saved_model/added_tokens.json',
 './saved_model/tokenizer.json')

In [23]:
evaluation = trainer.evaluate()
evaluation

{'eval_loss': 9.440289497375488,
 'eval_runtime': 40.2781,
 'eval_samples_per_second': 37.241,
 'eval_steps_per_second': 4.668,
 'epoch': 3.0}

In [26]:

def predict_category(instruction, input_text):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    cleaned_instruction = clean_text(instruction)
    cleaned_input = clean_text(input_text)

    inputs = tokenizer(f"{cleaned_instruction} {cleaned_input}", return_tensors="pt", truncation=True, padding="max_length", max_length=512)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    predicted_id = torch.argmax(logits, dim=-1).item()
    bert_prediction = list(label_mapping.keys())[list(label_mapping.values()).index(predicted_id)]

    semantic_results = semantic_search(f"{cleaned_instruction} {cleaned_input}")

    if bert_prediction in [result[0] for result in semantic_results]:
        return bert_prediction
    else:
        return semantic_results[0][0]



instruction_input = "If you are a doctor, please answer the medical questions based on the patient's description."
sample_input = "I have a 13 month old baby who is very congested with a terrible cough. Its rattly/raspy and croupy sounding cough. She started choking on her coughs and the mucous that has come up. She also has a fever and runny nose. Should i take her to urgent care?"
predicted_label = predict_category(instruction_input, sample_input)
print(f"Instruction: {instruction_input}")
print(f"Input: {sample_input}")
print(f"Predicted Response: {predicted_label}")

Instruction: If you are a doctor, please answer the medical questions based on the patient's description.
Input: I have a 13 month old baby who is very congested with a terrible cough. Its rattly/raspy and croupy sounding cough. She started choking on her coughs and the mucous that has come up. She also has a fever and runny nose. Should i take her to urgent care?
Predicted Response: Hi, fever, cough and cold in a 13-month-old child suggest upper respiratory tract infection. Since you are already giving antipyretic medicine for 5 days, the child should be given antibiotic depending on the chest examination. Kindly get the child examined by doctor so that antibiotic can be started. Blood test like complete blood count, CRP level should be done to know the causes of fever. Take care.
