<a href="https://colab.research.google.com/github/Ponczeks/image-classification-comparison/blob/main/Mnist_cross_validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# NOTE(review): this installs the PyPI package `cup`, while the cells below import
# `cupy` and `cuml` — confirm that `cup` is really the intended dependency.
!pip install cup

Collecting cup
  Downloading cup-3.2.33-py3-none-any.whl.metadata (8.5 kB)
Collecting paramiko<4.0.0,>=3.0.0 (from cup)
  Downloading paramiko-3.5.1-py3-none-any.whl.metadata (4.6 kB)
Collecting pytz<2023.0.0,>=2022.7.1 (from cup)
  Downloading pytz-2022.7.1-py2.py3-none-any.whl.metadata (21 kB)
Collecting bcrypt>=3.2 (from paramiko<4.0.0,>=3.0.0->cup)
  Downloading bcrypt-4.3.0-cp39-abi3-manylinux_2_34_x86_64.whl.metadata (10 kB)
Collecting pynacl>=1.5 (from paramiko<4.0.0,>=3.0.0->cup)
  Downloading PyNaCl-1.5.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl.metadata (8.6 kB)
Downloading cup-3.2.33-py3-none-any.whl (132 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m132.7/132.7 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading paramiko-3.5.1-py3-none-any.whl (227 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m227.3/227.3 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pytz-2022.7.1-py

# Ładowanie i przygotowanie danych


In [None]:
import torch
import numpy as np
from torchvision import datasets, transforms
from sklearn.decomposition import PCA
from skimage.feature import hog
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import random
import time
import psutil

from cuml.svm import LinearSVC
from sklearn.kernel_approximation import RBFSampler
import cupy as cp
from cuml.ensemble import RandomForestClassifier

# Seed every random number generator used in the notebook (Python, NumPy, PyTorch).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Pick the compute device; on CUDA also seed the GPU generator and make cuDNN deterministic.
_cuda_available = torch.cuda.is_available()
device = torch.device("cuda" if _cuda_available else "cpu")
if _cuda_available:
    torch.cuda.manual_seed(SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

print("Urządzenie:", device)

# Load the MNIST training split; every image is converted to a tensor and flattened.
N = 60000
transform_flat = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.view(-1)),
])
dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform_flat)

# Read each sample once and split the (image, label) pairs afterwards.
_samples = [dataset[i] for i in range(N)]
X_raw = torch.stack([image for image, _ in _samples])
y_raw = torch.tensor([label for _, label in _samples])
del _samples
print(f"Rozmiar danych (klasyczne): {X_raw.shape}")

# Untouched copy that serves as the clean reference for all later feature extraction.
X_raw_clean = X_raw.clone()

# Funkcje szumienia
def add_gaussian_noise(X, std=0.2):
    """Return ``X`` plus zero-mean Gaussian noise, clipped to the range [0, 1].

    Args:
        X: Tensor to corrupt; the noise is drawn with ``torch.randn_like(X)``.
        std: Factor the standard-normal noise is multiplied by.

    Returns:
        A new tensor; ``X`` itself is not modified.
    """
    noise = torch.randn_like(X) * std
    return (X + noise).clamp(0.0, 1.0)

def add_salt_and_pepper_noise(X, amount=0.1):
    """Return a copy of ``X`` in which a share of each row is set to 0 or 1.

    For every row, ``int(amount * row_length)`` distinct positions are drawn with
    NumPy's global RNG and each of them is overwritten with 0.0 or 1.0.

    Args:
        X: Tensor whose first dimension indexes the samples.
        amount: Fraction of positions per row to overwrite.

    Returns:
        The corrupted copy; ``X`` itself is not modified.
    """
    X_noisy = X.clone()
    # Iterating a tensor yields views, so writing into `row` updates X_noisy.
    for row in X_noisy:
        n_features = row.shape[0]
        positions = np.random.choice(n_features, int(amount * n_features), replace=False)
        salt_or_pepper = np.random.choice([0., 1.], size=len(positions))
        row[positions] = torch.tensor(salt_or_pepper).float()
    return X_noisy

def add_label_noise(y, noise_ratio=0.1, num_classes=10):
    """Return a copy of ``y`` where a share of the labels is replaced by a wrong class.

    ``int(len(y) * noise_ratio)`` distinct positions are drawn with NumPy's global
    RNG; each selected label is replaced by a class picked with ``random.choice``
    from all classes other than the original one.

    Args:
        y: 1-D tensor of integer class labels.
        noise_ratio: Fraction of labels to flip.
        num_classes: Number of classes; labels are taken from ``range(num_classes)``.

    Returns:
        The corrupted copy; ``y`` itself is not modified.
    """
    corrupted = y.clone()
    flip_count = int(len(y) * noise_ratio)
    flip_positions = np.random.choice(len(y), flip_count, replace=False)
    for pos in flip_positions:
        wrong_labels = [label for label in range(num_classes) if label != y[pos]]
        corrupted[pos] = random.choice(wrong_labels)
    return corrupted

# Ekstrakcja cech HOG
def extract_hog(X):
    """Compute a HOG descriptor for every row of ``X``.

    Each row is reshaped to a 28x28 image and passed to ``hog`` with 9 orientations,
    8x8-pixel cells and 2x2-cell blocks.

    Args:
        X: CPU tensor whose rows each hold 784 pixel values.

    Returns:
        A float32 tensor with one descriptor per row of ``X``.
    """
    descriptors = [
        hog(
            X[row].reshape(28, 28).numpy(),
            orientations=9,
            pixels_per_cell=(8, 8),
            cells_per_block=(2, 2),
        )
        for row in range(X.shape[0])
    ]
    return torch.tensor(np.array(descriptors), dtype=torch.float32)

# Precompute every noisy variant once so that all models see the same corrupted data.
print("\n=== PREKOMPUTACJA SZUMÓW ===")
precomputed_noisy_features = {}
precomputed_noisy_labels = {}


def _store_and_report(store, key, value):
    """Save ``value`` under ``key`` in ``store`` and print the confirmation line."""
    store[key] = value
    print(f"{key}: obliczony.")


# The order of the three loops is kept because each call consumes random numbers.
for std in (0.1, 0.2, 0.3):
    _store_and_report(precomputed_noisy_features, f"RAW_Gaussian_{std}",
                      add_gaussian_noise(X_raw_clean, std))
for amt in (0.05, 0.1, 0.2):
    _store_and_report(precomputed_noisy_features, f"RAW_SP_{amt}",
                      add_salt_and_pepper_noise(X_raw_clean, amt))
for noise in (0.1, 0.2, 0.3):
    _store_and_report(precomputed_noisy_labels, f"RAW_Label_{int(noise*100)}",
                      add_label_noise(y_raw, noise))

# Precompute the PCA and HOG features of the clean data and time both extractions.
print("\n=== PREKOMPUTACJA CECH ===")

# NOTE(review): PCA is fitted on all N samples, i.e. including the samples that the
# cross-validation functions later use as test folds — confirm this is intended.
start_pca = time.perf_counter()
_raw_np = X_raw_clean.numpy()
pca = PCA(n_components=50).fit(_raw_np)
X_pca_clean = torch.tensor(pca.transform(_raw_np), dtype=torch.float32)
pca_time = time.perf_counter() - start_pca

