Подключение Google Диска

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive (a remount is forced on every run);
# the dataset and checkpoint paths used further down live below this mount point.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
import os
import os.path
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import random
from torchvision import transforms
import math
import torch.nn as nn
import torch.nn.functional as F
import time

Загрузчик данных (DataLoader)

In [None]:
def get_pathes(base_folder, num_folders=11):
    """Build the (image folder, movements file) path pair for each sequence.

    Sequence folders are named with two digits (``00``, ``01``, ...) directly
    under ``base_folder``. Each one contributes ``<base>/<NN>/image_0`` and
    ``<base>/<NN>/<NN>.txt``. The paths are only composed here; nothing is
    checked against the file system.

    Args:
        base_folder: Root directory that contains the numbered folders.
        num_folders: Number of consecutive folders, starting at ``00``, to
            list. Defaults to 11, the value that used to be hard-coded.

    Returns:
        A list of ``(image_folder, movements_file)`` tuples ordered by
        sequence number.
    """
    paths = []
    for index in range(num_folders):
        folder_name = f'{index:02d}'
        sequence_root = os.path.join(base_folder, folder_name)
        paths.append((
            os.path.join(sequence_root, 'image_0'),
            os.path.join(sequence_root, f'{folder_name}.txt'),
        ))
    return paths

def get_data(batch_size, base_folder='/content/drive/MyDrive', step=1, num_workers=4):
    """Create the train / validation / test loaders of image pairs.

    The folders returned by ``get_pathes(base_folder)`` are split as follows:
    the last one is the test set, the one before it the validation set, and
    all remaining ones are concatenated into the training set.

    Every sample is ``((image1, image2), movement)`` where the two images are
    ``step`` frames apart and ``movement`` is a one-element float32 tensor:
    the Euclidean norm of the difference of values 3, 7 and 11 of the two
    corresponding lines of the movements file.
    NOTE(review): values 3/7/11 are presumably the translation column of a
    row-major 3x4 pose matrix - confirm against the data format.

    Args:
        batch_size: Batch size used by all three loaders.
        base_folder: Root directory that holds the numbered sequence folders.
        step: Distance, in frames, between the two images of a pair.
            Defaults to 1 (consecutive frames), the previous fixed value.
        num_workers: Worker processes per ``DataLoader``. Defaults to 4, the
            previous fixed value.

    Returns:
        ``(train_loader, validation_loader, test_loader)``. Only the training
        loader shuffles.

    Raises:
        ValueError: If ``step`` is smaller than 1.
    """
    if step < 1:
        raise ValueError(f'step must be a positive integer, got {step!r}')

    paths = get_pathes(base_folder)

    # Folder split: second to last -> validation, last -> test, rest -> train.
    validation_folder = paths[-2]
    test_folder = paths[-1]
    train_folders = paths[:-2]

    def create_movements(movements_file):
        """Read a movements file into one list of floats per non-empty line."""
        movements = []
        with open(movements_file, 'r') as f:
            for line in f:
                values = line.split()
                if not values:
                    # A blank (e.g. trailing) line would otherwise become an
                    # empty row and fail later when it is indexed.
                    continue
                movements.append([float(value) for value in values])
        return movements

    def transform_mov(movements):
        """Return ``[[distance]]`` for every pair of rows ``step`` apart."""
        res_movements = []
        # zip stops at the shorter sequence, i.e. after len(movements) - step
        # pairs (or immediately when the file is shorter than step).
        for start, end in zip(movements, movements[step:]):
            # Plain Python floats keep double precision for the subtraction;
            # the value is cast to float32 only when a sample is built.
            dx = end[3] - start[3]
            dy = end[7] - start[7]
            dz = end[11] - start[11]
            res_movements.append([math.sqrt(dx * dx + dy * dy + dz * dz)])
        return res_movements

    class ImageDataset(Dataset):
        """Pairs of grayscale frames from one folder plus their target."""

        def __init__(self, image_folder, movements, transform=None):
            # Sorted .png paths, so list position follows file-name order.
            self.image_files = sorted(
                os.path.join(image_folder, name)
                for name in os.listdir(image_folder)
                if name.endswith('.png')
            )
            self.movements = movements
            self.transform = transform

        def __len__(self):
            # __getitem__ reads image idx + step, so the last valid idx is
            # len(images) - step - 1; never report a negative length.
            usable_images = len(self.image_files) - step
            return max(0, min(len(self.movements), usable_images))

        def __getitem__(self, idx):
            image1 = Image.open(self.image_files[idx]).convert('L')
            image2 = Image.open(self.image_files[idx + step]).convert('L')

            if self.transform is not None:
                image1 = self.transform(image1)
                image2 = self.transform(image2)

            movement = torch.as_tensor(self.movements[idx], dtype=torch.float32)
            return (image1, image2), movement

    # Same preprocessing for every split: single channel, fixed-size centre
    # crop (so all tensors in a batch share one shape), then tensor conversion.
    transform = transforms.Compose([
        transforms.Grayscale(),
        transforms.CenterCrop((370, 1226)),
        transforms.ToTensor()
    ])

    def build_dataset(folder):
        """Create the dataset for one (image_folder, movements_file) pair."""
        image_folder, movements_file = folder
        movements = transform_mov(create_movements(movements_file))
        return ImageDataset(image_folder, movements, transform=transform)

    train_dataset = torch.utils.data.ConcatDataset(
        [build_dataset(folder) for folder in train_folders]
    )
    val_dataset = build_dataset(validation_folder)
    test_dataset = build_dataset(test_folder)

    # Options shared by the three loaders; only the training data is shuffled.
    loader_options = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=True)
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
    validation_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
    test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

    return train_loader, validation_loader, test_loader


