# Split (EASY, HARD)

1. label_files 변수에 학습 데이터 정보와 난이도를 구분할 평가 데이터 정보를 담는다. {"별칭": "레이블 경로"}
2. 코드를 쭉 수행한다.
3. 결과는 자동으로 저장됨

In [59]:
# Label files per task, in the form {"alias": [label file paths]}.
# One entry is the training set that serves as the reference split; the other
# entries are the evaluation sets to be divided by difficulty.
label_files = {
    "aihub_train": ["/home/labelsets/aihub_rec_full_horizontal_clean_80:10:10/train_label.txt"],
    # "aihub_eval": ["/home/labelsets/aihub_rec_full_horizontal_clean_80:10:10/eval_label.txt"],
    # "aihub_test": ["/home/labelsets/aihub_rec_full_horizontal_clean_80:10:10/test_label.txt"],
    # "GIST_test": ["/home/labelsets/GIST_rec_full_test/test_label_horizontal_only_korean.txt"]
    "KAIST_test_horizontal": ["/home/datasets/KAIST_rec/label_only_korean_horizontal.txt"]
}

In [60]:
def load_label(label_files):
    """Load and concatenate the entries of several tab-separated label files.

    Every line is stripped of trailing whitespace and split on tab characters.

    Args:
        label_files: Iterable of paths to label files.

    Returns:
        A list with one entry per line, covering all files in the given
        order. Each entry is the list of tab-separated fields of that line.
    """
    labels = []
    for file in label_files:
        with open(file, "r", encoding="utf-8") as f:
            # Extend rather than assign: assigning would discard the entries
            # of every file except the last one.
            labels.extend(line.rstrip().split("\t") for line in f)
    return labels


# Parsed label entries per task: {task: labels}
labels_dict = {name: load_label(files) for name, files in label_files.items()}

In [61]:

import itertools


def get_char_set_from_labels(labels):
    """Return the set of distinct characters used in the label texts.

    Args:
        labels: Iterable of entries whose element at index 1 is the text.

    Returns:
        A set containing every character that occurs in any text.
    """
    texts = (entry[1] for entry in labels)
    return set().union(*texts)

# Distinct characters per task: {task: set of characters}
char_set_dict = {task: get_char_set_from_labels(labels) for task, labels in labels_dict.items()}

# Number of distinct characters in each task
for task, char_set in char_set_dict.items():
    print(f"{task}: {len(char_set)}")
    
task_combinations = list(itertools.combinations(list(char_set_dict.keys()), 2))
for task1, task2 in task_combinations: # print overlap statistics for every pair of tasks
    print(f"{task1} & {task2} = {len(char_set_dict[task1] & char_set_dict[task2])} | {task1} - {task2} = {len(char_set_dict[task1] - char_set_dict[task2])} | {task2} - {task1} = {len(char_set_dict[task2] - char_set_dict[task1])}")

aihub_train: 1741
KAIST_test_horizontal: 769
aihub_train & KAIST_test_horizontal = 768 | aihub_train - KAIST_test_horizontal = 973 | KAIST_test_horizontal - aihub_train = 1


In [62]:
def char_num_report(labels):
    """Count how often each character occurs in the label texts.

    Args:
        labels: Iterable of entries whose element at index 1 is the text.

    Returns:
        A dict mapping character -> occurrence count, ordered from the most
        frequent character to the least frequent one.
    """
    counts = {}
    for entry in labels:
        text = entry[1]
        for ch in text:
            counts[ch] = counts.get(ch, 0) + 1
    ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
    return dict(ranked)


# Character frequency table per task: {task: {char: count}}
char_num_report_dict = {task: char_num_report(labels) for task, labels in labels_dict.items()}
# Bare expression so the notebook displays the table
char_num_report_dict

