In [None]:
import json
import os
import shutil
from glob import glob
from PIL import Image

import numpy as np
from tqdm import tqdm

from torch.utils.data import DataLoader, Dataset

In [None]:
# FIXME: set this to the name of the dataset being converted.
# It is used as the output folder name under /opt/ml/input/data/.
dataset_name = 'aihub_1'

In [None]:
# Source folders: the AI Hub images and their AI Hub-format JSON label files.
original_image_path_root = '/opt/ml/utils/aihub_to_ufo/images'
original_label_path_root = '/opt/ml/utils/aihub_to_ufo/labels'

In [None]:
# Destination folders for this dataset: one for the copied images and one for
# the intermediate ICDAR-format labels.
new_image_path_root, new_label_path_root = (
    os.path.join('/opt/ml/input/data/', dataset_name, leaf)
    for leaf in ('images', 'labels_icdar')
)

# Create both folders up front; already-existing folders are left as they are.
for new_root in (new_image_path_root, new_label_path_root):
    os.makedirs(new_root, exist_ok=True)

---
---
## aihub to icdar
/opt/ml/utils/aihub_to_ufo/images 폴더에 있는 이미지를

/opt/ml/input/data/{dataset_name}/images 폴더로 복사

In [None]:
# Copy every entry of the source image folder into the dataset's image folder,
# keeping the file name unchanged.
for src_image_path in glob(original_image_path_root+'/*'):
    dst_image_path = os.path.join(new_image_path_root, os.path.basename(src_image_path))

    ## Copy the file from '/opt/ml/utils/aihub_to_ufo/images'; the original is kept.
    shutil.copy(src_image_path, dst_image_path)

    ## Alternative: move the file out of '/opt/ml/utils/aihub_to_ufo/images' (removes the original).
    # shutil.move(src_image_path, dst_image_path)

/opt/ml/utils/aihub_to_ufo/labels 폴더에 있는 aihub 포맷 레이블을

/opt/ml/input/data/{dataset_name}/labels_icdar 폴더로 icdar 포맷으로 변환

In [None]:
# Convert each AI Hub JSON label file into an ICDAR-style text file named
# 'gt_<label stem>.txt', with one line per annotated box.
for aihub_json_path in glob(original_label_path_root+'/*'):

    with open(aihub_json_path, encoding='utf-8') as f:
        aihub_data = json.load(f)

    icdar_lines = []
    for annotation in aihub_data['annotations']:
        # bbox is read as [x, y, width, height].
        bbox = annotation['bbox']
        left, top, box_w, box_h = bbox[0], bbox[1], bbox[2], bbox[3]

        # AI Hub text 'xxx' is rewritten to '###', which the ICDAR -> UFO step
        # treats as an illegible transcription.
        text = '###' if annotation['text'] == 'xxx' else annotation['text']

        # Expand the box into four corner points: top-left, top-right,
        # bottom-right, bottom-left.
        right, bottom = left + box_w, top + box_h
        icdar_lines.append(
            f"{left},{top},{right},{top},{right},{bottom},{left},{bottom},Korean,{text}"
        )

    label_stem = os.path.splitext(os.path.basename(aihub_json_path))[0]
    icdar_txt_path = os.path.join(new_label_path_root, f'gt_{label_stem}.txt')
    with open(icdar_txt_path, 'w', encoding='utf-8') as txt:
        txt.write('\n'.join(icdar_lines))

---
---
## icdar to ufo
/opt/ml/input/data/{dataset_name}/labels_icdar 폴더에 있는 icdar 포맷 레이블을

/opt/ml/input/data/{dataset_name}/ufo/train.json 파일로 ufo 포맷으로 변환

In [None]:
# File extensions (lower- and upper-case spellings) that count as images.
IMAGE_EXTENSIONS = {
    '.gif', '.GIF',
    '.jpg', '.JPG',
    '.png', '.PNG',
    '.jpeg', '.JPEG',
}

