In [1]:
import pandas as pd
from transformers import T5Tokenizer, T5ForConditionalGeneration, Trainer, TrainingArguments, AdamW, get_linear_schedule_with_warmup
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the sample data into a DataFrame
# df = pd.read_json("HealthCareMagic-100k.json")
df = pd.read_parquet("HealthCareMagic.parquet")
# df = df.head(10000)

In [3]:
# df["input_text"] = df.apply(lambda x: f"question: {x['input']} context: {x['instruction']}", axis=1)
# del df["instruction"], df["input"]
# df.to_parquet("HealthCareMagic.parquet")

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [5]:
# Load model and tokenizer
model_name = "t5-small"  # or "dmis-lab/biobert-v1.1"
tokenizer = T5Tokenizer.from_pretrained(model_name)
# tokenizer.pad_token = tokenizer.eos_token  # or use a new token if desired
model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [6]:
# Custom Dataset Class
# class MedicalDataset(Dataset):
#     def __init__(self, dataframe, tokenizer, max_length=512):
#         self.tokenizer = tokenizer
#         self.input_texts = dataframe["input"].tolist()
#         self.output_texts = dataframe["output"].tolist()
#         self.max_length = max_length

#     def __len__(self):
#         return len(self.input_texts)

#     def __getitem__(self, idx):
#         input_encoding = self.tokenizer(
#             self.input_texts[idx],
#             truncation=True,
#             padding="max_length",
#             max_length=self.max_length,
#             return_tensors="pt",
#         )
#         output_encoding = self.tokenizer(
#             self.output_texts[idx],
#             truncation=True,
#             padding="max_length",
#             max_length=self.max_length,
#             return_tensors="pt",
#         )

#         return {
#             "input_ids": input_encoding["input_ids"].flatten(),
#             "attention_mask": input_encoding["attention_mask"].flatten(),
#             "labels": output_encoding[
#                 "input_ids"
#             ].flatten(),  # Use output ids as labels
#         }


# # Create the dataset
# dataset = MedicalDataset(df, tokenizer)

In [7]:
# def __getitem__(self, idx):
#     input_encoding = self.tokenizer(
#         self.input_texts[idx],
#         truncation=True,
#         padding="max_length",
#         max_length=self.max_length,
#         return_tensors="pt",
#     )

#     # Check if output text is empty
#     if self.output_texts[idx]:
#         output_encoding = self.tokenizer(
#             self.output_texts[idx],
#             truncation=True,
#             padding="max_length",
#             max_length=self.max_length,
#             return_tensors="pt",
#         )
#         labels = output_encoding["input_ids"].flatten()
#     else:
#         # If output is empty, create a tensor filled with the padding token
#         labels = torch.full((self.max_length,), self.tokenizer.pad_token_id)

#     return {
#         "input_ids": input_encoding["input_ids"].flatten(),
#         "attention_mask": input_encoding["attention_mask"].flatten(),
#         "labels": labels,  # Use output ids as labels
#     }


In [8]:
class MedicalDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.input_texts = dataframe["input_text"].tolist()
        self.output_texts = dataframe["output"].tolist()
        self.max_length = max_length

        # Filter out entries where input or output is empty
        self.data = [
            (input_text, output_text)
            for input_text, output_text in zip(self.input_texts, self.output_texts)
            if input_text and output_text
        ]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_text, output_text = self.data[idx]

        input_encoding = self.tokenizer(
            input_text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )

        output_encoding = self.tokenizer(
            output_text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )

        return {
            "input_ids": input_encoding["input_ids"].flatten(),
            "attention_mask": input_encoding["attention_mask"].flatten(),
            "labels": output_encoding["input_ids"].flatten(),  # Use output ids as labels
        }
dataset = MedicalDataset(df, tokenizer)

In [9]:
def compute_accuracy(predictions, labels):
    """
    Computes accuracy by comparing the model's predictions to the true labels.
    Args:
        predictions (torch.Tensor): Predicted logits from the model
        labels (torch.Tensor): True labels
    Returns:
        float: Accuracy as a percentage
    """
    # Get predicted labels by taking the argmax over the logits
    preds = torch.argmax(predictions, dim=-1)
    
    # Calculate the number of correct predictions
    correct = (preds == labels).sum().item()
    
    # Return accuracy as a percentage
    accuracy = correct / labels.size(0)
    return accuracy

In [10]:
# Set training arguments
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=32,
    save_steps=500,
    save_total_limit=2,
    logging_dir="./logs",
    logging_steps=10,  # Log every 10 steps
    evaluation_strategy="epoch",  # Optional: evaluate at the end of each epoch
    learning_rate=5e-5,  # Set initial learning rate here
    fp16=True,  # Mixed precision training for larger batch sizes
    gradient_accumulation_steps=2  # Adjust if needed to save memory
)

optimizer = AdamW(model.parameters(), lr=training_args.learning_rate)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=500,  # Adjust based on your needs
    num_training_steps=len(dataset) // training_args.per_device_train_batch_size * training_args.num_train_epochs
)



In [11]:
train_dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
model.train()  # Set model to training mode
for epoch in range(training_args.num_train_epochs):
    # Wrap train_dataloader with tqdm to show progress bar for each batch
    epoch_progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{training_args.num_train_epochs}", dynamic_ncols=True)
    
    # Reset epoch loss and accuracy
    total_loss = 0.0
    total_accuracy = 0.0
    
    for batch in epoch_progress_bar:
        # Move batch to device (GPU/CPU)
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        
        # Zero gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        # Track loss
        total_loss += loss.item()
        
        # Calculate accuracy
        batch_accuracy = compute_accuracy(logits, labels)
        total_accuracy += batch_accuracy
        
        # Update progress bar with loss and accuracy
        epoch_progress_bar.set_postfix(loss=total_loss / (epoch_progress_bar.n + 1), accuracy=total_accuracy / (epoch_progress_bar.n + 1))

    # Optionally: Print epoch stats after each epoch
    avg_loss = total_loss / len(train_dataloader)
    avg_accuracy = total_accuracy / len(train_dataloader)
    print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.4f}")

Epoch 1/3:   0%|          | 0/3506 [00:00<?, ?it/s]Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.
Epoch 1/3: 100%|██████████| 3506/3506 [16:53:39<00:00, 17.35s/it, accuracy=391, loss=1.53]  


Epoch 1 - Loss: 1.5291, Accuracy: 390.6495


Epoch 2/3: 100%|██████████| 3506/3506 [16:55:36<00:00, 17.38s/it, accuracy=412, loss=1.08]  


Epoch 2 - Loss: 1.0788, Accuracy: 412.1550


Epoch 3/3: 100%|██████████| 3506/3506 [17:21:46<00:00, 17.83s/it, accuracy=414, loss=1.04]  

Epoch 3 - Loss: 1.0445, Accuracy: 414.4083





In [2]:
# Save the trained model and tokenizer
model.save_pretrained("./saved_model")
tokenizer.save_pretrained("./saved_model")

print("Model and tokenizer saved successfully!")

NameError: name 'model' is not defined

In [1]:
# Testing the model on a prompt
def generate_response(prompt):
    # Encode the input prompt
    input_encodings = tokenizer(
        prompt,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=512,
    ).to(device)

    # Generate a response from the model
    output = model.generate(
        input_ids=input_encodings["input_ids"],
        attention_mask=input_encodings["attention_mask"],
        max_length=1024,  # Maximum length of generated text
        num_return_sequences=1,  # Number of generated outputs to return
        pad_token_id=tokenizer.eos_token_id,  # Ensure correct padding token (usually EOS token for models like GPT)
        do_sample=True,  # Enable sampling for diversity
        temperature=0.7,  # Controls randomness, 0.7 is a good balance for coherence and diversity
        top_k=50,  # Limit the sampling to the top 50 tokens (high diversity, reasonable quality)
        top_p=0.9,  # Nucleus sampling: use the top 90% of probability mass (ensures diversity while keeping quality)
        num_beams=4,  # Beam search to improve quality of generation (4 beams strikes a good balance)
        no_repeat_ngram_size=2,  # Prevents repeating bigrams to avoid redundancy in generated text
        early_stopping=True  # Stops generation when EOS token is generated
    )


    # Decode the output to text
    
    response = tokenizer.decode(output[0], skip_special_tokens=True)
    # response1 = tokenizer.decode(output[1], skip_special_tokens=True)
    # return response, response1
    return response


# Example prompt to test the model
instruction = "If you are a doctor, please answer the medical questions based on the patient's description."
input = "I've been feeling really tired lately, and I've noticed some shortness of breath, especially when doing everyday activities. My sleep has been disturbed, and I sometimes feel dizzy. I've also had a mild cough for a couple of weeks that doesn't seem to go away. I'm concerned it might be something more serious, like a lung issue or maybe even anemia. I just don't feel like myself, and it's been affecting my daily routine."
prompt = f"question: {input} context: {instruction}"
# test_prompt = "What should I do if I have a fever and headache?"
# response, response1 = generate_response(prompt)
response = generate_response(prompt)
print(f"Prompt: {input}")
print(f"Response: {response}")
# print(f"Response 2: {response1}")
print(len(response))

NameError: name 'tokenizer' is not defined