데이터 편향성 확인을 위한 코드

In [None]:
import torch
import h5py
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm import tqdm

In [None]:
# Dataset directory and the HDF5 dataset keys used by the analysis below
data_dir = '/home/Data/train/image'
input_key = 'image_input'
target_key = 'image_label'

def load_data(file_path, input_key, target_key, device='cuda'):
    """Read two datasets from one HDF5 file and return them as torch tensors.

    Args:
        file_path: Path of the HDF5 file to open read-only.
        input_key: Name of the dataset returned first.
        target_key: Name of the dataset returned second.
        device: Torch device on which both tensors are created.

    Returns:
        Tuple ``(masked_data, original_data)`` of tensors on ``device``.
    """
    with h5py.File(file_path, 'r') as h5_file:
        # The generator is consumed inside the ``with`` so the file is still open.
        masked_data, original_data = (
            torch.tensor(np.asarray(h5_file[key]), device=device)
            for key in (input_key, target_key)
        )
    return masked_data, original_data

기존의 data/load_data 모듈을 활용할 수 있는 방법은 없을까?
우리에게 주어진 데이터셋의 구조를 한 번 더 확인해볼 필요가 있을 듯하다.

In [None]:
def calculate_deviation(masked, original):
    """Return the element-wise absolute difference ``|original - masked|``."""
    return (original - masked).abs()

def visualize_deviation_histogram(deviation, title='Deviation between Original and Masked Data'):
    """Show a 50-bin histogram of every value in ``deviation``.

    Args:
        deviation: Torch tensor; it is moved to the CPU and flattened before plotting.
        title: Title drawn above the histogram.
    """
    flat_values = deviation.cpu().numpy().reshape(-1)

    _, axis = plt.subplots(figsize=(10, 6))
    axis.hist(flat_values, bins=50, alpha=0.75)
    axis.set_title(title)
    axis.set_xlabel('Deviation')
    axis.set_ylabel('Frequency')
    axis.grid(True)
    plt.show()
    
def analyze_bias(data_dir, input_key, target_key, device='cuda'):
    """Plot a histogram of ``|target - input|`` over every .h5 file under ``data_dir``.

    Args:
        data_dir: Directory searched recursively for ``*.h5`` files.
        input_key: HDF5 dataset name passed to ``load_data`` as the input key.
        target_key: HDF5 dataset name passed to ``load_data`` as the target key.
        device: Torch device on which each file's deviation is computed.

    Raises:
        ValueError: If no ``*.h5`` file exists under ``data_dir``.
    """
    # Collect the data files up front so an empty directory fails with a clear message
    # instead of the opaque error raised by np.concatenate on an empty list.
    data_files = list(Path(data_dir).rglob('*.h5'))
    if not data_files:
        raise ValueError(f"No .h5 files found under {data_dir}")

    all_deviation = []
    for file_path in tqdm(data_files, desc="Processing files"):
        masked_data, original_data = load_data(file_path, input_key, target_key, device)
        deviation = calculate_deviation(masked_data, original_data)
        # Flatten per file: the histogram ignores array layout, and 1-D chunks can
        # be concatenated even when files differ in image size.
        all_deviation.append(deviation.cpu().numpy().reshape(-1))

    # Combine the deviations of all files
    all_deviation = np.concatenate(all_deviation)
    # from_numpy wraps the array without the extra copy torch.tensor would make.
    visualize_deviation_histogram(torch.from_numpy(all_deviation), title="Overall Deviation Histogram")

어떻게 시각화해야 데이터의 편향성을 쉽게 찾아낼 수 있을까?
관련 모듈이 존재할까?

In [None]:
# Run the deviation analysis with the configured directory and keys (default device: 'cuda')
analyze_bias(data_dir, input_key, target_key)

## 데이터 형태 확인

# Dataset directory and HDF5 dataset keys
data_dir = '/home/Data/train/image'
input_key = 'image_input'
target_key = 'image_label'

def load_data(file_path, input_key, target_key, device='cuda'):
    """Load the ``input_key`` and ``target_key`` datasets of an HDF5 file as tensors.

    Both datasets are read fully into NumPy arrays while the file is open and
    then converted to tensors on ``device``.

    Returns:
        Tuple ``(input tensor, target tensor)``.
    """
    with h5py.File(file_path, 'r') as handle:
        input_array = np.array(handle[input_key])
        target_array = np.array(handle[target_key])

    return (
        torch.tensor(input_array, device=device),
        torch.tensor(target_array, device=device),
    )