# ICDAR language/script name -> language token stored in the UFO annotation.
LANGUAGE_MAP = {'Korean': 'ko', 'Latin': 'en', 'Symbols': None}


def get_language_token(x):
    """Return the language token for ICDAR language name ``x``.

    Names listed in ``LANGUAGE_MAP`` return their mapped value (which is
    ``None`` for 'Symbols'); any other name returns 'others'.
    """
    if x in LANGUAGE_MAP:
        return LANGUAGE_MAP[x]
    return 'others'

In [None]:
class MLT17Dataset(Dataset):
    """Dataset over images paired with ICDAR-style ``gt_<image stem>.txt`` labels.

    Indexing yields ``(image_fname, sample_info_ufo)``, where ``sample_info_ufo``
    is the UFO-format entry for that image (image size, words, tags, license).

    Only samples that contain Korean text and no language other than
    Korean/English are kept; all other samples are dropped in ``__init__``.
    """

    def __init__(self, image_dir, label_dir):
        """Index the image/label pairs found in the two directories.

        Args:
            image_dir: Directory with the images; only files whose extension is
                in ``IMAGE_EXTENSIONS`` are considered.
            label_dir: Directory with one ``gt_<image stem>.txt`` file per image.

        Raises:
            AssertionError: If the number of images and label files differ, or
                if an image has no matching label file.
        """
        image_paths = set(x for x in glob(os.path.join(image_dir, '*'))
                          if os.path.splitext(x)[1] in IMAGE_EXTENSIONS)
        label_paths = set(glob(os.path.join(label_dir, '*.txt')))
        assert len(image_paths) == len(label_paths), (
            'found {} images but {} label files'.format(len(image_paths), len(label_paths)))

        ## sample_id example: 'img_999'
        sample_ids, samples_info = list(), dict()
        # Iterate in sorted order: plain set iteration order depends on string
        # hashing and can change between interpreter runs, which would make the
        # sample order (and the resulting annotation file) non-reproducible.
        for image_path in sorted(image_paths):
            sample_id = os.path.splitext(os.path.basename(image_path))[0]

            label_path = os.path.join(label_dir, 'gt_{}.txt'.format(sample_id))
            assert label_path in label_paths, 'missing label file: {}'.format(label_path)

            words_info, extra_info = self.parse_label_file(label_path)
            # Skip samples without any Korean word, and samples containing any
            # language token other than 'ko' / 'en'.
            if 'ko' not in extra_info['languages'] or extra_info['languages'].difference({'ko', 'en'}):
                continue

            sample_ids.append(sample_id)
            samples_info[sample_id] = dict(image_path=image_path, label_path=label_path,
                                           words_info=words_info)

        self.sample_ids, self.samples_info = sample_ids, samples_info

    def __len__(self):
        """Return the number of samples that passed the language filter."""
        return len(self.sample_ids)

    def __getitem__(self, idx):
        """Return ``(image file name, UFO-format sample dict)`` for sample ``idx``."""
        sample_info = self.samples_info[self.sample_ids[idx]]

        image_fname = os.path.basename(sample_info['image_path'])
        # Only the size is needed; the context manager closes the underlying
        # file right away instead of leaving it open until garbage collection.
        with Image.open(sample_info['image_path']) as image:
            img_w, img_h = image.size

        license_tag = dict(usability=True, public=True, commercial=True, type='CC-BY-SA',
                           holder=None)
        sample_info_ufo = dict(img_h=img_h, img_w=img_w, words=sample_info['words_info'], tags=None,
                               license_tag=license_tag)

        return image_fname, sample_info_ufo

    def parse_label_file(self, label_path):
        """Parse one ICDAR-style label file.

        Each non-blank line has the form
        ``x1,y1,x2,y2,x3,y3,x4,y4,<language>,<transcription>``; the transcription
        may itself contain commas. Blank lines are ignored.

        Args:
            label_path: Path of the label text file (read as UTF-8).

        Returns:
            A tuple ``(words_info, extra_info)``: ``words_info`` maps a running
            word index to that word's UFO-format dict, and ``extra_info`` is
            ``dict(languages=<set of language tokens seen in the file>)``.
        """
        def rearrange_points(points):
            # Find the point with the smallest L1 norm (|x| + |y|). For example
            # [[1376.0, 0.0], [1600.0, 0.0], [1600.0, 341.0], [1376.0, 341.0]]
            # gives the norms [1376.0, 1600.0, 1941.0, 1717.0].
            # The image origin (0, 0) is the top-left corner, so the point that
            # should come first is the one whose x + y is smallest.
            start_idx = np.argmin([np.linalg.norm(p, ord=1) for p in points])

            # If that point is not already first, rotate the list so that it
            # is, keeping the cyclic order of the points.
            if start_idx != 0:
                points = np.roll(points, -start_idx, axis=0).tolist()
            return points

        with open(label_path, encoding='utf-8') as f:
            # Drop blank lines (e.g. a trailing newline) so they do not fail
            # the field split below.
            lines = [line for line in f if line.strip()]

        words_info, languages = dict(), set()
        for word_idx, line in enumerate(lines):
            # Split at most 9 times so commas inside the transcription survive.
            items = line.strip().split(',', 9)
            language, transcription = items[8], items[9]
            points = np.array(items[:8], dtype=np.float32).reshape(4, 2).tolist()
            points = rearrange_points(points)

            # '###' is the marker for an illegible transcription.
            illegibility = transcription == '###'
            orientation = 'Horizontal'
            language = get_language_token(language)
            words_info[word_idx] = dict(
                points=points, transcription=transcription, language=[language],
                illegibility=illegibility, orientation=orientation, word_tags=None
            )
            languages.add(language)

        return words_info, dict(languages=languages)

