In [16]:
!pip install -q torch torchvision kagglehub
!pip install -q pytorch-lightning torchmetrics "numpy<2.0"
!pip install -q pandas matplotlib seaborn
!pip install -q grad-cam

In [17]:
import os
import json
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import pytorch_lightning as pl
import torchmetrics
import kagglehub
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image, preprocess_image

In [None]:
# --- Reproducibility ---------------------------------------------------------
SEED = 1234
# Seed through Lightning's helper; `workers=True` is forwarded to it.
pl.seed_everything(SEED, workers=True)

# --- Training hyper-parameters -----------------------------------------------
BATCH_SIZE = 32
EPOCHS = 10
LEARNING_RATE = 1e-4
NUM_WORKERS = 2

# --- Output artefacts --------------------------------------------------------
CHECKPOINT_BEST = "best_convnext.ckpt"
CLASSES_FILE = "classes.json"
OUTPUT_FILE = "wyniki_convnext.csv"

# --- Hardware ----------------------------------------------------------------
# Use CUDA when torch reports it as available, otherwise run on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Handed to every DataLoader as `pin_memory` (see dataloader_kwargs).
PIN_MEMORY = DEVICE.type == "cuda"

def find_train_test_dirs(base_dir: str):
    """Locate sibling ``train`` and ``test`` directories below ``base_dir``.

    The tree is traversed with ``os.walk``. The first directory that holds
    both a ``train`` and a ``test`` sub-directory is accepted, provided that
    ``train`` itself contains at least one sub-directory.

    Args:
        base_dir: Root directory where the search starts.

    Returns:
        ``(train_path, test_path)`` for the first match, or ``(None, None)``
        when no directory qualifies.
    """
    for current, subdirs, _ in os.walk(base_dir):
        if not {"train", "test"}.issubset(subdirs):
            continue

        train_path = os.path.join(current, "train")
        test_path = os.path.join(current, "test")

        # Accept the pair as soon as one entry of `train` is a directory.
        for entry in os.listdir(train_path):
            if os.path.isdir(os.path.join(train_path, entry)):
                return train_path, test_path

    return None, None

def dataloader_kwargs():
    """Build the keyword arguments shared by every ``DataLoader`` in the notebook.

    Returns:
        A dict with ``num_workers`` and ``pin_memory`` taken from the
        module-level ``NUM_WORKERS`` / ``PIN_MEMORY`` settings, plus
        ``persistent_workers=True`` only when ``NUM_WORKERS`` is positive.
    """
    uses_workers = bool(NUM_WORKERS) and NUM_WORKERS > 0
    worker_options = {"persistent_workers": True} if uses_workers else {}
    return {"num_workers": NUM_WORKERS, "pin_memory": PIN_MEMORY, **worker_options}

def make_loader(dataset, *, shuffle: bool):
    """Wrap ``dataset`` in a ``DataLoader`` using the notebook-wide settings.

    Args:
        dataset: Dataset object handed to ``DataLoader``.
        shuffle: Forwarded to ``DataLoader`` as ``shuffle``.

    Returns:
        A ``DataLoader`` with ``batch_size=BATCH_SIZE`` and the options
        produced by ``dataloader_kwargs()``.
    """
    loader_options = dataloader_kwargs()
    return DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=shuffle,
        **loader_options,
    )

def make_transforms(mode: str):
    """Return the torchvision preprocessing pipeline for the given mode.

    Args:
        mode: Case-insensitive. ``"train"`` selects the augmenting pipeline;
            ``"val"``, ``"test"`` and ``"eval"`` all select the deterministic
            one.

    Returns:
        A ``transforms.Compose`` ending in ``ToTensor`` + ``Normalize``.

    Raises:
        ValueError: If ``mode`` is none of the supported values.
    """
    mode = mode.lower()
    if mode != "train" and mode not in {"val", "test", "eval"}:
        raise ValueError(f"Nieznany tryb transformacji: {mode}")

    # Tail shared by both pipelines: tensor conversion + per-channel
    # normalisation (the usual ImageNet mean/std values).
    to_normalized_tensor = [
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]

    if mode == "train":
        # Resize slightly larger than the crop so RandomCrop has room to move.
        geometry_and_color = [
            transforms.Resize((256, 256)),
            transforms.RandomCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
        ]
    else:
        geometry_and_color = [transforms.Resize((224, 224))]

    return transforms.Compose(geometry_and_color + to_normalized_tensor)

