In [None]:
pip install google-api-python-client

In [None]:
pip install oauth2client

In [194]:
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.tools import argparser
from datetime import datetime, timedelta
import os
import isodate
import random
from tqdm import tqdm

DEVELOPER_KEY = 'AIzaSyBdXZGCKkB5HT24UXbafdIi6WyLx-Rcpm0'
YOUTUBE_API_SERVICE_NAME='youtube'
YOUTUBE_API_VERSION='v3'
youtube=build(YOUTUBE_API_SERVICE_NAME,YOUTUBE_API_VERSION,developerKey=DEVELOPER_KEY)

In [138]:
def get_popular_videos(channel_id, max_results=5, days=365):
    """ 특정 채널의 인기 영상 정보 가져오기 (쿼터 절약 버전) """

    # 🔹 특정 기간 설정 (예: 최근 days일 전부터 오늘까지)
    published_after = (datetime.utcnow() - timedelta(days=days)).isoformat() + "Z"

    try:
        # 🔹 1단계: 검색 API로 인기 영상 리스트 가져오기
        search_request = youtube.search().list(
            part="snippet",
            channelId=channel_id,
            maxResults=max_results,
            order="viewCount",
            type="video",
            publishedAfter=published_after
        )
        search_response = search_request.execute()

        video_data = []
        video_ids = []

        for item in search_response.get("items", []):
            video_id = item["id"]["videoId"]
            title = item["snippet"]["title"]
            thumbnail_url = item["snippet"]["thumbnails"]["high"]["url"]
            published_date = item["snippet"]["publishedAt"]

            video_ids.append(video_id)

            video_data.append({
                "title": title,
                "video_id": video_id,
                "published_date": published_date[:10],
                "thumbnail_url": thumbnail_url
            })

        if not video_ids:
            return video_data  # 영상이 없으면 리턴

        # 🔹 2단계: 영상 통계 정보 가져오기 (배치 요청)
        stats_request = youtube.videos().list(
            part="statistics,snippet,contentDetails",
            id=",".join(video_ids)  # 🔹 여러 개의 ID를 한 번에 요청
        )
        stats_response = stats_request.execute()

        stats_dict = {item["id"]: item for item in stats_response.get("items", [])}

        for video in video_data:
            video_id = video["video_id"]
            video_info = stats_dict.get(video_id, {})

            video["view_count"] = int(video_info.get("statistics", {}).get("viewCount", 0))
            video["like_count"] = int(video_info.get("statistics", {}).get("likeCount", 0))
            video["comment_count"] = int(video_info.get("statistics", {}).get("commentCount", 0))
            video["category_id"] = video_info.get("snippet", {}).get("categoryId", "N/A")
            video["duration"] = video_info.get("contentDetails", {}).get("duration", "N/A")

        return video_data

    except Exception as e:
        print(f"Error: {e}")
        return []

In [116]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import time

''' 인기급상승 채널명 크롤링 '''
# 🔹 Selenium을 위한 Chrome 드라이버 설정
options = webdriver.ChromeOptions()
options.add_argument("--headless")  # 브라우저 창을 띄우지 않도록 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

# 🔹 유튜브 트렌딩 페이지 접속
driver.get("https://www.youtube.com/feed/trending")

# 🔹 페이지 로딩 대기 (스크롤하기 전에 충분한 시간 확보)
time.sleep(3)

# 🔹 스크롤을 맨 아래로 내리는 함수 (한 번만 실행)
driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
time.sleep(3)  # 페이지가 로딩되는 시간을 기다림

# 🔹 채널 이름 추출: #text > a 셀렉터 사용
channel_links = driver.find_elements(By.CSS_SELECTOR, "#text > a")

# 🔹 채널 이름 리스트에 저장
channel_names = set()
for link in channel_links:
    channel_name = link.text
    if channel_name:  # 빈 텍스트는 무시
        channel_names.add(channel_name)

# 🔹 드라이버 종료
driver.quit()

In [140]:
import googleapiclient.errors

''' 채널명을 입력받아 채널 ID를 반환하는 함수 '''
def get_channel_id(query):
    try:
        request = youtube.search().list(
            part="snippet",
            q=query,
            type="channel",
            maxResults=1  # 최대 1개의 채널만 반환
        )
        response = request.execute()

        if response["items"]:
            channel = response["items"][0]
            channel_title = channel["snippet"]["title"]
            channel_id = channel["id"]["channelId"]
            return channel_title, channel_id
        else:
            return None, None
    except googleapiclient.errors.HttpError as err:
        print(f"Error occurred: {err}")
        return None, None

# 채널 ID 리스트에 채널명과 ID를 저장
new_channel_ids = []
for channel_name in channel_names:
    channel_title, channel_id = get_channel_id(channel_name)
    if channel_id:  # 채널 ID가 존재할 때만 추가
        new_channel_ids.append((channel_title, channel_id))

In [144]:
import json

# 기존 JSON 파일 불러오기
file_path = "channel_ids.json"

# 파일이 존재하면 기존 데이터 로드, 없으면 빈 리스트 사용
if os.path.exists(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        existing_channel_ids = json.load(file)
else:
    existing_channel_ids = []

# 기존 채널명을 Set으로 저장하여 중복 확인
existing_channel_names = {channel[0] for channel in existing_channel_ids}

# 중복되지 않는 채널만 추가
for channel in new_channel_ids:
    if channel[0] not in existing_channel_names:  # 채널명이 기존에 없을 때만 추가
        existing_channel_ids.append(channel)

# JSON 파일에 저장
with open(file_path, "w", encoding="utf-8") as file:
    json.dump(existing_channel_ids, file, indent=4, ensure_ascii=False)

In [None]:
pop_videos = []
for title, channel_id in tqdm(channel_ids):
    try:
        videos = get_popular_videos(channel_id, max_results=5, days=360)
        pop_videos.append(videos)  # 여기서만 리스트 추가 (빈 리스트 방지)
    except HttpError as e:
        error_message = str(e)
        if "quotaExceeded" in error_message:
            print("API 사용량 초과")
            sys.exit(1)  # 프로그램 즉시 종료 (return 사용 가능)
        else:
            print(f"API 요청 중 오류 발생: {error_message}")
            time.sleep(5)  # 일시적인 오류일 경우 5초 대기 후 재시도

In [190]:
# 저장할 파일명
json_filename = "video_info.json"

# 기존 JSON 데이터 불러오기 (파일이 존재하면)
try:
    with open(json_filename, "r", encoding="utf-8") as file:
        existing_data = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
    existing_data = []  # 파일이 없거나 비어있으면 빈 리스트로 시작

# 새로운 데이터 추가 (중복 확인 없이 리스트 자체를 추가)
existing_data.extend(pop_videos)  # pop_videos 리스트 자체를 기존 리스트에 추가

# JSON 파일로 저장
with open(json_filename, "w", encoding="utf-8") as file:
    json.dump(existing_data, file, ensure_ascii=False, indent=4)