start_hog = time.perf_counter()
X_hog_clean = extract_hog(X_raw_clean)
hog_time = time.perf_counter() - start_hog

for _label, _seconds in (("PCA", pca_time), ("HOG", hog_time)):
    print(f"{_label}: czas = {_seconds:.3f}s")

# Precompute PCA and HOG features for every noisy version of the images
# (these are used as test data by the cross-validation functions).
precomputed_noisy_pca, precomputed_noisy_hog = {}, {}

print("\n=== PREKOMPUTACJA CECH (dla zaszumionych danych testowych) ===")
for key in precomputed_noisy_features:
    print(f"{key} - PCA i HOG...")
    X_noisy = precomputed_noisy_features[key]
    # Project with the PCA that was fitted on the clean data.
    projected = pca.transform(X_noisy.numpy())
    precomputed_noisy_pca[key] = torch.tensor(projected, dtype=torch.float32)
    precomputed_noisy_hog[key] = extract_hog(X_noisy)
print("Zakończono prekomputację cech zaszumionych.")


Urządzenie: cuda
Rozmiar danych (klasyczne): torch.Size([60000, 784])

=== PREKOMPUTACJA SZUMÓW ===
RAW_Gaussian_0.1: obliczony.
RAW_Gaussian_0.2: obliczony.
RAW_Gaussian_0.3: obliczony.
RAW_SP_0.05: obliczony.
RAW_SP_0.1: obliczony.
RAW_SP_0.2: obliczony.
RAW_Label_10: obliczony.
RAW_Label_20: obliczony.
RAW_Label_30: obliczony.

=== PREKOMPUTACJA CECH ===
PCA: czas = 0.300s
HOG: czas = 12.682s

=== PREKOMPUTACJA CECH (dla zaszumionych danych testowych) ===
RAW_Gaussian_0.1 - PCA i HOG...
RAW_Gaussian_0.2 - PCA i HOG...
RAW_Gaussian_0.3 - PCA i HOG...
RAW_SP_0.05 - PCA i HOG...
RAW_SP_0.1 - PCA i HOG...
RAW_SP_0.2 - PCA i HOG...
Zakończono prekomputację cech zaszumionych.


# KNN

In [None]:
def knn_classify(x_train, y_train, x_test, k):
    """Classify ``x_test`` by majority vote among the ``k`` nearest training points.

    Args:
        x_train: 2-D tensor of training features.
        y_train: 1-D tensor of training labels, aligned with ``x_train``.
        x_test: 2-D tensor of query features.
        k: Number of neighbours that vote.

    Returns:
        1-D tensor with the most frequent neighbour label for every query row.
    """
    # Full Euclidean distance matrix: one row per query, one column per training point.
    pairwise = torch.cdist(x_test, x_train, p=2)
    nearest = pairwise.topk(k, largest=False).indices
    votes = y_train[nearest]
    return votes.mode(dim=1).values

def crossval_metrics_knn(modality, noise_key=None, label_noise_key=None, k=3):
    """Evaluate the brute-force k-NN classifier with 5-fold stratified cross-validation.

    Training always uses the clean features and the clean labels. Noise, when
    requested, is applied to the evaluation side only: the test features come from
    the precomputed noisy tensors and the test labels from the precomputed noisy
    labels.

    Args:
        modality: Feature type, one of "RAW", "PCA" or "HOG".
        noise_key: Key of the precomputed noisy features to test on, or None for clean.
        label_noise_key: Key of the precomputed noisy labels to score against, or
            None for the clean labels.
        k: Number of neighbours.

    Returns:
        ``(metrics, timing)``. ``metrics`` maps "accuracy", "precision_macro",
        "recall_macro" and "f1_macro" to a list with one score per fold. ``timing``
        holds "train_time", "pred_time" and "total_time", summed over all folds.

    Raises:
        ValueError: If ``modality`` is not one of the supported feature types.
    """
    if modality == "RAW":
        clean_feats, noisy_store = X_raw_clean, precomputed_noisy_features
    elif modality == "PCA":
        clean_feats, noisy_store = X_pca_clean, precomputed_noisy_pca
    elif modality == "HOG":
        clean_feats, noisy_store = X_hog_clean, precomputed_noisy_hog
    else:
        raise ValueError("Nieznany typ ekstrakcji cech")

    eval_feats = noisy_store[noise_key] if noise_key else clean_feats
    eval_labels = precomputed_noisy_labels[label_noise_key] if label_noise_key else y_raw

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    metrics = {m: [] for m in ["accuracy", "precision_macro", "recall_macro", "f1_macro"]}
    # k-NN has no fitting step, so the training time is zero by definition.
    total_train_time = 0.0
    total_pred_time = 0.0
    X_np = X_raw_clean.cpu().numpy()
    y_np = y_raw.cpu().numpy()

    for train_idx, test_idx in skf.split(X_np, y_np):
        x_train, y_train = clean_feats[train_idx], y_raw[train_idx]
        x_test, y_test = eval_feats[test_idx], eval_labels[test_idx]

        pred_start = time.perf_counter()
        y_pred = knn_classify(x_train.to(device), y_train.to(device), x_test.to(device), k)
        # CUDA kernels are launched asynchronously; wait for them to finish so the
        # timer covers the actual computation and not only the kernel launches.
        if device.type == "cuda":
            torch.cuda.synchronize()
        total_pred_time += time.perf_counter() - pred_start

        yt = y_test.cpu().numpy()
        yp = y_pred.cpu().numpy()
        metrics["accuracy"].append(accuracy_score(yt, yp))
        metrics["precision_macro"].append(precision_score(yt, yp, average="macro", zero_division=0))
        metrics["recall_macro"].append(recall_score(yt, yp, average="macro", zero_division=0))
        metrics["f1_macro"].append(f1_score(yt, yp, average="macro", zero_division=0))

    timing = {
        "train_time": total_train_time,
        "pred_time": total_pred_time,
        "total_time": total_train_time + total_pred_time
    }
    return metrics, timing

def summarize_metrics(metrics):
    """Format per-fold scores as ``"mean ± std"`` strings with four decimals.

    Args:
        metrics: Mapping from metric name to a sequence of per-fold values.

    Returns:
        Dict with the same keys and one formatted string per metric.
    """
    summary = {}
    for metric_name, fold_values in metrics.items():
        mean_value = np.mean(fold_values)
        spread = np.std(fold_values)
        summary[metric_name] = f"{mean_value:.4f} ± {spread:.4f}"
    return summary

