In [2]:
import os
import numpy as np
import shutil
import re

## 给 storage 下的数据重新命名

规则：

```
<dataset_id>_<小写的名字>
```

In [3]:
data_root = '../../../storage'
# count = 0
# for file in os.listdir(data_root):
#     try:
#         dataset_id = file.split('_')[0]
#         dataset_name = file.split('_')[1].lower()
#         # print(dataset_name.lower())
#     except Exception as e:
#         dataset_name = file.replace(' ', '-').lower()
    
#     filename = f'{count}_{dataset_name}'
#     origin_path = os.path.join(data_root, file)
#     target_path = os.path.join(data_root, filename)
#     os.rename(origin_path, target_path)
#     count += 1

## 清洗数据集

1. 删除所有的 `._` 开头的文件夹
2. 删除所有的空文件夹

In [4]:
def explore_dataset(dataset: str):
    for file in os.listdir(dataset):
        filepath = os.path.join(dataset, file)
        if file == '.DS_Store' or (file.startswith('._')):
            os.remove(filepath)
        
    for dirpath, dirnames, filenames in os.walk(dataset):
        if not dirnames and not filenames:
            shutil.rmtree(dirpath)
            print('delete ' + dirpath)

for file in os.listdir(data_root):
    file_path = os.path.join(data_root, file)
    data_id, data_name = file.split('_')
    explore_dataset(file_path)

## 生成每个数据集的元信息

基础格式 参考 `./data/demo.json`

- task_ids 映射表 `./data/task.map.json`
- modality_ids 映射表 `./data/modality.map.json`
- organ_ids 映射表 `./data/organ.map.json`

基本策略：
1. 先找 dataset.json
2. 再找 含有 imagesTr, labelsTr 这样的字眼的文件夹
3. 制定规则

In [5]:
import json
task_map: dict = json.load(open('../data/task.map.json', 'r'))
modality_map: dict = json.load(open('../data/modality.map.json', 'r'))
organ_map: dict = json.load(open('../data/organ.map.json', 'r'))

modality_name2id = { modality_map[id]: int(id) for id in modality_map }

In [6]:
modality_name2id['ct']

2

In [7]:
class MedicalDataMeta:
    def __init__(self, id, name) -> None:
        self.id = id
        self.name = name
        self.orgin_url = ''
        self.description = ''
        self.release_date = ''
        self.task_ids = []
        self.modality_ids = []
        self.organ_ids = []
        self.data_num = -1
        self.label_num = -1
        self.split_info = {
            "train": {
                "data": 0,
                "label": 0
            },
            "test": {
                "data": 0,
                "label": 0
            },
            "val": {
                "data": 0,
                "label": 0
            }
        }
    
    def export_json(self):
        json_data = {}
        for attr in dir(self):
            if attr.startswith('__') or attr.startswith('_'):
                continue
            attr_value = getattr(self, attr)
            if callable(attr_value):
                continue
            json_data[attr] = attr_value
        return json_data
    
    def __str__(self) -> str:
        return json.dumps(self.export_json(), indent=4, ensure_ascii=False)

    def merge_json(self, json: dict):
        for attr in dir(self):
            if attr.startswith('__') or attr.startswith('_'):
                continue
            attr_value = getattr(self, attr)
            if callable(attr_value):
                continue
            
            if attr in json:
                setattr(self, attr, json[attr])

mdata_meta = MedicalDataMeta(-1, 'test')
mdata_meta.merge_json({
    'description': 'hello world'
})
print(mdata_meta)

{
    "data_num": -1,
    "description": "hello world",
    "id": -1,
    "label_num": -1,
    "modality_ids": [],
    "name": "test",
    "organ_ids": [],
    "orgin_url": "",
    "release_date": "",
    "split_info": {
        "train": {
            "data": 0,
            "label": 0
        },
        "test": {
            "data": 0,
            "label": 0
        },
        "val": {
            "data": 0,
            "label": 0
        }
    },
    "task_ids": []
}


In [9]:
from erine import ask_llm

def extract_from_dataset_json(dataset_json):
    res = ask_llm([
        {
        "role": "user",
        "content": """我现在需要从 dataset.json 中提炼出我想要的关于一个数据的信息，将数据整合成下面这样的形式：
{
    "id": 51,
    "name": "ASOCA",
    "origin_url": "https://xxx.xxx",
    "description": "xxxx",
    "release_date": "2024.05.21",
    "task_ids": [
        10
    ],
    "modality_ids": [
        0,
        1
    ],
    "organ_ids": [
        0,
        1
    ],
    "data_num": 114514,
    "label_num": 0,
    "split_info": {
        "train": {
            "data": 110,
            "label": 110
        },
        "test": {
            "data": 100,
            "label": 100
        },
        "val": {
            "data": 0,
            "label": 0
        }
    }
}

如果拿到的数据不完全满足条件，使用上述的默认值即可，但是一定要返回满足上面要求的 json. 不要返回任何注释和填充符号。

ONLY RETURN JSON FORMAT WITHOUT ANY OTHER WORDS
"""
        },
        {
            "role": "assistant",
            "content": "好的，我会按照你的要求只整理出json"
        },
        {
            "role": "user",
            "content": str(dataset_json)
        }
    ])

    left, right = -1, -1
    stack = []
    for i, ch in enumerate(res):
        if ch == '{':
            if len(stack) == 0:
                left = i
            stack.append(ch)
        elif ch == '}':
            stack.pop()
            if len(stack) == 0:
                right = i + 1
                break
    
    json_str = res[left: right]
    return eval(json_str)
    

