Przepuszczamy wszystkie obrazki z binary_dataset_pelican przez pipeline z pliku initial_pipeline_sae.ipynb a wstawiamy reprezentacje wyrzucone przez sae do jednego datasetu wraz z labelami (y oznacza 1 jeśli na zdjęciu jest pelican i 0 jeśli nie ma).

In [19]:
import torch
import clip
from PIL import Image
import torchvision.transforms as transforms
from pathlib import Path
from sparse_autoencoder import SparseAutoencoder
import pandas as pd
import numpy as np


def process_image_pipeline(image_path, sae_model_path):
    """
    Przetwarza obraz przez model CLIP i SAE, a następnie zapisuje wynik.
    :param image_path: Ścieżka do obrazu wejściowego.
    :param sae_model_path: Ścieżka do wytrenowanego modelu SAE.
    :param output_path: Ścieżka do zapisu przetworzonych cech.
    """

    # Wybór urządzenia
    device = "cuda" if torch.cuda.is_available() else ('mps' if torch.backends.mps.is_available() else "cpu")
    print(f"Używane urządzenie: {device}")

    # CLIP
    model, preprocess = clip.load("ViT-L/14", device=device)
    # Załaduj i przetwórz obraz
    image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
    # Przetwarzanie obrazu przez CLIP
    with torch.no_grad():
        image_features = model.encode_image(image)

    # SAE
    def load_sae_model(sae_checkpoint_path):
        state_dict = torch.load(sae_checkpoint_path, map_location=device)
        autoencoder_input_dim = 768  # CLIP ViT-L/14
        expansion_factor = 8
        n_learned_features = int(autoencoder_input_dim * expansion_factor)
        len_hook_points = 1  

        sae = SparseAutoencoder(
            n_input_features=autoencoder_input_dim,
            n_learned_features=n_learned_features,
            n_components=len_hook_points
        ).to(device)

        sae.load_state_dict(state_dict)
        sae.eval()
        return sae  

    # Przepuszczanie CLIP features przez SAE
    @torch.no_grad()
    def get_sae_representation(clip_features, sae_model):
        concepts, _ = sae_model(clip_features)
        return concepts


    sae = load_sae_model(sae_model_path)
    sae_repr = get_sae_representation(image_features, sae)

    return sae_repr.cpu().squeeze(0)

In [20]:
from tqdm import tqdm

Zróbmy (przynajmniej na razie) modele tylko na 100 zdjęciach z konceptem i 100 bez bo za długo to się wykonuje.

In [None]:
def create_concept_dataset(dataset_path, sae_model_path, concept_name, max_per_class=100):
    X = []
    y = []

    for label_dir in ["0_other", f"1_{concept_name}"]:
        label = 0 if label_dir == "0_other" else 1
        dir_path = Path(dataset_path) / label_dir
        image_paths = list(dir_path.glob("*.jpg")) + list(dir_path.glob("*.png")) + list(dir_path.glob("*.jpeg"))
        
        # Bierzemy tylko do max_per_class przykładów
        image_paths = image_paths[:max_per_class]

        for img_path in tqdm(image_paths, desc=f"Processing {label_dir}"):
            try:
                vec = process_image_pipeline(img_path, sae_model_path) 
                X.append(vec.squeeze().numpy())
                y.append(label)
            except Exception as e:
                print(f"Error processing {img_path}: {e}")

    # Konwersja do tensora
    X_tensor = torch.tensor(X, dtype=torch.float32)
    y_tensor = torch.tensor(y, dtype=torch.int64)

    # Zapis
    torch.save(X_tensor, "X_concepts.pt")
    torch.save(y_tensor, "y_labels.pt")

    return X_tensor, y_tensor


In [22]:
X, y = create_concept_dataset("../WB-SAE-CBM/concept_datasets/binary_dataset_pelican", "clip_ViT-L_14sparse_autoencoder_final.pt", "pelican")

Używane urządzenie: cpu


Processing 0_other:   0%|          | 0/100 [00:00<?, ?it/s]

Używane urządzenie: cpu


Processing 0_other:   1%|          | 1/100 [00:06<10:07,  6.13s/it]

Error processing ..\WB-SAE-CBM\concept_datasets\binary_dataset_pelican\0_other\n01440764_11444.JPEG: bad allocation
Używane urządzenie: cpu


Processing 0_other:   2%|▏         | 2/100 [00:11<09:18,  5.70s/it]

Error processing ..\WB-SAE-CBM\concept_datasets\binary_dataset_pelican\0_other\n01440764_11974.JPEG: bad allocation
Używane urządzenie: cpu


