### Set up nerf_template

In [None]:
!pip install numpy==1.24.4 --force-reinstall --quiet

import os
os.kill(os.getpid(), 9)

In [None]:
!pip install -q torch==2.0.1+cu118 torchvision==0.15.2+cu118 torchaudio==2.0.2+cu118 \
    -f https://download.pytorch.org/whl/torch_stable.html

!pip install -q triton scikit-image

In [None]:
%cd /content
!git clone https://github.com/ashawkey/nerf_template.git
%cd nerf_template
!pip install -q -r requirements.txt

* Patch the nerf_template imports/metric computations

In [None]:
import pathlib, re

f = pathlib.Path('nerf/utils.py')

txt = f.read_text()
pattern = r"try:.*?structural_similarity_index_measure.*?ssim.*?\n"
replacement = "from skimage.metrics import structural_similarity as structural_similarity_index_measure\n"
patched = re.sub(pattern, replacement, txt, flags=re.S)
f.write_text(patched)

txt = f.read_text()
txt = re.sub(
    r"# try out torch 2\.0[\s\S]*?model = torch\.compile\(model\)",
    (
        "# torch.compile disabled (Python 3.11+ unsupported)\n"
        "# try out torch 2.0\n"
        "# if torch.__version__[0] == '2':\n"
        "#     model = torch.compile(model)"
    ),
    txt
)
txt = re.sub(
    r"(class Trainer\(object\):\s+def __init__\([^)]*\):)",
    r"\1\n        # ensure log_ptr exists for __del__\n        self.log_ptr = None",
    txt,
    flags=re.MULTILINE
)
f.write_text(txt)

txt = f.read_text()
ssim_patch = r"""
    def update(self, preds, truths):
        if torch.is_tensor(preds):
            preds = preds.detach().cpu().numpy()
        if torch.is_tensor(truths):
            truths = truths.detach().cpu().numpy()

        if preds.ndim == 4:
            preds = preds[0]
        if truths.ndim == 4:
            truths = truths[0]

        preds = np.clip(preds, 0.0, 1.0)
        truths = np.clip(truths, 0.0, 1.0)

        ssim = structural_similarity_index_measure(preds, truths, channel_axis=-1, data_range=1.0)

        self.V += ssim
        self.N += 1
        return ssim
"""
txt = re.sub(
    r"def update\(self, preds, truths\):\s+preds, truths = self\.prepare_inputs\([^)]+\)[\s\S]+?self\.N \+= 1",
    ssim_patch.strip(),
    txt
)
f.write_text(txt)

In [None]:
import pathlib, re

f = pathlib.Path('/content/nerf_template/nerf/provider.py')
txt   = f.read_text()
txt = re.sub(
    r"(if self\.training:\s*)(images = self\.images\[index,\s*rays\['j'\],\s*rays\['i'\]\]\.float\(\)\.to\(self\.device\) / 255)",
    r"""\1
                rays_i = rays['i'].cpu()
                rays_j = rays['j'].cpu()
                images = self.images[index, rays_j, rays_i].float().to(self.device) / 255""",
    txt,
    flags=re.MULTILINE
)
f.write_text(txt)

### Set up the dataset

In [None]:
import os
import zipfile
import json
import shutil
import threading
from glob import glob
from tqdm import tqdm
import numpy as np
import random
from pathlib import Path

In [None]:
!rm -rf /content/dataset
!rm -rf /content/workspace
!rm -rf /content/output

In [None]:
SCENE = "redkitchen"
TARGET_SEQ = "seq-13"

ZIP_PATH     = f"/content/drive/MyDrive/thesis/datasets/7_scenes/{SCENE}.zip"
ROOT         = "/content"
EXTRACT_DIR  = f"/content/{SCENE}"
DST_DIR      = f"/content/dataset"
WORKSPACE    = f"/content/workspace"

os.makedirs(DST_DIR,                         exist_ok=True)
os.makedirs(os.path.join(DST_DIR, "images"), exist_ok=True)
os.makedirs(os.path.join(DST_DIR, "depths"), exist_ok=True)
os.makedirs(WORKSPACE,                       exist_ok=True)

