In [2]:
# merchant_code 중복 있는지 확인
import pandas as pd

path = "./_merged_csv/staging_all.utf8.csv"  # 경로 맞게 수정
for enc in ("utf-8-sig", "cp949"):
    try:
        df = pd.read_csv(path, dtype=str, encoding=enc)
        break
    except Exception:
        pass

dups = df["merchant_code"].duplicated(keep=False)       # 중복 전체 표시
print("총 행:", len(df), " / 중복 코드가 있는 행:", dups.sum())

# 중복 코드 목록 + 건수 (상위 20개 보기)
vc = df["merchant_code"].value_counts()
print(vc[vc > 1].head(20))

# 중복된 행 샘플 10개 보기
print(df[dups].head(10))


총 행: 13471842  / 중복 코드가 있는 행: 0
Series([], Name: count, dtype: int64)
Empty DataFrame
Columns: [region, source_db, merchant_code, name, address, ref_date, raw_category, category]
Index: []


In [3]:
# -*- coding: utf-8 -*-

import csv
from pathlib import Path

def split_csv_by_size(infile: str, target_mb: int = 5, encoding: str = "utf-8-sig"):
    """
    대용량 CSV를 용량 기준으로 여러 개의 파일로 분할합니다.

    Parameters
    ----------
    infile : str
        분할할 원본 CSV 경로(상대/절대경로 모두 가능).
        예: "./_merged_csv/staging_all.utf8.csv"
    target_mb : int, default=5
        각 분할 파일의 목표 용량(MB). 정확히 딱 맞추는 건 아니고,
        파일 포인터 크기 기준으로 '이상'이 되는 시점에 새 파일을 엽니다.
    encoding : str, default="utf-8-sig"
        입력/출력 모두 동일 인코딩으로 사용합니다. (윈도우 호환을 위해 기본값 BOM 포함)

    Output
    ------
    <원본과 같은 폴더>/staging_all_split/ 아래에
    "staging_all.part001.csv", "staging_all.part002.csv", ... 형태로 생성됩니다.
    각 파트에는 항상 헤더를 포함하며, 줄바꿈은 CRLF로 고정합니다.

    Notes
    -----
    - 분할 기준은 파일 핸들의 현재 크기(f_out.tell())를 사용합니다.
      한 줄이 써진 뒤 threshold를 넘으면 즉시 다음 파일로 넘어갑니다.
    - 헤더는 모든 파트의 첫 줄에 다시 기록됩니다.
    - 출력 인코딩 또한 encoding 파라미터를 따릅니다.
    """

    in_path = Path(infile)
    if not in_path.exists():
        raise FileNotFoundError(f"입력 파일이 없습니다: {in_path}")

    # 출력 폴더: 원본 CSV와 동일 폴더 아래 'staging_all_split'
    out_dir = in_path.parent / "staging_all_split"
    out_dir.mkdir(parents=True, exist_ok=True)

    target_bytes = target_mb * 1024 * 1024  # MB → byte
    part = 1                 # part 번호(001부터 시작)
    total_rows = 0           # 전체 데이터(헤더 제외) 누적 카운트
    writer = None            # 현재 csv.writer
    f_out = None             # 현재 출력 파일 핸들
    header = None            # 헤더 행 보관

    def new_part():
        """새로운 파트 파일을 열고 헤더를 다시 기록하는 헬퍼 함수."""
        nonlocal part, writer, f_out, header
        # 이전 파일이 열려 있으면 닫기
        if f_out:
            f_out.close()
        out_path = out_dir / f"staging_all.part{part:03d}.csv"
        # newline=""로 열고, writer에서 lineterminator를 CRLF로 강제 설정
        f_out = out_path.open("w", encoding=encoding, newline="")
        writer = csv.writer(f_out, lineterminator="\r\n")
        # 첫 줄: 헤더 재작성
        writer.writerow(header)
        part += 1

    # 입력 파일 오픈
    with in_path.open("r", encoding=encoding, newline="") as f_in:
        reader = csv.reader(f_in)
        # 첫 줄은 헤더
        try:
            header = next(reader)
        except StopIteration:
            # 빈 CSV라면 바로 종료
            print("[INFO] 빈 CSV 입니다. 작업 없음.")
            return

        # 첫 번째 파트 생성
        new_part()

        # 데이터 행을 순차 기록
        for row in reader:
            writer.writerow(row)
            total_rows += 1

            # 현재 파일 크기가 기준을 넘으면 새 파트 시작
            if f_out.tell() >= target_bytes:
                new_part()

    # 마지막 파일 핸들 정리
    if f_out:
        f_out.close()

    print(f"[OK] 총 {total_rows:,}행을 ~{target_mb}MB 기준으로 분할 저장: {out_dir}")

