In [18]:
import copy
import csv
import functools
import glob
import os

In [19]:
from collections import namedtuple

In [20]:
import SimpleITK as sitk # 데이터 파일 포맷을 numpy로 불러들이기 위함
import numpy as np

In [21]:
import torch
import torch.cuda
from torch.utils.data import Dataset

In [22]:
import os
os.getcwd()

'D:\\Lung_Cancer_Diagnostic'

In [24]:
from util.disk import getCache # diskcache disk에  cache를 저장해주는 라이브러리
from util.util import XyzTuple, xyz2irc
from util.logconf import logging

In [25]:
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

In [26]:
raw_cache = getCache('part2ch10_raw')

In [28]:
CandidateInforTuple = namedtuple(
    'CandidateInfoTuple',
    'isNodule_bool, diameter_mm, series_uid, center_xyz',
)

In [44]:
"""
- 인메모리나 온디스크 캐싱을 적절하게 사용하여 데이터 파이프라인 속도를 올려 놓으면 훈련 속도가 상당히 개선됨
- 훈련 데이터셋 전체를 사용하게 되면 다운로드도 오래 걸리고 필요한 디스크 공간도 커지기 때문에 전체를 사용하는 대신 훈련 프로그램 실행에 집중
- requireOnDisk_bool 파라미터를 사용하여 디스크 상에서 시리즈 UID가 발견되는 LUNA 데이터만 사용하고 이에 해당하는 엔트리는 csv에서 거름
"""

@functools.lru_cache(1) # 표준 인메모리 캐싱 라이브러리
def getCandidateInfoList(requireOnDisk_bool=True): # disk에 없는 데이터를 거르기 위함
    # 모든 디스크에 있는 모든 세트를 series_uids로 구성함
    # 이렇게 하면 모든 하위 집합을 다운로드 하지 않더라도 사용할 수 있음
    mhd_list = glob.glob('D:/Luna/subset*/*.mhd')
    presentOnDisk_set = {os.path.split(p)[-1][:-4] for p in mhd_list}
    
    diameter_dict = {}
    with open("D:/Luna/annotations.csv", "r") as f:
        for row in list(csv.reader(f))[1:]:
            series_uid = row[0]
            annotationCenter_xyz = tuple([float(x) for x in row[1:4]])
            annotationDiameter_mm = float(row[4])
            
            diameter_dict.setdefault(series_uid, []).append( # 첫번째 인자로 키값, 두번째 인자로 기본값을 넘김
                (annotationCenter_xyz, annotationDiameter_mm)
            )
    
    candidateInfo_list = []
    with open('D:/Luna/candidates.csv', "r") as f:
        for row in list(csv.reader(f))[1:]:
            series_uid = row[0]
            
            if series_uid not in presentOnDisk_set and requireOndisk_bool: # series_uid가 없으면 서브셋에 있지만 디스크에는 없으므로 건너뜀
                continue
                
            isNodule_bool = bool(int(row[4]))
            candidateCenter_xyz = tuple([float(x) for x in row[1:4]])
            
            candidateDiameter_mm = 0.0
            for annotation_tup in diameter_dict.get(series_uid, []):
                annotation_xyz, annotationDiameter_mm = annotation_tup
                for i in range(3):
                    delta_mm = abs(candidateCenter_xyz[i] - annotationCenter_xyz[i])
                    if delta_mm > annotationDiameter_mm / 4: # 반경을 얻기 위해 직경을 2로 나누고, 두 개의 결절 센터가 결절의 크기 기준으로
                        break                                # 너무 떨어져 있는 지를 반지름의 절반 길이를 기준으로 판정한다.(바운딩 박스 체크)
                else:
                    candidateDiameter_mm = annotationDiameter_mm
                    break
                    
            candidateInfo_list.append(CandidateInfoTuple(
                isNodule_bool,
                candidateDiameter_mm,
                series_uid,
                candidateCenter_xyz,
            ))
            
    candidateInfo_list.sort(reverse=True) # s내림차순 정렬
    return candidateInfo_list

# 개별 CT 스캔 로딩
- 디스크에서 CT 데이터를 얻어와 파이썬 객체로 변환 후 3차원 결절 밀도 데이터로 사용할 수 있도록 만드는 작업
- CT 스캔 파일의 원래 포맷은 DICOM이라고 부름
- CT 스캔 복셀은 하운스필드 단위(https://en.wikipedia.org/wiki/Hounsfield_scale)로 표시

In [None]:
class Ct:
    def __init__(self, series_uid):
        mhd_path = glob.glob(
            'D:/Luna/subset*/{}.mhd'.format(series_uid)
        )[0]
        
        ct_mhd = sitk.ReadImage(mhd_path) # sitk.ReadImage는 .mhd 파일뿐 아니라 .raw 파일도 읽음
        ct_a = np.array(sitk.GetArrayFromImage(ct_mhd), dtype=np.float32)
        
        """
        공기는 -1000HU(0g/cc), 물은 0HU(1g/cc), 뼈는 +1000HU(2~3g/cc)다. 
        우리가 관심잇는 종양은 대체로 1g/cc(0HU) 근처이므로 -1000HU, 1000HU을 제거하고 1g/cc가 아닌 것도 제거함
        """ 
        ct_a.clip(-1000, 1000, ct_a)
        
        self.series_uid = series_uid
        self.hu_a = ct_a
        
        """
        환자 좌표계를 사용해 결절 위치 정하기
        """