In [None]:
#Код нейронной сети, модификация для данных RGB. 

#Установка нужных библиотек. Использовалась версия PyTorch 2.3.1
import torch
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR

import torchvision
from torchvision.transforms import v2 
from torchvision.datasets import ImageFolder

from torch.nn import Linear, Conv2d, ReLU, Sigmoid, MaxPool2d, BatchNorm2d, Dropout, Flatten, Sequential

from sklearn.metrics import confusion_matrix, classification_report

import rasterio
from PIL import Image

import numpy as np
import os
from datetime import datetime

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Data augmentation.
# Both pipelines convert the input to a float32 image tensor (with value scaling
# enabled), bring it to 40x40 and normalise with mean 0.5 / std 0.5.

# Training pipeline: random geometric and photometric perturbations.
_train_steps = [
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    # Geometric augmentations
    v2.RandomResizedCrop(size=(40, 40), antialias=True, scale=(0.7, 1.0)),
    v2.RandomHorizontalFlip(p=0.5),
    v2.RandomVerticalFlip(p=0.5),
    v2.RandomRotation(degrees=(-180, 180)),
    v2.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.8, 1.2), shear=(-15, 15)),
    # Photometric augmentations
    v2.RandomAdjustSharpness(sharpness_factor=2, p=0.5),
    v2.RandomAutocontrast(p=0.5),
    v2.RandomEqualize(p=0.1),
    v2.GaussianBlur(kernel_size=3, sigma=(0.1, 1.0)),
    v2.Normalize(mean=[0.5], std=[0.5]),
]
train_transforms = v2.Compose(_train_steps)

# Validation / test pipeline: deterministic resize and normalisation only.
_eval_steps = [
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Resize(size=(40, 40), antialias=True),
    v2.Normalize(mean=[0.5], std=[0.5]),
]
val_test_transforms = v2.Compose(_eval_steps)

