# FER Training & Inference - Load from Drive and Train BLIP-2
This notebook loads the preprocessed dataset from Google Drive and fine-tunes a BLIP-2 model with LoRA adapters.

**Input:** Processed images and balanced dataset from `/content/drive/MyDrive/processed_data/`

**Output:** Trained model saved to `/content/drive/MyDrive/blip2-emotion-rafce-final/`

In [None]:
from google.colab import drive
import os
import json

# Define paths (inputs and outputs both live on Google Drive)
DATA_DIR = '/content/drive/MyDrive/processed_data'
IMAGES_DIR = os.path.join(DATA_DIR, 'aligned_faces')
DATASET_JSON = os.path.join(DATA_DIR, 'dataset_vision_llm_balanced.json')
OUTPUT_DIR = '/content/drive/MyDrive/blip2-emotion-rafce-final'

# Mount Google Drive
drive.mount('/content/drive')

print(f"‚úÖ Google Drive mounted")
print(f"üìÅ Data directory: {DATA_DIR}")
print(f"üìÅ Images directory: {IMAGES_DIR}")
print(f"üìÅ Output directory: {OUTPUT_DIR}")

# Verify data exists: handle the "missing" case first, then summarise what was found
if not os.path.isdir(IMAGES_DIR) or not os.path.isfile(DATASET_JSON):
    print(f"\n‚ùå Processed data NOT found!")
    print(f"   Please run Notebook 1 (Data Preparation) first.")
else:
    print(f"\n‚úÖ Processed data found!")
    print(f"   - Images: {len(os.listdir(IMAGES_DIR))} files")
    with open(DATASET_JSON, 'r') as dataset_file:
        dataset = json.load(dataset_file)
    print(f"   - Dataset: {len(dataset)} entries")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
‚úÖ Google Drive mounted
üìÅ Data directory: /content/drive/MyDrive/processed_data
üìÅ Images directory: /content/drive/MyDrive/processed_data/aligned_faces
üìÅ Output directory: /content/drive/MyDrive/blip2-emotion-rafce-final

‚ùå Processed data NOT found!
   Please run Notebook 1 (Data Preparation) first.


In [None]:
# NOTE: dependency installation is currently disabled. The whole cell body is
# wrapped in a triple-quoted string, so it is evaluated as a plain string
# literal and none of the statements inside it are executed.
'''import sys

print("üì¶ Installing training dependencies...\n")
!{sys.executable} -m pip install -U bitsandbytes transformers accelerate peft datasets scipy sentencepiece protobuf -q

print("\n‚úÖ Installation complete!")'''

'import sys\n\nprint("üì¶ Installing training dependencies...\n")\n!{sys.executable} -m pip install -U bitsandbytes transformers accelerate peft datasets scipy sentencepiece protobuf -q\n\nprint("\n‚úÖ Installation complete!")'

In [None]:
import torch
import json
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from peft import LoraConfig, get_peft_model

print("="*70)
print("üîß SETTING UP BLIP-2 MODEL WITH LORA")
print("="*70)

model_id = "Salesforce/blip2-opt-2.7b"
device = "cuda" if torch.cuda.is_available() else "cpu"

# Half precision is only used on GPU. On CPU the weights are kept in float32,
# matching the dtype choice made by the reload cell later in this notebook.
model_dtype = torch.float16 if device == "cuda" else torch.float32

print(f"\nüì¶ Loading BLIP-2 model: {model_id}")
print(f"üì± Device: {device}")

# Load processor
processor = Blip2Processor.from_pretrained(model_id)
print(f"‚úÖ Processor loaded")

# Load model (FP16 on GPU, FP32 on CPU)
model = Blip2ForConditionalGeneration.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=model_dtype
)

print(f"‚úÖ Base model loaded")
print(f"   Model parameters: {sum(p.numel() for p in model.parameters()) / 1e9:.2f}B")

# Apply LoRA
# Gradient checkpointing is enabled before the adapters are attached.
model.gradient_checkpointing_enable()

# Only the q_proj / v_proj modules receive LoRA adapters.
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

print(f"\n‚úÖ LoRA configuration applied!")

üîß SETTING UP BLIP-2 MODEL WITH LORA