def load_eval_model(checkpoint_path: str, *, num_classes: int):
    """Restore a ``CarClassifierConvNext`` checkpoint ready for inference.

    Args:
        checkpoint_path: Path of the Lightning checkpoint file.
        num_classes: Passed to the model constructor via ``load_from_checkpoint``.

    Returns:
        The restored model, moved to ``DEVICE``, switched to eval mode and
        frozen.

    Raises:
        FileNotFoundError: If ``checkpoint_path`` does not exist.
    """
    if os.path.exists(checkpoint_path):
        restored = CarClassifierConvNext.load_from_checkpoint(
            checkpoint_path,
            num_classes=num_classes,
            learning_rate=LEARNING_RATE,
        )
        restored.to(DEVICE)
        restored.eval()
        restored.freeze()
        return restored

    raise FileNotFoundError(f"Brak checkpointu: {checkpoint_path}")

# Fetch the dataset through kagglehub. If that fails, fall back to searching
# the current working directory (e.g. a dataset that is already mounted) but
# report the failure instead of hiding it, so a later "data not found" error
# can be traced back to its cause.
try:
    BASE_DIR = kagglehub.dataset_download("jutrera/stanford-car-dataset-by-classes-folder")
except Exception as exc:
    print(
        f"Ostrzeżenie: pobieranie przez kagglehub nie powiodło się ({exc!r}); "
        "szukam danych w bieżącym katalogu."
    )
    BASE_DIR = "."

# find_train_test_dirs returns (None, None) when no train/test pair exists.
train_dir, test_dir = find_train_test_dirs(BASE_DIR)
if not train_dir:
    raise FileNotFoundError("Nie znaleziono danych. Sprawdź, czy dataset jest podpięty.")

print(f"Trening: {train_dir}")
print(f"Test:    {test_dir}")

In [19]:
class CarClassifierConvNext(pl.LightningModule):
    """LightningModule that fine-tunes torchvision's ConvNeXt-Tiny.

    All ``features`` parameters are frozen except those of the last two
    entries; the final classifier layer is replaced by a fresh ``nn.Linear``
    with ``num_classes`` outputs.

    Args:
        num_classes: Number of output classes.
        learning_rate: Learning rate of the classifier head; the unfrozen
            feature entries are trained at one tenth of it.
    """

    def __init__(self, num_classes: int, learning_rate: float = 1e-4):
        super().__init__()
        self.save_hyperparameters()
        self.lr = learning_rate

        backbone = models.convnext_tiny(weights="DEFAULT")

        # Freeze the whole feature extractor, then re-enable its last two entries.
        backbone.features.requires_grad_(False)
        backbone.features[-2:].requires_grad_(True)

        # Swap the final linear layer for one sized to this task.
        head_inputs = backbone.classifier[2].in_features
        backbone.classifier[2] = nn.Linear(head_inputs, num_classes)
        self.model = backbone

        self.criterion = nn.CrossEntropyLoss()
        self.train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)
        self.val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.model(x)

    def _loss_and_predictions(self, batch):
        """Run one batch and return ``(loss, predicted_ids, targets)``."""
        images, targets = batch
        logits = self(images)
        loss = self.criterion(logits, targets)
        return loss, logits.argmax(dim=1), targets

    def training_step(self, batch, _batch_idx):
        """Compute the training loss and log epoch-level loss/accuracy."""
        loss, predicted, targets = self._loss_and_predictions(batch)
        self.train_acc(predicted, targets)

        epoch_only = {"on_step": False, "on_epoch": True, "prog_bar": True}
        self.log("train_loss", loss, **epoch_only)
        self.log("train_acc", self.train_acc, **epoch_only)
        return loss

    def validation_step(self, batch, _batch_idx):
        """Compute the validation loss and log ``val_loss`` / ``val_acc``."""
        loss, predicted, targets = self._loss_and_predictions(batch)
        self.val_acc(predicted, targets)
        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", self.val_acc, prog_bar=True)
        return loss

    def configure_optimizers(self):
        """Create AdamW with two parameter groups and a plateau scheduler.

        The scheduler is keyed on ``val_loss`` through the ``monitor`` entry.
        """
        unfrozen_features = {
            "params": self.model.features[-2:].parameters(),
            "lr": self.lr * 0.1,
        }
        classifier_head = {
            "params": self.model.classifier.parameters(),
            "lr": self.lr,
        }
        optimizer = optim.AdamW([unfrozen_features, classifier_head], weight_decay=1e-4)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode="min",
            factor=0.1,
            patience=2,
        )
        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}

In [None]:
# Preprocessing pipelines: augmenting for training, deterministic for evaluation.
train_tf = make_transforms("train")
eval_tf = make_transforms("eval")

