In [1]:
import googleapiclient.discovery as gapi
from googleapiclient.errors import HttpError
import pandas as pd
import os
import time
import concurrent.futures
import datetime

# Setup

In [2]:
CURRENT_FOLDER = 'C:\\Coding Projects\\YoutubeCookingData\\'

with open(CURRENT_FOLDER + "apiKeys.txt") as f:
    YOUTUBE_API_KEY = f.read()
youtube_service = gapi.build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)

# Final Functions

In [3]:
from datetime import date


def jsonFieldHandler(json, key1, key2='', key3='', key4=''):
    # If the key doesn't exist, return ''
    try:
        if(key1 and key2 and key3 and key4):
            return json[key1][key2][key3][key4]
        elif(key1 and key2 and key3):
            return json[key1][key2][key3]
        elif(key1 and key2):
            return json[key1][key2]
        elif(key1):
            return json[key1]
    except:
        return ''

# Gets the data for all channels and parses it
def getChannelDataHandler(channel_id_list):
    channel_dict_list = []
    channel_response_json = []
    
    # Get all fields for Channel
    for i in range(0, len(channel_id_list), 50):
        channel_response_json += getChannelData(channel_id_list[i:i+50])
        
    # Parse response into dictionary
    for result in channel_response_json:
#         print(f"LOG: Fetching Data for Channel ID {result['id']}")
        channel_dict = {}
        
        channel_dict['Channel ID'] = jsonFieldHandler(result, 'id')
        channel_dict['Title'] = jsonFieldHandler(result, 'snippet','title')
        channel_dict['Description'] = jsonFieldHandler(result, 'snippet','description')
        channel_dict['URL'] = jsonFieldHandler(result, 'snippet','customUrl')
        channel_dict['Channel Created Date'] = jsonFieldHandler(result, 'snippet','publishedAt')
        channel_dict['Thumbnail URL'] = jsonFieldHandler(result, 'snippet', 'thumbnails', 'high', 'url')
        channel_dict['Language'] = jsonFieldHandler(result, 'snippet', 'defaultLanguage')
        channel_dict['Country'] = jsonFieldHandler(result, 'snippet', 'country')
        channel_dict['Views'] = jsonFieldHandler(result, 'statistics', 'viewCount')
        channel_dict['Subscriber Count'] = jsonFieldHandler(result, 'statistics', 'subscriberCount')
        channel_dict['Video Count'] = jsonFieldHandler(result, 'statistics', 'videoCount')
        channel_dict['Topics'] = jsonFieldHandler(result, 'topicDetails', 'topicIds')
        channel_dict['Topic Categories'] = jsonFieldHandler(result, 'topicDetails', 'topicCategories')
        channel_dict['Upload Playlist ID'] = jsonFieldHandler(result, 'contentDetails', 'relatedPlaylists', 'uploads')

        channel_dict_list.append(channel_dict)
        
    return pd.DataFrame(channel_dict_list)

# Gets all fields for a given list of channel
def getChannelData(channel_id_list):
    request = youtube_service.channels().list(
        part=['id', 'snippet', 'statistics', 'topicDetails', 'contentDetails'],
        id=channel_id_list,
        maxResults = 50
    )
    
    results = request.execute()
    return results['items']

# Returns the upload playlist for a list of channel ids
def getUploadPlaylistsforChannelHandler(channel_id_list):
    channel_response_json = []
    upload_playlist_list = []
    
    # Get upload playlists
    for i in range(0, len(channel_id_list), 50):
        channel_response_json += getUploadPlaylistsforChannel(channel_id_list[i:i+50])
    
    # Get only the channel's upload playlist id
    return [jsonFieldHandler(channel_json, 'contentDetails', 'relatedPlaylists', 'uploads') for channel_json in channel_response_json]

# Gets upload playlist ID for a list of channel IDs
def getUploadPlaylistsforChannel(channel_id_list):
    request = youtube_service.channels().list(
    part=['contentDetails'],
        id=channel_id_list,
        maxResults=50
    )
    
    result = request.execute()
    
    return result['items']
    
    
    
