In [None]:
!pip install -r requirements.txt

In [1]:
import gc
import os
from io import BytesIO

import pandas as pd
import requests
import torch
from peft import LoraConfig, PeftModel, get_peft_model, prepare_model_for_kbit_training
from PIL import Image
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from transformers import (
    AutoProcessor,
    TrainingArguments,
    Trainer,
    BitsAndBytesConfig,
    Qwen2_5_VLForConditionalGeneration,
    Qwen2_5_VLProcessor
)

# Environment report: PyTorch build and whether a CUDA device is visible.
cuda_available = torch.cuda.is_available()
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {cuda_available}")
if cuda_available:
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

PyTorch version: 2.9.1+cu128
CUDA available: False


In [2]:
print("Initializing PySpark...")

# Spark options for the local session, applied to the builder in this order
# before the session is created (memory headroom for toPandas(), Arrow
# conversion, and generous network/heartbeat timeouts).
SPARK_OPTIONS = (
    ("spark.driver.memory", "6g"),
    ("spark.executor.memory", "6g"),
    ("spark.driver.maxResultSize", "4g"),
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
    ("spark.network.timeout", "800s"),
    ("spark.executor.heartbeatInterval", "200s"),
    ("spark.kryoserializer.buffer.max", "512m"),
)

session_builder = SparkSession.builder.appName("LLaVA-FineTuning")
for option_key, option_value in SPARK_OPTIONS:
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

print(f"Spark version: {spark.version}")

Initializing PySpark...


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/28 00:33:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.1


In [3]:

print("Loading ROCOv2-radiology dataset with PySpark...")
print("Dataset info: ~79.8k radiological images with captions")
print("Source: https://huggingface.co/datasets/eltorio/ROCOv2-radiology")

# Dataset directory - CHANGE THIS TO YOUR ACTUAL PATH
DATASET_DIR = "./ROCOv2-radiology/data"  # or "/path/to/your/data"

# Fail fast with an actionable message; a wrong path otherwise only shows up
# as a Java stack trace from Spark.
if not os.path.isdir(DATASET_DIR):
    raise FileNotFoundError(
        f"DATASET_DIR {DATASET_DIR!r} does not exist - point it at the folder "
        "containing the ROCOv2 train-/validation-/test-*.parquet shards."
    )

# Split sizes are counted below rather than hard-coded: the figures previously
# printed here (59,958 train / 79,789 total) did not match the parquet files
# actually loaded (59,962 train / 79,793 total).

# Load train, validation, and test splits separately (one glob per split)
print("\nLoading training data...")
train_df = spark.read.parquet(f"{DATASET_DIR}/train-*.parquet")

print("Loading validation data...")
val_df = spark.read.parquet(f"{DATASET_DIR}/validation-*.parquet")

print("Loading test data...")
test_df = spark.read.parquet(f"{DATASET_DIR}/test-*.parquet")

# Display schema and sample data
print("\n" + "="*70)
print("ROCOv2 DATASET INFORMATION")
print("="*70)
print("\nDataset Schema:")
train_df.printSchema()

print("\nExpected columns: image, image_id, caption, cui (medical concepts)")
print("\nSample Training Data:")
train_df.show(3, truncate=True)

print("\nDataset Statistics:")
train_count = train_df.count()
val_count = val_df.count()
test_count = test_df.count()
print(f"Training samples: {train_count}")
print(f"Validation samples: {val_count}")
print(f"Test samples: {test_count}")
print(f"Total samples: {train_count + val_count + test_count}")

Loading ROCOv2-radiology dataset with PySpark...
Dataset info: 79,789 radiological images with captions
Source: https://huggingface.co/datasets/eltorio/ROCOv2-radiology

Loading training data (59,958 images)...


25/11/28 00:33:37 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ./ROCOv2-radiology/data/train-*.parquet.
java.io.FileNotFoundException: File ROCOv2-radiology/data/train-*.parquet does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spa

Loading validation data (9,904 images)...
Loading test data (9,927 images)...

ROCOv2 DATASET INFORMATION

Dataset Schema:
root
 |-- image: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |    |-- path: string (nullable = true)
 |-- image_id: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- cui: array (nullable = true)
 |    |-- element: string (containsNull = true)


Expected columns: image, image_id, caption, cui (medical concepts)

Sample Training Data:


