In [None]:
import os
import json
import openslide
from PIL import Image
import cv2
import numpy as np
from tqdm import tqdm
from glob import glob


In [None]:
# Discover the registration-parameter files (one JSON per glob match).
json_list = glob('../../data/IHC_HE_Pair_Data_GA_hospital/registration_params/*.json')
print(f"총 {len(json_list)}개의 슬라이드 쌍 발견")

# Output folders: three sibling sub-directories under a common patch root.
#   pdl1_mpp2 -> PD-L1 patches at mpp 2.0
#   he_mpp2   -> H&E patches at mpp 2.0 (coordinates transformed)
#   pdl1_mpp1 -> PD-L1 patches at mpp 1.0 (centre region)
output_base = '../../data/IHC_HE_Pair_Data_GA_hospital/patches'
folder_pdl1_mpp2, folder_he_mpp2, folder_pdl1_mpp1 = (
    os.path.join(output_base, subdir)
    for subdir in ('pdl1_mpp2', 'he_mpp2', 'pdl1_mpp1')
)

# Create each folder; already-existing folders are left untouched.
for _out_dir in (folder_pdl1_mpp2, folder_he_mpp2, folder_pdl1_mpp1):
    os.makedirs(_out_dir, exist_ok=True)

print(f"출력 폴더:")
print(f"  - PD-L1 mpp 2.0: {folder_pdl1_mpp2}")
print(f"  - HE mpp 2.0:    {folder_he_mpp2}")
print(f"  - PD-L1 mpp 1.0: {folder_pdl1_mpp1}")

In [None]:
# Index the PD-L1 tumor-area annotation files.
# Key:   4th '-'-separated field of the 'filename' entry stored inside each annotation JSON.
# Value: path of that annotation JSON (a later file with the same key replaces an earlier one).
pdl1_tumor_area_json = glob('../../data/IHC_HE_Pair_Data_GA_hospital/PD-L1_tumor_area/ann*.json')
pdl1_tumor_area_dict = {}
for annotation_path in pdl1_tumor_area_json:
    with open(annotation_path, 'r') as f:
        tumor_area_dict = json.load(f)
    slide_key = str(tumor_area_dict['filename'].split('-')[3])
    pdl1_tumor_area_dict[slide_key] = annotation_path


