In [None]:

import os
import warnings
from argparse import ArgumentParser
import matplotlib.patches as patches
import torch
from tqdm import tqdm
import cv2
from torch.utils import data
# 개별 json 라벨 파일을 이용해 학습 데이터 리스트 생성
from glob import glob
import xml.etree.ElementTree as ET
from xml.dom import minidom
import os
from nets import nn
from utils import util
from utils.dataset import Dataset
from torch.utils import data
import numpy as np
import matplotlib.pyplot as plt
import random
import openslide
import copy
import random
from time import time

import math
import numpy
import torch
import torchvision
from torch.nn.functional import cross_entropy
device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("device:",device)
params={'names':{
            0: "Neutrophil",
            1: "Epithelial",
            2: "Lymphocyte",
            3: "Plasma",
            4: "Eosinophil",
            5: "Connective tissue"
        }}



In [None]:
save_dir='../../model/HnE_cell_detection/yolov11/'
model = nn.yolo_v11_m(len(params['names'])).to(device)
checkpoint_path = os.path.join(save_dir, 'best_model_m.pt')
if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path, map_location=device,weights_only=False)
    model.load_state_dict(checkpoint['model_state_dict'])
    
def wh2xy(x):
    y = x.clone() if isinstance(x, torch.Tensor) else numpy.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y
   
def non_max_suppression(outputs, confidence_threshold=0.001, iou_threshold=0.65, class_thresholds=None):
    """
    빠른 클래스별 NMS - 성능 최적화 버전
    """
    max_wh = 7680
    max_det = 300
    max_nms = 30000

    bs = outputs.shape[0]
    nc = outputs.shape[1] - 4
    
    # 빠른 필터링을 위해 가장 낮은 threshold 사용
    min_conf = confidence_threshold
    if class_thresholds:
        min_conf = min(min(class_thresholds.values()), confidence_threshold)
    
    # 전체 confidence가 낮은 것들 먼저 제거
    xc = outputs[:, 4:4 + nc].amax(1) > min_conf
    
    output = [torch.zeros((0, 6), device=outputs.device)] * bs
    
    for xi, x in enumerate(outputs):  # image index, image inference
        x = x.transpose(0, -1)[xc[xi]]
        
        if not x.shape[0]:
            continue

        # 박스와 클래스 분리
        box, cls = x.split((4, nc), 1)
        box = wh2xy(box)
        
        # 각 검출의 최고 클래스와 confidence 찾기
        conf, j = cls.max(1, keepdim=True)
        x = torch.cat((box, conf, j.float()), 1)
        
        # 클래스별 threshold 적용 (간단한 방식)
        if class_thresholds:
            keep = torch.zeros(x.shape[0], dtype=torch.bool, device=x.device)
            for i, detection in enumerate(x):
                class_id = int(detection[5].item())
                threshold = class_thresholds.get(class_id, confidence_threshold)
                if detection[4].item() >= threshold:
                    keep[i] = True
            x = x[keep]
        else:
            x = x[x[:, 4] > confidence_threshold]
        
        if not x.shape[0]:
            continue
            
        # confidence로 정렬하고 상위 max_nms개만 유지
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]
        
        # 빠른 NMS - PyTorch 내장 함수 사용
        c = x[:, 5:6] * max_wh  # 클래스별 offset
        boxes = x[:, :4] + c
        scores = x[:, 4]
        
        # NMS 적용
        keep = torchvision.ops.nms(boxes, scores, iou_threshold)
        if keep.shape[0] > max_det:
            keep = keep[:max_det]
        
        output[xi] = x[keep]
    
    return output
    
