In [None]:
# (필수) 이 셀/파일에서 직접 실행한다면 필요한 모듈 임포트
from pathlib import Path                 # 파일/디렉터리 경로를 객체지향적으로 다루기 위한 표준 라이브러리
from typing import List                  # 타입 힌트용 List
import numpy as np                       # 수치 연산 라이브러리(아래에서 np.number 등 dtype 판별에 사용)
import pandas as pd                      # 데이터프레임 처리 라이브러리

# ============================================================================
# 셀 2: Football 데이터 EDA - 통합 분석 (각 테이블별로 모든 분석 수행)
# ============================================================================

def _project_root() -> Path:
    cwd = Path.cwd()                                                         # 현재 작업 디렉터리를 Path 객체로 획득
    if (cwd.parents / "data" / "raw").exists():                                      # 현재 폴더 안에 data/raw 폴더가 존재하면(프로젝트 루트로 판단)
        return cwd.parents                                                            # 그 현재 경로를 프로젝트 루트로 반환           # 그렇지 않으면 지정한 절대 경로를 루트로 사용(Windows 경로는 r'' 권장)

def _memory_mb(df: pd.DataFrame) -> float:
    return float(df.memory_usage(deep=True).sum() / 1024**2)                 # DataFrame 전체 메모리 사용량(byte)을 MB로 변환하여 반환(deep=True: 객체 열까지 정확 계산)

def _analyze_data_leakage_risks(df: pd.DataFrame, table_name: str) -> List[str]:
    """데이터 누수 위험 요소들을 분석합니다."""                              # 함수 설명: EDA 중 '누수 위험' 후보를 텍스트로 리포트
    risks = []                                                               # 누수 위험 메시지를 담을 리스트 초기화
    
    # 1. 미래 정보 포함 가능성 (날짜 컬럼 분석)
    date_cols = [col for col in df.columns                                   # 모든 컬럼명을 훑어보며
                 if any(keyword in col.lower()                               # 소문자화한 이름에 특정 키워드가 포함되면
                        for keyword in ['date', 'time', 'at', 'created', 'updated'])]
    if date_cols:                                                             # 날짜/시간 관련 컬럼이 하나라도 있으면
        risks.append(f"날짜/시간 컬럼 존재: {date_cols} - 미래 정보 누수 위험")  # 미래 시점 정보가 포함될 가능성 경고(리드미성 텍스트)

    # 2. 고유값 비율이 너무 높은 컬럼 (식별자 가능성)
    for col in df.columns:                                                    # 모든 컬럼에 대해
        if df[col].dtype == 'object':                                         # object(문자열/혼합형) 타입인 경우
            unique_ratio = df[col].nunique() / len(df)                        # 유니크한 값의 개수 비율(레코드 수로 나눔)
            if unique_ratio > 0.95:                                           # 유니크 비율이 95% 초과 → 거의 식별자(고유키)일 가능성
                risks.append(f"고유값 비율 높음 ({unique_ratio:.2%}): {col} - 식별자 가능성")

    # 3. 결측값 패턴 분석
    missing_pattern = df.isnull().sum()                                       # 각 컬럼별 결측치 개수 시리즈
    if missing_pattern.sum():                                                 # 전체 결측치가 하나 이상이면
        high_missing_cols = missing_pattern[missing_pattern > len(df) * 0.5]  # 결측률 50% 초과 컬럼만 필터링
        high_missing_cols = high_missing_cols.index.tolist()                  # 컬럼명 리스트로 변환
        if high_missing_cols:                                                 # 하나라도 있으면
            risks.append(f"높은 결측률 컬럼: {high_missing_cols} - 데이터 품질 이슈")  # 품질 이슈 경고 추가
    
    return risks                                                              # 누수 위험 목록 반환

# 설정
PROJECT_ROOT = _project_root()                                                # 위에서 정의한 프로젝트 루트 경로 계산(동적/고정 분기)
DATA_CURATED_DIR = PROJECT_ROOT / "data" / "curated"                          # curated CSV들이 모여 있는 디렉터리 경로 생성(Path 연산자로 결합)
NROWS = None                                                                  # CSV 로딩시 읽을 최대 행 수(None=전체, 대용량이면 1_000_000 등으로 샘플링)
MAX_UNIQUE = 25                                                               # 저카디널리티 판단 기준(유니크 개수가 25 이하면 Low-cardinal로 간주)

# CSV 파일 자동 탐색
csv_map = {p.stem: p for p in sorted(DATA_CURATED_DIR.glob("*.csv"))}         # data/curated 폴더에서 *.csv 파일을 찾아 {파일이름(확장자제외): Path} 매핑 구성

print("=" * 80)                                                               # 구분선 출력(가독성)
print("📊 FOOTBALL 데이터 EDA - 통합 분석 결과")                                 # 제목 출력
print("=" * 80)                                                               # 구분선