In [None]:
# Thumbnail and tissue-mask helper
def get_thumbnail_and_mask(slide_path, downsample=30):
    """Open a slide and derive a thumbnail plus a binary tissue mask from it.

    Args:
        slide_path: Path passed to ``openslide.OpenSlide``.
        downsample: Integer divisor applied to the level-0 slide dimensions to
            obtain the thumbnail size requested from OpenSlide.

    Returns:
        Tuple ``(slide, thumb_np, mask)``:
            slide: the OpenSlide handle, still open (the caller must close it).
            thumb_np: the thumbnail converted to a numpy array.
            mask: result of an inverted Otsu threshold on the grayscale
                thumbnail followed by a 5x5 morphological close and open.
    """
    slide = openslide.OpenSlide(slide_path)
    full_w, full_h = slide.dimensions
    thumb_np = np.array(slide.get_thumbnail((full_w // downsample, full_h // downsample)))

    # Inverted Otsu threshold on the grayscale thumbnail: pixels at or below the
    # automatically chosen threshold become 255, brighter pixels become 0.
    gray = cv2.cvtColor(thumb_np, cv2.COLOR_RGB2GRAY)
    mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]

    # Morphological clean-up: close first, then open, both with a 5x5 kernel.
    kernel = np.ones((5, 5), np.uint8)
    for morph_op in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        mask = cv2.morphologyEx(mask, morph_op, kernel)

    return slide, thumb_np, mask


def apply_transformation_to_mask(mask, thumb_shape, params):
    """Warp the H&E thumbnail mask into the PD-L1 thumbnail frame.

    The mask is rotated about its centre, rescaled, and then pasted onto a zero
    canvas of the PD-L1 thumbnail size, centred and shifted by the translation.
    Parts that fall outside the canvas are cropped.

    Args:
        mask: 2-D H&E mask array.
        thumb_shape: Accepted for interface compatibility; not used.
        params: Registration dict. Reads
            ``params['transformation']['thumbnail']`` (``angle_degrees``,
            ``scale``, ``translation_x``, ``translation_y``) and
            ``params['dimensions']['pdl1_thumbnail']`` (``height``, ``width``).

    Returns:
        uint8 array of shape (pdl1 thumbnail height, pdl1 thumbnail width).
    """
    from scipy.ndimage import rotate as scipy_rotate

    thumb_tf = params['transformation']['thumbnail']
    angle = thumb_tf['angle_degrees']
    scale = thumb_tf['scale']
    # Translations are truncated to whole thumbnail pixels.
    shift_x = int(thumb_tf['translation_x'])
    shift_y = int(thumb_tf['translation_y'])

    # Rotate in place (same output shape); order=0 keeps the mask values un-interpolated.
    rotated = scipy_rotate(mask, angle, reshape=False, order=0)

    # Rescale with nearest-neighbour sampling.
    scaled_h = int(mask.shape[0] * scale)
    scaled_w = int(mask.shape[1] * scale)
    scaled = cv2.resize(rotated.astype(np.uint8), (scaled_w, scaled_h), interpolation=cv2.INTER_NEAREST)

    # Blank canvas with the PD-L1 thumbnail size.
    pdl1_dims = params['dimensions']['pdl1_thumbnail']
    canvas_h = pdl1_dims['height']
    canvas_w = pdl1_dims['width']
    canvas = np.zeros((canvas_h, canvas_w), dtype=np.uint8)

    src_h, src_w = scaled.shape[0], scaled.shape[1]

    # Top-left corner of the scaled mask on the canvas: centred, then translated.
    top = (canvas_h - src_h) // 2 + shift_y
    left = (canvas_w - src_w) // 2 + shift_x

    # Part of the scaled mask that actually lands on the canvas.
    src_y0, src_y1 = max(0, -top), min(src_h, canvas_h - top)
    src_x0, src_x1 = max(0, -left), min(src_w, canvas_w - left)

    # Nothing overlaps the canvas: return it empty.
    if src_y1 <= src_y0 or src_x1 <= src_x0:
        return canvas

    dst_y0 = max(0, top)
    dst_x0 = max(0, left)
    dst_y1 = dst_y0 + (src_y1 - src_y0)
    dst_x1 = dst_x0 + (src_x1 - src_x0)
    canvas[dst_y0:dst_y1, dst_x0:dst_x1] = scaled[src_y0:src_y1, src_x0:src_x1]

    return canvas


def transform_point_pdl1_to_he(pdl1_x, pdl1_y, params):
    """Map a PD-L1 full-resolution point back into H&E full-resolution coordinates.

    Steps, in order: subtract the PD-L1 image centre and the translation, divide
    by the scale, rotate by ``angle_degrees`` with the matrix
    ``[[cos, -sin], [sin, cos]]``, then add the H&E image centre.

    Args:
        pdl1_x, pdl1_y: Point in PD-L1 full-resolution pixel coordinates.
        params: Registration dict. Reads
            ``params['transformation']['fullres']`` (``angle_degrees``,
            ``scale``, ``translation_x``, ``translation_y``) and
            ``params['dimensions']`` (``he_full`` / ``pdl1_full`` with
            ``width`` and ``height``).

    Returns:
        ``(x, y)`` in H&E full-resolution pixel coordinates (not rounded).
    """
    fullres = params['transformation']['fullres']
    angle = fullres['angle_degrees']
    scale = fullres['scale']
    shift_x = fullres['translation_x']
    shift_y = fullres['translation_y']

    dims = params['dimensions']
    he_cx = dims['he_full']['width'] / 2
    he_cy = dims['he_full']['height'] / 2
    pdl1_cx = dims['pdl1_full']['width'] / 2
    pdl1_cy = dims['pdl1_full']['height'] / 2

    # Undo the translation (relative to the PD-L1 centre), then undo the scale.
    dx = (pdl1_x - pdl1_cx - shift_x) / scale
    dy = (pdl1_y - pdl1_cy - shift_y) / scale

    # Undo the rotation.
    theta = np.radians(angle)
    cos_t, sin_t = np.cos(theta), np.sin(theta)

    # Re-attach the H&E centre.
    he_x = (dx * cos_t - dy * sin_t) + he_cx
    he_y = (dx * sin_t + dy * cos_t) + he_cy

    return he_x, he_y

In [None]:
def extract_patch(slide, center_x, center_y, patch_size_wsi, patch_resize=1024, level=0):
    """
    Extract a square patch from a WSI, centred on a given point.

    Args:
        slide: OpenSlide object (anything exposing ``dimensions``,
            ``level_downsamples`` and ``read_region``).
        center_x, center_y: Patch centre in level-0 pixel coordinates.
        patch_size_wsi: Side length of the region to extract, in level-0 pixels.
        patch_resize: Side length of the returned patch, in pixels.
        level: Pyramid level to read from. The read size is converted to that
            level's pixel grid so the field of view stays ``patch_size_wsi``
            level-0 pixels regardless of the level.

    Returns:
        patch: RGB numpy array of shape (patch_resize, patch_resize, 3), or
            None when the requested square is not fully inside the slide.
    """
    patch_size_wsi = int(patch_size_wsi)

    # Top-left corner in level-0 coordinates.
    x = int(center_x - patch_size_wsi / 2)
    y = int(center_y - patch_size_wsi / 2)

    # Bounds check: reject patches that stick out of the slide.
    slide_w, slide_h = slide.dimensions
    if x < 0 or y < 0 or x + patch_size_wsi > slide_w or y + patch_size_wsi > slide_h:
        return None

    # read_region takes the location in level-0 coordinates but the size in
    # pixels of the requested level, so convert the size for levels above 0.
    if level == 0:
        read_size = patch_size_wsi
    else:
        read_size = max(1, int(round(patch_size_wsi / slide.level_downsamples[level])))

    patch = slide.read_region((x, y), level, (read_size, read_size))
    patch = np.array(patch.convert('RGB'))

    # Resize to the output size. INTER_AREA averages source pixels and avoids
    # the aliasing INTER_LINEAR produces when shrinking by a large factor;
    # INTER_LINEAR is kept for enlargement.
    if patch_resize != read_size:
        interpolation = cv2.INTER_AREA if patch_resize < read_size else cv2.INTER_LINEAR
        patch = cv2.resize(patch, (patch_resize, patch_resize), interpolation=interpolation)

    return patch


def get_valid_patch_positions(common_mask, pdl1_slide, pdl1_mpp, target_mpp=1.0, 
                               patch_size=1024, threshold=0.3, downsample=30):
    """
    Enumerate grid centres whose footprint is sufficiently covered by the mask.

    A regular, non-overlapping grid is laid over the PD-L1 slide. Its cell size
    is a ``patch_size`` patch at ``target_mpp`` expressed in level-0 pixels.
    Each cell is projected onto ``common_mask`` and kept when the fraction of
    non-zero mask pixels reaches ``threshold``.

    Args:
        common_mask: 2-D mask array (pixels > 0 count as valid).
        pdl1_slide: Object exposing ``dimensions`` as (width, height).
        pdl1_mpp: Microns per pixel of the PD-L1 slide at level 0.
        target_mpp: Resolution that defines the grid cell size (default 1.0).
        patch_size: Patch side length at ``target_mpp`` (default 1024).
        threshold: Minimum valid-pixel fraction for a cell (default 0.3).
        downsample: Accepted for interface compatibility; not used.

    Returns:
        valid_positions: list of (center_x, center_y) in level-0 coordinates,
            ordered row by row.
    """
    # Grid cell size / stride in level-0 pixels.
    stride_wsi = int(patch_size * (target_mpp / pdl1_mpp))

    mask_h, mask_w = common_mask.shape[0], common_mask.shape[1]
    wsi_width, wsi_height = pdl1_slide.dimensions

    # Level-0 -> mask scale factors, per axis.
    scale_x = mask_w / wsi_width
    scale_y = mask_h / wsi_height

    # Half extent of one grid cell on the mask.
    half_x = int(stride_wsi * scale_x) // 2
    half_y = int(stride_wsi * scale_y) // 2

    first_center = stride_wsi // 2
    valid_positions = []

    for center_y in range(first_center, wsi_height - first_center, stride_wsi):
        thumb_cy = int(center_y * scale_y)
        row_lo = max(0, thumb_cy - half_y)
        row_hi = min(mask_h, thumb_cy + half_y)
        if row_hi <= row_lo:
            # Empty row span: no cell in this grid row can be evaluated.
            continue

        for center_x in range(first_center, wsi_width - first_center, stride_wsi):
            thumb_cx = int(center_x * scale_x)
            col_lo = max(0, thumb_cx - half_x)
            col_hi = min(mask_w, thumb_cx + half_x)
            if col_hi <= col_lo:
                continue

            # Fraction of valid mask pixels under this cell.
            window = common_mask[row_lo:row_hi, col_lo:col_hi]
            if np.sum(window > 0) / window.size >= threshold:
                valid_positions.append((center_x, center_y))

    return valid_positions

In [None]:
# Main patch-extraction loop
#
# For every registration JSON: open the H&E / PD-L1 slide pair, build a common
# tissue mask in PD-L1 thumbnail space, enumerate valid grid centres, and save
# three aligned patches per centre (PD-L1 mpp 2.0, H&E mpp 2.0, PD-L1 mpp 1.0).
PATCH_SIZE = 1024
VALID_THRESHOLD = 0.3
DOWNSAMPLE = 30

total_patches = 0

for json_idx, json_path in enumerate(tqdm(json_list, desc="Processing slides")):
    # Load the registration parameters for this slide pair
    with open(json_path, 'r') as f:
        json_data = json.load(f)

    slide_name = os.path.basename(json_path).replace('.json', '')
    print(f"\n[{json_idx+1}/{len(json_list)}] Processing: {slide_name}")

    # Every OpenSlide handle opened for this pair is tracked here so the
    # `finally` clause closes all of them on every exit path (normal end,
    # `continue`, or an exception). This includes the extra handles that
    # get_thumbnail_and_mask opens and returns.
    opened_slides = []
    try:
        # Open the slides
        try:
            he_slide = openslide.OpenSlide(json_data['files']['he_slide'])
            opened_slides.append(he_slide)
            pdl1_slide = openslide.OpenSlide(json_data['files']['pdl1_slide'])
            opened_slides.append(pdl1_slide)
        except Exception as e:
            print(f"  Error opening slides: {e}")
            continue

        # MPP information (falls back to 0.25 when the JSON has no entry)
        pdl1_mpp = json_data.get('mpp', {}).get('pdl1', 0.25)
        he_mpp = json_data.get('mpp', {}).get('he', 0.25)

        # Patch sizes in level-0 WSI pixels
        # mpp 2.0 patches
        pdl1_patch_size_mpp2 = int(PATCH_SIZE * (2.0 / pdl1_mpp))
        he_patch_size_mpp2 = int(PATCH_SIZE * (2.0 / he_mpp))
        # mpp 1.0 patch (centre region)
        pdl1_patch_size_mpp1 = int(PATCH_SIZE * (1.0 / pdl1_mpp))

        print(f"  MPP: PD-L1={pdl1_mpp}, HE={he_mpp}")
        print(f"  Patch sizes (WSI): mpp2.0 PD-L1={pdl1_patch_size_mpp2}, HE={he_patch_size_mpp2}, mpp1.0={pdl1_patch_size_mpp1}")

        # Build the tissue masks; keep the returned handles so they get closed
        he_mask_slide, he_thumb, he_mask = get_thumbnail_and_mask(json_data['files']['he_slide'], DOWNSAMPLE)
        opened_slides.append(he_mask_slide)
        pdl1_mask_slide, pdl1_thumb, pdl1_mask = get_thumbnail_and_mask(json_data['files']['pdl1_slide'], DOWNSAMPLE)
        opened_slides.append(pdl1_mask_slide)

        # Warp the H&E mask into PD-L1 thumbnail space
        he_transformed_mask = apply_transformation_to_mask(he_mask, pdl1_thumb.shape, json_data)

        # Optional tumor-area restriction, looked up by the 4th '-'-separated
        # field of the slide name (text before the first '_').
        pdl1_tumor_area_dict_key=str(slide_name.split('-')[3].split('_')[0])
        if pdl1_tumor_area_dict_key in pdl1_tumor_area_dict:
            with open(pdl1_tumor_area_dict[pdl1_tumor_area_dict_key], 'r') as f:
                tumor_area_data = json.load(f)
            tumor_mask = np.zeros_like(he_transformed_mask)
            for region in tumor_area_data['objects']:
                # Polygon vertices are divided by DOWNSAMPLE to reach thumbnail
                # scale -- presumably they are PD-L1 level-0 coordinates; TODO confirm.
                pts = np.array(region['coordinate'], dtype=np.int32)
                scaled_points=[(int(x//DOWNSAMPLE ), int(y//DOWNSAMPLE )) for x,y in pts]
                cv2.fillPoly(tumor_mask, [np.array(scaled_points, dtype=np.int32)], 255)
            he_transformed_mask = np.logical_and(he_transformed_mask > 0, tumor_mask > 0).astype(np.uint8) * 255

        # Common mask (AND).
        # NOTE(review): requires pdl1_mask to have the shape of the warped H&E
        # mask; np.logical_and raises if the two shapes differ.
        common_mask = np.logical_and(he_transformed_mask > 0, pdl1_mask > 0).astype(np.uint8) * 255

        # Valid patch centres (grid defined at mpp 1.0)
        valid_positions = get_valid_patch_positions(
            common_mask, pdl1_slide, pdl1_mpp, 
            target_mpp=1.0, patch_size=PATCH_SIZE, threshold=VALID_THRESHOLD
        )

        print(f"  Valid positions: {len(valid_positions)}")

        # Extract the patches
        patch_count = 0
        for pos_idx, (cx, cy) in enumerate(tqdm(valid_positions, desc="  Extracting patches", leave=False)):
            # File name shared by the three outputs of this position
            patch_name = f"{slide_name}_{pos_idx:04d}_x{cx}_y{cy}.png"

            # 1. PD-L1 mpp 2.0 patch
            pdl1_patch_mpp2 = extract_patch(pdl1_slide, cx, cy, pdl1_patch_size_mpp2, PATCH_SIZE)
            if pdl1_patch_mpp2 is None:
                continue

            # 2. H&E mpp 2.0 patch (centre mapped through the registration)
            he_cx, he_cy = transform_point_pdl1_to_he(cx, cy, json_data)
            he_patch_mpp2 = extract_patch(he_slide, he_cx, he_cy, he_patch_size_mpp2, PATCH_SIZE)
            if he_patch_mpp2 is None:
                continue

            # 3. PD-L1 mpp 1.0 patch: same centre, i.e. the central region of
            #    the mpp 2.0 patch
            pdl1_patch_mpp1 = extract_patch(pdl1_slide, cx, cy, pdl1_patch_size_mpp1, PATCH_SIZE)
            if pdl1_patch_mpp1 is None:
                continue

            # Save only when all three patches are available, so the three
            # folders stay aligned file by file.
            Image.fromarray(pdl1_patch_mpp2).save(os.path.join(folder_pdl1_mpp2, patch_name))
            Image.fromarray(he_patch_mpp2).save(os.path.join(folder_he_mpp2, patch_name))
            Image.fromarray(pdl1_patch_mpp1).save(os.path.join(folder_pdl1_mpp1, patch_name))

            patch_count += 1

        print(f"  Saved {patch_count} patches")
        total_patches += patch_count
    finally:
        # Release every slide handle opened for this pair
        for opened_slide in opened_slides:
            opened_slide.close()

print(f"\n=== 완료 ===")
print(f"총 추출된 패치 수: {total_patches}")
print(f"저장 위치:")
print(f"  - PD-L1 mpp 2.0: {folder_pdl1_mpp2}")
print(f"  - HE mpp 2.0:    {folder_he_mpp2}")
print(f"  - PD-L1 mpp 1.0: {folder_pdl1_mpp1}")