# MonoDepth: Оценка глубины по одному изображению

В этом ноутбуке мы реализуем и обучим модель для оценки глубины (depth estimation) по одному RGB изображению.

**Задача:** По входному RGB изображению предсказать карту глубины (depth map), где каждый пиксель содержит расстояние до соответствующей точки сцены.

**Архитектура:** Encoder-Decoder на базе MobileNetV2 (компактная и быстрая модель)

**Датасет:** NYU Depth V2 (подмножество)


## Импорты


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
import numpy as np
from PIL import Image
import os
import h5py
from tqdm import tqdm
import matplotlib.pyplot as plt

torch.manual_seed(42)
np.random.seed(42)
pl.seed_everything(42)


## Загрузка датасета NYU Depth V2

NYU Depth V2 — популярный датасет для задачи depth estimation. Содержит RGB изображения и соответствующие карты глубины, полученные с помощью Kinect сенсора.

Используем labeled subset (~1449 изображений) для обучения.


In [None]:
DATA_DIR = "../data"
NYU_PATH = os.path.join(DATA_DIR, "nyu_depth_v2_labeled.mat")

os.makedirs(DATA_DIR, exist_ok=True)

if not os.path.exists(NYU_PATH):
    print("Скачиваем NYU Depth V2 датасет...")
    url = "http://horatio.cs.nyu.edu/mit/silberman/nyu_depth_v2/nyu_depth_v2_labeled.mat"
    print(f"URL: {url}")
    print("Это может занять несколько минут (~2.8 GB)...")
    !wget -q --show-progress -O {NYU_PATH} {url}
    print("Готово!")
else:
    print(f"Датасет уже существует: {NYU_PATH}")


In [None]:
# Загружаем данные из .mat файла
with h5py.File(NYU_PATH, 'r') as f:
    print("Ключи в файле:", list(f.keys()))
    
    # images: (3, 640, 480, N) -> транспонируем в (N, 480, 640, 3)
    images = np.array(f['images']).transpose(0, 3, 2, 1)
    # depths: (640, 480, N) -> транспонируем в (N, 480, 640)
    depths = np.array(f['depths']).transpose(0, 2, 1)

print(f"Images shape: {images.shape}")
print(f"Depths shape: {depths.shape}")
print(f"Depth range: [{depths.min():.2f}, {depths.max():.2f}] meters")


In [None]:
# Визуализируем несколько примеров
fig, axes = plt.subplots(3, 2, figsize=(12, 12))

for i in range(3):
    idx = i * 100
    
    axes[i, 0].imshow(images[idx])
    axes[i, 0].set_title(f'RGB Image {idx}')
    axes[i, 0].axis('off')
    
    im = axes[i, 1].imshow(depths[idx], cmap='plasma')
    axes[i, 1].set_title(f'Depth Map {idx}')
    axes[i, 1].axis('off')
    plt.colorbar(im, ax=axes[i, 1], label='Depth (m)')

plt.tight_layout()
plt.show()


## Dataset класс

Создаём PyTorch Dataset для загрузки пар (RGB, Depth).


