Train Regression (3 CLIP scores, and T -> predict best T) Model

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
import os
import random

random_seed = 42
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)  # if use multi-GPU
np.random.seed(random_seed)
random.seed(random_seed)

os.environ["CUDA_VISIBLE_DEVICES"] = '2'
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


Load Model

In [2]:
import torch
import torch.nn as nn
from torchmetrics.multimodal import CLIPImageQualityAssessment
from transformers import CLIPTokenizer

# Regressor 모델 (입력: (batch_size, 4, 768))
class Regressor(nn.Module):
    def __init__(self, vision_dim=512, text_dim=512, hidden_dim=64, num_prompts=3):
        super(Regressor, self).__init__()
        self.num_prompts = num_prompts
        self.vision_mlp = nn.Sequential(
            nn.Linear(vision_dim, hidden_dim),
            nn.ReLU()
        )

        self.cross_attention = nn.MultiheadAttention(embed_dim=hidden_dim, num_heads=4)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, 1)
        )
    
    def forward(self, combined_input):
        # combined_input: (batch_size, 4, 768)
        batch_size = combined_input.size(0)

        
        # Vision MLP: (batch_size, 4, 768) -> (B, 4, 64)
        combined_input = self.vision_mlp(combined_input)  # (B, 4, 64)
        
        
        # Cross-Attention
        vision_cls = combined_input[:, 0, :].unsqueeze(dim=1) # (batch_size, 1, 64)
        text_feature = combined_input[:, 1:self.num_prompts+1, :]  # (batch_size, 3, 64)
        # print(vision_cls.shape, text_feature.shape)

        vision_cls = vision_cls.permute(1, 0, 2)  # (1, B, 64)
        text_feature = text_feature.permute(1, 0, 2)  # (3, B, 64)
        attn_output, a = self.cross_attention(vision_cls, text_feature, text_feature)  # (B, 1, 64)
        attn_output = attn_output.permute(1, 0, 2)  # (B, 1, 64)
        
        # FC Layer
        T_pred = self.fc(attn_output.squeeze(1)).squeeze(-1)  # (B,)
        return T_pred

# 테스트
if __name__ == "__main__":
    x = torch.randn(2, 4, 512)  # 예시 텍스트 특징
    model = Regressor()
    output = model(x)
    print("Output shape:", output.shape)  # torch.Size([2])

  from .autonotebook import tqdm as notebook_tqdm


Output shape: torch.Size([2])


## Test할 때 T 없는 버전

In [4]:
import torch
from tqdm import tqdm
from network.conv_node import NODE
# from network.clip_classifier import TtoTClassifier
from misc import *
import os
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from torchmetrics.multimodal import CLIPImageQualityAssessment
import time
import torchvision.transforms as transforms

# GPU 번호 지정
gpu_number = 2  # 원하는 GPU 번호로 변경 가능

# GPU 사용 제한
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_number)

# 이제 GPU가 한 개만 보이므로 cuda:0으로 접근
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

model = NODE(device, (3, 400, 600), 32, augment_dim=0, time_dependent=True, adjoint=True)
model.eval()
model.to(device)
model.load_state_dict(torch.load(f'/home/lbw/CLODE/pth/lowlight.pth', weights_only=True), strict=False)

# 결과 저장을 위한 디렉토리 생성
results_dir = Path('/home/lbw/CLODE/result_img/cls_text_ca_pca')
results_dir.mkdir(parents=True, exist_ok=True)


# 하이퍼파라미터 설정
input_dim = 768
hidden_dim = 64
# score_dim = len(prompts)

# 모델 초기화
regressor = Regressor(hidden_dim=hidden_dim).to(device)
regressor.eval()
regressor.to(device)
# model_path_name = (model_path / f'att_regression_{best_epoch}.pth')
# model_path_name = '/home/lbw/CLODE/regression_pth/lbw_low_feature_L_score_model2/att_regression_27.pth'
# model_path_name = '/home/lbw/CLODE/regression_pth/lol_T_sa_ca_model/att_regression_17.pth'
# model_path_name = '/home/lbw/CLODE/regression_pth/lol_T_sa_ca_model_soom/att_regression_42.pth'
model_path_name = '/home/lbw/CLODE/regression_pth/lol_soom_cls_text_ca2_64_pca/att_regression_168.pth'
print(model_path_name, input_dim)
regressor.load_state_dict(torch.load(model_path_name, map_location=device))