res = extract_from_dataset_json({
    "channel_names": {
        "0": "CT"
    },
    "labels": {
        "background": 0,
        "Liver": 1
    },
    "numTraining": 20,
    "file_ending": ".nii.gz",
    "name": "Sliver07",
    "reference": "none",
    "release": "prerelease",
    "description": "Sliver07",
    "overwrite_image_reader_writer": "NibabelIOWithReorient"
})
print(res)

{'id': 51, 'name': 'ASOCA', 'origin_url': 'https://xxx.xxx', 'description': 'xxxx', 'release_date': '2024.05.21', 'task_ids': [10], 'modality_ids': [0, 1], 'organ_ids': [0, 1], 'data_num': 114514, 'label_num': 0, 'split_info': {'train': {'data': 110, 'label': 110}, 'test': {'data': 100, 'label': 100}, 'val': {'data': 0, 'label': 0}}}


In [11]:
def try_multitime_llm(json: str):
    for i in range(3):
        try:
            res = extract_from_dataset_json(json)
            return res
        except Exception as e:
            return None
    return None

def explore_dataset(dataset: str, mdata_meta: MedicalDataMeta):
    for file in os.listdir(dataset):
        filepath = os.path.join(dataset, file)
        if file == 'dataset.json':
            dataset_json: dict = json.load(open(filepath, 'r'))
            llm_json = try_multitime_llm(dataset_json)
            if llm_json:
                mdata_meta.merge_json(llm_json)
            
            mdata_meta.name = dataset_json.get('name', mdata_meta.name)
            mdata_meta.orgin_url = dataset_json.get('reference', mdata_meta.orgin_url)
            mdata_meta.description = dataset_json.get('description', mdata_meta.description)
            
data = { 'storage': [] }
for file in os.listdir(data_root):
    file_path = os.path.join(data_root, file)
    data_id, data_name = file.split('_')
    mdata_meta = MedicalDataMeta(int(data_id), data_name)
    explore_dataset(file_path, mdata_meta)
    mdata_meta.id = int(data_id)
    data['storage'].append(mdata_meta.export_json())

In [12]:
# 看一下所有 imagesTr 下的文件名后缀
datamapper: dict[int, MedicalDataMeta] = {}
for mdata in data['storage']:
    datamapper[mdata['id']] = mdata

suffix_set = set()

def get_suffix(path):
    if '.' in path:
        sample_suffix = '.'.join(path.split('.')[1:])
        return sample_suffix
    else:
        return 'unknown'
    
def explore_dataset(dataset: str, mdata_meta: MedicalDataMeta):
    modality_set = set(mdata_meta)
    for file in os.listdir(dataset):
        filepath = os.path.join(dataset, file)
        if file == 'imagesTr':
            sample_num = len(os.listdir(filepath))
            sample_file = os.listdir(filepath)[0]
            suffix_set.add(get_suffix(sample_file))
            
        elif file == 'imagesVal':
            sample_num = len(os.listdir(filepath))
            sample_file = os.listdir(filepath)[0]
            suffix_set.add(get_suffix(sample_file))
        elif file == 'labelsTr':
            sample_num = len(os.listdir(filepath))
            sample_file = os.listdir(filepath)[0]
            suffix_set.add(get_suffix(sample_file))

for file in os.listdir(data_root):
    file_path = os.path.join(data_root, file)
    data_id, data_name = file.split('_')
    mdata_meta = datamapper[int(data_id)]
    explore_dataset(file_path, mdata_meta)

print(suffix_set)

{'_lung_020.nii.gz', '_colon_042.nii.gz', '_hepaticvessel_284.nii.gz', 'nii', 'nii.gz', 'jpg'}


In [18]:
datamapper: dict[int, dict] = {}
for mdata in data['storage']:
    datamapper[mdata['id']] = mdata

def get_filetype_by_suffix(path: str):
    if '/' in path:
        path = path.split('/')[-1]
    
    if path.endswith('.nii.gz') or path.endswith('.nii'):
        return modality_name2id['ct']

def explore_dataset(dataset: str, mdata_meta: dict):
    # modality_set = set(mdata_meta)
        
    for file in os.listdir(dataset):
        filepath = os.path.join(dataset, file)
        if file == 'imagesTr':
            sample_num = len(os.listdir(filepath))
            mdata_meta['split_info']['train']['data'] = sample_num
            
        elif file == 'imagesVal':
            sample_num = len(os.listdir(filepath))
            mdata_meta['split_info']['val']['data'] = sample_num
            
        elif file == 'labelsTr':
            sample_num = len(os.listdir(filepath))
            mdata_meta['split_info']['train']['label'] = sample_num
            
        elif file == 'labelsVal':
            sample_num = len(os.listdir(filepath))
            mdata_meta['split_info']['val']['label'] = sample_num
            
for file in os.listdir(data_root):
    file_path = os.path.join(data_root, file)
    data_id, data_name = file.split('_')
    mdata_meta = datamapper[int(data_id)]
    explore_dataset(file_path, mdata_meta)
    mdata_meta['data_num'] = mdata_meta['split_info']['train']['data'] + \
                          mdata_meta['split_info']['test']['data'] + \
                          mdata_meta['split_info']['val']['data']
    
    mdata_meta['label_num'] = mdata_meta['split_info']['train']['label'] + \
                          mdata_meta['split_info']['test']['label'] + \
                          mdata_meta['split_info']['val']['label']

In [19]:
with open('../data/storage.json', 'w', encoding='utf-8') as fp:
    json.dump(data, fp, ensure_ascii=False, indent=4)

## 如果运行过之前的，请从当前开始运行

In [None]:
# 进行第二次迭代
import os
import json
import shutil

with open('../data/storage.json', 'w', encoding='utf-8') as fp:
    json.dump(data, fp, ensure_ascii=False, indent=4)