def run_gridsearch_knn(modality, noise_key, label_noise_key, name):
    """Cross-validate k-NN for k in 1, 3, 5, 7 and keep the k with the best macro F1.

    The candidates are compared on the unrounded mean macro F1 of the folds, so
    the four-decimal rounding used for display cannot create artificial ties.
    On an exact tie the smaller k (evaluated first) is kept.

    Args:
        modality: Feature type passed to ``crossval_metrics_knn``.
        noise_key: Key of the noisy test features, or None.
        label_noise_key: Key of the noisy test labels, or None.
        name: Experiment name, printed and stored in the result.

    Returns:
        Dict with "name", "params" (``"k=<best k>"``), "metrics" (formatted
        summary of the best run), "timing" (its timing dict) and "precomp_time"
        (feature-extraction time of the modality, 0.0 for raw pixels).
    """
    print(f"\n{name}")
    best_f1 = -1
    best_k = None
    best_summary = None
    best_timing = None

    for k in [1, 3, 5, 7]:
        print(f"  k={k}")
        metrics, timing = crossval_metrics_knn(modality, noise_key, label_noise_key, k)
        summary = summarize_metrics(metrics)
        # Compare on the exact mean, not on the rounded string shown to the user.
        f1_val = float(np.mean(metrics["f1_macro"]))
        print(f"    F1_macro: {summary['f1_macro']} | Recall_macro: {summary['recall_macro']} | Precision_macro: {summary['precision_macro']} | Accuracy: {summary['accuracy']} | Train: {timing['train_time']:.3f}s | Pred: {timing['pred_time']:.3f}s")

        if f1_val > best_f1:
            best_f1 = f1_val
            best_k = k
            best_summary = summary
            best_timing = timing

    if modality == "PCA":
        precomp_time = pca_time
    elif modality == "HOG":
        precomp_time = hog_time
    else:
        # Raw pixels (and any other modality) need no feature extraction.
        precomp_time = 0.0

    print(f"Najlepsze k={best_k}")
    return {
        "name": name,
        "params": f"k={best_k}",
        "metrics": best_summary,
        "timing": best_timing,
        "precomp_time": precomp_time
    }

# Experiment list: for every feature type one clean run, three Gaussian-noise runs,
# three salt-and-pepper runs and three label-noise runs.
# Each entry is (name, modality, noise_key, label_noise_key).
knn_experiments = []
for modality in ("RAW", "PCA", "HOG"):
    knn_experiments.append((modality, modality, None, None))
    knn_experiments += [
        (f"{modality} + Gaussian (std={std})", modality, f"RAW_Gaussian_{std}", None)
        for std in [0.1, 0.2, 0.3]
    ]
    knn_experiments += [
        (f"{modality} + S&P (amt={amt})", modality, f"RAW_SP_{amt}", None)
        for amt in [0.05, 0.1, 0.2]
    ]
    knn_experiments += [
        (f"{modality} + LabelNoise ({int(noise*100)}%)", modality, None, f"RAW_Label_{int(noise*100)}")
        for noise in [0.1, 0.2, 0.3]
    ]

knn_results = [
    run_gridsearch_knn(mod, noise_key, label_noise_key, name)
    for name, mod, noise_key, label_noise_key in knn_experiments
]

# Results table: one row per experiment with the best k and its metrics.
print("\nTabela wyników kNN:")
print(" | ".join([
    f"{'Metoda':<35}", f"{'Parametry':<10}", f"{'Accuracy':<20}", f"{'Precision_macro':<20}",
    f"{'Recall_macro':<20}", f"{'F1_macro':<20}", f"{'Train [s]':<10}", f"{'Pred [s]':<10}",
]))
print("-" * 160)
for r in knn_results:
    scores, timing = r['metrics'], r['timing']
    print(" | ".join([
        f"{r['name']:<35}",
        f"{r['params']:<10}",
        f"{scores['accuracy']:<20}",
        f"{scores['precision_macro']:<20}",
        f"{scores['recall_macro']:<20}",
        f"{scores['f1_macro']:<20}",
        f"{timing['train_time']:<10.3f}",
        f"{timing['pred_time']:<10.3f}",
    ]))



RAW
  k=1
    F1_macro: 0.9717 ± 0.0003 | Recall_macro: 0.9715 ± 0.0003 | Precision_macro: 0.9721 ± 0.0003 | Accuracy: 0.9719 ± 0.0002 | Train: 0.000s | Pred: 0.207s
  k=3
    F1_macro: 0.9717 ± 0.0014 | Recall_macro: 0.9714 ± 0.0014 | Precision_macro: 0.9723 ± 0.0014 | Accuracy: 0.9718 ± 0.0014 | Train: 0.000s | Pred: 0.206s
  k=5
    F1_macro: 0.9710 ± 0.0014 | Recall_macro: 0.9707 ± 0.0014 | Precision_macro: 0.9716 ± 0.0014 | Accuracy: 0.9711 ± 0.0014 | Train: 0.000s | Pred: 0.207s
  k=7
    F1_macro: 0.9692 ± 0.0008 | Recall_macro: 0.9689 ± 0.0008 | Precision_macro: 0.9699 ± 0.0008 | Accuracy: 0.9694 ± 0.0008 | Train: 0.000s | Pred: 0.203s
Najlepsze k=1

RAW + Gaussian (std=0.1)
  k=1
    F1_macro: 0.9719 ± 0.0008 | Recall_macro: 0.9717 ± 0.0008 | Precision_macro: 0.9722 ± 0.0009 | Accuracy: 0.9721 ± 0.0008 | Train: 0.000s | Pred: 0.208s
  k=3
    F1_macro: 0.9718 ± 0.0013 | Recall_macro: 0.9715 ± 0.0013 | Precision_macro: 0.9723 ± 0.0013 | Accuracy: 0.9719 ± 0.0013 | Train: 0.000

# SVM

In [None]:

