In [None]:
import os
import glob
from sklearn.model_selection import train_test_split
import torch
import segmentation_models_pytorch as smp
import numpy as np
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling as RasterioResampling
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from torch.utils.data import Dataset, DataLoader
import sys 
import random


MODEL_FILENAME_TO_TEST = "20250705-060518_UnetPlusPlus_efficientnet-b7_best.pth"

BASE_DATA_PATH = "Project II"   #phải sửa lại path này cho phù hợp với hệ thống 
OUTPUT_DIR = "Project II/outputs"   #phải sửa lại path này cho phù hợp với hệ thống 


# Cấu hình model 

BATCH_SIZE = 4
PATCH_SIZE = 256
TERRAIN_THRESHOLD = 0.2
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"





# Hàm tải và tiền xử lý dữ liệu
def load_and_preprocess_pair(dsm_path, dem_path, terrain_threshold):
    try:
        with rasterio.open(dsm_path) as dsm_src, rasterio.open(dem_path) as dem_src:
            if dsm_src.crs != dem_src.crs:
                return None, None
            
            left = max(dsm_src.bounds.left, dem_src.bounds.left)
            bottom = max(dsm_src.bounds.bottom, dem_src.bounds.bottom)
            right = min(dsm_src.bounds.right, dem_src.bounds.right)
            top = min(dsm_src.bounds.top, dem_src.bounds.top)

            if left >= right or bottom >= top:
                return None, None
                
            window = from_bounds(left, bottom, right, top, dem_src.transform)
            dem_nodata = dem_src.nodata if dem_src.nodata is not None else -9999
            dsm_nodata = dsm_src.nodata if dsm_src.nodata is not None else -9999
            dem_data = dem_src.read(1, window=window, boundless=True, fill_value=dem_nodata).astype(np.float32)
            dsm_data = dsm_src.read(1, window=window, out_shape=dem_data.shape, boundless=True, fill_value=dsm_nodata, resampling=RasterioResampling.nearest).astype(np.float32)
            
            valid_mask = (dsm_data != dsm_nodata) & (dem_data != dem_nodata)
            label_mask = np.zeros_like(dem_data, dtype=np.float32)
            is_terrain = np.abs(dsm_data - dem_data) <= terrain_threshold
            label_mask[valid_mask & is_terrain] = 1.0
            
            if np.any(valid_mask):
                mean_val = np.mean(dsm_data[valid_mask])
                std_val = np.std(dsm_data[valid_mask])
                if std_val > 1e-6:
                    dsm_normalized = (dsm_data - mean_val) / std_val
                    dsm_normalized[~valid_mask] = 0
                else:
                    dsm_normalized = np.zeros_like(dsm_data)
            else:
                dsm_normalized = np.zeros_like(dsm_data)
                
            return dsm_normalized, label_mask
    except Exception as e:
        print(f"Lỗi khi xử lý cặp file {os.path.basename(dsm_path)}: {e}")
        return None, None

# Lớp Dataset
class GeoTiffPatchDataset(Dataset):
    def __init__(self, file_pairs, patch_size, terrain_threshold):
        self.file_pairs = file_pairs
        self.patch_size = patch_size
        self.terrain_threshold = terrain_threshold
        self.patches = self._create_patches()

    def _create_patches(self):
        patches = []
        print("Đang tạo các patch từ file ảnh lớn...")
        for dsm_path, dem_path in tqdm(self.file_pairs):
            dsm_full, mask_full = load_and_preprocess_pair(dsm_path, dem_path, self.terrain_threshold)
            if dsm_full is not None:
                img_height, img_width = dsm_full.shape
                for y in range(0, img_height, self.patch_size):
                    for x in range(0, img_width, self.patch_size):
                        dsm_patch = dsm_full[y:y+self.patch_size, x:x+self.patch_size]
                        mask_patch = mask_full[y:y+self.patch_size, x:x+self.patch_size]
                        pad_h = self.patch_size - dsm_patch.shape[0]
                        pad_w = self.patch_size - dsm_patch.shape[1]
                        if pad_h > 0 or pad_w > 0:
                            dsm_patch = np.pad(dsm_patch, ((0, pad_h), (0, pad_w)), mode='constant', constant_values=0)
                            mask_patch = np.pad(mask_patch, ((0, pad_h), (0, pad_w)), mode='constant', constant_values=0)
                        if np.any(mask_patch):
                            patches.append((dsm_patch, mask_patch))
        return patches

    def __len__(self):
        return len(self.patches)

    def __getitem__(self, idx):
        dsm_patch, mask_patch = self.patches[idx]
        dsm_tensor = torch.from_numpy(dsm_patch).float().unsqueeze(0)
        mask_tensor = torch.from_numpy(mask_patch).float().unsqueeze(0)
        return dsm_tensor, mask_tensor

