In [1]:
!pip install google-api-python-client
!pip install oauth2client
!pip install youtube-transcript-api



In [2]:
import pandas as pd
# 힙 라이브러리
import heapq
# google api
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.tools import argparser
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
import datetime

# your key
API_KEY = ''
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_SERVICE_VERSION = 'v3'

youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_SERVICE_VERSION, developerKey = API_KEY)

In [6]:
# 특정 조건 search
'''
order의 종류
date: 최근 순으로 정렬
rating: 높은 평가 순으로 정렬
relevance: 검색 쿼리(q)에 대한 관련성을 기준으로 정렬
title: 제목에 따라 문자순으로 리소스 정렬
videoCount: 업로드한 동영상이 많은 채널 내림차순 정렬
viewCount: 리소스 조회 수가 높은 항목부터 정렬
'''

# 이건 최대 max가 50개 밖에 안되는 쿼리
search_something = youtube.search().list(
    # q (query): 검색할 키워드를 지정하는 파라미터
    q = '서문 시장',
    # part: YouTube API가 검색 결과로 반환할 정보의 범위를 지정
    part = 'snippet',
    # order: 데이터를 어떤 기준으로 조회할 것인지를 선택
    order = 'relevance',
    # maxResults: 검색 결과로 반환할 항목의 최대 개수를 지정
    maxResults = 100,
    # publishedAfter: 검색할 비디오가 업로드된 시작 날짜 (ISO 8601 형식, YYYY-MM-DDThh:mm:ssZ)
    publishedAfter = '2023-08-15T00:00:00Z',
    # publishedBefore: 검색할 비디오가 업로드된 종료 날짜 (ISO 8601 형식, YYYY-MM-DDThh:mm:ssZ)
    publishedBefore = '2023-09-24T23:59:59Z',
).execute()

In [9]:
# 관련 영상 모두 가져오는 쿼리
def search_youtube(query, published_after, published_before):
    results = []
    page_token = None

    while True:
        search_response = youtube.search().list(
            q=query,
            part='snippet',
            order='relevance',
            maxResults=50,  # 50으로 설정
            publishedAfter=published_after,
            publishedBefore=published_before,
            pageToken=page_token  # 이전 페이지의 nextPageToken을 전달
        ).execute()

        # 현재 페이지의 결과 추가
        results.extend(search_response['items'])

        # 다음 페이지의 토큰이 없으면 종료
        page_token = search_response.get('nextPageToken')
        if not page_token:
            break

    return results


In [12]:
search_results = search_youtube(
    query='서문 시장',
    published_after='2023-08-15T00:00:00Z',
    published_before='2023-09-24T23:59:59Z'
)

print(search_results)

