In [None]:
import os
import re
import pandas as pd
import numpy as np

from pathlib import Path
from datetime import datetime


In [None]:
# ----------------------------------------------
# 경로 설정
# ----------------------------------------------

def find_project_root() -> Path:
    """Locate the project root by walking upward from the working directory.

    A directory qualifies as the root when it contains both a ``data`` and a
    ``notebooks`` entry. The search starts at the current working directory
    and then visits each ancestor, nearest first.

    Returns:
        The first qualifying directory, or the current working directory
        itself when no candidate qualifies.
    """
    start = Path.cwd()
    candidates = (start, *start.parents)
    matches = (
        folder
        for folder in candidates
        if (folder / "data").exists() and (folder / "notebooks").exists()
    )
    # Fall back to the starting directory when nothing matched.
    return next(matches, start)

PROJECT_ROOT = find_project_root()

# Input CSV lives under data/raw/kaggle; processed output goes to data/processed.
IN_PATH = PROJECT_ROOT / "data" / "raw" / "kaggle" / "youtube_2025_channels" / "youtube_channel_info_v2.csv"
OUT_DIR = PROJECT_ROOT / "data" / "processed"
# Default output location. NOTE: the save step at the end of this notebook
# rebinds OUT_PATH to a versioned file name inside OUT_DIR.
OUT_PATH = OUT_DIR / "channels_clean.csv"

print("IN_PATH :", IN_PATH)
print("OUT_PATH:", OUT_PATH)

# Validate the input before touching the filesystem: find_project_root() falls
# back to the working directory when no root is found, and creating OUT_DIR
# first would leave a stray data/processed tree there on a failed run.
if not IN_PATH.exists():
    raise FileNotFoundError(f"채널 데이터가 없습니다: {IN_PATH}")

OUT_DIR.mkdir(parents=True, exist_ok=True)


In [None]:
# -----------------------------------------------------
# 데이터 불러오기
# -----------------------------------------------------

# Only the read itself sits inside the try, so a failure in the progress
# messages is never misreported as a CSV read error.
# NOTE(review): read_csv's default NA handling parses the literal text "NA" as
# missing; confirm whether that matters for values in the `country` column.
try:
    yt_channels_df = pd.read_csv(IN_PATH, low_memory=False)
except Exception as e:
    # Chain explicitly so the original pandas/IO error stays attached as the cause.
    raise ValueError(f"CSV 파일을 읽는 중 오류 발생 → {e}") from e
else:
    print(f"CSV 파일 로딩 완료: {IN_PATH}")
    print("shape:", yt_channels_df.shape)


In [None]:
# ----------------------------------------------
# 필요한 컬럼 선택
# ----------------------------------------------

# Columns the rest of the notebook relies on.
use_cols = [
    "channel_id", "channel_name",
    "subscriber_count", "view_count", "video_count",
    "created_date", "category", "country",
    "videos_last_30_days", "views_last_30_days",
]

# Required columns that the loaded frame lacks, kept in the order listed above.
available_cols = set(yt_channels_df.columns)
missing_cols = [name for name in use_cols if name not in available_cols]

if len(missing_cols) > 0:
    raise ValueError(f"필수 컬럼이 누락됨: {missing_cols}")

# Narrow the frame to the required columns; .copy() detaches the result from
# the frame that was loaded.
yt_channels_df = yt_channels_df.loc[:, use_cols].copy()
print("필수 컬럼 확인 완료")


In [None]:
# -----------------------------------------------------
# 숫자형 변환
# -----------------------------------------------------

num_cols = [
    "subscriber_count",
    "view_count",
    "video_count",
    "videos_last_30_days",
    "views_last_30_days",
]

# Coerce every count column in one pass; values that cannot be parsed as
# numbers become NaN (errors="coerce").
yt_channels_df[num_cols] = yt_channels_df[num_cols].apply(pd.to_numeric, errors="coerce")


In [None]:
# -----------------------------------------------------
# 날짜 변환
# -----------------------------------------------------

# Parse creation timestamps as timezone-aware UTC; unparseable values become NaT.
created_raw = yt_channels_df["created_date"]
yt_channels_df["created_date"] = pd.to_datetime(created_raw, errors="coerce", utc=True)


In [None]:
# -----------------------------------------------------
# 파생 컬럼 생성
# -----------------------------------------------------

# Reference point: today at midnight UTC. Timestamp.now(tz="UTC") replaces the
# deprecated Timestamp.utcnow() and yields the same timezone-aware value.
# NOTE: ages depend on the run date, so the output differs between runs.
ref_date = pd.Timestamp.now(tz="UTC").normalize()


