# Data prepare

## 1. Download Korean Data from aihub.go.kr
- The data downloaded from aihub.go.kr is separated into zip files by folder hierarchy, so it must be fully extracted without distinction before working only with the json files.
- Then, parse the json to collect only the Korean data.

In [2]:
import os
import zipfile
import glob
import json
import re

### (1) Check if the code works.

In [3]:
base_path = "/mnt/t7/dnn_data/korean_data/data"
target_path = "/mnt/t7/dnn/llm_practicing/korean_data"

In [8]:
def extract_zip_files(base_path):
    # Zip 파일 경로를 찾습니다.
    zip_files = glob.glob(f'{base_path}/**/*.zip', recursive=True)
    for zip_file in zip_files:
        # Zip 파일을 압축 해제합니다.
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            extract_path = os.path.dirname(zip_file)
            zip_ref.extractall(extract_path)

def find_json_files(base_path):
    # JSON 파일 경로를 찾습니다.
    return glob.glob(f'{base_path}/**/*.json', recursive=True)

def contains_korean(text):
    return bool(re.search("[가-힣]", text))

def find_korean_values(data, result):
    if isinstance(data, dict):
        for value in data.values():
            find_korean_values(value, result)
    elif isinstance(data, list):
        for item in data:
            find_korean_values(item, result)
    elif isinstance(data, str) and contains_korean(data):
        result.append({"text": data})

def save_as_jsonl(data, target_path):
    for top_folder, texts in data.items():
        file_name = f"{top_folder}.jsonl"
        with open(os.path.join(target_path, file_name), 'w', encoding='utf-8') as file:
            for item in texts:
                json_record = json.dumps(item, ensure_ascii=False)
                file.write(json_record + '\n')



In [1]:
def safe_load_json_file(json_file):
    with open(json_file, 'r', encoding='utf-8') as file:
        try:
            file_content = file.read()
            # 유효하지 않은 제어 문자 제거 또는 이스케이프 처리
            # file_content = file_content.replace('\n', '\\n').replace('\r', '\\r')
            return json.loads(file_content)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON from {json_file}: {e}")
            return None  # 또는 적절한 오류 처리

# def process_json_files(json_files, base_path):
#     organized_data = {}
#     for json_file in json_files:
#         print(json_file)
#         top_folder = json_file[len(base_path):].split(os.sep)[1]
#         if top_folder not in organized_data:
#             organized_data[top_folder] = []
        
#         with open(json_file, 'r', encoding='utf-8') as file:
#             data = json.load(file)
#             result = []
#             find_korean_values(data, result)
#             organized_data[top_folder].extend(result)
#     return organized_data

# process_json_files 함수 내에서 json.load(file) 대신 safe_load_json_file을 사용
def process_json_files(json_files, base_path):
    organized_data = {}
    for json_file in json_files:
        top_folder = json_file[len(base_path):].split(os.sep)[1]
        if top_folder not in organized_data:
            organized_data[top_folder] = []
        
        data = safe_load_json_file(json_file)
        if data:  # 데이터가 성공적으로 로드된 경우에만 처리
            result = []
            find_korean_values(data, result)
            organized_data[top_folder].extend(result)
    return organized_data

In [10]:
def main(base_path, target_path):
    extract_zip_files(base_path)
    json_files = find_json_files(base_path)
    korean_texts = process_json_files(json_files, base_path)
    save_as_jsonl(korean_texts, target_path)

In [12]:
main(base_path, target_path)

NotImplementedError: That compression method is not supported

In [26]:
# json_files = find_json_files(base_path)
# korean_texts = process_json_files(json_files, base_path)
# save_as_jsonl(korean_texts, target_path)

In [None]:
# for json_file in json_files:
#     check_st = safe_load_json_file(json_file)
#     if check_st is not None:
#         print(json_file)

In [1]:
def safe_load_json_file(json_file):
    with open(json_file, 'r', encoding='utf-8') as file:
        try:
            file_content = file.read()
            # 유효하지 않은 제어 문자 제거 또는 이스케이프 처리
            file_content = file_content.replace('\n', '\\n').replace('\r', '\\r')
            return json.loads(file_content)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON from {json_file}: {e}")
            return None  # 또는 적절한 오류 처리

In [13]:
dir_list = os.listdir(base_path)

print(dir_list)