### 이미지 확인

In [56]:
import h5py
from pathlib import Path

def check_hdf5_files_structure(data_dir):
    """Print the groups and datasets of every ``.h5`` file directly inside ``data_dir``.

    For each dataset the name, shape and dtype are printed; for each group only
    its name.
    """
    def describe(name, obj):
        # Callback for visititems: report one object of the file.
        if isinstance(obj, h5py.Group):
            print(f"Group: {name}")
        elif isinstance(obj, h5py.Dataset):
            print(f"Dataset: {name}")
            print(f" - Shape: {obj.shape}")
            print(f" - Dtype: {obj.dtype}")

    # Only .h5 files at the top level of the directory are inspected
    h5_paths = list(Path(data_dir).glob("*.h5"))
    for h5_path in h5_paths:
        print(f"Checking file: {h5_path}")
        with h5py.File(h5_path, 'r') as h5_file:
            h5_file.visititems(describe)
        print("\n")

# Data directory setup
data_dir = '/home/Data/train/image'

# Inspect the structure of the files in the directory
check_hdf5_files_structure(data_dir)

Checking file: /home/Data/train/image/brain_acc4_57.h5
Dataset: image_grappa
 - Shape: (16, 384, 384)
 - Dtype: float32
Dataset: image_input
 - Shape: (16, 384, 384)
 - Dtype: float32
Dataset: image_label
 - Shape: (16, 384, 384)
 - Dtype: float32


Checking file: /home/Data/train/image/brain_acc8_1.h5
Dataset: image_grappa
 - Shape: (16, 384, 384)
 - Dtype: float32
Dataset: image_input
 - Shape: (16, 384, 384)
 - Dtype: float32
Dataset: image_label
 - Shape: (16, 384, 384)
 - Dtype: float32


Checking file: /home/Data/train/image/brain_acc8_48.h5
Dataset: image_grappa
 - Shape: (14, 384, 384)
 - Dtype: float32
Dataset: image_input
 - Shape: (14, 384, 384)
 - Dtype: float32
Dataset: image_label
 - Shape: (14, 384, 384)
 - Dtype: float32


Checking file: /home/Data/train/image/brain_acc4_89.h5
Dataset: image_grappa
 - Shape: (16, 384, 384)
 - Dtype: float32
Dataset: image_input
 - Shape: (16, 384, 384)
 - Dtype: float32
Dataset: image_label
 - Shape: (16, 384, 384)
 - Dtype: float32


C

## 이미지 픽셀값 범위 확인

In [None]:
import h5py
from pathlib import Path

def check_hdf5_files_structure(data_dir):
    """Print structure and value range of every ``.h5`` file directly inside ``data_dir``.

    For each dataset the name, shape, dtype and the minimum/maximum of its
    values are printed; for each group only its name. Empty datasets are
    reported as such instead of raising from ``min()``/``max()``.

    Args:
        data_dir: Directory whose top-level ``*.h5`` files are inspected.
    """
    data_dir = Path(data_dir)
    hdf5_files = list(data_dir.glob("*.h5"))  # select only .h5 files

    def print_attrs(name, obj):
        # Callback for visititems: report one object of the file.
        if isinstance(obj, h5py.Dataset):
            print(f"Dataset: {name}")
            print(f" - Shape: {obj.shape}")
            print(f" - Dtype: {obj.dtype}")

            # min()/max() of a zero-size array raise ValueError, so skip
            # datasets without elements before loading them.
            if not obj.size:
                print(" - Pixel Value Range: [empty dataset]")
                return

            data = obj[()]  # load the whole dataset into memory
            min_val = data.min()
            max_val = data.max()
            print(f" - Pixel Value Range: [{min_val}, {max_val}]")
        elif isinstance(obj, h5py.Group):
            print(f"Group: {name}")

    for hdf5_file in hdf5_files:
        print(f"Checking file: {hdf5_file}")
        with h5py.File(hdf5_file, 'r') as f:
            f.visititems(print_attrs)
        print("\n")

# Data directory setup
# Training data
data_dir = '/home/Data/train/image'
# Leaderboard data (this second assignment overrides the training path above)
data_dir = '/home/Data/leaderboard/acc9/image'

# Inspect the structure of the files in the directory
check_hdf5_files_structure(data_dir)


### k-space 확인

In [None]:
# 데이터 디렉토리 설정
data_dir = '/home/Data/train/kspace'

# 디렉토리 구조 확인
check_hdf5_files_structure(data_dir)

