In [17]:
# resnet 50 damage evaluation
import torch
import torch.nn as nn
import torchvision.models as models
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
from sensorFusionDataset.data_split import split_dataset
from testLoop import test_loop
from trainLoop import train_loop
from visualization import plot_losses, plot_metrics,plotTraining
import torchvision.transforms as transform

import h5py as h5
from datetime import datetime
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import glob
import time
from torch.utils.data.dataset import Dataset
import torchvision.transforms as transform
from PIL import Image

damaged_img_path = '../../Dataset/lidar_img_16200/masked_image_65' 
ori_path = '../../Dataset/lidar_img_16200/original_image_gray'

damaged_img_list = sorted(glob.glob(damaged_img_path + '/*'))[0:5000]
ori_img_list = sorted(glob.glob(ori_path + '/*'))[0:5000]

epochs = 100
batch_size = 32


class MyDataset(Dataset):
    def __init__(self, damaged_img_list, ori_list):
        self.image_list = []
        for path in damaged_img_list:
            self.image_list.append((path, 1))
        for path in ori_list:
            self.image_list.append((path, 0))
        self.transform = transform.Compose([
    # transform.Resize((224, 224)),
    transform.Grayscale(num_output_channels=3), 
    transform.ToTensor(),
        
])
        
    def __getitem__(self, index):
        image_path, label = self.image_list[index]
        image = Image.open(image_path)
        image = self.transform(image)

        return image, label

    def __len__(self):
        return len(self.image_list)


# 创建数据集实例
res_dataset = MyDataset(damaged_img_list, ori_img_list)

train_dataset, test_dataset, val_dataset = split_dataset(res_dataset, 0.7, 0.2)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=True)

training_loss = []
testing_loss = []
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# 加载预训练的ResNet-50模型
resnet = models.resnet50(pretrained=True)
# 替换最后一层全连接层
num_features = resnet.fc.in_features
resnet.fc = nn.Sequential(
    nn.Linear(num_features, 1),  # 输出为1维
    nn.Sigmoid()  # 添加sigmoid激活函数
)
resnet = resnet.to(device)

# 定义损失函数和优化器
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(resnet.parameters(), lr=0.001)

# Training loop
best_model_wts = copy.deepcopy(resnet.state_dict())
min_loss = float('inf')
save_path = 'resnet_50_evaluation/'

start_time = time.time()

for epoch in range(1, epochs + 1):
    # Train the model
    resnet.train()
    train_loss = 0
    for i, (image, label) in enumerate(train_loader):
        image = image.to(device)
        label = label.to(device)

        optimizer.zero_grad()

        # Forward pass
        outputs = resnet(image)
        loss = criterion(outputs, label.float().view(-1, 1))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Update training loss
        train_loss += loss.item() * image.size(0)

    # Calculate average training loss
    train_epoch_loss = train_loss / len(train_loader.dataset)
    training_loss.append(train_epoch_loss)

    # Test the model
    resnet.eval()
    test_loss = 0
    with torch.no_grad():
        for image, label in test_loader:
            image = image.to(device)
            label = label.to(device)

            # Forward pass
            outputs = resnet(image)
            loss = criterion(outputs, label.float().view(-1, 1))

            # Update test loss
            test_loss += loss.item() * image.size(0)

        # Calculate average test loss
        test_epoch_loss = test_loss / len(test_loader.dataset)
        testing_loss.append(test_epoch_loss)

        # Check if the current model has the lowest test loss
        if test_epoch_loss < min_loss:
            min_loss = test_epoch_loss
            best_model_wts = copy.deepcopy(resnet.state_dict())

    # Print epoch statistics
    print('Epoch: {}/{} \tTraining Loss: {:.6f} \tTest Loss: {:.6f} \t{:.0f}s'.format(epoch, epochs, train_epoch_loss, test_epoch_loss,time.time() - start_time ))



# Save the best model
torch.save(best_model_wts, 'resnet_50_evaluation/best_model_{}.pth'.format(epochs))
print('Best model saved.')

