Train 데이터 셋 분할

In [None]:
# import os
# import pandas as pd
# from sklearn.model_selection import train_test_split

# DATA_DIR = '/data'
# ORIGIN_TRAIN_PATH = os.path.join(DATA_DIR, 'train.csv')
# SEED = 42


# df = pd.read_csv(ORIGIN_TRAIN_PATH)
# print(f"원본 데이터 개수: {len(df)}개")

# train_val, local_test = train_test_split(
#     df, test_size=0.1, stratify=df['label'], random_state=SEED
# )

# train, valid = train_test_split(
#     train_val, test_size=0.11, stratify=train_val['label'], random_state=SEED
# )

# print(f"분할 완료: Train({len(train)}) / Valid({len(valid)}) / Local_Test({len(local_test)})")

# train.to_csv(os.path.join(DATA_DIR, 'train_fixed.csv'), index=False)
# valid.to_csv(os.path.join(DATA_DIR, 'valid_fixed.csv'), index=False)
# local_test.to_csv(os.path.join(DATA_DIR, 'local_test_fixed.csv'), index=False)

# print("데이터 분할 및 저장 완료")

특수 기호 제거 로직

In [None]:
import os
import pandas as pd
import re
import csv

# ================
#  경로 및 설정
# ================
DATA_DIR = 'make-model/data'
INPUT_FILE = 'train.csv'        # 원본 파일명 (필요시 경로 수정)
OUTPUT_FILE = 'clean_train.csv' # 저장할 파일명

# ==================
# Cleaning 함수 정의
# ==================

def clean_text(text):

    if not isinstance(text, str):
        text = str(text)

    # 1. HTML 태그 제거
    # 예: <br>, <div> -> ""
    text = re.sub(r'<[^>]+>', '', text)

    # 2. 한자 제거
    # 예: 韓國 -> ""
    text = re.sub(r'[\u4E00-\u9FFF]', '', text)

    # 3. 빈 괄호 및 내용 없는 괄호 패턴 제거
    # 예: (), (  ) -> ""
    text = re.sub(r'\(\s*[^\w가-힣]*\s*\)', '', text)

    # 4. ( ? ~ ? ) 형태의 특정 노이즈 패턴 제거
    # 예: (?~?), (~?) -> ""
    text = re.sub(r'\([^\(\)]{0,20}[\?\~]{1,3}[^\(\)]{0,20}\)', '', text)

    # 5. 반복되는 점(...) 정제 -> 점(.) 하나로 변경
    # 무조건 삭제하면 문장의 끝을 알 수 없으므로 축소함
    text = re.sub(r'[.,]{3,}', '.', text)

    # 6. 반복되는 쉼표(,,,) 정제 -> 쉼표(,) 하나로 변경
    text = re.sub(r',\s*,+', ',', text)

    # 7. 중복 괄호((())) 정제 -> 괄호 하나로 변경
    # 여는 괄호 중복
    text = re.sub(r'\({2,}', '(', text)
    # 닫는 괄호 중복
    text = re.sub(r'\){2,}', ')', text)

    # 8. 다중 공백 제거 (하나의 공백으로)
    text = re.sub(r'\s+', ' ', text).strip()

    return text

def load_data(path):
    if not os.path.exists(path):
        print(f"파일이 존재하지 않음: {path}")
        return None
    
    encodings = ['utf-8', 'utf-8-sig', 'cp949']
    df = None
    
    for enc in encodings:
        try:
            df = pd.read_csv(path, encoding=enc)
            print(f"로드 성공 ({enc}): {path}")
            break
        except:
            continue
            
    if df is None:
        print("데이터 로드 실패")
    return df

def main():
    input_path = os.path.join(DATA_DIR, INPUT_FILE)
    df = load_data(input_path)
    
    if df is None:
        return

    text_col = 'full_text' 
    if 'full_text' not in df.columns:
        for candidate in ['text', 'paragraph_text', 'content']:
            if candidate in df.columns:
                text_col = candidate
                break
    
    print(f"텍스트 컬럼 감지됨: {text_col}")
    print("데이터 정제 시작...")

    df[text_col] = df[text_col].apply(clean_text)

    initial_len = len(df)
    df = df[df[text_col].str.strip() != ""]
    removed_count = initial_len - len(df)
    
    if removed_count > 0:
        print(f"정제 후 빈 문자열이 된 {removed_count}개 행을 제거함.")

    output_path = os.path.join(DATA_DIR, OUTPUT_FILE)
    
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    
    df.to_csv(output_path, index=False, encoding='utf-8-sig')
    print(f"\n정제 완료! 저장된 파일 경로: {output_path}")
    print(f"총 데이터 개수: {len(df)}")

if __name__ == "__main__":
    main()

4fold용 데이터셋 분할
1. train -> 0.9 : 0.1(train / test)
2. 4fold 0.75 : 0.25(train / valid)

In [None]:
import os
import pandas as pd
import csv
import numpy as np
from sklearn.model_selection import StratifiedKFold, train_test_split


# 입력 파일 경로(origin)
INPUT_FILE_PATH = 'make-model/data/orgin/clean_train.csv'

# 출력 루트 경로
OUTPUT_ROOT_DIR = 'make-model/data/fold'

# 저장될 파일 이름 설정(변수 앞에 베이스가 되는 데이터 이름 지정)
OUTPUT_TEST_FILENAME = 'local_origin_test.csv' 
OUTPUT_TRAIN_FILENAME = 'origin_train.csv'       
OUTPUT_VALID_FILENAME = 'origin_valid.csv'       

