In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import json
import os
from torch.utils.data import random_split
import csv
from PIL import Image

<div style="font-size: 40px">
data preparation
</div>

In [4]:
# Fixed seed for the train/validation split, so the same images end up in the
# same subset after every kernel restart (matters when training is resumed
# from a checkpoint: otherwise validation images may have been trained on).
SPLIT_SEED = 42

# Augmenting pipeline, used for training images only.
# The normalisation values match the standard ImageNet channel statistics.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Deterministic pipeline, used for validation and test images.
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Two views over the same folder: one augmented (training), one not (validation).
# A single shared ImageFolder would apply the random augmentations to the
# validation images as well, because random_split only wraps indices.
full_dataset = ImageFolder("train/", transform=train_transform)
eval_dataset = ImageFolder("train/", transform=test_transform)
assert eval_dataset.samples == full_dataset.samples, "dataset views are not aligned"

dataset_size = len(full_dataset)
train_size = int(dataset_size * 0.92)
val_size = dataset_size - train_size

split_generator = torch.Generator().manual_seed(SPLIT_SEED)
trainset, val_split = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)
# Same held-out indices, but read through the non-augmenting view.
valset = torch.utils.data.Subset(eval_dataset, val_split.indices)

trainloader = DataLoader(trainset, batch_size=32, shuffle=True, num_workers=4)
valloader = DataLoader(valset, batch_size=32, shuffle=False, num_workers=4)

print(f"Number of classes: {len(full_dataset.classes)}")
print(f"Class names: {full_dataset.classes}")
print(f"training images count: {len(trainset)}")
print(f"validation images count: {len(valset)}")

Liczba klas: 50
Nazwy klas: ['acoustic', 'antenna', 'bacteria', 'battery', 'bean', 'beetle', 'bicycle', 'birch', 'bird', 'bomb', 'bread', 'bridge', 'camera', 'carbon', 'cat', 'corn', 'crab', 'crocodilian', 'echinoderm', 'egg', 'elephant', 'fish', 'flower', 'frog', 'fungus', 'gauge', 'hammer', 'icecream', 'kangaroo', 'memorial', 'monkey', 'motor', 'nest', 'palm', 'pizza', 'pot', 'printer', 'saw', 'snake', 'spice', 'spider', 'spoon', 'squash', 'swine', 'tea', 'tomato', 'towel', 'truck', 'turtle', 'worm']
Liczba obrazów treningowych: 88011


<div style="font-size: 40px">
model architecture
</div>

In [None]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers wrapped by a skip connection.

    When ``stride`` is not 1 or the channel count changes, the skip path is a
    1x1 convolution followed by batch norm, so both paths produce tensors of
    the same shape. Otherwise the skip path is an empty ``nn.Sequential``,
    which passes its input through unchanged.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels produced by the block.
        stride: stride of the first convolution (and of the 1x1 skip conv).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path; only the first convolution applies `stride`.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: project only when the main path changes the tensor shape.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            projection = [
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            ]
        else:
            projection = []
        self.shortcut = nn.Sequential(*projection)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))
        return F.relu(main + self.shortcut(x))