if __name__ == "__main__":
    # 상대경로 예시: 현재 스크립트 위치 기준
    # 필요시 target_mb를 더 작게/크게 조절하세요.
    split_csv_by_size("./_merged_csv/staging_all.utf8.csv", target_mb=5)


[OK] 총 13,471,842행을 ~5MB 기준으로 분할 저장: _merged_csv\staging_all_split


In [4]:
# -*- coding: utf-8 -*-
"""
staging_all.partXXX.csv 분할 파일들을 '빠르게' 병렬로 MySQL에 적재합니다.

핵심 최적화
- executemany 대신 'INSERT ... VALUES (..),(..),(..)' 멀티-VALUES로 전송 → 왕복 횟수↓
- 파일당 1회 커밋(배치 플러시만) → COMMIT 오버헤드↓
- 적절한 스레드 수(기본 3) → InnoDB 경합 완화

규약
- CSV 헤더: region, source_db, merchant_code, name, address, ref_date, raw_category, category
- DB 컬럼 순서: id(AI), merchant_code, name, category(JSON), raw_category, region, address, source_date, source_db, ingested_at
- INSERT(IGNORE 아님): merchant_code UNIQUE 위반 시 에러 → 파일 .FAIL 생성
- 성공 시 .DONE 생성(재실행 시 스킵)
"""

import os, csv, json, time, glob, traceback
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from sshtunnel import SSHTunnelForwarder
import pymysql

# ===== SSH =====
SSH_HOST = "j13b108.p.ssafy.io"
SSH_PORT = 22
SSH_USER = "ubuntu"
SSH_KEY  = r"C:\Users\SSAFY\Desktop\j13B108T.pem"

# ===== MySQL (터널 기준 내부 주소/포트) =====
DB_HOST_INSIDE_SSH = "127.0.0.1"
DB_PORT_INSIDE_SSH = 3306
DB_USER = "admin"
DB_PASS = "dnxlachl"
DB_NAME = "walletslotdb"

# ===== CSV 분할 파일 폴더(상대경로) =====
SPLIT_DIR = Path("./_merged_csv/staging_all_split")
PATTERN   = "staging_all.part*.csv"

# ===== 배치/병렬 설정 =====
BATCH_SIZE  = 10000     # 10k~20k 권장 (max_allowed_packet 상황에 맞춰 조절)
MAX_WORKERS = 3         # 2~3 추천(과다 동시성은 오히려 느려짐)

# ===== 테이블 생성 =====
CREATE_TABLE_SQL = """
create table if not exists staging_merchants (
  id            bigint unsigned auto_increment primary key,
  merchant_code varchar(32) not null,
  name          varchar(255) null,
  category      json         null,
  raw_category  text         null,
  region        varchar(32)  null,
  address       varchar(255) null,
  source_date   varchar(32)  null,
  source_db     varchar(16)  not null,
  ingested_at   timestamp not null default current_timestamp,
  unique key uk_merchant_code (merchant_code),
  key idx_source_db (source_db),
  key idx_region (region),
  key idx_ingested_at (ingested_at)
) engine=innodb default charset=utf8mb4;
"""

# 멀티-VALUES INSERT 프리픽스/플레이스홀더
INSERT_PREFIX = """
insert into staging_merchants
(merchant_code, name, category, raw_category, region, address, source_date, source_db, ingested_at)
values
"""
ROW_PH = "(%s,%s,CAST(%s AS JSON),%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)"

def sanitize_category(s: str) -> str:
    """category 문자열을 JSON 배열로 보정."""
    if not s or s.strip() == "":
        return "[]"
    try:
        j = json.loads(s)
        return json.dumps(j, ensure_ascii=False)
    except Exception:
        return "[]"

def iter_rows_from_csv(path: str):
    """
    CSV 헤더는 고정:
    region, source_db, merchant_code, name, address, ref_date, raw_category, category
    INSERT 순서:
    (merchant_code, name, category, raw_category, region, address, source_date, source_db)
    """
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        r = csv.DictReader(f)
        for row in r:
            mc = (row.get("merchant_code") or "").strip()
            if not mc:
                continue
            name        = (row.get("name") or "").strip() or None
            category    = sanitize_category(row.get("category"))
            raw_cat     = (row.get("raw_category") or "").strip() or None
            region      = (row.get("region") or "").strip() or None
            address     = (row.get("address") or "").strip() or None
            source_date = (row.get("ref_date") or "").strip() or None
            source_db   = (row.get("source_db") or "").strip() or None
            yield (mc, name, category, raw_cat, region, address, source_date, source_db)

def mark(path: str, suffix: str, content: str = ""):
    """path+'.SUFFIX' 마커 파일 생성"""
    m = f"{path}.{suffix}"
    with open(m, "w", encoding="utf-8") as f:
        if content:
            f.write(content)

