In [163]:
import requests
import time
import numpy as np
import pandas as pd
import copy

In [196]:
def post_request_ytb(country_code_list, trending_type_dict, timeout=30):
    """Fetch the YouTube trending feed for every (country, trending type) pair.

    One POST request is sent to the ``youtubei/v1/browse`` endpoint for each
    combination of country code and trending type.

    Args:
        country_code_list: Iterable of country codes; each code is sent as the
            ``gl`` field of the client context.
        trending_type_dict: Mapping of trending-type label to the value sent
            as the ``params`` field of the request (``None`` is sent as-is).
        timeout: Seconds ``requests`` waits for the server before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        dict: ``{country_code: {trending_type: decoded JSON response}}``.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.
        requests.exceptions.HTTPError: if the server answers with an error
            status (4xx/5xx).
    """
    responses_dict = {}
    for country_code in country_code_list:
        country_responses = responses_dict[country_code] = {}
        for trending_type, trending_type_url in trending_type_dict.items():
            json_data = {
                'context': {
                    'client': {
                        'gl': country_code,
                        'clientName': 'WEB',
                        'clientVersion': '2.20231115.01.01',
                        'originalUrl': 'https://www.youtube.com/feed/trending',
                    },
                },
                'browseId': 'FEtrending',
                'params': trending_type_url,
            }

            response = requests.post(
                'https://www.youtube.com/youtubei/v1/browse',
                json=json_data,
                timeout=timeout,
            )
            # Fail with a clear HTTP error instead of a confusing KeyError or
            # JSON decoding error further down the pipeline.
            response.raise_for_status()
            country_responses[trending_type] = response.json()
    return responses_dict

In [197]:
def collect_video_data(video_list, data_dictionary, trending_type, country_code, current_timestamp):
    """Append one row per regular video to ``data_dictionary`` (modified in place).

    Args:
        video_list: Sequence of items, each exposing its id under
            ``item["videoRenderer"]["videoId"]``.
        data_dictionary: Dict of column lists; the ``videoId``, ``videoType``,
            ``trendingCountry``, ``videoTrendsRanking`` and ``scanTimestamp``
            columns are extended.
        trending_type: Label stored in ``videoType`` for every row.
        country_code: Value stored in ``trendingCountry`` for every row.
        current_timestamp: Value stored in ``scanTimestamp`` for every row.

    Returns:
        None. The ranking stored for each video is its zero-based position in
        ``video_list``.
    """
    row_count = len(video_list)
    id_column = data_dictionary["videoId"]
    # Build the full id list first so nothing is appended if an item is malformed.
    id_column.extend([entry["videoRenderer"]["videoId"] for entry in video_list])
    data_dictionary["videoType"].extend([trending_type] * row_count)
    data_dictionary["trendingCountry"].extend([country_code] * row_count)
    data_dictionary["videoTrendsRanking"].extend(list(range(row_count)))
    data_dictionary["scanTimestamp"].extend([current_timestamp] * row_count)

In [198]:
def collect_short_data(shorts_list, data_dictionary, country_code, current_timestamp):
    """Append one row per Short to ``data_dictionary`` (modified in place).

    Args:
        shorts_list: Sequence of items, each exposing its id under
            ``item["reelItemRenderer"]["videoId"]``.
        data_dictionary: Dict of column lists; the ``videoId``, ``videoType``,
            ``trendingCountry``, ``videoTrendsRanking`` and ``scanTimestamp``
            columns are extended.
        country_code: Value stored in ``trendingCountry`` for every row.
        current_timestamp: Value stored in ``scanTimestamp`` for every row.

    Returns:
        None. Every row gets the video type ``'Short'`` and a ranking equal to
        its zero-based position in ``shorts_list``.
    """
    row_count = len(shorts_list)
    id_column = data_dictionary["videoId"]
    # Build the full id list first so nothing is appended if an item is malformed.
    id_column.extend([entry["reelItemRenderer"]["videoId"] for entry in shorts_list])
    data_dictionary["videoType"].extend(['Short'] * row_count)
    data_dictionary["trendingCountry"].extend([country_code] * row_count)
    data_dictionary["videoTrendsRanking"].extend(list(range(row_count)))
    data_dictionary["scanTimestamp"].extend([current_timestamp] * row_count)