In [None]:
class NYUDepthDataset(Dataset):
    def __init__(self, images, depths, indices=None, img_size=(256, 256), augment=False):
        self.images = images
        self.depths = depths
        self.indices = indices if indices is not None else list(range(len(images)))
        self.img_size = img_size
        self.augment = augment
        
        self.transform = transforms.Compose([
            transforms.Resize(img_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        
    def __len__(self):
        return len(self.indices)
    
    def __getitem__(self, idx):
        real_idx = self.indices[idx]
        
        # Загружаем RGB
        image = Image.fromarray(self.images[real_idx].astype(np.uint8))
        
        # Загружаем depth
        depth = self.depths[real_idx]
        depth = Image.fromarray(depth.astype(np.float32), mode='F')
        depth = depth.resize(self.img_size, Image.BILINEAR)
        depth = np.array(depth)
        
        # Аугментации (горизонтальный flip)
        if self.augment and np.random.random() > 0.5:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)
            depth = np.fliplr(depth).copy()
        
        # Применяем трансформации к RGB
        image = self.transform(image)
        
        # Нормализуем depth (клиппим экстремальные значения)
        depth = np.clip(depth, 0.1, 10.0)  # 0.1m - 10m range
        depth = torch.tensor(depth, dtype=torch.float32).unsqueeze(0)
        
        return image, depth


In [None]:
# Разбиваем на train/val/test
NUM_SAMPLES = len(images)
indices = np.random.permutation(NUM_SAMPLES)

train_size = int(0.7 * NUM_SAMPLES)
val_size = int(0.15 * NUM_SAMPLES)

train_indices = indices[:train_size].tolist()
val_indices = indices[train_size:train_size + val_size].tolist()
test_indices = indices[train_size + val_size:].tolist()

print(f"Train: {len(train_indices)} samples")
print(f"Val: {len(val_indices)} samples")
print(f"Test: {len(test_indices)} samples")


In [None]:
IMG_SIZE = (256, 256)
BATCH_SIZE = 16

train_dataset = NYUDepthDataset(images, depths, train_indices, IMG_SIZE, augment=True)
val_dataset = NYUDepthDataset(images, depths, val_indices, IMG_SIZE, augment=False)
test_dataset = NYUDepthDataset(images, depths, test_indices, IMG_SIZE, augment=False)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)


In [None]:
# Проверяем размеры
sample_img, sample_depth = train_dataset[0]
print(f"Image shape: {sample_img.shape}")
print(f"Depth shape: {sample_depth.shape}")
print(f"Depth range: [{sample_depth.min():.2f}, {sample_depth.max():.2f}]")


## Архитектура модели: MobileNetV2 Encoder-Decoder

Используем MobileNetV2 как encoder (backbone) — компактная и эффективная модель.

Decoder состоит из upsampling блоков с skip connections для восстановления пространственного разрешения.


In [None]:
class UpBlock(nn.Module):
    """Upsampling блок с skip connection"""
    def __init__(self, in_channels, skip_channels, out_channels):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        self.conv = nn.Sequential(
            nn.Conv2d(out_channels + skip_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x, skip=None):
        x = self.up(x)
        if skip is not None:
            # Align sizes if needed
            if x.shape[2:] != skip.shape[2:]:
                x = F.interpolate(x, size=skip.shape[2:], mode='bilinear', align_corners=True)
            x = torch.cat([x, skip], dim=1)
        return self.conv(x)


class MonoDepthNet(nn.Module):
    """Encoder-Decoder модель для depth estimation на базе MobileNetV2"""
    def __init__(self, pretrained=True):
        super().__init__()
        
        # Encoder: MobileNetV2
        mobilenet = models.mobilenet_v2(weights='IMAGENET1K_V1' if pretrained else None)
        features = mobilenet.features
        
        # Разбиваем на блоки для skip connections
        # MobileNetV2 feature maps: 
        # После block 1: 16 channels, 1/2 resolution
        # После block 3: 24 channels, 1/4 resolution  
        # После block 6: 32 channels, 1/8 resolution
        # После block 13: 96 channels, 1/16 resolution
        # После block 17: 320 channels, 1/32 resolution (но мы используем 1280 после conv)
        
        self.enc1 = features[:2]   # 16 ch, 1/2
        self.enc2 = features[2:4]  # 24 ch, 1/4
        self.enc3 = features[4:7]  # 32 ch, 1/8
        self.enc4 = features[7:14] # 96 ch, 1/16
        self.enc5 = features[14:]  # 1280 ch, 1/32
        
        # Decoder
        self.up1 = UpBlock(1280, 96, 256)   # 1/32 -> 1/16
        self.up2 = UpBlock(256, 32, 128)    # 1/16 -> 1/8
        self.up3 = UpBlock(128, 24, 64)     # 1/8 -> 1/4
        self.up4 = UpBlock(64, 16, 32)      # 1/4 -> 1/2
        self.up5 = nn.Sequential(           # 1/2 -> 1/1
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True)
        )
        
        # Output head (предсказываем depth)
        self.head = nn.Sequential(
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 1, kernel_size=1),
            nn.ReLU()  # Depth всегда положительный
        )
        
    def forward(self, x):
        # Encoder
        e1 = self.enc1(x)    # 1/2
        e2 = self.enc2(e1)   # 1/4
        e3 = self.enc3(e2)   # 1/8
        e4 = self.enc4(e3)   # 1/16
        e5 = self.enc5(e4)   # 1/32
        
        # Decoder with skip connections
        d1 = self.up1(e5, e4)  # 1/16
        d2 = self.up2(d1, e3)  # 1/8
        d3 = self.up3(d2, e2)  # 1/4
        d4 = self.up4(d3, e1)  # 1/2
        d5 = self.up5(d4)      # 1/1
        
        # Output
        out = self.head(d5)
        return out


