# Basic depth map processing

In [None]:
import cv2
from pathlib import Path
import random
from matplotlib import pyplot as plt
from sklearn.model_selection import StratifiedKFold
from IPython.display import clear_output
import torch
from torchvision import transforms
from PIL import Image
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader, Subset
from torchinfo import summary
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, ConfusionMatrixDisplay, f1_score
import numpy as np
from sklearn.model_selection import train_test_split

def seed_everything(seed=4242):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and CUDA) with the
    same value and switches cuDNN to deterministic mode with autotuning
    (benchmark) turned off.

    Args:
        seed: Integer seed applied to all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # cuDNN autotuning may select different kernels between runs, so it is
    # disabled together with enabling the deterministic flag.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


seed_everything(42)

### Visualization

In [None]:
def depth_to_normals_3d(depth):
    """Convert a 2-D depth map into an RGB-encoded surface-normal image.

    Each pixel is back-projected to a 3-D point with a pinhole model whose
    focal length (both axes) equals the image width and whose principal
    point is the image centre. Normals are the cross product of the point
    cloud's horizontal and vertical gradients, scaled to unit length and
    mapped from [-1, 1] to [0, 255].

    Args:
        depth: 2-D array of depth values with shape (H, W).

    Returns:
        uint8 array of shape (H, W, 3) holding the encoded normals.
    """
    height, width = depth.shape
    depth_f = depth.astype(np.float32)

    # Intrinsics are derived from the image size only.
    focal_x = width
    focal_y = width
    center_x = width / 2
    center_y = height / 2

    col_idx, row_idx = np.meshgrid(np.arange(width), np.arange(height))

    # Back-project every pixel to camera-space coordinates.
    points = np.stack(
        (
            (col_idx - center_x) * depth_f / focal_x,
            (row_idx - center_y) * depth_f / focal_y,
            depth_f,
        ),
        axis=-1,
    )

    # Finite differences of the point cloud along image rows and columns.
    d_row = np.gradient(points, axis=0)
    d_col = np.gradient(points, axis=1)

    normals = np.cross(d_col, d_row)

    # The epsilon keeps the division finite where the cross product is zero.
    lengths = np.linalg.norm(normals, axis=2, keepdims=True)
    normals = normals / (lengths + 1e-8)

    return ((normals + 1) * 0.5 * 255).astype(np.uint8)

In [None]:
label = 'fall' # fall, adl

# random.randint is inclusive on BOTH ends, so the previous
# `randint(0, N) + 1` drew from 1..N+1 and could select a sequence folder
# that does not exist, leaving the loop below silently empty.
# NOTE(review): assumes 40 / 30 are the number of adl / fall sequences and
# that folders are numbered from 1 - confirm against ../datasets.
num_sequences = 40 if label == 'adl' else 30
sequence_number = random.randint(1, num_sequences)
selected_video = Path(f'../datasets/{label}/sequence-{sequence_number:02}')

# Sorted so the frames are shown in file-name order.
frames = sorted(selected_video.glob('*.png'))

for frame_path in frames:
    depth = cv2.imread(str(frame_path), cv2.IMREAD_GRAYSCALE)
    normals = depth_to_normals_3d(depth)

    # Replace the previous figure so the cell output plays like a video.
    clear_output(wait=True)
    plt.figure(figsize=(10, 4))

    plt.subplot(1, 2, 1)
    plt.imshow(depth, cmap='gray')
    plt.title("Depth")
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.imshow(normals)
    plt.title("Surface normals")
    plt.axis("off")

    plt.show()

# Dataset

In [None]:
# root = Path("../datasets")
# out_root = Path("../data_normals")
# out_root.mkdir(exist_ok=True)

# for cls in ["adl", "fall"]:
#     for seq in (root / cls).iterdir():
#         out_seq = out_root / cls / seq.name
#         out_seq.mkdir(parents=True, exist_ok=True)

#         for f in seq.glob("*.png"):
#             depth = cv2.imread(str(f), cv2.IMREAD_GRAYSCALE)
#             normals = depth_to_normals_3d(depth)