EVALUATION

In [None]:
def set_all_seeds(seed):
    """Seed every random number source this notebook relies on.

    Writes the seed to the ``PL_GLOBAL_SEED`` environment variable and seeds
    Python's ``random`` module, NumPy, and PyTorch (CPU and all CUDA devices).

    Args:
        seed: Seed value handed to each generator.
    """
    os.environ["PL_GLOBAL_SEED"] = str(seed)
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def compute_accuracy(model, data_loader, device, tolerance=0.1):
    """Evaluate ``model`` on ``data_loader``.

    The model is switched to eval mode and run without gradient tracking.
    A sample counts as correct when every component of its prediction is
    within ``tolerance`` (absolute error) of the target.

    Args:
        model: Callable as ``model(feature1, feature2)``; must provide
            ``eval()``.
        data_loader: Iterable yielding ``((feature1, feature2), targets)``.
        device: Device the batches are moved to before the forward pass.
        tolerance: Largest absolute error still counted as correct.
            Defaults to 0.1, the previous fixed threshold.

    Returns:
        ``(avg_loss, accuracy)``: the mean of the per-batch MSE losses and the
        fraction of correct samples. Both are NaN when the loader yields no
        samples (previously this raised ``ZeroDivisionError``).
    """
    model.eval()
    criterion = torch.nn.MSELoss()

    total_loss = 0.0
    num_batches = 0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for (feature1, feature2), targets in data_loader:
            feature1 = feature1.to(device)
            feature2 = feature2.to(device)
            targets = targets.to(device)

            predicted_y = model(feature1, feature2)

            total_loss += criterion(predicted_y, targets).item()
            num_batches += 1

            # Absolute (not relative) deviation; a sample passes only if all
            # of its output components are within the tolerance.
            absolute_error = torch.abs(predicted_y - targets)
            within_tolerance = torch.all(absolute_error <= tolerance, dim=1)
            correct_predictions += within_tolerance.sum().item()
            total_samples += targets.size(0)

    if num_batches == 0 or total_samples == 0:
        # Nothing was evaluated, so there is no meaningful average to report.
        return float('nan'), float('nan')

    avg_loss = total_loss / num_batches
    accuracy = correct_predictions / total_samples
    return avg_loss, accuracy

Model

