In [None]:
# -*- coding: utf-8 -*-
import os
import json
import gzip
import argparse
from pathlib import Path
from collections import Counter, defaultdict
from typing import Iterator, Union, Dict, Any


SUPPORTED_EXTS = {".json", ".jsonl", ".ndjson", ".gz"}  # .gz는 내부 확장자까지 확인


def open_text(path: Path):
    """
    파일 오픈 헬퍼:
    - *.gz 는 gzip으로 텍스트 모드로
    - 나머지는 일반 open
    """
    if path.suffix == ".gz":
        # 내부 확장자 추정 (예: file.jsonl.gz)
        return gzip.open(path, mode="rt", encoding="utf-8", errors="ignore")
    return open(path, mode="r", encoding="utf-8", errors="ignore")


def infer_inner_ext(path: Path) -> str:
    """
    .gz 파일의 경우 내부 원본 확장자를 추정 (예: .jsonl.gz -> .jsonl)
    """
    if path.suffix != ".gz":
        return path.suffix.lower()
    name = path.name[:-3]  # remove .gz
    return Path(name).suffix.lower()


def iter_records(path: Path) -> Iterator[Union[Dict[str, Any], Any]]:
    """
    파일에서 '레코드(기사)'를 한 건씩 내보내는 제너레이터.
    - JSONL/NDJSON: 줄 단위 JSON 객체
    - JSON: 배열이면 각 요소, 딕셔너리면 단일 레코드로 간주
    - GZ: 내부 확장자에 따라 처리
    """
    ext = path.suffix.lower()
    inner_ext = infer_inner_ext(path)

    # JSONL/NDJSON (또는 .jsonl.gz / .ndjson.gz)
    if inner_ext in (".jsonl", ".ndjson"):
        with open_text(path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                    yield obj
                except Exception:
                    # 손상된 라인은 건너뛰기
                    continue
        return

    # 일반 JSON (또는 .json.gz)
    if inner_ext == ".json":
        with open_text(path) as f:
            try:
                data = json.load(f)
            except Exception:
                return  # 파싱 실패 시 0건으로 간주

        if isinstance(data, list):
            for item in data:
                yield item
        elif isinstance(data, dict):
            # 단일 레코드로 취급
            yield data
        # 그 외 타입은 무시
        return

    # 지원하지 않는 확장자는 스킵
    return


def count_in_directory(root: Path):
    """
    디렉토리 아래(하위 폴더 포함) 모든 지원 파일을 스캔하여
    - 파일별 레코드 수
    - 총 레코드 수
    - category별 합계
    를 반환
    """
    per_file_counts = {}
    category_counter = Counter()
    total = 0
    errors = []

    for path in root.rglob("*"):
        if not path.is_file():
            continue

        # 확장자 필터: .json/.jsonl/.ndjson 또는 .gz(내부가 json/jsonl/ndjson)
        ext = path.suffix.lower()
        if ext not in SUPPORTED_EXTS and infer_inner_ext(path) not in (".json", ".jsonl", ".ndjson"):
            continue

        cnt = 0
        try:
            for rec in iter_records(path):
                cnt += 1
                # category 집계(문자열만 집계)
                if isinstance(rec, dict):
                    cat = rec.get("category")
                    if isinstance(cat, str) and cat.strip():
                        category_counter[cat.strip()] += 1
        except Exception as e:
            errors.append((str(path), str(e)))
            cnt = 0

        per_file_counts[str(path)] = cnt
        total += cnt

    return per_file_counts, total, category_counter, errors


def main():
    parser = argparse.ArgumentParser(description="크롤링한 기사 개수 세기 (JSON/JSONL/NDJSON, *.gz 지원)")
    parser.add_argument(
        "--root",
        type=str,
        default=r"/home/ds4_sia_nolb/#FINAL_POLARIS/01_Web Crawling/Crawling_data",
        help="스캔할 최상위 폴더 경로",
    )
    args = parser.parse_args()

    root = Path(args.root).expanduser()
    if not root.exists() or not root.is_dir():
        print(f"[에러] 폴더가 존재하지 않습니다: {root}")
        return

    per_file, total, cat_counter, errors = count_in_directory(root)

    # ── 결과 출력 ──────────────────────────────────────────────────────────
    print("=" * 80)
    print(f"[스캔 폴더] {root}")
    print("- 지원 확장자: .json, .jsonl, .ndjson, (이들의 .gz)")
    print("=" * 80)
    print("\n[파일별 개수 상위 20개]")
    for path, cnt in sorted(per_file.items(), key=lambda x: x[1], reverse=True)[:20]:
        print(f"{cnt:8d}  |  {path}")

    empty_files = [p for p, c in per_file.items() if c == 0]
    if empty_files:
        print("\n[0건 파일 수] :", len(empty_files))

    print("\n[카테고리별 합계 Top 20]")
    for cat, c in cat_counter.most_common(20):
        print(f"{c:8d}  |  {cat}")

    print("\n" + "-" * 80)
    print(f"[총 기사 개수] {total:,}")
    print("-" * 80)

    if errors:
        print("\n[파싱 오류(건너뜀)]")
        for p, msg in errors[:10]:
            print(f"- {p} :: {msg}")
        if len(errors) > 10:
            print(f"... (외 {len(errors) - 10}건)")

if __name__ == "__main__":
    main()