def crossval_metrics_svm(modality, noise_key=None, label_noise_key=None, svm_params={"C":1, "gamma":0.01}):
    """Evaluate an approximate RBF-kernel SVM with 5-fold stratified cross-validation.

    In every fold the features are mapped with an ``RBFSampler`` (2000 components,
    fitted on the training part) and a cuML ``LinearSVC`` is trained on the mapped
    data. Training uses clean features and labels; noise affects the test side only.

    Args:
        modality: Feature type, one of "RAW", "PCA" or "HOG".
        noise_key: Key of the precomputed noisy features to test on, or None for clean.
        label_noise_key: Key of the precomputed noisy labels to score against, or None.
        svm_params: Dict with "C" (LinearSVC) and "gamma" (RBFSampler). It is only read.

    Returns:
        ``(metrics, timing)``. ``metrics`` maps each metric name to a
        ``(mean, std)`` tuple over the folds. ``timing`` holds the per-fold mean
        of the fit time, of the predict time, and their sum. The feature mapping
        is not part of the timed sections.

    Raises:
        ValueError: If ``modality`` is not one of the supported feature types.
    """
    splitter = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    scores = {"accuracy": [], "precision_macro": [], "recall_macro": [], "f1_macro": []}
    fit_seconds, predict_seconds = [], []
    all_features_np = X_raw_clean.cpu().numpy()
    all_labels_np = y_raw.cpu().numpy()

    for train_idx, test_idx in splitter.split(all_features_np, all_labels_np):
        # Clean features for training and the matching store of noisy test features.
        if modality == "RAW":
            clean_feats, noisy_store = X_raw_clean, precomputed_noisy_features
        elif modality == "PCA":
            clean_feats, noisy_store = X_pca_clean, precomputed_noisy_pca
        elif modality == "HOG":
            clean_feats, noisy_store = X_hog_clean, precomputed_noisy_hog
        else:
            raise ValueError("Nieznany typ ekstrakcji cech")

        train_feats = clean_feats[train_idx]
        train_labels = y_raw[train_idx]
        test_feats = (noisy_store[noise_key] if noise_key else clean_feats)[test_idx]
        test_labels = (precomputed_noisy_labels[label_noise_key] if label_noise_key else y_raw)[test_idx]

        # Random Fourier features approximate the RBF kernel for the linear SVM.
        rbf_map = RBFSampler(gamma=svm_params["gamma"], n_components=2000, random_state=42)
        train_mapped = rbf_map.fit_transform(train_feats.cpu().numpy())
        test_mapped = rbf_map.transform(test_feats.cpu().numpy())

        train_gpu = cp.asarray(train_mapped)
        test_gpu = cp.asarray(test_mapped)
        labels_gpu = cp.asarray(train_labels.cpu().numpy())

        fit_start = time.perf_counter()
        classifier = LinearSVC(C=svm_params["C"])
        classifier.fit(train_gpu, labels_gpu)
        fit_seconds.append(time.perf_counter() - fit_start)

        predict_start = time.perf_counter()
        predicted_gpu = classifier.predict(test_gpu)
        predict_seconds.append(time.perf_counter() - predict_start)

        predicted = cp.asnumpy(predicted_gpu)
        expected = test_labels.cpu().numpy()
        scores["accuracy"].append(accuracy_score(expected, predicted))
        scores["precision_macro"].append(precision_score(expected, predicted, average="macro", zero_division=0))
        scores["recall_macro"].append(recall_score(expected, predicted, average="macro", zero_division=0))
        scores["f1_macro"].append(f1_score(expected, predicted, average="macro", zero_division=0))

    mean_fit = np.mean(fit_seconds)
    mean_predict = np.mean(predict_seconds)
    timing = {
        "train_time": mean_fit,
        "pred_time": mean_predict,
        "total_time": mean_fit + mean_predict,
    }
    metrics = {metric: (np.mean(values), np.std(values)) for metric, values in scores.items()}
    return metrics, timing


def run_gridsearch_svm(modality, noise_key, label_noise_key, name, param_grid):
    """Cross-validate the SVM for every parameter set and keep the best macro F1.

    The candidates are compared on the unrounded mean macro F1, so the
    four-decimal rounding used for display cannot create artificial ties.
    On an exact tie the earlier entry of ``param_grid`` is kept.

    Args:
        modality: Feature type passed to ``crossval_metrics_svm``.
        noise_key: Key of the noisy test features, or None.
        label_noise_key: Key of the noisy test labels, or None.
        name: Experiment name, printed and stored in the result.
        param_grid: Iterable of parameter dicts passed on as ``svm_params``.

    Returns:
        Dict with "name", "params" (best parameter dict), "metrics" (formatted
        "mean ± std" strings of the best run), "timing" (its timing dict) and
        "precomp_time" (feature-extraction time of the modality, 0.0 for raw pixels).
    """
    print(f"\n{name}")
    best_f1 = -1
    best_params = None
    best_metrics = None
    best_timing = None

    for params in param_grid:
        print(f"  Params: {params}")
        metrics, timing = crossval_metrics_svm(modality, noise_key, label_noise_key, svm_params=params)
        summary = {k: f"{v[0]:.4f} ± {v[1]:.4f}" for k, v in metrics.items()}
        # Compare on the exact mean, not on the rounded string shown to the user.
        f1_val = float(metrics["f1_macro"][0])
        print(f"    Accuracy: {summary['accuracy']} | Precision: {summary['precision_macro']} | Recall: {summary['recall_macro']} | F1_macro: {summary['f1_macro']} | Train: {timing['train_time']:.3f}s | Pred: {timing['pred_time']:.3f}s")

        if f1_val > best_f1:
            best_f1 = f1_val
            best_params = params
            best_metrics = summary
            best_timing = timing

    precomp_time = {
        "RAW": 0.0,
        "PCA": pca_time,
        "HOG": hog_time
    }.get(modality, 0.0)

    print(f"Najlepsze parametry: {best_params}")
    return {
        "name": name,
        "params": best_params,
        "metrics": best_metrics,
        "timing": best_timing,
        "precomp_time": precomp_time
    }

# Experiment definitions: for every feature type one clean run, three Gaussian-noise
# runs, three salt-and-pepper runs and three label-noise runs.
# Each entry is (name, modality, noise_key, label_noise_key).
svm_experiments = []
for modality in ("RAW", "PCA", "HOG"):
    svm_experiments.append((modality, modality, None, None))
    svm_experiments += [
        (f"{modality} + Gaussian (std={std})", modality, f"RAW_Gaussian_{std}", None)
        for std in [0.1, 0.2, 0.3]
    ]
    svm_experiments += [
        (f"{modality} + S&P (amt={amt})", modality, f"RAW_SP_{amt}", None)
        for amt in [0.05, 0.1, 0.2]
    ]
    svm_experiments += [
        (f"{modality} + LabelNoise ({int(noise*100)}%)", modality, None, f"RAW_Label_{int(noise*100)}")
        for noise in [0.1, 0.2, 0.3]
    ]

# Hyper-parameter grid: fixed C, three kernel widths.
svm_param_grid = [{"C": 1, "gamma": gamma} for gamma in (0.01, 0.001, 0.1)]

svm_results = [
    run_gridsearch_svm(mod, noise_key, label_noise_key, name, svm_param_grid)
    for name, mod, noise_key, label_noise_key in svm_experiments
]

# Results table: one row per experiment with the best parameters and their metrics.
print("\nTabela wyników SVM:")
print(" | ".join([
    f"{'Metoda':<35}", f"{'Parametry':<25}", f"{'Accuracy':<20}", f"{'Precision_macro':<20}",
    f"{'Recall_macro':<20}", f"{'F1_macro':<20}", f"{'Train [s]':<10}", f"{'Pred [s]':<10}",
]))
print("-" * 160)
for r in svm_results:
    scores, timing = r['metrics'], r['timing']
    print(" | ".join([
        f"{r['name']:<35}",
        f"{str(r['params']):<25}",
        f"{scores['accuracy']:<20}",
        f"{scores['precision_macro']:<20}",
        f"{scores['recall_macro']:<20}",
        f"{scores['f1_macro']:<20}",
        f"{timing['train_time']:<10.3f}",
        f"{timing['pred_time']:<10.3f}",
    ]))