SEED = 42
detected_delimiter = ','
detected_quotechar = '"'




def find_column_name(columns, candidates):
    for col in columns:
        if col.lower().strip() in candidates:
            return col
    return None

def load_and_fix_data(path, is_test=False):
    if not os.path.exists(path):
        print(f"파일이 없음: {path}")
        return None

    df = None
    encodings_to_try = ['utf-8-sig', 'utf-8', 'cp949']

    for encoding in encodings_to_try:
        try:
            df = pd.read_csv(
                path,
                encoding=encoding,
                engine='python',
                on_bad_lines='skip',
                encoding_errors='ignore',
                delimiter=detected_delimiter,
                quotechar=detected_quotechar,
                quoting=csv.QUOTE_MINIMAL
            )
            break
        except Exception as e:
            df = None

    if df is None:
        print(f"데이터 로드 실패: {path}")
        return None

    text_candidates = ['paragraph_text', 'text', 'sentence', 'content', 'full_text']
    text_col = find_column_name(df.columns, text_candidates)
    if text_col:
        df.rename(columns={text_col: 'text'}, inplace=True)
    else:
        obj_cols = df.select_dtypes(include=['object']).columns
        if len(obj_cols) > 0:
            df.rename(columns={obj_cols[0]: 'text'}, inplace=True)
        else:
            return None

    if is_test:
        id_candidates = ['id', 'idx', 'index', 'no', 'ID']
        id_col = find_column_name(df.columns, id_candidates)
        if id_col:
            df.rename(columns={id_col: 'id'}, inplace=True)
        else:
            df['id'] = df.index

    if not is_test:
        target_candidates = ['generated', 'label', 'target', 'class']
        target_col = find_column_name(df.columns, target_candidates)
        if target_col:
            df.rename(columns={target_col: 'label'}, inplace=True)
            try:
                df['label'] = df['label'].astype(int)
            except:
                pass
        else:
            print("타겟(Label) 컬럼을 찾을 수 없습니다.")
            return None

    return df



def create_split_and_kfold(input_path, output_root, n_splits=4, test_size=0.1):
    print(f">>> 데이터 처리 시작")
    print(f"    입력: {input_path}")
    print(f"    출력 루트: {output_root}")
    print(f"    파일명 설정: Train[{OUTPUT_TRAIN_FILENAME}], Valid[{OUTPUT_VALID_FILENAME}], Test[{OUTPUT_TEST_FILENAME}]")
    
    # 1. 원본 데이터 로드
    df = load_and_fix_data(input_path, is_test=False)
    
    if df is None:
        print("데이터 로드 실패로 작업을 중단")
        return

    print(f" - 원본 데이터 개수: {len(df)}개")

    # 출력 루트 폴더 생성
    if not os.path.exists(output_root):
        os.makedirs(output_root, exist_ok=True)

    # 2. [Test Set 분리]
    print(f"\n[Step 1] Test Set 분리 (비율: {test_size})")
    dev_df, test_df = train_test_split(
        df, 
        test_size=test_size, 
        stratify=df['label'], 
        random_state=SEED,
        shuffle=True
    )
    
    # Test Set 저장 (변수로 지정한 파일명 사용)
    test_save_path = os.path.join(output_root, OUTPUT_TEST_FILENAME)
    test_df.to_csv(test_save_path, index=False)
    
    print(f" -> Test Set 저장 완료: {test_save_path}")
    print(f"    (Size: {len(test_df)}, Label 0: {(test_df['label']==0).sum()}, Label 1: {(test_df['label']==1).sum()})")

    # 3. [K-Fold 분할 및 폴더별 저장]
    print(f"\n[Step 2] {n_splits}-Fold Cross Validation 데이터 생성")
    
    dev_df = dev_df.reset_index(drop=True)
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=SEED)

    for fold, (train_idx, val_idx) in enumerate(skf.split(dev_df, dev_df['label'])):
        fold_train_df = dev_df.iloc[train_idx]
        fold_val_df = dev_df.iloc[val_idx]
        
        # 폴더 경로 생성 (fold0, fold1 ...)
        fold_dir_name = f'fold{fold}'
        current_fold_dir = os.path.join(output_root, fold_dir_name)
        
        if not os.path.exists(current_fold_dir):
            os.makedirs(current_fold_dir, exist_ok=True)
        
        # 파일 저장 (변수로 지정한 파일명 사용)
        train_save_path = os.path.join(current_fold_dir, OUTPUT_TRAIN_FILENAME)
        val_save_path = os.path.join(current_fold_dir, OUTPUT_VALID_FILENAME)
        
        fold_train_df.to_csv(train_save_path, index=False)
        fold_val_df.to_csv(val_save_path, index=False)
        
        t_0 = (fold_train_df['label'] == 0).sum()
        t_1 = (fold_train_df['label'] == 1).sum()
        v_0 = (fold_val_df['label'] == 0).sum()
        v_1 = (fold_val_df['label'] == 1).sum()
        
        print(f" -> [{fold_dir_name}] 저장 완료")
        print(f"      Train: {OUTPUT_TRAIN_FILENAME} ({len(fold_train_df)}개)")
        print(f"      Valid: {OUTPUT_VALID_FILENAME} ({len(fold_val_df)}개)")

    print("\n>>> 모든 작업이 완료되었습니다.")

if __name__ == "__main__":
    create_split_and_kfold(
        input_path=INPUT_FILE_PATH, 
        output_root=OUTPUT_ROOT_DIR, 
        n_splits=4, 
        test_size=0.1
    )

데이터 축소 작업