üì¶ Loading BLIP-2 model: Salesforce/blip2-opt-2.7b
üì± Device: cpu
‚úÖ Processor loaded


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import json

class BLIP2EmotionDataset(Dataset):
    """Dataset for BLIP-2 emotion recognition with Action Units.

    Each sample pairs one face image with a fixed instruction prompt and a
    target answer built from the JSON annotations. The sample is laid out for
    causal-LM training: ``input_ids`` contains the prompt tokens followed by
    the target tokens and an EOS token, and ``labels`` is aligned with
    ``input_ids`` position by position. Prompt and padding positions in
    ``labels`` are set to ``IGNORE_INDEX`` so that only the target answer
    contributes to the loss.

    Args:
        json_file: Path to a JSON file holding a list of sample dicts. Each
            dict is read for the keys ``id``, ``emotions_present``,
            ``emotion_vector`` and ``conversations``.
        img_dir: Directory that contains the image files named by ``id``.
        processor: Processor called with ``images``/``text``; its
            ``tokenizer`` attribute is used for the target text.
        max_length: Fixed length of ``input_ids``, ``attention_mask`` and
            ``labels`` (longer sequences are truncated on the right, shorter
            ones are padded on the right).
    """

    # Label value skipped by the cross-entropy loss
    IGNORE_INDEX = -100

    def __init__(self, json_file, img_dir, processor, max_length=256):
        with open(json_file, 'r') as f:
            self.data = json.load(f)

        self.img_dir = img_dir
        self.processor = processor
        self.max_length = max_length
        self.EMOTION_NAMES = ['Surprise', 'Fear', 'Disgust', 'Happiness', 'Sadness', 'Anger']

    def __len__(self):
        """Return the number of samples loaded from the JSON file."""
        return len(self.data)

    def _pack_sequence(self, prompt_ids, target_ids):
        """Join prompt and target ids into fixed-length training tensors.

        Returns:
            Tuple ``(input_ids, attention_mask, labels)``, each a 1-D tensor of
            length ``self.max_length``.
        """
        tokenizer = self.processor.tokenizer
        pad_id = tokenizer.pad_token_id
        if pad_id is None:
            # Fall back to EOS when the tokenizer defines no pad token
            pad_id = tokenizer.eos_token_id

        input_ids = torch.cat([prompt_ids, target_ids])[:self.max_length]
        # Prompt positions are masked so the loss only covers the target answer
        labels = torch.cat(
            [torch.full_like(prompt_ids, self.IGNORE_INDEX), target_ids]
        )[:self.max_length]
        attention_mask = torch.ones_like(input_ids)

        pad_len = self.max_length - input_ids.size(0)
        if pad_len > 0:
            input_ids = torch.cat([input_ids, input_ids.new_full((pad_len,), pad_id)])
            attention_mask = torch.cat([attention_mask, attention_mask.new_zeros(pad_len)])
            # Padding must not be learned either
            labels = torch.cat([labels, labels.new_full((pad_len,), self.IGNORE_INDEX)])

        return input_ids, attention_mask, labels

    def __getitem__(self, idx):
        """Build the tensors for sample ``idx``.

        Returns:
            Dict with ``pixel_values`` (batch dimension removed) and
            ``input_ids`` / ``attention_mask`` / ``labels`` of length
            ``max_length``.
        """
        item = self.data[idx]

        # Load image from Drive
        image_path = os.path.join(self.img_dir, item['id'])
        image = Image.open(image_path).convert("RGB")

        # Training prompt
        prompt = (
            "Analyze this facial image and identify:\n"
            "1. Which emotions are present (Surprise, Fear, Disgust, Happiness, Sadness, Anger)\n"
            "2. The facial Action Units (AUs) involved\n"
            "Please explain the connection between the AUs and the emotions."
        )

        # Target text
        emotions_str = ', '.join(item['emotions_present']) if item['emotions_present'] else 'Neutral'
        answer = item['conversations'][1]['value']
        au_info = answer.split('Action Units: ')[-1] if 'Action Units:' in answer else 'N/A'

        target_text = (
            f"This face exhibits: {emotions_str}. "
            f"Emotion vector: {item['emotion_vector']}. "
            f"Observed Action Units: {au_info}"
        )

        # Encode image + prompt without padding so the real prompt length is known
        inputs = self.processor(images=image, text=prompt, return_tensors="pt")
        prompt_ids = inputs.input_ids[0]

        # Encode the answer on its own (no special tokens) and terminate it with
        # EOS so the model can learn where the answer ends. The leading space
        # separates the answer from the final prompt character.
        tokenizer = self.processor.tokenizer
        target_ids = tokenizer(
            " " + target_text,
            add_special_tokens=False,
            return_tensors="pt"
        ).input_ids[0].to(prompt_ids.dtype)
        target_ids = torch.cat([target_ids, prompt_ids.new_tensor([tokenizer.eos_token_id])])

        input_ids, attention_mask, labels = self._pack_sequence(prompt_ids, target_ids)

        return {
            # Remove batch dimension
            "pixel_values": inputs.pixel_values[0],
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }

