In [None]:
# Shell escape: print the NVIDIA driver / GPU status report for this runtime.
!nvidia-smi

In [None]:
import kagglehub


# Fetch the most recent release of the GAMEEMO dataset through kagglehub and
# report the local directory it was stored in.
GAMEEMO_HANDLE = "sigfest/database-for-emotion-recognition-system-gameemo"
path = kagglehub.dataset_download(GAMEEMO_HANDLE)

print("Path to dataset files:", path)


In [None]:
import os
# Build the dataset location from the directory kagglehub actually returned
# (`path`) instead of a cache location pinned to "/root" and "versions/1",
# so the notebook keeps working on other machines and dataset versions.
base_path = path
dataset_path = os.path.join(base_path, "GAMEEMO")
print(dataset_path)


In [None]:
import os

# Show every entry that sits directly inside the dataset folder, one per line.
entries = os.listdir(dataset_path)
if entries:
    print("\n".join(entries))


In [None]:
import os
import pandas as pd

# Reuse the GAMEEMO directory resolved earlier (`dataset_path`) rather than
# repeating the absolute cache path in every cell.
base_path = dataset_path
# Subject folders are the entries whose name starts with "(S".
subjects = sorted(entry for entry in os.listdir(base_path) if entry.startswith("(S"))
subjects


In [None]:
def _load_first_csv(folder):
    """Read the alphabetically first ``.csv`` file found directly in ``folder``.

    Args:
        folder: Directory to search (not searched recursively).

    Returns:
        A ``pandas.DataFrame`` with the file's contents, or ``None`` when the
        directory does not exist or holds no file with a ``.csv`` extension
        (case-insensitive).
    """
    if not os.path.isdir(folder):
        return None
    # Sort so the chosen file does not depend on os.listdir()'s arbitrary order.
    csv_names = sorted(name for name in os.listdir(folder) if name.lower().endswith(".csv"))
    if not csv_names:
        return None
    return pd.read_csv(os.path.join(folder, csv_names[0]))


def load_raw_eeg(subject_folder):
    """Load a CSV from the subject's "Raw EEG Data" folder.

    Args:
        subject_folder: Path to one subject's directory.

    Returns:
        DataFrame of the first CSV (alphabetical order), or ``None`` if the
        folder is missing or contains no CSV file.
    """
    return _load_first_csv(os.path.join(subject_folder, "Raw EEG Data"))


def load_sam(subject_folder):
    """Load a CSV from the subject's "SAM Ratings" folder.

    Args:
        subject_folder: Path to one subject's directory.

    Returns:
        DataFrame of the first CSV (alphabetical order), or ``None`` if the
        folder is missing or contains no CSV file.
    """
    return _load_first_csv(os.path.join(subject_folder, "SAM Ratings"))


In [None]:
import os

# Dump the full directory tree of the dataset. `dataset_path` is the GAMEEMO
# directory resolved earlier; it is reused here instead of re-typing the
# absolute cache path.
for root, dirs, files in os.walk(dataset_path):
    print(root)
    for f in files:
        print("   FILE:", f)
    print("-" * 50)


In [None]:
import os
import pandas as pd

import re

# Reuse the GAMEEMO directory resolved earlier instead of repeating the path.
base_path = dataset_path
subjects = sorted(f for f in os.listdir(base_path) if f.startswith("(S"))

emotion_map = {
    "G1": "Calm",
    "G2": "Funny",
    "G3": "Horror",
    "G4": "Boring"
}

# Gameplay code embedded in the file name, e.g. "S11G3AllRawChannels.csv" -> "3".
# Requiring a digit right after "G" avoids picking up an unrelated "G".
GAMEPLAY_CODE = re.compile(r"G(\d)")

all_data = []

for subject in subjects:
    subject_path = os.path.join(base_path, subject)
    raw_csv_path = os.path.join(subject_path, "Raw EEG Data/.csv format")

    if not os.path.exists(raw_csv_path):
        print("No RAW CSV for", subject)
        continue

    # Sorted so recordings are concatenated in the same order on every run
    # (os.listdir gives no ordering guarantee).
    for file in sorted(os.listdir(raw_csv_path)):
        if not file.endswith(".csv"):
            continue

        code_match = GAMEPLAY_CODE.search(file)
        gkey = f"G{code_match.group(1)}" if code_match else None
        if gkey not in emotion_map:
            # Report and skip instead of failing with an IndexError/KeyError.
            print("Skipping file without a known gameplay code:", file)
            continue
        emotion = emotion_map[gkey]

        file_path = os.path.join(raw_csv_path, file)
        df = pd.read_csv(file_path)
        # Tag every row with where it came from and its emotion label.
        df["subject"] = subject
        df["gameplay"] = gkey
        df["emotion"] = emotion

        all_data.append(df)

    print("Processed:", subject)

if not all_data:
    # pd.concat([]) would raise a much less helpful "No objects to concatenate".
    raise FileNotFoundError(f"No raw EEG CSV files found under {base_path}")

# Combine all subjects
gameemo_raw_df = pd.concat(all_data, ignore_index=True)
gameemo_raw_df.shape


In [None]:
# Preview the first rows of the combined frame (EEG columns plus the
# subject / gameplay / emotion tags added while loading).
gameemo_raw_df.head()


In [None]:
# Remove helper columns: anything whose name contains "Unnamed", plus "eeg".
cols_to_drop = []
for column_name in gameemo_raw_df.columns:
    if "Unnamed" in column_name or column_name == "eeg":
        cols_to_drop.append(column_name)

# errors='ignore' keeps this cell safe to re-run after the columns are gone.
gameemo_raw_df = gameemo_raw_df.drop(columns=cols_to_drop, errors='ignore')

print("Cleaned columns:")
print(gameemo_raw_df.columns)
gameemo_raw_df.head()


In [None]:
# Turn the text labels into integer category codes, one new column per label.
for label_col, code_col in (("emotion", "emotion_id"), ("subject", "subject_id")):
    gameemo_raw_df[code_col] = gameemo_raw_df[label_col].astype('category').cat.codes

# Show which integer was assigned to which emotion name.
print(gameemo_raw_df[['emotion', 'emotion_id']].drop_duplicates())


In [None]:
# Columns used as model input.
eeg_channels = [
    'FC5', 'FC6',
    'O1', 'O2',
    'P7', 'P8',
    'T7', 'T8',
]

# Feature matrix (rows x selected channels, float32) and per-row label codes.
eeg_frame = gameemo_raw_df[eeg_channels].astype('float32')
X = eeg_frame.values
y = gameemo_raw_df['emotion_id'].values

print("EEG shape:", X.shape)
print("Labels shape:", y.shape)


In [None]:
import numpy as np

# Standardise every column of X: subtract its mean and divide by its
# standard deviation, both computed over all rows.
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0) + 1e-8  # small epsilon keeps the division finite
X_norm = (X - X_mean) / X_std

print("Normalized EEG sample:", X_norm[0])


In [None]:

WINDOW = 256  # number of time samples per window
STEP = 256     # no overlap

X_windows = []
y_windows = []

num_samples = len(X_norm)

# X_norm / y were taken row-for-row from gameemo_raw_df, so positional row
# indices of the frame address them directly. Cut windows inside each
# (subject, gameplay) recording so that no window mixes samples from two
# different recordings (and therefore two different labels or subjects).
recording_rows = gameemo_raw_df.groupby(["subject", "gameplay"], sort=False).indices

for row_idx in recording_rows.values():
    row_idx = np.sort(row_idx)          # keep the samples in time order
    rec_signal = X_norm[row_idx]
    rec_label = y[row_idx[0]]           # one emotion per recording
    # "+ 1" keeps the last full window when the length is an exact fit.
    for start in range(0, len(row_idx) - WINDOW + 1, STEP):
        end = start + WINDOW
        X_windows.append(rec_signal[start:end])
        y_windows.append(rec_label)

X_windows = np.array(X_windows)
y_windows = np.array(y_windows)

print("Windowed EEG shape:", X_windows.shape)
print("Windowed labels shape:", y_windows.shape)


In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the windows for testing; the seed fixes the shuffle.
split_options = dict(test_size=0.2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_windows, y_windows, **split_options)

(X_train.shape, X_test.shape)


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class EEGDataset(Dataset):
    """Tensor-backed dataset that pairs each window with its integer label."""

    def __init__(self, X, y):
        """Copy the inputs into tensors.

        Args:
            X: Array-like of windows; stored as float32.
            y: Array-like of labels; stored as int64 (``torch.long``).
        """
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        """Number of stored windows (size of the first axis of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(window, label)`` with the window's two axes swapped.

        The notebook stores windows as (time, channels), so the swap yields
        (channels, time).
        """
        window, label = self.X[idx], self.y[idx]
        return window.permute(1, 0), label

# Wrap both splits; only the training loader reshuffles between epochs.
BASELINE_BATCH_SIZE = 64

train_ds, test_ds = EEGDataset(X_train, y_train), EEGDataset(X_test, y_test)

train_loader = DataLoader(train_ds, batch_size=BASELINE_BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=BASELINE_BATCH_SIZE)


In [None]:
import torch.nn as nn

class EEGTransformer(nn.Module):
    """Transformer-encoder classifier over per-time-step channel vectors.

    Each time step is linearly projected to ``embed_dim``, the sequence is run
    through a stack of encoder layers, averaged over time and classified by a
    two-layer MLP.
    """

    def __init__(self, num_channels=8, num_classes=4, embed_dim=64, num_heads=4, num_layers=2):
        """Build the projection, encoder stack and classification head.

        Args:
            num_channels: Size of each time step's input vector.
            num_classes: Number of output logits.
            embed_dim: Transformer model width.
            num_heads: Attention heads per encoder layer.
            num_layers: Number of stacked encoder layers.
        """
        super().__init__()

        self.input_proj = nn.Linear(num_channels, embed_dim)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, batch_first=True),
            num_layers=num_layers,
        )

        hidden_units = 128
        head_layers = [
            nn.Linear(embed_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits for ``x`` of shape (batch, channels, time)."""
        # (batch, channels, time) -> (batch, time, channels) -> (batch, time, embed_dim)
        tokens = self.input_proj(x.permute(0, 2, 1))
        encoded = self.transformer(tokens)
        # Average over the time axis, then classify.
        return self.classifier(encoded.mean(dim=1))

# One output logit per distinct label present in the windowed targets.
num_emotion_classes = len(set(y_windows))
model = EEGTransformer(num_channels=8, num_classes=num_emotion_classes)


In [None]:
import torch.optim as optim

# Baseline training run for the transformer classifier.
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

# Unweighted cross-entropy, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

EPOCHS = 5

for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for Xb, yb in train_loader:
        Xb, yb = Xb.to(device), yb.to(device)

        # Standard step: clear gradients, forward, loss, backward, update.
        optimizer.zero_grad()
        preds = model(Xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # NOTE: the value printed is the SUM of the batch losses for the epoch,
    # not the mean, so it scales with the number of batches.
    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss:.4f}")


In [None]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.metrics import accuracy_score, classification_report

# =======================
# STRONG MODEL COMPONENTS
# =======================

class SEBlock(nn.Module):
    """Squeeze-and-excitation style channel gate for (batch, channels, time) input.

    The input is averaged over time, passed through a two-layer bottleneck MLP
    ending in a sigmoid, and the resulting per-channel factors rescale the input.
    """

    def __init__(self, channels, reduction=16):
        """
        Args:
            channels: Number of channels of the input tensor.
            reduction: Divisor applied to ``channels`` to size the bottleneck.
        """
        super().__init__()
        self.pool = nn.AdaptiveAvgPool1d(1)
        # Keep at least one hidden unit: with channels < reduction the integer
        # division gives 0, which would create an empty layer and turn the gate
        # into a constant.
        hidden = max(1, channels // reduction)
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden),
            nn.ReLU(),
            nn.Linear(hidden, channels),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` scaled channel-wise; output shape equals input shape."""
        b, c, t = x.size()
        y = self.pool(x).view(b, c)      # average over time -> (batch, channels)
        y = self.fc(y).view(b, c, 1)     # gate in (0, 1), broadcast over time
        return x * y


class EEG_Strong_Model(nn.Module):
    """CNN + channel gate + transformer encoder classifier.

    Three Conv1d/BatchNorm/ReLU/MaxPool stages (each pool halves the time axis)
    produce 256 feature maps, which are re-weighted by ``SEBlock``, encoded by
    a 4-layer transformer, averaged over time and classified by an MLP.
    """

    def __init__(self, num_channels=8, num_classes=4):
        """
        Args:
            num_channels: Number of input channels fed to the first convolution.
            num_classes: Number of output logits.
        """
        super().__init__()

        self.cnn1 = nn.Conv1d(num_channels, 64, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(64)

        self.cnn2 = nn.Conv1d(64, 128, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(128)

        self.cnn3 = nn.Conv1d(128, 256, kernel_size=5, padding=2)
        self.bn3 = nn.BatchNorm1d(256)

        self.attn = SEBlock(256)
        self.pool = nn.MaxPool1d(2)
        self.relu = nn.ReLU()

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=256, nhead=8, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=4)

        self.classifier = nn.Sequential(
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward_features(self, x):
        """Return the pooled (batch, 256) embedding that feeds the classifier.

        Exposed separately (as in the tuned model defined later in this
        notebook) so embeddings can be extracted without the classifier head.
        """
        x = self.relu(self.bn1(self.cnn1(x)))
        x = self.pool(x)

        x = self.relu(self.bn2(self.cnn2(x)))
        x = self.pool(x)

        x = self.relu(self.bn3(self.cnn3(x)))
        x = self.pool(x)

        x = self.attn(x)
        x = x.permute(0, 2, 1)   # (batch, 256, time) -> (batch, time, 256)

        x = self.transformer(x)
        return x.mean(dim=1)     # average over the time axis

    def forward(self, x):
        """Return class logits for ``x`` of shape (batch, channels, time)."""
        return self.classifier(self.forward_features(x))


# =======================
# MODEL INITIALIZATION
# =======================

# Class weights: inverse of each class's training count, rescaled to sum to 1.
class_counts = np.bincount(y_train)
inverse_frequency = 1. / class_counts
class_weights = torch.tensor(inverse_frequency, dtype=torch.float32)
class_weights = (class_weights / class_weights.sum()).to(device)

# Model sized to the number of distinct training labels.
num_emotion_classes = len(np.unique(y_train))
model = EEG_Strong_Model(num_channels=8, num_classes=num_emotion_classes).to(device)

# Weighted cross-entropy with label smoothing.
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

# Cosine decay of the learning rate over 40 scheduler steps, down to 1e-6.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=40, eta_min=1e-6)

# =======================
# EARLY STOP TRAINING
# =======================

# Training with early stopping: stop after PATIENCE consecutive epochs without
# a new best validation loss; the best weights are checkpointed to disk.
EPOCHS = 40
PATIENCE = 7
best_val_loss = np.inf
patience_counter = 0
best_model_path = "best_gameemo_strong_model.pth"

for epoch in range(EPOCHS):

    # Training
    model.train()
    train_loss = 0

    for Xb, yb in train_loader:
        Xb, yb = Xb.to(device), yb.to(device)

        # EEG noise augmentation: add Gaussian noise scaled by 0.01
        # (training batches only).
        Xb = Xb + 0.01 * torch.randn_like(Xb)

        optimizer.zero_grad()
        preds = model(Xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Mean of the per-batch training losses.
    train_loss /= len(train_loader)

    # Validation
    # NOTE(review): "validation" iterates test_loader, so the checkpoint is
    # selected on the same data that is reported as the final result below.
    model.eval()
    val_loss = 0
    all_preds = []
    all_true = []

    with torch.no_grad():
        for Xb, yb in test_loader:
            Xb, yb = Xb.to(device), yb.to(device)

            preds = model(Xb)
            loss = criterion(preds, yb)
            val_loss += loss.item()

            pred_labels = torch.argmax(preds, dim=1)
            all_preds.extend(pred_labels.cpu().numpy())
            all_true.extend(yb.cpu().numpy())

    val_loss /= len(test_loader)
    val_acc = accuracy_score(all_true, all_preds)

    # One scheduler step per epoch.
    scheduler.step()

    print(
        f"Epoch {epoch+1}/{EPOCHS} | "
        f"Train Loss: {train_loss:.4f} | "
        f"Val Loss: {val_loss:.4f} | "
        f"Val Acc: {val_acc:.4f}"
    )

    # Early stopping logic: a strictly lower validation loss resets the
    # patience counter and overwrites the checkpoint.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), best_model_path)
        print("Best model updated")
    else:
        patience_counter += 1
        print(f"No improvement. Patience {patience_counter}/{PATIENCE}")

        if patience_counter >= PATIENCE:
            print("Early stopping triggered")
            break

# =======================
# LOAD BEST MODEL
# =======================

# Restore the best checkpoint. map_location keeps the load working when the
# weights were saved from a GPU run but the current device is the CPU, and
# matches how the later cells load checkpoints.
model.load_state_dict(torch.load(best_model_path, map_location=device))
model.eval()
print("Best model loaded")

# =======================
# FINAL EVALUATION
# =======================

all_preds = []
all_true = []

# Inference only: no gradients needed.
with torch.no_grad():
    for Xb, yb in test_loader:
        Xb, yb = Xb.to(device), yb.to(device)
        preds = model(Xb)
        pred_labels = torch.argmax(preds, dim=1)

        all_preds.extend(pred_labels.cpu().numpy())
        all_true.extend(yb.cpu().numpy())

final_acc = accuracy_score(all_true, all_preds)

print("Final best-stop accuracy:", final_acc)
print("Classification report:")
print(classification_report(all_true, all_preds))

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from collections import Counter

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Device:", device)

# ============================================================
# 1. CLEAN DATASET
# ============================================================

# Drop helper columns (names containing "Unnamed", plus "eeg") if present.
cols_to_drop = []
for column_name in gameemo_raw_df.columns:
    if "Unnamed" in column_name or column_name == "eeg":
        cols_to_drop.append(column_name)
gameemo_raw_df = gameemo_raw_df.drop(columns=cols_to_drop, errors='ignore')

# Integer category code for every emotion name.
gameemo_raw_df['emotion_id'] = gameemo_raw_df['emotion'].astype('category').cat.codes

eeg_channels = [
    'FC5', 'FC6',
    'O1', 'O2',
    'P7', 'P8',
    'T7', 'T8',
]

eeg_frame = gameemo_raw_df[eeg_channels].astype('float32')
X = eeg_frame.values
y = gameemo_raw_df['emotion_id'].values

print("Raw EEG shape:", X.shape)
print("Labels shape:", y.shape)

# ============================================================
# 2. NORMALIZATION
# ============================================================

# Per-column z-score; the epsilon keeps the division finite.
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0) + 1e-8
X_norm = (X - X_mean) / X_std

# ============================================================
# 3. OVERLAPPED WINDOWING (LARGER WINDOW)
# ============================================================

WINDOW = 512      # more context than before (256)
STEP = 256        # 50% overlap

X_windows, y_windows, window_subjects = [], [], []

# X_norm / y were taken row-for-row from gameemo_raw_df, so positional row
# indices of the frame address them directly. Windows are cut inside each
# (subject, gameplay) recording so none of them mixes two recordings.
recording_rows = gameemo_raw_df.groupby(["subject", "gameplay"], sort=False).indices

for (subject_name, _gameplay), row_idx in recording_rows.items():
    row_idx = np.sort(row_idx)          # keep the samples in time order
    rec_signal = X_norm[row_idx]
    rec_label = y[row_idx[0]]           # one emotion per recording
    # "+ 1" keeps the last full window when the length is an exact fit.
    for start in range(0, len(row_idx) - WINDOW + 1, STEP):
        end = start + WINDOW
        X_windows.append(rec_signal[start:end])
        y_windows.append(rec_label)
        window_subjects.append(subject_name)

X_windows = np.array(X_windows)
y_windows = np.array(y_windows)
window_subjects = np.array(window_subjects)

print("Windowed EEG shape:", X_windows.shape)
print("Windowed labels shape:", y_windows.shape)

# ============================================================
# 4. TRAIN / TEST SPLIT
# ============================================================

# Neighbouring windows share half of their samples (STEP < WINDOW), so a
# window-level random split would place nearly identical data in both train
# and test. Splitting by subject keeps every subject's windows on one side.
from sklearn.model_selection import GroupShuffleSplit

subject_splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(
    subject_splitter.split(X_windows, y_windows, groups=window_subjects)
)

X_train, X_test = X_windows[train_idx], X_windows[test_idx]
y_train, y_test = y_windows[train_idx], y_windows[test_idx]

# ============================================================
# 5. CLASS WEIGHTS
# ============================================================

# Weight for each class = total training windows / windows of that class,
# listed in ascending label order.
class_counts = Counter(y_train)
total = sum(class_counts.values())
class_weights = torch.tensor(
    [total / count for _, count in sorted(class_counts.items())],
    dtype=torch.float32,
).to(device)

print("Class weights:", class_weights)

# ============================================================
# 6. PYTORCH DATASET
# ============================================================

class EEGDataset(Dataset):
    """In-memory dataset of (window, label) pairs held as tensors."""

    def __init__(self, X, y):
        """Copy ``X`` to a float32 tensor and ``y`` to an int64 tensor."""
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        """Number of stored windows."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the window with its two axes swapped, plus its label.

        Windows are stored by the notebook as (time, channels); the swap
        returns (channels, time).
        """
        window, label = self.X[idx], self.y[idx]
        return window.permute(1, 0), label

# Wrap both splits; only the training loader reshuffles between epochs.
TUNED_BATCH_SIZE = 128

train_ds, test_ds = EEGDataset(X_train, y_train), EEGDataset(X_test, y_test)

train_loader = DataLoader(train_ds, batch_size=TUNED_BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=TUNED_BATCH_SIZE)

# ============================================================
# 7. STRONG MODEL WITH ATTENTION (TUNED)
# ============================================================

class SEBlock(nn.Module):
    """Squeeze-and-excitation style channel gate for (batch, channels, time) input.

    Averages the input over time, maps the result through a bottleneck MLP
    ending in a sigmoid, and multiplies the input by the per-channel factors.
    """

    def __init__(self, channels, reduction=16):
        """
        Args:
            channels: Number of channels of the input tensor.
            reduction: Divisor applied to ``channels`` to size the bottleneck.
        """
        super().__init__()
        self.pool = nn.AdaptiveAvgPool1d(1)
        # Never let the bottleneck shrink to zero units: for
        # channels < reduction the integer division yields 0, which would build
        # an empty layer and make the gate a constant.
        hidden = max(1, channels // reduction)
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden),
            nn.ReLU(),
            nn.Linear(hidden, channels),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` scaled channel-wise; output shape equals input shape."""
        b, c, t = x.size()
        y = self.pool(x).view(b, c)      # average over time -> (batch, channels)
        y = self.fc(y).view(b, c, 1)     # gate in (0, 1), broadcast over time
        return x * y


class EEG_Strong_Model(nn.Module):
    """Convolutional front-end, channel gate and transformer encoder classifier."""

    def __init__(self, num_channels=8, num_classes=4):
        """
        Args:
            num_channels: Number of input channels fed to the first convolution.
            num_classes: Number of output logits.
        """
        super().__init__()

        # Three convolution stages: num_channels -> 64 -> 128 -> 256 feature maps.
        self.cnn1 = nn.Conv1d(num_channels, 64, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(64)

        self.cnn2 = nn.Conv1d(64, 128, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(128)

        self.cnn3 = nn.Conv1d(128, 256, kernel_size=5, padding=2)
        self.bn3 = nn.BatchNorm1d(256)

        self.attn = SEBlock(256)
        self.pool = nn.MaxPool1d(2)
        self.relu = nn.ReLU()

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=256, nhead=8, batch_first=True),
            num_layers=4,
        )

        head_layers = [
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(0.3),   # slightly less dropout than before
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward_features(self, x):
        """Return the pooled (batch, 256) embedding that feeds the classifier."""
        stages = (
            (self.cnn1, self.bn1),
            (self.cnn2, self.bn2),
            (self.cnn3, self.bn3),
        )
        # Each stage: convolution -> batch norm -> ReLU -> max-pool (halves time).
        for conv, norm in stages:
            x = self.pool(self.relu(norm(conv(x))))

        gated = self.attn(x)
        sequence = gated.permute(0, 2, 1)   # (batch, 256, time) -> (batch, time, 256)
        encoded = self.transformer(sequence)
        return encoded.mean(dim=1)          # average over the time axis

    def forward(self, x):
        """Return class logits for ``x`` of shape (batch, channels, time)."""
        features = self.forward_features(x)
        return self.classifier(features)

# ============================================================
# 8. MODEL, LOSS, OPTIMIZER, SCHEDULER
# ============================================================

# Model sized to the number of distinct training labels.
num_emotion_classes = len(np.unique(y_train))
model = EEG_Strong_Model(num_channels=8, num_classes=num_emotion_classes).to(device)

# Class-weighted cross-entropy; no label smoothing this time.
criterion = nn.CrossEntropyLoss(weight=class_weights)

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-2)

# Cosine decay of the learning rate over 50 scheduler steps, down to 1e-6.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50, eta_min=1e-6)

# ============================================================
# 9. EARLY STOP TRAINING (TUNED)
# ============================================================

# Training with early stopping: stop after PATIENCE consecutive epochs without
# a new best validation loss; the best weights are checkpointed to disk.
EPOCHS = 50
PATIENCE = 10
best_val_loss = np.inf
patience_counter = 0
best_model_path = "best_gameemo_model_tuned.pth"

# Per-epoch metrics, appended once per completed epoch.
history = {
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': []
}

for epoch in range(EPOCHS):

    # Training
    model.train()
    train_loss = 0
    train_preds = []
    train_true = []

    for Xb, yb in train_loader:
        Xb, yb = Xb.to(device), yb.to(device)

        # Slight noise augmentation: Gaussian noise scaled by 0.005
        # (training batches only).
        Xb = Xb + 0.005 * torch.randn_like(Xb)

        optimizer.zero_grad()
        preds = model(Xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        # Training accuracy is measured on the noisy batches, in train mode.
        pred_labels = torch.argmax(preds, dim=1)
        train_preds.extend(pred_labels.cpu().numpy())
        train_true.extend(yb.cpu().numpy())

    # Mean of the per-batch training losses.
    train_loss /= len(train_loader)
    train_acc = accuracy_score(train_true, train_preds)

    # Validation
    # NOTE(review): "validation" iterates test_loader, so the checkpoint is
    # selected on the same data that is reported as the final result below.
    model.eval()
    val_loss = 0
    all_preds = []
    all_true = []

    with torch.no_grad():
        for Xb, yb in test_loader:
            Xb, yb = Xb.to(device), yb.to(device)

            preds = model(Xb)
            loss = criterion(preds, yb)
            val_loss += loss.item()

            pred_labels = torch.argmax(preds, dim=1)
            all_preds.extend(pred_labels.cpu().numpy())
            all_true.extend(yb.cpu().numpy())

    val_loss /= len(test_loader)
    val_acc = accuracy_score(all_true, all_preds)

    # One scheduler step per epoch.
    scheduler.step()

    print(
        f"Epoch {epoch+1}/{EPOCHS} | "
        f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | "
        f"Val Loss: {val_loss:.4f} | "
        f"Val Acc: {val_acc:.4f}"
    )

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    # A strictly lower validation loss resets the patience counter and
    # overwrites the checkpoint; otherwise count towards early stopping.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), best_model_path)
        print("Best model saved")
    else:
        patience_counter += 1
        print(f"No improvement. Patience {patience_counter}/{PATIENCE}")

        if patience_counter >= PATIENCE:
            print("Early stopping triggered")
            break

# ============================================================
# 10. LOAD BEST MODEL AND FINAL EVALUATION
# ============================================================

# Restore the best checkpoint. map_location keeps the load working when the
# weights were saved from a GPU run but the current device is the CPU, and
# matches how the later cells load checkpoints.
model.load_state_dict(torch.load(best_model_path, map_location=device))
model.eval()

all_preds = []
all_true = []

# Inference only: no gradients needed.
with torch.no_grad():
    for Xb, yb in test_loader:
        Xb = Xb.to(device)
        yb = yb.to(device)
        preds = model(Xb)
        pred_labels = torch.argmax(preds, dim=1)

        all_preds.extend(pred_labels.cpu().numpy())
        all_true.extend(yb.cpu().numpy())

final_acc = accuracy_score(all_true, all_preds)

print("Final tuned best-stop accuracy:", final_acc)
print("Classification report:")
print(classification_report(all_true, all_preds))


In [None]:
# Colab-only step (imports google.colab): mount Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# ============================================================
# POST-TRAINING VISUALIZATION FOR GAMEEMO EEG
# ============================================================

import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.decomposition import PCA

device = "cuda" if torch.cuda.is_available() else "cpu"

# ============================================
# 1. LOAD YOUR BEST MODEL
# ============================================

model_path = "best_gameemo_model_tuned.pth"   # change if needed

model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

print("Loaded model:", model_path)

# ============================================
# 2. GET PREDICTIONS ON TEST SET
# ============================================

all_preds = []
all_true = []
all_embeddings = []

with torch.no_grad():
    for Xb, yb in test_loader:
        Xb = Xb.to(device)

        # Run the backbone once and reuse its output for both the logits and
        # the embeddings (model.forward is classifier(forward_features(x))).
        emb = model.forward_features(Xb)
        preds = model.classifier(emb)
        pred_labels = torch.argmax(preds, dim=1)

        all_preds.extend(pred_labels.cpu().numpy())
        all_true.extend(yb.numpy())

        # Embeddings (for PCA)
        all_embeddings.append(emb.cpu().numpy())

all_embeddings = np.concatenate(all_embeddings, axis=0)

# Class ids 0..K-1 covering every label seen in the targets or predictions,
# so all plots below share the same axis even if a class is never predicted.
num_classes = int(max(max(all_true), max(all_preds))) + 1
class_ids = np.arange(num_classes)

# ============================================
# 3. CONFUSION MATRIX VISUALIZATION
# ============================================

cm = confusion_matrix(all_true, all_preds, labels=class_ids)

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix - EEG Emotion Model")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

# ============================================
# 4. CLASSIFICATION REPORT
# ============================================

print("Classification Report:\n")
print(classification_report(all_true, all_preds))

# ============================================
# 5. PREDICTION DISTRIBUTION BAR PLOT
# ============================================

# Count predictions per class id. A histogram with bins=4 spreads its bins
# between the min and max predicted id, which misplaces the bars whenever a
# class is never predicted.
pred_counts = np.bincount(np.asarray(all_preds, dtype=int), minlength=num_classes)

plt.figure(figsize=(5, 4))
plt.bar(class_ids, pred_counts)
plt.xticks(class_ids)
plt.title("Prediction Distribution")
plt.xlabel("Emotion Class")
plt.ylabel("Count")
plt.show()

# ============================================
# 6. EEG EMBEDDING VISUALIZATION (PCA)
# ============================================

pca = PCA(n_components=2)
emb_2d = pca.fit_transform(all_embeddings)

plt.figure(figsize=(6, 5))
plt.scatter(emb_2d[:, 0], emb_2d[:, 1], c=all_true, s=5)
plt.title("2D Visualization of EEG Emotion Embeddings (PCA)")
plt.xlabel("PCA Component 1")
plt.ylabel("PCA Component 2")
plt.colorbar(label="Emotion Class")
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from matplotlib.colors import ListedColormap

# Assuming you have:
# all_embeddings - your EEG embeddings
# all_true - your emotion labels (0, 1, 2, 3)

# Define emotion mapping (class id -> emotion name)
emotion_mapping = {
    0: 'Boring',
    1: 'Calm',
    2: 'Funny',
    3: 'Horror'
}

# Derive ids and display labels from the mapping so ticks, legend and
# statistics cannot drift apart.
class_ids = sorted(emotion_mapping)
class_labels = [f"{emotion_mapping[class_id]} ({class_id})" for class_id in class_ids]

# Apply PCA
pca = PCA(n_components=2)
emb_2d = pca.fit_transform(all_embeddings)

# Create the visualization with proper labels
plt.figure(figsize=(10, 8))

# Create scatter plot. vmin/vmax pin the colour scale to the full class range,
# so each class keeps its colour even when a class is missing from all_true.
scatter = plt.scatter(emb_2d[:, 0], emb_2d[:, 1],
                     c=all_true,
                     s=20,  # Increased size for better visibility
                     cmap='viridis',
                     alpha=0.6,
                     vmin=class_ids[0],
                     vmax=class_ids[-1])

# Set labels and title
plt.title("2D Visualization of EEG Emotion Embeddings (PCA)",
          fontsize=14, fontweight='bold')
plt.xlabel("PCA Component 1", fontsize=12)
plt.ylabel("PCA Component 2", fontsize=12)

# Add colorbar with emotion labels
cbar = plt.colorbar(scatter, label="Emotion Class")
cbar.set_label('Emotion Class', rotation=270, labelpad=20, fontsize=12)

# Set colorbar ticks to match emotion classes
cbar.set_ticks(class_ids)
cbar.set_ticklabels(class_labels)

# Optional: Add legend with emotion names. Colours are read back from the
# scatter's own colormap/normalisation so they match the points exactly.
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor=scatter.cmap(scatter.norm(class_id)), label=label, alpha=0.6)
    for class_id, label in zip(class_ids, class_labels)
]
plt.legend(handles=legend_elements, loc='best', fontsize=10, framealpha=0.9)

# Add grid for better readability
plt.grid(True, alpha=0.3, linestyle='--')

plt.tight_layout()
plt.show()

# Optional: Print statistics
print("\nEmotion Distribution:")
unique, counts = np.unique(all_true, return_counts=True)
for emotion_id, count in zip(unique, counts):
    emotion_name = emotion_mapping.get(int(emotion_id), 'Unknown')
    print(f"{emotion_name} ({emotion_id}): {count} samples ({count/len(all_true)*100:.1f}%)")

print(f"\nExplained variance ratio: {pca.explained_variance_ratio_}")
print(f"Total variance explained: {sum(pca.explained_variance_ratio_)*100:.2f}%")

In [None]:
# Shell escape: quietly (-q) install the packages used by the Stable Diffusion
# cells below. No versions are pinned.
!pip install -q diffusers transformers accelerate safetensors


In [None]:
import torch
import numpy as np

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the checkpoint written by the earlier "strong model" training run into
# the current `model` object and switch it to inference mode.
model_path = "best_gameemo_strong_model.pth"
checkpoint_state = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint_state)
model.to(device).eval()

print("Loaded model:", model_path)


In [None]:
# Collect the backbone embedding and the label of every test window.
embedding_batches, label_batches = [], []

with torch.no_grad():
    for Xb, yb in test_loader:
        Xb = Xb.to(device)

        features = model.forward_features(Xb)   # (batch, 256)

        embedding_batches.append(features.cpu().numpy())
        label_batches.append(yb.numpy())

all_embeddings = np.concatenate(embedding_batches, axis=0)
all_labels = np.concatenate(label_batches, axis=0)

# Persist both arrays for the adapter-training cells.
for out_file, payload in (
    ("eeg_emotion_embeddings.npy", all_embeddings),
    ("eeg_emotion_labels.npy", all_labels),
):
    np.save(out_file, payload)

print("Saved EEG embeddings:", all_embeddings.shape)

In [None]:
# Shell escape: same quiet install command as the earlier cell, run again here.
!pip install -q diffusers transformers accelerate safetensors


In [None]:
import torch.nn as nn
from diffusers import StableDiffusionPipeline

class EEGtoCLIPAdapter(nn.Module):
    """Two-layer MLP mapping an EEG embedding to a vector of size ``clip_dim``."""

    def __init__(self, eeg_dim=256, clip_dim=768):
        """
        Args:
            eeg_dim: Size of the input embedding.
            clip_dim: Size of the output vector.
        """
        super().__init__()
        hidden_dim = 512
        layers = [
            nn.Linear(eeg_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, clip_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Project ``x`` (..., eeg_dim) to (..., clip_dim)."""
        projected = self.net(x)
        return projected

adapter = EEGtoCLIPAdapter().to(device)

# Load the pipeline in float32 so its dtype is consistent with the adapter
# during training; the safety checker is disabled.
pipeline_options = dict(
    torch_dtype=torch.float32,
    safety_checker=None,
)
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", **pipeline_options
).to(device)

# Handles to the pipeline's tokenizer and text encoder.
tokenizer, text_encoder = pipe.tokenizer, pipe.text_encoder

In [None]:
# Keys must follow `emotion_id`, i.e. the category codes of the emotion names.
# Those codes are assigned in alphabetical order: 0 Boring, 1 Calm, 2 Funny,
# 3 Horror (the same mapping the PCA cell of this notebook uses). The previous
# key order attached the calm and horror prompts to the wrong classes.
emotion_prompts = {
    0: "boring dull low energy environment",
    1: "calm peaceful relaxed soft lighting environment",
    2: "happy joyful colorful energetic environment",
    3: "horror dark frightening tense environment"
}

def get_text_embedding(prompt):
    """Encode ``prompt`` and return the mean of the text encoder's hidden states.

    The prompt is tokenized to exactly 77 tokens (padded or truncated), run
    through the text encoder without gradients, and averaged over the token
    axis (dim 1).

    Args:
        prompt: Text to encode.

    Returns:
        Tensor with the token axis averaged out.
    """
    tokenizer_options = dict(
        padding="max_length",
        truncation=True,
        max_length=77,
        return_tensors="pt",
    )
    encoded = tokenizer(prompt, **tokenizer_options).to(device)

    with torch.no_grad():
        hidden_states = text_encoder(**encoded).last_hidden_state
        emb = hidden_states.mean(dim=1)

    return emb

# One target embedding per emotion class, keyed like emotion_prompts.
text_targets = {}
for class_id, prompt_text in emotion_prompts.items():
    text_targets[class_id] = get_text_embedding(prompt_text)


In [None]:
# Reload the embeddings and labels written by the export cell.
eeg_embeddings, eeg_labels = (
    np.load(array_file)
    for array_file in ("eeg_emotion_embeddings.npy", "eeg_emotion_labels.npy")
)

# Dataset pairing each EEG embedding with its emotion label
class CLIPDataset(Dataset):
    """In-memory dataset of (embedding, label) tensor pairs."""

    def __init__(self, embeddings, labels):
        """Copy ``embeddings`` to float32 and ``labels`` to int64 tensors."""
        self.embeddings = torch.tensor(embeddings, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        """Number of stored embeddings."""
        return len(self.embeddings)

    def __getitem__(self, idx):
        """Return the ``idx``-th (embedding, label) pair."""
        embedding, label = self.embeddings[idx], self.labels[idx]
        return embedding, label

clip_ds = CLIPDataset(eeg_embeddings, eeg_labels)
clip_loader = DataLoader(clip_ds, batch_size=64, shuffle=True)  # reshuffled every epoch

# Regress adapter outputs onto the per-class text embeddings with MSE.
optimizer = torch.optim.Adam(adapter.parameters(), lr=3e-4)
criterion = nn.MSELoss()

NUM_ADAPTER_EPOCHS = 30

for epoch in range(NUM_ADAPTER_EPOCHS):
    total_loss = 0
    for eeg_batch, label_batch in clip_loader:
        eeg_batch, label_batch = eeg_batch.to(device), label_batch.to(device)

        optimizer.zero_grad()

        pred_clip = adapter(eeg_batch)

        # Look up each sample's class target, stack them, then drop the
        # singleton axis left over from the lookup table entries.
        per_sample_targets = [text_targets[int(lbl.item())] for lbl in label_batch]
        target_clip = torch.stack(per_sample_targets).squeeze(1)

        loss = criterion(pred_clip, target_clip)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    mean_loss = total_loss / len(clip_loader)
    print(f"Epoch {epoch+1}, Loss: {mean_loss:.6f}")

torch.save(adapter.state_dict(), "eeg_to_clip_adapter.pth")
print("Adapter saved.")

In [None]:
def generate_image_from_eeg(eeg_vector, num_inference_steps=30, guidance_scale=7.5):
    """Generate one image conditioned on a single EEG embedding.

    The embedding is mapped by ``adapter`` to one conditioning vector, which is
    repeated 77 times along a new sequence axis and passed to ``pipe`` as
    ``prompt_embeds``.

    Args:
        eeg_vector: 1-D EEG embedding (array-like or tensor) accepted by the adapter.
        num_inference_steps: Number of denoising steps passed to the pipeline
            (default 30, the value previously hard-coded).
        guidance_scale: Guidance scale passed to the pipeline (default 7.5,
            the value previously hard-coded).

    Returns:
        The first image of the pipeline output.
    """
    adapter.eval()

    # as_tensor accepts both arrays and tensors without the copy-construct
    # warning torch.tensor() emits for tensor input.
    eeg_vector = torch.as_tensor(eeg_vector, dtype=torch.float32).unsqueeze(0).to(device)

    with torch.no_grad():
        clip_cond = adapter(eeg_vector)
        # The pipeline expects prompt embeddings shaped (batch, 77, embed_dim);
        # expand the single (batch, embed_dim) vector by repeating it 77 times.
        clip_cond = clip_cond.unsqueeze(1).repeat(1, 77, 1)

    image = pipe(
        prompt_embeds=clip_cond,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale
    ).images[0]

    return image

In [None]:
from PIL import Image

# Generate and save one image for each of the first five EEG embeddings.
for i in range(5):
    output_name = f"eeg_generated_{i}.png"
    img = generate_image_from_eeg(eeg_embeddings[i])
    img.save(output_name)
    print(f"Saved {output_name}")


In [None]:
import matplotlib.pyplot as plt
import os
from PIL import Image

# The generation cell saves its PNGs with relative names, i.e. into the
# current working directory. List and open them from that same directory
# instead of listing "/content" and opening names relative to wherever the
# kernel happens to run.
image_dir = os.getcwd()
image_files = sorted(
    f for f in os.listdir(image_dir)
    if f.startswith("eeg_generated") and f.lower().endswith(".png")
)

if not image_files:
    print("No eeg_generated*.png files found in", image_dir)
else:
    plt.figure(figsize=(15, 5))
    for i, fname in enumerate(image_files):
        img = Image.open(os.path.join(image_dir, fname))
        plt.subplot(1, len(image_files), i + 1)
        plt.imshow(img)
        plt.title(fname)
        plt.axis("off")

    plt.show()


In [None]:
import torch
from diffusers import StableDiffusionPipeline

device = "cuda" if torch.cuda.is_available() else "cpu"

# Half precision is only used on the GPU; on the CPU fall back to float32,
# where float16 inference is poorly supported.
pipe_dtype = torch.float16 if device == "cuda" else torch.float32

pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=pipe_dtype,
    safety_checker=None
).to(device)

print("Stable Diffusion loaded on:", device)


In [None]:
# Keys must follow `emotion_id`, i.e. the category codes of the emotion names.
# Those codes are assigned in alphabetical order: 0 Boring, 1 Calm, 2 Funny,
# 3 Horror (the same mapping the PCA cell of this notebook uses). The previous
# key order attached the calm and horror prompts to the wrong classes.
emotion_prompts = {
    0: "a dull, empty, low-light room with faded colors, very boring and lifeless scene",
    1: "a calm, peaceful lakeside or nature scene at sunset, relaxing and quiet",
    2: "a bright, colorful, joyful scene with smiling people and sunshine, very happy mood",
    3: "a dark, scary, horror environment at night with fog, shadows and fear"
}


In [None]:
import numpy as np

# Reload the saved label array and peek at its first ten entries.
LABELS_FILE = "eeg_emotion_labels.npy"
eeg_labels = np.load(LABELS_FILE)   # shape (N,)
print(eeg_labels[:10])


In [None]:
from PIL import Image

num_samples = 5   # how many EEG samples you want to visualize

# Never index past the end of the label array when it holds fewer than
# num_samples entries.
for i in range(min(num_samples, len(eeg_labels))):
    emo_id = int(eeg_labels[i])

    # A label without a prompt is reported and skipped instead of aborting the
    # whole run with a KeyError.
    if emo_id not in emotion_prompts:
        print(f"EEG sample {i}: no prompt defined for emotion {emo_id}, skipping")
        continue

    prompt = emotion_prompts[emo_id]
    print(f"EEG sample {i} → emotion {emo_id} → prompt: {prompt}")

    image = pipe(
        prompt,
        num_inference_steps=30,
        guidance_scale=7.5
    ).images[0]

    out_name = f"eeg_scene_{i}_class_{emo_id}.png"
    image.save(out_name)
    print(f"Saved: {out_name}")


In [None]:
# ============================================
# GRAD-CAM IMPLEMENTATION FOR EEG (1D CNN)
# ============================================

class EEGGradCAM:
    """Grad-CAM along the time axis for a 1D-CNN EEG classifier.

    Records the output of ``target_layer`` during the forward pass and the
    gradient of the selected class score with respect to that output, then
    combines both into a non-negative importance curve scaled to [0, 1].

    Args:
        model: Network returning class scores indexed as ``output[:, class]``.
        target_layer: Sub-module of ``model`` whose output is explained. The
            output is reduced with ``mean(dim=2)``, so it must be 3-D
            (presumably (batch, channels, time) — confirm for the model used).
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer

        self.activations = None
        self.gradients = None
        # Handles are kept so the hooks can be detached via remove_hooks().
        self._hook_handles = []

        self._register_hooks()

    def _register_hooks(self):
        """Attach the forward hook that captures activations and gradients."""
        def save_gradient(grad):
            self.gradients = grad.detach()

        def forward_hook(module, input, output):
            self.activations = output.detach()
            # A tensor hook replaces the deprecated module-level
            # register_backward_hook. It can only be attached when the output
            # is part of an autograd graph, which is not the case for forward
            # passes executed under torch.no_grad().
            if output.requires_grad:
                output.register_hook(save_gradient)

        self._hook_handles.append(
            self.target_layer.register_forward_hook(forward_hook)
        )

    def remove_hooks(self):
        """Detach every hook this object registered (safe to call repeatedly)."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles = []

    def generate_cam(self, x, class_idx):
        """Compute the normalised Grad-CAM curve for ``class_idx``.

        Args:
            x: Input batch passed straight to ``self.model``.
            class_idx: Index of the class score to explain.

        Returns:
            numpy array of the CAM with singleton dimensions squeezed out; for
            a single-sample batch this is a 1-D curve over the layer's time axis.

        Raises:
            RuntimeError: If no gradient reached the target layer.
        """
        self.model.zero_grad()
        self.gradients = None

        output = self.model(x)
        # Summing over the batch yields a scalar, so batches with more than one
        # sample can be back-propagated as well.
        output[:, class_idx].sum().backward()

        if self.gradients is None:
            raise RuntimeError(
                "No gradient was captured for the target layer; make sure it "
                "takes part in the forward pass and gradients are enabled."
            )

        # Channel weights are the time-averaged gradients (standard Grad-CAM).
        weights = self.gradients.mean(dim=2, keepdim=True)
        cam = (weights * self.activations).sum(dim=1)
        cam = torch.relu(cam)

        # Min-max normalise each sample separately; identical to the global
        # normalisation when the batch holds a single sample.
        cam = cam - cam.min(dim=1, keepdim=True).values
        cam = cam / (cam.max(dim=1, keepdim=True).values + 1e-8)

        return cam.squeeze().cpu().numpy()


In [None]:
# Hook Grad-CAM onto the classifier's `cnn3` layer.
gradcam = EEGGradCAM(model, model.cnn3)


In [None]:
model.eval()

# First sample of the first test batch, kept with its batch dimension.
X_sample, y_sample = next(iter(test_loader))
X_sample = X_sample[:1].to(device)

# Class prediction only; no autograd graph is needed for this pass.
with torch.no_grad():
    sample_logits = model(X_sample)
pred = int(sample_logits.argmax())

# Grad-CAM performs its own forward/backward pass for the predicted class.
cam = gradcam.generate_cam(X_sample, pred)

print("Predicted Emotion Class:", pred)
print("CAM Shape:", cam.shape)


In [None]:
import matplotlib.pyplot as plt

# Show the 1-D Grad-CAM curve as a single-row heat strip over time.
fig, ax = plt.subplots(figsize=(10, 4))
heat_strip = ax.imshow(cam.reshape(1, -1), aspect='auto', cmap='hot', origin='lower')
fig.colorbar(heat_strip, ax=ax, label="Importance")
ax.set_title("EEG Grad-CAM Explainability Map")
ax.set_xlabel("Time")
ax.set_ylabel("Channel Importance (Aggregated)")
ax.set_yticks([])  # a single row carries no information on the y-axis
plt.show()

In [None]:
# Calculate saliency map for input channel importance
model.eval()

# Differentiate with respect to a private copy of the sample. The shared
# X_sample tensor keeps its autograd state untouched, so the cell can be
# re-run (or fail half-way) without leaving requires_grad / .grad behind.
x_saliency = X_sample.detach().clone().requires_grad_(True)

pred_output = model(x_saliency)
pred_label = torch.argmax(pred_output, dim=1)

# Back-propagate the predicted-class score of each sample to the input.
pred_output.gather(1, pred_label.unsqueeze(-1)).sum().backward()

# Absolute input gradients, averaged over the time dimension per channel.
gradients = x_saliency.grad.abs()
channel_importance = gradients.mean(dim=2).squeeze().cpu().numpy()

eeg_channels = ['FC5','FC6','O1','O2','P7','P8','T7','T8']

# Fail with a clear message when the label list and the model input disagree.
if channel_importance.shape != (len(eeg_channels),):
    raise ValueError(
        f"Expected {len(eeg_channels)} channel scores, "
        f"got shape {channel_importance.shape}"
    )

plt.figure(figsize=(6,4))
plt.bar(eeg_channels, channel_importance)
plt.title("EEG Channel Importance for Emotion Prediction (Saliency Map)")
plt.ylabel("Importance Score")
plt.xlabel("EEG Channel")
plt.show()

In [None]:
# ============================================================
# FULL EEG → IMAGE → XAI → CLEAR BOTTOM PANEL CODE (FINAL)
# ============================================================

!pip install -q diffusers transformers accelerate safetensors

import torch
import numpy as np
import matplotlib.pyplot as plt
from diffusers import StableDiffusionPipeline
from PIL import ImageDraw, ImageFont

# Use the GPU when torch can see one, otherwise the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print("Device:", device)

# Electrode labels; assumed to follow the channel order of the model input.
eeg_channels = "FC5 FC6 O1 O2 P7 P8 T7 T8".split()

# Descriptive attributes shown in the caption panel, keyed by class id (0-3).
_factor_fields = ("name", "valence", "arousal", "lighting", "motion")
emotion_factors = {
    class_id: dict(zip(_factor_fields, values))
    for class_id, values in enumerate((
        ("BORING", "Low",  "Low",  "Dull",   "Static"),
        ("HAPPY",  "High", "High", "Bright", "Dynamic"),
        ("HORROR", "Low",  "High", "Dark",   "Chaotic"),
        ("CALM",   "High", "Low",  "Soft",   "Slow"),
    ))
}

# Stable Diffusion prompt per class id; each one asks for a scene without people.
emotion_prompts = dict(enumerate((
    "empty grey room, dull lighting, silent atmosphere, no humans",
    "bright sunlight, vivid colors, energetic light, no humans",
    "dark abandoned corridor, fog, sharp shadows, no humans",
    "peaceful lake at sunset, soft light, calm water, no humans",
)))

# Passed to the pipeline as its negative prompt.
negative_prompt = "people, humans, face, abstract, noise, glitch, distorted, blurry"

# ------------------------------------------------------------
# LOAD STABLE DIFFUSION
# ------------------------------------------------------------
# float16 weights are intended for GPU inference; load float32 weights when the
# notebook runs on a CPU-only runtime so generation still works there.
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
    safety_checker=None
).to(device)

# ------------------------------------------------------------
# MAIN GENERATION LOOP
# ------------------------------------------------------------
model.eval()
global_window_index = 0
num_samples = 4
shown = 0

# The caption font is identical for every sample, so resolve it once up front.
# Only a missing or unreadable font file (OSError) triggers the fallback; any
# other exception is a genuine error and is allowed to surface.
try:
    font_text = ImageFont.truetype(
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16
    )
except OSError:
    font_text = ImageFont.load_default()

for Xb, yb in test_loader:

    Xb = Xb.to(device)

    for i in range(len(Xb)):

        if shown >= num_samples:
            break

        # 1. GET EEG SAMPLE (batch dimension kept for the model)
        x = Xb[i:i+1]                         # expected (1, 8, T)
        raw_eeg = x.squeeze(0).cpu().numpy()  # expected (8, T)

        # 2. EEG → EMOTION PREDICTION (inference only, no autograd graph)
        with torch.no_grad():
            logits = model(x)
            pred_class = torch.argmax(logits, dim=1).item()

        factors = emotion_factors[pred_class]
        prompt  = emotion_prompts[pred_class]

        # 3. GENERATE IMAGE for the predicted emotion
        image = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=45,
            guidance_scale=9.0,
            height=512,
            width=512
        ).images[0]

        # 4. GRAD-CAM importance along the time axis for the predicted class
        cam = gradcam.generate_cam(x, pred_class)   # (time,)

        # 5. TIME-SEGMENT VALUES: mean importance per third of the CAM.
        # atleast_1d keeps len() valid if the CAM collapses to a single value.
        time_energy = np.atleast_1d(cam)
        T = len(time_energy)

        early_val  = time_energy[:T//3].mean() if T//3 > 0 else 0.0
        middle_val = time_energy[T//3:2*T//3].mean() if (2*T//3 - T//3) > 0 else 0.0
        late_val   = time_energy[2*T//3:].mean() if (T - 2*T//3) > 0 else 0.0

        # 6. DOMINANT TIME + CHANNEL VALUES
        # The CAM lives on the target layer's time axis, which can be shorter
        # than the input window. Map the CAM index to the centre of the input
        # samples it covers; when both axes have equal length this is t_idx.
        t_idx = int(np.argmax(time_energy))
        n_raw = raw_eeg.shape[1]
        raw_t_idx = min(int((t_idx + 0.5) * n_raw / T), n_raw - 1)

        channel_values = raw_eeg[:, raw_t_idx]
        channel_values = channel_values / (np.max(np.abs(channel_values)) + 1e-8)

        # Labels come from eeg_channels so names and values cannot drift apart;
        # the caption is split into two rows of four entries.
        labelled = [
            f"{name}: {value:.2f}"
            for name, value in zip(eeg_channels, channel_values)
        ]
        channel_text = " | ".join(labelled[:4]) + " |\n" + " | ".join(labelled[4:])

        # 7. FINAL BOTTOM PANEL TEXT
        bottom_text = (
            f"Emotion: {factors['name']}\n\n"
            f"EEG Window Index: {global_window_index}\n\n"
            f"Time-Segment Influence:\n"
            f"Early EEG  = {early_val:.3f}\n"
            f"Middle EEG = {middle_val:.3f}\n"
            f"Late EEG   = {late_val:.3f}\n\n"
            f"{channel_text}\n\n"
            f"Valence: {factors['valence']} | "
            f"Arousal: {factors['arousal']} | "
            f"Lighting: {factors['lighting']} | "
            f"Motion: {factors['motion']}"
        )

        # 8. DRAW CLEAR BOTTOM PANEL
        # NOTE(review): the caption has 13 lines; check that it fits inside the
        # 212 px panel at this font size or whether it is clipped at the edge.
        draw = ImageDraw.Draw(image)
        draw.rectangle((0, 300, 512, 512), fill=(0, 0, 0))
        draw.text((15, 310), bottom_text, fill=(255, 255, 255), font=font_text)

        # 9. SAVE & DISPLAY
        fname = f"eeg_final_output_{shown}.png"
        image.save(fname)

        plt.figure(figsize=(5, 5))
        plt.imshow(image)
        plt.axis("off")
        plt.show()

        shown += 1
        global_window_index += 1

    if shown >= num_samples:
        break


In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from diffusers import StableDiffusionPipeline
from PIL import Image

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

# ------------------------------------------------------------
# 1. LOAD EEG LABELS
# ------------------------------------------------------------
eeg_labels = np.load("eeg_emotion_labels.npy")

# ------------------------------------------------------------
# 2. LOAD STABLE DIFFUSION
# ------------------------------------------------------------
# float16 weights are intended for GPU inference; load float32 weights when the
# notebook runs on a CPU-only runtime so generation still works there.
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
    safety_checker=None
).to(device)

# ------------------------------------------------------------
# 3. EMOTION NAMES
# ------------------------------------------------------------
# Lower-case display name per class id (0-3, in list order).
emotion_names = dict(enumerate(["boring", "happy", "horror", "calm"]))

# Scene prompt per class id. Each prompt starts with the arousal/valence
# description of the emotion and ends by asking for a scene without people.
emotion_prompts = dict(enumerate([
    "low arousal, low valence environment, empty grey room, dull light, no movement, flat textures, silent atmosphere, no humans",
    "high arousal, high valence environment, bright sunlight, vivid colors, moving clouds, energetic lighting, vivid nature scene, no humans",
    "high arousal, low valence environment, dark abandoned corridor, thick fog, sharp shadows, cold colors, threatening atmosphere, no humans",
    "low arousal, high valence environment, peaceful lake at sunset, smooth water, soft pastel colors, slow clouds, quiet atmosphere, no humans",
]))

# Passed to the pipeline as its negative prompt.
negative_prompt = "people, humans, face, body, crowd, portrait, abstract, texture, noise, mosaic, glitch, distorted, blurry, low quality"

# ------------------------------------------------------------
# 5. EEG TIME-SEGMENT EXPLANATION FUNCTION (from b5edc580)
# ------------------------------------------------------------
def explain_by_time_segment(cam_map):
    """Summarise a temporal importance curve by thirds.

    The curve is cut at ``T // 3`` and ``2 * T // 3``; the mean of each part is
    reported, with 0.0 for a part that contains no samples.

    Args:
        cam_map: Array-like of temporal importance values, shape (time_steps,).
            Lists and scalar (0-d) inputs are accepted as well.

    Returns:
        Tuple ``(dominant, segments)`` where ``segments`` maps "early",
        "middle" and "late" to their mean importance as Python floats and
        ``dominant`` is the key with the largest mean (the earliest segment
        wins a tie).
    """
    # atleast_1d turns a scalar CAM (a single time step after squeeze()) into a
    # one-element curve so that len() and slicing below stay valid.
    time_energy = np.atleast_1d(np.asarray(cam_map, dtype=float))

    T = len(time_energy)
    first_cut, second_cut = T // 3, 2 * T // 3

    def _mean_or_zero(chunk):
        # An empty slice has no mean; report it as zero importance.
        return float(chunk.mean()) if chunk.size else 0.0

    segments = {
        "early": _mean_or_zero(time_energy[:first_cut]),
        "middle": _mean_or_zero(time_energy[first_cut:second_cut]),
        "late": _mean_or_zero(time_energy[second_cut:]),
    }

    dominant = max(segments, key=segments.get)

    return dominant, segments

# ------------------------------------------------------------
# 6. ONE-BY-ONE EEG → IMAGE + TIME-BASED EXPLANATION
# ------------------------------------------------------------
# Walk the test set sample by sample: predict the emotion, generate a scene for
# it, compute Grad-CAM (time) and input-gradient saliency (channels), then show
# the image followed by the XAI plots. Stops after `sample_count` samples.
# NOTE(review): relies on gradcam, eeg_channels, dominant_time_index and
# plot_xai_graphs being defined by other cells — confirm they run first.
model.eval()

sample_count = 4
shown = 0

for Xb, yb in test_loader:
    Xb = Xb.to(device)

    for i in range(len(Xb)):

        # Stop the inner loop once enough samples were displayed; the check
        # after the loop then leaves the outer loop as well.
        if shown >= sample_count:
            break

        # Single sample with its batch dimension kept, plus a numpy copy for plotting.
        x = Xb[i:i+1]
        raw_eeg_window = x.squeeze(0).cpu().numpy()   # (channels x time)

        # -------- EEG → Emotion Prediction --------
        # Inference only: no autograd graph is built for this forward pass.
        with torch.no_grad():
            logits = model(x)
            pred_class = torch.argmax(logits, dim=1).item()

        emotion_name = emotion_names[pred_class]
        prompt = emotion_prompts[pred_class]

        # -------- EEG → GENERIC EMOTION SCENE --------
        image = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=45,
            guidance_scale=9.0,
            height=512,
            width=512
        ).images[0]

        # -------- TIME-BASED EXPLAINABILITY (Grad-CAM) --------
        # Gradient tracking on x is switched on only around the Grad-CAM call
        # and switched off again immediately afterwards.
        x.requires_grad_(True)
        cam = gradcam.generate_cam(x, pred_class)   # shape (time,)
        dominant_segment, segment_scores = explain_by_time_segment(cam)
        x.requires_grad_(False) # Disable grad tracking for 'x' after CAM calculation

        # -------- CHANNEL-BASED EXPLAINABILITY (Saliency Map) --------
        # A detached copy of the same sample is used so the input gradient can
        # be read from .grad without touching x.
        x_for_saliency = Xb[i:i+1].clone().detach().requires_grad_(True)
        logits_saliency = model(x_for_saliency)
        # One-hot vector selecting the predicted class; backward() with it
        # propagates only that class score to the input.
        one_hot_saliency = torch.zeros_like(logits_saliency).scatter_(1, torch.tensor([pred_class]).to(device).unsqueeze(-1), 1)
        logits_saliency.backward(gradient=one_hot_saliency, retain_graph=False)
        gradients = x_for_saliency.grad.abs()
        # Mean absolute gradient over time per channel, scaled so the largest value is ~1.
        channel_importance = gradients.mean(dim=2).squeeze().cpu().numpy() # shape (8,)
        channel_importance = channel_importance / (channel_importance.max() + 1e-8)

        # -------- DOMINANT TIME INDEX for RAW EEG Plot --------
        # t_val is returned by the helper but not used in this cell.
        t_idx, t_val = dominant_time_index(cam)

        # -------- DISPLAY IMAGE + EXPLANATION --------
        plt.figure(figsize=(6, 7))
        plt.imshow(image)
        plt.axis("off")
        plt.title(f"Predicted Emotion: {emotion_name.upper()}", fontsize=14)

        # dominant_segment and segment_scores are computed above but not drawn
        # here; a plt.figtext caption below the image would be one way to show them.

        plt.show()

        # Call the XAI graphs function to display additional plots
        plot_xai_graphs(
            cam=cam,
            raw_eeg=raw_eeg_window,
            dominant_t_idx=t_idx,
            eeg_channels=eeg_channels,
            channel_importance_vals=channel_importance,
            sample_id=shown
        )

        shown += 1

    if shown >= sample_count:
        break

In [None]:
# ============================================================
# FULL EEG → IMAGE → XAI → CLEAR BOTTOM PANEL CODE (FINAL)
# ============================================================

!pip install -q diffusers transformers accelerate safetensors

import torch
import numpy as np
import matplotlib.pyplot as plt
from diffusers import StableDiffusionPipeline
from PIL import ImageDraw, ImageFont

# Use the GPU when torch can see one, otherwise the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print("Device:", device)

# Electrode labels; assumed to follow the channel order of the model input.
eeg_channels = "FC5 FC6 O1 O2 P7 P8 T7 T8".split()

# Descriptive attributes shown in the caption panel, keyed by class id (0-3).
_factor_fields = ("name", "valence", "arousal", "lighting", "motion")
emotion_factors = {
    class_id: dict(zip(_factor_fields, values))
    for class_id, values in enumerate((
        ("BORING", "Low",  "Low",  "Dull",   "Static"),
        ("HAPPY",  "High", "High", "Bright", "Dynamic"),
        ("HORROR", "Low",  "High", "Dark",   "Chaotic"),
        ("CALM",   "High", "Low",  "Soft",   "Slow"),
    ))
}

# Stable Diffusion prompt per class id; each one asks for a scene without people.
emotion_prompts = dict(enumerate((
    "empty grey room, dull lighting, silent atmosphere, no humans",
    "bright sunlight, vivid colors, energetic light, no humans",
    "dark abandoned corridor, fog, sharp shadows, no humans",
    "peaceful lake at sunset, soft light, calm water, no humans",
)))

# Passed to the pipeline as its negative prompt.
negative_prompt = "people, humans, face, abstract, noise, glitch, distorted, blurry"

# ------------------------------------------------------------
# LOAD STABLE DIFFUSION
# ------------------------------------------------------------
# float16 weights are intended for GPU inference; load float32 weights when the
# notebook runs on a CPU-only runtime so generation still works there.
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
    safety_checker=None
).to(device)

# ------------------------------------------------------------
# MAIN GENERATION LOOP
# ------------------------------------------------------------
model.eval()
global_window_index = 0
num_samples = 4
shown = 0

# The caption font is identical for every sample, so resolve it once up front.
# Only a missing or unreadable font file (OSError) triggers the fallback; any
# other exception is a genuine error and is allowed to surface.
try:
    font_text = ImageFont.truetype(
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16
    )
except OSError:
    font_text = ImageFont.load_default()

for Xb, yb in test_loader:

    Xb = Xb.to(device)

    for i in range(len(Xb)):

        if shown >= num_samples:
            break

        # 1. GET EEG SAMPLE (batch dimension kept for the model)
        x = Xb[i:i+1]                         # expected (1, 8, T)
        raw_eeg = x.squeeze(0).cpu().numpy()  # expected (8, T)

        # 2. EEG → EMOTION PREDICTION (inference only, no autograd graph)
        with torch.no_grad():
            logits = model(x)
            pred_class = torch.argmax(logits, dim=1).item()

        factors = emotion_factors[pred_class]
        prompt  = emotion_prompts[pred_class]

        # 3. GENERATE IMAGE for the predicted emotion
        image = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=45,
            guidance_scale=9.0,
            height=512,
            width=512
        ).images[0]

        # 4. GRAD-CAM importance along the time axis for the predicted class
        cam = gradcam.generate_cam(x, pred_class)   # (time,)

        # 5. TIME-SEGMENT VALUES: mean importance per third of the CAM.
        # atleast_1d keeps len() valid if the CAM collapses to a single value.
        time_energy = np.atleast_1d(cam)
        T = len(time_energy)

        early_val  = time_energy[:T//3].mean() if T//3 > 0 else 0.0
        middle_val = time_energy[T//3:2*T//3].mean() if (2*T//3 - T//3) > 0 else 0.0
        late_val   = time_energy[2*T//3:].mean() if (T - 2*T//3) > 0 else 0.0

        # 6. DOMINANT TIME + CHANNEL VALUES
        # The CAM lives on the target layer's time axis, which can be shorter
        # than the input window. Map the CAM index to the centre of the input
        # samples it covers; when both axes have equal length this is t_idx.
        t_idx = int(np.argmax(time_energy))
        n_raw = raw_eeg.shape[1]
        raw_t_idx = min(int((t_idx + 0.5) * n_raw / T), n_raw - 1)

        channel_values = raw_eeg[:, raw_t_idx]
        channel_values = channel_values / (np.max(np.abs(channel_values)) + 1e-8)

        # Labels come from eeg_channels so names and values cannot drift apart;
        # the caption is split into two rows of four entries.
        labelled = [
            f"{name}: {value:.2f}"
            for name, value in zip(eeg_channels, channel_values)
        ]
        channel_text = " | ".join(labelled[:4]) + " |\n" + " | ".join(labelled[4:])

        # 7. FINAL BOTTOM PANEL TEXT
        bottom_text = (
            f"Emotion: {factors['name']}\n\n"
            f"EEG Window Index: {global_window_index}\n\n"
            f"Time-Segment Influence:\n"
            f"Early EEG  = {early_val:.3f}\n"
            f"Middle EEG = {middle_val:.3f}\n"
            f"Late EEG   = {late_val:.3f}\n\n"
            f"{channel_text}\n\n"
            f"Valence: {factors['valence']} | "
            f"Arousal: {factors['arousal']} | "
            f"Lighting: {factors['lighting']} | "
            f"Motion: {factors['motion']}"
        )

        # 8. DRAW CLEAR BOTTOM PANEL
        # NOTE(review): the caption has 13 lines; check that it fits inside the
        # 212 px panel at this font size or whether it is clipped at the edge.
        draw = ImageDraw.Draw(image)
        draw.rectangle((0, 300, 512, 512), fill=(0, 0, 0))
        draw.text((15, 310), bottom_text, fill=(255, 255, 255), font=font_text)

        # 9. SAVE & DISPLAY
        fname = f"eeg_final_output_{shown}.png"
        image.save(fname)

        plt.figure(figsize=(5, 5))
        plt.imshow(image)
        plt.axis("off")
        plt.show()

        shown += 1
        global_window_index += 1

    if shown >= num_samples:
        break

In [None]:
# ============================================================
# FULL EEG → IMAGE → XAI → OUTLINED BOTTOM PANEL (FINAL)
# ============================================================

!pip install -q diffusers transformers accelerate safetensors

import torch
import numpy as np
import matplotlib.pyplot as plt
from diffusers import StableDiffusionPipeline
from PIL import ImageDraw, ImageFont

# Use the GPU when torch can see one, otherwise the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print("Device:", device)

# Electrode labels; assumed to follow the channel order of the model input.
eeg_channels = "FC5 FC6 O1 O2 P7 P8 T7 T8".split()

# Descriptive attributes shown in the caption panel, keyed by class id (0-3).
_factor_fields = ("name", "valence", "arousal", "lighting", "motion")
emotion_factors = {
    class_id: dict(zip(_factor_fields, values))
    for class_id, values in enumerate((
        ("BORING", "Low",  "Low",  "Dull",   "Static"),
        ("HAPPY",  "High", "High", "Bright", "Dynamic"),
        ("HORROR", "Low",  "High", "Dark",   "Chaotic"),
        ("CALM",   "High", "Low",  "Soft",   "Slow"),
    ))
}

# Stable Diffusion prompt per class id; each one asks for a scene without people.
emotion_prompts = dict(enumerate((
    "empty grey room, dull lighting, silent atmosphere, no humans",
    "bright sunlight, vivid colors, energetic environment, no humans",
    "dark abandoned corridor, fog, sharp shadows, no humans",
    "peaceful lake at sunset, soft light, calm water, no humans",
)))

# Passed to the pipeline as its negative prompt.
negative_prompt = "people, humans, face, abstract, noise, glitch, distorted, blurry"

# ------------------------------------------------------------
# LOAD STABLE DIFFUSION
# ------------------------------------------------------------
# float16 weights are intended for GPU inference; load float32 weights when the
# notebook runs on a CPU-only runtime so generation still works there.
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
    safety_checker=None
).to(device)

# ------------------------------------------------------------
# MAIN LOOP
# ------------------------------------------------------------
model.eval()

global_window_index = 0
num_samples = 4
shown = 0   # number of samples rendered so far (was misspelled "snown")

# The caption font is identical for every sample, so resolve it once up front.
# Only a missing or unreadable font file (OSError) triggers the fallback; any
# other exception is a genuine error and is allowed to surface.
try:
    font_text = ImageFont.truetype(
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 22
    )
except OSError:
    font_text = ImageFont.load_default()

for Xb, yb in test_loader:

    Xb = Xb.to(device)

    for i in range(len(Xb)):

        if shown >= num_samples:
            break

        # 1. GET EEG SAMPLE (batch dimension kept for the model)
        x = Xb[i:i+1]                         # expected (1, 8, T)
        raw_eeg = x.squeeze(0).cpu().numpy()  # expected (8, T)

        # 2. EEG → EMOTION (inference only, no autograd graph)
        with torch.no_grad():
            logits = model(x)
            pred_class = torch.argmax(logits, dim=1).item()

        factors = emotion_factors[pred_class]
        prompt  = emotion_prompts[pred_class]

        # 3. GENERATE IMAGE for the predicted emotion
        image = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=45,
            guidance_scale=9.0,
            height=512,
            width=512
        ).images[0]

        # 4. GRAD-CAM importance along the time axis for the predicted class
        cam = gradcam.generate_cam(x, pred_class)   # (time,)

        # 5. TIME-SEGMENT VALUES: mean importance per third of the CAM.
        # atleast_1d keeps len() valid if the CAM collapses to a single value.
        time_energy = np.atleast_1d(cam)
        T = len(time_energy)

        early_val  = time_energy[:T//3].mean() if T//3 > 0 else 0.0
        middle_val = time_energy[T//3:2*T//3].mean() if (2*T//3 - T//3) > 0 else 0.0
        late_val   = time_energy[2*T//3:].mean() if (T - 2*T//3) > 0 else 0.0

        # 6. DOMINANT TIME + CHANNEL VALUES
        # The CAM lives on the target layer's time axis, which can be shorter
        # than the input window. Map the CAM index to the centre of the input
        # samples it covers; when both axes have equal length this is t_idx.
        t_idx = int(np.argmax(time_energy))
        n_raw = raw_eeg.shape[1]
        raw_t_idx = min(int((t_idx + 0.5) * n_raw / T), n_raw - 1)

        channel_values = raw_eeg[:, raw_t_idx]
        channel_values = channel_values / (np.max(np.abs(channel_values)) + 1e-8)

        # Labels come from eeg_channels so names and values cannot drift apart;
        # the caption is split into two rows of four entries.
        labelled = [
            f"{name}: {value:.2f}"
            for name, value in zip(eeg_channels, channel_values)
        ]
        channel_text = " | ".join(labelled[:4]) + " |\n" + " | ".join(labelled[4:])

        # 7. FINAL BOTTOM PANEL TEXT
        bottom_text = (
            f"Emotion: {factors['name']}\n\n"
            f"EEG Window Index: {global_window_index}\n\n"
            f"Time-Segment Influence:\n"
            f"Early EEG  = {early_val:.3f}\n"
            f"Middle EEG = {middle_val:.3f}\n"
            f"Late EEG   = {late_val:.3f}\n\n"
            f"{channel_text}\n\n"
            f"Valence: {factors['valence']} | "
            f"Arousal: {factors['arousal']} | "
            f"Lighting: {factors['lighting']} | "
            f"Motion: {factors['motion']}"
        )

        # 8. OUTLINED BOTTOM PANEL
        # NOTE(review): the caption has 13 lines; check that it fits inside the
        # 252 px panel at this font size or whether it is clipped at the edge.
        draw = ImageDraw.Draw(image)

        # Tall bottom panel
        draw.rectangle((0, 260, 512, 512), fill=(0, 0, 0))

        x0, y0 = 20, 275

        # ---- BLACK OUTLINE: the text drawn at every one-pixel offset ----
        for dx in [-1, 0, 1]:
            for dy in [-1, 0, 1]:
                draw.multiline_text(
                    (x0 + dx, y0 + dy),
                    bottom_text,
                    fill=(0, 0, 0),
                    font=font_text,
                    spacing=8,
                    align="left"
                )

        # ---- WHITE MAIN TEXT ----
        draw.multiline_text(
            (x0, y0),
            bottom_text,
            fill=(255, 255, 255),
            font=font_text,
            spacing=8,
            align="left"
        )

        # 9. SAVE & DISPLAY
        fname = f"eeg_final_outlined_{shown}.png"
        image.save(fname)

        plt.figure(figsize=(5, 5))
        plt.imshow(image)
        plt.axis("off")
        plt.show()

        shown += 1
        global_window_index += 1

    if shown >= num_samples:
        break

In [None]:
# history = dict containing training logs
# One figure per metric, each comparing the training and validation curves.
for metric_key, legend_name, axis_name in (
    ("acc", "Acc", "Accuracy"),
    ("loss", "Loss", "Loss"),
):
    plt.figure()
    plt.plot(history[f"train_{metric_key}"], label=f"Train {legend_name}")
    plt.plot(history[f"val_{metric_key}"], label=f"Val {legend_name}")
    plt.xlabel("Epoch")
    plt.ylabel(axis_name)
    plt.title(f"Training vs Validation {axis_name}")
    plt.legend()
    plt.show()

In [None]:
import matplotlib.pyplot as plt
from scipy import signal
import numpy as np

# --- Configuration for Spectrogram ---
# NOTE(review): 256 Hz is an assumption carried over from the original cell.
# Confirm it against the recording device and any resampling done upstream,
# because every frequency on the y-axis scales with this value.
FS = 256  # Sampling frequency (Hz)

# Frequency bands (Hz) highlighted on every panel
FREQ_BANDS = {
    'Delta': (1, 4),
    'Theta': (4, 8),
    'Alpha': (8, 12),
    'Beta': (12, 30)
}

# --- Get an EEG sample for analysis ---
# First sample of the first test batch (already normalized upstream).
Xb_test, yb_test = next(iter(test_loader))
eeg_sample = Xb_test[0].cpu().numpy()  # expected (channels, time)
true_label = yb_test[0].item()

n_channels, n_times = eeg_sample.shape

print(f"Analyzing EEG sample for true emotion label: {true_label}")
print(f"EEG sample shape: {eeg_sample.shape} (channels, time)")

# Segment length: at most one second of samples and at most half the window,
# so the spectrogram has several time frames. The previous nperseg = 2 * FS
# yields a single frame (or an invalid overlap) whenever the window is not
# longer than 2 * FS samples.
nperseg = max(2, min(FS, n_times // 2))
noverlap = (3 * nperseg) // 4   # 75 % overlap, an int strictly below nperseg

# --- Plotting the Spectrogram for each channel ---
# squeeze=False keeps `axes` indexable even when there is only one channel.
fig, axes = plt.subplots(
    n_channels, 1, figsize=(12, 2 * n_channels), sharex=True, squeeze=False
)
axes = axes[:, 0]
fig.suptitle('EEG Time-Frequency Spectrogram by Channel', y=0.99, fontsize=16)

for i, channel_data in enumerate(eeg_sample):
    # Compute the spectrogram
    f, t, Sxx = signal.spectrogram(channel_data, FS, nperseg=nperseg, noverlap=noverlap)

    ax = axes[i]
    # Gouraud shading interpolates between neighbouring frames and therefore
    # needs at least two of them; the small floor avoids log10(0) = -inf.
    shading = 'gouraud' if len(t) > 1 else 'nearest'
    im = ax.pcolormesh(t, f, 10 * np.log10(Sxx + 1e-12), shading=shading, cmap='viridis')
    ax.set_ylabel('Frequency [Hz]')
    channel_name = eeg_channels[i] if i < len(eeg_channels) else f"ch{i}"
    ax.set_title(f'Channel: {channel_name}')

    # Highlight frequency bands
    for band_name, (f_low, f_high) in FREQ_BANDS.items():
        ax.axhspan(f_low, f_high, color='r', alpha=0.1, label=band_name)
        # Add text label for band, just right of the last time frame
        ax.text(t[-1]*1.01, (f_low + f_high) / 2, band_name, color='red', va='center', ha='left', fontsize=8)

    ax.set_ylim(0, 40) # Limit frequency display to relevant range

fig.colorbar(im, ax=axes.tolist(), label='Power/Frequency [dB/Hz]')

axes[-1].set_xlabel('Time [sec]')
plt.tight_layout(rect=[0, 0.03, 1, 0.96]) # Adjust layout to prevent suptitle overlap
plt.show()


# Task
Calculate the consistency between generated images and their corresponding emotion text prompts by:
1. Loading a pre-trained CLIP Vision Model and its processor.
2. For a subset of the test dataset, predicting the emotion for each EEG sample, generating an image using Stable Diffusion with the corresponding text prompt, and then encoding both the generated image and the text prompt into CLIP embeddings.
3. Computing the cosine similarity between these image and text embeddings.
4. Visualizing the distribution of these consistency scores using a histogram.
5. Summarizing the findings regarding how well generated images align with predicted emotions.

## Load CLIP Model for Image Embeddings

### Subtask:
Load a pre-trained CLIP model's image encoder and processor to convert generated images into embeddings suitable for comparison with text embeddings.


**Reasoning**:
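To convert images into embeddings, I need to load the CLIP processor and the CLIP vision model from the `transformers` library. The `CLIPProcessor` will handle image preprocessing, and the `CLIPVisionModel` will extract the image features. Note that `CLIPVisionModel` returns un-projected encoder features: a cosine similarity against text embeddings is only meaningful once both the image and the text features have been mapped into CLIP's shared embedding space (for example with `CLIPModel.get_image_features` and `CLIPModel.get_text_features`).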



In [None]:
from transformers import CLIPProcessor, CLIPVisionModel

# Image side of CLIP: the processor prepares images for the model, and the
# vision model (moved to `device`) turns them into features.
clip_processor, clip_vision_model = (
    CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32"),
    CLIPVisionModel.from_pretrained("openai/clip-vit-base-patch32").to(device),
)

print("CLIP Processor and Vision Model loaded successfully.")

**Reasoning**:
The previous output showed a warning about `use_fast` but no error, and confirmed the CLIP processor and vision model were loaded. The next step in the task is to prepare to calculate the consistency between generated images and text prompts. This involves loading the CLIP tokenizer and text encoder to convert prompts into embeddings, then iterating through the test dataset to generate images from predicted EEG emotions and encode both images and their corresponding prompts into CLIP embeddings.



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel

# Text side of CLIP: tokenizer plus the text model (moved to `device`).
clip_tokenizer, clip_text_model = (
    CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch32"),
    CLIPTextModel.from_pretrained("openai/clip-vit-base-patch32").to(device),
)

print("CLIP Tokenizer and Text Model loaded successfully.")

# --- Helper function to get CLIP text embeddings --- #
def get_clip_text_embedding(prompt):
    """Encode ``prompt`` with the CLIP text model and return one vector per prompt.

    Uses the notebook-level ``clip_tokenizer``, ``clip_text_model`` and ``device``.

    Args:
        prompt: Text (or list of texts) accepted by ``clip_tokenizer``.

    Returns:
        The ``pooler_output`` of the text model when the output object carries a
        non-None one; otherwise the mean of ``last_hidden_state`` over dim 1.
    """
    inputs = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
    with torch.no_grad():
        # Single forward pass; the output is reused for both the pooled and the fallback path.
        outputs = clip_text_model(**inputs)
    # `pooler_output` lives on the *output object*, not on the model, so the check
    # has to look at `outputs`. Checking the model always fell through to the mean.
    pooled = getattr(outputs, "pooler_output", None)
    if pooled is not None:
        return pooled
    # Fallback when no pooled output is provided.
    return outputs.last_hidden_state.mean(dim=1)

# --- Collect embeddings for consistency calculation --- #
# For a subset of the test set: predict the emotion of one EEG sample, render the
# matching prompt with Stable Diffusion, then keep the CLIP embedding of the
# generated image and of the prompt.
image_embeddings_list = []
text_embeddings_list = []

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# Disable the safety checker by assigning None. A stub returning `(images, False)`
# makes the pipeline fail with "TypeError: 'bool' object is not iterable", because
# the second element is iterated per image.
pipe.safety_checker = None

count = 0
for Xb, yb in test_loader:
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)):
        if count >= num_samples_to_evaluate:
            break

        x_eeg = Xb[i:i+1] # Single EEG sample (slice keeps the batch dimension)

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4. Encode generated image into CLIP embedding
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        with torch.no_grad():
            image_embedding = clip_vision_model(**image_inputs).pooler_output

        # 5. Encode the text prompt into CLIP embedding
        text_embedding = get_clip_text_embedding(prompt_text)

        # NOTE(review): both embeddings are raw (un-projected) encoder outputs; their
        # widths may differ between the vision and text models, so they are only
        # collected here and are not directly comparable - confirm before computing
        # a cosine similarity on them.
        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)

print(f"Collected {len(image_embeddings)} image-text embedding pairs.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)

# Task
The `TypeError: 'bool' object is not iterable` occurred because the `pipe.safety_checker` was incorrectly assigned a lambda function that returned a boolean, which the Stable Diffusion pipeline tried to iterate over. To fix this, the `safety_checker` will be explicitly set to `None` to fully disable it, allowing the image generation process to continue without errors.

After this correction, I will proceed with the plan to:
1. Load a pre-trained CLIP Vision Model and its processor (already done).
2. Iterate through a subset of the test dataset, predict the emotion for each EEG sample, generate an image using Stable Diffusion with the corresponding text prompt, and then encode both the generated image and the text prompt into CLIP embeddings.
3. Compute the cosine similarity between these image and text embeddings.
4. Visualize the distribution of these consistency scores using a histogram.
5. Summarize the findings regarding how well generated images align with predicted emotions.

```python
from transformers import CLIPTokenizer, CLIPTextModel
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns

# The tokenizer and text model are expected to exist already as `clip_tokenizer`
# and `clip_text_model` (loaded by an earlier cell). To run this snippet
# standalone, load them first:
# clip_tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch32")
# clip_text_model = CLIPTextModel.from_pretrained("openai/clip-vit-base-patch32").to(device)

print("CLIP Tokenizer and Text Model loaded successfully.")

# --- Helper function to get CLIP text embeddings --- #
def get_clip_text_embedding(prompt):
    """Return one CLIP text-model embedding per prompt.

    The prompt is tokenized with padding and truncation (max_length=77) and moved
    to `device`. The result is the output's `pooler_output` when present and not
    None, otherwise the mean of `last_hidden_state` over dim 1.
    """
    tokenizer_kwargs = dict(
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=77,  # Standard CLIP max length
    )
    encoded = clip_tokenizer(prompt, **tokenizer_kwargs).to(device)

    with torch.no_grad():
        result = clip_text_model(**encoded)
        pooled = getattr(result, 'pooler_output', None)
        if pooled is None:
            # No pooled vector available: average the token embeddings instead.
            return result.last_hidden_state.mean(dim=1)
        return pooled

# --- Collect embeddings for consistency calculation --- #
# CLIP image-text similarity is only defined in the joint (projected) embedding
# space. The `pooler_output` of CLIPVisionModel / CLIPTextModel is the raw output
# of each tower, and the two towers have different widths, so comparing them
# fails with a tensor size mismatch. The projection-head variants are loaded under
# their own names so the encoders loaded in other cells stay untouched.
from transformers import CLIPTextModelWithProjection, CLIPVisionModelWithProjection

CLIP_PROJECTION_CHECKPOINT = "openai/clip-vit-base-patch32"  # checkpoint used for clip_processor / clip_tokenizer
clip_vision_proj_model = CLIPVisionModelWithProjection.from_pretrained(CLIP_PROJECTION_CHECKPOINT).to(device).eval()
clip_text_proj_model = CLIPTextModelWithProjection.from_pretrained(CLIP_PROJECTION_CHECKPOINT).to(device).eval()

image_embeddings_list = []  # projected image embeddings (L2-normalised), one per sample
text_embeddings_list = []   # projected prompt embeddings (L2-normalised), one per sample
consistency_scores = [] # To store cosine similarity scores

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# Setting safety_checker to None is how the checker is disabled after loading.
pipe.safety_checker = None

count = 0
for Xb, yb in test_loader:
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)):
        if count >= num_samples_to_evaluate:
            break

        x_eeg = Xb[i:i+1] # Single EEG sample (slice keeps the batch dimension)

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4./5. Encode the generated image and its prompt into the shared CLIP space
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        text_inputs = clip_tokenizer(
            prompt_text, return_tensors="pt", padding=True, truncation=True, max_length=77
        ).to(device)
        with torch.no_grad():
            image_embedding = clip_vision_proj_model(**image_inputs).image_embeds
            text_embedding = clip_text_proj_model(**text_inputs).text_embeds

        assert image_embedding.shape == text_embedding.shape, (
            f"CLIP embedding size mismatch: {tuple(image_embedding.shape)} vs {tuple(text_embedding.shape)}"
        )

        # Normalise so the stored vectors are unit length
        image_embedding = image_embedding / image_embedding.norm(p=2, dim=-1, keepdim=True)
        text_embedding = text_embedding / text_embedding.norm(p=2, dim=-1, keepdim=True)

        # 6. Calculate cosine similarity
        similarity = torch.cosine_similarity(image_embedding, text_embedding, dim=-1)
        consistency_scores.append(similarity.item())

        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

print(f"Collected {len(image_embeddings)} image-text embedding pairs.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)

# Plotting the distribution of consistency scores
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(consistency_scores, bins=20, kde=True, ax=ax)
ax.set_title('Distribution of Image-Emotion Consistency Scores (Cosine Similarity)')
ax.set_xlabel('Cosine Similarity Score (Image Embedding vs. Text Embedding)')
ax.set_ylabel('Frequency')
ax.grid(axis='y', alpha=0.75)
plt.show()

# Summarize findings
mean_score = np.mean(consistency_scores)
median_score = np.median(consistency_scores)
std_dev_score = np.std(consistency_scores)

print("\nSummary of Image-Emotion Consistency Scores:")
print(f"Mean Cosine Similarity: {mean_score:.4f}")
print(f"Median Cosine Similarity: {median_score:.4f}")
print(f"Standard Deviation: {std_dev_score:.4f}")
print(f"Min Score: {np.min(consistency_scores):.4f}")
print(f"Max Score: {np.max(consistency_scores):.4f}")

# Discussion of findings based on the distribution.
# CLIP image-text cosine similarities occupy a narrow band (roughly 0 to 0.4 even
# for well-matched pairs), so cut-offs such as 0.7 / 0.5 can never be reached and
# would always report "low". The thresholds below are heuristics on the CLIP scale;
# tune them against a baseline (e.g. scores of mismatched image/prompt pairs).
HIGH_ALIGNMENT_THRESHOLD = 0.30
MODERATE_ALIGNMENT_THRESHOLD = 0.20

if mean_score > HIGH_ALIGNMENT_THRESHOLD:
    print("\nThe high mean cosine similarity suggests a strong alignment between the generated images and their corresponding emotion text prompts. The Stable Diffusion model, guided by the EEG-predicted emotions, is likely producing images that effectively capture the semantic meaning of the emotion prompts.")
elif mean_score > MODERATE_ALIGNMENT_THRESHOLD:
    print("\nThe moderate mean cosine similarity indicates a reasonable alignment between the generated images and their corresponding emotion text prompts. There's a good general correlation, but there might be room for improvement in fine-tuning prompts or the adapter for stronger consistency.")
else:
    print("\nThe low mean cosine similarity suggests a weak alignment between the generated images and their corresponding emotion text prompts. This could indicate that either the EEG-to-CLIP adapter needs further training, the emotion prompts are not sufficiently descriptive, or the Stable Diffusion model struggles to interpret the nuances of these emotion prompts effectively.")

print("\nDetailed analysis of the histogram shows the distribution of scores. A distribution clustered towards higher values (closer to 1) indicates better consistency, while a spread towards lower values (closer to 0 or negative) suggests less alignment. The shape of the KDE (Kernel Density Estimate) also provides insight into the overall quality and reliability of the image generation process based on emotion predictions.")
```

## Calculate Image-Emotion Consistency Scores

### Subtask:
Iterate through a subset of the test dataset, predict the emotion for each EEG sample, generate an image using Stable Diffusion with the corresponding text prompt, and then encode both the generated image and the text prompt into CLIP embeddings. Compute the cosine similarity between these image and text embeddings.


**Reasoning**:
The previous code block encountered a `TypeError` due to an incorrect assignment for `pipe.safety_checker`. I will regenerate the code block, ensuring that `pipe.safety_checker` is correctly handled by explicitly setting it to `None` only if it's not already `None`. I will also add the cosine similarity calculation between the normalized image and text embeddings and store them in a new list, as per the subtask instructions.



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel
import torch.nn.functional as F

# Text side of CLIP: tokenizer plus the text model (moved to `device`).
clip_tokenizer, clip_text_model = (
    CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch32"),
    CLIPTextModel.from_pretrained("openai/clip-vit-base-patch32").to(device),
)

print("CLIP Tokenizer and Text Model loaded successfully.")

# --- Helper function to get CLIP text embeddings ---
def get_clip_text_embedding(prompt):
    """Return one CLIP text-model embedding per prompt.

    Tokenizes `prompt` (padding + truncation), moves it to `device` and runs
    `clip_text_model` without gradients. The result is the output's
    `pooler_output` when present and not None, otherwise the mean of
    `last_hidden_state` over dim 1.
    """
    encoded = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
    encoded = encoded.to(device)
    with torch.no_grad():
        result = clip_text_model(**encoded)
        pooled = getattr(result, 'pooler_output', None)
        if pooled is None:
            # No pooled vector available: average the token embeddings instead.
            return result.last_hidden_state.mean(dim=1)
        return pooled

# --- Collect embeddings and consistency scores for calculation ---
# CLIP image-text similarity is only defined in the joint (projected) embedding
# space. The `pooler_output` of CLIPVisionModel / CLIPTextModel is the raw output
# of each tower, and the two towers have different widths, so comparing them
# fails with a tensor size mismatch. The projection-head variants are loaded under
# their own names so the encoders loaded in other cells stay untouched.
from transformers import CLIPTextModelWithProjection, CLIPVisionModelWithProjection

CLIP_PROJECTION_CHECKPOINT = "openai/clip-vit-base-patch32"  # checkpoint used for clip_processor / clip_tokenizer
clip_vision_proj_model = CLIPVisionModelWithProjection.from_pretrained(CLIP_PROJECTION_CHECKPOINT).to(device).eval()
clip_text_proj_model = CLIPTextModelWithProjection.from_pretrained(CLIP_PROJECTION_CHECKPOINT).to(device).eval()

image_embeddings_list = []  # projected (un-normalised) image embeddings, one per sample
text_embeddings_list = []   # projected (un-normalised) prompt embeddings, one per sample
consistency_scores = []     # cosine similarity of each image/prompt pair

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# The pipeline's safety checker is not touched here: per the earlier cells it was
# loaded with safety_checker=None, so there is nothing to disable.

count = 0
for Xb, yb in test_loader:
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        x_eeg = Xb[i:i+1] # Single EEG sample (slice keeps the batch dimension)

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4./5. Encode the generated image and its prompt into the shared CLIP space
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        text_inputs = clip_tokenizer(prompt_text, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            image_embedding = clip_vision_proj_model(**image_inputs).image_embeds
            text_embedding = clip_text_proj_model(**text_inputs).text_embeds

        assert image_embedding.shape == text_embedding.shape, (
            f"CLIP embedding size mismatch: {tuple(image_embedding.shape)} vs {tuple(text_embedding.shape)}"
        )

        # 6. Calculate cosine similarity (normalisation is handled by cosine_similarity)
        similarity = F.cosine_similarity(image_embedding, text_embedding, dim=-1).item()
        consistency_scores.append(similarity)

        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)

**Reasoning**:
The previous code block failed because the `pipe.safety_checker` was set to a lambda function that returned a single boolean instead of an iterable of booleans, causing a `TypeError`. I will correct the lambda function to return a list of booleans, which aligns with the expected input format for the pipeline's internal processing of safety checker results. This will enable the stable diffusion pipeline to generate images and allow the computation of image-text consistency scores.



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel
import torch.nn.functional as F

# Text side of CLIP: tokenizer first, then the text model placed on `device`.
clip_tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch32")
clip_text_model = CLIPTextModel.from_pretrained("openai/clip-vit-base-patch32")
clip_text_model = clip_text_model.to(device)

print("CLIP Tokenizer and Text Model loaded successfully.")

# --- Helper function to get CLIP text embeddings ---
def get_clip_text_embedding(prompt):
    """Embed `prompt` with the notebook's CLIP text model.

    Returns `pooler_output` when the model output exposes a non-None one;
    otherwise falls back to the mean of `last_hidden_state` over dim 1.
    Depends on the globals `clip_tokenizer`, `clip_text_model` and `device`.
    """
    tokens = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
    with torch.no_grad():
        text_out = clip_text_model(**tokens)
        pooled_missing = (not hasattr(text_out, 'pooler_output')) or text_out.pooler_output is None
        # Prefer the pooled vector; only average token states when it is unavailable.
        return text_out.last_hidden_state.mean(dim=1) if pooled_missing else text_out.pooler_output

# --- Collect embeddings and consistency scores for calculation ---
# CLIP image-text similarity is only defined in the joint (projected) embedding
# space. The `pooler_output` of CLIPVisionModel / CLIPTextModel is the raw output
# of each tower, and the two towers have different widths, so comparing them
# fails with a tensor size mismatch. The projection-head variants are loaded under
# their own names so the encoders loaded in other cells stay untouched.
from transformers import CLIPTextModelWithProjection, CLIPVisionModelWithProjection

CLIP_PROJECTION_CHECKPOINT = "openai/clip-vit-base-patch32"  # checkpoint used for clip_processor / clip_tokenizer
clip_vision_proj_model = CLIPVisionModelWithProjection.from_pretrained(CLIP_PROJECTION_CHECKPOINT).to(device).eval()
clip_text_proj_model = CLIPTextModelWithProjection.from_pretrained(CLIP_PROJECTION_CHECKPOINT).to(device).eval()

image_embeddings_list = []  # projected (un-normalised) image embeddings, one per sample
text_embeddings_list = []   # projected (un-normalised) prompt embeddings, one per sample
consistency_scores = []     # cosine similarity of each image/prompt pair

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# Disable the safety checker by assigning None rather than installing a stub
# callable; the generated images are returned unchanged either way.
pipe.safety_checker = None

count = 0
for Xb, yb in test_loader:
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        x_eeg = Xb[i:i+1] # Single EEG sample (slice keeps the batch dimension)

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4./5. Encode the generated image and its prompt into the shared CLIP space
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        text_inputs = clip_tokenizer(prompt_text, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            image_embedding = clip_vision_proj_model(**image_inputs).image_embeds
            text_embedding = clip_text_proj_model(**text_inputs).text_embeds

        assert image_embedding.shape == text_embedding.shape, (
            f"CLIP embedding size mismatch: {tuple(image_embedding.shape)} vs {tuple(text_embedding.shape)}"
        )

        # 6. Calculate cosine similarity (normalisation is handled by cosine_similarity)
        similarity = F.cosine_similarity(image_embedding, text_embedding, dim=-1).item()
        consistency_scores.append(similarity)

        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)


**Reasoning**:
The `RuntimeError` regarding tensor size mismatch (768 vs 512) indicates that the CLIP image and text embeddings being compared have different dimensions. For `openai/clip-vit-base-patch32`, `CLIPVisionModel.pooler_output` is 768-dimensional (the vision tower's hidden size) while `CLIPTextModel.pooler_output` is 512-dimensional (the text tower's hidden size); neither has passed through CLIP's projection head, so they do not live in the shared embedding space. Stable Diffusion v1.5's internal text encoder (based on `clip-vit-large-patch14`, 768-dim) is not involved in this comparison. As an attempted workaround, I will explicitly load all CLIP components (`CLIPProcessor`, `CLIPVisionModel`, `CLIPTokenizer`, `CLIPTextModel`) from the `openai/clip-vit-large-patch14` model. Note that this checkpoint's raw pooled outputs also differ (1024 for vision, 768 for text); only the projected embeddings (`image_embeds` from `CLIPVisionModelWithProjection`, `text_embeds` from `CLIPTextModelWithProjection`) are 768-dimensional for both modalities and therefore compatible for cosine similarity calculation.



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel, CLIPProcessor, CLIPVisionModel
import torch.nn.functional as F

# Single checkpoint name shared by all four CLIP components loaded below.
CLIP_MODEL_NAME = "openai/clip-vit-large-patch14"

# Text side: tokenizer + text model (on `device`)
clip_tokenizer, clip_text_model = (
    CLIPTokenizer.from_pretrained(CLIP_MODEL_NAME),
    CLIPTextModel.from_pretrained(CLIP_MODEL_NAME).to(device),
)

# Image side: processor + vision model (on `device`)
clip_processor, clip_vision_model = (
    CLIPProcessor.from_pretrained(CLIP_MODEL_NAME),
    CLIPVisionModel.from_pretrained(CLIP_MODEL_NAME).to(device),
)

print(f"CLIP components ({CLIP_MODEL_NAME}) loaded successfully with consistent embedding dimensions.")

# --- Helper function to get CLIP text embeddings ---
def get_clip_text_embedding(prompt):
    """Return one CLIP text-model embedding per prompt.

    Tokenizes `prompt` (padding + truncation), moves it to `device` and runs
    `clip_text_model` without gradients. The result is the output's
    `pooler_output` when present and not None, otherwise the mean of
    `last_hidden_state` over dim 1.
    """
    encoded = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
    encoded = encoded.to(device)
    with torch.no_grad():
        result = clip_text_model(**encoded)
        pooled = getattr(result, 'pooler_output', None)
        if pooled is None:
            # No pooled vector available: average the token embeddings instead.
            return result.last_hidden_state.mean(dim=1)
        return pooled

# --- Collect embeddings and consistency scores for calculation ---
# CLIP image-text similarity is only defined in the joint (projected) embedding
# space. The `pooler_output` of CLIPVisionModel / CLIPTextModel is the raw output
# of each tower, and the two towers have different widths, so comparing them
# fails with a tensor size mismatch. The projection-head variants are loaded under
# their own names (from the same CLIP_MODEL_NAME checkpoint) so the encoders
# loaded above stay untouched.
from transformers import CLIPTextModelWithProjection, CLIPVisionModelWithProjection

clip_vision_proj_model = CLIPVisionModelWithProjection.from_pretrained(CLIP_MODEL_NAME).to(device).eval()
clip_text_proj_model = CLIPTextModelWithProjection.from_pretrained(CLIP_MODEL_NAME).to(device).eval()

image_embeddings_list = []  # projected (un-normalised) image embeddings, one per sample
text_embeddings_list = []   # projected (un-normalised) prompt embeddings, one per sample
consistency_scores = []     # cosine similarity of each image/prompt pair

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# Disable the safety checker by assigning None rather than installing a stub
# callable; the generated images are returned unchanged either way.
pipe.safety_checker = None

count = 0
for Xb, yb in test_loader:
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        x_eeg = Xb[i:i+1] # Single EEG sample (slice keeps the batch dimension)

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4./5. Encode the generated image and its prompt into the shared CLIP space
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        text_inputs = clip_tokenizer(prompt_text, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            image_embedding = clip_vision_proj_model(**image_inputs).image_embeds
            text_embedding = clip_text_proj_model(**text_inputs).text_embeds

        assert image_embedding.shape == text_embedding.shape, (
            f"CLIP embedding size mismatch: {tuple(image_embedding.shape)} vs {tuple(text_embedding.shape)}"
        )

        # 6. Calculate cosine similarity (normalisation is handled by cosine_similarity)
        similarity = F.cosine_similarity(image_embedding, text_embedding, dim=-1).item()
        consistency_scores.append(similarity)

        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)


# Task
The previous attempt to calculate image-emotion consistency scores failed with a `RuntimeError` indicating a tensor size mismatch (1024 vs 768) when using `openai/clip-vit-large-patch14`. This suggests that `CLIPVisionModel` and `CLIPTextModel` from `large-patch14` do not produce `pooler_output` embeddings of the same dimension (vision is 1024, text is 768).

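To resolve this dimension inconsistency, I will modify the code to use `openai/clip-vit-base-patch32` for all CLIP components (`CLIPProcessor`, `CLIPVisionModel`, `CLIPTokenizer`, `CLIPTextModel`) for the consistency calculation. Note that for this checkpoint the raw `pooler_output` sizes also differ (768 for `CLIPVisionModel`, 512 for `CLIPTextModel` — the 768 vs 512 mismatch seen earlier); only the *projected* embeddings (`image_embeds` from `CLIPVisionModelWithProjection`, `text_embeds` from `CLIPTextModelWithProjection`) are 512-dimensional for both image and text. Comparing vectors from this shared embedding space is crucial for direct cosine similarity comparisons.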

I will also include print statements to explicitly show the dimensions of the generated image embeddings and text embeddings just before the similarity calculation, to verify that they are indeed consistent (512-dimensional).

```python
from transformers import CLIPTokenizer, CLIPTextModel, CLIPProcessor, CLIPVisionModel
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Single checkpoint name shared by all four CLIP components loaded below.
CLIP_MODEL_NAME = "openai/clip-vit-base-patch32"

# Text side: tokenizer + text model (on `device`)
clip_tokenizer, clip_text_model = (
    CLIPTokenizer.from_pretrained(CLIP_MODEL_NAME),
    CLIPTextModel.from_pretrained(CLIP_MODEL_NAME).to(device),
)

# Image side: processor + vision model (on `device`)
clip_processor, clip_vision_model = (
    CLIPProcessor.from_pretrained(CLIP_MODEL_NAME),
    CLIPVisionModel.from_pretrained(CLIP_MODEL_NAME).to(device),
)

print(f"CLIP components ({CLIP_MODEL_NAME}) loaded successfully.")

# --- Helper function to get CLIP text embeddings ---
def get_clip_text_embedding(prompt):
    """Embed `prompt` with the notebook's CLIP text model.

    Returns `pooler_output` when the model output exposes a non-None one;
    otherwise falls back to the mean of `last_hidden_state` over dim 1.
    Depends on the globals `clip_tokenizer`, `clip_text_model` and `device`.
    """
    tokens = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
    with torch.no_grad():
        text_out = clip_text_model(**tokens)
        pooled_missing = (not hasattr(text_out, 'pooler_output')) or text_out.pooler_output is None
        # Prefer the pooled vector; only average token states when it is unavailable.
        return text_out.last_hidden_state.mean(dim=1) if pooled_missing else text_out.pooler_output

# --- Collect embeddings and consistency scores for calculation ---
# CLIP image-text similarity is only defined in the joint (projected) embedding
# space. The `pooler_output` of CLIPVisionModel / CLIPTextModel is the raw output
# of each tower, and the two towers have different widths, so comparing them
# fails with a tensor size mismatch. The projection-head variants are loaded under
# their own names (from the same CLIP_MODEL_NAME checkpoint) so the encoders
# loaded above stay untouched.
from transformers import CLIPTextModelWithProjection, CLIPVisionModelWithProjection

clip_vision_proj_model = CLIPVisionModelWithProjection.from_pretrained(CLIP_MODEL_NAME).to(device).eval()
clip_text_proj_model = CLIPTextModelWithProjection.from_pretrained(CLIP_MODEL_NAME).to(device).eval()

image_embeddings_list = []  # projected (un-normalised) image embeddings, one per sample
text_embeddings_list = []   # projected (un-normalised) prompt embeddings, one per sample
consistency_scores = []     # cosine similarity of each image/prompt pair

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# Disable the Stable Diffusion safety checker by assigning None rather than
# installing a stub callable; the generated images are returned unchanged.
# Note: the 'pipe' variable is from Stable Diffusion, not the CLIP model loaded here.
pipe.safety_checker = None


count = 0
for Xb, yb in test_loader:
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        x_eeg = Xb[i:i+1] # Single EEG sample (slice keeps the batch dimension)

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4./5. Encode the generated image and its prompt into the shared CLIP space
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        text_inputs = clip_tokenizer(prompt_text, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            image_embedding = clip_vision_proj_model(**image_inputs).image_embeds
            text_embedding = clip_text_proj_model(**text_inputs).text_embeds

        # Verify dimensions before calculation
        if count == 0: # Print shapes only for the first sample
            print(f"DEBUG: Image embedding shape: {image_embedding.shape}")
            print(f"DEBUG: Text embedding shape: {text_embedding.shape}")
        assert image_embedding.shape == text_embedding.shape, (
            f"CLIP embedding size mismatch: {tuple(image_embedding.shape)} vs {tuple(text_embedding.shape)}"
        )

        # 6. Calculate cosine similarity (normalisation is handled by cosine_similarity)
        similarity = F.cosine_similarity(image_embedding, text_embedding, dim=-1).item()
        consistency_scores.append(similarity)

        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)

# Plotting the distribution of consistency scores
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(consistency_scores, bins=20, kde=True, ax=ax)
ax.set_title('Distribution of Image-Emotion Consistency Scores (Cosine Similarity)')
ax.set_xlabel('Cosine Similarity Score (Image Embedding vs. Text Embedding)')
ax.set_ylabel('Frequency')
ax.grid(axis='y', alpha=0.75)
plt.show()

# Summarize findings
mean_score = np.mean(consistency_scores)
median_score = np.median(consistency_scores)
std_dev_score = np.std(consistency_scores)

print("\nSummary of Image-Emotion Consistency Scores:")
print(f"Mean Cosine Similarity: {mean_score:.4f}")
print(f"Median Cosine Similarity: {median_score:.4f}")
print(f"Standard Deviation: {std_dev_score:.4f}")
print(f"Min Score: {np.min(consistency_scores):.4f}")
print(f"Max Score: {np.max(consistency_scores):.4f}")

# Discussion of findings based on the distribution.
# CLIP image-text cosine similarities occupy a narrow band (roughly 0 to 0.4 even
# for well-matched pairs), so cut-offs such as 0.7 / 0.5 can never be reached and
# would always report "low". The thresholds below are heuristics on the CLIP scale;
# tune them against a baseline (e.g. scores of mismatched image/prompt pairs).
HIGH_ALIGNMENT_THRESHOLD = 0.30
MODERATE_ALIGNMENT_THRESHOLD = 0.20

if mean_score > HIGH_ALIGNMENT_THRESHOLD:
    print("\nThe high mean cosine similarity suggests a strong alignment between the generated images and their corresponding emotion text prompts. The Stable Diffusion model, guided by the EEG-predicted emotions, is likely producing images that effectively capture the semantic meaning of the emotion prompts.")
elif mean_score > MODERATE_ALIGNMENT_THRESHOLD:
    print("\nThe moderate mean cosine similarity indicates a reasonable alignment between the generated images and their corresponding emotion text prompts. There's a good general correlation, but there might be room for improvement in fine-tuning prompts or the adapter for stronger consistency.")
else:
    print("\nThe low mean cosine similarity suggests a weak alignment between the generated images and their corresponding emotion text prompts. This could indicate that either the EEG-to-CLIP adapter needs further training, the emotion prompts are not sufficiently descriptive, or the Stable Diffusion model struggles to interpret the nuances of these emotion prompts effectively.")

print("\nDetailed analysis of the histogram shows the distribution of scores. A distribution clustered towards higher values (closer to 1) indicates better consistency, while a spread towards lower values (closer to 0 or negative) suggests less alignment. The shape of the KDE (Kernel Density Estimate) also provides insight into the overall quality and reliability of the image generation process based on emotion predictions.")
```

## Load CLIP Model for Image Embeddings

### Subtask:
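Reload all CLIP components (`CLIPProcessor`, `CLIPVisionModel`, `CLIPTokenizer`, `CLIPTextModel`) using `openai/clip-vit-large-patch14`, with the goal of obtaining 768-dimensional embeddings for both modalities. Note: for this checkpoint only the *projected* embeddings are 768-dimensional for both modalities; the raw `pooler_output` is 1024-dimensional for `CLIPVisionModel` and 768-dimensional for `CLIPTextModel`.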


**Reasoning**:
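The previous attempt to compare CLIP embeddings resulted in a size mismatch error because the image and text embeddings had different dimensions. To address this, I will reload all CLIP components from a single checkpoint, `openai/clip-vit-large-patch14`. This checkpoint provides 768-dimensional embeddings for both modalities only after CLIP's projection heads (`image_embeds` / `text_embeds` of the `...WithProjection` model classes); the raw `pooler_output` of its vision and text models is 1024- and 768-dimensional respectively, so the projected embeddings are the ones that are compatible for cosine similarity calculations.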



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel, CLIPProcessor, CLIPVisionModel
import torch.nn.functional as F

# One checkpoint name shared by every CLIP component loaded in this cell
CLIP_MODEL_NAME = "openai/clip-vit-large-patch14"

# Image side: preprocessing pipeline plus the vision tower (moved to `device`)
clip_processor = CLIPProcessor.from_pretrained(CLIP_MODEL_NAME)
clip_vision_model = CLIPVisionModel.from_pretrained(CLIP_MODEL_NAME)
clip_vision_model = clip_vision_model.to(device)

# Text side: tokenizer plus the text tower (moved to `device`)
clip_tokenizer = CLIPTokenizer.from_pretrained(CLIP_MODEL_NAME)
clip_text_model = CLIPTextModel.from_pretrained(CLIP_MODEL_NAME)
clip_text_model = clip_text_model.to(device)

print(f"CLIP components ({CLIP_MODEL_NAME}) loaded successfully with consistent embedding dimensions.")

# --- Helper function to get CLIP text embeddings ---
def get_clip_text_embedding(prompt):
    """Encode `prompt` with the notebook-level CLIP tokenizer and text model.

    The prompt is tokenized (padded, truncated, PyTorch tensors), moved to
    `device`, and run through `clip_text_model` with gradients disabled.

    Returns:
        The model's `pooler_output` when that attribute exists and is not
        None; otherwise the mean of `last_hidden_state` over dim 1.

    NOTE(review): the returned vector is whatever the text tower emits; the
    notebook's later summary reports that its width did not match the vision
    tower's `pooler_output` for this checkpoint.
    """
    tokenized = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
    tokenized = tokenized.to(device)
    with torch.no_grad():
        text_outputs = clip_text_model(**tokenized)
        pooled = getattr(text_outputs, 'pooler_output', None)
        if pooled is not None:
            return pooled
        # No usable pooled vector: average the per-token hidden states instead
        return text_outputs.last_hidden_state.mean(dim=1)

# --- Collect embeddings and consistency scores for calculation ---
# Purpose: for up to `num_samples_to_evaluate` EEG samples from `test_loader`,
# predict a class with `model`, generate an image from the matching entry of
# `emotion_prompts`, embed both image and prompt with CLIP, and record their
# cosine similarity.
#
# NOTE(review): this notebook's later reasoning/summary cells report that with
# this checkpoint the vision and text `pooler_output` widths differ
# (1024 vs. 768), so the element-wise product in step 6 raises a RuntimeError.
image_embeddings_list = []   # un-normalized image embeddings, one per sample
text_embeddings_list = []    # un-normalized text embeddings, one per sample
consistency_scores = []      # one cosine similarity (Python float) per sample

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

# Put every network used below into evaluation mode.
model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# FIX: Correctly set pipe.safety_checker to return an iterable (list of bools)
# This prevents the TypeError when the pipeline attempts to iterate over the safety checker's output.
# The replacement returns the images untouched plus one `False` flag per image.
pipe.safety_checker = lambda images, **kwargs: (images, [False] * len(images))

# `count` tracks how many samples have been fully processed across batches.
count = 0
for Xb, yb in test_loader:  # `yb` (the batch labels) is not used in this loop
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        # Slice (rather than index) so the leading batch dimension is kept.
        x_eeg = Xb[i:i+1] # Single EEG sample

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        # No generator/seed is passed, so sampling is not pinned by this cell.
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4. Encode generated image into CLIP embedding and normalize
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        with torch.no_grad():
            image_embedding = clip_vision_model(**image_inputs).pooler_output
            image_embedding_norm = F.normalize(image_embedding, p=2, dim=-1)

        # 5. Encode the text prompt into CLIP embedding and normalize
        text_embedding = get_clip_text_embedding(prompt_text)
        text_embedding_norm = F.normalize(text_embedding, p=2, dim=-1)

        # 6. Calculate cosine similarity
        # Cosine similarity is the dot product of L2-normalized vectors
        # (summing over all elements is valid because each tensor holds one sample).
        similarity = torch.sum(image_embedding_norm * text_embedding_norm).item()
        consistency_scores.append(similarity)

        # Store the raw (un-normalized) embeddings on the CPU as NumPy arrays.
        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

# Report how much was collected and the resulting array shapes.
print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)

**Reasoning**:
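The previous attempt to calculate image-emotion consistency scores failed due to a tensor size mismatch between the CLIP image and text embeddings (1024 vs 768) when using `openai/clip-vit-large-patch14`. To try to resolve this, I will reload all CLIP components using `openai/clip-vit-base-patch32`, on the assumption that it provides consistent 512-dimensional `pooler_output` embeddings for both modalities and is therefore compatible with cosine similarity calculations. (As the summary at the end of this notebook records, that assumption did not hold: the image embeddings were 768-dimensional and the text embeddings 512-dimensional, so the mismatch persisted.)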



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel, CLIPProcessor, CLIPVisionModel
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Every CLIP component in this cell is loaded from the same base-patch32 checkpoint
CLIP_MODEL_NAME = "openai/clip-vit-base-patch32"

# Text side: tokenizer, then the text encoder placed on the active device
clip_tokenizer = CLIPTokenizer.from_pretrained(CLIP_MODEL_NAME)
clip_text_model = CLIPTextModel.from_pretrained(CLIP_MODEL_NAME)
clip_text_model = clip_text_model.to(device)

# Image side: preprocessing pipeline, then the vision encoder on the same device
clip_processor = CLIPProcessor.from_pretrained(CLIP_MODEL_NAME)
clip_vision_model = CLIPVisionModel.from_pretrained(CLIP_MODEL_NAME)
clip_vision_model = clip_vision_model.to(device)

print(f"CLIP components ({CLIP_MODEL_NAME}) loaded successfully.")

# --- Helper function to get CLIP text embeddings ---
def get_clip_text_embedding(prompt):
    """Return a CLIP text embedding for `prompt`.

    Tokenizes the prompt (padding and truncation enabled, PyTorch tensors),
    moves the batch to `device`, and runs `clip_text_model` without tracking
    gradients.

    Returns:
        `pooler_output` if the model output exposes a non-None value for it,
        else the mean of `last_hidden_state` taken over dim 1.
    """
    encoded_prompt = clip_tokenizer(
        prompt, return_tensors="pt", padding=True, truncation=True
    ).to(device)
    with torch.no_grad():
        encoder_out = clip_text_model(**encoded_prompt)
        has_pooled = hasattr(encoder_out, 'pooler_output') and encoder_out.pooler_output is not None
        # Prefer the pooled vector; otherwise fall back to averaging hidden states
        return encoder_out.pooler_output if has_pooled else encoder_out.last_hidden_state.mean(dim=1)

# --- Collect embeddings and consistency scores for calculation ---
# Purpose: for up to `num_samples_to_evaluate` EEG samples from `test_loader`,
# predict a class with `model`, generate an image from the matching entry of
# `emotion_prompts`, embed both image and prompt with CLIP, and record their
# cosine similarity.
#
# NOTE(review): the summary at the end of this notebook reports that this
# base-patch32 attempt still produced mismatched widths (768 for image
# embeddings, 512 for text embeddings), which breaks the product in step 6.
image_embeddings_list = []   # un-normalized image embeddings, one per sample
text_embeddings_list = []    # un-normalized text embeddings, one per sample
consistency_scores = []      # one cosine similarity (Python float) per sample

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

# Put every network used below into evaluation mode.
model.eval()
clip_vision_model.eval()
clip_text_model.eval()

# Ensure pipe's safety checker is correctly set to return an iterable (list of bools)
# This prevents the TypeError encountered in previous runs.
# Note: The 'pipe' variable is from Stable Diffusion, not the CLIP model loaded here.
# The replacement returns the images untouched plus one `False` flag per image;
# a pipeline whose checker is already None is left as it is.
if pipe.safety_checker is not None: # Check if it exists before trying to modify
    pipe.safety_checker = lambda images, **kwargs: (images, [False] * len(images))


# `count` tracks how many samples have been fully processed across batches.
count = 0
for batch_idx, (Xb, yb) in enumerate(test_loader):  # `yb` is not used in this loop
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        # Slice (rather than index) so the leading batch dimension is kept.
        x_eeg = Xb[i:i+1] # Single EEG sample

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        # No generator/seed is passed, so sampling is not pinned by this cell.
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512
        ).images[0]

        # 4. Encode generated image into CLIP embedding and normalize
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        with torch.no_grad():
            image_embedding = clip_vision_model(**image_inputs).pooler_output
            image_embedding_norm = F.normalize(image_embedding, p=2, dim=-1)

        # 5. Encode the text prompt into CLIP embedding and normalize
        text_embedding = get_clip_text_embedding(prompt_text)
        text_embedding_norm = F.normalize(text_embedding, p=2, dim=-1)

        # Verify dimensions before calculation
        # (only the very first processed sample triggers these prints)
        if batch_idx == 0 and i == 0: # Print shapes only for the first sample
            print(f"DEBUG: Image embedding shape: {image_embedding_norm.shape}")
            print(f"DEBUG: Text embedding shape: {text_embedding_norm.shape}")

        # 6. Calculate cosine similarity
        # Cosine similarity is the dot product of L2-normalized vectors
        # (summing over all elements is valid because each tensor holds one sample).
        similarity = torch.sum(image_embedding_norm * text_embedding_norm).item()
        consistency_scores.append(similarity)

        # Store the raw (un-normalized) embeddings on the CPU as NumPy arrays.
        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

# Report how much was collected and the resulting array shapes.
print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)

# --- Histogram (20 bins, KDE overlay) of the per-sample cosine similarities ---
plt.figure(figsize=(8, 6))
sns.histplot(consistency_scores, bins=20, kde=True)
plt.xlabel('Cosine Similarity Score (Image Embedding vs. Text Embedding)')
plt.ylabel('Frequency')
plt.title('Distribution of Image-Emotion Consistency Scores (Cosine Similarity)')
plt.grid(axis='y', alpha=0.75)
plt.show()

# --- Descriptive statistics of the scores ---
mean_score = np.mean(consistency_scores)
median_score = np.median(consistency_scores)
std_dev_score = np.std(consistency_scores)

# One print call, newline-separated, yields the same six output lines.
print(
    "\nSummary of Image-Emotion Consistency Scores:",
    f"Mean Cosine Similarity: {mean_score:.4f}",
    f"Median Cosine Similarity: {median_score:.4f}",
    f"Standard Deviation: {std_dev_score:.4f}",
    f"Min Score: {np.min(consistency_scores):.4f}",
    f"Max Score: {np.max(consistency_scores):.4f}",
    sep="\n",
)

# --- Canned interpretation chosen by the mean score (cut-offs: > 0.7, > 0.5) ---
alignment_message = (
    "\nThe high mean cosine similarity suggests a strong alignment between the generated images and their corresponding emotion text prompts. The Stable Diffusion model, guided by the EEG-predicted emotions, is likely producing images that effectively capture the semantic meaning of the emotion prompts."
    if mean_score > 0.7
    else "\nThe moderate mean cosine similarity indicates a reasonable alignment between the generated images and their corresponding emotion text prompts. There's a good general correlation, but there might be room for improvement in fine-tuning prompts or the adapter for stronger consistency."
    if mean_score > 0.5
    else "\nThe low mean cosine similarity suggests a weak alignment between the generated images and their corresponding emotion text prompts. This could indicate that either the EEG-to-CLIP adapter needs further training, the emotion prompts are not sufficiently descriptive, or the Stable Diffusion model struggles to interpret the nuances of these emotion prompts effectively."
)
print(alignment_message)

# Closing guidance on how to read the histogram
print("\nDetailed analysis of the histogram shows the distribution of scores. A distribution clustered towards higher values (closer to 1) indicates better consistency, while a spread towards lower values (closer to 0 or negative) suggests less alignment. The shape of the KDE (Kernel Density Estimate) also provides insight into the overall quality and reliability of the image generation process based on emotion predictions.")

**Reasoning**:
The previous `RuntimeError` for tensor size mismatch (1024 vs 768) indicates that `CLIPVisionModel` and `CLIPTextModel` from `openai/clip-vit-large-patch14` provide `pooler_output` of different dimensions. To correctly compare image and text embeddings, I need to use the dedicated feature extraction methods `clip_model.get_image_features()` and `clip_model.get_text_features()`, which project both modalities into a consistent shared latent space (768 dimensions for `large-patch14`). I will load the overarching `CLIPModel` to ensure access to these methods and guarantee consistent feature dimensions for cosine similarity calculation.



In [None]:
from transformers import CLIPTokenizer, CLIPTextModel, CLIPProcessor, CLIPVisionModel, CLIPModel
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Checkpoint used for the full CLIP model and its preprocessing components
CLIP_MODEL_NAME = "openai/clip-vit-large-patch14"

# Preprocessing helpers first (tokenizer for prompts, processor for images) ...
clip_tokenizer = CLIPTokenizer.from_pretrained(CLIP_MODEL_NAME)
clip_processor = CLIPProcessor.from_pretrained(CLIP_MODEL_NAME)

# ... then the full CLIP model, moved to the active device
clip_model = CLIPModel.from_pretrained(CLIP_MODEL_NAME)
clip_model = clip_model.to(device)

print(f"CLIP components ({CLIP_MODEL_NAME}) loaded successfully with consistent embedding dimensions for features.")

# --- Helper function to get CLIP text features --- #
def get_clip_text_embedding(prompt):
    """Return the CLIP text features for `prompt`.

    The prompt is tokenized (padding and truncation enabled, PyTorch tensors),
    moved to `device`, and passed to `clip_model.get_text_features` with
    gradient tracking disabled. The result is returned as-is (not normalized).
    """
    with torch.no_grad():
        encoded = clip_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
        # Dedicated feature method of the full CLIP model
        return clip_model.get_text_features(**encoded)

# --- Collect embeddings and consistency scores for calculation --- #
# Purpose: for up to `num_samples_to_evaluate` EEG samples from `test_loader`,
# predict a class with `model`, generate an image from the matching entry of
# `emotion_prompts`, embed image and prompt with the full CLIP model, and
# record their cosine similarity.
image_embeddings_list = []   # un-normalized image features, one per sample
text_embeddings_list = []    # un-normalized text features, one per sample
consistency_scores = []      # one cosine similarity (Python float) per sample

num_samples_to_evaluate = 50 # Evaluate a subset for efficiency

# Base seed for Stable Diffusion sampling. Sample k is generated with seed
# GENERATION_SEED + k, so every image differs but a re-run of this cell on the
# same inputs reproduces the same images (and therefore the same statistics).
GENERATION_SEED = 42

model.eval()
clip_model.eval() # Ensure the full CLIP model is in evaluation mode

# Ensure pipe's safety checker is correctly set to return an iterable (list of bools)
# This prevents the TypeError encountered in previous runs.
# Note: The 'pipe' variable is from Stable Diffusion, not the CLIP model loaded here.
# The replacement returns the images untouched plus one `False` flag per image;
# a pipeline whose checker is already None is left as it is.
if pipe.safety_checker is not None: # Check if it exists before trying to modify
    pipe.safety_checker = lambda images, **kwargs: (images, [False] * len(images))


# `count` tracks how many samples have been fully processed across batches.
count = 0
for batch_idx, (Xb, yb) in enumerate(test_loader):  # `yb` is not used in this loop
    if count >= num_samples_to_evaluate:
        break

    Xb = Xb.to(device)

    for i in range(len(Xb)): # Iterate through batch
        if count >= num_samples_to_evaluate:
            break

        # Slice (rather than index) so the leading batch dimension is kept.
        x_eeg = Xb[i:i+1] # Single EEG sample

        # 1. Predict emotion from EEG
        with torch.no_grad():
            logits = model(x_eeg)
            pred_class = torch.argmax(logits, dim=1).item()

        # 2. Get corresponding text prompt
        prompt_text = emotion_prompts[pred_class]

        # 3. Generate image using Stable Diffusion with the text prompt
        # A CPU generator is used so the initial noise does not depend on
        # which device the pipeline happens to run on.
        sample_generator = torch.Generator(device="cpu").manual_seed(GENERATION_SEED + count)
        generated_image = pipe(
            prompt=prompt_text,
            negative_prompt=negative_prompt,
            num_inference_steps=25, # Fewer steps for speed
            guidance_scale=7.5,
            height=512,
            width=512,
            generator=sample_generator
        ).images[0]

        # 4. Encode generated image into CLIP embedding and normalize
        image_inputs = clip_processor(images=generated_image, return_tensors="pt").to(device)
        with torch.no_grad():
            # Use the dedicated method to get projected image features
            image_embedding = clip_model.get_image_features(**image_inputs)
            image_embedding_norm = F.normalize(image_embedding, p=2, dim=-1)

        # 5. Encode the text prompt into CLIP embedding and normalize
        text_embedding = get_clip_text_embedding(prompt_text)
        text_embedding_norm = F.normalize(text_embedding, p=2, dim=-1)

        # Verify dimensions before calculation
        # (only the very first processed sample triggers these prints)
        if batch_idx == 0 and i == 0: # Print shapes only for the first sample
            print(f"DEBUG: Image embedding shape: {image_embedding_norm.shape}")
            print(f"DEBUG: Text embedding shape: {text_embedding_norm.shape}")

        # 6. Calculate cosine similarity
        # Cosine similarity is the dot product of L2-normalized vectors
        # (summing over all elements is valid because each tensor holds one sample).
        similarity = torch.sum(image_embedding_norm * text_embedding_norm).item()
        consistency_scores.append(similarity)

        # Store the raw (un-normalized) features on the CPU as NumPy arrays.
        image_embeddings_list.append(image_embedding.squeeze(0).cpu().numpy())
        text_embeddings_list.append(text_embedding.squeeze(0).cpu().numpy())

        count += 1

# Convert lists to numpy arrays
image_embeddings = np.array(image_embeddings_list)
text_embeddings = np.array(text_embeddings_list)
consistency_scores = np.array(consistency_scores)

# Report how much was collected and the resulting array shapes.
print(f"Collected {len(image_embeddings)} image-text embedding pairs and {len(consistency_scores)} consistency scores.")
print("Image embeddings shape:", image_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Consistency scores shape:", consistency_scores.shape)

# --- Visualize: histogram of the consistency scores with a KDE overlay ---
plt.figure(figsize=(8, 6))
sns.histplot(consistency_scores, bins=20, kde=True)
plt.title('Distribution of Image-Emotion Consistency Scores (Cosine Similarity)')
plt.ylabel('Frequency')
plt.xlabel('Cosine Similarity Score (Image Embedding vs. Text Embedding)')
plt.grid(axis='y', alpha=0.75)
plt.show()

# --- Summarize: central tendency and spread of the scores ---
mean_score = np.mean(consistency_scores)
median_score = np.median(consistency_scores)
std_dev_score = np.std(consistency_scores)

print("\nSummary of Image-Emotion Consistency Scores:")
for stat_label, stat_value in (
    ("Mean Cosine Similarity", mean_score),
    ("Median Cosine Similarity", median_score),
    ("Standard Deviation", std_dev_score),
    ("Min Score", np.min(consistency_scores)),
    ("Max Score", np.max(consistency_scores)),
):
    print(f"{stat_label}: {stat_value:.4f}")

# --- Interpret: pick one of three canned messages from the mean score ---
# NOTE(review): the 0.7 / 0.5 cut-offs look high for raw CLIP image-text
# cosine similarities — confirm they are calibrated for this metric.
if mean_score > 0.7:
    alignment_message = "\nThe high mean cosine similarity suggests a strong alignment between the generated images and their corresponding emotion text prompts. The Stable Diffusion model, guided by the EEG-predicted emotions, is likely producing images that effectively capture the semantic meaning of the emotion prompts."
elif mean_score > 0.5:
    alignment_message = "\nThe moderate mean cosine similarity indicates a reasonable alignment between the generated images and their corresponding emotion text prompts. There's a good general correlation, but there might be room for improvement in fine-tuning prompts or the adapter for stronger consistency."
else:
    alignment_message = "\nThe low mean cosine similarity suggests a weak alignment between the generated images and their corresponding emotion text prompts. This could indicate that either the EEG-to-CLIP adapter needs further training, the emotion prompts are not sufficiently descriptive, or the Stable Diffusion model struggles to interpret the nuances of these emotion prompts effectively."
print(alignment_message)

# Closing guidance on how to read the histogram
print("\nDetailed analysis of the histogram shows the distribution of scores. A distribution clustered towards higher values (closer to 1) indicates better consistency, while a spread towards lower values (closer to 0 or negative) suggests less alignment. The shape of the KDE (Kernel Density Estimate) also provides insight into the overall quality and reliability of the image generation process based on emotion predictions.")

## Final Task

### Subtask:
Summarize the findings from the consistency score distribution, discussing the extent to which generated images align with predicted emotions.


## Summary:

### Q&A
The generated images show a weak alignment with the predicted emotions, as indicated by a mean cosine similarity score of 0.2352. The consistency scores are uniformly low and tightly clustered, ranging from 0.1918 to 0.2814.

### Data Analysis Key Findings
*   **Initial Dimension Mismatch:** The initial attempt to calculate consistency scores using `CLIPVisionModel` and `CLIPTextModel` from `openai/clip-vit-large-patch14` failed due to a `RuntimeError` caused by a tensor size mismatch (1024 vs. 768 dimensions).
*   **Persistent Mismatch:** A subsequent attempt with `openai/clip-vit-base-patch32` also resulted in dimension inconsistency (768 for image embeddings and 512 for text embeddings), demonstrating that using separate `CLIPVisionModel` and `CLIPTextModel` components does not guarantee consistent embedding dimensions across modalities, even with the same base model.
*   **Resolution of Dimension Inconsistency:** The issue was successfully resolved by loading the comprehensive `CLIPModel` (`openai/clip-vit-large-patch14`) and utilizing its dedicated `get_image_features()` and `get_text_features()` methods. This ensured both image and text embeddings were consistently 768-dimensional.
*   **Consistency Score Summary:** For 50 evaluated samples, the image-emotion consistency scores (cosine similarity) were summarized as follows:
    *   Mean Cosine Similarity: 0.2352
    *   Median Cosine Similarity: 0.2318
    *   Standard Deviation: 0.0191
    *   Minimum Score: 0.1918
    *   Maximum Score: 0.2814
*   **Weak Alignment:** The mean cosine similarity of 0.2352 indicates a weak alignment between the generated images and their corresponding emotion text prompts.

### Insights or Next Steps
*   Further investigation is needed to determine whether the weak alignment stems from the EEG-to-CLIP adapter's effectiveness, the descriptiveness of the emotion prompts, or the Stable Diffusion model's interpretation of these emotion prompts.
*   Consider strategies to improve the consistency, such as fine-tuning the EEG-to-CLIP adapter, refining the emotion prompts used for image generation, or exploring alternative image generation models/techniques.