def pred_patch(torch_patch, model, start_x, start_y, magnification):
    model.eval()
    
    # HnE 세포 분류를 위한 클래스별 개별 confidence threshold 설정
    class_thresholds = {
        0: 0.05,  # Neutrophil
        1: 0.05,  # Epithelial
        2: 0.05,  # Lymphocyte
        3: 0.05,  # Plasma
        4: 0.05,  # Eosinophil
        5: 0.05   # Connective tissue
    }
    
    # 각 클래스별 세포 리스트 (임시로 기존 변수명 유지)
    cells_list = []  # 모든 검출된 세포를 여기에 저장

    
    with torch.no_grad():
        with torch.amp.autocast('cuda'):
            pred = model(torch_patch)
        
        # 빠른 NMS 적용
        results = non_max_suppression(pred, confidence_threshold=0.05, 
                                    iou_threshold=0.3, class_thresholds=class_thresholds)
        
        if len(results[0]) > 0:
            # 벡터화된 처리로 속도 향상
            detections = results[0]
            xyxy = detections[:, :4]
            confs = detections[:, 4]
            cls_ids = detections[:, 5]
            
            # 중심점 계산 (벡터화)
            centers_x = (xyxy[:, 0] + xyxy[:, 2]) / 2
            centers_y = (xyxy[:, 1] + xyxy[:, 3]) / 2
            
            # 실제 좌표 계산
            actual_x = start_x + centers_x * magnification
            actual_y = start_y + centers_y * magnification
            
            # 모든 클래스의 세포를 cells_list 리스트에 저장 (벡터화)
            for i in range(len(detections)):
                cls_id = int(cls_ids[i].item())
                cell_data = {
                    'x': actual_x[i].item(), 
                    'y': actual_y[i].item(), 
                    'cls_id': cls_id,
                    'confidence': confs[i].item()
                }
                
                # 모든 세포를 cells_list에 저장 (XML 생성 함수에서 cls_id로 구분)
                cells_list.append(cell_data)
    
    return cells_list

In [None]:
import json

def save_patch_cells_to_json(cells_list, temp_dir, patch_id):
    """각 패치별로 개별 JSON 파일에 저장 (파싱 없음!)"""
    if not cells_list:
        return
    
    patch_json_path = os.path.join(temp_dir, f"patch_{patch_id}.json")
    
    # 직접 저장 (파일 읽기/파싱 전혀 없음!)
    with open(patch_json_path, 'w') as f:
        json.dump(cells_list, f)

def merge_patch_jsons_to_xml(temp_dir, xml_path):
    """temp 폴더의 모든 개별 JSON 파일을 병합하여 XML로 변환"""
    
    # temp 폴더의 모든 JSON 파일 수집
    json_files = glob(os.path.join(temp_dir, "patch_*.json"))
    all_cells = []
    
    print(f"📄 {len(json_files)}개의 패치 JSON 파일 병합 중...")
    
    # 모든 JSON 파일에서 세포 데이터 수집
    for json_file in json_files:
        with open(json_file, 'r') as f:
            patch_cells = json.load(f)
            all_cells.extend(patch_cells)
    
    # HnE 세포 분류를 위한 클래스별 색상 매핑
    class_colors = {
        0: "#FFA500",  # Neutrophil - 주황색
        1: "#008000",  # Epithelial - 녹색
        2: "#FF0000",  # Lymphocyte - 빨간색
        3: "#87CEEB",  # Plasma - 하늘색
        4: "#0000FF",  # Eosinophil - 파란색sssssdasdasdasdasdasdasdasdasdasdㅁㄴㅇsds
        5: "#FFFF00"   # Connective tissue - 노란색
    }
    
    class_names = {
        0: "Neutrophil",
        1: "Epithelial", 
        2: "Lymphocyte",
        3: "Plasma",
        4: "Eosinophil",
        5: "Connective tissue"
    }
    
    # 루트 엘리먼트 생성
    root = ET.Element("ASAP_Annotations")
    
    # Annotations 엘리먼트 생성
    annotations = ET.SubElement(root, "Annotations")
    
    # 한 번에 모든 세포 추가
    annotation_id = 0
    for cell in all_cells:
        cls_id = cell.get('cls_id', 0)
        
        annotation = ET.SubElement(annotations, "Annotation")
        annotation.set("Name", f"Annotation {annotation_id}")
        annotation.set("Type", "Dot")
        annotation.set("PartOfGroup", class_names.get(cls_id, f"Class_{cls_id}"))
        annotation.set("Color", class_colors.get(cls_id, "#FFFFFF"))
        
        coordinates = ET.SubElement(annotation, "Coordinates")
        coordinate = ET.SubElement(coordinates, "Coordinate")
        coordinate.set("Order", "0")
        coordinate.set("X", str(float(cell['x'])))
        coordinate.set("Y", str(float(cell['y'])))
        
        annotation_id += 1
    
    # AnnotationGroups 엘리먼트 생성
    annotation_groups = ET.SubElement(root, "AnnotationGroups")
    
    # 각 클래스별 그룹 생성
    for cls_id, class_name in class_names.items():
        group = ET.SubElement(annotation_groups, "Group")
        group.set("Name", class_name)
        group.set("PartOfGroup", "None")
        group.set("Color", class_colors.get(cls_id, "#FFFFFF"))
        attributes = ET.SubElement(group, "Attributes")
    
    # XML 저장
    rough_string = ET.tostring(root, 'unicode')
    reparsed = minidom.parseString(rough_string)
    pretty_xml = reparsed.toprettyxml(indent="	")
    
    # <?xml version... 라인을 원하는 형태로 수정
    lines = pretty_xml.split('\n')
    lines[0] = '<?xml version="1.0"?>'
    pretty_xml = '\n'.join(lines[1:])  # 빈 라인 제거
    
    with open(xml_path, 'w', encoding='utf-8') as f:
        f.write(pretty_xml)
    
    print(f"✅ XML 변환 완료: {len(all_cells)}개 세포")
    return len(all_cells)

