In [None]:
# @title Dataset, Dataloader
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
import os
from PIL import Image
import random


# Dataset root; its immediate sub-folders are matched against CLASS_TO_IDX keys.
DATASET_PATH = '/content/drive/MyDrive/dataset'

# File-name suffixes that is_image_file() accepts.
IMG_EXTENSIONS = '.jpg .JPG .jpeg .JPEG'.split()

# Fraction of the shuffled image list that goes to the training split.
TRAIN_SIZE = 0.8

# NOTE(review): not referenced by any code in the visible cells — confirm
# whether it was meant to be passed as `max_class_size`.
MAX_DATASET_SIZE = 2_000

# Folder name -> integer class label, numbered in the order listed.
CLASS_TO_IDX = {
    class_name: label
    for label, class_name in enumerate(('normal', 'moved', 'covered', 'defocussed'))
}


def default_loader(path):
    """Open the image stored at ``path`` and return an RGB copy of it.

    The file is opened through a context manager so the underlying handle is
    released deterministically, including when decoding fails part-way
    (for example on a truncated file).

    Args:
        path: Location of the image file.

    Returns:
        A new ``PIL.Image.Image`` in ``'RGB'`` mode, independent of the file.
    """
    with Image.open(path) as img:
        # convert() decodes the pixels and returns a separate image object,
        # so the result stays valid after the source file is closed.
        return img.convert('RGB')


def is_image_file(filename, extensions=None):
    """Return True when ``filename`` ends with a supported image extension.

    The comparison ignores letter case, so mixed-case spellings such as
    ``photo.Jpg`` are accepted in addition to the exact entries listed.

    Args:
        filename: File name (or path) to test.
        extensions: Optional iterable of suffixes including the leading dot.
            When omitted, the module-level ``IMG_EXTENSIONS`` is used.

    Returns:
        bool: Whether the lower-cased name ends with any lower-cased suffix.
    """
    if extensions is None:
        extensions = IMG_EXTENSIONS
    # str.endswith accepts a tuple of candidates; an empty tuple yields False.
    suffixes = tuple(ext.lower() for ext in extensions)
    return filename.lower().endswith(suffixes)


class SimpleDataset(Dataset):
    """Dataset over an indexable collection of ``(path, target)`` pairs.

    Every item is read with ``loader``; when a transform was supplied the
    loaded image is passed through it before being returned with its target.
    """

    def __init__(self, imgs, transform=None, loader=default_loader):
        self.loader = loader
        self.transform = transform
        self.imgs = imgs

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, index):
        sample_path, label = self.imgs[index]
        image = self.loader(sample_path)
        # No (truthy) transform configured: hand back the loaded image as is.
        if not self.transform:
            return image, label
        return self.transform(image), label


def get_transformed_datasets(dir, transforms, max_class_size, seed=None):
    """Collect labelled image paths under ``dir`` and split them into train/val datasets.

    Only immediate sub-folders of ``dir`` whose name is a key of
    ``CLASS_TO_IDX`` are considered. Each such folder is walked recursively and
    every file accepted by ``is_image_file`` is labelled with the folder's
    class index. At most ``max_class_size`` images are kept per class: a random
    sample when more are available, all of them (with a printed notice) when
    fewer are. The pooled list is shuffled and cut at ``TRAIN_SIZE``.

    Args:
        dir: Dataset root directory.
        transforms: Mapping with ``'train'`` and ``'val'`` entries; each value
            is handed to the corresponding ``SimpleDataset`` as its transform.
        max_class_size: Upper bound on the number of images taken per class.
        seed: Optional integer. When given, sampling and shuffling use private
            generators seeded with it, so the same directory contents produce
            the same split. When ``None`` the global ``random`` and
            ``np.random`` generators are used.

    Returns:
        tuple: ``(train_dataset, val_dataset)`` as ``SimpleDataset`` objects.

    Raises:
        FileNotFoundError: If ``dir`` does not exist.
        RuntimeError: If no image was found in any class folder.
    """
    if not os.path.exists(dir):
        raise FileNotFoundError(f"Directory not found: {dir}")

    # `random` (module) and `random.Random` expose the same sample() API.
    sampler = random if seed is None else random.Random(seed)

    images = []
    # Sorted listings keep a seeded run independent of filesystem ordering.
    for target in sorted(os.listdir(dir)):
        if target not in CLASS_TO_IDX:
            continue

        d = os.path.join(dir, target)
        if not os.path.isdir(d):
            continue

        # Accumulate across the whole walk so the cap applies per class,
        # not separately to every nested directory.
        class_images = []
        for root, dirnames, fnames in os.walk(d):
            dirnames.sort()  # in-place sort fixes the order os.walk descends in
            for fname in sorted(fnames):
                if is_image_file(fname):
                    path = os.path.join(root, fname)
                    class_images.append((path, CLASS_TO_IDX[target]))

        if len(class_images) < max_class_size:
            print(f"Requested {max_class_size} images per class. In class {target} only {len(class_images)}.")
            images.extend(class_images)
        else:
            images.extend(sampler.sample(class_images, max_class_size))

    if not images:
        raise RuntimeError(
            f"Found 0 images in subfolders of: {dir}\n"
            f"Supported extensions: {', '.join(IMG_EXTENSIONS)}"
        )

    # Object array of (path, label) rows; shuffling permutes rows in place.
    images_np = np.array(images, dtype=object)
    if seed is None:
        np.random.shuffle(images_np)
    else:
        np.random.default_rng(seed).shuffle(images_np)
    split_idx = int(TRAIN_SIZE * len(images_np))

    return (
        SimpleDataset(images_np[:split_idx], transform=transforms['train']),
        SimpleDataset(images_np[split_idx:], transform=transforms['val'])
    )


