In [1]:
# ==========================================
# PART 1: INSTALL LIBRARIES (FIXED)
# ==========================================
print("Installing libraries... (This takes about 2 minutes)")

# We remove '==0.41.3' so it installs the latest version compatible with CUDA 12
!pip install -q -U bitsandbytes
!pip install -q -U transformers accelerate peft

print("Done installing!")

Installing libraries... (This takes about 2 minutes)


Done installing!


In [2]:
import os
import zipfile

# Auto-Unzip Logic
if not os.path.exists('aligned'):
    print("üìÇ Unzipping dataset...")
    with zipfile.ZipFile('aligned.zip', 'r') as zip_ref:
        zip_ref.extractall('.')
    print("‚úÖ Dataset Ready.")
else:
    print("‚úÖ Dataset already unzipped.")

‚úÖ Dataset already unzipped.


In [3]:
import json
import os
import random
from collections import defaultdict

# --- ACTION UNIT MAP (The Anatomy Knowledge) ---
AU_MAP = {
    1: "Inner Brow Raiser", 2: "Outer Brow Raiser", 4: "Brow Lowerer",
    5: "Upper Lid Raiser", 6: "Cheek Raiser", 7: "Lid Tightener",
    9: "Nose Wrinkler", 10: "Upper Lip Raiser", 12: "Lip Corner Puller",
    15: "Lip Corner Depressor", 16: "Lower Lip Depressor", 17: "Chin Raiser",
    18: "Lip Puckerer", 20: "Lip Stretcher", 23: "Lip Tightener",
    24: "Lip Pressor", 25: "Lips Part", 26: "Jaw Drop", 27: "Mouth Stretch"
}

EMOTION_MAP = {
    0: "Happily Surprised", 1: "Happily Disgusted", 2: "Sadly Fearful",
    3: "Sadly Angry", 4: "Sadly Surprised", 5: "Sadly Disgusted",
    6: "Fearfully Angry", 7: "Fearfully Surprised", 8: "Fearfully Disgusted",
    9: "Angrily Surprised", 10: "Angrily Disgusted", 11: "Disgustedly Surprised",
    12: "Happily Fearful"
}

class RAFCE_Data_Engine:
    def __init__(self, image_dir="aligned"):
        self.image_dir = image_dir
        self.raw_emotions = self._read_txt('RAFCE_emolabel.txt')
        self.raw_aus = self._read_txt('RAFCE_AUlabel.txt', is_au=True)
        self.partitions = self._read_txt('RAFCE_partition.txt')
        self.available_files = set(os.listdir(image_dir))

    def _read_txt(self, filepath, is_au=False):
        data = {}
        if not os.path.exists(filepath):
            # Fallback if user uploaded files directly
            filepath = filepath.split('/')[-1]

        with open(filepath, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) < 2: continue

                filename = parts[0]
                if is_au:
                    # Clean AU string "1+2+5" -> [1, 2, 5]
                    au_str = parts[1]
                    try:
                        au_list = [int(x.replace('L','').replace('R','')) for x in au_str.split('+') if x.replace('L','').replace('R','').isdigit()]
                        data[filename] = au_list
                    except:
                        data[filename] = []
                else:
                    data[filename] = int(parts[1])
        return data

    def get_real_filename(self, filename):
        # Fix the "001.jpg" vs "001_aligned.jpg" mismatch
        if filename in self.available_files: return filename
        aligned_name = filename.replace(".jpg", "_aligned.jpg")
        if aligned_name in self.available_files: return aligned_name
        return None

    def generate_cot_prompt(self, au_list, emotion_id):
        # This creates the "Reasoning" text
        au_desc = [f"{au} ({AU_MAP.get(au, 'Muscle Movement')})" for au in au_list if au in AU_MAP]
        emotion_name = EMOTION_MAP.get(emotion_id, "Unknown")

        if not au_desc:
            reasoning = "facial cues are subtle."
        else:
            reasoning = f"I observe the following Action Units: {', '.join(au_desc)}."

        return f"{reasoning} Therefore, the compound emotion is {emotion_name}."

    def build_dataset(self, oversample=True):
        train_data, test_data = [], []

        # Track counts for balancing
        train_counts = defaultdict(int)

        print("üîß Engineering Data...")

        for filename, emotion_id in self.raw_emotions.items():
            if filename not in self.partitions: continue

            real_file = self.get_real_filename(filename)
            if not real_file: continue

            split = self.partitions[filename]
            au_list = self.raw_aus.get(filename, [])

            # Create the Intelligent Response
            answer = self.generate_cot_prompt(au_list, emotion_id)

            entry = {
                "id": real_file,
                "image": f"{self.image_dir}/{real_file}",
                "conversations": [
                    {"from": "human", "value": "<image>\nAnalyze the facial muscles and determine the compound emotion."},
                    {"from": "gpt", "value": answer}
                ],
                "label": emotion_id # Keep for metrics later
            }

            if split == 0: # Train
                train_data.append(entry)
                train_counts[emotion_id] += 1
            else: # Test
                test_data.append(entry)

        # OVERSAMPLING (Fixing Imbalance)
        if oversample:
            print("‚öñÔ∏è Balancing dataset (Oversampling rare classes)...")
            avg_count = int(sum(train_counts.values()) / len(train_counts))
            final_train = []
            for entry in train_data:
                final_train.append(entry)
                emo_id = entry['label']
                # If this class is rare, duplicate the sample
                if train_counts[emo_id] < avg_count:
                    # Add it 1 or 2 more times depending on rarity
                    repeats = int(avg_count / train_counts[emo_id])
                    for _ in range(repeats):
                        final_train.append(entry)
            train_data = final_train

        print(f"‚úÖ Data Ready: {len(train_data)} Training samples (Balanced), {len(test_data)} Test samples.")
        return train_data, test_data