# CÁC HÀM ĐÁNH GIÁ MÔ HÌNH
# Hàm validate_one_epoch
def validate_one_epoch(model, dataloader, loss_fn_1, loss_fn_2, device):
    model.eval()
    total_loss = 0
    epoch_tp, epoch_fp, epoch_fn, epoch_tn = 0, 0, 0, 0
    with torch.no_grad():
        progress_bar = tqdm(dataloader, desc="Calculating Metrics")
        for inputs, labels in progress_bar:
            # ... (phần code bên trong vòng lặp giữ nguyên)
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_fn_1(outputs, labels) + loss_fn_2(outputs, labels)
            total_loss += loss.item()
            preds = (torch.sigmoid(outputs) > 0.5).long()
            labels_long = labels.long()
            tp, fp, fn, tn = smp.metrics.get_stats(preds, labels_long, mode='binary')
            epoch_tp += tp.sum(); epoch_fp += fp.sum(); epoch_fn += fn.sum(); epoch_tn += tn.sum()

    avg_loss = total_loss / len(dataloader)
    
    # Trả về một dictionary chứa tất cả các chỉ số
    metrics = {
        "loss": avg_loss,
        "iou": smp.metrics.iou_score(epoch_tp, epoch_fp, epoch_fn, epoch_tn, reduction='micro').item(),
        "f1_score": smp.metrics.f1_score(epoch_tp, epoch_fp, epoch_fn, epoch_tn, reduction='micro').item(),
        "precision": smp.metrics.precision(epoch_tp, epoch_fp, epoch_fn, epoch_tn, reduction='micro').item(),
        "recall": smp.metrics.recall(epoch_tp, epoch_fp, epoch_fn, epoch_tn, reduction='micro').item()
    }
    return metrics


# Hàm Hiển thị Kết quả
def visualize_predictions(model, dataloader, device, save_path, num_examples=5):
    model.eval()
    plt.figure(figsize=(15, num_examples * 5))

    num_batches = len(dataloader)
    if num_batches == 0:
        print("Lỗi: Test Dataloader rỗng, không có dữ liệu để hiển thị.")
        return

    
    # Lấy số lượng batch random, nếu muốn chỉ định có thể để i == "số thứ tự file chỉ định" trong vòng lặp for
    random_batch_idx = random.randint(0, num_batches - 1)
    print(f"Hiển thị các ảnh từ batch ngẫu nhiên số {random_batch_idx + 1}/{num_batches}...")
    for i, batch in enumerate(dataloader):
        if i == random_batch_idx:
            inputs, labels = batch
            break
    inputs, labels = inputs.to(device), labels.to(device)
    
    with torch.no_grad():
        outputs = model(inputs)
        preds = (torch.sigmoid(outputs) > 0.5).float()
        
    inputs_np = inputs.cpu().numpy()
    labels_np = labels.cpu().numpy()
    preds_np = preds.cpu().numpy()
    
    for i in range(min(num_examples, len(inputs_np))):
        plt.subplot(num_examples, 3, i * 3 + 1)
        plt.imshow(inputs_np[i, 0, :, :], cmap='viridis')
        plt.title(f"Input DSM Patch {i + 1}")
        plt.axis('off')

        plt.subplot(num_examples, 3, i * 3 + 2)
        plt.imshow(labels_np[i, 0, :, :], cmap='gray')
        plt.title(f"Ground Truth Mask {i + 1}")
        plt.axis('off')

        plt.subplot(num_examples, 3, i * 3 + 3)
        plt.imshow(preds_np[i, 0, :, :], cmap='gray')
        plt.title(f"Predicted Mask {i + 1}")
        plt.axis('off')
    
    plt.tight_layout()
    plt.savefig(save_path)
    print(f"Ảnh ví dụ dự đoán đã được lưu tại: {save_path}")
    plt.show()

#-TẢI DỮ LIỆU, CHẠY ĐÁNH GIÁ VÀ HIỂN THỊ