In [5]:
class CNNModel(nn.Module):
    """Small residual CNN classifier.

    Layout: a 3x3 stem convolution, four ``ResidualBlock`` stages
    (64, 128, 256, 512 channels) each followed by a 2x2 max-pool, then a
    two-layer fully connected head with dropout.

    The stem and the four stages are each followed by a 2x2 max-pool, so the
    spatial side is halved five times. The head expects a 512x2x2 feature map,
    which the poolings alone only produce for 64x64 inputs. An adaptive
    average pool therefore brings the map to 2x2 for other input sizes
    (e.g. 224x224 gives 7x7 before it). It has no parameters, so existing
    checkpoints still load, and it leaves an already 2x2 map unchanged.

    Args:
        num_classes: size of the output logit vector.
    """

    def __init__(self, num_classes):
        super(CNNModel, self).__init__()

        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.res1 = ResidualBlock(32, 64, stride=1)
        self.res2 = ResidualBlock(64, 128, stride=1)
        self.res3 = ResidualBlock(128, 256, stride=1)
        self.res4 = ResidualBlock(256, 512, stride=1)

        # Guarantees the 512x2x2 shape that fc1 was sized for.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((2, 2))

        self.fc1 = nn.Linear(512 * 2 * 2, 1024)
        self.dropout = nn.Dropout(0.4)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to logits ``(N, num_classes)``."""
        x = self.pool(F.relu(self.bn1(self.conv1(x))))

        x = self.pool(self.res1(x))
        x = self.pool(self.res2(x))
        x = self.pool(self.res3(x))
        x = self.pool(self.res4(x))

        x = self.adaptive_pool(x)
        # Flatten everything except the batch dimension. The previous
        # view(-1, 2048) could silently fold spatial positions into the batch
        # dimension when the feature map was not 2x2.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

In [7]:
# Prefer the first CUDA GPU when one is available; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f"Used device: {device}")


Używane urządzenie: cpu


In [None]:
# Build the network and, when a checkpoint file is present, continue from its weights.
num_classes = len(full_dataset.classes)
model = CNNModel(num_classes).to(device)

model_path = 'model.pth'
if os.path.exists(model_path):
    print(f"Loading existing model from {model_path}")
    # map_location remaps the stored tensors onto the current device, so a
    # checkpoint written on a GPU machine also loads on a CPU-only one.
    # NOTE(review): torch.load unpickles the file - only load trusted checkpoints
    # (or pass weights_only=True where the installed torch version supports it).
    model.load_state_dict(torch.load(model_path, map_location=device))
else:
    print("No existing model found. Initializing new model.")

<div style="font-size: 40px">
main training loop
</div>

In [None]:
# `model` comes from the previous cell, which restores the weights from
# 'model.pth' when that file exists. It is deliberately NOT re-created here:
# doing so would throw the loaded weights away and restart from random ones.
# The class list lives on the ImageFolder (`full_dataset`); the Subset returned
# by random_split (`trainset`) has no `.classes` attribute.
num_classes = len(full_dataset.classes)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate once the monitored value (the validation loss passed
# to scheduler.step inside train_model) has not improved for 5 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader`: train when `optimizer` is given, else evaluate.

    Returns:
        (loss, accuracy): the mean of the per-batch criterion values and the
        percentage of correctly classified samples.

    Raises:
        ValueError: if `loader` yields no samples.
    """
    is_training = optimizer is not None
    model.train(is_training)

    loss_sum = 0.0
    correct = 0
    seen = 0

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(is_training):
        for batch in loader:
            inputs, labels = batch[0].to(device), batch[1].to(device)

            if is_training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if is_training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            predicted = outputs.argmax(dim=1)
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()

    if seen == 0:
        raise ValueError("data loader yielded no samples")

    return loss_sum / len(loader), 100 * correct / seen


def _save_history_plot(path, first_epoch, train_losses, val_losses, train_accs, val_accs):
    """Save a two-panel loss/accuracy figure to `path`, with real epoch numbers on x."""
    epochs = range(first_epoch, first_epoch + len(train_losses))

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    loss_ax.plot(epochs, train_losses, 'b-', label='Training')
    loss_ax.plot(epochs, val_losses, 'r-', label='Validation')
    loss_ax.set_title('Loss During Training')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    acc_ax.plot(epochs, train_accs, 'b-', label='Training')
    acc_ax.plot(epochs, val_accs, 'r-', label='Validation')
    acc_ax.set_title('Accuracy During Training')
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy (%)')
    acc_ax.legend()

    fig.tight_layout()
    fig.savefig(path)
    plt.close(fig)