In [None]:
with zipfile.ZipFile(ZIP_PATH, 'r') as z:
    z.extractall(ROOT)

def unzip(zip_path):
    with zipfile.ZipFile(zip_path, 'r') as z:
        for member in z.namelist():
            # Problematic Thumbs.db files
            if member.lower().endswith(".db"):
                continue
            z.extract(member, EXTRACT_DIR)

# subzips = sorted(glob(os.path.join(EXTRACT_DIR, "seq-*.zip")))
# threads = [threading.Thread(target=unzip, args=(z,)) for z in subzips]

# for t in threads: t.start()
# for t in threads: t.join()

target_zip = os.path.join(EXTRACT_DIR, f"{TARGET_SEQ}.zip")
unzip(target_zip)

In [None]:
all_frames = []

# Inclusive Lower Bound - Exclusive Upper Bound
SKIP_RANGES = [(0, 250), (300, 1000)]
skipped_frames = []

def should_skip(index):
    return any(start <= index < end for start, end in SKIP_RANGES)

def get_pose_matrix(p):
    return np.loadtxt(p).reshape(4,4).tolist()

def process_sequence(seq_name):
    seq_dir = os.path.join(EXTRACT_DIR, seq_name)
    color_paths = sorted(glob(os.path.join(seq_dir, "*.color.png")))

    for idx, color_path in enumerate(color_paths):
        base = os.path.basename(color_path).replace(".color.png", "")
        depth_path = color_path.replace("color", "depth")
        pose_path = color_path.replace("color", "pose").replace(".png", ".txt")

        if not (os.path.exists(depth_path) and os.path.exists(pose_path)):
            continue

        new_name = f"{seq_name}_{base}"

        if should_skip(idx):
            frame = {
                "file_path": f"./images/{new_name}.png",
                "depth_path": f"./depths/{new_name}.png",
                "transform_matrix": get_pose_matrix(pose_path)
            }
            skipped_frames.append(frame)
            continue

        shutil.copy(color_path, os.path.join(DST_DIR, "images", f"{new_name}.png"))
        shutil.copy(depth_path, os.path.join(DST_DIR, "depths", f"{new_name}.png"))

        frame = {
            "file_path": f"./images/{new_name}.png",
            "depth_path": f"./depths/{new_name}.png",
            "transform_matrix": get_pose_matrix(pose_path)
        }
        all_frames.append(frame)

# all_seqs = sorted([os.path.basename(z).replace(".zip", "") for z in subzips])
# for seq in tqdm(all_seqs, desc="Processing sequences"):
#     process_sequence(seq)

print(f"Processing sequence: {TARGET_SEQ}, skipping index ranges: {SKIP_RANGES}")
process_sequence(TARGET_SEQ)

In [None]:
random.seed(42)
random.shuffle(all_frames)

n = len(all_frames)
n_train = int(0.8 * n)
n_val   = int(0.1 * n)

train_frames = all_frames[:n_train]
val_frames   = all_frames[n_train:n_train + n_val]
# test_frames  = all_frames[n_train + n_val:]
test_frames = [all_frames[0]]

In [None]:
intrinsics     = [[585.0, 0.0, 320.0],
                  [0.0, 585.0, 240.0],
                  [0.0,   0.0,   1.0]]
W, H           = 640, 480
camera_angle_x = 2 * np.arctan2(W/2, intrinsics[0][0])

def save_json(frames, split):
    meta = {
        "camera_angle_x": camera_angle_x,
        "fl_x": intrinsics[0][0], "fl_y": intrinsics[1][1],
        "cx":  intrinsics[0][2], "cy":  intrinsics[1][2],
        "w":   W, "h": H,
        "frames": frames
    }
    with open(os.path.join(DST_DIR, f"transforms_{split}.json"), "w") as f:
        json.dump(meta, f, indent=2)

save_json(train_frames, "train")
save_json(val_frames,   "val")
save_json(test_frames,  "test")

In [None]:
len(train_frames)

### NeRF training