print("Script Test Model Bắt đầu")
print(f"Sử dụng thiết bị: {DEVICE.upper()}")

# Tự động trích xuất kiến trúc và backbone từ tên file
try:
    # Bỏ đi phần timestamp 
    name_without_timestamp = MODEL_FILENAME_TO_TEST.split('_', 1)[1]
    
    # Bỏ đi phần đuôi '_best.pth'
    clean_name = name_without_timestamp.replace('_best.pth', '')
    
    # Tách kiến trúc và backbone 
    parts = clean_name.split('_', 1)
    MODEL_ARCHITECTURE = parts[0]
    MODEL_BACKBONE = parts[1] 
    
    print(f"Đã nhận diện model: Kiến trúc={MODEL_ARCHITECTURE}, Backbone={MODEL_BACKBONE}")

except Exception as e:
    print(f"LỖI: Tên file '{MODEL_FILENAME_TO_TEST}' không đúng định dạng hoặc có lỗi khi xử lý. Lỗi: {e}")
    sys.exit() # Thoát chương trình nếu tên file sai

# Tải và chuẩn bị đường dẫn file test ---
DSM_DIR = os.path.join(BASE_DATA_PATH, 'DSM')
DEM_DIR = os.path.join(BASE_DATA_PATH, 'DEM')
dsm_pattern = os.path.join(DSM_DIR, '**', '*.TIF')
dem_pattern = os.path.join(DEM_DIR, '**', '*.TIF')
dsm_files = sorted(glob.glob(dsm_pattern, recursive=True))
dem_files = sorted(glob.glob(dem_pattern, recursive=True))
dem_dict = {os.path.basename(f).replace('dem', 'dsm'): f for f in dem_files}
file_pairs = [];
for dsm_file in dsm_files:
    base_name = os.path.basename(dsm_file)
    if base_name in dem_dict: file_pairs.append((dsm_file, dem_dict[base_name]))
train_val_pairs, test_pairs = train_test_split(file_pairs, test_size=0.15, random_state=42)
print(f"Đã tìm thấy {len(test_pairs)} cặp file trong tập test.")

# Tải model
model_class = getattr(smp, MODEL_ARCHITECTURE)
best_model = model_class(encoder_name=MODEL_BACKBONE, encoder_weights="imagenet", in_channels=1, classes=1).to(DEVICE)  
# Phải sửa imagenet thành advprop nếu khác weight
best_model_path = os.path.join(OUTPUT_DIR, MODEL_FILENAME_TO_TEST)
best_model.load_state_dict(torch.load(best_model_path))
print(f"Đã tải model tốt nhất từ: {best_model_path}")

# Định nghĩa các hàm loss
loss_fn_1 = smp.losses.DiceLoss(mode='binary')
loss_fn_2 = smp.losses.SoftBCEWithLogitsLoss()

# Tạo test_dataset và test_loader ---
test_dataset = GeoTiffPatchDataset(test_pairs, PATCH_SIZE, TERRAIN_THRESHOLD)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)


# --- BƯỚC 1: Chạy đánh giá định lượng (TÍNH TOÁN CHỈ SỐ) ---
print("\n--- Đang tính toán các chỉ số trên tập test... ---")
test_metrics = validate_one_epoch(best_model, test_loader, loss_fn_1, loss_fn_2, DEVICE)

print("\n--- KẾT QUẢ ĐỊNH LƯỢNG ---")
print(f"  Test Loss:    {test_metrics['loss']:.4f}")
print(f"  Test IoU:     {test_metrics['iou']:.4f}")
print(f"  Test F1-Score:  {test_metrics['f1_score']:.4f}")
print(f"  Test Precision: {test_metrics['precision']:.4f}")
print(f"  Test Recall:    {test_metrics['recall']:.4f}")

# --- BƯỚC 2: Chạy hiển thị định tính (HIỂN THỊ HÌNH ẢNH) ---
print("\n--- KẾT QUẢ ĐỊNH TÍNH (VÍ DỤ DỰ ĐOÁN) ---")
prediction_save_path = os.path.join(OUTPUT_DIR, f"{MODEL_FILENAME_TO_TEST.replace('_best.pth', '')}_predictions.png")
visualize_predictions(best_model, test_loader, DEVICE, prediction_save_path)

print("\n--- QUÁ TRÌNH ĐÁNH GIÁ HOÀN TẤT ---")