# Function that returns all videoIds for a given channel's upload playlist
def getVideoListForPlaylist(upload_playlist_id, page_token=''):
    video_ids = []

    t1 = time.time()
    
    request = youtube_service.playlistItems().list(
        part="contentDetails",
        playlistId=upload_playlist_id,
        maxResults=50,
        pageToken=page_token
    )
    
    try:
        results = request.execute()
        result_videos = results['items']
    except HttpError as err:
        appendToLog(HttpError)
        raise err
    
    # If there is a next page then append videos to result array
    if('nextPageToken' in results):
        nextPage = results['nextPageToken']
        result_videos += getVideoListForPlaylist(upload_playlist_id, nextPage)
    
    # If not called by another instance of this function, return only IDs
    if(page_token == ''):
#         print(f"LOG: Working on Channel ID {channelId}.")
        t2 = time.time()
        appendToLog(f'Time elapsed for videos of playlist {upload_playlist_id} {t2-t1} for {len(result_videos)} videos')
        return [vid['contentDetails']['videoId'] for vid in result_videos]
    else:
        # If recursing, return whole response list
        return result_videos

# Function that parses the return JSON from getVideoListForPlaylist and gets additional columns
def getVideoDataHandler(video_id_list):
    video_response_json = []
    video_dict_list = []
    
    # Get all fields for Video
    for i in range(0, len(video_id_list), 50):
        try:
            video_response_json += getVideoData(video_id_list[i:i+50])
        except HttpError as err:
            raise err
    
    # Parse response into dictionary
    for result in video_response_json:
#         print(f"\tLOG: Fetching Data for Video ID {result['id']}")
        video_dict = {}
        
        video_dict['Title'] = jsonFieldHandler(result, 'snippet', 'title')
        video_dict['Video ID'] = jsonFieldHandler(result, 'id')
        video_dict['Channel ID'] = jsonFieldHandler(result, 'snippet', 'channelId')
        video_dict['Duration'] = jsonFieldHandler(result, 'contentDetails', 'duration')
        video_dict['Description'] = jsonFieldHandler(result, 'snippet', 'description')
        video_dict['Publish Date'] = jsonFieldHandler(result, 'snippet', 'publishedAt')
        video_dict['Thumbnail URL'] = jsonFieldHandler(result, 'snippet', 'thumbnails', 'high',  'url')
        video_dict['View Count'] = jsonFieldHandler(result, 'statistics', 'viewCount')
        video_dict['Like Count'] = jsonFieldHandler(result, 'statistics', 'likeCount')
        video_dict['Comment Count'] = jsonFieldHandler(result, 'statistics', 'commentCount')        
        video_dict['Video Definition'] = jsonFieldHandler(result, 'contentDetails', 'definition')
        video_dict['Default Audio Language'] = jsonFieldHandler(result, 'snippet', 'defaultAudioLanguage')
        video_dict['Tags'] = jsonFieldHandler(result, 'snippet', 'tags')
        video_dict['Category ID'] = jsonFieldHandler(result, 'snippet', 'categoryId')
        video_dict['Topic Details'] = jsonFieldHandler(result, 'topicDetails')
        video_dict['Made for Kids'] = jsonFieldHandler(result, 'status', 'madeForKids')
        video_dict['Favorite Count'] = jsonFieldHandler(result, 'statistics', 'favoriteCount')
        video_dict_list.append(video_dict)
        
    return pd.DataFrame(video_dict_list)
     
def getVideoData(list_of_videos):
    request = youtube_service.videos().list(
        part=['contentDetails', 'liveStreamingDetails', 'id',
              'snippet', 'statistics', 'status', 'topicDetails'],
        id=list_of_videos,
        maxResults=50,
    )
    
    try:
        results = request.execute()
        return results['items']
    except HttpError as err: 
        # If error, return 
        appendToLog(HttpError)
        raise err

