In [None]:
!pip install easyfsl



In [None]:
!pip install pymongo



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader, Subset
from easyfsl.samplers import TaskSampler
from easyfsl.utils import sliding_average

from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import random
import time

import itertools
import pandas as pd
import os

In [None]:
from google.colab import drive

# Attach Google Drive to the Colab filesystem; later cells write their
# experiment results underneath this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
class FilteredCIFAR100(datasets.CIFAR100):
    """CIFAR-100 variant whose items are ``(image, int_label)`` pairs.

    Only ``self.transform`` is applied to the image; the label is returned as a
    plain Python ``int``.
    """

    def __getitem__(self, index):
        """Return the (optionally transformed) PIL image and integer label at ``index``."""
        raw_pixels = self.data[index]
        raw_label = self.targets[index]
        picture = Image.fromarray(raw_pixels)
        if not self.transform:
            return picture, int(raw_label)
        return self.transform(picture), int(raw_label)

In [None]:
class PrototypicalNetworks(nn.Module):
    """Prototypical-network classifier on top of an arbitrary feature extractor.

    Each class prototype is the mean backbone embedding of that class's support
    samples; a query is scored by the negative Euclidean distance to every
    prototype.
    """

    def __init__(self, backbone: nn.Module):
        """Store ``backbone``, the module used to embed support and query images."""
        super().__init__()
        self.backbone = backbone

    def forward(self, support_images, support_labels, query_images):
        """Score every query against the class prototypes.

        Args:
            support_images: batch of support inputs accepted by the backbone.
            support_labels: 1-D tensor with one class label per support input.
            query_images: batch of query inputs accepted by the backbone.

        Returns:
            Tensor of shape (n_query, n_classes) holding negative Euclidean
            distances. Column ``i`` corresponds to the i-th smallest label in
            ``support_labels`` (``torch.unique`` returns sorted values).
        """
        z_support = self.backbone(support_images)
        z_query = self.backbone(query_images)

        # Compute the class list once and reuse it for every prototype.
        class_labels = torch.unique(support_labels)
        z_proto = torch.stack([
            z_support[support_labels == label].mean(0)
            for label in class_labels
        ])

        # Closer prototype => larger (less negative) score.
        return -torch.cdist(z_query, z_proto)