RAW
  Params: {'C': 1, 'gamma': 0.01}
    Accuracy: 0.9335 ± 0.0022 | Precision: 0.9329 ± 0.0024 | Recall: 0.9329 ± 0.0022 | F1_macro: 0.9328 ± 0.0023 | Train: 1.131s | Pred: 0.005s
  Params: {'C': 1, 'gamma': 0.001}
    Accuracy: 0.8606 ± 0.0048 | Precision: 0.8591 ± 0.0050 | Recall: 0.8584 ± 0.0048 | F1_macro: 0.8581 ± 0.0049 | Train: 0.792s | Pred: 0.009s
  Params: {'C': 1, 'gamma': 0.1}
    Accuracy: 0.5595 ± 0.0046 | Precision: 0.5242 ± 0.0038 | Recall: 0.5525 ± 0.0046 | F1_macro: 0.5246 ± 0.0041 | Train: 0.337s | Pred: 0.005s
Najlepsze parametry: {'C': 1, 'gamma': 0.01}

RAW + Gaussian (std=0.1)
  Params: {'C': 1, 'gamma': 0.01}
    Accuracy: 0.9293 ± 0.0026 | Precision: 0.9288 ± 0.0026 | Recall: 0.9289 ± 0.0026 | F1_macro: 0.9287 ± 0.0026 | Train: 0.742s | Pred: 0.005s
  Params: {'C': 1, 'gamma': 0.001}
    Accuracy: 0.8591 ± 0.0060 | Precision: 0.8569 ± 0.0062 | Recall: 0.8571 ± 0.0060 | F1_macro: 0.8567 ± 0.0061 | Train: 0.796s | Pred: 0.005s
  Params: {'C': 1, 'gamma': 0.1}


# Random Forest

In [None]:
def crossval_metrics_rf(modality, noise_key=None, label_noise_key=None, rf_params={"n_estimators": 100, "max_depth": 20}):
    """Evaluate a cuML random forest with 5-fold stratified cross-validation.

    Training uses clean features and labels; noise affects the test side only.
    Test features are taken from the precomputed clean/noisy PCA and HOG tensors
    (the same source the k-NN and SVM evaluations use) instead of being
    re-extracted in every fold. Both extractions process each image
    independently with the same fitted PCA / HOG settings, so the features are
    the same while the repeated work per fold and per parameter set is avoided.

    Args:
        modality: Feature type, one of "RAW", "PCA" or "HOG".
        noise_key: Key of the precomputed noisy features to test on, or None for clean.
        label_noise_key: Key of the precomputed noisy labels to score against, or None.
        rf_params: Keyword arguments for ``RandomForestClassifier``. It is only read.

    Returns:
        ``(metrics, timing)``. ``metrics`` maps each metric name to a
        ``(mean, std)`` tuple over the folds. ``timing`` holds the per-fold mean
        of the fit time, of the predict time, and their sum.

    Raises:
        ValueError: If ``modality`` is not one of the supported feature types.
    """
    if modality == "RAW":
        clean_feats, noisy_store = X_raw_clean, precomputed_noisy_features
    elif modality == "PCA":
        clean_feats, noisy_store = X_pca_clean, precomputed_noisy_pca
    elif modality == "HOG":
        clean_feats, noisy_store = X_hog_clean, precomputed_noisy_hog
    else:
        raise ValueError("Nieznany typ ekstrakcji cech")

    eval_feats = noisy_store[noise_key] if noise_key else clean_feats
    eval_labels = precomputed_noisy_labels[label_noise_key] if label_noise_key else y_raw

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    acc_list, prec_list, rec_list, f1_list = [], [], [], []
    train_times, pred_times = [], []
    X_np = X_raw_clean.cpu().numpy()
    y_np = y_raw.cpu().numpy()

    for train_idx, test_idx in skf.split(X_np, y_np):
        x_train, y_train = clean_feats[train_idx], y_raw[train_idx]
        x_test, y_test = eval_feats[test_idx], eval_labels[test_idx]

        X_train_cp = cp.asarray(x_train.cpu().numpy())
        X_test_cp = cp.asarray(x_test.cpu().numpy())
        y_train_cp = cp.asarray(y_train.cpu().numpy())

        t0 = time.perf_counter()
        model_rf = RandomForestClassifier(**rf_params)
        model_rf.fit(X_train_cp, y_train_cp)
        t1 = time.perf_counter()
        train_times.append(t1 - t0)

        t2 = time.perf_counter()
        y_pred_cp = model_rf.predict(X_test_cp)
        t3 = time.perf_counter()
        pred_times.append(t3 - t2)

        y_pred = cp.asnumpy(y_pred_cp)
        y_true = y_test.cpu().numpy()

        acc_list.append(accuracy_score(y_true, y_pred))
        prec_list.append(precision_score(y_true, y_pred, average="macro", zero_division=0))
        rec_list.append(recall_score(y_true, y_pred, average="macro", zero_division=0))
        f1_list.append(f1_score(y_true, y_pred, average="macro", zero_division=0))

    timing = {
        "train_time": np.mean(train_times),
        "pred_time": np.mean(pred_times),
        "total_time": np.mean(train_times) + np.mean(pred_times)
    }
    metrics = {
        "accuracy": (np.mean(acc_list), np.std(acc_list)),
        "precision_macro": (np.mean(prec_list), np.std(prec_list)),
        "recall_macro": (np.mean(rec_list), np.std(rec_list)),
        "f1_macro": (np.mean(f1_list), np.std(f1_list))
    }
    return metrics, timing


def run_gridsearch_rf(modality, noise_key, label_noise_key, name, param_grid):
    """Cross-validate the random forest for every parameter set and keep the best macro F1.

    The candidates are compared on the unrounded mean macro F1, so the
    four-decimal rounding used for display cannot create artificial ties.
    On an exact tie the earlier entry of ``param_grid`` is kept.

    Args:
        modality: Feature type passed to ``crossval_metrics_rf``.
        noise_key: Key of the noisy test features, or None.
        label_noise_key: Key of the noisy test labels, or None.
        name: Experiment name, printed and stored in the result.
        param_grid: Iterable of parameter dicts passed on as ``rf_params``.

    Returns:
        Dict with "name", "params" (best parameter dict), "metrics" (formatted
        "mean ± std" strings of the best run), "timing" (its timing dict) and
        "precomp_time" (feature-extraction time of the modality, 0.0 for raw pixels).
    """
    print(f"\n{name}")
    best_f1 = -1
    best_params = None
    best_metrics = None
    best_timing = None

    for params in param_grid:
        print(f"  Params: {params}")
        metrics, timing = crossval_metrics_rf(modality, noise_key, label_noise_key, rf_params=params)
        summary = {k: f"{v[0]:.4f} ± {v[1]:.4f}" for k, v in metrics.items()}
        # Compare on the exact mean, not on the rounded string shown to the user.
        f1_val = float(metrics["f1_macro"][0])
        print(f"    Accuracy: {summary['accuracy']} | Precision: {summary['precision_macro']} | Recall: {summary['recall_macro']} | F1_macro: {summary['f1_macro']} | Train: {timing['train_time']:.3f}s | Pred: {timing['pred_time']:.3f}s")

        if f1_val > best_f1:
            best_f1 = f1_val
            best_params = params
            best_metrics = summary
            best_timing = timing

    precomp_time = {
        "RAW": 0.0,
        "PCA": pca_time,
        "HOG": hog_time
    }.get(modality, 0.0)

    print(f"Najlepsze parametry: {best_params}")
    return {
        "name": name,
        "params": best_params,
        "metrics": best_metrics,
        "timing": best_timing,
        "precomp_time": precomp_time
    }


