In [1]:
import requests
import json
import copy
import time
import socket
import requests.packages.urllib3.util.connection as urllib3_cn

### Helper functions

In [2]:
# json helper functions

def replace_search(initial_json, query):
    """
    Return a copy of a JSON-like structure with every occurrence of the
    placeholder "__REPLACE__" in string values replaced by the query string.

    Dicts and lists are walked recursively at any nesting depth. Strings that
    sit directly inside a list are substituted too, and non-string scalars
    (numbers, booleans, None) are carried over unchanged. Dictionary keys are
    never rewritten. The input structure is not modified.

    Parameters:
        initial_json: dict / list / str / scalar to process (typically a dict
            produced by json.load).
        query: replacement text for the placeholder.

    Returns:
        A new structure of the same shape with the substitutions applied.
    """
    if isinstance(initial_json, dict):
        return {key: replace_search(value, query) for key, value in initial_json.items()}
    if isinstance(initial_json, list):
        # elements may be dicts, lists, strings or scalars; recurse on each
        return [replace_search(item, query) for item in initial_json]
    if isinstance(initial_json, str):
        return initial_json.replace("__REPLACE__", query)
    # anything else is copied so the result never shares mutable state with the input
    return copy.deepcopy(initial_json)


def recursive_find(response, search_key):
    """
    Recursively walk a JSON-like structure and yield the value of every
    dictionary entry whose key equals search_key.

    Values are yielded in traversal order (dict insertion order, list order).
    When a key matches, its value is yielded as-is and is not searched any
    further. Lists (including nested lists and a list at the top level) are
    traversed element by element; strings, numbers, booleans and None carry no
    keys and are skipped.

    Parameters:
        response: dict / list / scalar to search (typically a decoded JSON body).
        search_key: the dictionary key to look for.

    Yields:
        Each value stored under search_key.
    """
    if isinstance(response, list):
        for item in response:
            yield from recursive_find(item, search_key)
        return
    if not isinstance(response, dict):
        # scalars (str, int, float, bool, None) cannot contain the key
        return
    for key, value in response.items():
        if key == search_key:
            yield value
        elif isinstance(value, (dict, list)):
            yield from recursive_find(value, search_key)


def find_values(response, search_key):
    """
    Collect every value stored under search_key anywhere in a json response.

    Thin convenience wrapper that drains the recursive_find generator into a list.
    """
    matches = list(recursive_find(response, search_key))
    return matches

In [3]:
# Force IPv6 (when available) to help with YouTube blocking too many requests from IPv4
def allowed_gai_family():
    """
    Choose the socket address family: AF_INET6 when urllib3 reports IPv6
    support (HAS_IPV6), otherwise AF_INET.

    Modelled on the function of the same name in
    https://github.com/shazow/urllib3/blob/master/urllib3/util/connection.py
    """
    # IPv6 is forced only if it is available; IPv4 is the fallback
    return socket.AF_INET6 if urllib3_cn.HAS_IPV6 else socket.AF_INET

# Monkey-patch urllib3's connection module so it uses the allowed_gai_family
# defined in this notebook instead of its own implementation.
urllib3_cn.allowed_gai_family = allowed_gai_family


### URL collection

In [4]:
# Arbitrary mix of search terms used to gauge how many videos each query can collect
search_terms = [
    'cats', 'dogs', 'nature', 'cars', 'food',
    'sports', 'travel', 'fashion', 'art', 'music',
    'architecture', 'technology', 'business', 'health', 'fitness',
    'film', 'books', 'education', 'science', 'history',
    'religion', 'politics', 'humor', 'memes', 'gaming',
    'anime', 'cartoons', 'comics', 'design', 'crafts',
    'beauty', 'diy', 'gardening', 'dance', 'theater',
]
len(search_terms)

35

In [5]:
# YouTube search endpoint; the key and prettyPrint flag travel in the query string
search_api = (
    'https://www.youtube.com/youtubei/v1/search'
    '?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    '&prettyPrint=false'
)

# Request-body templates; replace_search fills them in with each search term later on
with open('initial_search.json') as initial_file:
    initial_search_template = json.load(initial_file)
with open('continue_search.json') as continue_file:
    continue_search_template = json.load(continue_file)

In [6]:
# Collect video ids for every search term, following continuation tokens page by page.
urls = []                # every videoId found, in collection order (repeats across pages are kept)
results_per_search = []  # number of videoIds gathered per term, aligned with search_terms
REQUEST_TIMEOUT_SECONDS = 30  # without a timeout a stalled connection would hang the cell forever
for search_term in search_terms:
    initial_search = replace_search(initial_search_template, search_term)
    continue_search = replace_search(continue_search_template, search_term)
    response = requests.post(search_api, json = initial_search, timeout = REQUEST_TIMEOUT_SECONDS)
    num_results = 0
    # NOTE(review): nothing bounds the number of pages per term; consider a page cap
    # if the API keeps handing out continuation tokens.
    while True:
        # validate every page, including the first one, before trusting its body
        assert response.status_code == 200, response.text
        page = response.json()  # decode once and reuse for both lookups below

        # ids are de-duplicated within a page only
        results = list(set(find_values(page, 'videoId')))
        urls.extend(results)
        num_results += len(results)
        print(search_term, len(urls), end = '\t\r')
        # time.sleep(.5)

        # continue to next request
        continuation = find_values(page, 'continuationCommand')
        assert len(continuation) <= 1 # either 1 continuation or 0
        if len(continuation) == 0:
            break
        continue_search['continuation'] = continuation[0]['token']
        response = requests.post(search_api, json = continue_search, timeout = REQUEST_TIMEOUT_SECONDS)
    results_per_search.append(num_results)

cars 224140

In [None]:
# number of distinct video ids collected (urls keeps repeats from different pages/terms)
len(set(urls))

1785

In [None]:
# average number of results per search: mean of the per-term videoId counts
# (raises ZeroDivisionError if results_per_search is empty, i.e. no search completed)
sum(results_per_search) / len(results_per_search)

678.0

In [None]:
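## Random browsing code (disabled experiment, kept for reference).
## It relies on `browse_req` and `output_file`, which are not defined in the cells shown above.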


# request_json = browse_req
# response = requests.post(
#     url = 'https://www.youtube.com/youtubei/v1/browse?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8&prettyPrint=false',
#     json = request_json
# )
# print(response, end = '\r')
# time.sleep(1)

# browsing_data = response.json()['onResponseReceivedActions'][0]['appendContinuationItemsAction']['continuationItems']

# for index, item in enumerate(browsing_data):
#     if 'richItemRenderer' in item.keys():
#         vid_id = item['richItemRenderer']['content']['videoRenderer']['videoId']
#         output_file.write(vid_id + '\n')
#         output_file.flush()
#     if 'continuationItemRenderer' in item.keys():
#         new_token = item['continuationItemRenderer']['continuationEndpoint']['continuationCommand']['token']
#         request_json['continuation'] = new_token