In [28]:
import time
import random
from datetime import datetime
from typing import List, Dict, Tuple, Optional
import yt_dlp
import pandas as pd

## 채널url 입력시 채널영상 크롤링

In [30]:
# ------------------------------
# 공통 유틸
# ------------------------------
def _sec_to_hms(sec: Optional[int]) -> Optional[str]:
    if sec is None:
        return None
    m, s = divmod(int(sec), 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


# ------------------------------
# 1) 채널 업로드 목록에서 영상 URL 수집
# ------------------------------
def list_channel_video_urls(
    channel_url: str,
    max_videos: Optional[int] = None,
) -> Tuple[List[str], Dict]:
    """
    yt_dlp로 채널 업로드 목록(playlist처럼 동작)을 '평면 추출'해서 영상 URL만 빠르게 수집.
    필요시 '/videos' 탭으로 재시도.
    반환: (video_urls, channel_meta)
    """
    def _extract_entries(url: str):
        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "skip_download": True,
            "noplaylist": False,          # 채널은 내부적으로 playlist처럼 처리
            "extract_flat": True,         # 목록만 빠르게
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url, download=False)

    info = None
    try:
        info = _extract_entries(channel_url)
    except Exception:
        # /videos 탭으로 1차 보정
        if "/videos" not in channel_url:
            fixed = channel_url.rstrip("/") + "/videos"
            info = _extract_entries(fixed)
        else:
            raise

    # 엔트리 없으면 /videos로 최종 재시도
    entries = (info or {}).get("entries") or []
    if not entries and "/videos" not in channel_url:
        fixed = channel_url.rstrip("/") + "/videos"
        info = _extract_entries(fixed)
        entries = (info or {}).get("entries") or []

    if not entries:
        raise RuntimeError("채널 업로드 목록을 찾지 못했습니다. 채널 URL이 맞는지 확인하거나 '/videos' 탭 URL을 사용하세요.")

    # 채널 메타
    channel_meta = {
        "channel_title": info.get("channel") or info.get("uploader"),
        "channel_id": info.get("channel_id") or info.get("uploader_id"),
        "channel_url": info.get("uploader_url") or channel_url,
        "subscriber_count": info.get("channel_follower_count"),
    }

    # 평면 엔트리 → 동영상 URL 생성
    urls: List[str] = []
    for e in entries:
        if isinstance(e, dict):
            # e["url"]이 id만 들어있을 수 있으니 보정
            raw = e.get("url") or e.get("id")
            if not raw:
                continue
            full = raw if str(raw).startswith("http") else f"https://www.youtube.com/watch?v={raw}"
            # Shorts 등도 업로드 목록에 포함될 수 있음(필터는 상세 단계에서 가능)
            urls.append(full)
        elif isinstance(e, str):
            urls.append(f"https://www.youtube.com/watch?v={e}")

        if max_videos and len(urls) >= max_videos:
            break

    # 중복 제거(간혹 중복이 섞이는 경우 방지)
    urls = list(dict.fromkeys(urls))
    return urls, channel_meta


# ------------------------------
# 2) 단일 영상 상세 메타데이터 수집
# ------------------------------
def get_youtube_video_info(video_url: str) -> Dict:
    ydl_opts = {
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=False)

    # 날짜 ISO 변환
    upload_date = info.get("upload_date")
    published_date = None
    if upload_date:
        try:
            published_date = datetime.strptime(upload_date, "%Y%m%d").date().isoformat()
        except Exception:
            published_date = upload_date  # 원본 유지

    # 가장 큰 썸네일
    thumbnail_url = None
    thumbs = info.get("thumbnails")
    if isinstance(thumbs, list) and thumbs:
        thumbnail_url = sorted(thumbs, key=lambda t: t.get("height", 0))[-1].get("url")

    duration_str = info.get("duration_string") or _sec_to_hms(info.get("duration"))
    channel_name = info.get("channel") or info.get("uploader")

    return {
        "title": info.get("title"),
        "video_id": info.get("id"),
        "video_url": f"https://www.youtube.com/watch?v={info.get('id')}" if info.get("id") else video_url,
        "published_date": published_date,
        "thumbnail_url": thumbnail_url,
        "view_count": info.get("view_count"),
        "like_count": info.get("like_count"),
        "comment_count": info.get("comment_count"),
        "duration_hms": duration_str,
        "duration_sec": info.get("duration"),
        "channel_name": channel_name,
        "channel_id": info.get("channel_id") or info.get("uploader_id"),
        "subscriber_count": info.get("channel_follower_count"),

        # 선택 필드
        "categories": info.get("categories"),
        "tags": info.get("tags"),
        "uploader_url": info.get("uploader_url"),
        "age_limit": info.get("age_limit"),
        "availability": info.get("availability"),
        "live_status": info.get("live_status"),
        "description": info.get("description"),
    }