# === Experiment definitions ===
# For every feature type: one clean run, three Gaussian-noise runs, three
# salt-and-pepper runs and three label-noise runs.
# Each entry is (name, modality, noise_key, label_noise_key).
rf_experiments = []
for modality in ("RAW", "PCA", "HOG"):
    rf_experiments.append((modality, modality, None, None))
    rf_experiments += [
        (f"{modality} + Gaussian (std={std})", modality, f"RAW_Gaussian_{std}", None)
        for std in [0.1, 0.2, 0.3]
    ]
    rf_experiments += [
        (f"{modality} + S&P (amt={amt})", modality, f"RAW_SP_{amt}", None)
        for amt in [0.05, 0.1, 0.2]
    ]
    rf_experiments += [
        (f"{modality} + LabelNoise ({int(noise*100)}%)", modality, None, f"RAW_Label_{int(noise*100)}")
        for noise in [0.1, 0.2, 0.3]
    ]

# Hyper-parameter grid: every depth combined with every feature-subsampling rule.
rf_param_grid = [
    {"n_estimators": 100, "max_depth": depth, "max_features": feature_rule}
    for depth in [20, 30]
    for feature_rule in ["sqrt", "log2"]
]

# Run the experiments
rf_results = [
    run_gridsearch_rf(mod, noise_key, label_noise_key, name, rf_param_grid)
    for name, mod, noise_key, label_noise_key in rf_experiments
]

# Results table
print("\nTabela wyników Random Forest:")
print(" | ".join([
    f"{'Metoda':<35}", f"{'Parametry':<40}", f"{'Accuracy':<20}", f"{'Precision_macro':<20}",
    f"{'Recall_macro':<20}", f"{'F1_macro':<20}", f"{'Train [s]':<10}", f"{'Pred [s]':<10}",
]))
print("-" * 160)
for r in rf_results:
    scores, timing = r['metrics'], r['timing']
    print(" | ".join([
        f"{r['name']:<35}",
        f"{str(r['params']):<40}",
        f"{scores['accuracy']:<20}",
        f"{scores['precision_macro']:<20}",
        f"{scores['recall_macro']:<20}",
        f"{scores['f1_macro']:<20}",
        f"{timing['train_time']:<10.3f}",
        f"{timing['pred_time']:<10.3f}",
    ]))



RAW
  Params: {'n_estimators': 100, 'max_depth': 20, 'max_features': 'sqrt'}
    Accuracy: 0.9652 ± 0.0021 | Precision: 0.9649 ± 0.0022 | Recall: 0.9649 ± 0.0021 | F1_macro: 0.9649 ± 0.0022 | Train: 1.141s | Pred: 0.101s
  Params: {'n_estimators': 100, 'max_depth': 20, 'max_features': 'log2'}
    Accuracy: 0.9633 ± 0.0014 | Precision: 0.9630 ± 0.0015 | Recall: 0.9629 ± 0.0014 | F1_macro: 0.9629 ± 0.0015 | Train: 0.626s | Pred: 0.080s
  Params: {'n_estimators': 100, 'max_depth': 30, 'max_features': 'sqrt'}
    Accuracy: 0.9658 ± 0.0019 | Precision: 0.9655 ± 0.0020 | Recall: 0.9655 ± 0.0019 | F1_macro: 0.9655 ± 0.0019 | Train: 0.922s | Pred: 0.061s
  Params: {'n_estimators': 100, 'max_depth': 30, 'max_features': 'log2'}
    Accuracy: 0.9641 ± 0.0015 | Precision: 0.9639 ± 0.0015 | Recall: 0.9638 ± 0.0015 | F1_macro: 0.9638 ± 0.0015 | Train: 0.680s | Pred: 0.085s
Najlepsze parametry: {'n_estimators': 100, 'max_depth': 30, 'max_features': 'sqrt'}

RAW + Gaussian (std=0.1)
  Params: {'n_est

# CNN

In [None]:
# CNN z 5-krotną walidacją krzyżową na całym MNIST
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
import random
import time

# Re-seed every random number generator before the CNN experiments.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Pick the compute device; on CUDA also seed the GPU generator and make cuDNN deterministic.
_cuda_available = torch.cuda.is_available()
device = torch.device("cuda" if _cuda_available else "cpu")
if _cuda_available:
    torch.cuda.manual_seed(SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

print("Urządzenie:", device)

# Load the MNIST training split as image tensors (no flattening for the CNN).
N = 60000
dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())

# Read each sample once and split the (image, label) pairs afterwards.
_samples = [dataset[i] for i in range(N)]
X_raw = torch.stack([image for image, _ in _samples])
y_raw = torch.tensor([label for _, label in _samples])
del _samples

# Funkcje szumienia
def add_gaussian_noise_image(x, std=0.2):
    """Return ``x`` plus zero-mean Gaussian noise, clipped to the range [0, 1].

    Args:
        x: Tensor to corrupt; the noise is drawn with ``torch.randn_like(x)``.
        std: Factor the standard-normal noise is multiplied by.

    Returns:
        A new tensor; ``x`` itself is not modified.
    """
    return torch.clamp(x + torch.randn_like(x) * std, min=0.0, max=1.0)

def add_salt_and_pepper_noise_image(x, amount=0.1):
    """Return a copy of ``x`` with a share of its elements set to 0.0 or 1.0.

    ``int(amount * x.numel())`` distinct positions are drawn over the whole
    tensor (not per image) with ``random.sample``, and a value is drawn for each
    with ``random.choice``. The random draws happen in the same order as in an
    element-by-element loop, but the values are written with a single indexed
    assignment, which avoids one tensor write per element when ``x`` is large.

    Args:
        x: Tensor to corrupt; all of its elements form one pool of positions.
        amount: Fraction of elements to overwrite.

    Returns:
        The corrupted copy; ``x`` itself is not modified.
    """
    x_noisy = x.clone()
    num_elements = x.numel()
    num_noisy = int(amount * num_elements)
    noisy_indices = random.sample(range(num_elements), num_noisy)
    noisy_values = [random.choice([0.0, 1.0]) for _ in noisy_indices]

    # `flat` is a view of x_noisy, so the indexed write updates x_noisy itself.
    # The sampled indices are distinct, so the order of the writes is irrelevant.
    flat = x_noisy.view(-1)
    index_tensor = torch.tensor(noisy_indices, dtype=torch.long, device=flat.device)
    flat[index_tensor] = torch.tensor(noisy_values, dtype=flat.dtype, device=flat.device)
    return x_noisy

def add_label_noise(y, noise_ratio=0.1, num_classes=10):
    """Return a copy of ``y`` where a share of the labels is replaced by a wrong class.

    ``int(len(y) * noise_ratio)`` distinct positions are drawn with NumPy's global
    RNG; each selected label is replaced by a class picked with ``random.choice``
    from all classes other than the original one.

    Args:
        y: 1-D tensor of integer class labels.
        noise_ratio: Fraction of labels to flip.
        num_classes: Number of classes; labels are taken from ``range(num_classes)``.

    Returns:
        The corrupted copy; ``y`` itself is not modified.
    """
    total = len(y)
    chosen = np.random.choice(total, int(total * noise_ratio), replace=False)
    y_out = y.clone()
    for i in chosen:
        original = y[i]
        alternatives = [c for c in range(num_classes) if c != original]
        y_out[i] = random.choice(alternatives)
    return y_out

def normalize_tensor(x):
    """Standardize ``x`` by subtracting 0.1307 and dividing by 0.3081.

    NOTE(review): these constants look like the usual MNIST mean/std — confirm.
    """
    centered = x - 0.1307
    return centered / 0.3081

# Precompute every noisy variant of the images and labels for the CNN experiments.
print("\n=== PREKOMPUTACJA SZUMÓW ===")
precomputed_noisy_features = {}
precomputed_noisy_labels = {}


def _register_noisy(target, key, value):
    """Save ``value`` under ``key`` in ``target`` and print the confirmation line."""
    target[key] = value
    print(f"{key}: obliczony.")


# The order of the three loops is kept because each call consumes random numbers.
for std in (0.1, 0.2, 0.3):
    _register_noisy(precomputed_noisy_features, f"RAW_Gaussian_{std}",
                    add_gaussian_noise_image(X_raw, std))
for amt in (0.05, 0.1, 0.2):
    _register_noisy(precomputed_noisy_features, f"RAW_SP_{amt}",
                    add_salt_and_pepper_noise_image(X_raw, amt))
for noise in (0.1, 0.2, 0.3):
    _register_noisy(precomputed_noisy_labels, f"RAW_Label_{int(noise*100)}",
                    add_label_noise(y_raw, noise))

class SimpleCNN(nn.Module):
    """Small convolutional classifier with two conv/pool stages and two linear layers.

    The flattening step assumes 64 channels of 7x7 after the second pooling,
    which is what a 1x28x28 input produces; the output has 10 logits per sample.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        self.dropout = nn.Dropout(p=0.25)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        # Two identical stages: convolution, ReLU, 2x2 max pooling.
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        x = x.view(-1, 64 * 7 * 7)
        hidden = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(hidden)


def compute_metrics(y_true, y_pred):
    """Return ``(accuracy, precision, recall, f1)`` for the given predictions.

    Precision, recall and F1 are macro-averaged with ``zero_division=0``.
    """
    macro_kwargs = {"average": "macro", "zero_division": 0}
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, **macro_kwargs),
        recall_score(y_true, y_pred, **macro_kwargs),
        f1_score(y_true, y_pred, **macro_kwargs),
    )

def evaluate_model(model, loader):
    """Run ``model`` over ``loader`` in eval mode and score its predictions.

    Every batch is moved to ``device`` and passed through ``normalize_tensor``
    before the forward pass; the predicted class is the index of the largest output.

    Args:
        model: Module returning one row of class scores per sample.
        loader: Iterable of ``(images, labels)`` batches.

    Returns:
        ``(metrics, elapsed)``: the tuple from ``compute_metrics`` and the
        wall-clock seconds spent in the prediction loop (scoring excluded).
    """
    model.eval()
    collected_preds, collected_targets = [], []
    started = time.perf_counter()
    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(normalize_tensor(batch_images))
            batch_pred = logits.max(dim=1).indices
            collected_preds.extend(batch_pred.cpu().numpy())
            collected_targets.extend(batch_labels.cpu().numpy())
    elapsed = time.perf_counter() - started
    return compute_metrics(collected_targets, collected_preds), elapsed

# Walidacja krzyzowa

def crossval_metrics_cnn(noise_key=None, label_noise_key=None, batch_size=64, epochs=15):
    """Cross-validate SimpleCNN over 5 stratified folds and aggregate the results.

    For every fold a fresh SimpleCNN is trained on the X_raw / y_raw training
    split with Adam (lr=0.001) and cross-entropy loss. After each epoch the
    model is scored on the fold's evaluation split; training stops early after
    3 consecutive epochs without a macro-F1 improvement. The weights of the
    best epoch are then restored and used for the reported metrics.

    NOTE(review): early stopping and model selection use the same split that
    is reported, so the scores are optimistically biased. Noise is applied
    only to the evaluation split, never to the training split - confirm this
    is the intended protocol, in particular for label noise.

    Args:
        noise_key: Optional key into ``precomputed_noisy_features``. When set,
            the evaluation images of each fold come from that entry instead
            of ``X_raw``.
        label_noise_key: Optional key into ``precomputed_noisy_labels``. When
            set, the evaluation labels of each fold come from that entry
            instead of ``y_raw``.
        batch_size: Batch size of both the training and the evaluation loader.
        epochs: Maximum number of training epochs per fold.

    Returns:
        Tuple ``(metrics, timing)``. ``metrics`` maps "accuracy",
        "precision_macro", "recall_macro" and "f1_macro" to ``(mean, std)``
        over the folds. ``timing`` maps "train_time", "pred_time" and
        "total_time" to per-fold mean seconds; the training time includes the
        per-epoch evaluation passes.
    """
    n_splits = 5
    patience = 3
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    acc_list, prec_list, rec_list, f1_list, train_times, pred_times = [], [], [], [], [], []

    for fold, (train_idx, test_idx) in enumerate(skf.split(X_raw, y_raw)):
        print(f"\nFold {fold + 1}/{n_splits}")
        X_train = X_raw[train_idx]
        y_train = y_raw[train_idx]

        if noise_key:
            X_test = precomputed_noisy_features[noise_key][test_idx]
        else:
            X_test = X_raw[test_idx]
        if label_noise_key:
            y_test = precomputed_noisy_labels[label_noise_key][test_idx]
        else:
            y_test = y_raw[test_idx]

        train_dataset = TensorDataset(X_train, y_train)
        test_dataset = TensorDataset(X_test, y_test)

        # Honour the batch_size argument for training as well (it used to be fixed at 64).
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        model = SimpleCNN().to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        # Start below any reachable score so the first epoch is always recorded.
        best_f1 = float("-inf")
        best_state = None
        wait = 0

        t_start = time.perf_counter()
        for _ in range(epochs):
            model.train()
            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)
                images = normalize_tensor(images)
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

            # evaluate_model switches to eval mode; model.train() above switches back.
            (_, _, _, f1), _ = evaluate_model(model, test_loader)

            if f1 > best_f1:
                best_f1 = f1
                # state_dict() returns tensors that share storage with the live
                # parameters, so clone them; otherwise later optimizer steps
                # would silently overwrite the "best" snapshot.
                best_state = {key: value.detach().clone() for key, value in model.state_dict().items()}
                wait = 0
            else:
                wait += 1
                if wait >= patience:
                    print("Early stop")
                    break

        t_end = time.perf_counter()
        train_times.append(t_end - t_start)
        # best_state stays None only when no epoch ran (epochs <= 0).
        if best_state is not None:
            model.load_state_dict(best_state)
        (acc, prec, rec, f1), pred_time = evaluate_model(model, test_loader)

        acc_list.append(acc)
        prec_list.append(prec)
        rec_list.append(rec)
        f1_list.append(f1)
        pred_times.append(pred_time)

        print(f"Fold {fold + 1} - Accuracy: {acc:.4f}, F1_macro: {f1:.4f}")

    metrics = {
        "accuracy": (np.mean(acc_list), np.std(acc_list)),
        "precision_macro": (np.mean(prec_list), np.std(prec_list)),
        "recall_macro": (np.mean(rec_list), np.std(rec_list)),
        "f1_macro": (np.mean(f1_list), np.std(f1_list))
    }
    timing = {
        "train_time": np.mean(train_times),
        "pred_time": np.mean(pred_times),
        "total_time": np.mean(train_times) + np.mean(pred_times)
    }
    return metrics, timing

# Run the experiments
# Each entry is (display name, key into precomputed_noisy_features or None,
# key into precomputed_noisy_labels or None); the keys are forwarded unchanged
# to crossval_metrics_cnn.
cnn_experiments = [
    ("RAW", None, None),
    ("RAW + Gaussian (std=0.1)", "RAW_Gaussian_0.1", None),
    ("RAW + Gaussian (std=0.2)", "RAW_Gaussian_0.2", None),
    ("RAW + Gaussian (std=0.3)", "RAW_Gaussian_0.3", None),
    ("RAW + S&P (amt=0.05)", "RAW_SP_0.05", None),
    ("RAW + S&P (amt=0.1)", "RAW_SP_0.1", None),
    ("RAW + S&P (amt=0.2)", "RAW_SP_0.2", None),
    ("RAW + LabelNoise (10%)", None, "RAW_Label_10"),
    ("RAW + LabelNoise (20%)", None, "RAW_Label_20"),
    ("RAW + LabelNoise (30%)", None, "RAW_Label_30"),
]

# One record per experiment; this list is later passed to save_results_to_excel.
cnn_results = []
for name, noise_key, label_noise_key in cnn_experiments:
    print(f"\n{name}")
    metrics, timing = crossval_metrics_cnn(noise_key, label_noise_key)
    # Render every (mean, std) pair as the text "mean ± std" with four decimals.
    summary = {k: f"{v[0]:.4f} ± {v[1]:.4f}" for k, v in metrics.items()}
    print(f"Accuracy: {summary['accuracy']} | Precision: {summary['precision_macro']} | Recall: {summary['recall_macro']} | F1_macro: {summary['f1_macro']} | Train: {timing['train_time']:.2f}s | Pred: {timing['pred_time']:.2f}s")
    # "metrics" holds the formatted strings, "timing" the raw per-fold mean seconds.
    cnn_results.append({
        "name": name,
        "params": "-",
        "metrics": summary,
        "timing": timing,
        "precomp_time": 0.0
    })


# Fixed-width text table of all CNN results (header, separator line, one row per experiment).
print("\nTabela wyników CNN:")
print(f"{'Metoda':<35} | {'Parametry':<10} | {'Accuracy':<20} | {'Precision_macro':<20} | {'Recall_macro':<20} | {'F1_macro':<20} | {'Train [s]':<10} | {'Pred [s]':<10}")
print("-" * 160)
for r in cnn_results:
    print(f"{r['name']:<35} | {r['params']:<10} | {r['metrics']['accuracy']:<20} | {r['metrics']['precision_macro']:<20} | {r['metrics']['recall_macro']:<20} | {r['metrics']['f1_macro']:<20} | {r['timing']['train_time']:<10.2f} | {r['timing']['pred_time']:<10.2f}")


Urządzenie: cuda

=== PREKOMPUTACJA SZUMÓW ===
RAW_Gaussian_0.1: obliczony.
RAW_Gaussian_0.2: obliczony.
RAW_Gaussian_0.3: obliczony.
RAW_SP_0.05: obliczony.
RAW_SP_0.1: obliczony.
RAW_SP_0.2: obliczony.
RAW_Label_10: obliczony.
RAW_Label_20: obliczony.
RAW_Label_30: obliczony.

RAW

Fold 1/5
Early stop
Fold 1 - Accuracy: 0.9905, F1_macro: 0.9905

Fold 2/5
Early stop
Fold 2 - Accuracy: 0.9911, F1_macro: 0.9910

Fold 3/5
Early stop
Fold 3 - Accuracy: 0.9885, F1_macro: 0.9884

Fold 4/5
Early stop
Fold 4 - Accuracy: 0.9902, F1_macro: 0.9902

Fold 5/5
Early stop
Fold 5 - Accuracy: 0.9901, F1_macro: 0.9901
Accuracy: 0.9901 ± 0.0009 | Precision: 0.9901 ± 0.0008 | Recall: 0.9900 ± 0.0009 | F1_macro: 0.9900 ± 0.0009 | Train: 32.32s | Pred: 0.25s

RAW + Gaussian (std=0.1)

Fold 1/5
Early stop
Fold 1 - Accuracy: 0.9866, F1_macro: 0.9865

Fold 2/5
Early stop
Fold 2 - Accuracy: 0.9895, F1_macro: 0.9894

Fold 3/5
Early stop
Fold 3 - Accuracy: 0.9883, F1_macro: 0.9883

Fold 4/5
Early stop
Fold 4 - A

# Zapis wyników

In [None]:
import pandas as pd

def save_results_to_excel(knn_results, rf_results, svm_results, cnn_results, filename="results.xlsx"):
    """Write the results of all four model families to one Excel sheet.

    Every result record must provide "name", "metrics" (with "accuracy",
    "precision_macro", "recall_macro", "f1_macro") and "timing" (with
    "train_time", "pred_time"). "params" defaults to "-" and "precomp_time"
    to 0.0 when absent. Rows are emitted in the order kNN, Random Forest,
    SVM, CNN, keeping the order of each input list.

    Args:
        knn_results, rf_results, svm_results, cnn_results: Lists of result records.
        filename: Path of the Excel file to write.
    """
    families = (
        ("kNN", knn_results),
        ("Random Forest", rf_results),
        ("SVM", svm_results),
        ("CNN", cnn_results),
    )

    # One flat row per record, tagged with the model family it belongs to.
    rows = [
        {
            "Model": family,
            "Metoda": record["name"],
            "Parametry": record.get("params", "-"),
            "Accuracy": record["metrics"]["accuracy"],
            "Precision_macro": record["metrics"]["precision_macro"],
            "Recall_macro": record["metrics"]["recall_macro"],
            "F1_macro": record["metrics"]["f1_macro"],
            "Train_time": record["timing"]["train_time"],
            "Pred_time": record["timing"]["pred_time"],
            "Precomp_time": record.get("precomp_time", 0.0),
        }
        for family, records in families
        for record in records
    ]

    table = pd.DataFrame(rows)
    table.to_excel(filename, index=False)
    print(f"\n Wyniki zostały zapisane do pliku: {filename}")


In [None]:

save_results_to_excel(knn_results, rf_results, svm_results, cnn_results)



 Wyniki zostały zapisane do pliku: results.xlsx