[{'kind': 'youtube#searchResult', 'etag': 'GMP-2g6LDnLHcz00brOuVDTpyN4', 'id': {'kind': 'youtube#video', 'videoId': 'Y-tNb5sk_WA'}, 'snippet': {'publishedAt': '2023-09-22T09:00:31Z', 'channelId': 'UCuLHDMXUqV5JR8Qv-FpwF8g', 'title': '대구 서문시장에서만 만날 수 있는 삼각만두, 고로케, 시장 국수까지 길거리 먹방 투어 🚫배터짐 주의🚫', 'description': '민경장군 #김민경 #대구서문시장 안녕하세요 쫄병아리 여러분   오늘은 저의 고향이자 제가 홍보 모델로 있는 대구에 방문 ...', 'thumbnails': {'default': {'url': 'https://i.ytimg.com/vi/Y-tNb5sk_WA/default.jpg', 'width': 120, 'height': 90}, 'medium': {'url': 'https://i.ytimg.com/vi/Y-tNb5sk_WA/mqdefault.jpg', 'width': 320, 'height': 180}, 'high': {'url': 'https://i.ytimg.com/vi/Y-tNb5sk_WA/hqdefault.jpg', 'width': 480, 'height': 360}}, 'channelTitle': '민경장군', 'liveBroadcastContent': 'none', 'publishTime': '2023-09-22T09:00:31Z'}}, {'kind': 'youtube#searchResult', 'etag': 'pqolu07ve0DOvUiZgDDvtoCdcAw', 'id': {'kind': 'youtube#video', 'videoId': 'Cbz2N7A4xEw'}, 'snippet': {'publishedAt': '2023-09-05T12:13:16Z', 'channelId': 'UCHXcIK7rVnqh

In [14]:
# 제목 & videoID 추출

data = []

for item in search_results:
    description = item['snippet']['title']
    videoID = item['id']['videoId']
    data.append((description, videoID))

df = pd.DataFrame(data, columns=['Description', 'VideoID'])

df

Unnamed: 0,Description,VideoID
0,"대구 서문시장에서만 만날 수 있는 삼각만두, 고로케, 시장 국수까지 길거리 먹방 투...",Y-tNb5sk_WA
1,"42년을 육교 밑에서 장사한 서문시장 다리 밑 원조할매 국수집, 줄서서 먹는 칼국수...",Cbz2N7A4xEw
2,서문시장의 40년 전통의 소갈비찜 맛집. 정성 가득한 청국장이 덤~~,f3UPxPV2128
3,[대구 매운 찜갈비 맛집] 먹을수록 맛있는 중독적인 갈비찜! 맛과 스토리가 있는 대...,peyk1LItf2w
4,대구 서문시장 야시장 #대구서문시장 #3대시장 #시장구경 #대구당일치기여행 #대구가...,LPkthsmyzcM
...,...,...
485,[대구맛집]라이브 반야월 시장 5일장 맛집 장보기,ot5QGgklFbg
486,&quot; 단골식당 &quot; 불향 가득 머금은 연탄석쇠불고기 | 대구 칠성시장...,0vVThMmMRLQ
487,구미새마을중앙시장ㅣ가는날이장날 [생방송굿데이],tWwKYhApl7o
488,님아 (제발) 그만 먹고 구경해다오 : 단양 구경시장에 나타난 뚱종원😋,srbhSpOfE0E


In [None]:
# 검색한 channelId를 통해 채널의 동영상 정보 가져오기

def set_video(channelId, maxCount): 
    # channelId: 채널 ID
    # maxCount: 동영상 개수
  	#search 함수를 통해 해당 channelId의 조회 수(viewCount)가 높은 영상을 파라미터로 받아 준 maxCount만큼 조회.
    search_response = youtube.search().list(
        order = 'viewCount',
        part = 'snippet',
        channelId = channelId,
        maxResults = maxCount,
    ).execute()
    
  
    channel_response = youtube.channels().list(
    part="statistics",
    id=channelId
    ).execute()  #채널 정보를 가지고 오면 구독자를 가지고 올 수 있다. search()가 아닌 channels()로 조회해서 채널 정보를 조회한다.
    
    
    video_ids = []
    for i in range(0, len(search_response['items'])):
        video_ids.append((search_response['items'][i]['id']['videoId'])) #videoId의 리스트를 만들어 둔다 (videoId로 조회할 수 있게)
  
  	#추출할 정보의 list들과 그 모든 정보를 key-value로 저장할 딕셔너리 변수를 하나 생성한다.
    channel_video_id = []
    channel_video_title = []
    channel_rating_view = []
    channel_rating_comments = []
    channel_rating_good = []
    channel_published_date = []
    channel_subscriber_count = []
    channel_thumbnails_url = []
    data_dicts = { }
    
    # 영상이름, 조회수 , 좋아요수 등 정보 등 추출
    for k in range(0, len(search_response['items'])):
        video_ids_lists = youtube.videos().list(
            part='snippet, statistics',
            id=video_ids[k],
        ).execute()
        
        #print(video_ids_lists)
    
        str_video_id = video_ids_lists['items'][0]['id']
        str_thumbnails_url = str(video_ids_lists['items'][0]['snippet']['thumbnails']['high'].get('url'))
        str_video_title = video_ids_lists['items'][0]['snippet'].get('title')
        str_view_count = video_ids_lists['items'][0]['statistics'].get('viewCount')
        if str_view_count is None:
            str_view_count = "0"
        str_comment_count = video_ids_lists['items'][0]['statistics']['commentCount']
        if str_comment_count is None:
            str_comment_count = "0"
        str_like_count = video_ids_lists['items'][0]['statistics'].get('likeCount')
        if str_like_count is None:
            str_like_count = "0"
        str_published_date = str(video_ids_lists['items'][0]['snippet'].get('publishedAt'))
        str_subscriber_count = channel_response['items'][0]['statistics']['subscriberCount']
        if str_subscriber_count is None:
            str_subscriber_count = "0"

        # 비디오 ID 
        channel_video_id.append(str_video_id)
        # 비디오 제목 
        channel_video_title.append(str_video_title)
        # 조회수 
        channel_rating_view.append(str_view_count)
        # 댓글수 
        channel_rating_comments.append(str_comment_count)
        # 좋아요 
        channel_rating_good.append(str_like_count)
        # 게시일 
        channel_published_date.append(str_published_date)
        # 구독자 수 
        channel_subscriber_count.append(str_subscriber_count)
        channel_thumbnails_url.append(str_thumbnails_url)

    data_dicts['id'] = channel_video_id
    data_dicts['title'] = channel_video_title
    data_dicts['viewCount'] = channel_rating_view
    data_dicts['commentCount'] = channel_rating_comments
    data_dicts['likeCount'] = channel_rating_good
    data_dicts['publishedDate'] = channel_published_date
    data_dicts['subsciberCount'] = channel_subscriber_count 
    data_dicts['thumbnail'] = channel_thumbnails_url
    
    return data_dicts     

In [36]:
# 기존 코드
# 댓글 수집 코드
# videoId: video_id에 입력

# your key
api_key = 'AIzaSyDnM5ux61EweNGHzegZfViDZnXUSP18NaY'
video_id = df['VideoID']

# 코드를 저장하는 list
comments = list()
# Google API 객체 생성
api_obj = build('youtube', 'v3', developerKey=api_key)
# API 객체 호출. 이 때, video_id에는 댓글을 수집하고자 하는 YouTube 동영상의 id를 입력
# part='snippet,replies': 댓글의 본문(snippet)과 답글(replies)을 포함한 데이터 request
response = api_obj.commentThreads().list(part='snippet', videoId=video_id, maxResults=20).execute()

# 반복문을 통해 api 반복적 호출
'''
comment['textDisplay']: 댓글의 텍스트 내용
comment['authorDisplayName']: 댓글 작성자의 이름
comment['publishedAt']: 댓글 작성 시간
comment['likeCount']: 댓글이 받은 좋아요 수
'''

while response:
    for item in response['items']:
        comment = item['snippet']['topLevelComment']['snippet']
        comments.append([comment['textDisplay'], comment['authorDisplayName'], comment['publishedAt'], comment['likeCount']])
 
        if item['snippet']['totalReplyCount'] > 0:
            # 댓글에 답글이 있는지 확인 (>0)
            # 답글이 있는 경우 item['replies']['comments']에 답글이 저장
            for reply_item in item['replies']['comments']:
                reply = reply_item['snippet']
                comments.append([reply['textDisplay'], reply['authorDisplayName'], reply['publishedAt'], reply['likeCount']])
    # nextPageToken: YouTube API는 한 번에 최대 100개의 댓글만 반환하므로, 댓글이 많을 경우 여러 페이지에 나눠서 데이터를 가져옴.
    if 'nextPageToken' in response:
        # pageToken=response['nextPageToken']: 다음 페이지의 댓글을 요청할 때 nextPageToken을 사용하여 호출.
        response = api_obj.commentThreads().list(
            part='snippet,replies',
            videoId=video_id,
            pageToken=response['nextPageToken'],
            maxResults=100).execute()
    else:
        break

# 좋아요 수를 기준으로 상위 30개의 댓글만 추출
top_10_comments = heapq.nlargest(30, comments, key=lambda x: x[3])

df = pd.DataFrame(top_10_comments)

In [None]:
import heapq
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

api_key = 'AIzaSyDnM5ux61EweNGHzegZfViDZnXUSP18NaY'

# 코드를 저장하는 list
comments = list()

# Google API 객체 생성
api_obj = build('youtube', 'v3', developerKey=api_key)

# df['VideoID'] 내 모든 video_id에 대해 반복 수행
for video_id in df['VideoID']:
    try:
        # 임시 리스트로 해당 비디오의 댓글을 저장
        video_comments = []

        # API 객체 호출. 이 때, video_id에는 댓글을 수집하고자 하는 YouTube 동영상의 id를 입력
        response = api_obj.commentThreads().list(
            part='snippet,replies',
            videoId=video_id,
            maxResults=20
        ).execute()

        # 반복문을 통해 api 반복적 호출
        while response:
            for item in response['items']:
                comment = item['snippet']['topLevelComment']['snippet']
                video_comments.append([comment['textDisplay'], comment['authorDisplayName'], comment['publishedAt'], comment['likeCount'], video_id])

                if item['snippet']['totalReplyCount'] > 0:
                    # 댓글에 답글이 있는지 확인 (>0)
                    for reply_item in item['replies']['comments']:
                        reply = reply_item['snippet']
                        video_comments.append([reply['textDisplay'], reply['authorDisplayName'], reply['publishedAt'], reply['likeCount'], video_id])

            # nextPageToken이 있는 경우 다음 페이지의 댓글을 요청
            if 'nextPageToken' in response:
                response = api_obj.commentThreads().list(
                    part='snippet,replies',
                    videoId=video_id,
                    pageToken=response['nextPageToken'],
                    maxResults=100
                ).execute()
            else:
                break

        # 각 동영상의 댓글 중 좋아요 수를 기준으로 상위 30개 추출
        top_30_video_comments = heapq.nlargest(30, video_comments, key=lambda x: x[3])
        comments.extend(top_30_video_comments)

    except HttpError as e:
        error_content = e.content.decode("utf-8")
        if 'commentsDisabled' in error_content:
            print(f"Comments are disabled for video ID: {video_id}")
        else:
            print(f"An error occurred for video ID: {video_id}: {e}")
        continue

# DataFrame으로 변환 (댓글 내용, 작성자, 작성 시간, 좋아요 수, 비디오 ID)
df_comments = pd.DataFrame(comments, columns=['Comment', 'Author', 'PublishedAt', 'LikeCount', 'VideoID'])

df_comments

Comments are disabled for video ID: VCXhXt0aNO8
Comments are disabled for video ID: L_-0mo3LX4g
Comments are disabled for video ID: L_-0mo3LX4g
Comments are disabled for video ID: eI69BTPipqs
Comments are disabled for video ID: eI69BTPipqs
Comments are disabled for video ID: 4Jxgx4D34YU


In [35]:
# DataFrame을 CSV 파일로 저장
df_comments.to_csv('/Users/baeahram/Desktop/2024 빅콘테스트 공모전/jupyter/youtube_comments.csv', index=False)

In [None]:
# 유튜브 스크립트 api call
def download_transcript(youtube_id, language='ko'):
    try:
        # 트랜스크립트 가져오기
        transcript_list = YouTubeTranscriptApi.list_transcripts(youtube_id)

        # 요청한 언어의 트랜스크립트 찾기
        transcript = transcript_list.find_transcript([language, 'en'])  # 우선 한국어, 없으면 영어

        # 트랜스크립트 다운로드
        transcript_data = transcript.fetch()

        # 데이터프레임으로 변환
        df = pd.DataFrame(transcript_data)

        # 타임스탬프를 읽기 쉬운 형식으로 변환
        df['start_datetime'] = df['start'].apply(lambda x: str(datetime.timedelta(seconds=x)))
        df['end_datetime'] = df['start'] + df['duration']
        df['end_datetime'] = df['end_datetime'].apply(lambda x: str(datetime.timedelta(seconds=x)))

        return df

    except TranscriptsDisabled:
        print("이 동영상은 트랜스크립트가 비활성화되어 있습니다.")
    except NoTranscriptFound:
        print("요청한 언어의 트랜스크립트를 찾을 수 없습니다.")
    except Exception as e:
        print("트랜스크립트 다운로드 중 오류 발생:", e)

if __name__ == "__main__":
    youtube_id = "hmmwLySI7nE"  # 크롤링하려는 동영상 ID로 변경
    transcript_df = download_transcript(youtube_id, language='ko')

    if transcript_df is not None:
        # 데이터프레임 출력
        print(transcript_df.head())

        # CSV 파일로 저장 (선택 사항)
        transcript_df.to_csv('youtube_transcript.csv', index=False, encoding='utf-8-sig')