In [1]:
import os  
from data.dataset import *
import numpy as np
import torch
from torch import nn
import torch.utils.data as data
from torch.utils.data import DataLoader 
from tqdm import tqdm
from tools.evaluation_metric import  SSIM, SAM, ERGAS, Fssim, Frmse, Fpsnr, midd_calc_rmse, calc_rmse
from skimage.metrics import peak_signal_noise_ratio as PSNR
from model.Ablation.moe_IFM_clip import MOE_IFM_Clip
os.environ['CUDA_VISIBLE_DEVICES']='0' 

test_exp_name = 'MOE_IFM_Clip_7tasks'
weight_path = './Checkpoint/All-in-one/MOE_IFM_Clip_7tasks/Best_PSNR_MRI.pth'
save_test_path = os.path.join('./Checkpoint/test_metric','{}.txt'.format(test_exp_name))
os.makedirs(os.path.dirname(save_test_path), exist_ok=True)

# 创建、加载模型
model = MOE_IFM_Clip(dim = 22, num_blocks=[3, 4, 4, 5]).cuda()
model.load_state_dict(torch.load(weight_path)['model_state_dict'])
clip_feat = torch.load('./data/SR_text_feature_GF2_WV4_QB_MRI_Dep4_Dep8_Dep16.th').cuda()


# 数据集路径
depth_NYU_root = '/data/cjj/dataset/NYU_V2'
depth_Middlebury_root = '/data/cjj/dataset/Depth_Enh/01_Middlebury_Dataset'
depth_Lu_root = '/data/cjj/dataset/Depth_Enh/03_RGBD_Dataset'

mri_root = '/data/wtt/WJ_Code_DURE_HF/MRI_dataset/BT'

pan_WV4_root = '/data/datasets/pansharpening/NBU_dataset0730/WV4'
pan_QB_root = '/data/datasets/pansharpening/NBU_dataset0730/QB'
pan_GF1_root = '/data/datasets/pansharpening/NBU_dataset0730/GF1'


In [2]:
## Depth测试
def test_MI_LU(net,dataloader,dataset_name,scale):
    net.eval()
    if dataset_name == "Middlebury":
        rmse = np.zeros(30)
    elif dataset_name == "Lu":
        rmse = np.zeros(6)

    if scale == 4:
        clip_feature = clip_feat[4]
    elif scale == 8:
        clip_feature = clip_feat[5]
    elif scale == 16:
        clip_feature = clip_feat[6]
    with torch.no_grad():
        pbar = tqdm(dataloader, desc=f'Testing {dataset_name}', leave=False)  # leave=False 可以在完成后清除进度条
        for index, data in enumerate(pbar):
            gt = data['gt'].cuda()
            guidance = data['guidance'].cuda()
            lr = data['lr'].cuda()

            H, W = guidance.shape[2], guidance.shape[3]
            new_H, new_W = (H // 8) * 8, (W // 8) * 8  # 裁剪成8的整数倍
            
            guidance = guidance[:, :, :new_H, :new_W]
            gt = gt[:, :, :new_H, :new_W]
            
            img_predict = net(lr, guidance, clip_feature)
            rmse[index] = midd_calc_rmse(img_predict[0,0], gt[0,0])
    return rmse.mean()

def test_NYU(net, dataloader, test_minmax, scale):
    net.eval()
    count = 0
    sum_rmse = 0
    if scale == 4:
        clip_feature = clip_feat[4]
    elif scale == 8:
        clip_feature = clip_feat[5]
    elif scale == 16:
        clip_feature = clip_feat[6]
    with torch.no_grad():
        pbar = tqdm(dataloader, desc=f'Testing NYU x{scale}', leave=True)
        for index, data in enumerate(pbar):
            count += 1
            lr = data[0].cuda()
            gt = data[1].cuda()  
            guidance = data[2].cuda()
            img_predict = net(lr, guidance, clip_feature)[0]
            rmse = calc_rmse(img_predict[0], gt[0, 0], torch.from_numpy(test_minmax[:, index]).cuda())
            sum_rmse += rmse
    return sum_rmse / count

data_transform = transforms.Compose([transforms.ToTensor()])
test_scales = [4, 8, 16]
test_minmax = np.load(f'{depth_NYU_root}/test_minmax.npy')

# 测试NYU数据集
# nyu_rmse = []
# for scale in test_scales:
#     print(f"\nTesting NYU x{scale}")
#     val_nyu_dataset = NYU_v2_datset(root_dir=depth_NYU_root, scale=scale, transform=data_transform, train=False)
#     val_nyu_dataloader = DataLoader(val_nyu_dataset, batch_size=1, shuffle=False)
#     print(len(val_nyu_dataloader))
#     rmse = test_NYU(model, val_nyu_dataloader, test_minmax, scale)
#     nyu_rmse.append(rmse)
#     print(f"NYU x{scale} RMSE: {rmse:.4f}")

# 测试Middlebury数据集 
middlebury_rmse = []
for scale in test_scales:
    print(f"\nTesting Middlebury x{scale}")
    val_middlebury_dataset = Depth_test_dataset(root_dir=depth_Middlebury_root, scale=scale, transform=data_transform)
    val_middlebury_dataloader = DataLoader(val_middlebury_dataset, batch_size=1, shuffle=False)
    rmse = test_MI_LU(model, val_middlebury_dataloader, "Middlebury", scale)
    middlebury_rmse.append(rmse)
    print(f"Middlebury x{scale} RMSE: {rmse:.4f}")

# 测试Lu数据集
lu_rmse = []
for scale in test_scales:
    print(f"\nTesting Middlebury x{scale}")
    val_lu_dataset = Depth_test_dataset(root_dir=depth_Lu_root, scale=scale, transform=data_transform)
    val_lu_dataloader = DataLoader(val_lu_dataset, batch_size=1, shuffle=False)
    rmse = test_MI_LU(model, val_lu_dataloader, "Lu", scale)
    lu_rmse.append(rmse)
    print(f"Lu x{scale} RMSE: {rmse:.4f}")

# 保存结果到save_test_path
with open(save_test_path, 'w') as f:
    f.write("NYU RMSE Results:\n")
    for scale, rmse in zip(test_scales, nyu_rmse):
        f.write(f"Scale {scale}: RMSE = {rmse:.4f}\n")
    
    f.write("\nMiddlebury RMSE Results:\n")
    for scale, rmse in zip(test_scales, middlebury_rmse):
        f.write(f"Scale {scale}: RMSE = {rmse:.4f}\n")
    
    f.write("\nLu RMSE Results:\n")
    for scale, rmse in zip(test_scales, lu_rmse):
        f.write(f"Scale {scale}: RMSE = {rmse:.4f}\n")

print(f"Results saved to {save_test_path}")






Testing Middlebury x4
30


                                                                   

Middlebury x4 RMSE: 2.8180

Testing Middlebury x8
30


                                                                   

Middlebury x8 RMSE: 1.7720

Testing Middlebury x16
30


                                                                   

Middlebury x16 RMSE: 2.9742

Testing Middlebury x4
6


                                                         

Lu x4 RMSE: 0.9930

Testing Middlebury x8
6


                                                         

Lu x8 RMSE: 1.8119

Testing Middlebury x16
6


                                                         

Lu x16 RMSE: 4.1471


NameError: name 'nyu_rmse' is not defined