#             np.save(out_seq / f"{f.stem}.npy", normals)


In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class VideoDataset(torch.utils.data.Dataset):
    """Dataset of fixed-length frame sequences stored as ``.npy`` files.

    Expected layout: ``root_dir/<class>/<sequence>/*.npy`` with the classes
    ``adl`` (label 0) and ``fall`` (label 1). Every sequence folder that
    contains at least one ``.npy`` file becomes one sample.

    Args:
        root_dir: Directory holding the ``adl`` and ``fall`` sub-folders.
        transform: Optional callable applied to each frame as a PIL image;
            it must return a tensor. When omitted, frames are converted to
            float tensors in [0, 1] with shape (3, H, W).
        number_of_frames: Length every returned sequence is cut or padded to.

    Attributes:
        samples: List of ``(sorted_frame_paths, label)`` tuples.
    """

    def __init__(self, root_dir, transform=None, number_of_frames=64):
        self.samples = []
        self.transform = transform
        self.number_of_frames = number_of_frames

        classes = ['adl', 'fall']
        for label_idx, cls in enumerate(classes):
            cls_path = Path(root_dir) / cls
            # iterdir() yields entries in arbitrary, filesystem-dependent
            # order; sorting keeps sample indices (and therefore the CV
            # splits built from them) reproducible across machines.
            for seq_folder in sorted(cls_path.iterdir()):
                if not seq_folder.is_dir():
                    continue  # stray files next to the sequence folders
                frames = sorted(seq_folder.glob("*.npy"))
                if frames:
                    self.samples.append((frames, label_idx))

    @staticmethod
    def _to_uint8(arr):
        """Return ``arr`` as a uint8 image with values in 0-255.

        uint8 input is returned unchanged: it is already in image range, and
        multiplying it by 255 before casting wraps around and scrambles the
        frame. Floating-point input whose maximum is <= 1 is treated as
        normalised and scaled by 255; anything else is clipped to 0-255.
        """
        if arr.dtype == np.uint8:
            return arr
        is_float = np.issubdtype(arr.dtype, np.floating)
        arr = arr.astype(np.float32)
        if is_float and arr.size and arr.max() <= 1.0:
            arr = arr * 255
        return np.clip(arr, 0, 255).astype(np.uint8)

    def __getitem__(self, idx):
        """Return ``(video, label)`` where ``video`` stacks T frame tensors.

        Sequences longer than ``number_of_frames`` keep their LAST frames;
        shorter ones are padded at the front with copies of the first frame.
        """
        frame_paths, label = self.samples[idx]

        n_missing = self.number_of_frames - len(frame_paths)
        if n_missing <= 0:
            frame_paths = frame_paths[-self.number_of_frames:]
        else:
            frame_paths = [frame_paths[0]] * n_missing + frame_paths

        imgs = []
        for f in frame_paths:
            # assumes each file holds an (H, W, 3) array - TODO confirm
            arr = self._to_uint8(np.load(f))

            if self.transform is not None:
                frame = self.transform(Image.fromarray(arr))
            else:
                # No transform supplied: build a (3, H, W) float tensor in
                # [0, 1] so torch.stack below still gets tensors.
                frame = torch.from_numpy(arr).permute(2, 0, 1).float() / 255

            imgs.append(frame)

        video = torch.stack(imgs)  # (T, 3, H, W)
        return video, label

    def __len__(self):
        """Return the number of sequences found under ``root_dir``."""
        return len(self.samples)




In [None]:
# Per-frame preprocessing: resize to 224x224, then convert to a tensor.
_frame_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
]
transform = transforms.Compose(_frame_steps)

# Folder holding the per-class sequence directories read by VideoDataset.
root_dir = Path('../data_normals')

dataset = VideoDataset(root_dir, transform=transform)

### Model