class ImageDataset(Dataset):
    """Holds the train/validation datasets, their transforms and loader settings.

    On construction the image folder is scanned via
    ``get_transformed_datasets`` and the resulting datasets are stored as
    ``train_ds`` and ``val_ds``. The class does not implement ``__getitem__``;
    access samples through ``train_ds`` / ``val_ds`` or ``get_dataloader``.

    Args:
        dataset_path: Dataset root directory.
        max_class_size: Cap on images per class, forwarded to the scan.
        batch_size: Batch size of the training loader.
        val_batch_size: Batch size of the validation loader.
        img_size: Side length used by the crop/resize transforms.
        num_workers: Worker processes used by loaders from ``get_dataloader``.
        seed: Optional integer. When given, the global ``random`` and
            ``np.random`` generators are seeded with it for the duration of
            the scan (and restored afterwards), so repeated constructions over
            an unchanged folder yield the same sample and split.
    """

    def __init__(self, dataset_path, max_class_size=500, batch_size=64, val_batch_size=16,
                 img_size=224, num_workers=2, seed=None):
        self.batch_size = batch_size
        self.val_batch_size = val_batch_size
        self.img_size = img_size
        self.dataset_path = dataset_path
        self.max_class_size = max_class_size
        self.num_workers = num_workers
        self.seed = seed

        # Per-channel mean/std shared by both pipelines (the values commonly
        # used with ImageNet-pretrained torchvision models).
        normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        self.transforms = {
            # Training: random crop + flip for augmentation.
            'train': transforms.Compose([
                transforms.RandomResizedCrop(img_size),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                normalize,
            ]),
            # Validation: deterministic resize + centre crop.
            'val': transforms.Compose([
                transforms.Resize(img_size),
                transforms.CenterCrop(img_size),
                transforms.ToTensor(),
                normalize,
            ]),
        }

        self._load_datasets()

    def _load_datasets(self):
        """Scan the dataset folder and cache both splits and their sizes."""
        if self.seed is None:
            self.train_ds, self.val_ds = get_transformed_datasets(
                self.dataset_path, self.transforms, self.max_class_size
            )
        else:
            # Seed the global generators only for the scan, then restore the
            # caller's RNG state so the rest of the program is unaffected.
            py_state = random.getstate()
            np_state = np.random.get_state()
            random.seed(self.seed)
            np.random.seed(self.seed)
            try:
                self.train_ds, self.val_ds = get_transformed_datasets(
                    self.dataset_path, self.transforms, self.max_class_size
                )
            finally:
                random.setstate(py_state)
                np.random.set_state(np_state)

        self.size_dataset_train = len(self.train_ds)
        self.size_dataset_val = len(self.val_ds)

    def get_dataloader(self, train=True):
        """Return a DataLoader over the training (shuffled) or validation split."""
        return DataLoader(
            self.train_ds if train else self.val_ds,
            batch_size=self.batch_size if train else self.val_batch_size,
            shuffle=train,
            num_workers=self.num_workers
        )

    def __len__(self):
        """Total number of samples across both splits."""
        return self.size_dataset_train + self.size_dataset_val

    def train_len(self):
        """Number of samples in the training split."""
        return self.size_dataset_train

    def val_len(self):
        """Number of samples in the validation split."""
        return self.size_dataset_val