In [None]:
# Проверяем модель
model_test = MonoDepthNet(pretrained=True)
dummy_input = torch.randn(1, 3, 256, 256)
output = model_test(dummy_input)
print(f"Input shape: {dummy_input.shape}")
print(f"Output shape: {output.shape}")
print(f"Parameters: {sum(p.numel() for p in model_test.parameters()):,}")
del model_test, dummy_input, output


## Метрики для Depth Estimation

Стандартные метрики для оценки качества depth estimation:

**Error metrics (чем меньше, тем лучше):**
- **Abs Rel**: $\frac{1}{N}\sum|d - d^*| / d^*$ — относительная абсолютная ошибка
- **Sq Rel**: $\frac{1}{N}\sum(d - d^*)^2 / d^*$ — относительная квадратичная ошибка  
- **RMSE**: $\sqrt{\frac{1}{N}\sum(d - d^*)^2}$ — среднеквадратичная ошибка
- **RMSE log**: $\sqrt{\frac{1}{N}\sum(\log d - \log d^*)^2}$ — RMSE в логарифмическом пространстве

**Accuracy metrics (чем больше, тем лучше):**
- **$\delta < 1.25$**: % пикселей где $\max(d/d^*, d^*/d) < 1.25$
- **$\delta < 1.25^2$**: % пикселей где $\max(d/d^*, d^*/d) < 1.25^2$
- **$\delta < 1.25^3$**: % пикселей где $\max(d/d^*, d^*/d) < 1.25^3$


In [None]:
def compute_depth_metrics(pred, target, min_depth=0.1, max_depth=10.0):
    """
    Вычисляет стандартные метрики для depth estimation.
    
    Args:
        pred: предсказанная карта глубины (B, 1, H, W)
        target: ground truth карта глубины (B, 1, H, W)
        min_depth: минимальная глубина для фильтрации
        max_depth: максимальная глубина для фильтрации
    
    Returns:
        dict с метриками
    """
    # Создаём маску валидных пикселей
    mask = (target > min_depth) & (target < max_depth)
    
    pred = pred[mask]
    target = target[mask]
    
    # Клиппим предсказания
    pred = torch.clamp(pred, min_depth, max_depth)
    
    # Error metrics
    abs_rel = torch.mean(torch.abs(pred - target) / target)
    sq_rel = torch.mean(((pred - target) ** 2) / target)
    rmse = torch.sqrt(torch.mean((pred - target) ** 2))
    rmse_log = torch.sqrt(torch.mean((torch.log(pred) - torch.log(target)) ** 2))
    
    # Accuracy metrics (delta thresholds)
    thresh = torch.max(pred / target, target / pred)
    delta1 = (thresh < 1.25).float().mean() * 100
    delta2 = (thresh < 1.25 ** 2).float().mean() * 100
    delta3 = (thresh < 1.25 ** 3).float().mean() * 100
    
    return {
        'abs_rel': abs_rel,
        'sq_rel': sq_rel,
        'rmse': rmse,
        'rmse_log': rmse_log,
        'delta1': delta1,
        'delta2': delta2,
        'delta3': delta3
    }


## Loss функции для Depth Estimation

Комбинация нескольких loss функций:
1. **L1 Loss** — для базовой регрессии
2. **SSIM Loss** — для сохранения структуры изображения
3. **Gradient Loss** — для сохранения резких границ