Epoch: 1/100 	Training Loss: 0.505214 	Test Loss: 0.499756 	63s
Epoch: 2/100 	Training Loss: 0.503005 	Test Loss: 0.500319 	125s
Epoch: 3/100 	Training Loss: 0.502568 	Test Loss: 0.499559 	188s
Epoch: 4/100 	Training Loss: 0.502893 	Test Loss: 0.499749 	251s
Epoch: 5/100 	Training Loss: 0.502945 	Test Loss: 0.499369 	314s
Epoch: 6/100 	Training Loss: 0.503053 	Test Loss: 0.499559 	376s
Epoch: 7/100 	Training Loss: 0.502945 	Test Loss: 0.499369 	439s
Epoch: 8/100 	Training Loss: 0.502891 	Test Loss: 0.499749 	502s
Epoch: 9/100 	Training Loss: 0.503053 	Test Loss: 0.499939 	565s
Epoch: 10/100 	Training Loss: 0.502782 	Test Loss: 0.499559 	627s
Epoch: 11/100 	Training Loss: 0.502945 	Test Loss: 0.499749 	690s
Epoch: 12/100 	Training Loss: 0.502836 	Test Loss: 0.499369 	753s
Epoch: 13/100 	Training Loss: 0.502999 	Test Loss: 0.499179 	816s
Epoch: 14/100 	Training Loss: 0.502945 	Test Loss: 0.500129 	878s
Epoch: 15/100 	Training Loss: 0.502945 	Test Loss: 0.499749 	941s
Epoch: 16/100 	Train

In [24]:

class MyDataset(Dataset):
    def __init__(self, damaged_img_list, ori_list):
        self.image_list = []
        for path in damaged_img_list:
            self.image_list.append((path, 1))
        for path in ori_list:
            self.image_list.append((path, 0))
        self.transform = transform.Compose([
    # transform.Resize((224, 224)),
    transform.Grayscale(num_output_channels=3), 
    transform.ToTensor(),
        
])
        
    def __getitem__(self, index):
        image_path, label = self.image_list[index]
        image = Image.open(image_path)
        image = self.transform(image)

        return image, label

    def __len__(self):
        return len(self.image_list)

# Resnet 50

import torch
import glob
from torch.utils.data import DataLoader


metric_value_list = []
batch_size = 32

img_list =  sorted(glob.glob('RecosntructedImages/2_fusion_linear/0.7-0.3/*'))
# img_list =  sorted(glob.glob('RecosntructedImages/2_fusion_concatenate/*'))
# img_list =  sorted(glob.glob('RecosntructedImages/2_denoising/*'))

ValDataset = MyDataset(img_list, [])
new_valloader = DataLoader(dataset=ValDataset, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=True)



# 加载预训练的ResNet-50模型
resnet = models.resnet50(pretrained=True)
# 替换最后一层全连接层
num_features = resnet.fc.in_features
resnet.fc = nn.Sequential(
    nn.Linear(num_features, 1),  # 输出为1维
    nn.Sigmoid()  # 添加sigmoid激活函数
)
resnet = resnet.to(device)
resnet.load_state_dict(torch.load('resnet_50_evaluation/best_model_100.pth'))
resnet.to(device)
dataiter = iter(new_valloader)

image, label = next(dataiter)
image = image.to(device)
label = label.to(device)

output = resnet(image)
metric = sum(output)/len(output)
print(metric.item())


# for i, batch in enumerate(val_loader):
#     ori_img, noi_img = batch
#     test_model_best.eval()
#     output = test_model_best(noi_img)
#     count = 0
#     for j in range(ori_img.size(0)):
#         output_np = output[j][0].detach().cpu().numpy()
#         count = count + output_np
#     ssim_img = round(count / ori_img.size(0), 3)
#     metric_value_list.append(ssim_img)

# print(metric_value_list)


# print(metric_values)








0.5897318124771118