# ------------------------------
# 3) 채널 → (URL 수집) → (각 영상 상세 수집)
# ------------------------------
def collect_channel_videos(
    channel_url: str,
    max_videos: Optional[int] = None,
    include_shorts: bool = True,
    sleep_range: Tuple[float, float] = (0.05, 0.2),
    retry: int = 2,
) -> Tuple[pd.DataFrame, Dict]:
    """
    channel_url: 채널 홈/핸들/UCID URL 모두 가능. 필요시 '/videos'로 자동 보정
    max_videos: 수집 개수 제한(None이면 전체)
    include_shorts: False면 duration<60인 영상(숏츠) 제외
    sleep_range: 요청 간 랜덤 지연(차단 방지)
    retry: 영상별 상세 수집 실패 시 재시도 횟수
    """
    video_urls, channel_meta = list_channel_video_urls(channel_url, max_videos=max_videos)

    rows = []
    for url in video_urls:
        last_err = None
        for attempt in range(retry + 1):
            try:
                row = get_youtube_video_info(url)
                # 숏츠 제외 옵션
                if not include_shorts:
                    dur = row.get("duration_sec")
                    if dur is not None and int(dur) < 60:
                        # skip
                        pass
                    else:
                        rows.append(row)
                else:
                    rows.append(row)
                break
            except Exception as e:
                last_err = e
                if attempt < retry:
                    time.sleep(0.5 + attempt * 0.5)
                else:
                    # 실패해도 최소 정보 기록
                    rows.append({
                        "title": None,
                        "video_id": None,
                        "video_url": url,
                        "published_date": None,
                        "thumbnail_url": None,
                        "view_count": None,
                        "like_count": None,
                        "comment_count": None,
                        "duration_hms": None,
                        "duration_sec": None,
                        "channel_name": channel_meta.get("channel_title"),
                        "channel_id": channel_meta.get("channel_id"),
                        "subscriber_count": channel_meta.get("subscriber_count"),
                        "_error": str(last_err),
                    })
        time.sleep(random.uniform(*sleep_range))

    df = pd.DataFrame(rows)

    # 정렬(가능하면)
    if "published_date" in df.columns:
        try:
            df["published_date"] = pd.to_datetime(df["published_date"], errors="coerce")
            df = df.sort_values("published_date", ascending=False).reset_index(drop=True)
        except Exception:
            pass

    return df, channel_meta



if __name__ == "__main__":
    channel_url = "https://www.youtube.com/@KoreanCryingGuy/videos"

    # 옵션
    MAX_VIDEOS = 10
    INCLUDE_SHORTS = False
    SAVE_CSV = False
    CSV_PATH = "channel_videos_metadata.csv"

    df, meta = collect_channel_videos(
        channel_url=channel_url,
        max_videos=MAX_VIDEOS,
        include_shorts=INCLUDE_SHORTS,
        sleep_range=(0.08, 0.25),
        retry=2
    )

    print("Channel Meta:", meta)
    print("Collected:", len(df))
    print(df.head(5))

    if SAVE_CSV:
        df.to_csv(CSV_PATH, index=False, encoding="utf-8-sig")
        print(f"Saved -> {CSV_PATH}")