Processing 0_other:   2%|▏         | 2/100 [00:16<13:13,  8.10s/it]


KeyboardInterrupt: 

Model liniowy na neuronie odpowiadającym za koncept pelikana:

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score

In [None]:
# Podział danych
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# neuron dla "pelican" 
pelican_neuron = 1085

# Model
X_train_single = X_train[:, pelican_neuron].reshape(-1, 1)
X_test_single = X_test[:, pelican_neuron].reshape(-1, 1)
clf_single = LogisticRegression().fit(X_train_single, y_train)
preds_single = clf_single.predict(X_test_single)
acc_single = accuracy_score(y_test, preds_single)
roc_single = roc_auc_score(y_test, clf_single.predict_proba(X_test_single)[:, 1])

ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

Regresja logistyczna dla wszystkich neuronów:

In [None]:
# model
clf_full = LogisticRegression(max_iter=1000).fit(X_train, y_train)
preds_full = clf_full.predict(X_test)
acc_full = accuracy_score(y_test, preds_full)
roc_full = roc_auc_score(y_test, clf_full.predict_proba(X_test)[:, 1])

Wyniki: 

In [None]:
pelican_results = {
    "naming_neuron": {"accuracy": acc_single, "roc_auc": roc_single},
    "logistic_regression_full": {"accuracy": acc_full, "roc_auc": roc_full}
}

In [None]:
print(f"Pelican results: {pelican_results}")

To samo dla 2 pozostałych konceptów: flamingi i gęsi.

Flamingi:

In [None]:
X_flamingo, y_flamingo = create_concept_dataset("../WB-SAE-CBM/concept_datasets/binary_dataset_flamingos", "clip_ViT-L_14sparse_autoencoder_final.pt", "flamingo")

In [None]:
# Podział danych
X_train, X_test, y_train, y_test = train_test_split(X_flamingo, y_flamingo, test_size=0.2, stratify=y, random_state=42)

# neuron dla "flamingo" 
flamingo_neuron = 2347

# Model na pojedynczym neuronie
X_train_single = X_train[:, flamingo_neuron].reshape(-1, 1)
X_test_single = X_test[:, flamingo_neuron].reshape(-1, 1)
clf_single = LogisticRegression().fit(X_train_single, y_train)
preds_single = clf_single.predict(X_test_single)
acc_single = accuracy_score(y_test, preds_single)
roc_single = roc_auc_score(y_test, clf_single.predict_proba(X_test_single)[:, 1])

# Model na pełnym wektorze
clf_full = LogisticRegression(max_iter=1000).fit(X_train, y_train)
preds_full = clf_full.predict(X_test)
acc_full = accuracy_score(y_test, preds_full)
roc_full = roc_auc_score(y_test, clf_full.predict_proba(X_test)[:, 1])



In [None]:
results_flamingo = {
    "naming_neuron": {"accuracy": acc_single, "roc_auc": roc_single},
    "logistic_regression_full": {"accuracy": acc_full, "roc_auc": roc_full}
}

Gęsi:

In [None]:
X_geese, y_geese = create_concept_dataset("../WB-SAE-CBM/concept_datasets/binary_dataset_geese", "clip_ViT-L_14sparse_autoencoder_final.pt", "goose")

In [None]:
# Podział danych
X_train, X_test, y_train, y_test = train_test_split(X_geese, y_geese, test_size=0.2, stratify=y, random_state=42)

# neuron dla "goose" 
goose_neuron = 3426

# Model na pojedynczym neuronie
X_train_single = X_train[:, goose_neuron].reshape(-1, 1)
X_test_single = X_test[:, goose_neuron].reshape(-1, 1)
clf_single = LogisticRegression().fit(X_train_single, y_train)
preds_single = clf_single.predict(X_test_single)
acc_single = accuracy_score(y_test, preds_single)
roc_single = roc_auc_score(y_test, clf_single.predict_proba(X_test_single)[:, 1])

# Model na pełnym wektorze
clf_full = LogisticRegression(max_iter=1000).fit(X_train, y_train)
preds_full = clf_full.predict(X_test)
acc_full = accuracy_score(y_test, preds_full)
roc_full = roc_auc_score(y_test, clf_full.predict_proba(X_test)[:, 1])



In [None]:
results_goose = {
    "naming_neuron": {"accuracy": acc_single, "roc_auc": roc_single},
    "logistic_regression_full": {"accuracy": acc_full, "roc_auc": roc_full}
}