Here we will collect all the data from the youtube API and save it to a csv file in the project root directory.

In [None]:
# Import necessary libraries

import pandas as pd
from googleapiclient.discovery import build
from IPython.display import JSON
from dotenv import load_dotenv
import os
import isodate

In [None]:
# Make the variables of the .env file
# available and read the API key

load_dotenv()
apiKey = os.environ.get('API_KEY')


Here we will manually list all the YouTube channels that we want to collect data for. To do so, we need both the channel name and the channel ID.

To find a youtube channel ID, you should:
1. go to the channel's youtube page, 
2. right click anywhere
3. select the option view page source
4. Press ctrl+f
5. search for ?channel_id

In [None]:
# Channels whose data will be collected,
# keyed by channel name with the channel
# ID as the value

channelIDDict = {
    "gorgonoid": "UCLfCo17TCjx7qf-JMhQioLQ",
    "noticiasMaromba": "UCmK5h2-a4CquS4nIxDN6j7g",
    "albert": "UCij0YPRA_vGwKwCyn_o9MLw",
}

Now we will create the necessary functions to retrieve the data from the api.

In [None]:
def channelIDDictToCommaSeparatedIDString(channelIDDict):
    """Join the channel IDs of a name -> ID dictionary into one string.

    Args:
        channelIDDict: Dictionary whose keys are channel names and
            whose values are the channel IDs (strings).

    Returns:
        The IDs, in dictionary order, separated by ", ". Example:
        "id1, id2, id3". An empty dictionary gives an empty string.
    """
    return ", ".join(channelIDDict.values())

In [None]:
def buildYoutubeAPI():
    """Create a client for the "youtube" API, version "v3".

    The client is created with build() and uses the module-level
    apiKey as its developer key.

    Returns:
        The API client object returned by build().
    """
    return build("youtube", "v3", developerKey=apiKey)

In [None]:
def getAllChannelStatistics(channelIDDict):
    """Fetch name, statistics and uploads playlist ID of each channel.

    Args:
        channelIDDict: Dictionary mapping channel names to channel IDs.

    Returns:
        A pandas DataFrame with one row per channel in the API response
        and the columns channelName, subscriberCount, viewCount,
        videoCount and uploadedVideosPlaylistID. Values are stored as
        returned by the API. When the response contains no channels the
        DataFrame is empty but still has these columns.
    """
    columns = [
        'channelName',
        'subscriberCount',
        'viewCount',
        'videoCount',
        'uploadedVideosPlaylistID',
    ]

    request = buildYoutubeAPI().channels().list(
        part="snippet,contentDetails,statistics",
        id=channelIDDictToCommaSeparatedIDString(channelIDDict)
    )
    response = request.execute()

    # .get() keeps a response without an 'items' key from raising
    allChannelStatistics = [
        {
            "channelName": item['snippet']['title'],
            'subscriberCount': item['statistics']['subscriberCount'],
            'viewCount': item['statistics']['viewCount'],
            'videoCount': item['statistics']['videoCount'],
            'uploadedVideosPlaylistID': item['contentDetails']['relatedPlaylists']['uploads']
        }
        for item in response.get('items', [])
    ]

    # Build the DataFrame once, after all rows are collected, so that it
    # is defined even when no channel was returned
    return pd.DataFrame(allChannelStatistics, columns=columns)

Now that we have created getAllChannelStatistics() we can use it to retrieve information from the youtube channels we entered in the dictionary. Below, we will run a test to see if the channel information is being retrieved correctly.

In [None]:
# Test run: fetch the statistics of every
# channel listed in channelIDDict
channelStatistics = getAllChannelStatistics(channelIDDict)

# Bare expression so that the notebook
# shows the resulting dataframe
channelStatistics

Now we will create functions that enable us to get the data on each video that was uploaded by a specific channel.