In [None]:
class Network(nn.Module):
    """Two-image regressor: shared conv encoder, two LSTM cells, linear head.

    Both single-channel images go through the same convolutional stack. The
    flattened features of the first image feed ``lstm1`` (zero initial
    state); the features of the second image feed ``lstm2``, which starts
    from the state produced by ``lstm1``. The final hidden state is mapped to
    one output value per sample.
    """

    # One row per encoder stage: (in_ch, out_ch, kernel, stride, padding).
    # Trailing comments give the output height x width for a 370 x 1226 input.
    _CONV_STAGES = (
        (1, 32, 7, 2, 3),      # 185 x 613
        (32, 32, 3, 1, 1),     # 185 x 613
        (32, 64, 5, 2, 2),     # 93 x 307
        (64, 64, 3, 1, 1),     # 93 x 307
        (64, 128, 5, 2, 2),    # 47 x 154
        (128, 128, 3, 1, 1),   # 47 x 154
        (128, 256, 3, 2, 1),   # 24 x 77
        (256, 256, 3, 1, 1),   # 24 x 77
        (256, 512, 3, 2, 1),   # 12 x 39
        (512, 512, 3, 1, 1),   # 12 x 39
        (512, 512, 3, 2, 1),   # 6 x 20
        (512, 512, 3, 2, 1),   # 3 x 10
    )

    def __init__(self):
        super().__init__()

        # Conv -> BatchNorm -> ReLU per stage, appended in that order so the
        # module indices inside ``features`` (and therefore the state-dict
        # keys) run 0..35 exactly as in a hand-written listing.
        layers = []
        for in_ch, out_ch, kernel, stride, padding in self._CONV_STAGES:
            layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=kernel,
                                    stride=stride, padding=padding))
            layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.ReLU(inplace=True))
        self.features = nn.Sequential(*layers)

        # The encoder ends with 512 channels on a 3 x 10 map.
        flat_features = 512 * 3 * 10
        self.lstm1 = nn.LSTMCell(input_size=flat_features, hidden_size=128)
        self.lstm2 = nn.LSTMCell(input_size=flat_features, hidden_size=128)

        self.network = nn.Sequential(
            nn.Linear(128, 1)
        )

    def forward(self, image1, image2):
        """Return a ``[batch_size, 1]`` prediction for the image pair."""
        encoded1 = torch.flatten(self.features(image1), 1)
        encoded2 = torch.flatten(self.features(image2), 1)

        # Zero hidden and cell state for the first step, on the input's device.
        state_shape = (encoded1.size(0), self.lstm1.hidden_size)
        hidden0 = torch.zeros(*state_shape, device=encoded1.device)
        cell0 = torch.zeros(*state_shape, device=encoded1.device)

        # First image -> lstm1; its resulting state seeds lstm2, which
        # processes the second image.
        state_after_first = self.lstm1(encoded1, (hidden0, cell0))
        hidden_after_second, _ = self.lstm2(encoded2, state_after_first)

        return self.network(hidden_after_second)


    # def forward(self,image1,image2):
    #     images = torch.cat((image1,image2),dim = 1)

    #     out_features = self.features(images)
    #     out_features = torch.flatten(out_features, 1)
    #     batch_size = out_features.size(0)

    #     # Инициализируем скрытые состояния и состояния ячеек для lstm1
    #     h_t1 = torch.zeros(batch_size, self.lstm1.hidden_size, device=out_features.device)
    #     c_t1 = torch.zeros(batch_size, self.lstm1.hidden_size, device=out_features.device)

    #     # Пропускаем признаки первой картинки через lstm1 (один временной шаг)
    #     h_t1_next, c_t1_next = self.lstm1(out_features, (h_t1, c_t1))

    #     # Пропускаем признаки второй картинки через lstm2 (один временной шаг)
    #     h_t2_next, c_t2_next = self.lstm2(h_t1_next, (h_t1_next, c_t1_next))

    #     out = self.network(h_t2_next) # [batch_size, 1]
    #     return out

Train