In [199]:
def _shelf_video_items(section):
    """Return the video items of a section that holds a regular video shelf."""
    shelf = section["itemSectionRenderer"]["contents"][0]["shelfRenderer"]
    return shelf["content"]["expandedShelfContentsRenderer"]["items"]


def _reel_shelf_items(section):
    """Return the Shorts items of a section that holds a reel shelf."""
    return section["itemSectionRenderer"]["contents"][0]["reelShelfRenderer"]["items"]


def _read_now_layout(sections, first):
    """Read the four consecutive shelves (videos, shorts, videos, recent) starting at ``first``."""
    videos_before = _shelf_video_items(sections[first])
    shorts = _reel_shelf_items(sections[first + 1])
    videos_after = _shelf_video_items(sections[first + 2])
    videos_recent = _shelf_video_items(sections[first + 3])
    return videos_before, shorts, videos_after, videos_recent


def add_now_videos_shorts(response_dict, data_dictionary, current_timestamp):
    """Add the "Now", "Recently Trending" and "Shorts" videos to ``data_dictionary`` in place.

    For every country the ``'Now'`` response is expected to contain, in order,
    a video shelf, a Shorts shelf, a second video shelf and a "recently
    trending" video shelf. The four shelves start either at the first section
    or, when that lookup raises ``KeyError``, at the second one.

    The two "Now" video shelves are collected as a single list so that
    ``videoTrendsRanking`` keeps counting across the Shorts shelf instead of
    restarting at 0 for the second shelf (which produced duplicate ranks).

    Args:
        response_dict: ``{country_code: {'Now': browse response, ...}}``.
        data_dictionary: Dict of column lists, filled through
            ``collect_video_data`` and ``collect_short_data``.
        current_timestamp: Scan timestamp stored with every row.

    Returns:
        None.

    Raises:
        KeyError, IndexError: if the response matches neither layout.
    """
    for country_code, country_responses in response_dict.items():
        print("Current country : ", country_code)
        section_now = country_responses['Now']["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"]
        try:
            videos_before, shorts_list, videos_after, videos_recent = _read_now_layout(section_now, 0)
        except KeyError:
            # An extra leading section shifts every shelf by one position.
            videos_before, shorts_list, videos_after, videos_recent = _read_now_layout(section_now, 1)

        # One call for both halves of the "Now" list => one continuous ranking.
        now_videos = list(videos_before) + list(videos_after)
        collect_video_data(now_videos, data_dictionary, 'Now', country_code, current_timestamp)
        collect_video_data(videos_recent, data_dictionary, 'Recently Trending', country_code, current_timestamp)
        collect_short_data(shorts_list, data_dictionary, country_code, current_timestamp)
    return None