# Create dataset: instantiate the PyTorch dataset from the Drive JSON and image folder
print("="*70, "üìä CREATING PYTORCH DATASET", "="*70, sep="\n")

dataset = BLIP2EmotionDataset(
    DATASET_JSON,
    IMAGES_DIR,
    processor,
    max_length=256,
)

print(f"\n‚úÖ Dataset created with {len(dataset)} samples")

In [None]:
from torch.utils.data import DataLoader

def blip2_collate_fn(batch):
    """Stack per-sample tensors into one batch dict for BLIP-2.

    Args:
        batch: Sequence of sample dicts, each holding ``pixel_values``,
            ``input_ids``, ``attention_mask`` and ``labels`` tensors.

    Returns:
        Dict with the same four keys, each value stacked along a new
        leading dimension.
    """
    field_names = ("pixel_values", "input_ids", "attention_mask", "labels")
    return {
        name: torch.stack([sample[name] for sample in batch])
        for name in field_names
    }

# Shuffled training loader; samples are batched by the custom BLIP-2 collator
train_loader = DataLoader(
    dataset,
    collate_fn=blip2_collate_fn,
    shuffle=True,
    batch_size=4,
    num_workers=2,
)

print(
    "‚úÖ DataLoader created",
    f"   Batch size: 4",
    f"   Total batches: {len(train_loader)}",
    sep="\n",
)

In [None]:
from torch.optim import AdamW
from torch.optim.lr_scheduler import LinearLR

print("="*70)
print("‚öôÔ∏è CONFIGURING TRAINING PARAMETERS")
print("="*70)

# NOTE(review): only num_train_epochs, learning_rate, warmup_steps and
# weight_decay are read in this cell. Confirm that the training loop really
# applies gradient_accumulation_steps and fp16; otherwise the saved metadata
# overstates the configuration.
training_config = {
    "batch_size": 4,
    "gradient_accumulation_steps": 2,
    "num_train_epochs": 3,
    "learning_rate": 2e-4,
    "warmup_steps": 100,
    "weight_decay": 0.01,
    "fp16": True,
}

print(f"\nüìã Training Configuration:")
for key, value in training_config.items():
    print(f"   {key:<30}: {value}")

# Setup optimizer (only parameters that require gradients are optimised)
optimizer = AdamW(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=training_config['learning_rate'],
    weight_decay=training_config['weight_decay']
)

# Setup scheduler: linear warmup for `warmup_steps`, then linear decay to zero.
# The scheduler is expected to be stepped once per batch, so the horizon is
# batches-per-epoch times the number of epochs.
from transformers import get_linear_schedule_with_warmup

num_training_steps = len(train_loader) * training_config['num_train_epochs']
# Never warm up for longer than the whole run (relevant for very small datasets)
num_warmup_steps = min(training_config['warmup_steps'], num_training_steps)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=num_training_steps
)

print(f"\n‚úÖ Optimizer configured")
print(f"   Total training steps: {num_training_steps}")
print(f"   Warmup steps: {num_warmup_steps}")

In [None]:
from tqdm import tqdm