# Gets the comment threads for all videos in a given list
def getCommentThreadsHandler(list_of_videos):
    dataframes_list = []
    
    for video_id in list_of_videos:
        try:
            dataframes_list.append(getCommentsForVideo(video_id))
        except HttpError as err:
            # If it is bad request, try again in 5 seconds. Else raise error
            if(err.resp.status == 400):
                try:
                    appendToLog(f"Potentially transient ({err.resp.status}), trying again for video {video_id}")
                    time.sleep(600)
                    dataframes_list.append(getCommentsForVideo(video_id))
                except HttpError as err:
                    raise err
            else:
                raise err

    return pd.concat(dataframes_list, ignore_index=True)

# Get list of comment threads for a given video, returns a dataframe
def getCommentsForVideo(video_id, page_token=""):
#     print(f"\tLOG: Fetching Comment Data for Video ID {video_id}, {page_token}")
    t1 = time.time()
    request = youtube_service.commentThreads().list(
        part=['id', 'replies', 'snippet'],
        videoId=video_id,
        maxResults=100,
        pageToken=page_token
    )

    try:
        results = request.execute()
        result_comments = results['items']
    
        if('nextPageToken' in results):
            next_page = results['nextPageToken']
            result_comments += getCommentsForVideo(video_id, next_page)
        
        # If not recursing, parse the full response
        if(page_token == ''):
            t2 = time.time()
            appendToLog(f'Time elapsed for comments on video {video_id} {t2-t1} for {len(result_comments)} comments')
            return parseCommentThreadResponse(result_comments)
        else: 
            # Return raw results if called by another instance of this function
            return result_comments
    except HttpError as err:
        appendToLog(f'{err.resp.status} - {err._get_reason()} - {video_id} with page {page_token}')
        raise err

# Parse comment threads response JSON
def parseCommentThreadResponse(list_of_threads):
    thread_dict_list = []
    
    for thread in list_of_threads:
#         print(f"\t\tLOG: Fetching Data for Comment Thread ID {thread['id']}")
        thread_dict = {}
        
        thread_dict['Comment Thread ID'] = jsonFieldHandler(thread, 'id')
        thread_dict['Video ID'] = jsonFieldHandler(thread, 'snippet', 'videoId')
        thread_dict['Top Level Comment'] = jsonFieldHandler(thread, 'snippet', 'topLevelComment')
        thread_dict['Total Replies'] = jsonFieldHandler(thread, 'snippet', 'totalReplyCount')
        thread_dict['Can Reply'] = jsonFieldHandler(thread, 'snippet', 'canReply')
        thread_dict['Replies'] = jsonFieldHandler(thread, 'replies')
        
        thread_dict_list.append(thread_dict)
    
    return pd.DataFrame(thread_dict_list)

def appendToLog(message): 
    print(message)
    current_time = datetime.datetime.now()
    
    file_path = CURRENT_FOLDER + f"Log {current_time.year}-{current_time.month}-{current_time.day}.txt"

    if(os.path.exists(file_path)):
        append_write = 'a'
    else:
        append_write ='w'
    
    with open(file_path, append_write) as f:
        f.write(message + '\n')

