In [1]:
# !pip install --upgrade torch

import os
import pickle
import random
# Ignore warnings
import warnings
from glob import glob
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torchvision
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm

warnings.filterwarnings("ignore")

print("PyTorch Version: ", torch.__version__)
print("Torchvision Version: ", torchvision.__version__)

train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('CUDA is not available.  Training on CPU ...')
else:
    print('CUDA is available!  Training on GPU ...')

PyTorch Version:  2.2.1
Torchvision Version:  0.17.1
CUDA is not available.  Training on CPU ...


In [2]:
SEED = 42

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [3]:
DATA_MODES = ['train', 'val', 'test']
RESCALE_SIZE = 256
DEVICE = torch.device("mps")

In [4]:
class SimpsonsDataset(Dataset):
    def __init__(self, files, mode, augmentations=None):
        super().__init__()
        self.files = files
        self.mode = mode
        self.augmentations = augmentations

        if self.mode not in DATA_MODES:
            print(f'wrong mode: {self.mode}')
            raise NameError

        self.len_ = len(self.files)
        self.label_encoder = LabelEncoder()

        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)

            with open('label_encoder.pkl', 'wb') as le_dump:
                pickle.dump(self.label_encoder, le_dump)

    def __len__(self):
        return self.len_

    def load_sample(self, file):
        image = Image.open(file)
        image.load()
        return image

    def __getitem__(self, index):
        transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        x = self.load_sample(self.files[index])
        #     x = self._prepare_sample(x)
        #     x = np.array(x / 255, dtype='float32')

        x = transform(x)

        if self.mode == 'test':
            return x
        else:

            label = self.labels[index]
            label_id = self.label_encoder.transform([label])
            y = label_id.item()
            return x, y

#   def _prepare_sample(self, image):
#     image = image.resize((RESCALE_SIZE, RESCALE_SIZE))
#     return np.array(image)

In [11]:
train_dir = Path('./train_dir/simpsons_dataset/')
test_dir = Path('./test_dir/kaggle_simpson_testset/')
files_training = glob(os.path.join(train_dir, '*/*.jpg'))
num_images = len(files_training)
print('Number of images in Training file:', num_images)

Number of images in Training file: 0


In [6]:
min_images = 1000
im_cnt = []
class_names = []
print('{:18s}'.format('class'), end='')
print('Count:')
print('-' * 24)
for folder in os.listdir(train_dir):
    if os.path.isdir(os.path.join(train_dir, folder)):
        folder_num = len(os.listdir(os.path.join(train_dir, folder)))
        im_cnt.append(folder_num)
        class_names.append(folder)
        print('{:20s}'.format(folder), end=' ')
        print(folder_num)
        if (folder_num < min_images):
            min_images = folder_num
            folder_name = folder

num_classes = len(class_names)
print("\nMinumum imgages per category:", min_images, 'Category:', folder)
print('Average number of Images per Category: {:.0f}'.format(np.array(im_cnt).mean()))
print('Total number of classes: {}'.format(num_classes))

class             Count:
------------------------
maggie_simpson       128
charles_montgomery_burns 1193
patty_bouvier        72
ralph_wiggum         89
chief_wiggum         986
milhouse_van_houten  1079
rainier_wolfcastle   45
cletus_spuckler      47
martin_prince        71
lenny_leonard        310
sideshow_bob         877
fat_tony             27
selma_bouvier        103
barney_gumble        106
lionel_hutz          3
gil                  27
moe_szyslak          1452
carl_carlson         98
edna_krabappel       457
snake_jailbird       55
groundskeeper_willie 121
sideshow_mel         40
ned_flanders         1454
abraham_grampa_simpson 913
krusty_the_clown     1206
waylon_smithers      182
apu_nahasapeemapetilon 623
marge_simpson        1291
comic_book_guy       469
nelson_muntz         358
mayor_quimby         246
kent_brockman        498
professor_john_frink 65
principal_skinner    1194
bart_simpson         1342
lisa_simpson         1354
otto_mann            32
troy_mcclure         8

In [7]:
train_val_files = sorted(list(train_dir.rglob('*.jpg')))
test_files = sorted(list(test_dir.rglob('*.jpg')))

In [8]:


from sklearn.model_selection import train_test_split

train_val_labels = [path.parent.name for path in train_val_files]
train_files, val_files = train_test_split(train_val_files, test_size=0.3, stratify=train_val_labels)
val_labels = [path.parent.name for path in val_files]
val_files, oos_files = train_test_split(val_files, test_size=0.4, stratify=val_labels)



ValueError: The least populated class in y has only 1 member, which is too few. The minimum number of groups for any class cannot be less than 2.

In [None]:


val_dataset = SimpsonsDataset(val_files, mode='val')
train_dataset = SimpsonsDataset(train_files, mode='train')