In [None]:
import pyvips
import shutil

slide_path=glob('../../data/*_HnE/*.ndpi')
image_size=1024 # 모델 입력 크기
origin_mpp=0.25
output_mpp=0.5
original_size=int(image_size*output_mpp/origin_mpp) #1122
magnification=original_size/image_size
count=0

for i in range(len(slide_path)):
    file_name=os.path.basename(slide_path[i]).split('.')[0]
    slide=openslide.OpenSlide(slide_path[i])
    thumbnail=slide.get_thumbnail((slide.dimensions[0]//64, slide.dimensions[1]//64))
    slide = pyvips.Image.new_from_file(slide_path[i])

    thumb_mask=cv2.threshold(255-np.array(thumbnail.convert('L')),30,255,cv2.THRESH_BINARY)[1]
    thumb_mask=cv2.morphologyEx(thumb_mask,cv2.MORPH_CLOSE,np.ones((15,15),np.uint8))
    thumb_mask=cv2.morphologyEx(thumb_mask,cv2.MORPH_OPEN,np.ones((5,5),np.uint8))
    
    # 파일 경로 설정
    output_xml_path = f"../../results/BR_HnE/cell_detection/{file_name}.xml"
    temp_dir = f"../../results/BR_HnE/cell_detection/{file_name}_temp"
    os.makedirs("../../results/BR_HnE/cell_detection", exist_ok=True)
    
    # temp 폴더 초기화 (기존 폴더 삭제 후 새로 생성)
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir)
    
    total_cells_detected = 0
    patch_counter = 0  # 패치 ID 카운터
    row_pbar = tqdm(range(slide.width//image_size-1), total=slide.width//image_size-1)
    
    for patch_row in row_pbar:
        for patch_col in range(slide.height//image_size-1):
            if np.sum(thumb_mask[(patch_col*image_size)//64:((patch_col+1)*image_size)//64,(patch_row*image_size)//64:((patch_row+1)*image_size)//64])>0:
                count+=1
                patch=slide.crop(patch_row*image_size, patch_col*image_size, image_size, image_size)
                patch=np.ndarray(buffer=patch.write_to_memory(),
                            dtype=np.uint8,
                            shape=[patch.height, patch.width, patch.bands])
                patch = cv2.resize(np.array(patch)[:,:,:3], (512, 512))
                torch_patch=torch.from_numpy(patch).permute(2,0,1).unsqueeze(0).float()/255.
                torch_patch=torch_patch.to(device)
                cell_list = pred_patch(torch_patch, model, patch_row*image_size, patch_col*image_size, 2)
                
                # 🚀 개별 JSON 파일로 저장 (파일 파싱 전혀 없음!)
                if len(cell_list) > 0:
                    save_patch_cells_to_json(cell_list, temp_dir, patch_counter)
                    total_cells_detected += len(cell_list)
                
                patch_counter += 1  # 패치 ID 증가
                s = f"처리된 패치: {count}, 총 검출된 세포: {total_cells_detected}"
                row_pbar.set_description(f'{s}')

    # 🎯 처리 완료 후 모든 개별 JSON → XML 일괄 변환
    print(f"\n📄 개별 JSON 파일들 병합하여 XML 변환 중...")
    final_cell_count = merge_patch_jsons_to_xml(temp_dir, output_xml_path)
    
    # temp 폴더 전체 삭제
    shutil.rmtree(temp_dir)
    
    print(f"🎯 WSI 처리 완료! 총 {final_cell_count}개 세포 검출됨")
    print(f"📄 XML 파일 저장 위치: {output_xml_path}")
    print(f"🗑️  임시 폴더 정리 완료")
    