{'aihub_train': {' ': 69363,
  '이': 39728,
  '스': 32335,
  '리': 30160,
  '원': 29709,
  '사': 27834,
  '의': 24634,
  '어': 23990,
  '인': 22302,
  '한': 21056,
  '과': 19776,
  '아': 19651,
  '수': 19521,
  '학': 17981,
  '동': 17910,
  '지': 17906,
  '전': 17855,
  '기': 17709,
  '부': 16945,
  '대': 16730,
  '장': 16711,
  '마': 15696,
  '산': 15323,
  '가': 15164,
  '국': 14665,
  '미': 14612,
  '문': 14541,
  '라': 14266,
  '소': 14151,
  '치': 13316,
  '다': 12738,
  '트': 12477,
  '정': 12461,
  '주': 12138,
  '점': 11938,
  '성': 11879,
  '비': 11627,
  '시': 11558,
  '고': 11517,
  '구': 11305,
  '자': 11293,
  '일': 11224,
  '공': 10879,
  '우': 10548,
  '로': 10547,
  '세': 10353,
  '하': 10255,
  '무': 10227,
  '도': 10114,
  '나': 10079,
  '영': 10065,
  '상': 9881,
  '노': 9737,
  '화': 9731,
  '래': 9575,
  '식': 9571,
  '피': 9569,
  '교': 9418,
  '회': 9247,
  '서': 9189,
  '신': 9083,
  '연': 9051,
  '방': 8983,
  '오': 8846,
  '중': 8756,
  '터': 8754,
  '드': 8583,
  '는': 8524,
  '개': 8429,
  '제': 8344,
  '에': 8245,
  '보': 7773

In [63]:
def easy_hard(sample):
    """Classify a character by how often it occurs.

    Args:
        sample: A (character, count) pair.

    Returns:
        "many" for counts of at least 1500, "medium" for counts of at
        least 100, and "few" otherwise.
    """
    _, count = sample
    if count >= 1500:
        return "many"
    if count >= 100:
        return "medium"
    return "few"

def get_easy_hard_report_dict(char_num_report_dict):
    """Group the characters of every split by their frequency bucket.

    Args:
        char_num_report_dict: Mapping split -> {character: count}.

    Returns:
        Mapping split -> {bucket: [characters]}, where the bucket name is
        whatever easy_hard() returns for the (character, count) pair. A
        bucket only appears when at least one character falls into it.
    """
    result = dict()
    for split, num_report in char_num_report_dict.items():
        buckets = {}
        for char, count in num_report.items():
            level = easy_hard((char, count))
            if level not in buckets:
                buckets[level] = []
            buckets[level].append(char)
        result[split] = buckets
    return result

many_few_char_set_dict = get_easy_hard_report_dict(char_num_report_dict)

# The inner loop variable is called `level` on purpose: naming it `easy_hard`
# would rebind the module-level easy_hard() function to a string, and any
# later call to get_easy_hard_report_dict() would fail.
for task, report in many_few_char_set_dict.items():
    for level, samples in report.items():
        print(f"({task}, {level}): {len(samples)}")
    print()

# Apart from the train split, the many / medium / few buckets are probably not meaningful.


(aihub_train, many): 257
(aihub_train, medium): 529
(aihub_train, few): 955

(KAIST_test_horizontal, medium): 27
(KAIST_test_horizontal, few): 742



In [64]:
def get_split_with_char_set(labels, char_set):
    """Split samples by whether their label uses any character of char_set.

    Args:
        labels: Iterable of (img_path, label) pairs.
        char_set: Iterable of characters to look for.

    Returns:
        A dict with two lists of (img_path, label) tuples:
        "used" holds the samples whose label shares at least one character
        with char_set, "unused" holds all other samples.
    """
    # Build the lookup set once instead of once per sample; this also keeps
    # the result correct when char_set is a one-shot iterable.
    targets = set(char_set)

    report = {
        "used": [],
        "unused": []
    }

    for img_path, label in labels:
        key = "unused" if targets.isdisjoint(label) else "used"
        report[key].append((img_path, label))
    return report


def get_task_level_sample_dict(many_few_char_set_dict, char_set_dict, labels_dict, criterion = "train"):
    """Divide the samples of every non-criterion task into difficulty levels.

    The levels are assigned in cascade, each step only looking at the samples
    left over by the previous one:

    1. "unseen": the label contains a character absent from the criterion task.
    2. "hard":   the label contains a character of the criterion's "few" bucket.
    3. "normal": the label contains a character of the criterion's "medium" bucket.
    4. "easy":   everything that remains.

    Args:
        many_few_char_set_dict: Mapping task -> {bucket: [characters]}.
        char_set_dict: Mapping task -> set of characters used by that task.
        labels_dict: Mapping task -> list of (img_path, label) pairs.
        criterion: Name of the reference task; it is skipped in the output.

    Returns:
        Mapping task -> {"unseen": [...], "hard": [...], "normal": [...],
        "easy": [...]}, each value being a list of (img_path, label) pairs.

    Raises:
        KeyError: If criterion is missing from many_few_char_set_dict or
            char_set_dict, or a task is missing from labels_dict.
    """
    report = dict()

    # A bucket is absent when no character falls into it, so fall back to an
    # empty list instead of raising KeyError.
    criterion_levels = many_few_char_set_dict[criterion]
    medium = criterion_levels.get("medium", [])
    few = criterion_levels.get("few", [])
    criterion_chars = char_set_dict[criterion]

    for task, char_set in char_set_dict.items():
        if task == criterion:
            continue

        # Characters this task uses that the criterion task never contains.
        unseen = char_set - criterion_chars

        report[task] = dict()

        use_report = get_split_with_char_set(labels_dict[task], unseen)
        report[task]["unseen"] = use_report["used"]

        use_report = get_split_with_char_set(use_report["unused"], few)
        report[task]["hard"] = use_report["used"]

        use_report = get_split_with_char_set(use_report["unused"], medium)
        report[task]["normal"] = use_report["used"]
        report[task]["easy"] = use_report["unused"]

    return report

# Samples per task and per difficulty level, using "aihub_train" as the reference task
task_level_sample_dict = get_task_level_sample_dict(many_few_char_set_dict, char_set_dict, labels_dict, criterion="aihub_train")

In [65]:
# Print the number of samples per (task, level). The inner loop variable is
# named `level` so that it does not rebind the easy_hard() function.
for task, report in task_level_sample_dict.items():
    for level, samples in report.items():
        print(f"({task}, {level}): {len(samples)}")
    print()

(KAIST_test_horizontal, unseen): 2
(KAIST_test_horizontal, hard): 153
(KAIST_test_horizontal, normal): 1297
(KAIST_test_horizontal, easy): 1176



In [66]:

        
from pathlib import Path

dir_path = Path("/home/test_dataset")
dir_path.mkdir(exist_ok=True, parents=True)

# Output layout: <dir_path>/<task>/<level>/label.txt and .../infer.txt
for task, level_sample_dict in task_level_sample_dict.items():
    for level, samples in level_sample_dict.items():
        level_dir = dir_path/task/level
        level_dir.mkdir(exist_ok=True, parents=True)

        # label.txt: "<image path>\t<label>" per sample. The encoding is
        # explicit because the labels are Korean text and the platform
        # default encoding is not guaranteed to be UTF-8.
        file_path = level_dir/"label.txt"
        with open(file_path, "w", encoding="utf-8") as f:
            for img_path, label in samples:
                f.write(f"{img_path}\t{label}\n")

        # infer.txt: image paths only
        file_path = level_dir/"infer.txt"
        with open(file_path, "w", encoding="utf-8") as f:
            for img_path, label in samples:
                f.write(f"{img_path}\n")

# Shape distinguish
이미지가 가로형인지 세로형인지 대략적으로 구분

In [3]:
from pathlib import Path
from PIL import Image
import multiprocessing
from tqdm import tqdm

def chunk_list(data, num_chunks):
    """Split data into at most num_chunks contiguous, non-empty chunks.

    Chunk boundaries are computed with integer arithmetic, so chunk sizes
    differ by at most one element and no floating-point drift can produce
    empty or surplus chunks.

    Args:
        data: Sliceable sequence to split.
        num_chunks: Maximum number of chunks; must be a positive integer.

    Returns:
        A list of slices of data that, concatenated in order, reproduce data.
        Fewer than num_chunks chunks are returned when data has fewer
        elements than num_chunks; an empty data yields an empty list.

    Raises:
        ValueError: If num_chunks is not positive.
    """
    if num_chunks <= 0:
        raise ValueError("num_chunks must be a positive integer")

    total = len(data)
    bounds = [total * i // num_chunks for i in range(num_chunks + 1)]
    # Skip empty ranges, which occur when num_chunks exceeds len(data).
    return [data[start:end] for start, end in zip(bounds, bounds[1:]) if start < end]

def get_shape_from_size(image, label):
    """Roughly classify an image as horizontal or not from its dimensions.

    Args:
        image: Object exposing a ``size`` attribute of (width, height).
        label: Text of the sample; not used by the classification.

    Returns:
        "horizontal" when the width is at least 1.5 times the height,
        "others" otherwise.
    """
    width, height = image.size
    is_wide = width >= height*1.5
    return "horizontal" if is_wide else "others"

def add_size(work_list, shared_list, data_dir):
    """Classify the shape of every image in work_list (worker entry point).

    Args:
        work_list: Iterable of (image_path, label) pairs; image_path is
            resolved relative to data_dir.
        shared_list: List-like object that receives one
            [image_path, label, shape] entry per processed sample.
        data_dir: Base directory of the images; must support the ``/``
            operator with image_path.
    """
    for image_path, label in tqdm(work_list):
        # Open the image in a context manager so its file handle is released
        # after each sample instead of accumulating over the whole work list.
        with Image.open(data_dir/image_path) as image:
            shape = get_shape_from_size(image, label)
        shared_list.append([image_path, label, shape])

def get_shape_reportf(data_dir, label_path, worker_num = 10):
    """Group the samples of a label file by image shape, using worker processes.

    Args:
        data_dir: Base directory of the images referenced by the label file.
        label_path: Path of a tab-separated label file; only lines with
            exactly two fields (image path, label) are used.
        worker_num: Number of chunks the samples are split into; one process
            is started per non-empty chunk.

    Returns:
        A dict mapping shape name -> list of [image_path, label] entries.
        Workers append to a shared list concurrently, so the order of the
        entries is not guaranteed to follow the label file.
    """
    with open(label_path, encoding="utf-8") as f:
        lines = []
        for line in f:
            fields = line.strip().split("\t")  # split once per line
            if len(fields) == 2:
                lines.append(fields)

    # The context manager shuts the manager process down once results are copied.
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        processes = []

        for part in chunk_list(lines, worker_num):
            if not part:
                # No point in spawning a process for an empty chunk.
                continue
            p = multiprocessing.Process(target=add_size, args=(part, shared_list, data_dir))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        # Copy into a plain list before the manager is shut down.
        results = shared_list[:]

    shape_report = {}
    for image_path, label, shape in tqdm(results):
        shape_report.setdefault(shape, []).append([image_path, label])

    return shape_report




In [56]:
# Dataset to analyse: image root directory and its label file.
# data_dir = Path("/home/datasets/aihub_rec/")
# label_path = Path("/home/datasets/aihub_rec/clean_label.txt")

data_dir = Path("/home/datasets/KAIST_rec")
label_path = Path("/home/datasets/KAIST_rec/label_only_korean.txt")

# Group the samples by image shape using 50 chunks of work
shape_report = get_shape_reportf(data_dir, label_path, worker_num = 50)

100%|██████████| 59/59 [00:00<00:00, 3093.03it/s]


100%|██████████| 59/59 [00:00<00:00, 2985.99it/s]
100%|██████████| 59/59 [00:00<00:00, 2277.10it/s]
100%|██████████| 59/59 [00:00<00:00, 2217.24it/s]
100%|██████████| 59/59 [00:00<00:00, 1634.55it/s]
100%|██████████| 59/59 [00:00<00:00, 1291.18it/s]
100%|██████████| 59/59 [00:00<00:00, 1245.70it/s]
100%|██████████| 59/59 [00:00<00:00, 1222.66it/s]
100%|██████████| 59/59 [00:00<00:00, 243.05it/s]
100%|██████████| 60/60 [00:00<00:00, 98.76it/s]
100%|██████████| 59/59 [00:00<00:00, 88.48it/s]
100%|██████████| 59/59 [00:00<00:00, 85.77it/s]
100%|██████████| 59/59 [00:00<00:00, 103.86it/s]
100%|██████████| 59/59 [00:00<00:00, 75.97it/s]
100%|██████████| 59/59 [00:00<00:00, 186.33it/s]
100%|██████████| 59/59 [00:00<00:00, 228.50it/s]
100%|██████████| 59/59 [00:00<00:00, 263.38it/s]
100%|██████████| 59/59 [00:00<00:00, 367.12it/s]
100%|██████████| 59/59 [00:00<00:00, 373.48it/s]
100%|██████████| 59/59 [00:00<00:00, 364.49it/s]
100%|██████████| 60/60 [00:00<00:00, 377.34it/s]
100%|██████████| 

In [57]:
# Print the number of samples in each shape group
for k, samples in shape_report.items():
    print(k, len(samples))

others 327
horizontal 2628


In [58]:
# Save only the horizontal samples as a new tab-separated label file.
target_label_path = Path("/home/datasets/KAIST_rec/label_only_korean_horizontal.txt")
# Explicit UTF-8: the labels are Korean text and the platform default
# encoding is not guaranteed to be UTF-8.
with open(target_label_path, "w", encoding="utf-8") as f:
    for img_path, label in shape_report["horizontal"]:
        f.write(f"{img_path}\t{label}\n")
    



# 한글로만 이루어진 샘플 추출

In [38]:
import re

# File paths
input_file_path = "/home/datasets/KAIST_rec/label.txt"
output_file_path = "/home/datasets/KAIST_rec/label_only_korean.txt"

# Pattern for labels made up solely of characters in the range 가-힣
hangul_pattern = re.compile("^[가-힣]+$")

# Copy only the lines whose label is pure Hangul to the output file
with open(input_file_path, 'r', encoding='utf-8') as infile, open(output_file_path, 'w', encoding='utf-8') as outfile:
    for line in infile:
        try:
            # The label is the second tab-separated field
            label = line.strip().split('\t')[1]
        except IndexError as e:
            # Lines without a label field are reported and skipped. Only the
            # parsing step is guarded, so I/O errors are no longer swallowed.
            print(e)
            print(line)
            continue
        # Keep the line only when the label consists of Hangul alone
        if hangul_pattern.match(label):
            outfile.write(line)

print(f"한글로만 이루어진 레이블의 샘플이 '{output_file_path}'에 저장되었습니다.")

list index out of range
2\1523.png	

list index out of range
5\4591.png	

list index out of range
5\4592.png	

list index out of range
5\4593.png	

list index out of range
6\5141.png	

한글로만 이루어진 레이블의 샘플이 '/home/datasets/KAIST_rec/label_only_korean.txt'에 저장되었습니다.


# label filtering

In [None]:
from PIL import Image
from pathlib import Path
from collections import Counter
import pandas as pd

def load_label(label_file_path):
    """Read a tab-separated label file into a list of field lists.

    Args:
        label_file_path: Path of the label file.

    Returns:
        One list of fields per line. A line with a single field (no label,
        e.g. an inference result that came out blank) gets an empty string
        appended as its label.
    """
    rows = []
    with open(label_file_path) as f:
        for raw in f:
            fields = raw.strip().split("\t")
            if len(fields) == 1:
                # No label present: use an empty label
                fields.append("")
            rows.append(fields)
    return rows
        
def text_check(text):
    """Return True when text is usable as a transcription, False otherwise.

    A text is rejected when it is exactly one of the IGNORE_TEXT placeholders
    or when it contains any of the IGNORE_MASK substrings.
    """
    IGNORE_TEXT = ["(한자)", "((한자))", "(((한자)))", "(일본어)", "((일본어))", "(((일본어)))", "(외국어)","((외국어))","(((외국어)))",  "(영어)", "((영어))", "(((영어)))", "xx", "xxx", "xxxx", "xxxxx", "XX", "XXX", "XXXX", "XXXXX"]
    IGNORE_MASK = ["xx", "xxx", "xxxx", "xxxxx", "XX", "XXX", "XXXX", "XXXXX"]+["ㄱ","ㄴ","ㄷ","ㄹ","ㄺ","ㅁ","ㅂ","ㅅ","ㅆ","ㅇ","ㅈ","ㅊ","ㅋ","ㅌ","ㅍ","ㅎ","ㅏ","ㅑ","ㅓ","ㅕ","ㅗ","ㅛ","ㅜ","ㅠ","ㅡ","ㅣ","ㅐ","ㅒ","ㅔ","ㅖ","ㅘ","ㅙ","ㅚ","ㅝ","ㅞ","ㅟ","ㅢ"]+["!",'"',"#","$","%","&","'","(",")","*","+","-","/","0","1","2","3","4","5","6","7","8","9",":",";","<","=",">","?","A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","[","\\","]","^","_","`","a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z","{","|","}","~","ㄱ","ㄴ","ㄷ","ㄹ","ㄺ","ㅁ","ㅅ","ㅆ","ㅇ","ㅈ","ㅊ","ㅋ","ㅌ","ㅍ","ㅎ"]+[",", ".", "º"]
    # Exact match with a forbidden placeholder text: reject
    if text in IGNORE_TEXT:
        return False
    # Reject as soon as any forbidden substring occurs in the text
    contains_mask = any(mask in text for mask in IGNORE_MASK)
    return not contains_mask




def filter_label_with_mask(labels):
    """Separate samples by whether their label passes text_check().

    Args:
        labels: Iterable of (image, label) pairs.

    Returns:
        A tuple (filtered, removed) of lists of [image, label] entries:
        filtered holds the samples accepted by text_check(), removed the rest.
    """
    kept, dropped = [], []
    for image, label in labels:
        bucket = kept if text_check(label) else dropped
        bucket.append([image, label])
    return kept, dropped

def filter_by_length(labels, max_len):
    """Separate samples by label length.

    Args:
        labels: Iterable of (image, label) pairs.
        max_len: Longest label length that is still kept.

    Returns:
        A tuple (filtered, removed) of lists of [image, label] entries:
        filtered holds labels of at most max_len characters, removed the rest.
    """
    kept, dropped = [], []
    for image, label in labels:
        too_long = len(label) > max_len
        (dropped if too_long else kept).append([image, label])
    return kept, dropped

def get_char_num_report(labels):
    """Count character occurrences over all labels.

    Args:
        labels: Iterable of (image, label) pairs.

    Returns:
        A dict mapping character -> occurrence count, ordered from the
        rarest character to the most frequent one.
    """
    tally = Counter()
    for _, text in labels:
        tally.update(text)
    ordered = sorted(tally.items(), key=lambda pair: pair[1])
    return dict(ordered)
    

def get_length_report(labels):
    """Count how many labels exist for each label length.

    Args:
        labels: Iterable of (image, text) pairs.

    Returns:
        A dict mapping label length -> number of labels, ordered by
        ascending length.
    """
    lengths = Counter()
    for _, text in labels:
        lengths[len(text)] += 1
    return {size: lengths[size] for size in sorted(lengths)}

def get_df(image_text_pairs):
    """Build a DataFrame with "image" and "text" columns from pairs.

    Args:
        image_text_pairs: Iterable of (image, text) pairs.

    Returns:
        A pandas DataFrame with one row per pair.
    """
    records = [{"image": image, "text": text} for image, text in image_text_pairs]
    return pd.DataFrame(records)


def make_char_set_file(char_num_report, file_path = "/home/char_set.txt"):
    """Write the sorted characters of a frequency report, one per line.

    Args:
        char_num_report: Mapping whose keys are the characters to save.
        file_path: Destination file; it is overwritten if it exists.
    """
    char_list = sorted(char_num_report.keys())

    # Explicit UTF-8: the character set contains Hangul and the platform
    # default encoding is not guaranteed to be able to encode it.
    with open(file_path, "w", encoding="utf-8") as f:
        f.writelines(f"{c}\n" for c in char_list)
    print(f"{len(char_list)} characters are saved in '{file_path}'")


In [None]:
# Input locations: image root, ground-truth label file and inference result file
data_dir = "/home/datasets/aihub_rec"
label_file_path = "/home/datasets/aihub_rec/label.txt"
infer_file_path = "/home/datasets/aihub_rec/clean_infer_result.txt"

#################################################
# Load all labels
labels = load_label(label_file_path)
print(f"Totel label num: {len(labels)}")

#################################################
# Drop the samples whose label is rejected by text_check()
labels, removed_labels = filter_label_with_mask(labels)
print(f"Label num after char_set filtering = {len(labels)}       ... {len(removed_labels)} samples are removed")

#################################################
# Drop the samples whose label is longer than MAX_LENGTH characters
MAX_LENGTH = 20
labels, removed_labels = filter_by_length(labels, MAX_LENGTH)
print(f"Label num after max length filtering = {len(labels)}       ... {len(removed_labels)} samples are removed")

#################################################
# Meant for telling horizontal and vertical samples apart, but it seems unnecessary
infers = load_label(infer_file_path)
label_df = get_df(labels)
infer_df = get_df(infers)
df = pd.merge(label_df, infer_df, on="image", suffixes=["_label", "_infer"])

#################################################
# Character frequency and label length statistics of the filtered labels
char_num_report = get_char_num_report(labels)
length_report = get_length_report(labels)

#################################################
# Save the character set of the filtered labels
make_char_set_file(char_num_report)

In [None]:
from pathlib import Path

def get_sample_seen_unseen_report(labels, seen_char):
    """Split samples by whether every character of the label is in seen_char.

    Args:
        labels: Iterable of (image, label) pairs.
        seen_char: Iterable of the characters regarded as seen.

    Returns:
        A dict with two lists of [image, label] entries: "seen" holds the
        samples whose label only uses characters from seen_char (an empty
        label counts as seen), "unseen" holds all other samples.
    """
    # Build the lookup set once instead of once per sample; this also keeps
    # the result correct when seen_char is a one-shot iterable.
    seen = set(seen_char)

    seen_unseen_report = {
        "seen": [],
        "unseen": []
    }

    for image, label in labels:
        key = "seen" if seen.issuperset(label) else "unseen"
        seen_unseen_report[key].append([image, label])

    return seen_unseen_report



# Label file of the training set
trainset_path = Path("/home/labelsets/aihub_rec_full_horizontal_clean_80:10:10/train_label.txt")

# Label files of the test sets: {task: path}
testset_path_dict = {
    "aihub_eval": Path("/home/labelsets/aihub_rec_full_horizontal_clean_80:10:10/eval_label.txt"),
    "aihub_test": Path("/home/labelsets/aihub_rec_full_horizontal_clean_80:10:10/test_label.txt"),
    "GIST_test": Path("/home/labelsets/GIST_rec_full_test/test_label.txt"),
    "KAIST_test": Path("/home/labelsets/KAIST_rec_full_test/test_label.txt"),
}

# NOTE(review): only the first 10000 training samples are used here, so the
# "seen" character set is limited to those samples - confirm this is intended.
trainset = load_label(trainset_path)[:10000]
testset_dict = {task: load_label(file_path) for task, file_path in testset_path_dict.items()}

# Character frequency tables for the training set and for each test set
trainset_char_num_report = get_char_num_report(trainset)
testset_char_num_report_dict = {task: get_char_num_report(labels) for task, labels in testset_dict.items()}

# Characters occurring in the (truncated) training set, then the seen/unseen split per test set
trainset_unique_char = set(trainset_char_num_report.keys())
seen_unseen_report_dict = {task: get_sample_seen_unseen_report(testset, trainset_unique_char) for task,  testset in testset_dict.items()}