In [None]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataset = SimpsonsDataset(test_files, mode="test")
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=64)

In [None]:
# class Classifier(torch.nn.Module):
#     def __init__(self, num_classes):
#         super(Classifier, self).__init__()
#         self.encoder = torchvision.models.resnet18(pretrained=True)
#         self.linear_classifier = torch.nn.Linear(self.encoder.fc.out_features, num_classes)

#     def forward(self, sample):
#         final_feature_map = self.encoder(sample)
#         logits = self.linear_classifier(final_feature_map) # argmax(logits) = pred_class
#         return logits

In [None]:
import torch.nn.functional as F


class Classifier(torch.nn.Module):

    def __init__(self, num_classes: int):
        super().__init__()
        self.encoder = torchvision.models.mobilenet_v2(pretrained=False).features
        self.classifier = torch.nn.Linear(
            in_features=1280,  # For mobilenet_v2.
            out_features=num_classes,
        )

    def forward(self, x):
        feature_map = self.encoder(x)
        pooled_features = F.adaptive_avg_pool2d(feature_map, (1, 1))
        flattened_features = torch.flatten(pooled_features, 1)
        logits = self.classifier(flattened_features)
        return logits


classifier = Classifier(num_classes=10)
print(classifier(torch.ones((1, 3, 224, 224), dtype=torch.float32)))  # Test output of your neural network.

In [None]:
torchvision.models.resnet18().fc.out_features

In [None]:
torchvision.models.mobilenet_v2().features

In [None]:
def fit_epoch(model, train_loader, criterion, optimizer):
    running_loss = 0.0
    running_corrects = 0
    processed_data = 0

    for inputs, labels in tqdm(train_loader):
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        preds = torch.argmax(outputs, 1)
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_data += inputs.size(0)

    train_loss = running_loss / processed_data
    train_acc = running_corrects.cpu().numpy() / processed_data
    return train_loss, train_acc


def eval_epoch(model, val_loader, criterion):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    processed_size = 0

    for inputs, labels in tqdm(val_loader):
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, 1)

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_size += inputs.size(0)
    val_loss = running_loss / processed_size
    val_acc = running_corrects.double() / processed_size
    return val_loss, val_acc


def train(train_files, val_files, model, epochs, batch_size):
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    print('Train data loader \n')
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    print('val data loader \n')

    history = []
    log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
    val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"

    #with tqdm(desc="epoch", total=epochs) as pbar_outer:
    #Здесь можно добавить схему изменения learning rate

    opt = torch.optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        train_loss, train_acc = fit_epoch(model, train_loader, criterion, opt)
        #             print("loss", train_loss)

        val_loss, val_acc = eval_epoch(model, val_loader, criterion)
        history.append((train_loss, train_acc, val_loss, val_acc))

        # pbar_outer.update(1)
        # tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,\
        #                               v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
        print("train loss: " + str(train_loss) + " train acc: " + str(train_acc) + " val loss: " + str(
            val_loss) + " val acc: " + str(val_acc.item()))

    return history

In [None]:
def predict(model, test_loader):
    with torch.no_grad():
        logits = []

        for inputs in test_loader:
            inputs = inputs.to(DEVICE)
            model.eval()
            outputs = model(inputs).cpu()
            logits.append(outputs)

    probs = nn.functional.softmax(torch.cat(logits), dim=-1).numpy()
    return probs

In [None]:
n_classes = len(np.unique(train_val_labels))
model = Classifier(n_classes).to(DEVICE)

In [None]:
np.unique(train_val_labels)

In [None]:

history = train(train_dataset, val_dataset, model=model, epochs=15, batch_size=32)

In [None]:
torch.save(model.state_dict(), "saved_model.pth")

In [None]:
if ("saved_model.pth" in os.listdir(".")):
    model.load_state_dict(torch.load("saved_model.pth"))

In [None]:
history = train(train_dataset, val_dataset, model=model, epochs=5, batch_size=32)

In [None]:
class Classifier2(torch.nn.Module):

    def __init__(self, num_classes: int):
        super().__init__()
        self.encoder = torchvision.models.mobilenet_v2(pretrained=True).features
        self.classifier = torch.nn.Linear(
            in_features=1280,  # For mobilenet_v2.
            out_features=num_classes,
        )

    def forward(self, x):
        feature_map = self.encoder(x)
        pooled_features = F.adaptive_avg_pool2d(feature_map, (1, 1))
        flattened_features = torch.flatten(pooled_features, 1)
        logits = self.classifier(flattened_features)
        return logits

In [None]:
model2 = Classifier2(n_classes).to(DEVICE)

In [None]:
history2 = train(train_dataset, val_dataset, model=model2, epochs=5, batch_size=32)

In [None]:
torch.save(model.state_dict(), "saved_model_pretrained.pth")