def train_epoch(model, train_loader, optimizer, scheduler, epoch, device="cuda"):
    """Run one optimisation pass over ``train_loader``.

    For every batch the four tensors are moved to ``device``, the model is
    called with them (including ``labels``), the loss is back-propagated,
    gradients of trainable parameters are clipped to a norm of 1.0, and both
    the optimizer and the scheduler are stepped.

    Args:
        model: Module whose forward output exposes a ``loss`` attribute.
        train_loader: Iterable of batch dicts; must support ``len()``.
        optimizer: Optimizer stepped once per batch.
        scheduler: LR scheduler stepped once per batch.
        epoch: Zero-based epoch index (only used for the progress label).
        device: Device the batch tensors are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(train_loader)``.
    """
    model.train()
    running_loss = 0
    batches = tqdm(train_loader, desc=f"Epoch {epoch+1}")

    for batch in batches:
        # Move the tensors the model consumes onto the target device
        model_inputs = {
            name: batch[name].to(device)
            for name in ('pixel_values', 'input_ids', 'attention_mask', 'labels')
        }
        loss = model(**model_inputs).loss

        optimizer.zero_grad()
        loss.backward()

        # Clip only parameters that take part in training
        trainable = (p for p in model.parameters() if p.requires_grad)
        torch.nn.utils.clip_grad_norm_(trainable, max_norm=1.0)

        optimizer.step()
        scheduler.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        batches.set_postfix({'loss': batch_loss})

    return running_loss / len(train_loader)

print("="*70, "üöÄ STARTING TRAINING", "="*70 + "\n", sep="\n")

# One average loss per completed epoch
history = []
for epoch in range(training_config['num_train_epochs']):
    avg_loss = train_epoch(model, train_loader, optimizer, scheduler, epoch, device=device)
    history.append(avg_loss)
    print(f"\nüìä Epoch {epoch+1} - Avg Loss: {avg_loss:.4f}")

print("\n" + "="*70, "‚úÖ TRAINING COMPLETE!", "="*70, sep="\n")
print(f"Final training loss: {history[-1]:.4f}")
print(f"Best loss: {min(history):.4f}")

In [None]:
import os
import json

print("="*70)
print("üíæ SAVING MODEL TO GOOGLE DRIVE")
print("="*70)

os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"\nüìÅ Save directory: {OUTPUT_DIR}")
print(f"üìä Training summary:")
print(f"   - Epochs: {training_config['num_train_epochs']}")
print(f"   - Final loss: {history[-1]:.4f}")
print(f"   - Best loss: {min(history):.4f}")
print(f"   - Dataset size: {len(dataset)} samples\n")

# Save steps that raised; used so the final status line does not claim
# success after a failed save.
failed_steps = []

# Save LoRA adapter weights
print("‚û°Ô∏è Saving LoRA adapter weights...")
try:
    model.save_pretrained(OUTPUT_DIR)
    print("   ‚úÖ Adapter weights saved")
except Exception as e:
    failed_steps.append("adapter weights")
    print(f"   ‚ùå Failed: {e}")

# Save processor
print("‚û°Ô∏è Saving processor and tokenizer...")
try:
    processor.save_pretrained(OUTPUT_DIR)
    print("   ‚úÖ Processor saved")
except Exception as e:
    failed_steps.append("processor")
    print(f"   ‚ùå Failed: {e}")

# Save metadata (best-effort: a failure here is only reported as a warning)
print("‚û°Ô∏è Saving training metadata...")
metadata = {
    "model_id": "Salesforce/blip2-opt-2.7b",
    "training_config": training_config,
    "training_history": {
        "losses": history,
        "final_loss": float(history[-1]),
        "best_loss": float(min(history)),
        "epochs_completed": len(history)
    },
    "dataset_info": {
        "total_samples": len(dataset),
        "json_file": "dataset_vision_llm_balanced.json",
        "image_dir": IMAGES_DIR
    },
    "lora_config": {
        "r": 16,
        "lora_alpha": 32,
        "target_modules": ["q_proj", "v_proj"],
        "lora_dropout": 0.05,
    }
}

try:
    with open(os.path.join(OUTPUT_DIR, "training_metadata.json"), 'w') as f:
        json.dump(metadata, f, indent=2)
    print("   ‚úÖ Training metadata saved")
