<a href="https://colab.research.google.com/github/LeraUseinova/SmokeDetection/blob/main/SmokeDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip /content/drive/MyDrive/Dataset/Datasets.zip -d /content/dataset

In [None]:
!pip install torchmetrics

In [4]:
import torch
import numpy as np
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt
from PIL import Image
import random
import torch.nn as nn
import torchvision.models as models
from collections import OrderedDict
from torchvision.models.resnet import BasicBlock
from torchvision.models.resnet import Bottleneck
import torchmetrics
from sklearn.metrics import f1_score 
from sklearn.metrics import confusion_matrix

In [5]:
# Гиперпараметры
num_epochs = 100
num_classes = 6
batch_size = 32
learning_rate = 0.001
weight_decay = 0.0001

# Конфигурация устройства
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Пути к папкам с данными
train_dir = "/content/dataset/Датасет/train"
val_dir = "/content/dataset/Датасет/validation"
test_dir = "/content/dataset/Датасет/test"

In [None]:
"""
Вычисление среднего и стандартного отклонения по тренировочному датасету 
""" 


dataset = datasets.ImageFolder(train_dir, transform=transforms.ToTensor())
loader = DataLoader(dataset, batch_size=batch_size,num_workers=2)

def mean_std(loader):
    
    # cnt — общее количество пикселей во всем наборе данных, fst_moment и snd_moment — первый и второй моменты набора данных соответственно
    cnt = 0
    fst_moment = torch.empty(3, dtype=torch.float32)
    snd_moment = torch.empty(3, dtype=torch.float32)

    # no_grad отключает вычисление градиента и сокращает использование памяти
    with torch.no_grad():
      # Перебираем изображения, игнорируя метки
      for images, _ in loader:
        # Вычисляем количество пикселей в тензоре images
        b, c, h, w = images.shape
        nb_pixels = b * h * w 
        # Сумма значений каждого пикселя по ширине и высоте изображения
        sum_ = torch.sum(images, dim=[0, 2, 3], dtype=torch.float32) 
        # Сумма квадратов значений каждого пиксела по ширине и высоте изображения
        sum_of_square = torch.sum(images ** 2, dim=[0, 2, 3], dtype=torch.float32)
        # Обновляем значения первого и второго момента, и количество пикселей для текущего набора
        fst_moment = (cnt * fst_moment + sum_) / (cnt + nb_pixels)
        snd_moment = (cnt * snd_moment + sum_of_square) / (cnt + nb_pixels)
        cnt += nb_pixels
    # Вычисляем среднее значение и стандартное отклонение
    mean, std = fst_moment, torch.sqrt(snd_moment - fst_moment ** 2)       
    return mean,std

mean, std = mean_std(loader)
print("mean and std: \n",mean, std)