['021.도서자료 기계독해', '022.요약문 및 레포트 생성 데이터', '024.에세이 글 평가 데이터', '025.일상생활 및 구어체 한-영 번역 병렬 말뭉치 데이터', '029.대규모 구매도서 기반 한국어 말뭉치 데이터', '030.웹데이터 기반 한국어 말뭉치 데이터', '142.한국어 지식기반 관계 데이터', '150.숫자연산 기계독해 데이터', '152.기술과학 문서 기계독해 데이터', '153.기술과학 요약 데이터', '156.전문분야 영-한, 중-한 번역 말뭉치(식품)', '157.추상 요약 사실성 검증 데이터', '160.문화, 게임 콘텐츠 분야 용어 말뭉치', '308.AI 허브 데이터 활용을 위한 기계 번역앱 구축과 번역기 평가 및 신규 말뭉치 구축', '기계독해', '도서자료 요약', '법률 지식베이스', '일반상식', '전문분야 말뭉치', '전문분야 한영 말뭉치', '한국어-영어 번역 말뭉치(기술과학)', '한국어-영어 번역 말뭉치(사회과학)', '한국어-영어 번역(병렬) 말뭉치', '한국어-일본어 번역 말뭉치', '한국어-중국어 번역 말뭉치(기술과학)', '한국어-중국어 번역 말뭉치(사회과학)']


### (2) Modify the code to work by top_folder.

In [4]:
def extract_zip_files(base_path):
    # Zip 파일 경로를 찾습니다.
    zip_files = glob.glob(f'{base_path}/**/*.zip', recursive=True)
    for zip_file in zip_files:
        # Zip 파일을 압축 해제합니다.
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            extract_path = os.path.dirname(zip_file)
            zip_ref.extractall(extract_path)

def find_json_files(base_path):
    # JSON 파일 경로를 찾습니다.
    return glob.glob(f'{base_path}/**/*.json', recursive=True)

def contains_korean(text):
    return bool(re.search("[가-힣]", text))

def find_korean_values(data, result):
    if isinstance(data, dict):
        for value in data.values():
            find_korean_values(value, result)
    elif isinstance(data, list):
        for item in data:
            find_korean_values(item, result)
    elif isinstance(data, str) and contains_korean(data):
        result.append({"text": data})

def safe_load_json_file(json_file):
    with open(json_file, 'r', encoding='utf-8') as file:
        try:
            file_content = file.read()
            return json.loads(file_content)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON from {json_file}: {e}")
            return None  # 또는 적절한 오류 처리

In [5]:
def process_json_files(json_files):
    organized_data = []
    for json_file in json_files:
        data = safe_load_json_file(json_file)
        if data:  # 데이터가 성공적으로 로드된 경우에만 처리
            result = []
            find_korean_values(data, result)
            organized_data.extend(result)
    return organized_data

def save_as_jsonl(data, top_dir, target_path):
    file_name = f"{top_dir}.jsonl"
    with open(os.path.join(target_path, file_name), 'w', encoding='utf-8') as file:
        for item in data:
            json_record = json.dumps(item, ensure_ascii=False)
            file.write(json_record + '\n')


In [6]:
def main(base_path, target_path):
    top_dir_list = [dir for dir in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, dir))]

    for top_dir in top_dir_list:
        base_target_path = os.path.join(base_path, top_dir)
        extract_zip_files(base_target_path)
        json_files = find_json_files(base_target_path)
        korean_texts = process_json_files(json_files)
        save_as_jsonl(korean_texts, top_dir, target_path)
        print(top_dir, "is done!!!")

In [7]:
top_dir_list = [dir for dir in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, dir))]
print(top_dir_list)

['029.대규모 구매도서 기반 한국어 말뭉치 데이터', '030.웹데이터 기반 한국어 말뭉치 데이터', '142.한국어 지식기반 관계 데이터', '150.숫자연산 기계독해 데이터', '152.기술과학 문서 기계독해 데이터', '153.기술과학 요약 데이터', '156.전문분야 영-한, 중-한 번역 말뭉치(식품)', '157.추상 요약 사실성 검증 데이터', '160.문화, 게임 콘텐츠 분야 용어 말뭉치', '308.AI 허브 데이터 활용을 위한 기계 번역앱 구축과 번역기 평가 및 신규 말뭉치 구축', '기계독해', '도서자료 요약', '법률 지식베이스', '일반상식', '전문분야 말뭉치', '전문분야 한영 말뭉치', '한국어-영어 번역 말뭉치(기술과학)', '한국어-영어 번역 말뭉치(사회과학)', '한국어-영어 번역(병렬) 말뭉치', '한국어-일본어 번역 말뭉치', '한국어-중국어 번역 말뭉치(기술과학)', '한국어-중국어 번역 말뭉치(사회과학)']


In [8]:
# base_target_path = os.path.join(base_path, top_dir_list[1])
# extract_zip_files(base_target_path)
# json_files = find_json_files(base_target_path)
# korean_texts = process_json_files(json_files)
# save_as_jsonl(korean_texts, top_dir_list[1], target_path)