def main(channels_flag=True, videos_flag=True, comments_flag=True, current_folder=''):
#     CHANNEL_LIST = pd.read_csv("Channel IDs.csv")['ID'].to_list()
    appendToLog(f"Starting execution for channel data: {channels_flag}, video data: {videos_flag}, comment data: {comments_flag}")
    CHANNEL_ID_TOTAL_DF = pd.read_csv(current_folder + "Channel IDs.csv")

    # File Paths
    channels_file_path = current_folder + "Channel Data.csv"
    videos_file_path = current_folder + "Video Data.csv"
    comments_file_path = current_folder + "Comment Data.csv"
    
    # If Channels step is done, skip
    if(channels_flag):
        # If the file exists check how many channels still need to be queried, else get the whole list
        if(os.path.exists(channels_file_path)):
            # Get list of channel ids that have been queried and compare against the total list of ids
            channel_data_done_ids = pd.read_csv(channels_file_path)['Channel ID'].to_list()
            
            channelIdsToFetch = CHANNEL_ID_TOTAL_DF[~CHANNEL_ID_TOTAL_DF['ID'].isin(channel_data_done_ids)]['ID'].to_list()
            appendToLog(f"{len(channel_data_done_ids)} channels already done out of {CHANNEL_ID_TOTAL_DF['ID'].shape[0]}")
        else:
            channelIdsToFetch = CHANNEL_ID_TOTAL_DF['ID'].to_list()
            appendToLog(f"No channels already done, querying all {len(channelIdsToFetch)} IDs")
        
        # If there are channels to fetch, get data and write to csv
        if(len(channelIdsToFetch) > 0):
            channels_df = getChannelDataHandler(channelIdsToFetch)
            channels_df.to_csv(channels_file_path)
    
    # Check video progress, skip if done
    if(videos_flag):
        video_ids = []
        videos_list = []
        # Check if a given channel has already had its videos fetched
        if(os.path.exists(videos_file_path)):
            channel_data_done_ids_videos = pd.read_csv(videos_file_path)['Channel ID'].unique().tolist()
            
            # TODO, switch this to load Upload Playlist IDs directly
            channel_ids_to_fetch_videos = CHANNEL_ID_TOTAL_DF[~CHANNEL_ID_TOTAL_DF['ID'].isin(channel_data_done_ids_videos)]['ID'].to_list()
            playlist_ids_to_fetch_videos = getUploadPlaylistsforChannelHandler(channel_ids_to_fetch_videos)
            appendToLog(f"{len(channel_data_done_ids_videos)} channel's videos already done out of {len(CHANNEL_ID_TOTAL_DF['ID'].to_list())}")
            # Read in current data
            videos_list.append(pd.read_csv(videos_file_path))
        else:
            channel_ids_to_fetch_videos = CHANNEL_ID_TOTAL_DF['ID'].to_list()
            appendToLog(f"No channel's videos already done, querying all {len(channel_ids_to_fetch_videos)} channels")
            playlist_ids_to_fetch_videos = getUploadPlaylistsforChannelHandler(channel_ids_to_fetch_videos)
        for playlist_id in playlist_ids_to_fetch_videos:
            try:
                video_ids = getVideoListForPlaylist(playlist_id)
                videos_list.append(getVideoDataHandler(video_ids))
            except HttpError as err:
                appendToLog(f"Error caught while fetching video data, ending execution")
                # Write current data to file
                pd.concat(videos_list, ignore_index=True).to_csv(videos_file_path)
                return  

        # If all videos are read successfully, write to file and continue
        pd.concat(videos_list, ignore_index=True).to_csv(videos_file_path)
            
    if(comments_flag):
        comment_ids = []
        comments_list = []
        if(os.path.exists(videos_file_path)):
            videos_data_total_ids_df = pd.read_csv(videos_file_path)
            if(os.path.exists(comments_file_path)):
                videos_data_done_ids_comments = pd.read_csv(comments_file_path)['Video ID'].unique().tolist()
                # Only fetch videos that have comments
                video_ids_to_fetch_comments = videos_data_total_ids_df[~videos_data_total_ids_df['Video ID'].isin(videos_data_done_ids_comments) & videos_data_total_ids_df['Comment Count'] > 0]['Video ID'].to_list()
                appendToLog(f"Done {len(videos_data_done_ids_comments)} video's comments out of {videos_data_total_ids_df.shape[0]}")
                # Read in current data
                comments_list.append(pd.read_csv(comments_file_path))
            else:
                # Only fetch videos that have comments
                video_ids_to_fetch_comments = videos_data_total_ids_df[videos_data_total_ids_df['Comment Count'] > 0]['Video ID'].to_list()
                appendToLog(f"No comments done, getting data for all {len(video_ids_to_fetch_comments)} videos")
        else:
            appendToLog("No video data file, exiting")
            return
        for video_id in video_ids_to_fetch_comments:
            try:
                comments_list.append(getCommentThreadsHandler([video_id]))
            except HttpError as err:
                appendToLog(f"Error caught while fetching comment data, ending execution")
                # Write current data to file 
                pd.concat(comments_list, ignore_index=True).to_csv(comments_file_path)
                return
        # Write each video's comments to the file if not excepted
        pd.concat(comments_list, ignore_index=True).to_csv(comments_file_path)
                
    