In [None]:
# LLIPS metrics

import lpips

from sensorFusionDataset.creatDataset import CreateDatasets
from evaluation import calculate_frechet_distance
import glob
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
from sensorFusionDataset.creatDataset import CreateDatasets
from sensorFusionNetwork.simpleAutoencoder import SimpleAutoencoder
import numpy as np
import torch


dataset_path = '../../Dataset/lidar_img_16200/damaged_image_gray_all/DamagedDatasetforComparison'
noi_img_list = sorted(glob.glob(dataset_path + '/*'))
CAM_FRONT_path = '../../Dataset/lidar_img_16200/original_image_gray'
img_list = sorted(glob.glob(CAM_FRONT_path + '/*'))[14400:14656]

metric_value_list =[]
batch_size = 32

ValDataset = CreateDatasets(img_list, noi_img_list)
new_valloader = DataLoader(dataset=ValDataset, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=True)

test_model_best = SimpleAutoencoder()
# test_model_last.load_state_dict(torch.load('model_save/damage_{}/last_model_{}.pth'.format(damage_label, experimentIndex)))
test_model_best.load_state_dict(
    torch.load('Experiment2_sequence_inverse/img2img_model/best_model_300_32_20230502_052654.pth')
)

loss_fn = lpips.LPIPS(net='alex')


for i, batch in enumerate(new_valloader):  # i -->index of batch
    ori_img, noi_img = batch
    test_model_best.eval()
    output = test_model_best(noi_img)
    count = 0
    for j in range(ori_img.size(0)):
        # ori_img_np = ori_img[j][0].detach().numpy()
        # output_np = output[j][0].detach().numpy()
        lpips_img = loss_fn.forward(ori_img, output)
        sum_result = (torch.sum(lpips_img)).item()
        print(sum_result)
        count = count+sum_result
        # count = count.item()
    lpips_img = round(count/ori_img.size(0),3)
    print(lpips_img)
    metric_value_list.append(lpips_img)

print(f'scuccess: {metric_value_list}')



    

In [None]:
# FID

# FID
from sensorFusionDataset.creatDataset import CreateDatasets
from evaluation import calculate_frechet_distance
import glob
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
from sensorFusionDataset.creatDataset import CreateDatasets
from sensorFusionNetwork.simpleAutoencoder import SimpleAutoencoder
import numpy as np
import torch



dataset_path = '../../Dataset/lidar_img_16200/damaged_image_gray_all/DamagedDatasetforComparison'
noi_img_list = sorted(glob.glob(dataset_path + '/*'))
CAM_FRONT_path = '../../Dataset/lidar_img_16200/original_image_gray'
img_list = sorted(glob.glob(CAM_FRONT_path + '/*'))[14400:14656]

metric_value_list =[]
batch_size = 32

ValDataset = CreateDatasets(img_list, noi_img_list)
new_valloader = DataLoader(dataset=ValDataset, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=True)

test_model_best = SimpleAutoencoder()
# test_model_last.load_state_dict(torch.load('model_save/damage_{}/last_model_{}.pth'.format(damage_label, experimentIndex)))
test_model_best.load_state_dict(
    torch.load('Experiment2_sequence_inverse/img2img_model/best_model_300_32_20230502_052654.pth')
)
for i, batch in enumerate(new_valloader):  # i -->index of batch
    ori_img, noi_img = batch
    test_model_best.eval()
    output = test_model_best(noi_img)
    count = 0
    for j in range(ori_img.size(0)):
        ori_img_np = ori_img[j][0].detach().numpy()
        output_np = output[j][0].detach().numpy()
        fid_img = calculate_frechet_distance(ori_img_np, output_np)
        count = count+fid_img
    fid_img = round(count/ori_img.size(0),3)
    
    metric_value_list.append(fid_img)

print(metric_value_list)




In [4]:
#  regional mse denoising compare