In [None]:
def train(model, num_epochs, last_epoch, train_loader,
          validation_loader, test_loader, optimizer, device, scheduler,
          checkpoint_dict, checkpoint_dir=None):
    """Train ``model`` with MSE loss and write a checkpoint after every epoch.

    Per epoch: one pass over ``train_loader`` (progress printed every 100
    batches), evaluation on the training and validation loaders with
    ``compute_accuracy``, one scheduler step, then ``checkpoint_dict`` is
    refreshed and saved as ``<checkpoint_dir>/<epoch>_epoch.pth``. After the
    last epoch the model is evaluated once on ``test_loader``.

    Args:
        model: Network called as ``model(feature1, feature2)``.
        num_epochs: Total number of epochs (exclusive upper epoch index).
        last_epoch: Number of epochs already done; training resumes there.
        train_loader: Loader yielding ``((feature1, feature2), targets)``.
        validation_loader: Loader evaluated after each epoch.
        test_loader: Loader evaluated once after training.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to.
        scheduler: Learning-rate scheduler stepped once per epoch, or
            ``None`` to keep the learning rate untouched.
        checkpoint_dict: Dict updated in place and saved every epoch.
        checkpoint_dir: Directory for the checkpoint files. When omitted the
            notebook-level ``directory_path`` variable is used, as before.

    Returns:
        ``(minibatch_loss_list, train_loss_list, valid_loss_list)``: the loss
        of every training batch, and the per-epoch average training and
        validation losses.
    """
    if checkpoint_dir is None:
        # Backward-compatible default: the global defined in the notebook.
        checkpoint_dir = directory_path
    os.makedirs(checkpoint_dir, exist_ok=True)

    start_time = time.time()
    minibatch_loss_list, train_loss_list, valid_loss_list = [], [], []
    criterion = torch.nn.MSELoss()

    for epoch in range(last_epoch, num_epochs):
        model.train()
        for batch_idx, ((feature1, feature2), targets) in enumerate(train_loader):
            feature1 = feature1.to(device)
            feature2 = feature2.to(device)
            targets = targets.to(device)

            predicted_y = model(feature1, feature2)
            loss = criterion(predicted_y, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_value = loss.item()
            minibatch_loss_list.append(loss_value)
            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch+1:03d}/{num_epochs:03d} '
                      f'| Batch {batch_idx:04d}/{len(train_loader):04d} '
                      f'| Loss: {loss_value:.4f}'
                      f'| Learning rate: {optimizer.param_groups[0]["lr"]}')

        # compute_accuracy switches the model to eval mode and disables
        # gradient tracking itself; model.train() is restored next epoch.
        avg_loss_train, accuracy_train = compute_accuracy(model, train_loader, device)
        avg_loss_val, accuracy_val = compute_accuracy(model, validation_loader, device)
        print(f'Epoch: {epoch+1:03d}/{num_epochs:03d} '
              f'| Train AVG LOSS: {avg_loss_train: .4f} | Accuracy_train: {accuracy_train: .4f} \n'
              f'| Validation AVG LOSS: {avg_loss_val: .4f} | Accuracy_val: {accuracy_val: .4f} ')
        train_loss_list.append(avg_loss_train)
        valid_loss_list.append(avg_loss_val)

        if scheduler is not None:
            scheduler.step()

        checkpoint_dict['state_model'] = model.state_dict()
        checkpoint_dict['state_opt'] = optimizer.state_dict()
        checkpoint_dict['state_scheduler'] = (
            scheduler.state_dict() if scheduler is not None else None
        )
        checkpoint_dict['train_loss'] = avg_loss_train
        checkpoint_dict['val_loss'] = avg_loss_val
        checkpoint_dict['train_acc'] = accuracy_train
        checkpoint_dict['val_acc'] = accuracy_val
        checkpoint_dict['EPOCHS'] = num_epochs
        checkpoint_dict['current_epoch'] = epoch + 1
        # Read after scheduler.step(), i.e. the rate the next epoch will use.
        checkpoint_dict['learning'] = optimizer.param_groups[0]["lr"]

        # Keep the lowest validation loss seen so far in this run.
        best_loss = checkpoint_dict.get('best_loss')
        if best_loss is None or avg_loss_val < best_loss:
            checkpoint_dict['best_loss'] = avg_loss_val

        path = os.path.join(checkpoint_dir, f'{epoch + 1}_epoch.pth')
        torch.save(checkpoint_dict, path)

        elapsed = (time.time() - start_time)/60
        print(f'Time elapsed: {elapsed:.2f} min')

    elapsed = (time.time() - start_time)/60
    print(f'Total Training Time: {elapsed:.2f} min')

    avg_loss_test, accuracy_test = compute_accuracy(model, test_loader, device)
    print(f'Test AVG LOSS: {avg_loss_test: .4f} | Accuracy_test: {accuracy_test: .4f}')

    return minibatch_loss_list, train_loss_list, valid_loss_list


Описание модели, оптимизатора и планировщика (scheduler)