Channel Meta: {'channel_title': '유병재', 'channel_id': 'UCHw9p667e9l0qoYfY8calaA', 'channel_url': 'https://www.youtube.com/@KoreanCryingGuy', 'subscriber_count': 1680000}
Collected: 10
                                    title     video_id  \
0         [무공해] 험한 것이 나와도 김고은이랑 무조건 공감합니다  _ttuPeDExTo   
1    [무공해] 폭주하는 공감 요정.. 아이브 레이랑 무조건 공감합니다  N5Zk-xH1e0k   
2  [ENG SUB] 직역된 가사보고 노래 맞히기 (w.태래 규빈 리키)  -6vOqZs6CFA   
3                             근데 운전은 로이킴이  QxSIqNOMCm4   
4         [무공해] 샤이니 T랑, 아니 키랑.. 무조건 공감합니다  cqIZU2iQym4   

                                     video_url published_date  \
0  https://www.youtube.com/watch?v=_ttuPeDExTo     2025-09-12   
1  https://www.youtube.com/watch?v=N5Zk-xH1e0k     2025-09-05   
2  https://www.youtube.com/watch?v=-6vOqZs6CFA     2025-09-02   
3  https://www.youtube.com/watch?v=QxSIqNOMCm4     2025-08-29   
4  https://www.youtube.com/watch?v=cqIZU2iQym4     2025-08-20   

                                       thumbnail_url  view_count  like_count 

In [126]:
import re
import collections
import numpy as np
import pandas as pd
from tqdm import tqdm
import yt_dlp
from sklearn.feature_extraction.text import TfidfVectorizer

# ---------- 간단 토크나이저 & TF-IDF ----------
def tokenize_ko(text: str):
    text = re.sub(r"[^0-9가-힣A-Za-z]+", " ", str(text))
    toks = text.lower().split()
    return [t for t in toks if len(t) > 1]

STOPWORDS = set(["영상","채널","유튜브"])

def analyzer(doc: str):
    return [t for t in tokenize_ko(doc) if t not in STOPWORDS]

def tfidf_top_keywords(titles, top_k=20, min_df=2):
    vec = TfidfVectorizer(analyzer=analyzer, ngram_range=(1,2), min_df=min_df)
    X = vec.fit_transform(pd.Series(titles).astype(str))
    vocab = np.array(vec.get_feature_names_out())
    scores = np.asarray(X.sum(axis=0)).ravel()
    order = scores.argsort()[::-1][:top_k]
    return list(zip(vocab[order], scores[order]))

# ---------- yt-dlp 공통 클라이언트 ----------
def ydl_client(cookies_path=None, region="KR"):
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "extract_flat": True,          # 목록을 낼 때 필수
        "noplaylist": False,
        "ignoreerrors": True,
        "geo_bypass": True,
        # Accept-Language 강제
        "http_headers": {"Accept-Language": "ko-KR,ko;q=0.9"},
    }
    if cookies_path:
        ydl_opts["cookiefile"] = cookies_path
    return yt_dlp.YoutubeDL(ydl_opts)

def get_full_info(url_or_id: str, cookies_path=None):
    # 개별 영상의 상세(카테고리 등)를 다시 긁을 때는 extract_flat=False로 새 인스턴스 생성
    with yt_dlp.YoutubeDL({
        "quiet": True,
        "skip_download": True,
        "extract_flat": False,
        "ignoreerrors": True,
        "http_headers": {"Accept-Language": "ko-KR,ko;q=0.9"},
        **({"cookiefile": cookies_path} if cookies_path else {})
    }) as ydl:
        return ydl.extract_info(url_or_id, download=False)

def extract_category(info: dict):
    if not isinstance(info, dict):
        return None
    if info.get("categories"):
        cats = info["categories"]
        return cats[0] if isinstance(cats, list) and cats else None
    if info.get("category"):
        return info["category"]
    return None