In [None]:
class CNN_LSTM(nn.Module):
    """Frozen ResNet feature extractor followed by an LSTM classifier.

    Every frame of a clip is encoded by the CNN (classification head
    removed), the per-frame features are fed through a single-layer LSTM,
    and the LSTM output at the last time step is mapped to class logits.

    Args:
        cnn_model: Name of a ResNet-family constructor in
            ``torchvision.models`` (e.g. ``'resnet18'``, ``'resnet50'``).
        hidden_size: LSTM hidden state size.
        num_classes: Number of output logits.
        pretrained: Forwarded to the torchvision constructor.

    Raises:
        ValueError: If ``cnn_model`` is not a ResNet-family constructor.
    """

    def __init__(self, cnn_model='resnet18', hidden_size=256, num_classes=2, pretrained=True):
        super().__init__()

        # Honour `cnn_model` (it used to be ignored and resnet18 was always
        # built). Only ResNet-style networks are accepted because stripping
        # the last child below relies on their "... -> avgpool -> fc" layout.
        builder = getattr(models, cnn_model, None)
        if not callable(builder):
            raise ValueError(f"Unknown torchvision model: {cnn_model!r}")
        # NOTE: `pretrained=` is the legacy torchvision argument; newer
        # releases prefer `weights=` and emit a deprecation warning here.
        backbone = builder(pretrained=pretrained)
        if not isinstance(backbone, models.ResNet):
            raise ValueError(
                f"{cnn_model!r} is not a ResNet-family model; only those are supported"
            )

        # Width of the pooled feature vector (512 for resnet18).
        self.feature_dim = backbone.fc.in_features
        self.cnn = nn.Sequential(*list(backbone.children())[:-1])  # output: (B, feature_dim, 1, 1)

        # The backbone is used as a fixed feature extractor.
        for param in self.cnn.parameters():
            param.requires_grad = False
        self.cnn.eval()

        self.lstm = nn.LSTM(input_size=self.feature_dim, hidden_size=hidden_size,
                            num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def train(self, mode=True):
        """Set training mode while keeping a frozen backbone in eval mode.

        ``requires_grad = False`` does not stop BatchNorm layers from
        updating their running statistics in training mode, so without this
        override the "frozen" backbone would still drift during training.
        If the backbone parameters are unfrozen later, it follows ``mode``
        like every other sub-module.
        """
        super().train(mode)
        if not any(p.requires_grad for p in self.cnn.parameters()):
            self.cnn.eval()
        return self

    def forward(self, x):
        """
        x: (B, T, C, H, W)
        B = batch_size
        T = num_frames
        C = channels
        H = height
        W = width

        Returns logits of shape (B, num_classes).
        """
        B, T, C, H, W = x.size()

        # Encode the clip one time step at a time: (B, C, H, W) -> (B, feature_dim)
        cnn_features = [self.cnn(x[:, t]).flatten(1) for t in range(T)]
        cnn_features = torch.stack(cnn_features, dim=1)  # (B, T, feature_dim)

        lstm_out, _ = self.lstm(cnn_features)
        last_time_step = lstm_out[:, -1, :]  # (B, hidden_size)

        return self.fc(last_time_step)  # (B, num_classes)

In [None]:
# Build the network and move it to the selected device.
model = CNN_LSTM()
model = model.to(device)

# Cross-entropy over the class logits, optimised with NAdam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.NAdam(params=model.parameters(), lr=1e-4)

# summary(model, input_size=(2, 256, 3, 224, 224), device=device)

### Training

In [None]:
# Training hyper-parameters.
num_epochs = 10
batch_size = 4
num_folds = 10

# Dedicated generator so DataLoader shuffling is reproducible.
g = torch.Generator()
g.manual_seed(42)
seed_everything(42)

indices = np.arange(len(dataset))
# Read the labels from the dataset's sample index rather than `dataset[i][1]`:
# indexing the dataset loads and transforms every frame of every sequence
# just to obtain an integer that is already stored in `dataset.samples`.
labels = np.array([label for _, label in dataset.samples])

# Stratified folds keep the class ratio the same in every split.
skf = StratifiedKFold(
    n_splits=num_folds,
    shuffle=True,
    random_state=42
)

# One best-checkpoint file per fold is written here.
checkpoint_dir = Path('../models/surfaces_normals')
checkpoint_dir.mkdir(exist_ok=True, parents=True)


In [None]:
import copy

fold_results = []

# Loader over the whole dataset. It is NOT used for the fold metrics below
# (that would score each model mostly on its own training data); it stays
# defined in case other cells rely on it.
full_loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=1,
        pin_memory=True,
        generator=g
    )