In [None]:
"""
 Преобразование изображений
"""
# Набор преобразований данных, который включает случайные преобразования для увеличения разнообразия данных
data_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])
# Набор преобразований данных, который используется для тестирования и валидации и не включает случайных преобразований, 
# чтобы сохранить исходное изображение
transform = transforms.Compose([
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

# Загрузка датасетов
train_dataset = datasets.ImageFolder(train_dir, transform=data_transform)
val_dataset = datasets.ImageFolder(val_dir, transform=transform)
test_dataset = datasets.ImageFolder(test_dir, transform=transform)

# Определяем загрузчики данных
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

classes = train_dataset.classes

print('Classes: ',  classes)
print('The train dataset have: ',  len(train_dataset) ," images.")
print('Number of train downloaded images per batch: ',  len(train_dataloader))
print('The valid dataset have: ',  len(val_dataset)," images.")
print('Number of valid downloaded images per batch: ',  len(val_dataloader))
print('The test dataset have: ', len(test_dataset) ," images.")
print('Number of test downloaded images per batch: ',  len(test_dataloader))


In [None]:
"""
Вычисление среднего и стандартного отклонения после преобразования
"""

mean_normal, std_normal = mean_std(train_dataloader)
print("mean and std after normalize:\n",
      mean_normal, std_normal)

In [None]:
"""
Вывод изображений до и после преобразования
"""

original_dataset = ImageFolder(train_dir)

# Вывод по одной случайной картинке из каждого класса, после преобразований
fig, axes = plt.subplots(nrows=len(original_dataset.class_to_idx), ncols=2, figsize=(10, 4 * len(original_dataset.class_to_idx)))

for i, (class_name, class_idx) in enumerate(original_dataset.class_to_idx.items()):
    # Извлечение всех изображений данного класса
    images = original_dataset.imgs
    class_images = [img[0] for img in images if img[1] == class_idx]

    # Выбор случайного изображения из данного класса
    torch.manual_seed(123)
    random_image_path = random.sample(class_images, 1)[0]
    image = Image.open(random_image_path)

    # Применение преобразований к изображению
    transformed_image = data_transform(image)

    # Вывод изображений до и после преобразований
    axes[i][0].imshow(image)
    axes[i][0].set_title(f'{class_name} (original)')
    axes[i][0].axis("off")
    axes[i][1].imshow(transformed_image.permute(1, 2, 0))
    axes[i][1].set_title(f'{class_name} (transformed)')
    axes[i][1].axis("off")

plt.tight_layout()
plt.show()

In [None]:
"""
Добавляем к предобученной модели дропаут и механизмы внимания
"""
# Определение класса, реализующего пространственное внимание
class SpatialAttention(nn.Module):
    def __init__(self, in_channels):
        super(SpatialAttention, self).__init__()
        # Conv2d слой с ядром 1x1, который сокращает количество каналов на входе в 8 раз
        self.conv1 = nn.Conv2d(in_channels, in_channels // 8, kernel_size=1) 
        # Conv2d слой с ядром 1x1, который преобразует выходной тензор conv1 в маску пространственного внимания
        self.conv2 = nn.Conv2d(in_channels // 8, 1, kernel_size=1)
        # Адаптивный слой пулинга среднего значения, который вычисляет среднее по каждому каналу 
        self.avg_pool = nn.AdaptiveAvgPool2d(1) 
        # Адаптивный слой максимального пулинга, который вычисляет максимальное значение по каждому каналу
        self.max_pool = nn.AdaptiveMaxPool2d(1) 
        # Сигмоидный слой, который вычисляет маску пространственного внимания
        self.sigmoid = nn.Sigmoid() 

    def forward(self, x):
        # Применение адаптивного слоя среднего значения для вычисления среднего по каждому каналу
        avg_out = self.avg_pool(x) 
        # Проход данных через первый слой свертки с ядром 1x1
        avg_out = self.conv1(avg_out)
        # Проход данных через второй слой свертки с ядром 1x1, чтобы получить маску внимания, отображающую важность каждой области изображения
        avg_out = self.conv2(avg_out) 
        # Применение адаптивного слоя максимального значения для вычисления максимального значения по каждому каналу
        max_out = self.max_pool(x) 
        # Проход данных через первый слой свертки, аналогичный avg_out, но использующий max_out в качестве входа
        max_out = self.conv1(max_out) 
        # Проход данных через второй слой свертки для получения маски внимания, используя max_out
        max_out = self.conv2(max_out) 
        # Объединение двух масок внимания вместе с использованием операции поэлементной конкатенации по оси каналов
        out = torch.cat([avg_out, max_out], dim=1) 
        # Вычисление маски внимания, используя сигмоидную функцию для преобразования значений маски в промежутке от 0 до 1
        mask = self.sigmoid(out)
        return x * mask

# Определение класса, реализующего канальное внимание
class ChannelAttention(nn.Module):
    def __init__(self, in_planes, ratio=16):
        super(ChannelAttention, self).__init__()
        # Адаптивный слой пуллинга среднего значения, который вычисляет среднее по каждому каналу
        self.avg_pool = nn.AdaptiveAvgPool2d(1) 
        # Адаптивный слой максимального пулинга, который вычисляет максимальное значение по каждому каналу
        self.max_pool = nn.AdaptiveMaxPool2d(1) 
        # Conv2d слой с ядром 1x1, который сокращает количество каналов изображения в ratio раз
        self.fc1 = nn.Conv2d(in_planes, in_planes // ratio, 1, bias=False)  
        self.relu = nn.ReLU(inplace=True)
        # Conv2d слой с ядром 1x1, который восстанавливает количество каналов до исходного значения
        self.fc2 = nn.Conv2d(in_planes // ratio, in_planes, 1, bias=False)  
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Применение адаптивного слоя среднего значения для вычисления среднего по каждому каналу
        avg_pool = self.avg_pool(x) 
        # Применение адаптивного слоя максимального значения для вычисления максимального значения по каждому каналу
        max_pool = self.max_pool(x) 
        # Выполнение операций свертки для уменьшения количества каналов тензора и получение маски внимания, 
        # основанной на данных, полученных с использованием адаптивного пулинга среднего значения
        avg_out = self.fc2(self.relu(self.fc1(avg_pool))) 
        # Выполнение операций свертки для получения маски внимания на основе данных, 
        # полученных с использованием адаптивного пулинга максимального значения
        max_out = self.fc2(self.relu(self.fc1(max_pool))) 
        # Объединение двух масок внимания вместе с использованием операции поэлементной конкатенации по оси каналов
        out = torch.cat([avg_out, max_out], dim=1) 
        x = self.sigmoid(x)
        return x

# Определение класса ResNetWithAttention
class ResNetWithAttention(nn.Module):
    def __init__(self, num_classes=6):
        super(ResNetWithAttention, self).__init__()
        self.resnet = models.resnet18(pretrained=True) # Загружаем предобученную нейронную сеть ResNet18 из библиотеки PyTorch
        
        # Добавляем модули канального и пространсвенного внимания к каждому остаточному блоку.
        for module in self.resnet.modules():
            if isinstance(module, nn.Sequential):
                for i in range(len(module)):
                    if isinstance(module[i], BasicBlock):
                        module[i].ca = ChannelAttention(module[i].conv2.out_channels)
                        module[i].sa = SpatialAttention(module[i].conv2.in_channels)
                    elif isinstance(module[i], Bottleneck):
                        module[i].ca = ChannelAttention(module[i].conv3.out_channels)
                        module[i].sa = SpatialAttention(module[i].conv3.in_channels)

        # Заменяем последний полносвязный слой, чтобы на выходе получалось 6 классов
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(num_features, num_classes)
        )

    def forward(self, x):
        # Тензор x проходит через операции сверточного слоя, батч-нормализации и функции активации. 
        x = self.resnet.conv1(x)
        x = self.resnet.bn1(x)
        x = self.resnet.relu(x)
        x = self.resnet.maxpool(x) # Полученный тензор проходит через операцию пулинга maxpool

        # Тензор проходит через последовательности слоев ResNet 
        x = self.resnet.layer1(x)
        x = self.resnet.layer2(x)
        x = self.resnet.layer3(x)
        x = self.resnet.layer4(x)

        # Тензор проходит через операцию пулинга avgpool
        x = self.resnet.avgpool(x)
        x = torch.flatten(x, 1) # тензор "распрямляется" в вектор, используя функцию flatten

        # Полученный вектор проходит через полносвязный слой fc
        x = self.resnet.fc(x)

        return x

model = ResNetWithAttention()
            
# Перемещение модели на GPU
model.to(device)

In [None]:
"""
Обучаем модель
"""
# Определяем класс EarlyStopping
class EarlyStopping:
    def __init__(self, patience=10, delta=0, path='checkpoint.pt'):
        # Задаем параметры EarlyStopping: число эпох, в течение которых мы ждем улучшения модели; значение delta, 
        # являющееся изменением в loss-функции, с которым мы считаем ее улучшение; путь для сохранения весов модели; счетчик неулучшений; 
        # наилучшее значение метрики качества; флажок ранней остановки; наилучшее значение метрики качества во время валидации.
        self.patience = patience
        self.delta = delta
        self.path = path
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf

    def __call__(self, val_loss, model):
        score = -val_loss # Подсчет значения оценки качества модели

        # Если это первый запуск, устанавливаем score как лучшее значение
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        # Если модель не продемонстрировала достаточного улучшения, увеличиваем счетчик и проверяем, не истек ли лимит
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        # Если новое значение оценки качества модели лучше, сохраняем веса и сбрасываем счетчик
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        # Сохранение текущих весов модели в файл по указанному пути
        torch.save(model.state_dict(), self.path)
        # Обновление эталонного значения функции потерь на валидации
        self.val_loss_min = val_loss

early_stopping = EarlyStopping(patience=10)

loss_fn = nn.CrossEntropyLoss() # Определение функции потерь кросс-энтропия
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay) # Определение оптимизатора Adam 
# Определение планировщика ReduceLROnPlateau скорости обучения, 
# который мы будем использовать для изменения скорости обучения
# в зависимости от того, как меняется значение функции потерь
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5) 

# Код для обучения модели
train_loss = []
train_acc = []
val_loss = []
val_acc = []

for epoch in range(num_epochs):
    # Обучение модели на тренировочном датасете
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for images, labels in train_dataloader:
        # Перемещаем тензоры на устройство
        images = images.to(device)
        labels = labels.to(device)

        # Прямое прохождение 
        outputs = model(images)
        loss = loss_fn(outputs, labels)

        # Добавление регуляризации
        for name, param in model.named_parameters():
            # Добавление L2-регуляризации только для весов
            if 'weight' in name:
                l2_regularization = torch.norm(param) # Вычисление L2-нормы весов
                loss += weight_decay * l2_regularization # Добавление к функции потерь вычисленное значение регуляризации с весом `weight_decay`

        # Обратное прхождение и оптимизация              
        optimizer.zero_grad() # Очистка градиентов, сохраненных в оптимизаторе
        loss.backward() # Обратное распространение градиента
        optimizer.step() # Обновление параметров модели на основе вычисленных градиентов

        # Сохраненяем статистику
        running_loss += loss.item() * images.size(0)
        _, preds = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (preds == labels).sum().item()

        # Очищаем память на устройстве
        del images, labels, outputs
        torch.cuda.empty_cache()

    # Вычисление функции потерь и точности на тренировочном датасете для текущей эпохи
    epoch_train_loss = running_loss / len(train_dataloader.dataset)
    epoch_train_acc = 100 * correct / total
    # Добавляем значения функции потерь и точности в списки
    train_loss.append(epoch_train_loss)
    train_acc.append(epoch_train_acc)

    # Проверка модели на валидационном датасете
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_dataloader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = loss_fn(outputs, labels)

            # Сохраненяем статистику
            running_loss += loss.item() * images.size(0)
            _, preds = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (preds == labels).sum().item()

            # Очищаем память на устройстве
            del images, labels, outputs
            torch.cuda.empty_cache()

    # Вычисляем функции потерь и точности на валидационном датасете для текущей эпохи
    epoch_val_loss = running_loss / len(val_dataloader.dataset)
    epoch_val_acc = 100 * correct / total
    # Добавляем значения функции потерь и точности в списки
    val_loss.append(epoch_val_loss)
    val_acc.append(epoch_val_acc)

    print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.2f}%, Val Loss: {:.4f}, Val Acc: {:.2f}%'.format(epoch+1, num_epochs, epoch_train_loss, 
                                                                                                             epoch_train_acc, epoch_val_loss, 
                                                                                                             epoch_val_acc))
    
    # Сохраняем модель с лучшей точностью
    best_acc = 0
    if epoch_val_acc > best_acc:
        best_acc = epoch_val_acc
        torch.save({
            'model': model,
            'state_dict': model.state_dict()
            }, '/content/resnet50_model_with_dropout_2.pth')

    # Уменьшаем learning rate, если в течение 5 эпох точность не улучшается
    scheduler.step(epoch_val_loss)
    
    # Останавливаем обучение, если в течение 10 эпох точность не улучшается
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Построение графиков
fig, ax = plt.subplots(1, 2, figsize=(12, 5))