In [None]:
def _episodic_accuracy(model, loader, device):
    """Return the query-set accuracy (in percent) of ``model`` over all episodes in ``loader``.

    The model is switched to eval mode and left there; a caller that continues
    training must call ``model.train()`` afterwards.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for s_img, s_lbl, q_img, q_lbl, _ in loader:
            s_img, s_lbl = s_img.to(device), s_lbl.to(device)
            q_img, q_lbl = q_img.to(device), q_lbl.to(device)
            preds = model(s_img, s_lbl, q_img).argmax(dim=1)
            correct += (preds == q_lbl).sum().item()
            total += len(q_lbl)
    return correct / total * 100


def run_single_experiment(experiment_id, shot, meta_batch_size, do_training, results_dir):
    """Run one few-shot experiment on CIFAR-100 and return a summary dict.

    Args:
        experiment_id: name of the experiment; also the sub-folder of
            ``results_dir`` that receives the checkpoint and the training plot.
        shot: number of support images per class in every episode.
        meta_batch_size: number of consecutive training episodes whose
            gradients are averaged before one optimizer step. ``None`` (or 1)
            updates after every single episode.
        do_training: when False, the ImageNet-pretrained backbone is only
            evaluated on validation episodes and no training happens.
        results_dir: root folder for experiment outputs.

    Returns:
        dict with keys ``experiment_id``, ``shot``, ``meta_batch_size``,
        ``best_val_acc`` (percent) and ``status`` (``"Evaluated"`` or
        ``"Finished"``).
    """
    exp_dir = os.path.join(results_dir, experiment_id)
    os.makedirs(exp_dir, exist_ok=True)

    # Hyperparameters
    N_WAY = 4
    N_SHOT = shot
    N_QUERY = 10
    N_TRAINING_EPISODES = 40000
    N_VALIDATION_TASKS = 100
    LOG_FREQ = 50
    VAL_FREQ = 500
    PATIENCE = 10

    # Previously meta_batch_size was accepted but never used, so every value in
    # the experiment grid trained identically. It now controls gradient
    # accumulation; None/1 keeps the original "one step per episode" behaviour.
    episodes_per_step = max(1, int(meta_batch_size)) if meta_batch_size else 1

    # Fall back to CPU when no GPU is available instead of failing on .cuda().
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Dataset
    # NOTE(review): the pretrained ResNet-18 weights are normally used with
    # ImageNet mean/std normalisation, which this pipeline does not apply —
    # confirm whether that is intentional.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])
    cifar100 = FilteredCIFAR100(root=".", train=True, download=True, transform=transform)

    # Class split: 80 classes for meta-training, 20 held out for validation.
    all_classes = list(range(100))
    random.seed(42)
    random.shuffle(all_classes)
    # Sets give O(1) membership tests while scanning all sample targets below.
    train_classes = set(all_classes[:80])
    val_classes = set(all_classes[80:])

    train_idx = [i for i, label in enumerate(cifar100.targets) if label in train_classes]
    val_idx = [i for i, label in enumerate(cifar100.targets) if label in val_classes]

    train_dataset = Subset(cifar100, train_idx)
    val_dataset = Subset(cifar100, val_idx)
    # TaskSampler asks the dataset for its labels through get_labels().
    train_dataset.get_labels = lambda: [int(cifar100.targets[i]) for i in train_idx]
    val_dataset.get_labels = lambda: [int(cifar100.targets[i]) for i in val_idx]

    train_sampler = TaskSampler(train_dataset, n_way=N_WAY, n_shot=N_SHOT, n_query=N_QUERY, n_tasks=N_TRAINING_EPISODES)
    val_sampler = TaskSampler(val_dataset, n_way=N_WAY, n_shot=N_SHOT, n_query=N_QUERY, n_tasks=N_VALIDATION_TASKS)

    train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, num_workers=2, collate_fn=train_sampler.episodic_collate_fn)
    val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, num_workers=2, collate_fn=val_sampler.episodic_collate_fn)

    # Model: ResNet-18 with its classification head replaced by a flatten, so
    # the backbone outputs feature vectors.
    backbone = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
    backbone.fc = nn.Flatten()
    model = PrototypicalNetworks(backbone).to(device)

    # Optimisation setup
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    best_val_acc = 0.0
    patience_counter = 0
    all_loss = []
    val_acc_list = []
    episode_list = []

    # Evaluation-only mode: score the pretrained backbone and return.
    if not do_training:
        print(f"[{experiment_id}] No training: evaluating pretrained model")
        val_acc = _episodic_accuracy(model, val_loader, device)
        print(f"[{experiment_id}] Validation Accuracy: {val_acc:.2f}%")
        return {
            "experiment_id": experiment_id,
            "shot": shot,
            "meta_batch_size": meta_batch_size,
            "best_val_acc": val_acc,
            "status": "Evaluated"
        }

    # Training loop
    model.train()
    optimizer.zero_grad()
    for episode_index, (support_images, support_labels, query_images, query_labels, _) in enumerate(train_loader):
        support_images, support_labels = support_images.to(device), support_labels.to(device)
        query_images, query_labels = query_images.to(device), query_labels.to(device)

        scores = model(support_images, support_labels, query_images)
        loss = criterion(scores, query_labels)
        # Scale so the accumulated gradient is the mean over the meta-batch.
        (loss / episodes_per_step).backward()
        if (episode_index + 1) % episodes_per_step == 0:
            optimizer.step()
            optimizer.zero_grad()

        # Track the unscaled per-episode loss for logging and plotting.
        all_loss.append(loss.item())

        # Progress log
        if episode_index % LOG_FREQ == 0:
            print(f"[{experiment_id}] Episode {episode_index} | Loss: {sliding_average(all_loss, LOG_FREQ):.4f}")

        # Periodic validation
        if episode_index % VAL_FREQ == 0 and episode_index > 0:
            val_acc = _episodic_accuracy(model, val_loader, device)
            val_acc_list.append(val_acc)
            episode_list.append(episode_index)
            print(f"[{experiment_id}] Validation Accuracy: {val_acc:.2f}%")

            # Early stopping: checkpoint on improvement, otherwise count down.
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0
                torch.save(model.state_dict(), os.path.join(exp_dir, "best_model.pt"))
            else:
                patience_counter += 1
                if patience_counter >= PATIENCE:
                    print(f"[{experiment_id}] Early stopping triggered at episode {episode_index}")
                    break
            model.train()

    # Save the training curves (loss smoothed over a trailing 50-episode window).
    smoothed_loss = [np.mean(all_loss[max(0, i - 49):i + 1]) for i in range(len(all_loss))]
    plt.figure(figsize=(14, 5))
    plt.subplot(1, 2, 1)
    plt.plot(smoothed_loss)
    plt.title("Train Loss")
    plt.subplot(1, 2, 2)
    plt.plot(episode_list, val_acc_list, marker='o')
    plt.title("Validation Accuracy")
    plt.savefig(os.path.join(exp_dir, "training_plot.png"))
    plt.close()

    return {
        "experiment_id": experiment_id,
        "shot": shot,
        "meta_batch_size": meta_batch_size,
        "best_val_acc": best_val_acc,
        "status": "Finished"
    }


In [None]:
import os
from getpass import getpass

from pymongo import MongoClient

# Connect to MongoDB Atlas.
# SECURITY: the connection string (user name + password) must not be stored in
# the notebook. It is read from the MONGODB_URI environment variable, or asked
# for interactively when that variable is not set. Any password that was ever
# committed inside this notebook should be rotated.
conn_str = os.environ.get("MONGODB_URI") or getpass("MongoDB connection string: ")

client = MongoClient(conn_str)
db = client["mydatabase"]
collection = db["experiment_results"]


In [None]:
import itertools
import pandas as pd

# Two evaluation-only experiments (pretrained backbone, no meta-training).
no_training_experiments = [
    dict(shot=n_shot, meta_batch_size=None, do_training=False)
    for n_shot in (1, 5)
]

# Six meta-training experiments: every (shot, meta_batch_size) combination.
training_experiments = [
    dict(shot=n_shot, meta_batch_size=batch, do_training=True)
    for n_shot, batch in itertools.product((1, 5), (1, 4, 8))
]

# Full list of eight experiments: evaluation-only first, then training runs.
experiments = [*no_training_experiments, *training_experiments]

# Tag each configuration in place with a 1-based experiment id.
experiment_configs = []
for position, config in enumerate(experiments, start=1):
    config["experiment_id"] = f"exp_{position}"
    experiment_configs.append(config)

# Tabular overview of the experiment grid.
experiment_df = pd.DataFrame(experiment_configs)
print("실험 구성표:")
print(experiment_df)

실험 구성표:
   shot  meta_batch_size  do_training experiment_id
0     1              NaN        False         exp_1
1     5              NaN        False         exp_2
2     1              1.0         True         exp_3
3     1              4.0         True         exp_4
4     1              8.0         True         exp_5
5     5              1.0         True         exp_6
6     5              4.0         True         exp_7
7     5              8.0         True         exp_8


In [None]:
# Folder (on the mounted Drive) that receives all experiment outputs.
results_dir = "/content/drive/MyDrive/experiment_results"
os.makedirs(results_dir, exist_ok=True)
result_csv_path = os.path.join(results_dir, "experiment_summary.csv")

# Summaries of the experiments finished so far.
results_list = []

# Run every configured experiment in order.
for exp in experiment_configs:
    print("\n==============================")
    print(f"실험 시작: {exp['experiment_id']} (shot: {exp['shot']}, "
          f"meta_batch_size: {exp['meta_batch_size']}, do_training: {exp['do_training']})")

    result = run_single_experiment(
        experiment_id=exp["experiment_id"],
        shot=exp["shot"],
        meta_batch_size=exp["meta_batch_size"],
        do_training=exp["do_training"],
        results_dir=results_dir
    )

    # Store in MongoDB. insert_one() adds an "_id" ObjectId to the dict it is
    # given, so pass a copy to keep that field out of results_list and the CSV.
    collection.insert_one(dict(result))

    results_list.append(result)

    # Checkpoint the summary after every experiment so partial results survive
    # an interrupted session.
    result_df = pd.DataFrame(results_list)
    result_df.to_csv(result_csv_path, index=False)
    print(f"결과가 저장되었습니다: {result_csv_path}")

# Final overview of all experiments.
final_results = pd.DataFrame(results_list)
print("\n최종 실험 결과:")
print(final_results)


실험 시작: exp_1 (shot: 1, meta_batch_size: None, do_training: False)
[exp_1] No training: evaluating pretrained model
[exp_1] Validation Accuracy: 52.48%
결과가 저장되었습니다: /content/drive/MyDrive/experiment_results/experiment_summary.csv

실험 시작: exp_2 (shot: 5, meta_batch_size: None, do_training: False)
[exp_2] No training: evaluating pretrained model
[exp_2] Validation Accuracy: 71.38%
결과가 저장되었습니다: /content/drive/MyDrive/experiment_results/experiment_summary.csv

실험 시작: exp_3 (shot: 1, meta_batch_size: 1, do_training: True)
[exp_3] Episode 0 | Loss: 0.9434
[exp_3] Episode 50 | Loss: 2.0671
[exp_3] Episode 100 | Loss: 1.4189
[exp_3] Episode 150 | Loss: 1.2949
[exp_3] Episode 200 | Loss: 1.2000
[exp_3] Episode 250 | Loss: 1.2143
[exp_3] Episode 300 | Loss: 1.1662
[exp_3] Episode 350 | Loss: 1.1912
[exp_3] Episode 400 | Loss: 1.1753
[exp_3] Episode 450 | Loss: 1.2500
[exp_3] Episode 500 | Loss: 1.1800
[exp_3] Validation Accuracy: 41.02%
[exp_3] Episode 550 | Loss: 1.1476
[exp_3] Episode 600 | Lo

시각화

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns

def visualize_tsne(model, test_loader, save_path, class_names=None):
    """Plot a 2-D t-SNE of one episode's support, query and prototype embeddings.

    The first episode of ``test_loader`` is embedded with ``model.backbone``;
    the figure is written to ``{save_path}/tsne_plot.png``.

    Args:
        model: object exposing a ``backbone`` module and ``parameters()``.
        test_loader: iterable yielding 5-tuples ``(support_images,
            support_labels, query_images, query_labels, true_class_ids)``.
            Assumes the easyfsl episodic layout, where the labels are indices
            into ``true_class_ids`` — TODO confirm for custom loaders.
        save_path: existing directory that receives the PNG.
        class_names: optional sequence indexed by true class id; when falsy the
            episode-relative label is shown instead.
    """
    model.eval()
    # Run on whatever device the model already lives on (no hard-coded .cuda()).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    with torch.no_grad():
        support_images, support_labels, query_images, query_labels, true_class_ids = next(iter(test_loader))
        z_support = model.backbone(support_images.to(device)).cpu()
        z_query = model.backbone(query_images.to(device)).cpu()

    support_labels = support_labels.cpu()
    query_labels = query_labels.cpu()

    unique_classes = torch.unique(support_labels)
    z_proto = torch.stack([
        z_support[support_labels == cls].mean(0)
        for cls in unique_classes
    ])

    all_embeddings = torch.cat([z_support, z_query, z_proto], dim=0).numpy()
    # scikit-learn requires perplexity < n_samples; clamp for tiny episodes.
    perplexity = min(5, len(all_embeddings) - 1)
    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    embeddings_2d = tsne.fit_transform(all_embeddings)

    n_support = len(z_support)
    n_query = len(z_query)
    z_support_2d = embeddings_2d[:n_support]
    z_query_2d = embeddings_2d[n_support:n_support + n_query]
    z_proto_2d = embeddings_2d[n_support + n_query:]

    def display_name(episode_label):
        """Map an episode-relative label to a readable class name."""
        if not class_names:
            return episode_label
        # Episode labels index into true_class_ids; class_names is indexed by
        # the true class id, not by the episode-relative label.
        return class_names[int(true_class_ids[int(episode_label)])]

    # Colours: one per class in the episode.
    episode_classes = unique_classes.numpy()
    support_np = support_labels.numpy()
    query_np = query_labels.numpy()
    palette = sns.color_palette("Set2", n_colors=len(episode_classes))

    fig = plt.figure(figsize=(10, 6))
    try:
        for i, cls in enumerate(episode_classes):
            idx = (support_np == cls)
            if idx.sum() >= 3:
                sns.kdeplot(
                    x=z_support_2d[idx, 0], y=z_support_2d[idx, 1],
                    fill=True,
                    cmap=sns.light_palette(palette[i], as_cmap=True),
                    alpha=0.3, levels=10,
                    label=f"Support KDE - {display_name(cls)}"
                )
            else:
                # A 2-D density cannot be estimated from fewer than three
                # points (e.g. 1-shot episodes); show the raw support points.
                plt.scatter(z_support_2d[idx, 0], z_support_2d[idx, 1],
                            marker='x', color=palette[i],
                            label=f"Support - {display_name(cls)}")

        for i, cls in enumerate(episode_classes):
            idx = (query_np == cls)
            plt.scatter(z_query_2d[idx, 0], z_query_2d[idx, 1],
                        color=palette[i], label=f"Query - {display_name(cls)}",
                        alpha=0.7)

        for i, cls in enumerate(episode_classes):
            plt.scatter(z_proto_2d[i, 0], z_proto_2d[i, 1],
                        marker='*', s=200, edgecolors='k', linewidths=1.5,
                        color=palette[i], label=f"Prototype - {display_name(cls)}")

        plt.title("t-SNE Visualization")
        plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
        plt.tight_layout()
        plt.savefig(f"{save_path}/tsne_plot.png")
    finally:
        # Always release the figure, even when plotting fails part-way.
        plt.close(fig)


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

def visualize_confusion_matrix(model, test_loader, save_path, class_names=None):
    """Plot a confusion matrix of query predictions over all episodes.

    The figure is written to ``{save_path}/confusion_matrix.png``.

    Args:
        model: callable as ``model(support_images, support_labels,
            query_images)`` returning per-class scores for each query.
        test_loader: iterable yielding 5-tuples ``(support_images,
            support_labels, query_images, query_labels, true_class_ids)``.
            Assumes the easyfsl episodic layout, where the labels are indices
            into ``true_class_ids`` — TODO confirm for custom loaders.
        save_path: existing directory that receives the PNG.
        class_names: optional sequence indexed by true class id; when falsy the
            numeric class ids are used as tick labels.
    """
    model.eval()
    # Run on whatever device the model already lives on (no hard-coded .cuda()).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for support_images, support_labels, query_images, query_labels, true_class_ids in test_loader:
            scores = model(
                support_images.to(device),
                support_labels.to(device),
                query_images.to(device),
            )
            # Labels are episode-relative and mean a different class in every
            # episode; translate both predictions and targets back to true
            # class ids before aggregating across episodes.
            id_lookup = torch.as_tensor(true_class_ids)
            all_preds.extend(id_lookup[scores.argmax(dim=1).cpu()].tolist())
            all_labels.extend(id_lookup[query_labels.cpu()].tolist())

    # Restrict the matrix to classes that actually occur so the tick labels
    # always match its size.
    present_ids = sorted(set(all_labels) | set(all_preds))
    cm = confusion_matrix(all_labels, all_preds, labels=present_ids)
    if class_names:
        display_labels = [class_names[class_id] for class_id in present_ids]
    else:
        display_labels = present_ids

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels)
    fig, ax = plt.subplots(figsize=(8, 6))
    try:
        disp.plot(cmap="Blues", ax=ax)
        plt.title("Confusion Matrix")
        plt.savefig(f"{save_path}/confusion_matrix.png")
    finally:
        # Always release the figure, even when plotting fails part-way.
        plt.close(fig)


In [None]:
import os
from torchvision import models
from tqdm import tqdm

def run_visualizations_for_all_experiments(results_root_dir, test_loader, class_names=None):
    """Create t-SNE and confusion-matrix plots for every experiment checkpoint.

    Every sub-directory of ``results_root_dir`` that contains ``best_model.pt``
    is loaded into a ResNet-18 based ``PrototypicalNetworks`` and both plots
    are saved into that sub-directory. Directories without a checkpoint are
    skipped; a failing plot is reported and does not stop the remaining work.

    Args:
        results_root_dir: folder holding one sub-directory per experiment.
        test_loader: episodic loader forwarded to the plotting functions.
        class_names: optional class names forwarded to the plotting functions.
    """
    # Fall back to CPU when no GPU is available instead of failing on .cuda().
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    experiment_dirs = sorted(
        d for d in os.listdir(results_root_dir)
        if os.path.isdir(os.path.join(results_root_dir, d))
    )

    plotters = (
        ("t-SNE", visualize_tsne),
        ("Confusion Matrix", visualize_confusion_matrix),
    )

    for exp_name in tqdm(experiment_dirs, desc="Experiments"):
        exp_dir = os.path.join(results_root_dir, exp_name)
        model_path = os.path.join(exp_dir, "best_model.pt")

        if not os.path.exists(model_path):
            print(f"[{exp_name}] 모델 파일이 없습니다. 스킵합니다.")
            continue

        print(f"[{exp_name}] 모델 불러오는 중...")

        # Rebuild the architecture. The checkpoint overwrites every weight, so
        # downloading the ImageNet weights first would be wasted work.
        backbone = models.resnet18(weights=None)
        backbone.fc = nn.Flatten()
        model = PrototypicalNetworks(backbone)
        # map_location lets a checkpoint saved on GPU load on a CPU-only host.
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.to(device)
        model.eval()

        # Best effort: one failing plot must not prevent the others.
        for plot_label, plot_fn in plotters:
            try:
                plot_fn(model, test_loader, save_path=exp_dir, class_names=class_names)
                print(f"[{exp_name}] {plot_label} 저장 완료")
            except Exception as e:
                print(f"[{exp_name}] {plot_label} 실패: {e}")


In [None]:
# NOTE(review): this path differs from the folder the experiment cell writes its
# checkpoints to ("/content/drive/MyDrive/experiment_results") — confirm which
# directory actually holds the best_model.pt files.
results_dir = "/mnt/data/experiment_results"
# NOTE(review): neither `test_dataset` nor `test_loader` is defined in any cell
# visible in this notebook, so this cell raises NameError on a fresh kernel (see
# the recorded output). An episodic test dataset/loader must be created before
# this cell can run; the example names below do not look like CIFAR-100 classes,
# so this cell was presumably written for a different dataset — verify.
class_names = test_dataset.classes  # e.g. ['AK', 'KAPADOKYA', 'NURLU', 'SIRA']

# Generate t-SNE and confusion-matrix plots for every saved experiment.
run_visualizations_for_all_experiments(results_dir, test_loader=test_loader, class_names=class_names)

NameError: name 'test_dataset' is not defined