# Execute
engine = RAFCE_Data_Engine()
train_data, test_data = engine.build_dataset()

üîß Engineering Data...
‚öñÔ∏è Balancing dataset (Oversampling rare classes)...
‚úÖ Data Ready: 4190 Training samples (Balanced), 1840 Test samples.


In [5]:
import torch
from transformers import BitsAndBytesConfig, AutoProcessor, LlavaForConditionalGeneration

model_id = "llava-hf/llava-1.5-7b-hf"

print(f"üß† Loading {model_id}...")

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

model = LlavaForConditionalGeneration.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map="auto"
)
processor = AutoProcessor.from_pretrained(model_id)
print("‚úÖ Model Loaded.")

üß† Loading llava-hf/llava-1.5-7b-hf...


ValueError: Some modules are dispatched on the CPU or the disk. Make sure you have enough GPU RAM to fit the quantized model. If you want to dispatch the model on the CPU or the disk while keeping these modules in 32-bit, you need to set `llm_int8_enable_fp32_cpu_offload=True` and pass a custom `device_map` to `from_pretrained`. Check https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu for more details. 

In [6]:
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

# 1. Prepare
model.train()
model = prepare_model_for_kbit_training(model)

# 2. Aggressive LoRA Config
config = LoraConfig(
    r=32,            # Increased from 16 for better learning capacity
    lora_alpha=64,   # Increased from 32 (Stronger adaptation)
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"], # Target ALL attention layers
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

model = get_peft_model(model, config)
print(f"üí™ Trainable Parameters: {model.print_trainable_parameters()}")

NameError: name 'model' is not defined

In [6]:
import torch.optim as optim
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt

# HYPERPARAMETERS
EPOCHS = 3
LR = 2e-4
BATCH_SIZE = 1 # We use gradient accumulation logic effectively by looping

optimizer = optim.AdamW(model.parameters(), lr=LR)
loss_history = []

print(f"üöÄ Starting A-CoT Training on {len(train_data)} samples...")

for epoch in range(EPOCHS):
    print(f"\n=== EPOCH {epoch+1}/{EPOCHS} ===")
    total_loss = 0
    steps = 0

    # Shuffle data
    random.shuffle(train_data)

    pbar = tqdm(train_data)
    for item in pbar:
        try:
            # 1. Prepare Image
            image = Image.open(item["image"]).convert("RGB")

            # 2. Prepare Text (The CoT Prompt)
            # Format: USER: <image>\nPrompt\nASSISTANT: <Reasoning + Answer>
            prompt = item["conversations"][0]["value"]
            answer = item["conversations"][1]["value"]
            full_text = f"USER: <image>\n{prompt}\nASSISTANT: {answer}"

            # 3. Tokenize
            inputs = processor(text=full_text, images=image, return_tensors="pt", padding=True)
            inputs = {k: v.to(0) for k, v in inputs.items()}

            # 4. Train Step
            outputs = model(**inputs, labels=inputs["input_ids"])
            loss = outputs.loss

            # Backprop
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            # Logs
            loss_val = loss.item()
            loss_history.append(loss_val)
            total_loss += loss_val
            steps += 1

            if steps % 10 == 0:
                pbar.set_description(f"Loss: {loss_val:.4f}")

        except Exception as e:
            continue

    print(f"üìâ Epoch {epoch+1} Avg Loss: {total_loss/steps:.4f}")

# Save
model.save_pretrained("best_raf_ce_model")
processor.save_pretrained("best_raf_ce_model")
print("‚úÖ Training Complete & Saved.")

# Plot
plt.plot(loss_history)
plt.title("Training Loss (Anatomical Chain-of-Thought)")
plt.show()

üöÄ Starting A-CoT Training on 4190 samples...

=== EPOCH 1/3 ===


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 4190/4190 [01:26<00:00, 48.60it/s]


ZeroDivisionError: division by zero

In [None]:
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import seaborn as sns
import numpy as np

# 1. Setup Semantic Matcher
def extract_prediction(text):
    text = text.lower().split("therefore")[-1] # Look after the reasoning

    roots = {
        "Happily Surprised": ["happ", "surp"],
        "Happily Disgusted": ["happ", "disg"],
        "Sadly Fearful": ["sad", "fear"],
        "Sadly Angry": ["sad", "angr"],
        "Sadly Surprised": ["sad", "surp"],
        "Sadly Disgusted": ["sad", "disg"],
        "Fearfully Angry": ["fear", "angr"],
        "Fearfully Surprised": ["fear", "surp"],
        "Fearfully Disgusted": ["fear", "disg"],
        "Angrily Surprised": ["angr", "surp"],
        "Angrily Disgusted": ["angr", "disg"],
        "Disgustedly Surprised": ["disg", "surp"],
        "Happily Fearful": ["happ", "fear"]
    }

    best_cls = "Unknown"
    max_score = 0

    for cls_name, keywords in roots.items():
        score = sum(1 for k in keywords if k in text)
        if score > max_score:
            max_score = score
            best_cls = cls_name

    return best_cls

# 2. Run Inference
model.eval()
y_true, y_pred = [], []
labels_list = list(EMOTION_MAP.values())

print("üìä Running Final Benchmark (Test Set)...")
# We test on a large subset (e.g., 200 images) for speed, or remove [:200] for full
subset_test = test_data[:200]

for item in tqdm(subset_test):
    try:
        # Load
        image = Image.open(item["image"]).convert("RGB")
        # Note: We use the SAME prompt asking for analysis
        prompt = "USER: <image>\nAnalyze the facial muscles and determine the compound emotion.\nASSISTANT:"

        inputs = processor(text=prompt, images=image, return_tensors="pt").to(0)

        with torch.no_grad():
            output = model.generate(**inputs, max_new_tokens=100)

        generated = processor.decode(output[0], skip_special_tokens=True)

        # Parse
        pred = extract_prediction(generated)
        true_label = EMOTION_MAP[item['label']]

        y_true.append(true_label)
        y_pred.append(pred)

    except:
        pass

# 3. Report
print("\n" + "="*60)
print("üèÜ FINAL RESULTS REPORT")
print("="*60)
print(f"Accuracy: {accuracy_score(y_true, y_pred)*100:.2f}%")
print("-" * 60)
print(classification_report(y_true, y_pred, zero_division=0))

# 4. Matrix
plt.figure(figsize=(12, 10))
cm = confusion_matrix(y_true, y_pred, labels=labels_list)
sns.heatmap(cm, annot=True, fmt='d', xticklabels=labels_list, yticklabels=labels_list, cmap='viridis')
plt.title("Confusion Matrix (with Anatomical Reasoning)")
plt.xticks(rotation=45, ha='right')
plt.show()