def has_mark(path: str, suffix: str) -> bool:
    return os.path.exists(f"{path}.{suffix}")

def exec_batch_multi_values(cur, batch):
    """
    batch: [(mc, name, category_json, raw, region, addr, date, src), ...]
    멀티-VALUES 한 번에 전송
    """
    sql = INSERT_PREFIX + ",".join([ROW_PH] * len(batch))
    args = []
    for rec in batch:
        args.extend(rec)
    cur.execute(sql, args)

def load_one_file(local_port: int, csv_path: str):
    """
    한 파일을 멀티-VALUES 배치로 INSERT.
    반환: (파일명, 읽은행, 삽입행, 초, 상태)
    """
    if has_mark(csv_path, "DONE"):
        return (os.path.basename(csv_path), 0, 0, 0.0, "SKIP_DONE")

    t0 = time.time()
    conn = None
    n_rows = n_ins = 0
    status = "OK"
    try:
        conn = pymysql.connect(
            host="127.0.0.1",
            port=local_port,
            user=DB_USER,
            password=DB_PASS,
            database=DB_NAME,
            charset="utf8mb4",
            autocommit=False,   # 파일당 1회 커밋
            read_timeout=600,
            write_timeout=600,
        )
        with conn.cursor() as cur:
            # 타임아웃만 늘림 (unique_checks=1 유지: 중복 차단 보장)
            cur.execute("set session net_write_timeout=600, net_read_timeout=600")
            cur.execute("set session wait_timeout=28800, interactive_timeout=28800")
            cur.execute("set session foreign_key_checks=0")

            batch = []
            for rec in iter_rows_from_csv(csv_path):
                batch.append(rec)
                n_rows += 1
                if len(batch) >= BATCH_SIZE:
                    exec_batch_multi_values(cur, batch)
                    n_ins += cur.rowcount
                    batch.clear()

            if batch:
                exec_batch_multi_values(cur, batch)
                n_ins += cur.rowcount
                batch.clear()

            # 파일 단위 1회 커밋
            conn.commit()

        mark(csv_path, "DONE")
        if has_mark(csv_path, "FAIL"):
            try: os.remove(f"{csv_path}.FAIL")
            except Exception: pass

    except Exception as e:
        status = f"ERR: {type(e).__name__}: {e}"
        mark(csv_path, "FAIL", content=traceback.format_exc())
        if conn:
            try: conn.rollback()
            except Exception: pass
    finally:
        if conn:
            conn.close()

    return (os.path.basename(csv_path), n_rows, n_ins, time.time() - t0, status)

def main():
    files = sorted(glob.glob(str(SPLIT_DIR / PATTERN)))
    if not files:
        print(f"[INFO] 분할 파일 없음: {SPLIT_DIR}\\{PATTERN}")
        return

    files = [fp for fp in files if not has_mark(fp, "DONE")]
    if not files:
        print("[INFO] 처리할 신규 파일 없음 (.DONE 모두 존재)")
        return

    with SSHTunnelForwarder(
        (SSH_HOST, SSH_PORT),
        ssh_username=SSH_USER,
        ssh_pkey=SSH_KEY,
        remote_bind_address=(DB_HOST_INSIDE_SSH, DB_PORT_INSIDE_SSH),
    ) as tunnel:
        tunnel.start()
        local_port = tunnel.local_bind_port
        print(f"[SSH] 127.0.0.1:{local_port} -> {DB_HOST_INSIDE_SSH}:{DB_PORT_INSIDE_SSH}")

        # 테이블 보장
        conn = pymysql.connect(
            host="127.0.0.1",
            port=local_port,
            user=DB_USER,
            password=DB_PASS,
            database=DB_NAME,
            charset="utf8mb4",
            autocommit=True,
        )
        with conn.cursor() as cur:
            cur.execute(CREATE_TABLE_SQL)
        conn.close()

        total_rows = total_ins = 0
        t_all = time.time()

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
            futs = [ex.submit(load_one_file, local_port, fp) for fp in files]
            for fut in as_completed(futs):
                fn, n_rows, n_ins, secs, status = fut.result()
                total_rows += n_rows
                total_ins  += n_ins
                print(f"[DONE] {Path(fn).name:>24s} | 읽음 {n_rows:>8,} / 삽입 {n_ins:>8,} | {secs:5.1f}s | {status}")

        print(f"\n✅ 전체 합계: 읽음 {total_rows:,} / 삽입 {total_ins:,} | 총 {time.time()-t_all:.1f}s")

if __name__ == "__main__":
    main()


[INFO] 처리할 신규 파일 없음 (.DONE 모두 존재)