25/11/28 00:33:38 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ./ROCOv2-radiology/data/validation-*.parquet.
java.io.FileNotFoundException: File ROCOv2-radiology/data/validation-*.parquet does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.

+--------------------+--------------------+--------------------+----------+
|               image|            image_id|             caption|       cui|
+--------------------+--------------------+--------------------+----------+
|{[89 50 4E 47 0D ...|ROCOv2_2023_train...|Head CT demonstra...|[C0040405]|
|{[89 50 4E 47 0D ...|ROCOv2_2023_train...|Acquired renal cy...|[C0041618]|
|{[89 50 4E 47 0D ...|ROCOv2_2023_train...|Computed tomograp...|[C0040405]|
+--------------------+--------------------+--------------------+----------+
only showing top 3 rows

Dataset Statistics:
Training samples: 59962
Validation samples: 9904
Test samples: 9927
Total samples: 79793


In [4]:
print("\n" + "="*70)
print("PREPROCESSING DATA")
print("="*70)

# ROCOv2 columns (see the printed schema): image, image_id, caption, cui
# - image: struct holding the encoded image `bytes` and its source `path`
# - image_id: unique identifier
# - caption: radiological description
# - cui: medical concept codes (array)


def keep_valid_rows(df):
    """Return only rows that carry image bytes and a non-blank caption.

    A null ``image.bytes`` would otherwise reach the dataset, which substitutes
    a black placeholder image and trains on it with a real caption.
    """
    return df.filter(
        col("image").isNotNull()
        & col("image.bytes").isNotNull()
        & col("caption").isNotNull()
        & col("caption").rlike(r"\S")  # at least one non-whitespace character
    )


print("\nFiltering invalid entries...")
# New names instead of reassigning train_df/val_df: re-running this cell (while
# the Spark session is still alive) starts again from the raw splits instead of
# filtering and sampling an already-sampled frame.
train_clean_df = keep_valid_rows(train_df)
val_clean_df = keep_valid_rows(val_df)

# OPTIONAL: Sample a subset for faster training and testing
# Set USE_SAMPLE = True to test with smaller dataset first
USE_SAMPLE = True  # Change to False for full dataset
SAMPLE_FRACTION = 0.1  # Use 10% of data for testing

if USE_SAMPLE:
    print(f"\n⚠️  SAMPLING {SAMPLE_FRACTION*100}% of data for testing...")
    train_clean_df = train_clean_df.sample(fraction=SAMPLE_FRACTION, seed=42)
    val_clean_df = val_clean_df.sample(fraction=SAMPLE_FRACTION, seed=42)
    print("Set USE_SAMPLE = False to train on full dataset")

print(f"\nAfter filtering:")
train_count = train_clean_df.count()
val_count = val_clean_df.count()
print(f"Training samples: {train_count}")
print(f"Validation samples: {val_count}")

# Convert to Pandas for PyTorch compatibility
# (Arrow is enabled in the Spark session for faster conversion)
print("\nConverting to Pandas (this may take a few minutes)...")
try:
    train_data = train_clean_df.toPandas()
    eval_data = val_clean_df.toPandas()

    print(f"\n✓ Pandas DataFrames created successfully:")
    print(f"Training: {len(train_data)} samples")
    print(f"Validation: {len(eval_data)} samples")

    # Display sample to verify structure
    print("\nSample training data:")
    print(train_data[['image_id', 'caption']].head(2))

except Exception as e:
    print(f"\n❌ Error during toPandas conversion: {e}")
    print("\nTroubleshooting suggestions:")
    print("1. Increase Spark memory in cell 2")
    print("2. Enable USE_SAMPLE = True to test with smaller dataset")
    print("3. Restart kernel and try again")
    raise

# Stop Spark session to free memory (all Spark DataFrames become unusable)
print("\nStopping Spark session...")
spark.stop()

print("\n✓ Data loading complete!")


PREPROCESSING DATA

Filtering invalid entries...

⚠️  SAMPLING 10.0% of data for testing...
Set USE_SAMPLE = False to train on full dataset

After filtering:


                                                                                

Training samples: 6111
Validation samples: 966

Converting to Pandas (this may take a few minutes)...


                                                                                


✓ Pandas DataFrames created successfully:
Training: 6111 samples
Validation: 966 samples

Sample training data:
                   image_id                                            caption
0  ROCOv2_2023_train_000008  Computed tomography of the head on Day 0 shows...
1  ROCOv2_2023_train_000017                                  Strawberry skull.

Stopping Spark session...

✓ Data loading complete!


In [5]:
# Fine-tuning approach:
# 1. Fine-tune Qwen2.5-VL model with LoRA on ROCOv2 radiology dataset
# 2. Save the fine-tuned model for inference

MODEL_NAME = "Qwen/Qwen2.5-VL-3B-Instruct"
OUTPUT_DIR = "./qwen-lora-finetuned"

# LoRA configuration, later passed as LoraConfig(**LORA_CONFIG).
# Adapters are attached to modules named q/k/v/o_proj.
LORA_CONFIG = {
    "r": 16,
    "lora_alpha": 32,
    "lora_dropout": 0.05,
    "target_modules": ["q_proj", "k_proj", "v_proj", "o_proj"],
    "bias": "none",
    "task_type": "CAUSAL_LM"
}

# Training configuration for CPU, later passed as TrainingArguments(**TRAINING_CONFIG).
# Effective batch size = per_device_train_batch_size * gradient_accumulation_steps = 16.
TRAINING_CONFIG = {
    "output_dir": OUTPUT_DIR,
    "num_train_epochs": 1,  # Reduced for CPU training
    "per_device_train_batch_size": 1,  # Reduced batch size for CPU
    "per_device_eval_batch_size": 1,
    "gradient_accumulation_steps": 16,  # Increased to maintain effective batch size
    "learning_rate": 2e-4,
    "warmup_steps": 50,  # Reduced for shorter training
    "logging_steps": 10,
    # Values in (0, 1) are interpreted by TrainingArguments as a fraction of the
    # total optimizer steps. The previous fixed value of 500 was never reached
    # with the 10% sample (6,111 rows / 16 ~= 380 steps), so no evaluation or
    # checkpoint ever happened. A ratio works for both sampled and full runs.
    "save_steps": 0.25,
    "eval_steps": 0.25,
    "save_total_limit": 2,
    "fp16": False,  # Disabled fp16 for CPU training
    "remove_unused_columns": False,  # let the custom collate_fn receive every dataset key
    "dataloader_pin_memory": False,
    "report_to": "none",
    "eval_strategy": "steps",
    "seed": 42,  # explicit for reproducibility
}

print("="*70)
print("FINE-TUNING CONFIGURATION FOR CPU")
print("="*70)
print(f"Model: {MODEL_NAME}")
print(f"Output directory: {OUTPUT_DIR}")
print("\nFine-tuning Qwen2.5-VL on ROCOv2 radiology dataset")
print("NOTE: Training on CPU - reduced batch size and epochs for feasibility")

FINE-TUNING CONFIGURATION FOR CPU
Model: Qwen/Qwen2.5-VL-3B-Instruct
Output directory: ./qwen-lora-finetuned

Fine-tuning Qwen2.5-VL on ROCOv2 radiology dataset
NOTE: Training on CPU - reduced batch size and epochs for feasibility


In [1]:
print("Loading Qwen2.5-VL model for CPU training...")

# The processor bundles the tokenizer and the image preprocessor.
# (Requires the imports cell - run the notebook top to bottom.)
processor = Qwen2_5_VLProcessor.from_pretrained(MODEL_NAME)

# Full-precision weights, no quantization: float32 is used for CPU training.
model_load_kwargs = dict(torch_dtype=torch.float32, trust_remote_code=True)
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(MODEL_NAME, **model_load_kwargs)

print("Model loaded successfully for CPU training!")

Loading Qwen2.5-VL model for CPU training...


NameError: name 'Qwen2_5_VLProcessor' is not defined

In [7]:

print("Preparing model for LoRA training...")

if isinstance(model, PeftModel):
    # Re-running this cell must not wrap the PEFT model a second time.
    print("Model already has LoRA adapters attached; skipping get_peft_model.")
else:
    # prepare_model_for_kbit_training targets 4/8-bit quantized models. The
    # model in this notebook is loaded un-quantized in float32, so only call it
    # when quantization is actually active.
    if getattr(model, "is_loaded_in_4bit", False) or getattr(model, "is_loaded_in_8bit", False):
        model = prepare_model_for_kbit_training(model)

    # Configure LoRA and wrap the model; only adapter weights stay trainable.
    lora_config = LoraConfig(**LORA_CONFIG)
    model = get_peft_model(model, lora_config)

# Print trainable parameters
model.print_trainable_parameters()

Preparing model for LoRA training...
trainable params: 7,372,800 || all params: 9,311,059,968 || trainable%: 0.0792


In [8]:

class LLaVADataset(torch.utils.data.Dataset):
    """ROCOv2 radiology image/caption pairs formatted for Qwen2.5-VL fine-tuning.

    The class keeps its historical ``LLaVA`` name, but it builds the Qwen2.5-VL
    chat format (image as a ``{"type": "image"}`` content item). This is the
    same format ``generate_response`` uses at inference time.

    Each row becomes a single-turn chat: the user turn holds the image plus a
    question, and the assistant turn holds the caption. The chat is rendered
    with the processor's chat template and tokenised to ``max_length``.

    Labels are a copy of ``input_ids`` with two groups masked to -100:
    padding, and every token up to and including the last assistant header.
    The loss therefore covers only the answer and the tokens the template
    appends after it.

    Args:
        dataframe: pandas DataFrame with ``image`` and ``caption`` columns
            (``toPandas()`` output of the ROCOv2 parquet files).
        processor: Qwen2.5-VL processor providing ``apply_chat_template``,
            ``__call__`` and ``tokenizer``.
        mode: ``'vqa'`` picks one of several questions by ``idx``; any other
            value uses a single fixed captioning question.
        max_length: sequence length each example is padded/truncated to.
        max_image_side: images are downscaled (aspect ratio kept) so neither
            side exceeds this many pixels; ``None``/0 disables resizing.
            Qwen2.5-VL uses roughly one token per 28x28 pixels, so at 448 px the
            image needs at most ~256 placeholder tokens. That keeps truncation
            at ``max_length`` away from the image tokens, which must match the
            image features one-to-one.
        assistant_header: text that opens the assistant turn in the chat
            template; its token ids locate where the answer starts.
    """

    IGNORE_INDEX = -100
    _VQA_QUESTIONS = (
        "What does this medical image show?",
        "Describe this radiological image.",
        "What is visible in this scan?",
        "What are the findings in this image?",
    )

    def __init__(self, dataframe, processor, mode='caption', max_length=512,
                 max_image_side=448, assistant_header="<|im_start|>assistant\n"):
        self.data = dataframe.to_dict('records')
        self.processor = processor
        self.mode = mode  # 'caption' or 'vqa'
        self.max_length = max_length
        self.max_image_side = max_image_side
        # Token ids (not characters) of the assistant header. Searching the
        # tokenised sequence is immune to image-token expansion, and it cannot
        # confuse the header with the word "assistant" in the system prompt.
        self._assistant_header_ids = list(
            processor.tokenizer(assistant_header, add_special_tokens=False)["input_ids"]
        )

        print(f"\nROCOv2 Dataset initialized with {len(self.data)} samples")
        print(f"Mode: {self.mode}")
        print(f"Columns: {list(dataframe.columns)}")

    def __len__(self):
        return len(self.data)

    @staticmethod
    def _image_bytes(image_data):
        """Return the encoded image bytes from a ``toPandas()`` image value.

        Accepts raw bytes-like values, a dict with a ``'bytes'`` key (the Arrow
        struct conversion), or an object exposing ``asDict()``, such as a
        PySpark ``Row`` (the non-Arrow conversion). Returns ``None`` for
        anything else.
        """
        if isinstance(image_data, (bytes, bytearray, memoryview)):
            return bytes(image_data)
        if hasattr(image_data, "asDict"):
            image_data = image_data.asDict()
        if isinstance(image_data, dict) and 'bytes' in image_data:
            return image_data['bytes']
        return None

    def _load_image(self, image_data, idx):
        """Decode one image to RGB; fall back to a black 224x224 image on failure."""
        raw = self._image_bytes(image_data)
        if raw is None:
            print(f"Unexpected image format at index {idx}")
            return Image.new('RGB', (224, 224), color='black')
        try:
            return Image.open(BytesIO(raw)).convert('RGB')
        except Exception as e:
            print(f"Error loading image at index {idx}: {e}")
            return Image.new('RGB', (224, 224), color='black')

    @staticmethod
    def _mask_prompt_tokens(labels, header_ids, ignore_index=-100):
        """Set ``labels`` to ``ignore_index`` up to and including the last ``header_ids`` run.

        ``labels`` is a 1-D tensor and is modified in place and returned. If the
        header is empty or not found (e.g. truncated away), ``labels`` is left
        unchanged rather than fully masked, because an all-ignored example
        would give a NaN loss.
        """
        header = list(header_ids)
        width = len(header)
        if width == 0:
            return labels
        ids = labels.tolist()
        # Scan backwards: the answer follows the LAST assistant header.
        for start in range(len(ids) - width, -1, -1):
            if ids[start:start + width] == header:
                labels[:start + width] = ignore_index
                break
        return labels

    def __getitem__(self, idx):
        """Build one training example.

        Returns a dict with 1-D ``input_ids``, ``attention_mask`` and ``labels``,
        plus ``pixel_values`` as produced by the processor (a leading size-1 dim
        squeezed). When the processor returns ``image_grid_thw``, it is included
        as well; Qwen2.5-VL's forward pass needs it to place image features.
        """
        item = self.data[idx]

        image = self._load_image(item['image'], idx)
        if self.max_image_side:
            # In-place, aspect-preserving downscale; never upscales.
            image.thumbnail((self.max_image_side, self.max_image_side))

        # Get caption (radiological description); also guards against NaN.
        caption = item.get('caption', '')
        if not isinstance(caption, str) or not caption.strip():
            caption = "Medical image."

        if self.mode == 'vqa':
            question = self._VQA_QUESTIONS[idx % len(self._VQA_QUESTIONS)]
        else:
            question = "Describe this medical image."

        # Qwen2.5-VL message format: the {"type": "image"} item makes the
        # template emit the vision placeholder tokens the processor expands.
        conversation = [
            {
                "role": "user",
                "content": [
                    {"type": "image"},
                    {"type": "text", "text": question},
                ],
            },
            {
                "role": "assistant",
                "content": [{"type": "text", "text": caption}],
            },
        ]

        prompt = self.processor.apply_chat_template(
            conversation,
            add_generation_prompt=False
        )

        inputs = self.processor(
            text=[prompt],
            images=[image],
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_length
        )

        input_ids = inputs["input_ids"][0]
        attention_mask = inputs["attention_mask"][0]

        labels = input_ids.clone()
        labels[attention_mask == 0] = self.IGNORE_INDEX  # padding
        self._mask_prompt_tokens(labels, self._assistant_header_ids, self.IGNORE_INDEX)

        example = {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "pixel_values": inputs["pixel_values"].squeeze(0),
            "labels": labels,
        }
        if "image_grid_thw" in inputs:
            example["image_grid_thw"] = inputs["image_grid_thw"]
        return example

# Create datasets. Both splits use 'vqa' mode, so the question text rotates by
# sample index across a few phrasings.
print("\nCreating PyTorch datasets...")
train_dataset = LLaVADataset(train_data, processor, mode='vqa')
eval_dataset = LLaVADataset(eval_data, processor, mode='vqa')

# An empty split would otherwise only surface later as an obscure Trainer error.
assert len(train_dataset) > 0, "Training dataset is empty - check filtering/sampling."
assert len(eval_dataset) > 0, "Evaluation dataset is empty - check filtering/sampling."

print(f"\nDataset created:")
print(f"Training samples: {len(train_dataset)}")
print(f"Evaluation samples: {len(eval_dataset)}")
print("\nNote: Fine-tuning Qwen2.5-VL on medical radiology images (ROCOv2)")
print("This model will learn to describe radiological images.")


Creating PyTorch datasets...

ROCOv2 Dataset initialized with 6111 samples
Mode: vqa
Columns: ['image', 'image_id', 'caption', 'cui']

ROCOv2 Dataset initialized with 966 samples
Mode: vqa
Columns: ['image', 'image_id', 'caption', 'cui']

Dataset created:
Training samples: 6111
Evaluation samples: 966

Note: Fine-tuning LLaVA on medical radiology images (ROCOv2)
This model will learn to describe radiological images.


In [9]:

def collate_fn(batch, pad_token_id=0, label_pad_id=-100):
    """Collate dataset examples into one model-ready batch.

    - ``input_ids``, ``attention_mask`` and ``labels`` (1-D per example) are
      right-padded to the longest example. Equal-length inputs, such as those
      already padded to ``max_length`` by the dataset, are simply stacked.
      Padding uses ``pad_token_id``, 0 and ``label_pad_id`` respectively. The
      padded input ids are masked out by the attention mask and the -100 labels.
    - ``pixel_values``: if every example has ``image_grid_thw`` (Qwen2-VL
      family), the per-image patch tensors are concatenated along dim 0 and
      the grids stacked into ``(num_images, 3)``. This is the flattened layout
      those models expect, and it allows images with different patch counts.
      Otherwise the tensors are stacked into a new batch dimension.

    Args:
        batch: list of dicts produced by the dataset's ``__getitem__``.
        pad_token_id: fill value for padded ``input_ids``.
        label_pad_id: fill value for padded ``labels``.

    Returns:
        dict of batched tensors (plus ``image_grid_thw`` when present).
    """
    def _pad(key, value):
        return torch.nn.utils.rnn.pad_sequence(
            [item[key] for item in batch], batch_first=True, padding_value=value
        )

    pixel_values = [item["pixel_values"] for item in batch]
    has_grid = all("image_grid_thw" in item for item in batch)

    collated = {
        "input_ids": _pad("input_ids", pad_token_id),
        "attention_mask": _pad("attention_mask", 0),
        "pixel_values": torch.cat(pixel_values, dim=0) if has_grid else torch.stack(pixel_values),
        "labels": _pad("labels", label_pad_id),
    }
    if has_grid:
        collated["image_grid_thw"] = torch.cat(
            [item["image_grid_thw"].reshape(-1, 3) for item in batch], dim=0
        )
    return collated

In [14]:

print("\nSetting up training...")

# TrainingArguments come straight from the config dictionary defined earlier.
training_args = TrainingArguments(**TRAINING_CONFIG)

# Wire model, data and the custom collator into a Hugging Face Trainer.
trainer_kwargs = {
    "model": model,
    "args": training_args,
    "train_dataset": train_dataset,
    "eval_dataset": eval_dataset,
    "data_collator": collate_fn,
}
trainer = Trainer(**trainer_kwargs)


Setting up training...


In [15]:
# Run the training loop configured above, framed by separator lines.
separator = "=" * 70
print("\nStarting training...")
print(separator)

trainer.train()

print(separator)
print("Training completed!")


Starting training...


ValueError: You can't train a model that has been loaded in 8-bit or 4-bit precision with CPU or disk offload. If you want train the 8-bit or 4-bit model in CPU, please install bitsandbytes with multi-backend, see https://huggingface.co/docs/bitsandbytes/main/en/installation#multi-backend

In [None]:
# Persist the LoRA adapter (via the PEFT model's save_pretrained) and the
# processor files side by side in OUTPUT_DIR.
print(f"\nSaving model to {OUTPUT_DIR}...")
for artifact in (model, processor):
    artifact.save_pretrained(OUTPUT_DIR)
print("Model saved successfully!")

In [None]:
print("\n" + "="*70)
print("SAVING FINAL MODEL")
print("="*70)

MERGED_OUTPUT_DIR = "./qwen-radiology-merged"

# merge_and_unload() exists only on a PEFT-wrapped model. Fail with a clear
# message (instead of an AttributeError) if the LoRA cells were not run or the
# kernel was restarted.
if not hasattr(model, "merge_and_unload"):
    raise TypeError(
        "`model` has no LoRA adapter to merge - run the LoRA setup and training cells first."
    )

# Step 1: Merge LoRA weights with base model.
# The result gets its own name so `model` is not silently rebound to a
# different type, and re-running this cell does not crash.
# NOTE(review): merging rewrites the wrapped base model's layers, so do not
# keep training `model` after this cell - confirm against the PEFT docs.
print("\n[1/2] Merging LoRA weights with base model...")
merged_model = model.merge_and_unload()

# Step 2: Save merged model in HuggingFace format
print(f"[2/2] Saving merged model to {MERGED_OUTPUT_DIR}...")
merged_model.save_pretrained(MERGED_OUTPUT_DIR)
processor.save_pretrained(MERGED_OUTPUT_DIR)
print(f"✓ Merged model saved")

print("\n" + "="*70)
print("MODEL EXPORT COMPLETE")
print("="*70)
print(f"\nFinal model saved to: {MERGED_OUTPUT_DIR}")
print(f"You can now use this model for inference or further deployment.")
print("\n" + "="*70)

# Testing

In [None]:
print("\n" + "="*70)
print("TESTING THE FINE-TUNED MODEL")
print("="*70 + "\n")

# Release the in-memory training copies before loading a fresh float32 base
# model. Otherwise two full copies are resident at once: the Trainer also holds
# a reference to the trained model, and the merge cell may have left one too.
for stale_name in ("trainer", "merged_model", "model", "base_model"):
    globals().pop(stale_name, None)
gc.collect()

# Load base model for testing (CPU version)
base_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float32,
    trust_remote_code=True
)

# Attach the saved LoRA adapter and switch to inference mode.
model = PeftModel.from_pretrained(base_model, OUTPUT_DIR)
model.eval()

def generate_response(image_data, question, max_new_tokens=200, do_sample=True,
                      temperature=0.7, top_p=0.9, max_image_side=None):
    """Generate the fine-tuned model's answer for one image and one question.

    Uses the notebook-level ``model`` and ``processor``.

    Args:
        image_data: encoded image as bytes/bytearray/memoryview, a dict with a
            ``'bytes'`` key (HuggingFace / Arrow struct format), or an object
            with ``asDict()`` such as a PySpark ``Row``.
        question: user prompt text.
        max_new_tokens: generation length limit.
        do_sample: sample instead of greedy decoding.
        temperature, top_p: sampling parameters, passed only when
            ``do_sample`` is true.
        max_image_side: optionally downscale the image (aspect ratio kept) so
            neither side exceeds this many pixels. Pass the value used during
            training; ``None`` keeps the original size.

    Returns:
        The decoded, stripped answer, or ``"Error: Could not load image"`` if
        the image is in an unknown format or cannot be decoded.
    """
    if hasattr(image_data, "asDict"):
        image_data = image_data.asDict()
    if isinstance(image_data, (bytes, bytearray, memoryview)):
        raw_bytes = bytes(image_data)
    elif isinstance(image_data, dict) and 'bytes' in image_data:
        raw_bytes = image_data['bytes']
    else:
        print(f"Unexpected image format: {type(image_data)}")
        return "Error: Could not load image"

    try:
        image = Image.open(BytesIO(raw_bytes)).convert('RGB')
    except Exception as e:
        print(f"Error loading image: {e}")
        return "Error: Could not load image"

    if max_image_side:
        image.thumbnail((max_image_side, max_image_side))

    # Create conversation in Qwen2.5-VL format
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": question}
            ]
        }
    ]

    prompt = processor.apply_chat_template(
        conversation,
        add_generation_prompt=True
    )

    # Put the input tensors on the model's device (a no-op for a CPU model).
    inputs = processor(text=[prompt], images=[image], return_tensors="pt").to(model.device)

    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": do_sample,
        "pad_token_id": processor.tokenizer.pad_token_id,
    }
    if do_sample:
        generation_kwargs["temperature"] = temperature
        generation_kwargs["top_p"] = top_p

    with torch.no_grad():
        outputs = model.generate(**inputs, **generation_kwargs)

    # Decode only the newly generated tokens (everything after the prompt).
    prompt_length = inputs["input_ids"].shape[1]
    response = processor.decode(outputs[0][prompt_length:], skip_special_tokens=True)

    return response.strip()

# Qualitative smoke test: run the fine-tuned model on the first validation row.
if len(eval_data) == 0:
    print("No evaluation data available for testing")
else:
    test_sample = eval_data.iloc[0]
    print("Testing with sample from evaluation dataset:")
    print(f"Image ID: {test_sample.get('image_id', 'Unknown')}")
    print(f"Ground Truth Caption: {test_sample['caption']}")
    print("\nModel Response:")

    question = "Describe this medical image."
    response = generate_response(test_sample['image'], question)
    print(response)

rule = "=" * 70
print("\n" + rule)
print("FINE-TUNING COMPLETE!")
print(rule)

next_steps = (
    f"1. LoRA adapter saved at: {OUTPUT_DIR}",
    "2. Use the generate_response() function to test inference",
    "3. Merge weights and save final model when satisfied with results",
)
print("\nNext steps:")
for step in next_steps:
    print(step)