from sensorFusionDataset.creatDataset import CreateDatasets
import glob
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
from sensorFusionDataset.creatDataset import CreateDatasets
from sensorFusionNetwork.simpleAutoencoder import SimpleAutoencoder
import numpy as np
import torch

dataset_path = 'regional_mse/masked_img'
noi_img_list = sorted(glob.glob(dataset_path + '/*'))
CAM_FRONT_path = 'regional_mse/ori_img'
img_list = sorted(glob.glob(CAM_FRONT_path + '/*'))

metric_value_list =[]
batch_size = 32

ValDataset = CreateDatasets(img_list, noi_img_list)
new_valloader = DataLoader(dataset=ValDataset, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=True)

test_model_best = SimpleAutoencoder()
test_model_best.load_state_dict(
    # torch.load('denoising_model/best_model_300_32_20230509_104410.pth')
    torch.load('Experiment2_sequence_inverse/img2img_model/best_model_300_32_20230509_175834.pth')

)

# load the mask function

def create_mask(image_shape, block_size, num_blocks):
    mask = np.ones(image_shape, dtype=np.float32) 
    block_size_px = (int(image_shape[0] * block_size), int(image_shape[1] * block_size))
    np.random.seed(5)  # 设置随机种子，以确保每次生成的随机方块位置相同
    
    # 限制方块的位置在下半部分
    lower_half = int(image_shape[0] / 3)
    
    for _ in range(num_blocks):
        top_left = np.random.randint(lower_half, image_shape[0] - block_size_px[0] + 1), np.random.randint(0, image_shape[1] - block_size_px[1] + 1)
        bottom_right = top_left[0] + block_size_px[0], top_left[1] + block_size_px[1]
        mask[top_left[0]:bottom_right[0], top_left[1]:bottom_right[1]] = 0  # 将方块区域的像素值设为灰度值 0.0
    return mask

block_size = 0.2  # 方块大小为原图大小的百分比
num_blocks = 6  # 方块数量


for i, batch in enumerate(new_valloader):  # i -->index of batch
    ori_img, noi_img = batch
    test_model_best.eval()
    output = test_model_best(noi_img)
    count = 0
    for j in range(ori_img.size(0)):
        ori_img_np = ori_img[j][0].detach().numpy()
        output_np = output[j][0].detach().numpy()
        mask = create_mask(ori_img_np.shape[:2], block_size, num_blocks)
        local_mse = np.mean(np.square(output_np - ori_img_np) * mask)
        count = count+local_mse
    local_mse = round(count/ori_img.size(0),6)
    metric_value_list.append(local_mse)
    
    
    
    
print(metric_value_list)



[0.000236]


In [None]:
# downstream task

import torch
import torch.nn as nn
import torchvision.models as models

# 加载预训练的ResNet-50模型
resnet = models.resnet50(pretrained=True)

# 替换最后一层全连接层
num_features = resnet.fc.in_features
resnet.fc = nn.Linear(num_features, 1)  # 输出为1维

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(resnet.parameters(), lr=0.001)

# 训练回归模型
def train(model, train_loader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, running_loss))
    print('Training finished.')

# 定义数据集和数据加载器
# 这里假设您已经准备好了您的数据集并将其转换为PyTorch Dataset格式
dataset = MyDataset(...)  # 替换为您的数据集
train_loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

# 调用训练函数进行模型训练
train(resnet, train_loader, criterion, optimizer, num_epochs=10)

# 评估模型并输出还原程度的度量值
def evaluate(model, test_loader):
    model.eval()
    metric_values = []
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images)
            metric_values.extend(outputs.flatten().tolist())
    return metric_values

# 定义测试数据集和数据加载器
# 这里假设您已经准备好了测试数据集并将其转换为PyTorch Dataset格式
test_dataset = MyDataset(...)  # 替换为您的测试数据集
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

# 调用评估函数获取还原程度的度量值
metric_values = evaluate(resnet, test_loader)

# metric_values包含了输入经过reconstruction的受损图的还原程度度量值
print(metric_values)