In [None]:
def getUploadedVideoIDs(playlistID):
    """Collect the video IDs of every item in a playlist.

    The playlist is read page by page (50 items per request) until the
    response no longer carries a 'nextPageToken'.

    Args:
        playlistID: ID of the playlist to read; intended to be the
            playlist that contains all videos uploaded by a channel.

    Returns:
        A list with the video ID of each playlist item.
    """
    youtube = buildYoutubeAPI()

    collectedIDs = []
    pageToken = None
    morePages = True

    while morePages:
        page = youtube.playlistItems().list(
            part="snippet,contentDetails",
            playlistId=playlistID,
            maxResults=50,
            pageToken=pageToken
        ).execute()

        collectedIDs.extend(
            entry['contentDetails']['videoId'] for entry in page['items']
        )

        # A missing or empty token means this was the last page
        pageToken = page.get('nextPageToken')
        morePages = bool(pageToken)

    return collectedIDs

Now that we have created getUploadedVideoIDs() we can use it to retrieve the video IDs of the videos the channel has uploaded. Below, we will run a test to see if the video IDs are being retrieved correctly.

In [None]:
# Test run with a hard-coded playlist ID
# (presumably the uploads playlist of one of
# the channels in channelIDDict - verify)
videoIDs = getUploadedVideoIDs('UULfCo17TCjx7qf-JMhQioLQ')

# Show the retrieved IDs and how many there are
print(videoIDs)

len(videoIDs)

Now we will create a function that iterates through all of the playlist IDs in the channel statistics dataframe, gets all of the video IDs contained in each playlist and appends them to an allVideoIDs list.

In [None]:
def getAllVideoIDs(channelStatisticsDataframe):
    """Collect the video IDs of every playlist listed in a dataframe.

    Args:
        channelStatisticsDataframe: Dataframe whose
            'uploadedVideosPlaylistID' column holds the playlist IDs.

    Returns:
        One flat list with the video IDs of all playlists, in the order
        of the playlists in the column.
    """
    playlistIDs = channelStatisticsDataframe['uploadedVideosPlaylistID']

    return [
        videoID
        for playlistID in playlistIDs
        for videoID in getUploadedVideoIDs(playlistID)
    ]

Now let's test if all the video IDs are being retrieved correctly

In [None]:
# Test run: collect the video IDs of all the
# playlists in the channel statistics dataframe
videoIDs = getAllVideoIDs(channelStatistics)

# Show the retrieved IDs and how many there are
print(videoIDs)

len(videoIDs)

Now let's transform the list of video IDs into a string of comma-separated video IDs.

In [None]:
def videoIDListToCommaSepString(videoIDs):
    """Join a list of video IDs into one comma-separated string.

    Args:
        videoIDs: Iterable of video IDs (strings).

    Returns:
        The IDs in their original order separated by ", ". Example:
        "id1, id2, id3". An empty input gives an empty string.
    """
    return ", ".join(videoIDs)


Now let's test if it correctly transforms the list of IDs to a comma separated string:

In [None]:
# Test run: join the collected video IDs
# into a single comma separated string
commaSepVideoIDString = videoIDListToCommaSepString(videoIDs)

# Bare expression so that the notebook
# shows the resulting string
commaSepVideoIDString

Now that we are able to get the video IDs of all the uploaded videos of all the channels we initially set in our dictionary, we can get the individual statistics for each video. But first, let's find out the name of the category that each possible category ID stands for, so that we can include the video category in the video statistics.

In [None]:
def getVideoCategoriesBrazil():
    """Fetch the video categories listed for the region code 'BR'.

    Returns:
        A dictionary that maps each category ID to the category title.
        It is empty when the response has no 'items'.
    """
    response = buildYoutubeAPI().videoCategories().list(
        part="snippet",
        regionCode='BR'
    ).execute()

    return {
        category['id']: category['snippet']['title']
        for category in response.get('items', [])
    }

Now let's see if we are retrieving the categories correctly.

In [None]:
# Test run: fetch the mapping of category
# ID to category title and print it
categories = getVideoCategoriesBrazil()

print(categories)

Now we will create a function that retrieves the video statistics of all the videos contained in a list of video IDs.