def _safe_ratio(numerator: pd.Series, denominator: pd.Series) -> pd.Series:
    """Divide element-wise, yielding NaN wherever the denominator is not > 0.

    Zero and missing denominators give NaN, as does a negative one. A negative
    value can occur for ``channel_age_days``: ``ref_date`` is truncated to
    midnight, so a channel created later the same day has an age of -1.
    """
    return numerator / denominator.where(denominator > 0)


# Whole days between channel creation and the reference date (NaN for NaT).
yt_channels_df["channel_age_days"] = (ref_date - yt_channels_df["created_date"]).dt.days

yt_channels_df["upload_frequency"] = _safe_ratio(
    yt_channels_df["video_count"], yt_channels_df["channel_age_days"]
)
yt_channels_df["subscriber_per_view"] = _safe_ratio(
    yt_channels_df["subscriber_count"], yt_channels_df["view_count"]
)
yt_channels_df["views_per_video"] = _safe_ratio(
    yt_channels_df["view_count"], yt_channels_df["video_count"]
)
yt_channels_df["uploads_per_subscriber"] = _safe_ratio(
    yt_channels_df["video_count"], yt_channels_df["subscriber_count"]
)

# Clean the categorical columns and add integer codes for them.
for col in ["category", "country"]:
    raw_values = yt_channels_df[col]
    stripped = raw_values.astype(str).str.strip()
    # astype(str) renders missing values as the text "nan"; keep only entries
    # that were present originally and are non-blank, so real gaps stay NaN
    # instead of becoming a bogus "nan" category.
    yt_channels_df[col] = stripped.where(raw_values.notna() & stripped.ne(""))
    # Category codes are integers; NaN entries receive the code -1.
    yt_channels_df[f"{col}_encoded"] = yt_channels_df[col].astype("category").cat.codes


In [None]:
# -----------------------------------------------------
# 파생 컬럼 검증
# -----------------------------------------------------

# Every column the feature-engineering step is expected to have produced.
derived_cols = [
    "channel_age_days",
    "upload_frequency",
    "subscriber_per_view",
    "views_per_video",
    "uploads_per_subscriber",
    "category_encoded",
    "country_encoded",
]

# Gather any expected column that is absent from the frame, in list order.
missing_derived = []
for expected in derived_cols:
    if expected not in yt_channels_df.columns:
        missing_derived.append(expected)

if len(missing_derived) > 0:
    raise ValueError(f"파생 컬럼 생성 실패: {missing_derived}")

print("파생 컬럼 검증 완료")


In [None]:
# -----------------------------------------------------
# 저장 경로 자동 생성
# -----------------------------------------------------

def get_next_version_file(out_dir: Path, base_name: str, ext: str = "csv") -> Path:
    """Return the path for the next unused ``<base_name>_v<N>.<ext>`` file.

    Scans ``out_dir`` for regular files named ``<base_name>_v<digits>.<ext>``
    and returns a path whose version is one above the highest found, or
    version 1 when none exist. The file itself is not created.

    Args:
        out_dir: Directory to scan; created (with parents) if it is missing.
        base_name: File-name stem placed before the ``_v<N>`` suffix.
        ext: File extension, accepted with or without a leading dot.

    Returns:
        ``out_dir / "<base_name>_v<N>.<ext>"`` for the next version number.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # Accept ".csv" as well as "csv"; otherwise the result would be
    # "name_v1..csv" and existing versions would never be matched.
    ext = ext.lstrip(".")

    # fullmatch anchors both ends, so names with extra leading or trailing
    # characters are ignored.
    pattern = re.compile(rf"{re.escape(base_name)}_v(\d+)\.{re.escape(ext)}")
    matches = (
        pattern.fullmatch(entry.name)
        for entry in out_dir.iterdir()
        if entry.is_file()  # directories that happen to match are skipped
    )
    latest = max((int(m.group(1)) for m in matches if m), default=0)

    return out_dir / f"{base_name}_v{latest + 1}.{ext}"


In [None]:
# -----------------------------------------------------
# 저장
# -----------------------------------------------------

base_name = "youtube_channels_clean"
# Resolve the next unused versioned file name inside OUT_DIR.
OUT_PATH = get_next_version_file(out_dir=OUT_DIR, base_name=base_name, ext="csv")

# Write without the index; the utf-8-sig codec prefixes the file with a BOM.
yt_channels_df.to_csv(path_or_buf=OUT_PATH, encoding="utf-8-sig", index=False)

# Report where the file went and the final frame dimensions.
print(
    f"✅ 저장 완료: {OUT_PATH}",
    f"✅ 최종 데이터 형태: {yt_channels_df.shape}",
    sep="\n",
)