In [9]:
main(base_path, target_path)

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f7da63f4eb0>>
Traceback (most recent call last):
  File "/home/aeolian83/anaconda3/envs/dps_for_p380/lib/python3.8/site-packages/ipykernel/ipkernel.py", line 770, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


### (3) Change the file handling process to resolve system memory leak issues:

1) Instead of processing all text as a list, handle each JSON file separately, creating a temporary jsonl file before generating the final jsonl file.
2) Improve processing speed by parallel processing (multiprocessing) the handling of JSON files.
    - Reduced a task that took over 2 hours to under 20 minutes.
3) Handle decompression of compressed files through multiprocessing as well.
    - Reduced decompression time from 11 minutes to about 5 minutes and 40 seconds.


In [1]:
import os
import glob
import json
import zipfile
import re
from multiprocessing import Pool, current_process
import shutil
from tqdm import tqdm  # tqdm 라이브러리를 임포트

In [None]:
import os
import glob
import json
import zipfile
import re
from multiprocessing import Pool, current_process
import shutil
from tqdm import tqdm  # tqdm 라이브러리를 임포트


def extract_zip_file(zip_file_info):
    zip_file, extract_path = zip_file_info
    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(extract_path)


def extract_all_zip_files(base_path):
    zip_files = glob.glob(f"{base_path}/**/*.zip", recursive=True)
    zip_file_info_list = [
        (zip_file, os.path.dirname(zip_file)) for zip_file in zip_files
    ]

    # ZIP 파일을 병렬로 압축 해제
    with Pool(processes=os.cpu_count()) as pool:
        list(
            tqdm(
                pool.imap(extract_zip_file, zip_file_info_list),
                total=len(zip_file_info_list),
                desc="Extracting ZIP files",
            )
        )


def find_all_json_files(base_path):
    return glob.glob(f"{base_path}/**/*.json", recursive=True)


def contains_korean(text):
    return bool(re.search("[가-힣]", text))


def find_korean_values(data, result):
    if isinstance(data, dict):
        for value in data.values():
            find_korean_values(value, result)
    elif isinstance(data, list):
        for item in data:
            find_korean_values(item, result)
    elif isinstance(data, str) and contains_korean(data):
        result.append({"text": data})


def safe_load_json_file(json_file):
    try:
        with open(json_file, "r", encoding="utf-8") as file:
            return json.load(file)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON from {json_file}: {e}")
        return None


def process_json_file(args):
    json_file, temp_dir = args
    data = safe_load_json_file(json_file)
    temp_file_path = None  # 초기화 수정

    if data:
        result = []
        find_korean_values(data, result)
        if result:
            temp_file_path = os.path.join(
                temp_dir, f"temp_{current_process().pid}.jsonl"
            )
            with open(temp_file_path, "a", encoding="utf-8") as file:
                for item in result:
                    json_record = json.dumps(item, ensure_ascii=False)
                    file.write(json_record + "\n")

    # temp_file_path가 성공적으로 생성되었는지 확인
    if temp_file_path:
        return temp_file_path
    else:
        return None


def merge_temp_files(temp_files, final_path):
    with open(final_path, "w", encoding="utf-8") as final_file:
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                with open(temp_file, "r", encoding="utf-8") as f:
                    shutil.copyfileobj(f, final_file)
                os.remove(temp_file)  # Remove temp file after merging


def main(base_path, target_path, top_dir):
    # extract_all_zip_files(base_path)
    extract_zip_files(base_path)
    json_files = find_all_json_files(base_path)
    temp_dir = os.path.join(target_path, "temp")
    os.makedirs(temp_dir, exist_ok=True)

    args_list = [(json_file, temp_dir) for json_file in json_files]

    with Pool(processes=os.cpu_count()) as pool:
        temp_files = list(
            tqdm(
                pool.imap(process_json_file, args_list),
                total=len(args_list),
                desc="Processing JSON files",
            )
        )

    final_file_name = f"{top_dir}.jsonl"
    final_path = os.path.join(target_path, final_file_name)
    merge_temp_files(set(filter(None, temp_files)), final_path)


if __name__ == "__main__":
    base_path = "/mnt/t7/dnn_data/korean_data/data"
    target_path = "/mnt/t7/dnn/llm_practicing/korean_data"

    top_dir_list = [
        dir
        for dir in os.listdir(base_path)
        if os.path.isdir(os.path.join(base_path, dir))
    ]

    for top_dir in tqdm(top_dir_list):
        base_target_path = os.path.join(base_path, top_dir)
        main(base_target_path, target_path, top_dir)
        print(top_dir, "is done!!!")
