In [None]:
import torch
import torchvision as tv
from torch import nn
import os
import matplotlib.pyplot as plt

In [None]:
data_transforms = tv.transforms.Compose(
    [   
        tv.transforms.RandomResizedCrop(128),
        tv.transforms.ToTensor(),
        tv.transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
    ]
)

In [None]:
if torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
elif torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

print(DEVICE)

In [None]:
data_dir = f"{os.path.abspath(os.curdir)}/data"

dataset = tv.datasets.ImageFolder(f"{data_dir}", data_transforms)

train_size = int(0.9 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size]
)
classes = dataset.classes
print("Classes: ", classes)
print("The datasest have: ", len(dataset), " images")

In [None]:
BATCH_SIZE = 64

train_data_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True
)
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
from collections import Counter

def pie_plot(_df):
    labels = list(_df.keys())
    values = list(_df.values())
    fig, ax = plt.subplots(figsize=(15, 8))
    ax.grid(False)

    plt.pie(values, labels=labels, autopct="%1.1f%%")
    my_circle = plt.Circle((0, 0), 0.7, color="#ffffff")
    plt.title("Cоотношение количества объектов разных классов")
    plt.gcf().gca().add_artist(my_circle)

train_classes = [label for _, label in train_dataset]
pie_plot(dict(Counter(train_classes)))

In [None]:
import matplotlib.pyplot as plt
import numpy as np


def imshow(img):
    img_np = img.numpy()
    plt.imshow(np.transpose(img_np, (1, 2, 0)))


for images, labels in train_data_loader:
    imshow(tv.utils.make_grid(images))
    print("Image batch dimensions:", images.shape)
    print("Image label dimensions:", labels.shape)
    break

### Гиперпараметры для Conv2d
1) **input/output** - кол-во входных и выходных параметров
2) **kernel_size** - Размер ядра (окна). Большие ядра могут извлекать более общие признаки, а мелкие - детализированные признаки
3) **stride - шаг**. Определяет то, настолько ядро далеко перемещается при каждой операции свертки. Больший шаг уменьшает размер выходной карты признаков. 
4) **padding** - заполнение. Добавляет рамку вокруг входных данных перед операцие свертки.

> Если вход имел размерность w * h, а в слое n сверток размерности kx * ky, то выход будет иметь размерность, n*(w−kx+1)*(h−ky+1)


In [None]:
class SimpleCarClassifier(nn.Module):
    def __init__(self):
        super(SimpleCarClassifier, self).__init__()
        # 128x128x3 => 126x126x32
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3)
        self.flatten = nn.Flatten()
        self.activation = nn.ReLU()
        self.activation_soft_max = nn.Softmax(dim=1)
        self.d1 = nn.Linear(126 * 126 * 32, 256)
        self.d2 = nn.Linear(256, 2)

    def forward(self, x):
        # 32x1x28x28 => 32x32x26x26
        x = self.activation(self.conv1(x))
        # flatten => 32 x (32*26*26)
        x = self.flatten(x)
        # 32 x (32*26*26) => 32x128
        x = self.activation(self.d1(x))
        # logits => 32x10
        x = self.activation_soft_max(self.d2(x))
        return x

In [None]:
class ModifiedCarClassifier(nn.Module):
    def __init__(self):
        super(ModifiedCarClassifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
        self.activation = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(126 * 126 * 64, 512)
        self.fc2 = nn.Linear(512, 2)  

    def forward(self, x):
        x = self.pool(self.activation(self.conv1(x)))
        x = self.pool(self.activation(self.conv2(x)))
        x = self.pool(self.activation(self.conv3(x)))
        x = self.flatten(x)
        x = self.activation(self.fc1(x))
        x = self.fc2(x)
        return x

### Гиперпараметры для Pool2D
1) **Тип операции объединения** - не является параметром. Можно изменить, используя импортирование различных функций (максимальное, среднее). 

    Максимальное объединение часто используется для выделения ключевых признаков, тогда как среднее объединение может быть полезным для создания более устойчивых признаков.
2) **kernel_size** - размер ядра (окна объединение). 
    
    Большие размеры окон объединения уменьшат размер карт признаков более агрессивно и могут привести к уменьшению пространственной разрешающей способности сети. Меньшие размеры окон сохраняют более детализированную информацию.

Flatten - сворачивает n-мерный тензор в одномерный

> Когда применяется сверточный слой к изображению, выходом является трехмерный массив (высота, ширина, количество каналов). Перед передачей данных в полносвязный слой, который ожидает одномерный вектор, требуется операция "flatten" для преобразования трехмерного массива в одномерный.

