# -----------------------------------------------------------------------------
# License Notice
# Copyright (c) 2025 Duc Huy Vu, Hieu Minh Tran
#
# This code is for personal and academic use only.
# Only the authors may modify, distribute, or reuse it.
# Viewing for grading purposes is allowed.
# -----------------------------------------------------------------------------

In [None]:
# in model.py (or wherever)
import torch.nn as nn
import torch

In [None]:
# Colab Cell 1 — Mount your Google Drive
# The checkpoint, frame directory and PDF output paths used later in this
# notebook all live under /content/drive, so this cell has to run first.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# New Model Setup


In [None]:
import torch
import torch.nn as nn
import torch.nn.utils as nn_utils
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR, OneCycleLR
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
from typing import Tuple

class DepthwiseSeparableConv(nn.Module):
    """Depthwise-separable 2-D convolution followed by GroupNorm and ReLU.

    The block applies, in order: a per-channel (depthwise) convolution, a
    1x1 (pointwise) convolution that maps ``in_channels`` to
    ``out_channels``, group normalisation, and an in-place ReLU.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the pointwise conv.
        kernel_size: Kernel size of the depthwise convolution.
        padding: Padding of the depthwise convolution.
        num_groups: Upper bound on the number of GroupNorm groups. The
            value actually used is the largest integer not exceeding
            ``min(num_groups, out_channels)`` that divides ``out_channels``.
    """

    def __init__(self, in_channels: int, out_channels: int,
                 kernel_size: int = 3, padding: int = 1,
                 num_groups: int = 8):
        super().__init__()
        # depthwise: groups == in_channels gives one filter per input channel
        self.depthwise = nn.Conv2d(
            in_channels, in_channels,
            kernel_size=kernel_size,
            padding=padding,
            groups=in_channels,
            bias=False
        )
        # pointwise: 1x1 conv mixes channels and sets the output width
        self.pointwise = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=1,
            bias=False
        )
        # use GroupNorm for small-batch stability.
        # GroupNorm rejects a group count that does not divide the channel
        # count, so step down from the requested cap until it does. For
        # channel counts that are already multiples of the cap this leaves
        # the original choice untouched.
        groups = min(num_groups, out_channels)
        while groups > 1 and out_channels % groups != 0:
            groups -= 1
        self.gn = nn.GroupNorm(num_groups=groups,
                               num_channels=out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply depthwise conv, pointwise conv, GroupNorm and ReLU to ``x``."""
        x = self.depthwise(x)
        x = self.pointwise(x)
        x = self.gn(x)
        return self.relu(x)

class EfficientCNNEncoder(nn.Module):
    """Two stacked depthwise-separable conv blocks.

    The first block maps ``in_channels`` to 16 channels and the second maps
    16 channels to ``out_channels``. Both use the default kernel size and
    padding of ``DepthwiseSeparableConv``.
    """

    def __init__(self, in_channels: int = 1, out_channels: int = 8):
        super().__init__()
        # Built in the same order as they are applied; they are registered
        # as entries 0 and 1 of the Sequential.
        first_stage = DepthwiseSeparableConv(in_channels, 16)
        second_stage = DepthwiseSeparableConv(16, out_channels)
        self.encoder = nn.Sequential(first_stage, second_stage)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through both conv blocks and return the feature map."""
        features = self.encoder(x)
        return features

In [None]:
class Attention(nn.Module):
    """Softmax-weighted pooling over dimension 1 of the input.

    A linear layer scores every position along dim 1, the scores are
    normalised with a softmax over that dimension, and the input is summed
    along it using those weights.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim, 1)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return the weighted sum of ``x`` over dim 1.

        For a (B, T, hidden_dim) input the result is (B, hidden_dim).
        """
        alphas = self.softmax(self.attn(x))   # (B, T, 1), sums to 1 over T
        pooled = (alphas * x).sum(dim=1)      # (B, hidden_dim)
        return pooled

In [None]:
class EfficientUAVNavigationModel(nn.Module):
    """Frame-sequence classifier: CNN features -> GRU -> attention -> linear.

    Every frame of the input sequence is encoded independently by the CNN
    stack, pooled to a 2x2 grid, projected to 128 features and fed to a GRU.
    The GRU outputs are combined with softmax attention weights over the
    time dimension and classified by a linear layer.

    Args:
        num_classes: Size of the output logit vector.
        hidden_dim: Hidden size of the GRU (and input size of the classifier).
        encoder_out: Number of channels produced by the first encoder stage.
        image_size: ``(H, W)`` used only for the construction-time check that
            the CNN output is at least 2x2; it is not stored.
        debug: When true, ``forward`` prints the logits shape on every call.
    """

    def __init__(
        self,
        num_classes: int            = 9,
        hidden_dim:   int            = 128,
        encoder_out:  int            = 8,
        image_size:   Tuple[int,int] = (256, 256),
        debug:        bool           = False
    ):
        super().__init__()
        self.debug = debug

        # -- spatial encoder --
        self.encoder = EfficientCNNEncoder(in_channels=1,
                                           out_channels=encoder_out)
        self.cnn = nn.Sequential(
            DepthwiseSeparableConv(encoder_out, 64),
            nn.MaxPool2d(2),
            DepthwiseSeparableConv(64,         128),
            nn.MaxPool2d(2),
        )

        # -- check downsampling is ≥2×2 --
        # run a dummy tensor through to verify
        with torch.no_grad():
            dummy = torch.zeros(1, encoder_out, *image_size)
            feat  = self.cnn(dummy)
            Hf, Wf = feat.shape[-2], feat.shape[-1]
            assert Hf >= 2 and Wf >= 2, \
                f"Feature map {Hf}×{Wf} too small for 2×2 SPP"

        # -- spatial pyramid pooling into 2×2 bins --
        self.spp = nn.AdaptiveAvgPool2d((2, 2))  # outputs (B*T, 128, 2, 2)

        # -- projection down to GRU input dim --
        self.proj    = nn.Linear(128 * 4, 128)
        self.dropout = nn.Dropout(p=0.3)

        # -- temporal model --
        self.gru = nn.GRU(
            input_size=128,
            hidden_size=hidden_dim,
            batch_first=True
        )

        # -- attention & final classifier --
        # Kept as plain attributes on this module so the parameter names
        # (attn.*, fc.*) stay exactly as they are in existing state dicts.
        self.attn    = nn.Linear(hidden_dim, 1)
        self.softmax = nn.Softmax(dim=1)
        self.fc      = nn.Linear(hidden_dim, num_classes)

        # initialize weights
        self._init_weights()


    def _init_weights(self):
        """Kaiming-normal init for convs, Xavier-uniform for linears.

        Biases are zeroed where present; the GRU keeps PyTorch's defaults.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                # Same guard as the Conv2d branch: a bias-free Linear has
                # bias=None, which zeros_ cannot handle.
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.GRU):
                # leave GRU defaults or add custom init here if desired
                pass

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (B, T, 1, H, W)
        returns logits: (B, num_classes)

        Raises ValueError if x is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected input of shape (B, T, C, H, W), got {tuple(x.shape)}"
            )
        B, T, C, H, W = x.shape
        # merge batch & time dims; reshape (not view) so that non-contiguous
        # inputs, e.g. a transposed or sliced sequence tensor, are accepted
        x = x.reshape(B * T, C, H, W)        # → (B*T, 1, H, W)

        # spatial features
        x = self.encoder(x)                  # → (B*T, encoder_out, H, W)
        x = self.cnn(x)                      # → (B*T, 128, H', W')

        # 2×2 spatial pyramid pooling
        x = self.spp(x)                      # → (B*T, 128, 2, 2)
        x = x.reshape(B, T, 128 * 4)         # → (B, T, 512)

        # project & dropout
        x = self.proj(x)                     # → (B, T, 128)
        x = self.dropout(x)

        # temporal modeling
        x, _ = self.gru(x)                   # → (B, T, hidden_dim)

        # attention pooling
        weights = self.attn(x)               # → (B, T, 1)
        weights = self.softmax(weights)      # → (B, T, 1)
        context = torch.sum(weights * x, dim=1)  # → (B, hidden_dim)

        # classification
        logits = self.fc(context)            # → (B, num_classes)

        if self.debug:
            print(f"[forward] logits.shape = {logits.shape}")

        return logits

In [None]:
import os, glob, numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class DepthDataset(Dataset):
    """Dataset over the ``*.npy`` files found directly inside ``data_dir``.

    Files are visited in sorted path order. Every item is a
    ``(sample, 0)`` pair: the label is always the constant 0.

    Args:
        data_dir: Directory that is globbed for ``*.npy`` files.
        transform: Optional callable applied to the float32 array. When it
            is absent (falsy) the array is converted to a tensor and given
            a leading dimension instead.
    """

    def __init__(self, data_dir, transform=None):
        self.transform = transform
        pattern = os.path.join(data_dir, "*.npy")
        self.files = sorted(glob.glob(pattern))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        # Arrays are cast to float32 regardless of their on-disk dtype.
        array = np.load(self.files[idx]).astype(np.float32)
        if not self.transform:
            return torch.from_numpy(array).unsqueeze(0), 0
        return self.transform(array), 0

# Preprocessing pipeline: a single tensor-conversion step.
test_transform = transforms.Compose(
    [
        transforms.ToTensor(),  # adds channel dim
        # transforms.Normalize(mean=[…], std=[…])  # if you used it
    ]
)


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Running

In [None]:
import torch

# 1) Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2) Create & move model
model = EfficientUAVNavigationModel(
    num_classes=9,
    hidden_dim=128,
    encoder_out=8,
    image_size=(256,256),
    debug=False
).to(device)

# 3) Load weights
# The file is consumed directly as a state dict, so restrict unpickling to
# tensors and plain containers (weights_only=True) instead of allowing
# arbitrary pickled objects from the checkpoint file to be executed.
checkpoint = torch.load(
    "/content/drive/MyDrive/Colab Notebooks/FastyUpgrades.pt",
    map_location=device,
    weights_only=True,
)
model.load_state_dict(checkpoint)

# 4) Eval mode (disables dropout for inference)
model.eval()


# Multiple Frames FIFO

In [None]:
import glob
import cv2
import numpy as np
import torch
import time
import matplotlib.pyplot as plt
from collections import deque
from matplotlib.backends.backend_pdf import PdfPages
from IPython.display import display, Image  # IPython.display is used to show the rendered images inline

# --- 0) Configuration ---
# Input frames, output PDF, window length and device for this cell.
frame_dir = "/content/drive/MyDrive/Colab Notebooks/Data/Data_png"
pdf_output_path = "/content/drive/MyDrive/Colab Notebooks/output.pdf"  # Output PDF path
seq_len = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Class index -> command name; the position in the tuple is the class index.
label_map = dict(enumerate((
    'forward', 'backward', 'up', 'down',
    'left', 'right', 'stop', 'rotate_left', 'rotate_right',
)))

# --- 1) Inference helper ---
def infer_and_time(model, input_tensor, device):
    """Run one forward pass and report the predicted class and its latency.

    Args:
        model: Module called as ``model(input_tensor)``; it is switched to
            eval mode first. Its output is reduced with ``argmax(dim=1)``.
        input_tensor: Model input; moved to ``device`` before timing starts.
        device: ``torch.device`` to run on. On CUDA the stream is
            synchronised before and after the forward pass so that the
            measured interval covers the complete computation.

    Returns:
        ``(pred, elapsed)``: the arg-max class index as a Python int and the
        forward-pass time in seconds. ``.item()`` requires the arg-max to
        hold exactly one element, i.e. a batch of size 1.
    """
    model.eval()
    input_tensor = input_tensor.to(device)
    if device.type == 'cuda':
        torch.cuda.synchronize()
    # perf_counter is monotonic and high-resolution, unlike the wall clock.
    start = time.perf_counter()
    with torch.no_grad():
        logits = model(input_tensor)
    if device.type == 'cuda':
        torch.cuda.synchronize()
    # Stop the clock before post-processing so argmax/.item() are not timed.
    elapsed = time.perf_counter() - start
    pred    = logits.argmax(dim=1).item()
    return pred, elapsed

# --- 2) Load & preprocess frames ---
from io import BytesIO  # imported once here instead of on every loop pass

frame_paths = sorted(glob.glob(f"{frame_dir}/*.png"))
frames = []

for p in frame_paths:
    img = cv2.imread(p, cv2.IMREAD_COLOR)
    if img is None:
        # cv2.imread reports an unreadable file by returning None rather
        # than raising; fail here with the offending path.
        raise IOError(f"Could not read image file: {p}")
    gray    = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(gray, (256, 256), interpolation=cv2.INTER_AREA)
    normed  = resized.astype(np.float32) / 255.0       # scale to [0, 1]
    frames.append(torch.from_numpy(normed).unsqueeze(0))  # shape [1,256,256]

# One full window plus at least one new frame is needed to run the loop.
if len(frames) < seq_len + 1:
    raise ValueError(f"Need at least {seq_len+1} frames, got {len(frames)}")

# --- 3) Pre-fill a FIFO buffer of the first `seq_len` frames ---
# maxlen makes the deque drop its oldest entry on every later append.
buffer = deque(frames[:seq_len], maxlen=seq_len)

# --- 4) Create PDF and process frames ---
with PdfPages(pdf_output_path) as pdf:
    for idx in range(seq_len, len(frames)):
        new_frame = frames[idx]
        buffer.append(new_frame)  # drop oldest, add newest

        # Build input sequence tensor of shape [1, T=seq_len, C=1, H=256, W=256]
        seq_tensor = torch.stack(list(buffer), dim=0).unsqueeze(0)

        # Run inference & measure latency
        pred, latency = infer_and_time(model, seq_tensor, device)

        # Display result in console.
        # NOTE(review): the buffer range is printed with 0-based indices
        # while the frame is labelled idx+1 — confirm the intended numbering.
        start_frame = idx - seq_len + 1
        end_frame   = idx
        print(f"Buffer frames {start_frame}–{end_frame} → "
              f"Pred for frame {idx+1}: {label_map[pred]} "
              f"({latency*1000:.1f} ms)")

        # Create the figure for both displaying and saving to PDF
        img = new_frame.squeeze().cpu().numpy()
        fig = plt.figure(figsize=(4,4))
        plt.imshow(img, cmap='gray')
        plt.title(f"Frame {idx+1} → Pred: {label_map[pred]}", fontsize=14)
        plt.axis('off')

        # Render to an in-memory PNG for displaying in Colab
        buf = BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)

        # Display the figure in Colab
        display(Image(buf.getvalue()))

        # Save this figure to the PDF, then close it to free memory
        pdf.savefig(fig)
        plt.close(fig)

print(f"PDF saved to {pdf_output_path}")

# save_data

In [None]:
import glob
import cv2
import numpy as np
import torch
import time
import matplotlib.pyplot as plt
from collections import deque
from matplotlib.backends.backend_pdf import PdfPages
from IPython.display import display, Image

# --- 0) Configuration ---
# Input frames, output PDF, window length and device for this cell.
frame_dir = "/content/drive/MyDrive/Colab Notebooks/Data/Data_png"
pdf_output_path = "/content/drive/MyDrive/Colab Notebooks/angelo_output.pdf"  # Output PDF path
seq_len = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Class index -> command name; the position in the tuple is the class index.
label_map = dict(enumerate((
    'forward', 'backward', 'up', 'down',
    'left', 'right', 'stop', 'rotate_left', 'rotate_right',
)))

# --- 1) Inference helper ---
def infer_and_time(model, input_tensor, device):
    """Run one forward pass and report the predicted class and its latency.

    Args:
        model: Module called as ``model(input_tensor)``; it is switched to
            eval mode first. Its output is reduced with ``argmax(dim=1)``.
        input_tensor: Model input; moved to ``device`` before timing starts.
        device: ``torch.device`` to run on. On CUDA the stream is
            synchronised before and after the forward pass so that the
            measured interval covers the complete computation.

    Returns:
        ``(pred, elapsed)``: the arg-max class index as a Python int and the
        forward-pass time in seconds. ``.item()`` requires the arg-max to
        hold exactly one element, i.e. a batch of size 1.
    """
    model.eval()
    input_tensor = input_tensor.to(device)
    if device.type == 'cuda':
        torch.cuda.synchronize()
    # perf_counter is monotonic and high-resolution, unlike the wall clock.
    start = time.perf_counter()
    with torch.no_grad():
        logits = model(input_tensor)
    if device.type == 'cuda':
        torch.cuda.synchronize()
    # Stop the clock before post-processing so argmax/.item() are not timed.
    elapsed = time.perf_counter() - start
    pred = logits.argmax(dim=1).item()
    return pred, elapsed

# --- 2) Load & preprocess frames ---
from io import BytesIO  # imported once here instead of on every loop pass

frame_paths = sorted(glob.glob(f"{frame_dir}/*.png"))
frames = []

for p in frame_paths:
    img = cv2.imread(p, cv2.IMREAD_COLOR)
    if img is None:
        # cv2.imread reports an unreadable file by returning None rather
        # than raising; fail here with the offending path.
        raise IOError(f"Could not read image file: {p}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(gray, (256, 256), interpolation=cv2.INTER_AREA)
    normed = resized.astype(np.float32) / 255.0          # scale to [0, 1]
    frames.append(torch.from_numpy(normed).unsqueeze(0))  # shape [1,256,256]

# One full window plus at least one new frame is needed to run the loop.
if len(frames) < seq_len + 1:
    raise ValueError(f"Need at least {seq_len + 1} frames, got {len(frames)}")

# --- 3) Pre-fill a FIFO buffer of the first `seq_len` frames ---
# maxlen makes the deque drop its oldest entry on every later append.
buffer = deque(frames[:seq_len], maxlen=seq_len)

# --- 4) Create PDF and process frames ---
with PdfPages(pdf_output_path) as pdf:
    for idx in range(seq_len, len(frames)):
        new_frame = frames[idx]
        buffer.append(new_frame)  # drop oldest, add newest

        # Build input sequence tensor of shape [1, T=seq_len, C=1, H=256, W=256]
        seq_tensor = torch.stack(list(buffer), dim=0).unsqueeze(0)

        # Run inference & measure latency
        pred, latency = infer_and_time(model, seq_tensor, device)

        # Display result in console.
        # NOTE(review): the buffer range is printed with 0-based indices
        # while the frame is labelled idx+1 — confirm the intended numbering.
        start_frame = idx - seq_len + 1
        end_frame = idx
        print(f"Buffer frames {start_frame}–{end_frame} → "
              f"Pred for frame {idx+1}: {label_map[pred]} "
              f"({latency*1000:.1f} ms)")

        # Create the figure for both displaying and saving to PDF
        img = new_frame.squeeze().cpu().numpy()
        fig = plt.figure(figsize=(3, 3), dpi=300)  # Smaller size for paper, high DPI
        plt.imshow(img, cmap='gray')
        plt.title(f"Frame {idx+1}: {label_map[pred]}", fontsize=10)  # Smaller font
        plt.axis('off')
        plt.tight_layout(pad=0.5)  # Minimize padding

        # Render to an in-memory PNG for displaying in Colab
        buf = BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight', pad_inches=0.05, dpi=300)
        buf.seek(0)

        # Display the figure in Colab
        display(Image(buf.getvalue()))

        # Save this figure to the PDF (high quality), then close it to free memory
        pdf.savefig(fig, bbox_inches='tight', pad_inches=0.05, dpi=300)
        plt.close(fig)

print(f"PDF saved to {pdf_output_path}")