In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision as tv

import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tqdm.autonotebook import tqdm

  from tqdm.autonotebook import tqdm


In [2]:
from typing import Literal
from pathlib import Path
from sklearn.preprocessing import LabelEncoder
import pickle
from torchvision.transforms import v2
from PIL import Image


RESCALE_SIZE = 224


class SimpsonsDataset(torch.utils.data.Dataset):
    """
    Image dataset that loads pictures from disk on demand, rescales them to
    ``RESCALE_SIZE`` x ``RESCALE_SIZE`` and converts them to normalized
    float tensors.

    The class label of a file is the name of its parent directory.

    Args:
        files: paths of the image files; they are stored sorted.
        mode: ``'train'`` enables random augmentation (rotation and
            horizontal flip); ``'val'`` and ``'test'`` use a deterministic
            pipeline. In ``'test'`` mode no labels are read and
            ``__getitem__`` returns only the image tensor.

    Side effects:
        For every mode except ``'test'`` the fitted ``LabelEncoder`` is
        pickled to ``label_encoder.pkl`` in the current working directory.
    """

    def __init__(self, files: list[Path], mode: Literal['train', 'val', 'test']):
        super().__init__()
        self.files = sorted(files)
        self.mode = mode

        self.len_ = len(self.files)
        self.label_encoder = LabelEncoder()

        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)
            # Encode every label once up front instead of calling
            # `transform` for each sample inside `__getitem__`.
            self.label_ids = self.label_encoder.transform(self.labels).tolist()

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

        # Random augmentation is only meaningful while training; evaluation
        # data must go through a deterministic pipeline.
        if self.mode == 'train':
            geometry = [
                v2.RandomRotation((-10, 10)),
                v2.Resize((RESCALE_SIZE, RESCALE_SIZE)),
                v2.RandomHorizontalFlip(0.5),
            ]
        else:
            geometry = [v2.Resize((RESCALE_SIZE, RESCALE_SIZE))]

        self.transform = v2.Compose([
            v2.PILToTensor(),
            *geometry,
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Return the number of image files in the dataset."""
        return self.len_

    def load_sample(self, file):
        """Open ``file`` and return it as a fully loaded RGB PIL image."""
        # Forcing RGB keeps grayscale / RGBA / palette files compatible with
        # the three-channel normalization applied afterwards.
        with Image.open(file) as image:
            return image.convert('RGB')

    def __getitem__(self, index):
        """
        Return the transformed image at ``index``.

        Returns:
            The image tensor alone in ``'test'`` mode, otherwise a tuple
            ``(image_tensor, label_id)`` where ``label_id`` is a Python int.
        """
        x = self.transform(self.load_sample(self.files[index]))

        if self.mode == 'test':
            return x
        return x, self.label_ids[index]

In [3]:
from sklearn.model_selection import train_test_split

TRAIN_DIR = Path('./simsons/train/')
TEST_DIR = Path('./simsons/testset/')

# Fixed seed so the train/validation split is the same on every run.
SPLIT_SEED = 42

train_val_files = sorted(TRAIN_DIR.rglob('*.jpg'))
test_files = sorted(TEST_DIR.rglob('*.jpg'))

# Fail early with a readable message instead of an obscure splitter error.
assert train_val_files, f'No .jpg files found under {TRAIN_DIR.resolve()}'

# The class of an image is the name of the directory that contains it.
train_val_labels = [path.parent.name for path in train_val_files]

# Hold out 25% of the labelled images, keeping class proportions equal.
train_files, val_files = train_test_split(
    train_val_files,
    test_size=0.25,
    stratify=train_val_labels,
    random_state=SPLIT_SEED,
)

# NOTE: `test_ds_catsdogs` wraps the validation split (mode 'val'); the
# variable names are kept because later cells refer to them.
train_ds_catsdogs = SimpsonsDataset(train_files, 'train')
test_ds_catsdogs = SimpsonsDataset(val_files, 'val')

In [4]:
# Report how many samples ended up in each split.
train_count = len(train_ds_catsdogs)
held_out_count = len(test_ds_catsdogs)
print(f'Train size: {train_count}')
print(f'Test size: {held_out_count}')

Train size: 15699
Test size: 5234


In [5]:
batch_size = 4

# Training loader: shuffled, incomplete final batch discarded.
train_loader = torch.utils.data.DataLoader(
    dataset=train_ds_catsdogs,
    batch_size=batch_size,
    shuffle=True,
    num_workers=1,
    drop_last=True,
)

# Evaluation loader: fixed order, every sample kept.
test_loader = torch.utils.data.DataLoader(
    dataset=test_ds_catsdogs,
    batch_size=batch_size,
    shuffle=False,
    num_workers=1,
    drop_last=False,
)

In [6]:
class VGG13(nn.Module):
    """
    Compact VGG-style convolutional classifier.

    Four stages of 3x3 convolutions (2, 2, 3 and 3 layers; 64 channels in
    the first stage, 128 in the others). Every convolution is followed by
    ReLU and every stage is closed by a 2x2 max-pool. Global average
    pooling then yields a 128-element vector that goes through two linear
    layers (``fc1`` with ReLU, then ``fc3``).

    Args:
        out_nc: number of output features of the final linear layer.
    """

    # Channel widths per stage; consecutive pairs are the (in, out) channels
    # of the stage's convolutions, named ``conv<stage>_<layer>``.
    _STAGE_WIDTHS = (
        (3, 64, 64),
        (64, 128, 128),
        (128, 128, 128, 128),
        (128, 128, 128, 128),
    )

    def __init__(self, out_nc):
        super().__init__()

        self.act = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(2, 2)

        # Create conv1_1 ... conv4_3 in stage order.
        for stage_no, widths in enumerate(self._STAGE_WIDTHS, start=1):
            for layer_no, (c_in, c_out) in enumerate(zip(widths, widths[1:]), start=1):
                setattr(
                    self,
                    f'conv{stage_no}_{layer_no}',
                    nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.flat = nn.Flatten()

        self.fc1 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, out_nc)

    def forward(self, x):
        """Run the convolutional stages and the linear head on ``x``."""
        out = x
        for stage_no, widths in enumerate(self._STAGE_WIDTHS, start=1):
            for layer_no in range(1, len(widths)):
                conv = getattr(self, f'conv{stage_no}_{layer_no}')
                out = self.act(conv(out))
            out = self.maxpool(out)

        out = self.flat(self.avgpool(out))
        out = self.act(self.fc1(out))
        return self.fc3(out)

In [7]:
# Device that the model, the loss and the batches are moved to.
# It is pinned to the CPU here; to select a GPU automatically when one is
# available, use the following line instead:
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = 'cpu'

In [8]:
# Size the classifier head from the classes actually found in the training
# data instead of hard-coding the class count.
num_classes = len(train_ds_catsdogs.label_encoder.classes_)

model = VGG13(num_classes)
model = model.to(device)

epochs = 7

loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [9]:
# Plain supervised training: one optimizer step per mini-batch.
for epoch in range(epochs):
    model.train()
    for inputs, labels in (pbar := tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}')):
        # Use the names bound by the loop; the earlier `img` / `label`
        # were never defined and raised NameError on a fresh kernel.
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()

        pred = model(inputs)
        loss = loss_fn(pred, labels)

        loss.backward()
        optimizer.step()

        # Show the current batch loss next to the progress bar.
        pbar.set_postfix(loss=f'{loss.item():.4f}')

  0%|          | 0/3924 [00:00<?, ?it/s]