In [None]:
def _formatVideoDuration(durationISO):
    """Convert an ISO 8601 duration string to the text "<h>h <m>m <s>s".

    Returns 'videoDuration unknown' when durationISO is empty or None.
    """
    if not durationISO:
        return 'videoDuration unknown'

    totalSeconds = int(isodate.parse_duration(durationISO).total_seconds())
    hours, remainder = divmod(totalSeconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours}h {minutes}m {seconds}s"


def getAllVideoStatistics(videoIDs):
    """Fetch details and statistics for every video in videoIDs.

    The IDs are requested in batches of 50 per API call. Fields that
    are missing from a response item are filled with a
    "<column name> unknown" placeholder (an empty list for videoTags).

    Args:
        videoIDs: List of video IDs.

    Returns:
        A pandas DataFrame with one row per video returned by the API.
        The DataFrame is empty when no video was returned.
    """
    allVideoStatistics = []

    videoCategories = getVideoCategoriesBrazil()

    # The client does not change between batches, so build it once
    youtube = buildYoutubeAPI()

    # Iterate through videoIDs 50 at a time
    for i in range(0, len(videoIDs), 50):
        # 'status' and 'recordingDetails' must be requested explicitly,
        # otherwise the columns read from them below are always
        # "unknown". maxResults is not passed because the API reference
        # lists it as unsupported in combination with the id parameter.
        request = youtube.videos().list(
            part="snippet,contentDetails,statistics,status,recordingDetails",
            id=videoIDs[i:i+50]
        )
        response = request.execute()

        for item in response.get('items', []):
            snippet = item['snippet']
            status = item.get('status', {})
            statistics = item.get('statistics', {})
            recordingDetails = item.get('recordingDetails', {})

            videoCategoryID = snippet.get('categoryId', None)
            if videoCategoryID:
                # The ID may be absent from the 'BR' category list;
                # fall back to a placeholder instead of a KeyError
                videoCategory = videoCategories.get(
                    videoCategoryID, 'videoCategory unknown')
            else:
                videoCategory = 'videoCategoryID unknown'

            allVideoStatistics.append({
                "videoPublishDatetime": snippet.get('publishedAt', 'videoPublishDatetime unknown'),
                "channelID": snippet.get('channelId', 'channelID unknown'),
                "videoTitle": snippet.get('title', 'videoTitle unknown'),
                "videoDescription": snippet.get('description', 'videoDescription unknown'),
                "channelTitle": snippet.get('channelTitle', 'channelTitle unknown'),
                "videoTags": snippet.get('tags', []),
                "videoCategory": videoCategory,
                "isLiveBroadcastContent": snippet.get('liveBroadcastContent', 'isLiveBroadcastContent unknown'),
                "videoDuration": _formatVideoDuration(item['contentDetails'].get('duration', None)),
                "videoUploadStatus": status.get('uploadStatus', 'videoUploadStatus unknown'),
                "isPublicStatsViewable": status.get('publicStatsViewable', 'isPublicStatsViewable unknown'),
                "videoViewCount": statistics.get('viewCount', 'videoViewCount unknown'),
                "videoLikeCount": statistics.get('likeCount', 'videoLikeCount unknown'),
                "videoCommentCount": statistics.get('commentCount', 'videoCommentCount unknown'),
                "videoRecordingLocation": recordingDetails.get('location', 'videoRecordingLocation unknown'),
                "videoRecordingDatetime": recordingDetails.get('recordingDate', 'videoRecordingDatetime unknown')
            })

    # Build the DataFrame once, after all rows are collected, so that it
    # is defined even when there are no videos
    return pd.DataFrame(allVideoStatistics)


Now let's test if the video statistics are being retrieved correctly.

In [None]:
# Test run: fetch the statistics of all the
# videos whose IDs were collected above
videoStatistics = getAllVideoStatistics(videoIDs)

# Bare expression so that the notebook
# shows the resulting dataframe
videoStatistics

In [None]:
# Number of rows in the video statistics dataframe
len(videoStatistics)