### 함께 확인

In [1]:
import h5py
from pathlib import Path

def check_hdf5_files_structure(file_path):
    """Print the groups and datasets (name, shape, dtype) of one HDF5 file."""
    def describe(name, obj):
        # Callback for visititems: report one object of the file.
        if isinstance(obj, h5py.Group):
            print(f"Group: {name}")
        elif isinstance(obj, h5py.Dataset):
            print(f"Dataset: {name}")
            print(f" - Shape: {obj.shape}")
            print(f" - Dtype: {obj.dtype}")

    print(f"Checking file: {file_path}")
    with h5py.File(file_path, 'r') as h5_file:
        h5_file.visititems(describe)
    print("\n")

def find_common_files(dir1, dir2):
    """Return the set of ``.h5`` file names found directly in both directories."""
    names_in_first = set(entry.name for entry in Path(dir1).glob("*.h5"))
    names_in_second = set(entry.name for entry in Path(dir2).glob("*.h5"))
    return names_in_first & names_in_second

# Directories to compare
data_dir_kspace = '/home/Data/train/kspace'
data_dir_image = '/home/Data/train/image'

# File names present in both directories
common_files = find_common_files(data_dir_kspace, data_dir_image)
print(f"Common files: {common_files}\n")

# Print the structure of each shared file, k-space version first
for file_name in common_files:
    print(f"Checking structures for file: {file_name}")

    kspace_file_path, image_file_path = (
        Path(root) / file_name for root in (data_dir_kspace, data_dir_image)
    )

    for section_title, section_path in (
        ("Kspace file structure:", kspace_file_path),
        ("Image file structure:", image_file_path),
    ):
        print(section_title)
        check_hdf5_files_structure(section_path)