ax[0].plot(train_loss, label='train')
ax[0].plot(val_loss, label='validation')
ax[0].set_title('Loss')
ax[0].legend()

ax[1].plot(train_acc, label='train')
ax[1].plot(val_acc, label='validation')
ax[1].set_title('Accuracy')
ax[1].legend()

In [None]:
"""
Тестируем модель
"""
# Загружаем модель
checkpoint = torch.load('/content/resnet50.pth')
model = checkpoint['model']
model.load_state_dict(checkpoint['state_dict'])

# Перемещение модели на устройство
model = model.to(device)

model.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for images, labels in test_dataloader:
        # Перемещаем тензоры на устройство
        images = images.to(device)
        labels = labels.to(device)

        # Получаем прогнозы модели 
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)

        # Вычисляем колличество правильных предсказаний
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Вычисление метрик
accuracy = torchmetrics.functional.accuracy(torch.tensor(y_pred), torch.tensor(y_true), num_classes=num_classes, task='multiclass')
precision = torchmetrics.functional.precision(torch.tensor(y_pred), torch.tensor(y_true), num_classes=num_classes, average='macro', task='multiclass')
recall = torchmetrics.functional.recall(torch.tensor(y_pred), torch.tensor(y_true), num_classes=num_classes, average='macro', task='multiclass')
f1 = f1_score(y_true, y_pred, average='macro')

print('Accuracy: {:.4f}%, Precision: {:.4f}, Recall: {:.4f}, F1 score: {:.4f}'.format(accuracy * 100, precision, recall, f1))

# Строим матрицу ошибок
cm = confusion_matrix(y_true, y_pred)
print("confusion matrix: \n", cm)