train_ds = datasets.ImageFolder(train_dir, transform=train_tf)
# NOTE(review): validation reads the *test* directory, so checkpoint selection
# and the final test report use the same images — confirm this is intended.
val_ds = datasets.ImageFolder(test_dir, transform=eval_tf)

train_loader = make_loader(train_ds, shuffle=True)
val_loader = make_loader(val_ds, shuffle=False)

# Persist the class order so predictions can be decoded outside this session.
# Use the CLASSES_FILE constant rather than repeating the literal file name.
class_names = train_ds.classes
with open(CLASSES_FILE, "w", encoding="utf-8") as f:
    json.dump(class_names, f, ensure_ascii=False)

print(f"Liczba klas: {len(class_names)}")
print(f"Train: {len(train_ds)} zdjęć")
print(f"Val:   {len(val_ds)} zdjęć")

In [None]:
print("Start treningu ConvNeXt...")

csv_logger = pl.loggers.CSVLogger(save_dir="logs", name="convnext_experiment")

# Keep only the checkpoint with the highest validation accuracy.
# The file stem is derived from CHECKPOINT_BEST so that the fallback path used
# by the evaluation cells cannot drift away from what is written here.
checkpoint_callback = pl.callbacks.ModelCheckpoint(
    dirpath=".",
    filename=os.path.splitext(CHECKPOINT_BEST)[0],
    save_top_k=1,
    monitor="val_acc",
    mode="max",
)

trainer = pl.Trainer(
    max_epochs=EPOCHS,
    accelerator="auto",
    devices=1,
    callbacks=[checkpoint_callback],
    logger=csv_logger,
    enable_progress_bar=True,
)

model = CarClassifierConvNext(num_classes=len(class_names), learning_rate=LEARNING_RATE)
trainer.fit(model, train_loader, val_loader)

print(f"Trening zakończony. Najlepszy checkpoint: {checkpoint_callback.best_model_path}")

In [None]:
# Plot per-epoch loss and accuracy from the CSV written by the Lightning logger.
metrics_path = f"{csv_logger.log_dir}/metrics.csv"
if os.path.exists(metrics_path):
    metrics = pd.read_csv(metrics_path)
    # Average the numeric columns of all rows that share an epoch number.
    epoch_metrics = metrics.groupby("epoch").mean(numeric_only=True)

    sns.set(style="whitegrid")
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    # (panel title / y-label, train column, validation column)
    _panels = (
        ("Loss", "train_loss", "val_loss"),
        ("Accuracy", "train_acc", "val_acc"),
    )
    for _ax, (_title, _train_col, _val_col) in zip(axes, _panels):
        _ax.plot(epoch_metrics.index, epoch_metrics[_train_col], marker="o", label="Train")
        _ax.plot(epoch_metrics.index, epoch_metrics[_val_col], marker="o", label="Val")
        _ax.set_title(_title)
        _ax.set_xlabel("Epoch")
        _ax.set_ylabel(_title)
        _ax.legend()

    plt.tight_layout()
    plt.savefig("convnext_training_curves.png", dpi=150)
    plt.show()
    print("Zapisano: convnext_training_curves.png")
else:
    print("Nie znaleziono metrics.csv (brak logów).")

In [None]:
# Evaluate the best checkpoint on the test directory and export one CSV row
# per image (file, true class, predicted class, confidence, hit/miss).
checkpoint_path = checkpoint_callback.best_model_path or CHECKPOINT_BEST
model = load_eval_model(checkpoint_path, num_classes=len(class_names))

test_ds = datasets.ImageFolder(test_dir, transform=eval_tf)
# Label ids are decoded with the *training* class list, so both folder trees
# must define the same classes in the same order.
if list(test_ds.classes) != list(class_names):
    raise ValueError("Klasy w zbiorze testowym nie zgadzają się z klasami treningowymi.")
loader = make_loader(test_ds, shuffle=False)

# The loader is not shuffled, so batches arrive in the order of `samples`.
file_paths = [path for path, _ in test_ds.samples]
rows = []

print(f"Analiza {len(test_ds)} zdjęć (ConvNeXt)...")
seen = 0  # number of samples consumed so far; offset into file_paths
with torch.no_grad():  # pure inference: never build an autograd graph
    for inputs, labels in tqdm(loader, desc="Test"):
        logits = model(inputs.to(DEVICE))
        scores, pred_ids = torch.softmax(logits, dim=1).max(dim=1)

        batch_paths = file_paths[seen : seen + len(labels)]
        seen += len(labels)

        # One device-to-host transfer per tensor instead of three per image.
        for path, true_id, pred_id, score in zip(
            batch_paths, labels.tolist(), pred_ids.tolist(), scores.tolist()
        ):
            true_name = class_names[true_id]
            pred_name = class_names[pred_id]
            rows.append(
                {
                    "Plik": os.path.basename(path),
                    "Prawdziwe Auto": true_name,
                    "Wykryte Auto": pred_name,
                    "Pewnosc Modelu": f"{score * 100:.2f}%",
                    "Czy Trafil": "TAK" if true_name == pred_name else "NIE",
                }
            )