In [None]:
!rm -rf /content/workspace/

In [None]:
!python main.py /content/dataset \
    --data_format nerf \
    --workspace /content/workspace \
    -O \
    --bound 1.0 \
    --scale 0.33 \
    --iters 20000 \
    --save_cnt 20 \
    --eval_cnt 1   \
    --test_no_video \
    --test_no_mesh

In [None]:
!rm -rf /content/output

### Checkpoint saving, NeRF rendering and dataset saving

In [None]:
OUTPUT_DIR   = f"/content/output"
VAL_DIR = f"{WORKSPACE}/validation"
TRANSFORM_JSON = f"{DST_DIR}/transforms_test.json"

POSE_DIR = f"/content/{SCENE}/{TARGET_SEQ}"

CHECKPOINTS_DRIVE_PATH = f"/content/drive/MyDrive/thesis/Data_Preparation/Checkpoints_Metrics/{SCENE}_{TARGET_SEQ}-1"
DRIVE_DIR = "/content/drive/MyDrive/thesis/Data_Preparation/Datasets_No_Augments"

os.makedirs(CHECKPOINTS_DRIVE_PATH, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
CHECKPOINT = f"{WORKSPACE}/checkpoints/ngp_ep0500.pth"
LOGS = f"{WORKSPACE}/log_ngp.txt"

shutil.copy(CHECKPOINT, CHECKPOINTS_DRIVE_PATH)
shutil.copy(LOGS, CHECKPOINTS_DRIVE_PATH)

#### Include all frames into test frames to render them

By default, the nerf_template will render the test frames when performing the evaluation.

In [None]:
save_json(all_frames, "test")
with open(TRANSFORM_JSON, "r") as f:
    test_frames = json.load(f)["frames"]

In [None]:
!rm -rf /content/workspace/validation

* Same settings as the training, the framework will load the last checkpoint and only perform the test

In [None]:
!python main.py /content/dataset \
    --data_format nerf \
    --workspace /content/workspace \
    -O \
    --bound 1.0 \
    --scale 0.33 \
    --iters 20000 \
    --save_cnt 20 \
    --eval_cnt 1   \
    --test_no_video \
    --test_no_mesh

* Format the data

In [None]:
for i, frame in enumerate(test_frames):
    original_name = Path(frame["file_path"]).stem
    original_name = original_name.split('_')[-1]
    prefix = f"{SCENE}_{TARGET_SEQ}_{original_name}"

    rgb_path = Path(VAL_DIR) / f"ngp_ep0500_{i+1:04d}_rgb.png"
    depth_path = Path(VAL_DIR) / f"ngp_ep0500_{i+1:04d}_depth.png"

    pose_path = Path(POSE_DIR) / f"{original_name}.pose.txt"

    out_rgb = Path(OUTPUT_DIR) / f"{prefix}.rgb.png"
    out_depth = Path(OUTPUT_DIR) / f"{prefix}.depth.png"
    out_pose = Path(OUTPUT_DIR) / f"{prefix}.pose.txt"

    if rgb_path.exists():
        shutil.copy(rgb_path, out_rgb)
    else:
        print(f"[WARN] Missing RGB: {rgb_path}")

    if depth_path.exists():
        shutil.copy(depth_path, out_depth)
    else:
        print(f"[WARN] Missing depth: {depth_path}")

    if pose_path.exists():
        shutil.copy(pose_path, out_pose)
    else:
        print(f"[WARN] Missing pose: {pose_path}")

print("All files processed and copied to output directory.")

In [None]:
ZIP_NAME = f"{SCENE}_{TARGET_SEQ}-2.zip"

shutil.make_archive(base_name="/content/" + ZIP_NAME.replace(".zip", ""),
                    format="zip",
                    root_dir=OUTPUT_DIR,
                    base_dir=".")

shutil.copy(f"/content/{ZIP_NAME}", os.path.join(DRIVE_DIR, ZIP_NAME))

print(f"Zipped contents of {OUTPUT_DIR} and copied to {DRIVE_DIR}/{ZIP_NAME}")