# Snapshot of the model and optimizer as they are when this cell starts.
# Every fold is restored to this state so that no fold begins with weights
# already trained on samples that belong to its own validation split.
# NOTE: run this cell right after (re)creating `model` and `optimizer`;
# otherwise the snapshot captures an already-trained model.
initial_model_state = copy.deepcopy(model.state_dict())
initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

for fold, (train_idx, val_idx) in enumerate(skf.split(indices, labels)):
    print(f"\n========== FOLD {fold+1}/{num_folds} ==========")

    # Start each fold from the same untrained state (see snapshot above).
    # The optimizer state is deep-copied again so the snapshot itself is
    # never modified by in-place optimizer updates.
    model.load_state_dict(initial_model_state)
    optimizer.load_state_dict(copy.deepcopy(initial_optimizer_state))

    train_dataset = Subset(dataset, train_idx)
    val_dataset = Subset(dataset, val_idx)

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=1,
        pin_memory=True,
        generator=g
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True
    )

    best_val_loss = float('inf')
    best_model_path = checkpoint_dir / f'best_model_fold_{fold}.pt'

    for epoch in range(num_epochs):
        # ===================== TRAIN =====================
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        for videos, y in train_loader:
            videos = videos.to(device)
            y = y.to(device)

            optimizer.zero_grad()
            outputs = model(videos)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            # criterion returns the batch mean; weight it by the batch size
            # so the epoch value is a per-sample average.
            train_loss += loss.item() * videos.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == y).sum().item()
            total += y.size(0)

        train_loss /= len(train_dataset)
        train_acc = 100 * correct / total

        # ===================== VAL =====================
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for videos, y in val_loader:
                videos = videos.to(device)
                y = y.to(device)

                outputs = model(videos)
                loss = criterion(outputs, y)

                val_loss += loss.item() * videos.size(0)
                _, preds = torch.max(outputs, 1)
                correct += (preds == y).sum().item()
                total += y.size(0)

        val_loss /= len(val_dataset)
        val_acc = 100 * correct / total

        print(
            f"Epoch [{epoch+1}/{num_epochs}] "
            f"TRAIN Loss: {train_loss:.4f} Acc: {train_acc:.2f}% | "
            f"VAL Loss: {val_loss:.4f} Acc: {val_acc:.2f}%"
        )

        # Keep the checkpoint with the lowest validation loss of this fold.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_path)

    # ===================== EVAL BEST MODEL =====================
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    model.eval()

    all_preds = []
    all_labels = []

    # Score the best checkpoint on the held-out fold only, so the reported
    # numbers are genuine cross-validation estimates.
    with torch.no_grad():
        for videos, y in val_loader:
            videos = videos.to(device)
            y = y.to(device)

            outputs = model(videos)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    fold_metrics = {
        'acc': accuracy_score(all_labels, all_preds),
        'prec': precision_score(all_labels, all_preds, average='binary'),
        'rec': recall_score(all_labels, all_preds, average='binary'),
        'f1': f1_score(all_labels, all_preds, average='binary'),
    }

    fold_results.append(fold_metrics)

    print("Fold results:", fold_metrics)


In [None]:
# Summarise each metric across folds as mean and standard deviation (in %).
print("\n========== CV RESULTS ==========")

for metric_name in fold_results[0]:
    scores = np.array([result[metric_name] for result in fold_results])
    mean_pct = scores.mean() * 100
    std_pct = scores.std() * 100
    print(f"{metric_name.upper()}: {mean_pct:.2f}% std: {std_pct:.2f}%")