except Exception as e:
    print(f"   ‚ö†Ô∏è Failed: {e}")

# Verify files
print("\n‚û°Ô∏è Verifying saved files...")
required_files = [
    "adapter_model.safetensors",
    "adapter_config.json",
    "tokenizer.json",
    "processor_config.json"
]

for fname in required_files:
    fpath = os.path.join(OUTPUT_DIR, fname)
    if os.path.exists(fpath):
        size_mb = os.path.getsize(fpath) / (1024 * 1024)
        print(f"   ‚úÖ {fname:<40} {size_mb:>8.2f} MB")
    else:
        print(f"   ‚ùå {fname:<40} MISSING")

# Report success only when both the adapter and the processor were written
if failed_steps:
    print(f"\n‚ùå Save incomplete ({', '.join(failed_steps)} failed): {OUTPUT_DIR}")
else:
    print(f"\n‚úÖ Model saved to: {OUTPUT_DIR}")

In [None]:
import random
from PIL import Image
import matplotlib.pyplot as plt

print("="*70, "üß™ RUNNING INFERENCE TESTS", "="*70 + "\n", sep="\n")

# Switch to evaluation mode before generating
model.eval()

# Get random test images: up to three .jpg files from the image folder
all_images = list(filter(lambda name: name.endswith('.jpg'), os.listdir(IMAGES_DIR)))
test_samples = random.sample(all_images, min(3, len(all_images)))

print(f"üì∏ Testing on {len(test_samples)} random images\n")

def generate_prediction(model, processor, image_path, device="cuda"):
    """Generate the model's emotion / Action Unit description for one image.

    Args:
        model: Object exposing ``generate(**inputs, ...)``.
        processor: Callable producing model inputs from ``images``/``text``;
            its ``tokenizer`` decodes the generated ids.
        image_path: Path of the image file to analyse.
        device: Inputs are moved to this device only when it is ``"cuda"``
            and CUDA is available.

    Returns:
        The decoded, stripped text of the first generated sequence, without
        the instruction prompt.
    """
    image = Image.open(image_path).convert("RGB")
    prompt = (
        "Analyze this facial image and identify:\n"
        "1. Which emotions are present (Surprise, Fear, Disgust, Happiness, Sadness, Anger)\n"
        "2. The facial Action Units (AUs) involved\n"
        "Please explain the connection between the AUs and the emotions."
    )

    inputs = processor(images=image, text=prompt, return_tensors="pt", truncation=True)
    if device == "cuda" and torch.cuda.is_available():
        inputs = {k: v.to(device) for k, v in inputs.items()}

    prompt_ids = inputs["input_ids"]

    with torch.no_grad():
        generated_ids = model.generate(
            **inputs,
            max_new_tokens=96,
            min_new_tokens=16,
            do_sample=False,
            num_beams=3,
            repetition_penalty=1.1,
        )

    # Depending on the transformers version, generate() may return the prompt
    # tokens followed by the continuation. Drop the echoed prompt so that the
    # prediction only contains what the model produced. The prefix is removed
    # only when it matches the prompt ids exactly.
    prompt_len = prompt_ids.shape[1]
    if generated_ids.shape[1] > prompt_len and torch.equal(
        generated_ids[:, :prompt_len], prompt_ids.to(generated_ids.device)
    ):
        generated_ids = generated_ids[:, prompt_len:]

    prediction = processor.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
    return prediction

# Test and visualize
predictions = []

if not test_samples:
    # A figure with zero columns cannot be created, so skip plotting entirely
    print("   ‚ö†Ô∏è No .jpg images available; skipping inference test")
else:
    fig, axes = plt.subplots(1, len(test_samples), figsize=(15, 5))
    if len(test_samples) == 1:
        # A single subplot is returned as a bare Axes; wrap it so indexing works
        axes = [axes]

    print("="*70)
    for idx, img_name in enumerate(test_samples):
        img_path = os.path.join(IMAGES_DIR, img_name)

        print(f"\nüñºÔ∏è  Image {idx+1}: {img_name}")

        try:
            pred = generate_prediction(model, processor, img_path, device=device)
            predictions.append(pred)

            print(f"üìù Prediction: {pred[:150]}...")

            # Display image
            img = Image.open(img_path).convert("RGB")
            axes[idx].imshow(img)
            axes[idx].set_title(f"{img_name}\n{pred[:30]}...", fontsize=8)
            axes[idx].axis('off')

        except Exception as e:
            print(f"   ‚ùå Error: {e}")
            predictions.append(f"[ERROR: {e}]")
            # Hide the axes of a failed slot instead of showing an empty frame
            axes[idx].axis('off')

    plt.tight_layout()
    plt.show()