#     comments_df = getCommentThreadsHandler(video_ids)    
#     comments_df.to_csv(comments_file_path)
    
#     return videos_df, comments_df
    return
    
    

In [4]:
main(current_folder=CURRENT_FOLDER)

Starting execution for channel data: True, video data: True, comment data: True
No channels already done, querying all 59 IDs
No channel's videos already done, querying all 59 channels
727 1
727 2
754 1
754 2
733 1
733 2
442 1
442 2
1349 1
1349 2
4359 1
4359 2
1091 1
1091 2
434 1
434 2
372 1
372 2
93 1
93 2
606 1
606 2
418 1
418 2
424 1
424 2
1991 1
1991 2
513 1
513 2
159 1
159 2
1579 1
1579 2
1881 1
1881 2
1781 1
1781 2
371 1
371 2
555 1
555 2
408 1
408 2
479 1
479 2
1967 1
1967 2
106 1
106 2
411 1
411 2
322 1
322 2
500 1
500 2
1741 1
1741 2
344 1
344 2
1059 1
1059 2
40 1
40 2
1055 1
1055 2
938 1
938 2
1330 1
1330 2
573 1
573 2
197 1
197 2
1346 1
1346 2
766 1
766 2
1748 1
1748 2
450 1
450 2
1791 1
1791 2
996 1
996 2
375 1
375 2
435 1
435 2
12697 1
12697 2
833 1
833 2
296 1
296 2
492 1
492 2
529 1
529 2
18 1
18 2
207 1
207 2
211 1
211 2
1579 1
1579 2
141 1
141 2
204 1
204 2
11 1
11 2
221 1
221 2
No comments done, getting data for all 56324 videos
0 1
Time elapsed for comments on video 

# Data Validation

In [5]:
test = pd.read_csv("Comment Data.csv")

In [6]:
test

Unnamed: 0.1,Unnamed: 0,Comment Thread ID,Video ID,Top Level Comment,Total Replies,Can Reply,Replies
0,0,Ugz4WtWC2lIxTbGJrDB4AaABAg,I0glbWBxeFc,"{'kind': 'youtube#comment', 'etag': 'KnBhGhh-U...",0,True,
1,1,UgwvNv08ik_Z2tSjt_l4AaABAg,I0glbWBxeFc,"{'kind': 'youtube#comment', 'etag': 'pxilHJCfv...",0,True,
2,2,UgxwXahagCIZBWDhDGx4AaABAg,I0glbWBxeFc,"{'kind': 'youtube#comment', 'etag': 'GOkOyp340...",0,True,
3,3,Ugzq1LzKobwaMcWd0h14AaABAg,I0glbWBxeFc,"{'kind': 'youtube#comment', 'etag': 'n3GRzAD4-...",0,True,
4,4,UgyjlY7obLEHvJOXX6d4AaABAg,I0glbWBxeFc,"{'kind': 'youtube#comment', 'etag': '2wM6-YcA4...",0,True,
...,...,...,...,...,...,...,...
303899,303899,Ugj-iR5-F5gM7XgCoAEC,EI23CdUGwfk,"{'kind': 'youtube#comment', 'etag': 'Io46ESXU5...",0,True,
303900,303900,UggEwUJK9sXk_HgCoAEC,EI23CdUGwfk,"{'kind': 'youtube#comment', 'etag': '1oBukmE1z...",0,True,
303901,303901,UghzMEulBFiNv3gCoAEC,EI23CdUGwfk,"{'kind': 'youtube#comment', 'etag': '9Pf0ocN2k...",0,True,
303902,303902,UgizoF1UUWBQ7HgCoAEC,EI23CdUGwfk,"{'kind': 'youtube#comment', 'etag': 'ceqyArhLY...",0,True,