# Text embedding 로드
text_embedding = torch.tensor(np.load('/home/lbw/CLODE/traindata_csv/text_embedding_pca.npy'), dtype=torch.float32).to(device)  # (3, 768)
text_embedding_un = text_embedding.unsqueeze(0).to(device)  # (1, 3, 768)

# CLIP-IQA 로드 (eval15 데이터의 vision_cls 계산용)
clip_iqa = CLIPImageQualityAssessment(
    model_name_or_path="openai/clip-vit-base-patch16",
    prompts=('brightness', 'natural', 'colorfullness')
).to(device)

# Transform 정의
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.48145466, 0.4578275, 0.40821073],
                        std=[0.26862954, 0.26130258, 0.27577711])
])

# eval15 데이터 불러오기

base_eval_path = Path('/home/lbw/data/LSRW/Eval/')
camera_types = ['Huawei', 'Nikon']

def load_eval_image(idx):
    img_name = eval_images[idx]
    lq_img = image_tensor(eval_path / 'low' / img_name)
    gt_img = image_tensor(eval_path / 'high' / img_name)
    
    # lq_img_224는 transform 적용
    lq_img_pil = Image.open(eval_path / 'low' / img_name).convert('RGB')
    lq_img_224 = transform(lq_img_pil).unsqueeze(0).to(device)  # (1, 3, 224, 224)
    
    # Vision CLS 계산
    with torch.no_grad():
        vision_feature = clip_iqa.model.vision_model(lq_img_224)[1]  # (1, 768)
        vision_cls = clip_iqa.model.visual_projection(vision_feature)  # (1, 768)
    
    return lq_img.to(device), gt_img.to(device), img_name, vision_cls.to(device)

def tensor_to_numpy(tensor):
    img = tensor.detach().cpu().numpy()
    if img.ndim == 3 and img.shape[0] == 3:
        img = np.transpose(img, (1, 2, 0))
    img = np.clip(img, 0, 1)
    return img

pred_img = []
pred_Ts = []
pred_psnrs = []
pred_ssims = []

# 이미지별 NODE와 Classifier 결과 비교
# with alive_bar(len(eval_images), title='Processing images', bar='notes', spinner='waves', force_tty=True, monitor=True) as bar:
for camera_type in camera_types:
    eval_path = base_eval_path / camera_type
    eval_images = [f for f in sorted(os.listdir(eval_path / 'low')) if f.lower().endswith('.jpg')]


    # 이미지별 NODE와 Regressor 결과 비교
    for idx in tqdm(range(len(eval_images))):
        lq_img, gt_img, img_name, vision_cls = load_eval_image(idx)
        
        with torch.no_grad():
            # vision_cls: (1, 768)
            vision_cls = vision_cls.unsqueeze(1)  # (1, 1, 768)
            text_batch = text_embedding_un.repeat(vision_cls.size(0), 1, 1).to(device)  # (1, 3, 768)
            combined_input = torch.cat([vision_cls, text_batch], dim=1)  # (1, 4, 768)
            
            # Regressor로 T 예측
            pred_T = regressor(combined_input)
            pred_T = pred_T.item()
        
        with torch.no_grad():
            T_tensor = torch.tensor([0, pred_T]).float().to(device)
            pred = model(lq_img, T_tensor, inference=True)['output'][0]
            psnr = calculate_psnr(pred, gt_img).item()
            ssim_value = calculate_ssim(pred, gt_img)

        pred_img.append(pred)    
        pred_Ts.append(pred_T)
        pred_psnrs.append(psnr)
        pred_ssims.append(ssim_value)
        
        # 시각화
        plt.figure(figsize=(18, 6))
        
        plt.subplot(1, 3, 1)
        plt.imshow(tensor_to_numpy(lq_img[0]))
        plt.title(f'Low Quality Image: {img_name}')
        plt.axis('off')
        
        plt.subplot(1, 3, 2)
        plt.imshow(tensor_to_numpy(gt_img[0]))
        plt.title('Ground Truth')
        plt.axis('off')
        
        plt.subplot(1, 3, 3)
        plt.imshow(tensor_to_numpy(pred))
        plt.title(f'Regressor Pred T={pred_T:.2f}, PSNR={psnr:.4f}dB, SSIM={ssim_value:.4f}')
        plt.axis('off')
        
        plt.tight_layout()
        plt.savefig(results_dir / f'{img_name.split(".")[0]}_compare.png')
        plt.close()