print("\n" + "="*70)
print(f"\n‚úÖ Inference test complete!")

In [None]:
print("="*70, "üìä EVALUATION METRICS", "="*70, sep="\n")

# Training-side numbers
print(f"\nüìà Training Results:")
print(f"   Final loss: {history[-1]:.4f}")
print(f"   Best loss: {min(history):.4f}")
print(f"   Total samples: {len(dataset)}")
print(f"   Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.2f}M")

# Inference-side numbers: predictions recorded as "[ERROR..." are excluded
print(f"\n‚úÖ Inference Quality:")
valid_predictions = [p for p in predictions if not p.startswith('[ERROR')]
if not valid_predictions:
    print(f"   No valid predictions")
else:
    # Count predictions containing at least one of the keywords (substring match)
    has_keywords = len([
        p for p in valid_predictions
        if any(word in p.lower() for word in ["emotion", "face", "au", "action"])
    ])
    print(f"   Valid predictions: {len(valid_predictions)}/{len(test_samples)}")
    print(f"   Keyword presence: {has_keywords}/{len(valid_predictions)}")
    print(f"   Avg output length: {sum(len(p) for p in valid_predictions) / len(valid_predictions):.0f} chars")

print(f"\nüéâ Training and evaluation complete!")
print(f"\nüìÅ Model saved at: {OUTPUT_DIR}")
print(f"üìÅ Processed data at: {DATA_DIR}")

In [None]:
# ============================================================
# PART 11: Reload Trained Model from Google Drive
# ============================================================
import os, gc, torch
from transformers import AutoProcessor, Blip2ForConditionalGeneration
from peft import PeftModel

BASE_ID = "Salesforce/blip2-opt-2.7b"

# An ADAPTER_DIR_OVERRIDE environment variable takes precedence; otherwise the
# training OUTPUT_DIR is used. The result is normalised to an absolute path.
ADAPTER_DIR = os.path.abspath(os.environ.get("ADAPTER_DIR_OVERRIDE", OUTPUT_DIR))

# Force offline/local loading to avoid HF repo id validation
# NOTE(review): transformers is already imported above when these variables are
# set; confirm the libraries still honour them at this point.
os.environ.update({
    "HF_HUB_OFFLINE": "1",
    "HF_HUB_DISABLE_TELEMETRY": "1",
})

print(
    "="*70,
    "üì• RELOADING TRAINED MODEL FROM DRIVE",
    "="*70,
    f"Adapter dir: {ADAPTER_DIR}",
    sep="\n",
)

# Fail early when the adapter directory is missing
if not os.path.isdir(ADAPTER_DIR):
    raise FileNotFoundError(
        f"Adapter directory not found: {ADAPTER_DIR}\n"
        "Mount Google Drive and ensure the training save cell has run."
    )

# Fail early when any of the required adapter files is missing
required_adapter_files = [
    "adapter_model.safetensors",
    "adapter_config.json",
]
missing = [
    name
    for name in required_adapter_files
    if not os.path.exists(os.path.join(ADAPTER_DIR, name))
]
if missing:
    raise FileNotFoundError(
        "Missing adapter files in the adapter directory: " + ", ".join(missing)
    )