In [None]:
class SSIM(nn.Module):
    """Structural Similarity Index"""
    def __init__(self, window_size=11, size_average=True):
        super().__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = self._create_window(window_size, self.channel)
        
    def _create_window(self, window_size, channel):
        def gaussian(window_size, sigma):
            gauss = torch.tensor([np.exp(-(x - window_size//2)**2/(2*sigma**2)) 
                                  for x in range(window_size)], dtype=torch.float32)
            return gauss / gauss.sum()
        
        _1D_window = gaussian(window_size, 1.5).unsqueeze(1)
        _2D_window = _1D_window.mm(_1D_window.t()).unsqueeze(0).unsqueeze(0)
        window = _2D_window.expand(channel, 1, window_size, window_size).contiguous()
        return window
    
    def forward(self, img1, img2):
        channel = img1.size(1)
        
        if channel != self.channel or self.window.device != img1.device:
            self.window = self._create_window(self.window_size, channel).to(img1.device)
            self.channel = channel
        
        mu1 = F.conv2d(img1, self.window, padding=self.window_size//2, groups=channel)
        mu2 = F.conv2d(img2, self.window, padding=self.window_size//2, groups=channel)
        
        mu1_sq = mu1.pow(2)
        mu2_sq = mu2.pow(2)
        mu1_mu2 = mu1 * mu2
        
        sigma1_sq = F.conv2d(img1*img1, self.window, padding=self.window_size//2, groups=channel) - mu1_sq
        sigma2_sq = F.conv2d(img2*img2, self.window, padding=self.window_size//2, groups=channel) - mu2_sq
        sigma12 = F.conv2d(img1*img2, self.window, padding=self.window_size//2, groups=channel) - mu1_mu2
        
        C1 = 0.01 ** 2
        C2 = 0.03 ** 2
        
        ssim_map = ((2*mu1_mu2 + C1)*(2*sigma12 + C2)) / ((mu1_sq + mu2_sq + C1)*(sigma1_sq + sigma2_sq + C2))
        
        if self.size_average:
            return ssim_map.mean()
        return ssim_map.mean(1).mean(1).mean(1)


class GradientLoss(nn.Module):
    """Loss на градиентах для сохранения резких границ"""
    def __init__(self):
        super().__init__()
        
    def forward(self, pred, target):
        # Градиенты по x
        pred_dx = pred[:, :, :, :-1] - pred[:, :, :, 1:]
        target_dx = target[:, :, :, :-1] - target[:, :, :, 1:]
        
        # Градиенты по y
        pred_dy = pred[:, :, :-1, :] - pred[:, :, 1:, :]
        target_dy = target[:, :, :-1, :] - target[:, :, 1:, :]
        
        loss_x = F.l1_loss(pred_dx, target_dx)
        loss_y = F.l1_loss(pred_dy, target_dy)
        
        return loss_x + loss_y


class DepthLoss(nn.Module):
    """Комбинированный loss для depth estimation"""
    def __init__(self, ssim_weight=0.5, gradient_weight=0.5):
        super().__init__()
        self.ssim = SSIM()
        self.gradient = GradientLoss()
        self.ssim_weight = ssim_weight
        self.gradient_weight = gradient_weight
        
    def forward(self, pred, target):
        # L1 loss
        l1_loss = F.l1_loss(pred, target)
        
        # Нормализуем для SSIM (в диапазон [0, 1])
        pred_norm = pred / 10.0  # Предполагаем max depth = 10m
        target_norm = target / 10.0
        
        # SSIM loss (1 - SSIM, так как SSIM=1 это идеальное совпадение)
        ssim_loss = 1 - self.ssim(pred_norm, target_norm)
        
        # Gradient loss
        grad_loss = self.gradient(pred, target)
        
        total_loss = l1_loss + self.ssim_weight * ssim_loss + self.gradient_weight * grad_loss
        
        return total_loss, {'l1': l1_loss, 'ssim': ssim_loss, 'gradient': grad_loss}


## PyTorch Lightning модуль


In [None]:
class DepthModule(pl.LightningModule):
    def __init__(self, learning_rate=1e-4, min_depth=0.1, max_depth=10.0):
        super().__init__()
        self.save_hyperparameters()
        
        self.model = MonoDepthNet(pretrained=True)
        self.criterion = DepthLoss(ssim_weight=0.5, gradient_weight=0.5)
        self.learning_rate = learning_rate
        self.min_depth = min_depth
        self.max_depth = max_depth
    
    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_idx):
        images, depths = batch
        pred_depths = self.forward(images)
        
        loss, loss_dict = self.criterion(pred_depths, depths)
        
        # Метрики
        with torch.no_grad():
            metrics = compute_depth_metrics(pred_depths, depths, self.min_depth, self.max_depth)
        
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_abs_rel', metrics['abs_rel'], on_step=False, on_epoch=True)
        self.log('train_rmse', metrics['rmse'], on_step=False, on_epoch=True)
        self.log('train_delta1', metrics['delta1'], on_step=False, on_epoch=True)
        
        return loss
    
    def validation_step(self, batch, batch_idx):
        images, depths = batch
        pred_depths = self.forward(images)
        
        loss, loss_dict = self.criterion(pred_depths, depths)
        metrics = compute_depth_metrics(pred_depths, depths, self.min_depth, self.max_depth)
        
        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_abs_rel', metrics['abs_rel'], on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_rmse', metrics['rmse'], on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_delta1', metrics['delta1'], on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_delta2', metrics['delta2'], on_step=False, on_epoch=True)
        self.log('val_delta3', metrics['delta3'], on_step=False, on_epoch=True)
        
        return loss
    
    def test_step(self, batch, batch_idx):
        images, depths = batch
        pred_depths = self.forward(images)
        
        loss, _ = self.criterion(pred_depths, depths)
        metrics = compute_depth_metrics(pred_depths, depths, self.min_depth, self.max_depth)
        
        self.log('test_loss', loss, on_step=False, on_epoch=True)
        self.log('test_abs_rel', metrics['abs_rel'], on_step=False, on_epoch=True)
        self.log('test_sq_rel', metrics['sq_rel'], on_step=False, on_epoch=True)
        self.log('test_rmse', metrics['rmse'], on_step=False, on_epoch=True)
        self.log('test_rmse_log', metrics['rmse_log'], on_step=False, on_epoch=True)
        self.log('test_delta1', metrics['delta1'], on_step=False, on_epoch=True)
        self.log('test_delta2', metrics['delta2'], on_step=False, on_epoch=True)
        self.log('test_delta3', metrics['delta3'], on_step=False, on_epoch=True)
        
        return loss
    
    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate, weight_decay=1e-4)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=30, eta_min=1e-6)
        return [optimizer], [scheduler]


## Обучение модели


In [None]:
model = DepthModule(
    learning_rate=1e-4,
    min_depth=0.1,
    max_depth=10.0
)

checkpoint_callback = ModelCheckpoint(
    monitor='val_abs_rel',
    mode='min',
    save_top_k=1,
    filename='monodepth-{epoch:02d}-{val_abs_rel:.4f}'
)

early_stopping = EarlyStopping(
    monitor='val_abs_rel',
    mode='min',
    patience=10
)

trainer = pl.Trainer(
    max_epochs=50,
    accelerator='auto',
    devices=1,
    callbacks=[checkpoint_callback, early_stopping],
    enable_progress_bar=True
)


In [None]:
trainer.fit(model, train_loader, val_loader)


## Оценка на тестовой выборке


In [None]:
test_results = trainer.test(model, test_loader)

print("\n" + "="*60)
print("Результаты на тестовой выборке:")
print("="*60)
print(f"\nError metrics (чем меньше, тем лучше):")
print(f"  Abs Rel:  {test_results[0]['test_abs_rel']:.4f}")
print(f"  Sq Rel:   {test_results[0]['test_sq_rel']:.4f}")
print(f"  RMSE:     {test_results[0]['test_rmse']:.4f} m")
print(f"  RMSE log: {test_results[0]['test_rmse_log']:.4f}")
print(f"\nAccuracy metrics (чем больше, тем лучше):")
print(f"  δ < 1.25:   {test_results[0]['test_delta1']:.1f}%")
print(f"  δ < 1.25²:  {test_results[0]['test_delta2']:.1f}%")
print(f"  δ < 1.25³:  {test_results[0]['test_delta3']:.1f}%")
print("="*60)


## Визуализация предсказаний