def train_model(model, trainloader, valloader, criterion, optimizer, scheduler, start_epoch=0, num_epochs=10):
    """Train `model` for `num_epochs` epochs, validating and checkpointing as it goes.

    Batches are moved to the device the model's parameters live on, so the
    function does not depend on a notebook-level `device` variable.

    Files written (relative to the working directory):
        * ``backup_model_epoch_{n}.pth`` after every epoch except the last;
          the previous epoch's backup is deleted.
        * ``models/model_{n}/`` with ``model.pth``, ``training_history.png``
          and ``training_data.json`` whenever the epoch number ``n`` is a
          multiple of 5, and after the last epoch.
        * ``main.pth`` once training has finished.

    Args:
        model: network to train; left in eval mode after the last validation pass.
        trainloader: loader yielding ``(inputs, labels)`` training batches.
        valloader: loader yielding ``(inputs, labels)`` validation batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped once per training batch.
        scheduler: stepped once per epoch with the validation loss.
        start_epoch: number of epochs already done; only offsets the epoch
            numbers used in log lines, file names and plot axes.
        num_epochs: number of epochs to run in this call.

    Returns:
        dict with the per-epoch lists ``train_losses``, ``train_accs``,
        ``val_losses`` and ``val_accs`` for this call only.

    Raises:
        ValueError: if either loader yields no samples.
    """
    os.makedirs("models", exist_ok=True)

    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    training_losses = []
    training_accuracies = []
    val_losses = []
    val_accuracies = []

    last_epoch = start_epoch + num_epochs - 1

    for epoch in range(start_epoch, start_epoch + num_epochs):
        train_loss, train_acc = _run_epoch(model, trainloader, criterion, device, optimizer)
        training_losses.append(train_loss)
        training_accuracies.append(train_acc)

        val_loss, val_acc = _run_epoch(model, valloader, criterion, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        scheduler.step(val_loss)

        lr = optimizer.param_groups[0]['lr']
        print(f"\nEpoch {epoch+1} - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, LR: {lr:.6f}")

        # Rolling backup: drop the file written after the previous epoch ...
        prev_backup = f"backup_model_epoch_{epoch}.pth"
        if os.path.exists(prev_backup):
            os.remove(prev_backup)

        # ... and write a new one, unless main.pth is about to be saved anyway.
        if epoch != last_epoch:
            backup_path = f"backup_model_epoch_{epoch+1}.pth"
            torch.save(model.state_dict(), backup_path)

        if (epoch + 1) % 5 == 0 or epoch == last_epoch:
            model_dir = f"models/model_{epoch+1}"
            os.makedirs(model_dir, exist_ok=True)

            model_path = f"{model_dir}/model.pth"
            torch.save(model.state_dict(), model_path)

            _save_history_plot(
                f"{model_dir}/training_history.png",
                start_epoch + 1,
                training_losses,
                val_losses,
                training_accuracies,
                val_accuracies,
            )

            training_data = {
                'epoch': epoch + 1,
                'train_losses': training_losses,
                'train_accs': training_accuracies,
                'val_losses': val_losses,
                'val_accs': val_accuracies,
                'final_lr': lr
            }

            with open(f"{model_dir}/training_data.json", 'w') as f:
                json.dump(training_data, f)

    torch.save(model.state_dict(), 'main.pth')
    print("Training complete. Final model saved as main.pth")

    return {
        'train_losses': training_losses,
        'train_accs': training_accuracies,
        'val_losses': val_losses,
        'val_accs': val_accuracies
    }

# Run 20 more epochs; start_epoch=50 makes epoch numbering continue at 51.
history = train_model(
    model=model,
    trainloader=trainloader,
    valloader=valloader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    start_epoch=50,
    num_epochs=20,
)

<div style="font-size: 40px">
plot training history
</div>

In [None]:
# Side-by-side curves for the run stored in `history`: loss (left), accuracy (right).
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(history['train_losses'], 'b-', label='Training')
loss_ax.plot(history['val_losses'], 'r-', label='Validation')
loss_ax.set_title('Loss During Training')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

acc_ax.plot(history['train_accs'], 'b-', label='Training')
acc_ax.plot(history['val_accs'], 'r-', label='Validation')
acc_ax.set_title('Accuracy During Training')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.legend()

fig.tight_layout()
plt.show()

<div style="font-size: 40px">
generate predictions
</div>

In [None]:
# Restore the final weights written by train_model and switch to inference mode.
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load('main.pth', map_location=device))
model.eval()

test_dir = "dane/test_all/"
# Suffixes are compared against the lower-cased file name, so '.JPEG' is covered
# by '.jpeg'. Sorting makes the order of the prediction rows reproducible,
# because os.listdir returns entries in arbitrary order.
image_suffixes = ('.jpeg', '.jpg', '.png')
test_files = sorted(
    f for f in os.listdir(test_dir)
    if f.lower().endswith(image_suffixes)
)

print(f"Found {len(test_files)} test images")
print(f"Classes in training set: {full_dataset.classes}")

In [None]:

predictions = []
filenames = []

# Make sure dropout and batch norm are in inference mode even if this cell is
# run on its own.
model.eval()

with torch.no_grad():
    for file in test_files:
        img_path = os.path.join(test_dir, file)
        # The context manager closes the image file as soon as it is decoded.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        # unsqueeze(0) adds the batch dimension the model expects.
        image_tensor = test_transform(image).unsqueeze(0).to(device)

        output = model(image_tensor)
        pred = output.argmax(dim=1)

        # Each image is reported under its base name with a '.JPEG' extension,
        # whatever extension the file on disk has.
        base_name = os.path.splitext(file)[0]
        filename_jpeg = f"{base_name}.JPEG"

        predictions.append(pred.item())
        filenames.append(filename_jpeg)

        if len(predictions) % 100 == 0:
            print(f"Processed {len(predictions)}/{len(test_files)} images")

In [None]:
# Write one "<filename>,<predicted class index>" row per test image, no header.
# newline='' leaves line-ending handling to the csv module.
with open('pred.csv', 'w', newline='') as out_file:
    csv.writer(out_file).writerows(zip(filenames, predictions))