In [None]:
class cnn_model(nn.Module):
    """Binary image classifier: three double-convolution stages and a dense head.

    Every stage is Conv-BN-ReLU-Conv-BN-ReLU-MaxPool(2)-Dropout with 32, 64 and
    128 feature maps respectively. The head is
    Linear-ReLU-Dropout-Linear-ReLU-Linear-Sigmoid and emits one value in
    [0, 1] per image.

    Args:
        shape: Expected input shape ``(batch, channels, height, width)``.
            The channel count feeds the first convolution and the spatial size
            determines the flattened feature count, so ``(N, 3, 40, 40)``
            yields the 3-channel / 3200-feature layout (128 * 5 * 5).

    Raises:
        ValueError: If height or width is too small to survive the three
            2x2 poolings (fewer than 8 pixels).
    """

    def __init__(self, shape):
        super().__init__()

        self.batch_size = shape[0]
        self.channels = shape[1]
        self.height = shape[2]
        self.width = shape[3]

        # Each MaxPool2d(2) halves a spatial dimension with floor division.
        pooled_h = self.height // 2 // 2 // 2
        pooled_w = self.width // 2 // 2 // 2
        flat_features = 128 * pooled_h * pooled_w
        if flat_features <= 0:
            raise ValueError(
                f'Input of size {self.height}x{self.width} is too small for three 2x2 poolings'
            )

        # Layer order (and therefore state_dict keys model.0 ... model.31) is
        # kept stable so previously saved checkpoints stay loadable.
        conv_layers = (
            self._conv_stage(self.channels, 32, 0.2)
            + self._conv_stage(32, 64, 0.3)
            + self._conv_stage(64, 128, 0.4)
        )

        linear_layers = [
            Linear(flat_features, 256),
            ReLU(inplace=True),
            Dropout(0.5),
            Linear(256, 64),
            ReLU(inplace=True),
            Linear(64, 1),
            Sigmoid(),
        ]

        layers = conv_layers + [Flatten()] + linear_layers
        self.model = Sequential(*layers)

    @staticmethod
    def _conv_stage(in_channels, out_channels, dropout):
        """Return the layers of one (Conv-BN-ReLU) x2 + MaxPool + Dropout stage."""

        def conv(c_in):
            return Conv2d(
                in_channels=c_in,
                out_channels=out_channels,
                kernel_size=3,
                padding='same',
                padding_mode='reflect',
            )

        return [
            conv(in_channels),
            BatchNorm2d(out_channels),
            ReLU(inplace=True),
            conv(out_channels),
            BatchNorm2d(out_channels),
            ReLU(inplace=True),
            MaxPool2d(kernel_size=2),
            Dropout(dropout),
        ]

    def forward(self, x):
        """Return the sigmoid output of shape ``(batch, 1)`` for input ``x``."""
        return self.model(x)

    def _run_epoch(self, loader, criterion, device, threshold, optimizer=None):
        """Run one pass over ``loader``.

        With an ``optimizer`` the model is put in train mode and updated after
        every batch; without one the pass runs in eval mode with gradients off.

        Returns:
            ``(mean_loss, accuracy, n_samples)`` averaged over samples, or
            ``(nan, nan, 0)`` when the loader yields nothing.
        """
        training = optimizer is not None
        self.train(training)

        loss_sum = 0.0
        n_correct = 0
        n_samples = 0

        with torch.set_grad_enabled(training):
            for images, labels in loader:
                images = images.to(device)
                labels = labels.float().to(device)

                if training:
                    optimizer.zero_grad()

                output = self(images).view(-1)
                loss = criterion(output, labels)
                hits = ((output > threshold).float() == labels).sum().item()

                if training:
                    loss.backward()
                    optimizer.step()

                # BCELoss returns the batch mean; re-weight by batch size so a
                # smaller last batch does not skew the epoch average.
                batch_n = len(labels)
                loss_sum += loss.item() * batch_n
                n_correct += hits
                n_samples += batch_n

        if n_samples == 0:
            return float('nan'), float('nan'), 0
        return loss_sum / n_samples, n_correct / n_samples, n_samples

    def train_model(self, train_loader, val_loader=None, epochs=100, patience=10,
                    threshold=0.65, checkpoint_path='best_road_model.pth'):
        """Train with Adam + BCE, optional validation, LR reduction and early stopping.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Optional iterable of validation batches. When given
                (and non-empty) it drives the LR scheduler, checkpointing and
                early stopping.
            epochs: Maximum number of epochs.
            patience: Number of consecutive epochs without validation-loss
                improvement after which training stops.
            threshold: Probability above which a sample counts as class 1 when
                computing accuracy.
            checkpoint_path: File that receives the best weights (lowest
                validation loss).

        Returns:
            ``{'train': {'loss': [...], 'accuracy': [...]}, 'val': {...}}`` with
            one entry per completed epoch. The ``'val'`` lists stay empty when
            no validation data is supplied.

        Raises:
            ValueError: If ``train_loader`` yields no samples.
        """
        metrics = {
            'train': {'loss': [], 'accuracy': []},
            'val': {'loss': [], 'accuracy': []},
        }
        device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
        self.to(device)

        criterion = nn.BCELoss()
        optimizer = optim.Adam(self.model.parameters(), lr=0.001, weight_decay=1e-4)
        # `verbose` is deprecated in recent PyTorch releases; LR changes are
        # reported manually below instead.
        scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)

        best_val_loss = float('inf')
        no_improvement = 0
        checkpoint_saved = False

        for epoch in range(epochs):
            print(f'Epoch {epoch+1}/{epochs}')

            train_loss, train_acc, n_train = self._run_epoch(
                train_loader, criterion, device, threshold, optimizer
            )
            if n_train == 0:
                raise ValueError('train_loader produced no samples')

            # Train metrics are recorded whether or not validation data exists.
            print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}')
            metrics['train']['loss'].append(train_loss)
            metrics['train']['accuracy'].append(train_acc)

            if val_loader is not None:
                val_loss, val_acc, n_val = self._run_epoch(
                    val_loader, criterion, device, threshold
                )
                if n_val > 0:
                    print(f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}')
                    metrics['val']['loss'].append(val_loss)
                    metrics['val']['accuracy'].append(val_acc)

                    lr_before = optimizer.param_groups[0]['lr']
                    scheduler.step(val_loss)
                    lr_after = optimizer.param_groups[0]['lr']
                    if lr_after < lr_before:
                        print(f'Reducing learning rate to {lr_after:.4e}')

                    if val_loss < best_val_loss:
                        best_val_loss = val_loss
                        no_improvement = 0
                        torch.save(self.state_dict(), checkpoint_path)
                        checkpoint_saved = True
                    else:
                        no_improvement += 1

                    if no_improvement >= patience:
                        print(f'Early stopping at epoch {epoch+1}')
                        break
            print('End')

        # Restore the best weights once, after training has finished. Doing it
        # inside the loop would reset progress every epoch and would be skipped
        # entirely on the early-stopping `break`.
        if checkpoint_saved:
            self.load_state_dict(torch.load(checkpoint_path, map_location=device))
        return metrics

    def evaluate(self, test_loader, threshold=0.5):
        """Evaluate on ``test_loader`` and collect per-sample predictions.

        Args:
            test_loader: Iterable of ``(images, labels)`` batches.
            threshold: Probability above which a sample is predicted as class 1.

        Returns:
            Dict with ``'loss'`` and ``'accuracy'`` (both averaged over
            samples), ``'predictions'`` (thresholded outputs) and ``'actual'``
            (labels as yielded by the loader).

        Raises:
            ValueError: If ``test_loader`` yields no samples.
        """
        device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
        self.to(device)
        self.eval()

        criterion = nn.BCELoss()
        loss_sum = 0.0
        n_correct = 0
        n_samples = 0

        preds = []
        actual = []

        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                targets = labels.float()

                output = self(images).view(-1)
                loss = criterion(output, targets)

                pred_labels = (output > threshold).float()

                batch_n = len(labels)
                loss_sum += loss.item() * batch_n
                n_correct += (targets == pred_labels).sum().item()
                n_samples += batch_n

                preds.extend(pred_labels.cpu().numpy())
                actual.extend(labels.cpu().numpy())

        if n_samples == 0:
            raise ValueError('test_loader produced no samples')

        mean_loss = loss_sum / n_samples
        accuracy = n_correct / n_samples
        print(f"Test Loss: {mean_loss:.4f}, Test Accuracy: {accuracy:.4f}")

        return {
            'loss': mean_loss,
            'accuracy': accuracy,
            'predictions': preds,
            'actual': actual}