df = pd.DataFrame(rows)
acc = (df["Czy Trafil"] == "TAK").mean() * 100

print(f"Skuteczność: {acc:.2f}%")
df.to_csv(OUTPUT_FILE, index=False, encoding="utf-8")
print(f"Zapisano: {OUTPUT_FILE}")
df.head(20)

In [None]:
# File that receives the per-class EfficientNet vs ConvNeXt comparison.
COMPARISON_OUTPUT_FILE = 'porownanie_modeli_klasy.csv'

# Display settings for the report tables printed below.
for _option, _value in (
    ('display.max_rows', None),
    ('display.width', 1000),
    ('display.precision', 2),
):
    pd.set_option(_option, _value)

def generate_report(csv, model):
    """Print and save a per-class accuracy report for one results CSV.

    Args:
        csv: Path to a results file with the columns ``Plik``,
            ``Prawdziwe Auto`` and ``Czy Trafil``.
        model: Model name; used in the printed banner and, lower-cased, in the
            name of the written report file.

    Returns:
        A DataFrame indexed by true class with a single ``accuracy`` column
        (percent, sorted ascending), or ``None`` when ``csv`` does not exist.

    Side effects:
        Prints the report and writes ``raport_klas_<model>.csv`` to the
        current working directory.
    """
    banner = '=' * 20
    print(f"\n{banner} ANALIZA: {model.upper()} {banner}")

    if not os.path.exists(csv):
        print("Brak pliku csv")
        return None

    results = pd.read_csv(csv)

    def _count_hits(flags):
        # A row counts as a correct guess when it is marked 'TAK'.
        return (flags == 'TAK').sum()

    per_class = results.groupby("Prawdziwe Auto").agg(
        photos_count=('Plik', 'count'),
        guesses=('Czy Trafil', _count_hits),
    )
    per_class['accuracy'] = per_class['guesses'] / per_class['photos_count'] * 100
    per_class = per_class.sort_values(by='accuracy', ascending=True)

    # Separate copy for printing so the numeric accuracy stays intact.
    printable = per_class[['photos_count', 'accuracy']].copy()
    printable['accuracy'] = printable['accuracy'].map(lambda pct: f"{pct:.1f}%")
    print(printable)

    output = f"raport_klas_{model.lower()}.csv"
    per_class.to_csv(output)
    print(f"\nZapisano szczegóły w: {output}")

    return per_class[['accuracy']]
    
# Per-class accuracy of both models; either is None when its CSV is missing.
eff_stats = generate_report("wyniki_efficientnet.csv", "EfficientNet")

# Read the ConvNeXt results from the same path the evaluation cell wrote to.
conv_stats = generate_report(OUTPUT_FILE, "ConvNeXt")

if eff_stats is not None and conv_stats is not None:
    # Align the two accuracy columns on the class name.
    comparison = pd.concat([eff_stats, conv_stats], axis=1)
    comparison.columns = ['EffNet Acc', 'ConvNeXt Acc']

    # Positive values mean ConvNeXt did better on that class.
    comparison['Różnica'] = comparison['ConvNeXt Acc'] - comparison['EffNet Acc']

    comparison = comparison.sort_values(by='Różnica', ascending=False)

    print(comparison)

    comparison.to_csv(COMPARISON_OUTPUT_FILE)
    print(f"\nZapisano porównanie w: {COMPARISON_OUTPUT_FILE}")


In [None]:
# Checkpoints used for the Grad-CAM comparison.
EFF_CHECKPOINT = "best_model.ckpt"
# Prefer the checkpoint produced in this session; otherwise fall back to the
# shared CHECKPOINT_BEST constant instead of repeating its literal value.
CONV_CHECKPOINT = checkpoint_callback.best_model_path or CHECKPOINT_BEST

# Reuse the device selected in the configuration cell.
device = DEVICE