Common files: {'brain_acc5_51.h5', 'brain_acc4_22.h5', 'brain_acc4_65.h5', 'brain_acc8_4.h5', 'brain_acc8_39.h5', 'brain_acc4_14.h5', 'brain_acc4_107.h5', 'brain_acc8_79.h5', 'brain_acc4_74.h5', 'brain_acc8_54.h5', 'brain_acc8_42.h5', 'brain_acc5_109.h5', 'brain_acc5_41.h5', 'brain_acc4_68.h5', 'brain_acc8_113.h5', 'brain_acc4_99.h5', 'brain_acc8_86.h5', 'brain_acc8_83.h5', 'brain_acc5_78.h5', 'brain_acc4_46.h5', 'brain_acc8_76.h5', 'brain_acc8_36.h5', 'brain_acc8_94.h5', 'brain_acc5_28.h5', 'brain_acc5_24.h5', 'brain_acc8_110.h5', 'brain_acc4_2.h5', 'brain_acc5_103.h5', 'brain_acc4_108.h5', 'brain_acc8_105.h5', 'brain_acc4_24.h5', 'brain_acc5_68.h5', 'brain_acc5_79.h5', 'brain_acc5_67.h5', 'brain_acc8_1.h5', 'brain_acc4_9.h5', 'brain_acc8_64.h5', 'brain_acc4_18.h5', 'brain_acc5_37.h5', 'brain_acc5_57.h5', 'brain_acc4_98.h5', 'brain_acc8_14.h5', 'brain_acc8_120.h5', 'brain_acc8_2.h5', 'brain_acc5_23.h5', 'brain_acc4_101.h5', 'brain_acc8_9.h5', 'brain_acc5_43.h5', 'brain_acc8_6.h5', 'br

In [None]:
import h5py
from pathlib import Path

def check_hdf5_files_structure(file_path):
    """Walk one HDF5 file and print each group name and each dataset's name, shape and dtype."""
    print(f"Checking file: {file_path}")
    with h5py.File(file_path, 'r') as h5_file:
        def report(name, obj):
            # Groups only get their name printed.
            if isinstance(obj, h5py.Group):
                print(f"Group: {name}")
                return
            # Anything that is neither a group nor a dataset is skipped.
            if not isinstance(obj, h5py.Dataset):
                return
            print(f"Dataset: {name}")
            print(f" - Shape: {obj.shape}")
            print(f" - Dtype: {obj.dtype}")

        h5_file.visititems(report)
    print("\n")

def find_common_files(dir1, dir2):
    """Return the names of ``.h5`` files that exist directly in both ``dir1`` and ``dir2``."""
    first_names, second_names = (
        {h5_path.name for h5_path in Path(directory).glob("*.h5")}
        for directory in (dir1, dir2)
    )
    return first_names.intersection(second_names)

def print_mask_values(file_path):
    """Print the contents of the ``mask`` dataset of an HDF5 file, or a notice if it has none."""
    with h5py.File(file_path, 'r') as h5_file:
        if 'mask' not in h5_file:
            print(f"No mask dataset found in {file_path}")
            return

        mask_values = h5_file['mask'][:]
        print(f"mask values from {file_path}:")
        print(mask_values)

# Directories to compare
data_dir_kspace = '/home/Data/leaderboard/acc9/kspace'
data_dir_image = '/home/Data/leaderboard/acc9/image'
# data_dir_kspace = '/root/result/test_Varnet/reconstructions_leaderboard/private'
# data_dir_image = '/root/result/test_Varnet/reconstructions_leaderboard/private'

# File names present in both directories
common_files = find_common_files(data_dir_kspace, data_dir_image)
print(f"Common files: {common_files}\n")

# For each shared file: print both structures, then both masks
for file_name in common_files:
    print(f"Checking structures for file: {file_name}")

    kspace_file_path = Path(data_dir_kspace, file_name)
    image_file_path = Path(data_dir_image, file_name)

    # Structure of the k-space file, then of the image file
    print("Kspace file structure:")
    check_hdf5_files_structure(kspace_file_path)
    print("Image file structure:")
    check_hdf5_files_structure(image_file_path)

    # Mask values of the k-space file, then of the image file
    print("Kspace file mask values:")
    print_mask_values(kspace_file_path)
    print("Image file mask values:")
    print_mask_values(image_file_path)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from fastmri.data.subsample import RandomMaskFunc

# Initialise RandomMaskFunc
mask_func = RandomMaskFunc(center_fractions=[0.04], accelerations=[8])

# Define the shape of the k-space data
shape = (1, 256, 256)  # 3-D shape, intended as (num_slices, height, width)

# Generate the mask
# NOTE(review): the return value of mask_func (a single mask vs. a tuple) may
# depend on the installed fastmri version — confirm before relying on mask[0].
mask = mask_func(shape)

# Print the mask values
print("Generated mask values:")
print(mask)

# Visualise the mask
plt.figure(figsize=(10, 2))
plt.imshow(mask[0], aspect='auto', cmap='gray')  # show the first element of the mask
plt.xlabel('k-space columns')
plt.ylabel('k-space rows')
plt.title('k-space Mask')
plt.colorbar()
plt.show()


## SSIM 값 확인

In [None]:
import h5py
import torch
import torch.nn as nn
import numpy as np
from pathlib import Path

In [None]:
class SSIM(nn.Module):
    """Layer computing a per-pixel SSIM score map between a pair of images.

    Local means and (co)variances are estimated with 3x3 average pooling
    (stride 1) after reflection-padding the inputs by one pixel, so the
    output has the same spatial size as the inputs.
    """
    def __init__(self):
        super().__init__()
        self.mu_x_pool   = nn.AvgPool2d(3, 1)
        self.mu_y_pool   = nn.AvgPool2d(3, 1)
        self.sig_x_pool  = nn.AvgPool2d(3, 1)
        self.sig_y_pool  = nn.AvgPool2d(3, 1)
        self.sig_xy_pool = nn.AvgPool2d(3, 1)

        # Pad the top/bottom/left/right of the input tensor by reflecting its border.
        self.refl = nn.ReflectionPad2d(1)

        # Stabilising constants of the SSIM formula.
        # NOTE(review): the common choice is C1 = 0.01 ** 2; confirm 0.001 is intended.
        self.C1 = 0.001 ** 2
        self.C2 = 0.03 ** 2

    def forward(self, x, y):
        """Return the SSIM score map of ``x`` and ``y``.

        Args:
            x: Image tensor accepted by ``nn.ReflectionPad2d`` (e.g. ``(N, C, H, W)``).
            y: Image tensor of the same shape as ``x``.

        Returns:
            Tensor of the same shape as the inputs with values clamped to
            ``[0, 1]``; identical inputs give 1 everywhere.
        """
        # shape : (xh, xw) -> (xh + 2, xw + 2)
        x = self.refl(x)
        # shape : (yh, yw) -> (yh + 2, yw + 2)
        y = self.refl(y)

        mu_x = self.mu_x_pool(x)
        mu_y = self.mu_y_pool(y)

        sigma_x  = self.sig_x_pool(x ** 2) - mu_x ** 2
        sigma_y  = self.sig_y_pool(y ** 2) - mu_y ** 2
        sigma_xy = self.sig_xy_pool(x * y) - mu_x * mu_y

        SSIM_n = (2 * mu_x * mu_y + self.C1) * (2 * sigma_xy + self.C2)
        SSIM_d = (mu_x ** 2 + mu_y ** 2 + self.C1) * (sigma_x + sigma_y + self.C2)

        # SSIM score. The ratio itself is the score; the former extra "/ 2"
        # (left over from the loss formula) capped it at 0.5.
        return torch.clamp(SSIM_n / SSIM_d, 0, 1)

        # Loss function variant:
        # return torch.clamp((1 - SSIM_n / SSIM_d) / 2, 0, 1)


In [None]:
def read_hdf5_datasets(file_path, dataset_names):
    """Read the named datasets of an HDF5 file into memory.

    Returns:
        Dict mapping each requested name to its loaded contents, or to ``None``
        (after printing a notice) when the file has no such dataset.
    """
    loaded = {}
    with h5py.File(file_path, 'r') as h5_file:
        for key in dataset_names:
            if key not in h5_file:
                print(f"No {key} dataset found in {file_path}")
                loaded[key] = None
                continue
            loaded[key] = h5_file[key][:]
    return loaded

def calculate_ssim_for_images(label, input_data, grappa, ssim_module):
    """Print the SSIM of ``label`` against ``input_data`` and against ``grappa``.

    A comparison is skipped when either of its operands is ``None``.
    """
    # Without a label there is nothing to compare against.
    if label is None:
        return

    if input_data is not None:
        ssim_input = ssim_module(label, input_data)
        print(f"SSIM between label and input:\n{ssim_input}")

    if grappa is not None:
        ssim_grappa = ssim_module(label, grappa)
        print(f"SSIM between label and grappa:\n{ssim_grappa}")

# Every .h5 file of the training image directory is evaluated
data_dir_image = '/home/Data/train/image'
common_files = set(h5_path.name for h5_path in Path(data_dir_image).glob("*.h5"))
dataset_names = ['image_label', 'image_input', 'image_grappa']

ssim_module = SSIM()

for file_name in common_files:
    print(f"Checking structures for file: {file_name}")

    image_file_path = Path(data_dir_image) / file_name
    datasets = read_hdf5_datasets(image_file_path, dataset_names)

    # Convert each available array to float32 and insert a new axis at position 1;
    # missing datasets stay None. The order follows dataset_names.
    label, input_data, grappa = (
        None if datasets[key] is None
        else torch.tensor(datasets[key], dtype=torch.float32).unsqueeze(1)
        for key in dataset_names
    )

    calculate_ssim_for_images(label, input_data, grappa, ssim_module)


In [None]:
def check_data_values(label, input_data, grappa):
    """Print each of the three arrays that is not ``None``, preceded by a heading."""
    print("Checking data values...")
    sections = (
        ("Label data:", label),
        ("Input data:", input_data),
        ("Grappa data:", grappa),
    )
    for heading, values in sections:
        if values is None:
            continue
        print(heading)
        print(values)


In [None]:
def convert_to_tensor(data):
    """Convert ``data`` to a float32 tensor with a new axis inserted at position 1.

    Returns ``None`` when ``data`` is ``None``.
    """
    if data is None:
        return None
    as_float = torch.tensor(data, dtype=torch.float32)
    return as_float.unsqueeze(1)


In [None]:
import h5py
import torch
import torch.nn as nn
import numpy as np
from pathlib import Path

class SSIM(nn.Module):
    """Layer computing a per-pixel SSIM score map between a pair of images.

    Local means and (co)variances are estimated with 3x3 average pooling
    (stride 1) after reflection-padding the inputs by one pixel, so the
    output has the same spatial size as the inputs.
    """
    def __init__(self):
        super().__init__()
        self.mu_x_pool   = nn.AvgPool2d(3, 1)
        self.mu_y_pool   = nn.AvgPool2d(3, 1)
        self.sig_x_pool  = nn.AvgPool2d(3, 1)
        self.sig_y_pool  = nn.AvgPool2d(3, 1)
        self.sig_xy_pool = nn.AvgPool2d(3, 1)

        # Reflection padding keeps the output the same size as the input.
        self.refl = nn.ReflectionPad2d(1)

        # Adjusted constants for numerical stability
        # NOTE(review): these are not squared, unlike the usual 0.01 ** 2 / 0.03 ** 2 — confirm.
        self.C1 = 0.01
        self.C2 = 0.03

    def forward(self, x, y):
        """Return the SSIM score map of ``x`` and ``y``.

        Args:
            x: Image tensor accepted by ``nn.ReflectionPad2d`` (e.g. ``(N, C, H, W)``).
            y: Image tensor of the same shape as ``x``.

        Returns:
            Tensor of the same shape as the inputs with values clamped to
            ``[0, 1]``; identical inputs give 1 everywhere.
        """
        x = self.refl(x)
        y = self.refl(y)

        mu_x = self.mu_x_pool(x)
        mu_y = self.mu_y_pool(y)

        sigma_x  = self.sig_x_pool(x ** 2) - mu_x ** 2
        sigma_y  = self.sig_y_pool(y ** 2) - mu_y ** 2
        sigma_xy = self.sig_xy_pool(x * y) - mu_x * mu_y

        SSIM_n = (2 * mu_x * mu_y + self.C1) * (2 * sigma_xy + self.C2)
        SSIM_d = (mu_x ** 2 + mu_y ** 2 + self.C1) * (sigma_x + sigma_y + self.C2)

        # The ratio itself is the SSIM score; the former extra "/ 2" capped it at 0.5.
        return torch.clamp(SSIM_n / SSIM_d, 0, 1)

def read_hdf5_datasets(file_path, dataset_names):
    """Load the requested datasets of one HDF5 file.

    Every name in ``dataset_names`` becomes a key of the returned dict. The
    value is the loaded dataset, or ``None`` when the file does not contain it
    (a notice is printed in that case).
    """
    with h5py.File(file_path, 'r') as source:
        result = {}
        for dataset_name in dataset_names:
            found = dataset_name in source
            if not found:
                print(f"No {dataset_name} dataset found in {file_path}")
            result[dataset_name] = source[dataset_name][:] if found else None
    return result

def convert_to_tensor(data):
    """Min-max normalise ``data`` to [0, 1] and return it as a float32 tensor.

    A new axis is inserted at position 1 of the result.

    Args:
        data: NumPy array, or ``None``.

    Returns:
        The normalised tensor, or ``None`` when ``data`` is ``None``.
        Constant-valued input (max == min) yields all zeros instead of NaN.
    """
    if data is None:
        return None

    data = data.astype(np.float32)
    # Normalize data to range [0, 1]
    # NOTE(review): each array is scaled by its own min/max, so label, input and
    # grappa end up on different scales — confirm this is intended for SSIM.
    data_min, data_max = np.min(data), np.max(data)
    value_range = data_max - data_min
    if value_range == 0:
        # Avoid 0 / 0 for constant images; NaN inputs still propagate as before.
        data = np.zeros_like(data)
    else:
        data = (data - data_min) / value_range
    return torch.tensor(data, dtype=torch.float32).unsqueeze(1)

def check_data_values(label, input_data, grappa):
    """Dump the label, input and grappa data to stdout, skipping any that is ``None``."""
    def dump(heading, values):
        # Print one section unless its data is missing.
        if values is not None:
            print(heading)
            print(values)

    print("Checking data values...")
    dump("Label data:", label)
    dump("Input data:", input_data)
    dump("Grappa data:", grappa)

def calculate_ssim_for_images(label, input_data, grappa, ssim_module):
    """Print SSIM(label, input_data) and SSIM(label, grappa) where both operands exist.

    ``ssim_module`` is called as ``ssim_module(label, other)`` for each comparison.
    """
    has_label = label is not None

    if has_label and input_data is not None:
        print(f"SSIM between label and input:\n{ssim_module(label, input_data)}")

    if has_label and grappa is not None:
        print(f"SSIM between label and grappa:\n{ssim_module(label, grappa)}")

# Every .h5 file of the leaderboard image directory is evaluated
data_dir_image = '/home/Data/leaderboard/acc9/image'
common_files = set(h5_path.name for h5_path in Path(data_dir_image).glob("*.h5"))
dataset_names = ['image_label', 'image_input', 'image_grappa']

ssim_module = SSIM()

for file_name in common_files:
    print(f"Checking structures for file: {file_name}")

    image_file_path = Path(data_dir_image) / file_name
    datasets = read_hdf5_datasets(image_file_path, dataset_names)

    # Convert in the order of dataset_names: label, input, grappa
    label, input_data, grappa = [
        convert_to_tensor(datasets[key]) for key in dataset_names
    ]

    check_data_values(label, input_data, grappa)
    calculate_ssim_for_images(label, input_data, grappa, ssim_module)