def load_blip2_with_lora(adapter_dir=ADAPTER_DIR, base_id=BASE_ID, device=None, merge_adapters=True):
    """Load the base BLIP-2 model, attach the saved LoRA adapters and a processor.

    Args:
        adapter_dir: Directory holding the saved adapter (and, if present, the
            saved processor files).
        base_id: Identifier of the base model to load.
        device: ``"cuda"`` or ``"cpu"``; when ``None`` it is chosen from
            ``torch.cuda.is_available()``.
        merge_adapters: When true, try to merge the adapters into the base
            model; a merge failure is reported and the unmerged model is kept.

    Returns:
        Tuple ``(model, processor, device)`` with the model in eval mode.
    """
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    if device == "cuda":
        torch.cuda.empty_cache()
    gc.collect()

    print(f"\n‚û°Ô∏è Loading base model: {base_id}")
    # Half precision and automatic device placement are only used on GPU
    base_model = Blip2ForConditionalGeneration.from_pretrained(
        base_id,
        dtype=torch.float16 if device == "cuda" else torch.float32,
        device_map="auto" if device == "cuda" else None,
    )

    print(f"‚û°Ô∏è Attaching LoRA adapters from: {adapter_dir}")
    model = PeftModel.from_pretrained(
        base_model,
        adapter_dir,
        is_trainable=False,
        local_files_only=True,
        token=None,
        trust_remote_code=False,
    )

    if merge_adapters:
        print("‚û°Ô∏è Merging adapters into base model for inference...")
        try:
            model = model.merge_and_unload()
            print("   ‚úÖ Adapters merged")
        except Exception as e:
            print(f"   ‚ö†Ô∏è Merge failed: {e}. Continuing without merge.")

    model.eval()
    if device == "cuda":
        # Best-effort explicit move: a failure is reported instead of being
        # silently ignored, and the existing placement is kept.
        try:
            model.to("cuda")
        except Exception as e:
            print(f"   ‚ö†Ô∏è Could not move model to cuda: {e}. Keeping current device placement.")

    print("‚û°Ô∏è Loading processor/tokenizer...")
    try:
        processor = AutoProcessor.from_pretrained(adapter_dir, local_files_only=True, token=None)
        print("   ‚úÖ Processor loaded from adapter directory")
    except Exception as e:
        # Surface the reason before falling back to the base model's processor
        print(f"   ‚ö†Ô∏è Could not load processor from adapter directory: {e}")
        processor = AutoProcessor.from_pretrained(base_id)
        print("   ‚ö†Ô∏è Fallback: Processor loaded from base model")

    return model, processor, device

# Reload once with the default arguments; the evaluation cell below reads
# reloaded_model, reloaded_processor and reload_device.
reloaded_model, reloaded_processor, reload_device = load_blip2_with_lora()
print("\n‚úÖ Reload complete. Model ready for evaluation.")


In [None]:
# ============================================================
# PART 12: Evaluation — Exact-Match Accuracy and F1 Scores
# ============================================================
import os, re, time, json, random
import numpy as np
from PIL import Image

# Ensure sklearn is available for metrics
try:
    from sklearn.metrics import f1_score, precision_score, recall_score
except Exception:
    import sys
    !{sys.executable} -m pip install scikit-learn -q
    from sklearn.metrics import f1_score, precision_score, recall_score

# Canonical label order used when building the multi-label indicator vectors
EMOTION_CANONICAL = ["Surprise", "Fear", "Disgust", "Happiness", "Sadness", "Anger"]

# Lower-case surface forms that count as a mention of each canonical emotion
VARIANTS = {
    "Happiness": ["happiness", "happy"],
    "Sadness": ["sadness", "sad"],
    "Anger": ["anger", "angry"],
    "Fear": ["fear", "afraid", "fearful"],
    "Disgust": ["disgust", "disgusted"],
    "Surprise": ["surprise", "surprised"],
}


def extract_predicted_emotions(text: str):
    """Return the set of canonical emotions mentioned in ``text``.

    Matching is case-insensitive and whole-word: an emotion is included when
    any of its variants in ``VARIANTS`` appears between word boundaries.
    """
    lowered = text.lower()
    return {
        canon
        for canon, variations in VARIANTS.items()
        if any(re.search(rf"\b{re.escape(v)}\b", lowered) for v in variations)
    }