# ---------- 채널의 주 카테고리 추정(선택) ----------
def infer_channel_main_category(df_channel: pd.DataFrame, url_col="url", cookies_path=None):
    if url_col not in df_channel.columns:
        return None
    cats = []
    for u in tqdm(df_channel[url_col].astype(str).tolist(), desc="Channel video categories"):
        try:
            info = get_full_info(u, cookies_path=cookies_path)
            cat = extract_category(info)
            if cat:
                cats.append(cat)
        except Exception:
            pass
    if not cats:
        return None
    return collections.Counter(cats).most_common(1)[0][0]

# ---------- 트렌딩 수집 (카테고리 포함) ----------
def get_trending_with_categories(region="KR", limit=30, cookies_path=None):
    # 지역/언어 고정 파라미터로 리다이렉트 방지
    trending_url = f"https://www.youtube.com/feed/trending?gl={region}&persist_gl=1&hl=ko"
    with ydl_client(cookies_path=cookies_path, region=region) as ydl:
        listing = ydl.extract_info(trending_url, download=False)
    entries = (listing or {}).get("entries", [])[:limit]

    rows = []
    for e in tqdm(entries, desc="Trending details"):
        vid = e.get("id") or e.get("url")
        if not vid:
            continue
        video_url = f"https://www.youtube.com/watch?v={vid}"
        try:
            info = get_full_info(video_url, cookies_path=cookies_path)
            title = info.get("title") or e.get("title")
            cat = extract_category(info)
            rows.append({
                "video_id": info.get("id") or vid,
                "title": title,
                "category": cat,
                "url": video_url
            })
            print(pd.DataFrame(rows))
        except Exception:
            continue
    return pd.DataFrame(rows)

# ---------- 파이프라인: 같은 카테고리만 모아 TF-IDF ----------
def tfidf_with_same_category(df_channel: pd.DataFrame,
                             channel_category: str = None,
                             region="KR",
                             trending_limit=40,
                             url_col="url",
                             top_k=20,
                             cookies_path=None):
    if channel_category is None:
        channel_category = infer_channel_main_category(df_channel, url_col=url_col, cookies_path=cookies_path)
    if not channel_category:
        raise ValueError("채널 카테고리를 알 수 없습니다. channel_category를 직접 지정하거나 df_channel에 url을 제공하세요.")

    df_trend = get_trending_with_categories(region=region, limit=trending_limit, cookies_path=cookies_path)
    df_trend_same = df_trend[df_trend["category"] == channel_category].copy()

    titles_all = pd.concat([
        df_channel["title"].astype(str),
        df_trend_same["title"].astype(str)
    ]).tolist()

    keywords = tfidf_top_keywords(titles_all, top_k=top_k, min_df=2)
    return {
        "channel_category": channel_category,
        "n_trending_all": len(df_trend),
        "n_trending_same_cat": len(df_trend_same),
        "top_keywords": keywords,
        "df_trending_same": df_trend_same
    }

In [128]:
# ================== 사용 예시 ==================
if __name__ == "__main__":
    df_channel = df[['title','video_url']]

    # 1) 채널 카테고리 자동 추정 + 동일 카테고리 트렌딩 결합 TF-IDF
    result = tfidf_with_same_category(
        df_channel,
        channel_category='Comedy',   # None이면 df_channel url로 추정 시도
        region="KR",
        trending_limit=40,
        top_k=20
    )

    print("\n채널 주 카테고리:", result["channel_category"])
    print("트렌딩 전체 개수:", result["n_trending_all"])
    print("동일 카테고리 트렌딩 개수:", result["n_trending_same_cat"])
    print("\n[Top TF-IDF 키워드]")
    for k, s in result["top_keywords"]:
        print(f"{k}\t{s:.3f}")

    # 필요하면 동일 카테고리 트렌딩 목록 확인
    # print(result["df_trending_same"].head())

ERROR: [youtube:tab] trending: The channel/playlist does not exist and the URL redirected to youtube.com home page
Trending details: 0it [00:00, ?it/s]


KeyError: 'category'