# 1. 발견된 파일 목록
print("\n📁 발견된 CSV 파일 목록:")                                            # 섹션 제목
print("-" * 50)                                                               # 소제목 구분선
if not csv_map:                                                               # 찾은 CSV가 하나도 없으면
    print("❌ data/curated 에 CSV 파일이 없습니다.")                           # 경고 메시지 출력
else:
    for i, (name, path) in enumerate(csv_map.items(), 1):                     # 발견된 CSV들을 인덱스와 함께 순회
        print(f"{i:2d}. {name:<20} -> {path}")                                # 번호, 파일 스템, 전체 경로 출력(정렬 서식 적용)

# 2. 각 테이블별 통합 분석 (상세분석 + 저카디널리티 + 데이터 누수 위험)
print("\n" + "=" * 80)                                                        # 큰 구분선(줄바꿈 + 줄긋기)
print("📋 테이블별 통합 분석")                                               # 섹션 제목
print("=" * 80)                                                               # 구분선

all_tables_info = {}                                                          # 테이블별 요약 정보를 저장할 딕셔너리(키: 테이블명, 값: 요약 dict)
data_leakage_risks = {}                                                       # 테이블별 누수 위험 목록 저장 딕셔너리

for name, path in csv_map.items():                                            # 각 CSV(=각 테이블)마다 반복
    try:
        print(f"\n{'='*80}")                                                  # 테이블별 시작 구분선
        print(f"🔍 [{name.upper()}] 테이블 통합 분석")                          # 현재 분석 중인 테이블 이름을 대문자로 표시
        print(f"{'='*80}")                                                    # 구분선
        
        df = pd.read_csv(path, nrows=NROWS, low_memory=True)                  # CSV 로딩(대용량 대비 low_memory=True: dtype 추론을 지연/부분화하여 메모리 절약)

        # ========================================
        # 2. 테이블 상세 분석
        # ========================================
        print(f"\n📊 2. 테이블 상세 분석")                                      # 소제목
        print("-" * 50)                                                       # 구분선
        
        # 기본 정보
        info = {                                                              
            'rows': len(df),                                                  # 행 수
            'cols': df.shape[1],                                              # 열 수
            'memory_mb': _memory_mb(df),                                      # 메모리 사용량(MB)
            'missing_total': int(df.isnull().sum().sum()),                    # 전체 결측치 총합(정수)
            'missing_rate': float(df.isnull().sum().sum() / (len(df) * len(df.columns))), # 전체 결측 비율(결측 총합 / 전체 셀 개수)
            'numeric_cols': len(df.select_dtypes(include=[np.number]).columns),            # 수치형 컬럼 개수(np.number 기반)
            'categorical_cols': len(df.select_dtypes(include=['object', 'category']).columns),  # 범주형 컬럼 개수(object/category)
            'date_cols': [col for col in df.columns                           # 날짜/시간 후보 컬럼명 리스트(키워드 기반 단순 탐지)
                          if any(keyword in col.lower() for keyword in ['date', 'time', 'at'])],
            'id_cols': [col for col in df.columns                             # 식별자 후보 컬럼명 리스트(키워드 기반: id/key/uuid 포함)
                        if any(keyword in col.lower() for keyword in ['id', 'key', 'uuid'])],
        }
        all_tables_info[name] = info                                          # 현재 테이블의 요약 정보를 딕셔너리에 저장(후속 집계/보고용)
        
        print(f"📊 크기: {info['rows']:,}행, {info['cols']}열 ({info['memory_mb']:.1f}MB)")   # 행/열/메모리 사용량 출력(천단위 구분자 포함)
        print(f"📈 데이터 타입: 수치형 {info['numeric_cols']}개, 범주형 {info['categorical_cols']}개")  # 타입 개수 요약
        print(f"❌ 결측값: {info['missing_total']:,}개 ({info['missing_rate']:.2%})")         # 결측 총합과 전체 비율 출력
        
        if info['date_cols']:                                                 # 날짜 관련 컬럼이 있으면
            print(f"📅 날짜 컬럼: {info['date_cols']}")                        # 목록 출력
        if info['id_cols']:                                                   # 식별자 관련 컬럼이 있으면
            print(f"🔑 ID 컬럼: {info['id_cols']}")                           # 목록 출력
        
        # 컬럼 정보 
        print(f"\n📋 컬럼 목록 :")                                             # 컬럼 상세 목록 섹션
        for i, col in enumerate(df.columns[:], 1):                            # 모든 컬럼을 번호와 함께 순회
            dtype = str(df[col].dtype)                                        # 해당 컬럼의 dtype 문자열
            nunique = df[col].nunique()                                       # 유니크 값 개수
            print(f"  {i:2d}. {col:<25} ({dtype:<10}) - 유니크: {nunique:,}") # 번호, 컬럼명(정렬), dtype, 유니크 개수 출력
        
        # 샘플 데이터
        print(f"\n📄 샘플 데이터 (상위 3행):")                                  # 샘플 출력 섹션 제목
        print(df.head(3).to_string())                                         # 상위 3행을 표 형태로 보기 좋게 출력(to_string)
        
        # 결측값 상세
        missing_info = df.isnull().sum()                                      # 컬럼별 결측치 개수
        missing_info = missing_info[missing_info > 0].sort_values(ascending=False)  # 결측치가 있는 컬럼만 내림차순 정렬
        if not missing_info.empty:                                            # 결측이 하나라도 있으면
            print(f"\n⚠️  결측값 상세:")                                         # 경고 아이콘과 함께 섹션 제목
            for col, count in missing_info.head(5).items():                   # 결측 상위 5개 컬럼만 출력(너무 많을 때 요약)
                rate = count / len(df) * 100                                  # 해당 컬럼의 결측률(%)
                print(f"  {col:<25}: {count:,}개 ({rate:.1f}%)")              # 컬럼명, 결측 개수, 결측률 표시
        else:
            print(f"\n✅ 결측값 없음")                                           # 결측치가 전혀 없음을 표시
        
        # ========================================
        # 3. 저카디널리티 컬럼 분석
        # ========================================
        print(f"\n🏷️ 3. Low 카디널리티 컬럼 분석 (유니크 ≤ {MAX_UNIQUE})")     # 저카디널리티 분석 섹션 제목
        print("-" * 50)                                                       # 구분선
        
        findings = []                                                          # 발견 항목을 저장할 리스트 초기화
        for col in df.columns:                                                # 모든 컬럼을 순회하면서
            nunique = int(df[col].nunique(dropna=True))                       # NaN 제외 유니크 개수 계산
            if nunique <= MAX_UNIQUE:                                         # 기준 이하(=저카디널리티)면
                vals = list(pd.Series(df[col].dropna().unique()).astype("string"))  # 유니크 값들을 문자열로 변환하여 리스트화
                try:
                    vals = sorted(vals, key=lambda x: (x is None, str(x)))    # 정렬 가능한 경우 정렬(값/None 순)
                except Exception:
                    pass                                                       # 정렬 실패(서로 비교 불가 타입 혼합 등) 시 그냥 통과
                preview = ", ".join([str(v) for v in vals[:MAX_UNIQUE]])      # 앞부분만 미리보기 문자열로 합침
                findings.append((col, str(df[col].dtype), nunique, preview))  # (컬럼명, dtype, 유니크 개수, 값 미리보기) 튜플 저장
        
        if findings:                                                           # 발견된 저카디널리티 컬럼이 있으면
            print(f"📌 저카디널리티 컬럼 발견:")                                 # 안내 문구 출력
            for col, dtype_str, nunique, preview in findings:                 # 각 항목을 순회하며
                print(f"  {col:<25} ({dtype_str:<10}) nunique={nunique:2d} -> {preview}")  # 상세 정보 출력
        else:
            print(f"✅ 유니크 ≤ {MAX_UNIQUE}인 컬럼이 없습니다.")                # 해당 없음 표시
        
        # ========================================
        # 4. 데이터 누수 위험 분석
        # ========================================
        print(f"\n⚠️  4. 데이터 누수 위험 분석")                                 # 누수 위험 섹션 제목
        print("-" * 50)                                                       # 구분선
        
        risks = _analyze_data_leakage_risks(df, name)                         # 위에서 정의한 함수로 현재 테이블의 누수 위험 후보 텍스트 목록 획득
        data_leakage_risks[name] = risks                                      # 딕셔너리에 저장(테이블명→위험목록)
        
        if risks:                                                             # 하나라도 위험 후보가 있으면
            print(f"🚨 위험 요소 발견:")                                         # 경고 아이콘 출력
            for i, risk in enumerate(risks, 1):                               # 번호와 함께 나열
                print(f"  {i}. {risk}")                                       # 각 위험 요소 설명 출력
        else:
            print(f"✅ 위험 요소 없음")                                          # 위험 요소가 없음을 표시
            
    except Exception as e:                                                    # CSV 로드나 처리 중 오류 발생 시
        print(f"\n❌ [{name}] 로드 실패: {e}")                                 # 어떤 테이블에서 실패했는지와 에러 메시지 출력


In [None]:
import easydict

args = easydict.EasyDict()

# curated 데이터 경로 설정
args.default_data_path = "./data/curated/"
args.player_final_file = "player_final.csv"

# 데이터 로드
df = pd.read_csv(args.default_data_path + args.player_final_file)

print(f"✅ player_final.csv 로드 완료!")
print(f"📊 데이터 크기: {df.shape[0]:,}행, {df.shape[1]}열")
print(f"📋 컬럼 목록:")
for i, col in enumerate(df.columns, 1):
    print(f"  {i:2d}. {col}")