# 통계 정보 계산 (전체 이미지에 대해)
print("모든 이미지 처리 완료!")

# PSNR 비교
# print(f"Best T PSNR 평균: {np.mean(best_psnrs):.2f}dB")
print(f"Predicted T PSNR 평균: {np.mean(pred_psnrs):.4f}dB")
# print(f"Best T SSIM 평균: {np.mean(best_ssims):.4f}")
print(f"Predicted T SSIM 평균: {np.mean(pred_ssims):.4f}")


/home/lbw/CLODE/regression_pth/lol_soom_cls_text_ca2_64_pca/att_regression_168.pth 768


  regressor.load_state_dict(torch.load(model_path_name, map_location=device))
100%|██████████| 30/30 [05:16<00:00, 10.56s/it]
100%|██████████| 20/20 [02:55<00:00,  8.78s/it]

모든 이미지 처리 완료!
Predicted T PSNR 평균: 18.0804dB
Predicted T SSIM 평균: 0.5366





In [1]:
import torch
import numpy as np
from pathlib import Path
import os
from PIL import Image
import torchvision.transforms as transforms
from torchmetrics.multimodal import CLIPImageQualityAssessment
from transformers import CLIPTokenizer

# GPU 번호 지정
gpu_number = 2  # 원하는 GPU 번호로 변경 가능

# GPU 사용 제한
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_number)

# 이제 GPU가 한 개만 보이므로 cuda:0으로 접근
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# CLIP-IQA 로드
clip_iqa = CLIPImageQualityAssessment(
    model_name_or_path="openai/clip-vit-base-patch16",
    prompts=('brightness', 'natural', 'colorfullness')
).to(device)

# CLIP 토크나이저 로드
tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch16")

# 이미지 전처리
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.48145466, 0.4578275, 0.40821073],
                        std=[0.26862954, 0.26130258, 0.27577711])
])

# 이미지 로드
file_path = Path('/home/lbw/data/LOL/our485')
img_labels = [f for f in sorted(os.listdir(file_path / 'low')) if f.lower().endswith('.png')]

# 1. Text Embedding 계산 및 저장
prompts = ('brightness', 'natural', 'colorfullness')
tokenized = tokenizer(
    prompts,
    padding=True,
    truncation=True,
    return_tensors="pt",
    max_length=77
)

with torch.no_grad():
    text_feature = clip_iqa.model.text_model(
        input_ids=tokenized['input_ids'].to(device),
        attention_mask=tokenized['attention_mask'].to(device)
    )  # (num_prompts, 768)
    text_feature = clip_iqa.model.text_projection(text_feature[1])  # (num_prompts, 768)

# numpy 배열로 변환 및 저장
text_embedding = text_feature.cpu().numpy()  # (3, 768)
np.save('/home/lbw/CLODE/traindata_csv/text_embedding_pca.npy', text_embedding)

# 2. Vision CLS 계산 및 저장
vision_cls_list = []
for img_name in img_labels:
    img_path = file_path / 'low' / img_name
    image = Image.open(img_path).convert('RGB')
    image = transform(image).unsqueeze(0).to(device)  # (1, 3, 224, 224)
    
    with torch.no_grad():
        vision_feature = clip_iqa.model.vision_model(image)[1]  # (1, 768)
        vision_cls = clip_iqa.model.visual_projection(vision_feature)  # (1, 768)
    
    vision_cls_list.append(vision_cls.cpu())

# 리스트를 텐서로 결합 후 numpy 배열로 변환 및 저장
vision_cls = torch.cat(vision_cls_list, dim=0)  # (485, 768)
np.save('/home/lbw/CLODE/traindata_csv/vision_cls_pca.npy', vision_cls.numpy())
print("CLIP embeddings saved: text_embedding.npy, vision_cls.npy")

  from .autonotebook import tqdm as notebook_tqdm
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


CLIP embeddings saved: text_embedding.npy, vision_cls.npy