In [None]:
# Loading the dataset and splitting it into train / validation / test loaders
set_path = 'set_path' #Replace with the path to the dataset
def load_and_prepare_data(data_dir, batch_size=32, seed=None, num_workers=4):
    """Build train/val/test DataLoaders from an ImageFolder-style directory.

    The images under ``data_dir`` are split 70% / 15% / remainder. The training
    subset uses ``train_transforms``; validation and test subsets reuse the
    same file indices on a second ImageFolder that applies
    ``val_test_transforms``.

    Args:
        data_dir: Root directory passed to ``ImageFolder``.
        batch_size: Batch size for all three loaders.
        seed: Optional integer seed that makes the random split reproducible.
            ``None`` keeps the default (unseeded) behaviour.
        num_workers: Number of worker processes per loader.

    Returns:
        Dict with the keys ``"train"``, ``"val"`` and ``"test"`` mapping to the
        corresponding ``DataLoader``.
    """
    # Use the directory the caller asked for (not the module-level default).
    augmented_dataset = torchvision.datasets.ImageFolder(root=data_dir, transform=train_transforms)
    # One un-augmented view is enough: validation and test share the transform.
    plain_dataset = torchvision.datasets.ImageFolder(root=data_dir, transform=val_test_transforms)

    n_total = len(augmented_dataset)
    train_size = int(0.7 * n_total)
    val_size = int(0.15 * n_total)
    test_size = n_total - train_size - val_size

    split_kwargs = {}
    if seed is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)

    train_set, val_split, test_split = torch.utils.data.random_split(
        augmented_dataset, [train_size, val_size, test_size], **split_kwargs
    )

    # Re-point the held-out indices at the dataset without augmentation.
    val_set = torch.utils.data.Subset(plain_dataset, val_split.indices)
    test_set = torch.utils.data.Subset(plain_dataset, test_split.indices)

    loader_kwargs = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        # Pinned memory only helps host-to-GPU copies.
        pin_memory=torch.cuda.is_available(),
    )

    train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, **loader_kwargs)
    val_loader = torch.utils.data.DataLoader(val_set, shuffle=False, **loader_kwargs)
    test_loader = torch.utils.data.DataLoader(test_set, shuffle=False, **loader_kwargs)

    return {
        "train": train_loader,
        "val": val_loader,
        "test": test_loader
    }