In [None]:
class AnotherCarClassifier(nn.Module):
    def __init__(self, num_classes=2):
        super(AnotherCarClassifier, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.activation = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.drop = nn.Dropout(0.2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(256 * 16 * 16, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.pool(self.activation(self.conv1(x)))
        x = self.pool(self.activation(self.conv2(x)))
        x = self.pool(self.activation(self.conv3(x)))
        x = self.drop(x)
        x = self.flatten(x)
        x = self.activation(self.fc1(x))
        x = self.drop(x)
        x = self.fc2(x)
        return x

In [None]:
def losses_plot(losses: list[float]):
    plt.plot(losses)
    plt.xlabel("Эпоха")
    plt.ylabel("Потери")
    plt.title("Потери в обучении на протяжении эпох")
    plt.show()

def accuracy_plot(accuracy: list[float]):
    plt.plot(accuracy)
    plt.xlabel("Эпоха")
    plt.ylabel("Точность")
    plt.title("Точность обучения на протяжении эпох")
    plt.show()

In [None]:
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, classes, normalize=False, title=None, cmap=plt.cm.Blues):
    cm = confusion_matrix(y_true, y_pred)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        fmt = '.2f'
        print("Нормализованная матрица ошибок")
    else:
        fmt = 'd'
        print('Матрица ошибок без нормализации')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    plt.ylabel('Истинные метки')
    plt.xlabel('Предсказанные метки')
    plt.tight_layout()

    for i in range(len(classes)):
        for j in range(len(classes)):
            plt.text(j, i, format(cm[i, j], fmt), horizontalalignment="center", color="white" if cm[i, j] > 0.3 else "black")

    plt.show()

In [None]:
def get_accuracy(logit, target, batch_size):
    corrects = (torch.max(logit, 1)[1].view(target.size()).data == target.data).sum()
    accuracy = 100.0 * corrects / batch_size
    return accuracy.item()

def get_model_accuracy(_net):
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}
    correct = 0
    total = 0
    y_true_list = []
    y_pred_list = []

    with torch.no_grad():
        for images, labels in test_data_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)

            outputs = _net(images)
            _, predictions = torch.max(outputs, 1)
            y_true_list.extend(labels.numpy())
            y_pred_list.extend(predictions.numpy())
            total += labels.size(0)
            correct += (predictions == labels).sum().item()
            # собираем правильные прогнозы для каждого класса
            for label, prediction in zip(labels, predictions):
                if label == prediction:
                    correct_pred[classes[label]] += 1
                total_pred[classes[label]] += 1
    
    print(f"Точность сети: {100 * correct // total}%")

    # Выводим точность на каждом классе
    for classname, correct_count in correct_pred.items():
        accuracy = 100 * float(correct_count) / total_pred[classname]
        print(f'Точность для класса: {classname} is {accuracy:.1f}%')

    y_true = np.array(y_true_list)
    y_pred = np.array(y_pred_list)

    plot_confusion_matrix(y_true, y_pred, classes, normalize=True)

In [None]:
def train_model(_criterion, _optimizer, _net, _epoch_num):
    epoch_loss: list[float] = []
    epoch_accuracy: list[float] = []

    for epoch in range(_epoch_num):
        running_loss = 0.0
        accuracy = 0.0
        for images, labels in train_data_loader:

            inputs, labels = images.to(DEVICE), labels.to(DEVICE)

            _optimizer.zero_grad()

            outputs = _net(inputs)
            loss = _criterion(outputs, labels)
            loss.backward()
            _optimizer.step()

            running_loss += loss.item()
            # accuracy += get_accuracy(outputs, labels, BATCH_SIZE)
        epoch_loss_temp = running_loss / len(train_data_loader)
        epoch_accuracy_temp = accuracy / len(train_data_loader)

        epoch_loss.append(epoch_loss_temp)
        epoch_accuracy.append(epoch_accuracy_temp)
        print(
            "Epoch: %d | Loss: %.4f | Train Accuracy: %.2f "
            % (epoch, epoch_loss_temp, epoch_accuracy_temp)
        )

    return epoch_loss, epoch_accuracy

In [None]:
import torch.optim as optim

simple_model = SimpleCarClassifier().to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(simple_model.parameters(), lr=0.001)
simple_losses, simple_accuracy = train_model(criterion, optimizer, simple_model, 5)

In [None]:
losses_plot(simple_losses)
accuracy_plot(simple_accuracy)
get_model_accuracy(simple_model)

In [None]:
pretrained_resnet = tv.models.resnet18(pretrained=True)

# Заморозка параметров нижних слоев
for param in pretrained_resnet.parameters():
    param.requires_grad = False

# Изменение архитектуры верхних слоев
pretrained_resnet.fc = nn.Sequential(
    nn.Linear(pretrained_resnet.fc.in_features, 128),
    nn.ReLU(),
    nn.Linear(128, 2),  
)

In [None]:
preptrained_model = pretrained_resnet

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(preptrained_model.parameters(), lr=0.001)
pretrained_losses, pretrained_accuracy = train_model(criterion, optimizer, preptrained_model, 25)

In [None]:
losses_plot(pretrained_losses)
accuracy_plot(pretrained_accuracy)
get_model_accuracy(preptrained_model)

In [None]:
modified_model = ModifiedCarClassifier()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(modified_model.parameters(), lr=0.001)
modified_losses, modified_accuracy = train_model(criterion, optimizer, modified_model, 25)

In [None]:
losses_plot(modified_losses)
accuracy_plot(modified_accuracy)
get_model_accuracy(modified_model)

In [None]:
another_model = AnotherCarClassifier()

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(another_model.parameters(), lr=0.001)
another_losses, another_accuracy = train_model(criterion, optimizer, another_model, 25)

In [None]:
losses_plot(another_losses)
accuracy_plot(another_accuracy)
get_model_accuracy(another_model)