In [None]:
# Human-readable description of the experiment; it is stored in every
# checkpoint under the 'model_description' key. The text must mirror the
# Network class, optimizer and scheduler that are actually trained: one input
# channel, both LSTM cells fed with 512 * 3 * 10 features, and the two images
# encoded separately. Update it whenever any of those change.
txt_model_opt_shed = '''
class Network(nn.Module):
    def __init__(self):
        super().__init__()

        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size = (7,7), stride = (2,2), padding=(3, 3)),# 185, 613
            nn.BatchNorm2d(32),
            nn.ReLU(inplace = True),

            nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),# 185, 613
            nn.BatchNorm2d(32),
            nn.ReLU(inplace = True),

            nn.Conv2d(32, 64, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2)),#93, 307
            nn.BatchNorm2d(64),
            nn.ReLU(inplace = True),

            nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),# 93, 307
            nn.BatchNorm2d(64),
            nn.ReLU(inplace = True),

            nn.Conv2d(64, 128, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2)),#47, 154
            nn.BatchNorm2d(128),
            nn.ReLU(inplace = True),

            nn.Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),#47, 154
            nn.BatchNorm2d(128),
            nn.ReLU(inplace = True),

            nn.Conv2d(128,256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)),# 24, 77
            nn.BatchNorm2d(256),
            nn.ReLU(inplace = True),

            nn.Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),# 24, 77
            nn.BatchNorm2d(256),
            nn.ReLU(inplace = True),

            nn.Conv2d(256,512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)),# 12, 39
            nn.BatchNorm2d(512),
            nn.ReLU(inplace = True),

            nn.Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),# 12, 39
            nn.BatchNorm2d(512),
            nn.ReLU(inplace = True),

            nn.Conv2d(512, 512, kernel_size=3, stride=2, padding=1),  #  6 × 20
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),

            nn.Conv2d(512, 512, kernel_size=3, stride=2, padding=1),  #  3 × 10
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),

            # nn.AdaptiveAvgPool2d((5, 5))  # Глобальный пуллинг к фиксированному размеру
        )
        self.lstm1 = nn.LSTMCell(input_size = 512 * 3 * 10, hidden_size=128)
        self.lstm2 = nn.LSTMCell(input_size = 512 * 3 * 10, hidden_size=128)

        self.network = nn.Sequential(
            nn.Linear(128, 1)
        )

    def forward(self,image1,image2):
        out_features1 = self.features(image1)
        out_features2 = self.features(image2)
        out_features1 = torch.flatten(out_features1, 1)
        out_features2 = torch.flatten(out_features2, 1)
        batch_size = out_features1.size(0)

        # Инициализируем скрытые состояния и состояния ячеек для lstm1
        h_t1 = torch.zeros(batch_size, self.lstm1.hidden_size, device=out_features1.device)
        c_t1 = torch.zeros(batch_size, self.lstm1.hidden_size, device=out_features1.device)

        # Пропускаем признаки первой картинки через lstm1 (один временной шаг)
        h_t1_next, c_t1_next = self.lstm1(out_features1, (h_t1, c_t1))
        h_t2_next, c_t2_next = self.lstm2(out_features2, (h_t1_next, c_t1_next))
        out = self.network(h_t2_next) # [batch_size, 1]
        return out
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)
scheduler = StepLR(optimizer, step_size = 1, gamma = 1)

'''

Создание директории для хранения чекпойнтов

In [None]:
# Folder on the mounted drive that receives the per-epoch checkpoint files;
# train() reads this module-level name when it builds the save path.
directory_path = '/content/drive/My Drive/checpoint/model_lstm2/'

# Report whether the folder was already there, creating it when it is not.
if os.path.exists(directory_path):
    print(f"Директория {directory_path} уже существует.")
else:
    os.makedirs(directory_path)
    print(f"Директория {directory_path} была создана.")

Директория /content/drive/My Drive/checpoint/model_lstm2/ была создана.


Создание словаря для сохранения результатов

In [None]:
# Checkpoint template: the textual experiment description is known up front,
# every other entry starts as None and is filled in by train() each epoch.
checkpoint_dict = {'model_description': txt_model_opt_shed}
checkpoint_dict.update(dict.fromkeys(
    (
        # serialized states
        'state_model', 'state_opt', 'state_scheduler',
        # metrics
        'train_loss', 'val_loss', 'best_loss', 'train_acc', 'val_acc',
        # progress and learning rate
        'EPOCHS', 'current_epoch', 'learning',
    ),
    None,
))

MAIN