In [None]:
# Plots describing the training run
def plot_training_history(metrics, save_path):
    """Save a two-panel figure with loss and accuracy curves per epoch.

    Args:
        metrics: Dict shaped like ``{'train': {'loss', 'accuracy'},
            'val': {'loss', 'accuracy'}}`` whose values are per-epoch sequences.
        save_path: Destination of the rendered figure.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # (axes, metric key, validation colour, title, y label)
    panels = (
        (loss_ax, 'loss', 'pink',
         'Значения функции потерь в зависимости от эпохи обучения', 'Функция потерь'),
        (acc_ax, 'accuracy', 'brown',
         'Значения точности (accuracy) в зависимости от эпохи обучения', 'Точность'),
    )

    for axes, key, val_color, title, y_label in panels:
        axes.plot(metrics['train'][key], label='Обучающая выборка', color='purple')
        axes.plot(metrics['val'][key], label='Валидационная выборка', color=val_color)
        axes.set_title(title)
        axes.set_xlabel('Эпохи')
        axes.set_ylabel(y_label)
        axes.legend()

    fig.tight_layout()
    fig.savefig(save_path)
    plt.close(fig)

def plot_confusion_matrix(true_labels, predictions, save_path):
    """Render the confusion matrix of the given labels as a heat map and save it.

    Args:
        true_labels: Ground-truth class labels.
        predictions: Predicted class labels, aligned with ``true_labels``.
        save_path: Destination of the rendered figure.
    """
    matrix = confusion_matrix(true_labels, predictions)

    fig, axes = plt.subplots(figsize=(8, 6))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='RdPu', ax=axes)
    axes.set_title('Матрица ошибок')
    axes.set_ylabel('Фактический класс')
    axes.set_xlabel('Предсказанный класс')

    fig.savefig(save_path)
    plt.close(fig)

In [None]:
# Training configuration consumed by main().
BATCH_SIZE = 32  # batch size handed to load_and_prepare_data and used in the model's shape tuple
EPOCHS = 200  # upper bound on training epochs passed to train_model
PATIENCE = 15  # early-stopping patience passed to train_model

results_dir = 'results_path' #Replace with the directory for the model training results

In [None]:
# Model training
def main():
    """Train the classifier, evaluate it on the test split and save all artefacts.

    Written to ``results_dir``: ``training_history.png``,
    ``classification_report.txt``, ``confusion_matrix.png`` and
    ``final_model.pth`` (the model's state dict).
    """
    # Every artefact below is written here; create it up front so the first
    # save does not fail on a fresh machine.
    os.makedirs(results_dir, exist_ok=True)

    data_loaders = load_and_prepare_data(set_path, batch_size=BATCH_SIZE)

    shape = (BATCH_SIZE, 3, 40, 40)
    model = cnn_model(shape)

    metrics = model.train_model(
        train_loader=data_loaders["train"],
        val_loader=data_loaders["val"],
        epochs=EPOCHS,
        patience=PATIENCE
    )

    plot_training_history(metrics, os.path.join(results_dir, "training_history.png"))

    test_results = model.evaluate(data_loaders["test"])

    y_true = test_results['actual']
    y_pred = test_results['predictions']
    class_report = classification_report(y_true, y_pred)

    with open(os.path.join(results_dir, "classification_report.txt"), 'w', encoding='utf-8') as f:
        f.write(f"Test Loss: {test_results['loss']:.4f}\n")
        f.write(f"Test Accuracy: {test_results['accuracy']:.4f}\n\n")
        f.write("Classification Report:\n")
        f.write(class_report)

    plot_confusion_matrix(y_true, y_pred, os.path.join(results_dir, "confusion_matrix.png"))

    # evaluate() has already run the model over the test set; no second
    # inference pass is needed before saving the weights.
    torch.save(model.state_dict(), os.path.join(results_dir, "final_model.pth"))

    print(f"\nОбучение завершено. Результаты сохранены в директории: {results_dir}")

if __name__ == "__main__":
    main()

In [None]:
# Applies the trained model stored at the path `final_model` to an image

class PatchBasedInference:
    """Classify an image patch by patch and paint the result as a black/white mask.

    Args:
        cnn_model: Trained ``torch.nn.Module`` returning one probability per
            input patch.
        patch_size: Side length of the square patches, in pixels.
        device: Device the model and the patches are moved to.
        stride: Step between patch origins. ``None`` keeps the historical
            ``patch_size - 20`` (50% overlap for 40-pixel patches) and falls
            back to ``patch_size // 2`` when that would not be positive.
            Smaller strides mean more patches and longer run time.
        threshold: Probability above which a patch is marked positive. Raise
            it if the result is noisy.

    Raises:
        ValueError: If an explicit ``stride`` is not positive.
    """

    def __init__(self, cnn_model, patch_size=40, device='cuda' if torch.cuda.is_available() else 'cpu',
                 stride=None, threshold=0.5):
        if stride is None:
            stride = patch_size - 20
            if stride <= 0:
                stride = max(1, patch_size // 2)
        elif stride <= 0:
            raise ValueError('stride must be a positive integer')

        self.patch_size = patch_size
        self.stride = stride
        self.threshold = threshold
        self.device = device
        self.model = cnn_model.to(device)
        # Inference only: disable dropout and use BatchNorm running statistics.
        self.model.eval()

        self.transform = v2.Compose([
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Resize(size=(patch_size, patch_size), antialias=True),
            v2.Normalize(mean=[0.5], std=[0.5]),
        ])

    def load_image(self, image_path):
        """Open ``image_path`` as RGB; returns ``(PIL image, numpy array)``."""
        image = Image.open(image_path).convert('RGB')
        image_np = np.array(image)
        return image, image_np

    def extract_patches(self, image_np):
        """Cut ``image_np`` (H, W, C) into ``patch_size`` squares on a ``stride`` grid.

        Patches that run past the bottom/right border are zero-padded to the
        full patch size.

        Returns:
            ``(patches, positions)`` where each position is ``(y1, x1, y2, x2)``
            clipped to the image bounds.
        """
        height, width, channels = image_np.shape
        size = self.patch_size

        patches = []
        positions = []

        for y1 in range(0, height, self.stride):
            y2 = min(y1 + size, height)
            for x1 in range(0, width, self.stride):
                x2 = min(x1 + size, width)

                patch = image_np[y1:y2, x1:x2]

                if patch.shape[0] != size or patch.shape[1] != size:
                    # Pad with the image's own channel count and dtype so every
                    # patch has the same shape.
                    padded = np.zeros((size, size, channels), dtype=image_np.dtype)
                    padded[:patch.shape[0], :patch.shape[1], :] = patch
                    patch = padded

                patches.append(patch)
                positions.append((y1, x1, y2, x2))

        return patches, positions

    def process_patches(self, patches, batch_size=256):
        """Run the model over ``patches`` in mini-batches.

        Args:
            patches: Sequence of patch arrays as produced by ``extract_patches``.
            batch_size: Number of patches sent through the model at once.

        Returns:
            ``(predictions, confidences)``: booleans (confidence above the
            threshold) and the raw model outputs as floats, in patch order.
        """
        batch_size = max(1, int(batch_size))
        predictions = []
        confidences = []

        with torch.no_grad():
            for start in range(0, len(patches), batch_size):
                chunk = patches[start:start + batch_size]
                batch = torch.stack(
                    [self.transform(Image.fromarray(patch)) for patch in chunk]
                ).to(self.device)

                outputs = self.model(batch).view(-1).cpu().tolist()
                for confidence in outputs:
                    predictions.append(confidence > self.threshold)
                    confidences.append(confidence)

        return predictions, confidences

    def create_visualization(self, predictions, confidences, positions, original_image):
        """Paint positive patches white and negative patches black.

        Patches are painted in order, so where patches overlap the later one
        overwrites the earlier one.

        Returns:
            uint8 array with the same shape as ``original_image``.
        """
        visualization = np.zeros(original_image.shape, dtype=np.uint8)

        for pred, _conf, (y1, x1, y2, x2) in zip(predictions, confidences, positions):
            visualization[y1:y2, x1:x2, :3] = 255 if pred else 0

        return visualization

    def process_image(self, image_path, output_path=None, show_result=True):
        """Classify the image at ``image_path`` and return the resulting mask.

        Args:
            image_path: Path of the image to process.
            output_path: Where to save the mask; ``None`` skips saving.
            show_result: Whether to display the mask with matplotlib.

        Returns:
            The mask as a uint8 array shaped like the loaded RGB image.
        """
        _, image_np = self.load_image(image_path)

        patches, positions = self.extract_patches(image_np)
        predictions, confidences = self.process_patches(patches)
        result_image = self.create_visualization(predictions, confidences, positions, image_np)

        # The former default `True` is not a usable path; treat booleans as
        # "do not save" rather than crashing inside Image.save.
        if isinstance(output_path, bool):
            output_path = None

        if output_path:
            Image.fromarray(result_image).save(output_path)

        if show_result:
            plt.figure(figsize=(12, 6))
            plt.imshow(result_image)
            plt.axis('off')
            plt.show()

        return result_image


# Checkpoint to apply. By default this is the file written by main(); point
# `final_model` elsewhere to use a different set of weights.
final_model = os.path.join(results_dir, "final_model.pth")

input_shape = (1, 3, 40, 40)
model = cnn_model(input_shape)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine;
# weights_only restricts unpickling to tensors and plain containers.
model.load_state_dict(torch.load(final_model, map_location='cpu', weights_only=True))
model.model.eval()

processor = PatchBasedInference(model, patch_size=40)


image_path = 'image_path' # Placeholder: replace with the path to the input image
# NOTE(review): machine-specific absolute path — replace with a location valid on your system.
output_path = r'C:\kursach\applied\other_plots\results\rlipetsk_2.jpg'