class CarClassifierEffNet(pl.LightningModule):
    """Minimal EfficientNetV2-S wrapper used to restore a checkpoint.

    The network is built with ``weights=None`` and its last classifier layer
    is replaced by an ``nn.Linear`` with ``num_classes`` outputs. The class
    defines no training logic.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes: int):
        super().__init__()
        backbone = models.efficientnet_v2_s(weights=None)
        head_inputs = backbone.classifier[1].in_features
        backbone.classifier[1] = nn.Linear(head_inputs, num_classes)
        self.model = backbone

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        logits = self.model(x)
        return logits

class CarClassifierConvNextEval(pl.LightningModule):
    """Minimal ConvNeXt-Tiny wrapper used to restore a checkpoint.

    The network is built with ``weights=None`` and its last classifier layer
    is replaced by an ``nn.Linear`` with ``num_classes`` outputs. The class
    defines no training logic.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes: int):
        super().__init__()
        backbone = models.convnext_tiny(weights=None)
        head_inputs = backbone.classifier[2].in_features
        backbone.classifier[2] = nn.Linear(head_inputs, num_classes)
        self.model = backbone

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        logits = self.model(x)
        return logits

def _load_optional_model(model_cls, checkpoint_path, label):
    """Restore ``model_cls`` from ``checkpoint_path`` if the file exists.

    Args:
        model_cls: LightningModule subclass that accepts ``num_classes``.
        checkpoint_path: Path of the checkpoint file.
        label: Human-readable model name used in the "missing" message.

    Returns:
        The model in eval mode on ``device``, or ``None`` (after printing a
        message) when the checkpoint file is absent.
    """
    if not os.path.exists(checkpoint_path):
        print(f"Brak checkpointu {label}: {checkpoint_path}")
        return None

    # map_location lets a checkpoint saved on a GPU be restored on a
    # CPU-only machine.
    restored = model_cls.load_from_checkpoint(
        checkpoint_path,
        map_location=device,
        num_classes=len(class_names),
    )
    # The models are deliberately not frozen: Grad-CAM needs gradients.
    return restored.eval().to(device)


model_eff = _load_optional_model(CarClassifierEffNet, EFF_CHECKPOINT, "EfficientNet")
model_conv = _load_optional_model(CarClassifierConvNextEval, CONV_CHECKPOINT, "ConvNeXt")

def _render_cam_panel(ax, net, label, input_tensor, rgb_img):
    """Draw one model's Grad-CAM overlay and top-1 prediction into ``ax``.

    When ``net`` is ``None`` a "missing model" placeholder is drawn instead.
    """
    if net is None:
        ax.text(0.5, 0.5, f"Brak {label}", ha="center", va="center")
        ax.axis("off")
        return

    # The context manager releases the hooks Grad-CAM installs on the target
    # layer; otherwise every call would leave another set attached to the model.
    with GradCAM(model=net, target_layers=[net.model.features[-1]]) as cam:
        # targets=None: explain the highest-scoring class.
        heatmap = cam(input_tensor=input_tensor, targets=None)[0, :]
    overlay = show_cam_on_image(rgb_img, heatmap, use_rgb=True)

    # The prediction shown in the title needs no gradients.
    with torch.no_grad():
        logits = net(input_tensor)
    top_idx = int(torch.argmax(logits, dim=1).item())
    confidence = float(torch.softmax(logits, dim=1)[0, top_idx].item())

    ax.imshow(overlay)
    ax.set_title(f"{label}: {class_names[top_idx]} ({confidence:.1%})")
    ax.axis("off")


def visualize_comparison(image_path: str):
    """Show an image next to the Grad-CAM overlays of both classifiers.

    Draws three panels: the resized input, the EfficientNet overlay and the
    ConvNeXt overlay. A model that was not loaded (``None``) gets a
    placeholder panel.

    Args:
        image_path: Path of the image file to visualise.
    """
    # Close the file handle as soon as the pixels have been converted.
    with Image.open(image_path) as raw_image:
        img_pil = raw_image.convert("RGB").resize((224, 224))
    rgb_img = np.float32(img_pil) / 255.0
    input_tensor = preprocess_image(
        rgb_img,
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ).to(device)

    fig, axs = plt.subplots(1, 3, figsize=(18, 6))
    axs[0].imshow(img_pil)
    axs[0].set_title(os.path.basename(image_path))
    axs[0].axis("off")

    _render_cam_panel(axs[1], model_eff, "EfficientNet", input_tensor, rgb_img)
    _render_cam_panel(axs[2], model_conv, "ConvNeXt", input_tensor, rgb_img)

    plt.tight_layout()
    plt.show()

# Only the file paths are needed here, so no transform is passed. The original
# referenced `val_tf`, a name that is never defined in this notebook.
test_images = [path for path, _ in datasets.ImageFolder(test_dir).samples]

# Draw up to five distinct test images for the visual comparison.
sample = random.sample(test_images, k=min(5, len(test_images)))
print(f"Wybrano {len(sample)} obrazów do porównania.")
for img_path in sample:
    visualize_comparison(img_path)