def evaluate_accuracy(dataset_json=DATASET_JSON, img_dir=IMAGES_DIR, num_samples=100, device=reload_device):
    """Score the reloaded model on a random subset of the annotated dataset.

    For each sampled item the model generates a description, the emotions
    mentioned in the generated text are extracted, and they are compared with
    the item's ``emotions_present`` labels.

    Args:
        dataset_json: Path to the JSON file with the sample dicts.
        img_dir: Directory containing the images named by each item's ``id``.
        num_samples: Maximum number of items to sample.
        device: Inputs are moved to this device only when it is ``"cuda"``
            and CUDA is available.

    Returns:
        Dict with the sample count, exact-match accuracy, micro/macro F1,
        micro precision/recall and the mean generation latency in seconds.
    """
    with open(dataset_json, "r") as f:
        data = json.load(f)

    subset = random.sample(data, min(num_samples, len(data)))
    y_true, y_pred = [], []
    exact_match = 0
    latencies = []

    prompt = (
        "Analyze this facial image and identify:\n"
        "1. Which emotions are present (Surprise, Fear, Disgust, Happiness, Sadness, Anger)\n"
        "2. The facial Action Units (AUs) involved\n"
        "Please explain the connection between the AUs and the emotions."
    )

    for i, item in enumerate(subset):
        image_path = os.path.join(img_dir, item["id"])
        image = Image.open(image_path).convert("RGB")

        inputs = reloaded_processor(images=image, text=prompt, return_tensors="pt", truncation=True)
        if device == "cuda" and torch.cuda.is_available():
            inputs = {k: v.to(device) for k, v in inputs.items()}

        prompt_ids = inputs["input_ids"]

        start = time.time()
        with torch.no_grad():
            gen_ids = reloaded_model.generate(
                **inputs,
                max_new_tokens=96,
                min_new_tokens=16,
                do_sample=False,
                num_beams=3,
                repetition_penalty=1.1,
            )
        latencies.append(time.time() - start)

        # The prompt itself names all six emotions. Depending on the
        # transformers version, generate() may return the prompt tokens
        # followed by the continuation; if that echo were decoded, every
        # emotion would be counted as predicted. Drop the prefix when it
        # matches the prompt ids exactly.
        prompt_len = prompt_ids.shape[1]
        if gen_ids.shape[1] > prompt_len and torch.equal(
            gen_ids[:, :prompt_len], prompt_ids.to(gen_ids.device)
        ):
            gen_ids = gen_ids[:, prompt_len:]

        text = reloaded_processor.tokenizer.batch_decode(gen_ids, skip_special_tokens=True)[0].strip()

        preds = extract_predicted_emotions(text)
        trues = set(item["emotions_present"]) if item.get("emotions_present") else set()

        # Multi-label indicator rows in EMOTION_CANONICAL order
        y_true.append([1 if e in trues else 0 for e in EMOTION_CANONICAL])
        y_pred.append([1 if e in preds else 0 for e in EMOTION_CANONICAL])

        if preds == trues:
            exact_match += 1

        if (i + 1) % 20 == 0:
            print(f"Processed {i+1}/{len(subset)} samples...")

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    micro_f1 = f1_score(y_true, y_pred, average="micro", zero_division=0)
    macro_f1 = f1_score(y_true, y_pred, average="macro", zero_division=0)
    micro_prec = precision_score(y_true, y_pred, average="micro", zero_division=0)
    micro_rec = recall_score(y_true, y_pred, average="micro", zero_division=0)
    exact_acc = exact_match / len(subset) if subset else 0.0
    avg_latency = float(sum(latencies) / len(latencies)) if latencies else 0.0

    metrics = {
        "samples": len(subset),
        "exact_match_accuracy": float(exact_acc),
        "micro_f1": float(micro_f1),
        "macro_f1": float(macro_f1),
        "micro_precision": float(micro_prec),
        "micro_recall": float(micro_rec),
        "avg_latency_sec": avg_latency,
    }
    return metrics

print("\n" + "="*70, "üìä RUNNING ACCURACY EVALUATION (subset)", "="*70 + "\n", sep="\n")

metrics = evaluate_accuracy(num_samples=100)

# Floats are shown with four decimals; everything else is printed as-is
print("Evaluation metrics:")
for k, v in metrics.items():
    print(f"- {k}: {v:.4f}" if isinstance(v, float) else f"- {k}: {v}")

print("\n‚úÖ Evaluation complete.")