In [None]:
def main():
    """Convert the ICDAR labels of the dataset into a single UFO annotation file.

    Builds an ``MLT17Dataset`` over ``new_image_path_root`` /
    ``new_label_path_root``, collects every sample's UFO entry keyed by image
    file name, and writes the result to ``ufo/train.json`` next to the label
    folder.
    """
    dataset = MLT17Dataset(new_image_path_root, new_label_path_root)
    # The identity collate_fn hands back each batch as a plain list of samples.
    loader = DataLoader(dataset, num_workers=32, collate_fn=lambda x: x)

    images = dict()
    with tqdm(total=len(dataset)) as progress_bar:
        for batch in loader:
            # Only the first sample of each batch is read.
            fname, ufo_entry = batch[0]
            images[fname] = ufo_entry
            progress_bar.update(1)
    anno = dict(images=images)

    # The 'ufo' folder is a sibling of the ICDAR label folder.
    ufo_dir = os.path.join(os.path.dirname(new_label_path_root), 'ufo')
    os.makedirs(ufo_dir, exist_ok=True)
    train_json_path = os.path.join(ufo_dir, 'train.json')
    with open(train_json_path, 'w', encoding='utf-8') as f:
        json.dump(anno, f, indent=4)

In [None]:
# Run the ICDAR -> UFO conversion; writes ufo/train.json next to the ICDAR label folder.
main()

---
---
## 원본을 제거하고 싶으면 실행
- /opt/ml/utils/aihub_to_ufo/images

- /opt/ml/utils/aihub_to_ufo/labels

- /opt/ml/input/data/{dataset_name}/labels_icdar

In [None]:
# Optional cleanup: uncomment the lines below to delete the source image/label
# folders and the intermediate ICDAR label folder. shutil.rmtree removes the
# whole directory tree, so only run this once the conversion output is verified.
# shutil.rmtree(original_image_path_root)
# shutil.rmtree(original_label_path_root)
# shutil.rmtree(new_label_path_root)