In [None]:
# Run configuration.
RANDOM_SEED = 123
BATCH_SIZE = 8
NUM_EPOCHS = 20
# Epochs already completed; stays 0 unless a checkpoint is restored later.
last_epoch = 0

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Використовується пристрій: {DEVICE}")

Використовується пристрій: cuda


In [None]:
from torch.optim.lr_scheduler import StepLR

# Seed before the model is built so the initial weights are reproducible.
set_all_seeds(RANDOM_SEED)

model = Network().to(DEVICE)
# Plain Adam; an earlier variant also used weight_decay = 0.0006.
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)
train_loader, validation_loader, test_loader = get_data(batch_size = BATCH_SIZE)
# No scheduler is configured here (an earlier variant was
# StepLR(optimizer, step_size = 4, gamma = 0.3)); the cells that need one
# substitute a constant-rate StepLR when they find None.
scheduler = None


def count_parameters(model):
    """Return the number of scalar parameters of ``model`` that require gradients."""
    trainable = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            trainable += parameter.numel()
    return trainable


print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 25,464,289 trainable parameters




In [None]:
# Restore model, optimizer and scheduler from a saved checkpoint dictionary
# and continue counting epochs from where that run stopped.
path = './2_epoch.pth'
# map_location lets a checkpoint written on a GPU be restored on whichever
# device this session runs on (e.g. a CPU-only runtime).
checkpoint = torch.load(path, map_location=DEVICE)
if scheduler is None:
    # gamma = 1 keeps the learning rate constant; the object only exists so
    # that a scheduler state can be loaded and stepped.
    scheduler = StepLR(optimizer, step_size = 1, gamma = 1)
model.load_state_dict(checkpoint['state_model'])
optimizer.load_state_dict(checkpoint['state_opt'])
scheduler_state = checkpoint.get('state_scheduler')
if scheduler_state is not None:
    scheduler.load_state_dict(scheduler_state)
last_epoch = checkpoint['current_epoch']

In [None]:
# train() is always handed a scheduler; when none was configured use a StepLR
# with gamma = 1, which leaves the learning rate unchanged.
if scheduler is None:
    scheduler = StepLR(optimizer, step_size = 1, gamma = 1)
# NOTE: despite their names, train_acc and valid_acc receive the per-epoch
# average *loss* lists returned by train(); the names are kept unchanged in
# case later cells refer to them.
minibatch_loss, train_acc, valid_acc = train(model = model,
            num_epochs = NUM_EPOCHS,
            last_epoch = last_epoch,
            train_loader = train_loader,
            validation_loader = validation_loader,
            test_loader = test_loader,
            optimizer = optimizer,
            device = DEVICE,
            scheduler = scheduler,
            checkpoint_dict = checkpoint_dict)

Epoch: 003/020 | Batch 0000/2550 | Loss: 0.0077| Learning rate: 0.0001
Epoch: 003/020 | Batch 0100/2550 | Loss: 0.0100| Learning rate: 0.0001
Epoch: 003/020 | Batch 0200/2550 | Loss: 0.0145| Learning rate: 0.0001
Epoch: 003/020 | Batch 0300/2550 | Loss: 0.0198| Learning rate: 0.0001
Epoch: 003/020 | Batch 0400/2550 | Loss: 0.0153| Learning rate: 0.0001
Epoch: 003/020 | Batch 0500/2550 | Loss: 0.0145| Learning rate: 0.0001
Epoch: 003/020 | Batch 0600/2550 | Loss: 0.0096| Learning rate: 0.0001
Epoch: 003/020 | Batch 0700/2550 | Loss: 0.0109| Learning rate: 0.0001
Epoch: 003/020 | Batch 0800/2550 | Loss: 0.0120| Learning rate: 0.0001
Epoch: 003/020 | Batch 0900/2550 | Loss: 0.0091| Learning rate: 0.0001
Epoch: 003/020 | Batch 1000/2550 | Loss: 0.0079| Learning rate: 0.0001
Epoch: 003/020 | Batch 1100/2550 | Loss: 0.0078| Learning rate: 0.0001
Epoch: 003/020 | Batch 1200/2550 | Loss: 0.0083| Learning rate: 0.0001
Epoch: 003/020 | Batch 1300/2550 | Loss: 0.0130| Learning rate: 0.0001
Epoch: