In [1]:
!pip install huggingface_hub transformers datasets peft sacremoses 

Collecting sacremoses
  Downloading sacremoses-0.1.1-py3-none-any.whl.metadata (8.3 kB)
Downloading sacremoses-0.1.1-py3-none-any.whl (897 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.5/897.5 kB[0m [31m26.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sacremoses
Successfully installed sacremoses-0.1.1


In [2]:
from huggingface_hub import login
login(token="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")

In [10]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM ,TrainingArguments, Trainer ,BioGptTokenizer, BioGptForCausalLM ,DataCollatorForLanguageModeling
from datasets import load_dataset ,concatenate_datasets, DatasetDict ,Dataset
from peft import get_peft_model, PromptTuningConfig, TaskType ,PromptTuningInit ,PeftModel
from sklearn.model_selection import train_test_split


import random
import pandas as pd

In [4]:
model_name = "microsoft/biogpt"
model = BioGptForCausalLM.from_pretrained(model_name)
tokenizer = BioGptTokenizer.from_pretrained(model_name)

In [8]:
def get_random_samples(dataset,type_d, n=5,seed = 45 ):
    if seed is not None:
        random.seed(seed)
    return random.sample(list(dataset[type_d]), n)
def process_data(sample, dataset_name):
    if dataset_name == "MedQA":
        question = sample['data']['Question']
        context = ""
        correct_answer = sample['data']['Correct Answer']
        options = sample['data']['Options']
    elif dataset_name == "PubMedQA":
        question = sample['data']['Question']
        context = ' '.join(sample['data']['Context'])
        correct_answer = sample['data']['Correct Answer']
        options = sample['data']['Options']
    elif dataset_name == "MedMCQA":
        question = sample['question']
        context = sample.get('exp', '')
        correct_answer = [k for k, v in {'opa': 0, 'opb': 1, 'opc': 2, 'opd': 3}.items() if v == sample['cop']][0]
        options = {
            'A': sample['opa'],
            'B': sample['opb'],
            'C': sample['opc'],
            'D': sample['opd']
        }
    elif dataset_name == "LiveQA":
        question = sample['message']
        context = sample.get('focus', '')
        correct_answer = sample['answer']
        options = "QA"
    
    if dataset_name != "LiveQA":
        input_text = f"Context: {context}\nQuestion: {question}\nOptions: {options}\nAnswer:"
    else:
        input_text = f"Context: {context}\nQuestion: {question}\nAnswer:"
    
    return input_text, correct_answer

def split_dataset(dataset, test_size=0.2, seed=42):
    if 'train' in dataset:
        train_dataset = dataset['train']
    else:
        train_dataset = dataset

    train_test_split = train_dataset.train_test_split(test_size=test_size, seed=seed)
    train_split = train_test_split['train'].map(lambda x: {'split': 'train'})
    test_split = train_test_split['test'].map(lambda x: {'split': 'test'})
    
    return DatasetDict({
        'train': train_split,
        'test': test_split
    })

## healthsearchqa = split_dataset(load_dataset("aisc-team-d2/healthsearchqa"))
pubmedqa = split_dataset(load_dataset("openlifescienceai/pubmedqa"))
medmcqa = split_dataset(load_dataset("openlifescienceai/medmcqa"))
medqa = split_dataset(load_dataset("openlifescienceai/medqa"))
liveqa = split_dataset(load_dataset("truehealth/liveqa"))

In [9]:
##Checking the base modal
model.config.pad_token_id = model.config.eos_token_id 
results = []
for dataset_name, dataset in [("PubMedQA", pubmedqa) , ("MedMCQA", medmcqa), ("MedQA", medqa),("LiveQA",liveqa)]:
    print(dataset_name )
    samples = get_random_samples(dataset,'test' ,6)
    for sample in samples:
        input_text ,correct_answer = process_data(sample, dataset_name)
        inputs = tokenizer(input_text, return_tensors="pt")
        if "options:" in input_text.lower():
            additional_prompt = f"If the question is a multiple-choice question, return only the correct answer from the options.Donot Add input prompt in output \n"
        else:
            additional_prompt = f"Donot Add input prompt in output \n.Only Answer is required"

        input_text_with_prompt = additional_prompt  + input_text   
        inputs = tokenizer(input_text_with_prompt, return_tensors="pt")
        print(input_text_with_prompt)
        try:
            outputs = model.generate(**inputs, max_length = 1024 )
            llm_answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
            llm_answer = llm_answer.replace(input_text_with_prompt, "").strip()
            print("LLM answer",llm_answer)
        except:
            llm_answer = "No Answer"

        result = {
            'prompt': input_text_with_prompt,
            'correct_answer': correct_answer,
            'llm_answer': llm_answer
        }
        
        result['dataset'] = dataset_name
        results.append(result)


result_df = pd.DataFrame(results)
result_df.to_csv("gs://llm-medical-model/result-med/result_base_model_Bio_GPT_F.csv")

PubMedQA
If the question is a multiple-choice question, return only the correct answer from the options.Donot Add input prompt in output 
Context: Severe upper gastrointestinal (GI) motor disorders, including gastroparesis (GP), can consume significant health care resources. Many patients are refractory to traditional drug therapy. To compare symptoms, healthcare resource utilization and costs in two groups of patients with the symptoms of GP: those treated via gastric electrical stimulation (GES) and those treated with traditional pharmacological agents in an intensive outpatient program (MED). A long-term comparison of patients with devices (n = 9) vs intensive medical therapy (n = 9). A total of 18 eligible patients with the symptoms of GP reported for 1-year baseline and long-term treatment for 3 years. Patients with the symptoms of GP were treated by a GES or intensive medical therapy (MED). GP Symptoms, healthcare resource utilization using investigator-derived independent outcom

In [17]:

def transform_sample(sample, dataset_name="LiveQA"):
    return {
        'prompt': f"Context: {sample.get('focus', '')}\nQuestion: {sample['message']}\nAnswer:",
        'question': sample['message'],
        'context': sample.get('focus', ''),
        'correct_answer': sample['answer'],
        'dataset_source': dataset_name
    }

def transform_dataset(dataset):
    transformed_dataset = DatasetDict()
    for split, data in dataset.items():
        transformed_dataset[split] = data.map(transform_sample)
    return transformed_dataset

transformed_dataset = transform_dataset(liveqa)

def tokenize_function(examples):
    model_inputs = tokenizer(
        examples['prompt'],
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="pt"
    )
    model_inputs['labels'] = model_inputs['input_ids'].clone()
    return model_inputs

tokenized_dataset = transformed_dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=transformed_dataset["train"].column_names
)

peft_config = PromptTuningConfig(
    task_type=TaskType.CAUSAL_LM,
    prompt_tuning_init=PromptTuningInit.TEXT,
    num_virtual_tokens=20,
    prompt_tuning_init_text= """ You are a helpful medical knowledge assistant. Provide useful, complete, and
                                scientifically-grounded answers to common consumer search queries about
                                health. \n""",
    tokenizer_name_or_path=model_name,
)

# Get PEFT model
model = get_peft_model(model, peft_config)

# Training arguments
training_args = TrainingArguments(
    output_dir="gs://llm-medical-model/model-med/prompt_tuned_biogpt",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir="gs://llm-medical-model/model-med//logs",
)


data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset= tokenized_dataset["train"],
    data_collator=data_collator,
)
trainer.train()

model.save_pretrained("gs://llm-medical-model/model-med/prompt_tuned_biogpt")
tokenizer.save_pretrained("gs://llm-medical-model/model-med/prompt_tuned_biogpt")

Map:   0%|          | 0/508 [00:00<?, ? examples/s]

Map:   0%|          | 0/127 [00:00<?, ? examples/s]

Step,Training Loss


('gs://llm-medical-model/model-med/prompt_tuned_biogpt/tokenizer_config.json',
 'gs://llm-medical-model/model-med/prompt_tuned_biogpt/special_tokens_map.json',
 'gs://llm-medical-model/model-med/prompt_tuned_biogpt/vocab.json',
 'gs://llm-medical-model/model-med/prompt_tuned_biogpt/merges.txt',
 'gs://llm-medical-model/model-med/prompt_tuned_biogpt/added_tokens.json')

In [18]:
model_path= "gs://llm-medical-model/model-med/prompt_tuned_biogpt"
model_trained = BioGptForCausalLM.from_pretrained(model_name)
tokenizer_trained = BioGptTokenizer.from_pretrained(model_name)

In [None]:
##Checking the base modal
model.config.pad_token_id = model_trained.config.eos_token_id 
results = []
for dataset_name, dataset in [("PubMedQA", pubmedqa) , ("MedMCQA", medmcqa), ("MedQA", medqa),("LiveQA",liveqa)]:
    print(dataset_name )
    samples = get_random_samples(dataset,'test' ,6)
    for sample in samples:
        input_text ,correct_answer = process_data(sample, dataset_name)
        inputs = tokenizer(input_text, return_tensors="pt")
        if "options:" in input_text.lower():
            additional_prompt = f"If the question is a multiple-choice question, return only the correct answer from the options.Donot Add input prompt in output \n"
        else:
            additional_prompt = f"Donot Add input prompt in output \n.Only Answer is required"

        input_text_with_prompt = additional_prompt  + input_text   
        inputs = tokenizer(input_text_with_prompt, return_tensors="pt")
        print(input_text_with_prompt)
        try:
            outputs = model_trained.generate(**inputs, max_length = 1024 )
            llm_answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
            llm_answer = llm_answer.replace(input_text_with_prompt, "").strip()
            print("LLM answer",llm_answer)
        except:
            llm_answer = "No Answer"

        result = {
            'prompt': input_text_with_prompt,
            'correct_answer': correct_answer,
            'llm_answer': llm_answer
        }
        
        result['dataset'] = dataset_name
        results.append(result)


result_df = pd.DataFrame(results)
result_df.to_csv("gs://llm-medical-model/result-med/result_base_model_Bio_GPT_F_prompt_tuned.csv")

PubMedQA
If the question is a multiple-choice question, return only the correct answer from the options.Donot Add input prompt in output 
Context: Severe upper gastrointestinal (GI) motor disorders, including gastroparesis (GP), can consume significant health care resources. Many patients are refractory to traditional drug therapy. To compare symptoms, healthcare resource utilization and costs in two groups of patients with the symptoms of GP: those treated via gastric electrical stimulation (GES) and those treated with traditional pharmacological agents in an intensive outpatient program (MED). A long-term comparison of patients with devices (n = 9) vs intensive medical therapy (n = 9). A total of 18 eligible patients with the symptoms of GP reported for 1-year baseline and long-term treatment for 3 years. Patients with the symptoms of GP were treated by a GES or intensive medical therapy (MED). GP Symptoms, healthcare resource utilization using investigator-derived independent outcom