In [200]:
def add_other_sections(response_dict, data_dictionary, current_timestamp):
    """Add the videos of every trending tab except the first one to ``data_dictionary`` in place.

    The first key of each country's response mapping is skipped. For each
    remaining key, its position in the mapping is used as the index of the tab
    to read in that key's response, and the first shelf of that tab is passed
    to ``collect_video_data`` with the key as the video type.

    NOTE(review): this assumes the key order of the response mapping matches
    the tab order of the trending page, and that the first key is the tab
    handled elsewhere - confirm if the set of scanned tabs changes.

    Args:
        response_dict: ``{country_code: {trending_type: browse response}}``.
        data_dictionary: Dict of column lists, filled through
            ``collect_video_data``.
        current_timestamp: Scan timestamp stored with every row.

    Returns:
        None.
    """
    for country_code, tab_responses in response_dict.items():
        # Dictionaries preserve insertion order, so a key's position doubles as its tab index.
        for tab_position, trending_type in enumerate(tab_responses):
            if tab_position == 0:
                continue
            tabs = tab_responses[trending_type]["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
            sections = tabs[tab_position]["tabRenderer"]["content"]["sectionListRenderer"]["contents"]
            first_shelf = sections[0]["itemSectionRenderer"]["contents"][0]["shelfRenderer"]
            video_list = first_shelf["content"]["expandedShelfContentsRenderer"]["items"]
            collect_video_data(video_list, data_dictionary, trending_type, country_code, current_timestamp)

In [201]:
def _comment_count(sections):
    """Return the comment-count text of a watch page, or NaN when it has no comments header."""
    # The comments header is normally the third section; an additional
    # "merchandise" section pushes it to the fourth one.
    for position in (2, 3):
        try:
            header = sections[position]["itemSectionRenderer"]["contents"][0]["commentsEntryPointHeaderRenderer"]
            return header["commentCount"]["simpleText"]
        except (KeyError, IndexError):
            continue
    # No comments header at either position (e.g. comments are turned off).
    return np.nan


def _is_creator_verified(owner):
    """Return True when the owner's first badge has the tooltip "Verified"."""
    try:
        return owner["badges"][0]["metadataBadgeRenderer"]["tooltip"] == "Verified"
    except (KeyError, IndexError):
        # No badge at all.
        return False


def update_video_data(data_dictionary, timeout=30):
    """Fetch per-video statistics for every id in ``data_dictionary["videoId"]`` (in place).

    One request per video is sent to the ``youtubei/v1/next`` endpoint, and one
    value is appended to each of ``exactViewNumber``, ``numberLikes``,
    ``creatorSubscriberNumber``, ``numberOfComments`` and
    ``isCreatorVerified``. Values are stored as the raw text returned by the
    endpoint; a missing comment count is stored as ``numpy.nan``.

    All values of a video are extracted before any of them is appended, so an
    unexpected response never leaves the columns with different lengths.

    Args:
        data_dictionary: Dict of column lists; ``videoId`` is read, the five
            columns listed above are appended to.
        timeout: Seconds ``requests`` waits for the server before giving up.

    Returns:
        None.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.
        requests.exceptions.HTTPError: if the server answers with an error status.
        KeyError, IndexError: if a mandatory field (views, likes, subscriber
            count) is missing from the response.
    """
    for video_id in data_dictionary["videoId"]:
        print("Currently processing video_id:", video_id)
        json_data = {
            'context': {
                'client': {
                    'clientName': 'WEB',
                    'clientVersion': '2.20231208.01.00',
                }
            },
            'videoId': video_id,
        }

        response = requests.post('https://www.youtube.com/youtubei/v1/next', json=json_data, timeout=timeout)
        response.raise_for_status()
        sections = response.json()["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"]

        primary_info = sections[0]["videoPrimaryInfoRenderer"]
        owner = sections[1]["videoSecondaryInfoRenderer"]["owner"]["videoOwnerRenderer"]
        like_button = (
            primary_info["videoActions"]["menuRenderer"]["topLevelButtons"][0]
            ["segmentedLikeDislikeButtonViewModel"]["likeButtonViewModel"]["likeButtonViewModel"]
            ["toggleButtonViewModel"]["toggleButtonViewModel"]["defaultButtonViewModel"]["buttonViewModel"]
        )

        # Extract every value before appending any of them (keeps columns aligned).
        view_count = primary_info["viewCount"]["videoViewCountRenderer"]["viewCount"]["simpleText"]
        like_count = like_button["title"]
        subscriber_count = owner["subscriberCountText"]["simpleText"]
        comment_count = _comment_count(sections)
        creator_verified = _is_creator_verified(owner)

        data_dictionary["exactViewNumber"].append(view_count)
        data_dictionary["numberLikes"].append(like_count)
        data_dictionary["creatorSubscriberNumber"].append(subscriber_count)
        data_dictionary["numberOfComments"].append(comment_count)
        data_dictionary["isCreatorVerified"].append(creator_verified)
        # Pause between requests to avoid hammering the endpoint.
        time.sleep(1)
    return None

In [202]:
def update_meta_data(data_dictionary, timeout=30):
    """Fetch per-video metadata for every id in ``data_dictionary["videoId"]`` (in place).

    One request per video is sent to the ``youtubei/v1/player`` endpoint, and
    one value is appended to each of ``videoKeywords``, ``videoLengthSeconds``,
    ``videoCategory`` and ``videoExactPublishDate``. Values are stored exactly
    as returned by the endpoint; missing keywords are stored as ``numpy.nan``.

    All values of a video are extracted before any of them is appended, so an
    unexpected response never leaves the columns with different lengths.

    Args:
        data_dictionary: Dict of column lists; ``videoId`` is read, the four
            columns listed above are appended to.
        timeout: Seconds ``requests`` waits for the server before giving up.

    Returns:
        None.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.
        requests.exceptions.HTTPError: if the server answers with an error status.
        KeyError: if ``videoDetails``, ``microformat`` or one of the mandatory
            fields is missing from the response.
    """
    for video_id in data_dictionary["videoId"]:
        print("Currently processing video_id:", video_id)
        json_data = {
            'context': {
                'client': {
                    'clientName': 'WEB',
                    'clientVersion': '2.20231208.01.00',
                },
            },
            'videoId': video_id,
        }

        response = requests.post('https://www.youtube.com/youtubei/v1/player', json=json_data, timeout=timeout)
        response.raise_for_status()
        meta_response = response.json()

        video_details = meta_response["videoDetails"]
        microformat = meta_response["microformat"]["playerMicroformatRenderer"]

        # Extract every value before appending any of them (keeps columns aligned).
        try:
            keywords = video_details["keywords"]
        except KeyError:  # no keywords
            keywords = np.nan
        length_seconds = video_details["lengthSeconds"]
        category = microformat["category"]
        publish_date = microformat["uploadDate"]

        data_dictionary["videoKeywords"].append(keywords)
        data_dictionary["videoLengthSeconds"].append(length_seconds)
        data_dictionary["videoCategory"].append(category)
        data_dictionary["videoExactPublishDate"].append(publish_date)
        # Pause between requests to avoid hammering the endpoint.
        time.sleep(1)
    return None

In [203]:
# Trending tabs to scan: label -> value sent as the `params` field of the
# browse request built by post_request_ytb ('Now' is requested with None).
# Key order matters: add_other_sections skips the first key and uses the
# position of every other key as the index of the tab to read.
# Only 'Now' is enabled; the other tabs are currently commented out.
TRENDING_TYPE_DICT = {
    'Now': None,
    # 'Music': '4gINGgt5dG1hX2NoYXJ0cw%3D%3D',
    # 'Gaming': '4gIcGhpnYW1pbmdfY29ycHVzX21vc3RfcG9wdWxhcg%3D%3D',
    # 'Movie': '4gIKGgh0cmFpbGVycw%3D%3D',
}

# Country codes to scan; each one is sent as the `gl` field of a browse request.
# NOTE: despite the "_DICT" suffix this is a list.
FRENCH_COUNTRY_DICT = ['FR']

In [204]:
def scan_trending_ytb(n_scans=5, delta_scans=1800):
    """Scan the YouTube trending feed several times and return the panel data.

    Each scan requests the trending feed for the countries in
    ``FRENCH_COUNTRY_DICT`` and the tabs in ``TRENDING_TYPE_DICT``, collects
    the listed videos, then fetches per-video statistics and metadata. The
    function waits ``delta_scans`` seconds between two consecutive scans; it
    does not wait after the last one.

    Args:
        n_scans (int): number of times the trending feed is scanned.
        delta_scans (int | float): delay between two scans, in seconds.

    Returns:
        pandas.DataFrame: the rows of all scans concatenated in scan order
        (each scan keeps its own 0-based index). When ``n_scans`` is 0, an
        empty frame with the expected columns is returned.
    """
    column_names = (
        'videoId',
        'scanTimestamp',
        'videoExactPublishDate',
        'creatorSubscriberNumber',
        'videoTrendsRanking',
        'videoLengthSeconds',
        'videoType',
        'videoCategory',
        'trendingCountry',
        'exactViewNumber',
        'numberLikes',
        'numberOfComments',
        'isCreatorVerified',
        'videoKeywords',
    )
    scan_frames = []
    for scan in range(n_scans):
        if scan > 0:
            # Wait only *between* scans; sleeping after the final scan would
            # just delay the result by delta_scans seconds.
            time.sleep(delta_scans)
        scan_data = {column: [] for column in column_names}
        current_timestamp = time.time()
        response_dict = post_request_ytb(FRENCH_COUNTRY_DICT, TRENDING_TYPE_DICT)
        add_now_videos_shorts(response_dict, scan_data, current_timestamp)
        add_other_sections(response_dict, scan_data, current_timestamp)
        update_video_data(scan_data)
        update_meta_data(scan_data)
        # Sanity output: every column should report the same length.
        for column_values in scan_data.values():
            print(len(column_values))
        scan_frames.append(pd.DataFrame(scan_data))
    if not scan_frames:
        # pd.concat raises on an empty list; return an empty, well-formed frame.
        return pd.DataFrame(columns=list(column_names))
    return pd.concat(scan_frames)

In [None]:
# Run the scan with the default settings (n_scans=5, delta_scans=1800 seconds).
# The call blocks until every scan is done, i.e. for hours with these defaults.
panel_data_df = scan_trending_ytb()