In [None]:
# @title Training
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from tqdm import tqdm


def train_mobilenet(dataset_path, epochs=10, batch_size=32, lr=0.001,
                    save_path='best_model.pth', max_class_size=500):
    """Fine-tune a pretrained MobileNetV2 on the image folder at ``dataset_path``.

    The final classifier layer is replaced by a linear layer with one output
    per entry of ``CLASS_TO_IDX``. After every epoch the model is evaluated on
    the validation split, and its ``state_dict`` is written to ``save_path``
    whenever validation accuracy improves.

    Args:
        dataset_path: Dataset root directory, passed to ``ImageDataset``.
        epochs: Number of passes over the training split.
        batch_size: Batch size for both the training and validation loaders.
        lr: Learning rate of the Adam optimizer.
        save_path: File the best ``state_dict`` is saved to.
        max_class_size: Cap on images per class, passed to ``ImageDataset``.

    Returns:
        float: Best validation accuracy seen (``0.0`` if no epoch improved on it).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    dataset = ImageDataset(dataset_path, max_class_size=max_class_size)
    # Build the loaders straight from the stored splits instead of creating a
    # DataLoader only to read its `.dataset` back.
    train_loader = DataLoader(
        dataset.train_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2
    )
    val_loader = DataLoader(
        dataset.val_ds,
        batch_size=batch_size,
        num_workers=2
    )
    n_train = len(train_loader.dataset)
    n_val = len(val_loader.dataset)

    model = models.mobilenet_v2(weights='DEFAULT')
    # Swap the output layer so it matches the number of classes.
    model.classifier[1] = nn.Linear(model.classifier[1].in_features, len(CLASS_TO_IDX))
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    best_acc = 0.0
    for epoch in range(epochs):
        model.train()
        train_loss, train_correct = 0.0, 0

        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")
        for inputs, labels in pbar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            preds = outputs.argmax(dim=1)
            # Pull the scalars to the host once per step and reuse them.
            batch_correct = (preds == labels).sum().item()
            batch_loss = loss.item()
            train_correct += batch_correct
            # Weight by batch size so a short final batch does not skew the mean.
            train_loss += batch_loss * labels.size(0)

            pbar.set_postfix({
                'loss': batch_loss,
                'acc': f"{batch_correct/len(labels):.2f}"
            })

        model.eval()
        val_loss, val_correct = 0.0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                preds = outputs.argmax(dim=1)
                val_correct += (preds == labels).sum().item()
                val_loss += loss.item() * labels.size(0)

        train_acc = train_correct / n_train
        val_acc = val_correct / n_val
        print(f"\nEpoch {epoch+1} Results:")
        print(f"Train Loss: {train_loss/n_train:.4f} | Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss/n_val:.4f} | Acc: {val_acc:.4f}")

        # Save best model
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), save_path)
            print("Saved new best model!")

    print(f"\nTraining complete! Best validation accuracy: {best_acc:.4f}")
    return best_acc

# Run training: three epochs on the images under DATASET_PATH, with every
# other train_mobilenet argument left at its default.
# NOTE(review): the saved output below shows 200 batches per epoch, which is
# more than the default cap of 500 images per class would give for a flat
# class folder — it may come from a run with a larger cap; confirm.
if __name__ == "__main__":
    train_mobilenet(DATASET_PATH, epochs=3)

Using device: cuda


Epoch 1/3: 100%|██████████| 200/200 [23:33<00:00,  7.07s/it, loss=0.145, acc=0.94]



Epoch 1 Results:
Train Loss: 0.3909 | Acc: 0.8411
Val Loss: 0.1386 | Acc: 0.9400
Saved new best model!


Epoch 2/3: 100%|██████████| 200/200 [00:44<00:00,  4.52it/s, loss=0.152, acc=0.94]



Epoch 2 Results:
Train Loss: 0.2676 | Acc: 0.8989
Val Loss: 0.1088 | Acc: 0.9544
Saved new best model!


Epoch 3/3: 100%|██████████| 200/200 [00:43<00:00,  4.60it/s, loss=0.343, acc=0.81]



Epoch 3 Results:
Train Loss: 0.2262 | Acc: 0.9163
Val Loss: 0.0715 | Acc: 0.9794
Saved new best model!

Training complete! Best validation accuracy: 0.9794


In [None]:
# @title Tests
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from tqdm import tqdm
import numpy as np


# Use CUDA when PyTorch reports it as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


def load_saved_model(model_path, num_classes=4):
    """Rebuild the MobileNetV2 classifier and load saved weights into it.

    Args:
        model_path: Path to a ``state_dict`` file written by ``torch.save``.
        num_classes: Output size of the replaced final linear layer; must
            match the checkpoint.

    Returns:
        The model in evaluation mode. It is left on the CPU; move it with
        ``.to(device)`` as needed.
    """
    # Imported here because this cell's import block does not bring in `nn`
    # or `models`; the function then works without the training cell's imports.
    import torch.nn as nn
    from torchvision import models

    model = models.mobilenet_v2(weights=None)
    model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)

    # The freshly built model is on the CPU, so map the checkpoint there rather
    # than to an accelerator only to copy it back.
    # NOTE: torch.load unpickles the file — only load checkpoints you trust.
    state_dict = torch.load(model_path, map_location='cpu')
    model.load_state_dict(state_dict)

    model.eval()
    return model


def validate_model(model, val_loader, device, criterion=None):
    """Evaluate ``model`` on ``val_loader`` and print overall and per-class results.

    Args:
        model: Classifier producing one score per class for each input.
        val_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        criterion: Loss returning the batch mean. Defaults to a new
            ``torch.nn.CrossEntropyLoss()`` so the function does not depend on
            a module-level ``criterion`` being defined.

    Returns:
        tuple: ``(val_loss, val_acc)`` — mean loss per sample and overall accuracy.
    """
    if criterion is None:
        criterion = torch.nn.CrossEntropyLoss()

    num_classes = len(CLASS_TO_IDX)
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    # Per-class counters kept as CPU tensors and updated once per batch.
    class_correct = torch.zeros(num_classes, dtype=torch.long)
    class_total = torch.zeros(num_classes, dtype=torch.long)

    with torch.no_grad():
        for images, labels in tqdm(val_loader, desc='Validating'):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            # Undo the batch mean so the final average is per sample.
            val_loss += loss.item() * images.size(0)
            predicted = outputs.argmax(dim=1)
            hits = predicted == labels
            total += labels.size(0)
            correct += hits.sum().item()

            # bincount tallies every class at once instead of looping over
            # samples and synchronising with the device for each one.
            labels_cpu = labels.cpu()
            class_total += torch.bincount(labels_cpu, minlength=num_classes)
            class_correct += torch.bincount(labels_cpu[hits.cpu()], minlength=num_classes)

    # Normalise by the samples actually seen (equals the dataset size unless
    # the loader drops batches).
    val_loss /= total
    val_acc = correct / total

    # Plain ints so the counts print as numbers rather than tensors.
    class_correct = class_correct.tolist()
    class_total = class_total.tolist()

    print(f'\nValidation Results:')
    print(f'Average Loss: {val_loss:.4f}')
    print(f'Overall Accuracy: {val_acc:.4f}')

    print('\nPer-class Accuracy:')
    for class_name, idx in CLASS_TO_IDX.items():
        if class_total[idx] > 0:
            accuracy = 100 * class_correct[idx] / class_total[idx]
            print(f'{class_name}: {accuracy:.2f}% ({class_correct[idx]}/{class_total[idx]})')

    return val_loss, val_acc


# Evaluate the checkpoint written by the training cell.
if __name__ == "__main__":
    model = load_saved_model('best_model.pth', num_classes=len(CLASS_TO_IDX))
    model = model.to(device)

    # Defined at module level so that a validate_model which looks up
    # `criterion` as a global finds it.
    criterion = torch.nn.CrossEntropyLoss()

    # NOTE(review): this draws a new random sample and a new random train/val
    # split, unrelated to the split used during training. Images the model was
    # trained on can therefore land in this "validation" part, so the numbers
    # printed here are not a held-out estimate.
    dataset = ImageDataset(DATASET_PATH, max_class_size=10000)
    # Use the stored validation split directly rather than building a
    # DataLoader just to read its `.dataset`.
    val_loader = DataLoader(
        dataset.val_ds,
        batch_size=32,
        shuffle=False,
        num_workers=2
    )

    val_loss, val_acc = validate_model(model, val_loader, device)

Validating: 100%|██████████| 250/250 [26:59<00:00,  6.48s/it]


Validation Results:
Average Loss: 0.0610
Overall Accuracy: 0.9856

Per-class Accuracy:
normal: 98.18% (1995/2032)
moved: 96.19% (1967/2045)
covered: 100.00% (1973/1973)
defocussed: 100.00% (1950/1950)



