This notebook documents the construction of an application that performs three actions:

1. Create a database containing snapshots of descriptive statistics for all the videos on a number of YouTube channels.

2. Generate reports of pertinent information summarising some subselection of the videos retrieved.

3. Perform time series analysis of historical snapshots.


# Part 1


Let's build the part of the application that talks to Youtube. There is a quickstart guide at:
https://developers.google.com/youtube/v3/quickstart/python
but we will mostly follow our own route using 
https://github.com/youtube/api-samples/blob/master/python/search.py

We will also refer to 

https://developers.google.com/youtube/v3/docs/
https://www.forgov.qld.gov.au/file/21896/download?token=mo1SpiZT 

We want to get a list of statistics for all videos for all channels in a given list. We'll need to make calls to at least 3 different resources:

> 
**Channels**
-  Needs: channel_name
-  Returns: channel_id
> 
**Videos**
-  Needs: video_id
-  Returns: Statistics on video
> 
**PlaylistItems**
-  Needs: playlist_id (derivable from channel_id)
-  Returns: All video_ids on that playlist
    
It's probably a good idea to first make an index of the channel_ids of all the channels that we want to survey. Usefully, each channel id differs by only one character from the id of the corresponding default playlist, which contains every video uploaded by that channel: the prefix `UC` becomes `UU` (compare the two 3Blue1Brown ids used below).
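
The derivation of the default playlist id can be sketched as a one-line string operation. This is a minimal sketch that assumes every channel id starts with `UC` and that the uploads playlist always swaps this prefix for `UU`, which is the pattern seen in the ids in this notebook; the function name `uploads_playlist_id` is made up for illustration.

```python
def uploads_playlist_id(channel_id):
    """Derive the default 'uploads' playlist id from a channel id.

    Assumption: channel ids start with 'UC' and the uploads playlist id is
    the same string with the prefix 'UU' instead.
    """
    if not channel_id.startswith("UC"):
        raise ValueError("unexpected channel id format: %r" % channel_id)
    return "UU" + channel_id[2:]


# Channel id taken from the search URLs later in this notebook
khan_uploads = uploads_playlist_id("UC4a-Gbdw7vOaccHmFo40b9g")
# khan_uploads is "UU4a-Gbdw7vOaccHmFo40b9g", the Khan Academy playlist id used below
```

Raising on an unexpected prefix is a deliberate choice here: a silently wrong playlist id would only show up later as an empty or failed API response.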

There is a stackoverflow question which has a lot of useful information as to how we should structure our calls to the API: https://stackoverflow.com/questions/18953499/youtube-api-to-fetch-all-videos-on-a-channel/20795628

From this, and a little consideration, we note a few important facts:
1. Each call to an API resource returns at most 50 results. For example, if a channel has uploaded 6000 videos (e.g., Khan Academy), we must make at least 120 calls to the API to return every video_id.
2. You can iterate through this list of 6000 videos, 50 at a time, using the optional page token (`nextPageToken` in each response, passed back as `pageToken` in the next request).
3. You can actually only get a maximum of 500 videos by following the page token.
4. To get around the 500-result limit, you can try to restrict your query by date of upload. This might take some trial and error if there were any periods of really high upload density.
5. We'll need to do a big initial survey to get the video ids to date. After that the process will shift to maintenance of an existing database and should require much lower volumes of queries. However, if we want up-to-date information on videos, that will require a fair bit of resampling. API call quota may become an issue.
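
The arithmetic in point 1 and the date restriction in point 4 can be sketched without touching the API. The helper names below (`calls_needed`, `date_windows`) are hypothetical, and the window length is a guess that would need tuning per channel; the resulting strings are in the RFC 3339 form accepted by the `publishedAfter` and `publishedBefore` parameters of the Search resource.

```python
from datetime import datetime, timedelta


def calls_needed(total_videos, per_page=50):
    """Minimum number of paginated calls needed to list total_videos items."""
    return -(-total_videos // per_page)  # ceiling division


def date_windows(start, end, days):
    """Yield (published_after, published_before) strings covering [start, end)."""
    step = timedelta(days=days)
    lower = start
    while lower < end:
        upper = min(lower + step, end)
        yield (lower.strftime("%Y-%m-%dT%H:%M:%SZ"),
               upper.strftime("%Y-%m-%dT%H:%M:%SZ"))
        lower = upper


khan_calls = calls_needed(6000)  # 120, matching the estimate in point 1
windows = list(date_windows(datetime(2017, 1, 1), datetime(2017, 3, 1), days=30))
```

Each window would become one paginated query; if a window still reports more than 500 results, it can be split again with a smaller `days` value.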


There are only a few things that we really want:

1. A list of all the videos on a given playlist (-time indexed? only by upload date?)
2. Engagement statistics for each video. (-time indexed)


This suggests the following tasks:

1. Construct index of channel name and channel id. 
    - [] Manually construct list of names from Epsilon Stream channel list.
    - [] Use channels API call to get channel id (or playlist ID) for each.
    - [] Store list in file.
    - [] Derive upload playlist ids from channel ids. 

2. Get list of all video ids on each channel -> Create video-list-updater
    - [] routine to find total number of videos in playlist
    - [] routine to create search request for playlist with bounded upload dates to restrict number of results to <500
    - [] routine to take <500 result search and iterate through pagination
    - [] routine to read search response and obtain all desired details
    - [] routine to handle aggregation and quality control of requests (e.g. check for doubles/missing, repeat failed requests)
    - [] routine to handle I/O of results

3. Having gotten all video_ids for each channel, for each video:
    - [] Asynchronously query video resource 50 ids at a time 
    - [] routine to extract "status" (all details desired) on every video
    - [] routine to add each 
    - [] program to govern all these actions, reuse retry function and quality control from earlier
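
For task 3, the video ids have to be grouped into comma-separated strings of at most 50 before they are passed to the Videos resource. A minimal sketch of that grouping follows; `batch_video_ids` is a made-up name, and dropping repeated ids first is an assumption about what we want rather than an API requirement.

```python
def batch_video_ids(video_ids, batch_size=50):
    """Split ids into comma-joined strings of at most batch_size unique ids."""
    unique_ids = []
    seen = set()
    for video_id in video_ids:
        if video_id not in seen:  # keep first occurrence, preserve order
            seen.add(video_id)
            unique_ids.append(video_id)
    return [",".join(unique_ids[i:i + batch_size])
            for i in range(0, len(unique_ids), batch_size)]


# 120 placeholder ids (not real video ids) give batches of 50, 50 and 20
example_batches = batch_video_ids(["v%d" % i for i in range(120)])
```

Each string in `example_batches` could then be passed as the `id` argument of a single call.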

To consider:
What kinds of operations will I frequently be doing with this database?
Should the database be snapshot based, or delta based, or a mix of the two?
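
To make the snapshot versus delta question concrete, here is a small sketch of what a delta-based record could look like. Everything in it is hypothetical: the field names, the helper names, and the choice to store only changed counters are illustrations, not a decision about the final database.

```python
def snapshot_delta(old, new):
    """Return only the statistics that changed between two snapshots of one video."""
    return {key: new[key] - old.get(key, 0)
            for key in new
            if new[key] != old.get(key, 0)}


def apply_delta(old, delta):
    """Rebuild the newer snapshot from an older snapshot plus a delta."""
    rebuilt = dict(old)
    for key, change in delta.items():
        rebuilt[key] = rebuilt.get(key, 0) + change
    return rebuilt


# Made-up numbers for a single video at two survey times
monday = {"views": 100, "likes": 10}
tuesday = {"views": 150, "likes": 10}
change = snapshot_delta(monday, tuesday)  # {"views": 50}
```

A delta store is smaller when most statistics do not change between surveys, but reading the state at a given time means replaying deltas from the last full snapshot, which is the argument for a mix of the two.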
    


### Calls to the Youtube API

The following API call retrieves the video_ids of the first 50 videos, according to some order, of all videos posted by Khan Academy's channel on youtube. We take advantage of the fact that every channel has a default playlist resource, which contains a list of every video published on that channel.


In [None]:
from googleapiclient.discovery import build  
#We are currently building an authorisationless app, so we don't need oauth2client
import os

#put the textfile containing the apikey into the parent folder of your local git repo
apikey_path = os.path.join(os.getcwd(), os.pardir, "apikey.txt")

#strip the trailing newline so that it does not end up inside request URLs
with open(apikey_path,"r") as f:
    apikey = f.readline().strip()

DEVELOPER_KEY = apikey
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

In [None]:
playlist_id = "UU4a-Gbdw7vOaccHmFo40b9g" #This is the default playlist id for Khan Academy

def get_youtube_playlist(playlist_id, max_results=50):
    
    youtube = build(serviceName = YOUTUBE_API_SERVICE_NAME,
                    version = YOUTUBE_API_VERSION,
                    developerKey = DEVELOPER_KEY)
    
    search_response = youtube.playlistItems().list(
        playlistId = playlist_id,
        part = 'snippet',
        maxResults= max_results)
    
    return search_response.execute()

playlist_result = get_youtube_playlist(playlist_id)

In [None]:
videos = []

for item in playlist_result["items"]:
    video_id = item["snippet"]["resourceId"]["videoId"]
    print video_id
    videos.append(video_id)

In [None]:
print videos

The following API call retrieves the "id" part of the channel resource that contains information about Khan Academy's channel. This is the channel id.

In [None]:
def get_channel_id(channel_name, max_results=50):
    
    youtube = build(serviceName = YOUTUBE_API_SERVICE_NAME,
                    version = YOUTUBE_API_VERSION,
                    developerKey = DEVELOPER_KEY)
    
    query_response = youtube.channels().list(
        forUsername = channel_name,
        part = 'id',
        maxResults= max_results)
    
    return query_response.execute()

result = get_channel_id("khanacademy")

In [None]:
print result["items"][0]["id"]

This is the channel id for Khan Academy

Now we'll take that list of 50 video ids and get a bunch of statistics for them using the Videos resource.

In [None]:
test_video_ids = ",".join(videos)
test_video_ids = test_video_ids+","+test_video_ids
print test_video_ids

In [None]:
def get_video_statistics(video_ids, max_results=50):
    """returns a youtube API Video resource, containing details for a list of videos"""
    
    youtube = build(serviceName = YOUTUBE_API_SERVICE_NAME,
                    version = YOUTUBE_API_VERSION,
                    developerKey = DEVELOPER_KEY)
    
    search_response = youtube.videos().list(
        id = video_ids,
        part = 'snippet,statistics',
        maxResults= max_results)
    
    return search_response.execute()

video_result = get_video_statistics(test_video_ids)

We've now got some statistics for the 50 videos captured above.

In [None]:

for item in video_result["items"]:
    stats = item["statistics"]
    views = int(stats["viewCount"])
    likes = int(stats.get("likeCount", 0)) #likeCount can be absent
    ratio = float(likes)/views if views else 0.0 #avoid dividing by zero views
    print views, likes, ratio
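
The same extraction can be written as a small function and tried offline. This is a sketch under stated assumptions: `engagement_row` is a made-up name, and `sample_response` is hand-written in the shape of a Videos response (counts arrive as strings), not real data.

```python
def engagement_row(item):
    """Return (views, likes, like_ratio) for one item of a Videos response."""
    stats = item.get("statistics", {})
    views = int(stats.get("viewCount", 0))
    likes = int(stats.get("likeCount", 0))
    # None marks "ratio undefined" for videos without any views
    ratio = float(likes) / views if views else None
    return views, likes, ratio


# Hand-written sample, not real data
sample_response = {"items": [
    {"id": "aaa", "statistics": {"viewCount": "200", "likeCount": "50"}},
    {"id": "bbb", "statistics": {"viewCount": "0"}},
]}
rows = [engagement_row(item) for item in sample_response["items"]]
# rows is [(200, 50, 0.25), (0, 0, None)]
```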


Next we should figure out how to use pagination of results. We'll choose the default playlist of 3Blue1Brown as our testing grounds. This channel currently has about 70 videos, so we should expect just 2 pages of results.

In [None]:
bb_channelId = "UCYO_jab_esuFRV4b17AJtAw"
bb_playlist_id = "UUYO_jab_esuFRV4b17AJtAw"

def get_next_page_token(response):
    """Given json Youtube API response, return next_page token (None on the last page)"""
    return response.get("nextPageToken")
    
def num_results(response):
    "Get number of results to query"
    return int(response["pageInfo"]["totalResults"])


def add_playlist_video_details(response, dataset):
    """reads a playlist call response and extracts dict of desired information""" 
    for item in response["items"]:
        video_id = item["snippet"]["resourceId"]["videoId"]
        dataset[video_id] = {"likes":1, "dislikes":3, "views":20} #arbitrary
    return dataset

def add_search_video_details(response, dataset):
    """reads a search call response and extracts dict of desired information""" 
    for item in response["items"]:
        print item
        video_id = item["id"]["videoId"]
        dataset[video_id] = {"likes":1, "dislikes":3, "views":20} #arbitrary
    return dataset

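def get_paginated_playlist(playlist_id, max_results=50, first_token=None):
    """Pagination test: fetch the first page, then return the second page.
    Note that first_token is currently unused."""
    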
    youtube = build(serviceName = YOUTUBE_API_SERVICE_NAME,
                    version = YOUTUBE_API_VERSION,
                    developerKey = DEVELOPER_KEY)
    
    token = None
    
    search_response = youtube.playlistItems().list(
        playlistId = playlist_id,
        part = 'snippet',
        maxResults= max_results,
        pageToken =None)
    
    result = search_response.execute()
    
    token =  get_next_page_token(result)
    
    search_response2 = youtube.playlistItems().list(
        playlistId = playlist_id,
        part = 'snippet',
        maxResults= max_results,
        pageToken = token
        )
    
    return search_response2.execute()

In [None]:
bb_results = get_youtube_playlist(bb_playlist_id)
bb_results_2 = get_paginated_playlist(bb_playlist_id)

In [None]:
def get_video_details_from_playlist(playlist_id):
    """Gets all videos on a youtube playlist by collecting paginated results"""
    
    youtube = build(serviceName = YOUTUBE_API_SERVICE_NAME,
                    version = YOUTUBE_API_VERSION,
                    developerKey = DEVELOPER_KEY)
    dataset = {}
    
    query = youtube.playlistItems().list(
                playlistId = playlist_id,
                part = 'snippet',
                maxResults = 50, #the API default is only 5 items per page
                pageToken=None)
    first_page = query.execute()
    
    token = get_next_page_token(first_page)
    dataset = add_playlist_video_details(first_page, dataset) 
    #maybe create a dataset class with some nice methods?
    
    while token:
        
        query = youtube.playlistItems().list(
            playlistId = playlist_id,
            part = 'snippet',
            maxResults = 50,
            pageToken = token
            )
        page_response = query.execute()
        
        token = get_next_page_token(page_response)
        dataset = add_playlist_video_details(page_response, dataset) 
    
    return dataset

bb_results_3 = get_video_details_from_playlist(bb_playlist_id)
    

In [None]:
bb_results_3

In [None]:
len(bb_results_3) #expected = 67
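
The pagination loop above can also be factored into a generator that knows nothing about the API client, which makes it testable without network access. This is only a sketch: `iter_pages` and `fetch_page` are hypothetical names, and `fetch_page` stands in for a call such as `youtube.playlistItems().list(...).execute()` with the given page token.

```python
def iter_pages(fetch_page):
    """Yield every page of a paginated listing.

    fetch_page is any callable that takes a page token (None for the first
    page) and returns a response dict.
    """
    token = None
    while True:
        response = fetch_page(token)
        yield response
        token = response.get("nextPageToken")
        if not token:  # the last page carries no nextPageToken
            break


# Fake two-page listing used only to exercise the loop
fake_pages = {
    None: {"items": [{"id": "a"}, {"id": "b"}], "nextPageToken": "p2"},
    "p2": {"items": [{"id": "c"}]},
}
collected = [item["id"]
             for page in iter_pages(fake_pages.get)
             for item in page["items"]]
# collected is ["a", "b", "c"]
```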

Now we'll try to survey a larger channel. The h3h3 productions main channel has 289 videos (a figure read manually from the channel page), and we'll try to retrieve the ids of all 289.

In [None]:
h3h3 = "UUDWIvJwLJsE4LG1Atne2blQ"
h3h3c = "UCDWIvJwLJsE4LG1Atne2blQ"
h3h3_results = get_video_details_from_playlist(h3h3) #288 of 289, missing one?

In [None]:
len(h3h3_results)
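
A count that comes up one short, as here, is the kind of thing the quality control routine from the task list should flag. A minimal sketch of such a check follows; `check_retrieved_ids` is a made-up name, and `expected_total` would have to come from somewhere else, for example `num_results` or the channel page.

```python
from collections import Counter


def check_retrieved_ids(video_ids, expected_total):
    """Report duplicate ids and the gap between expected and retrieved counts."""
    counts = Counter(video_ids)
    duplicates = sorted(vid for vid, n in counts.items() if n > 1)
    missing = expected_total - len(counts)
    return {"unique": len(counts), "duplicates": duplicates, "missing": missing}


# Placeholder ids, not real video ids
report = check_retrieved_ids(["a", "b", "b", "c"], expected_total=5)
# report is {"unique": 3, "duplicates": ["b"], "missing": 2}
```

Because the dataset above is a dict keyed by video id, duplicates are already collapsed there; the check is more useful on the raw list of ids collected page by page.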

Queries using PlaylistItems have a hard cap of 500 items. We'll need to use the Search resource to get beyond this. 

In [None]:
def get_videos_from_channel_using_search(channel_id):
    """Gets all videos on a youtube channel by collecting paginated results"""
    
    youtube = build(serviceName = YOUTUBE_API_SERVICE_NAME,
                    version = YOUTUBE_API_VERSION,
                    developerKey = DEVELOPER_KEY)
    dataset = {}
    
    query = youtube.search().list(
                q="",
                part = 'snippet,id',
                type = 'video', #non-video results carry no videoId
                maxResults = 50, #the API default is only 5 items per page
                pageToken=None,
                channelId = channel_id)
    first_page = query.execute()
    
    token = get_next_page_token(first_page)
    dataset = add_search_video_details(first_page, dataset) 
    #maybe create a dataset class with some nice methods?
    
    while token:
        
        query = youtube.search().list(
            q="",
            part = 'snippet,id',
            type = 'video',
            maxResults = 50,
            pageToken = token,
            channelId = channel_id
            )
        page_response = query.execute()
        
        token = get_next_page_token(page_response)
        dataset = add_search_video_details(page_response, dataset) 
    
    return dataset

h3h3_results = get_videos_from_channel_using_search(h3h3c)

# Asynchronous requests to the Youtube API


Eventually we will need to increase efficiency by performing our surveys asynchronously. Below is an example using trollius, the (now deprecated) Python 2.7 port of asyncio, the native asynchronous programming module of Python 3.4+.

In [None]:
import trollius as asyncio
from trollius import From

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print("Task %s: Compute factorial(%d)..." % (name, i))
        yield From(asyncio.sleep(1))
        f *= i
    print("Task %s completed! factorial(%d) is %d" % (name, number, f))

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
    asyncio.async(factorial("A", 8)),
    asyncio.async(factorial("B", 3)),
    asyncio.async(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

I don't know how to use this in conjunction with the Youtube API, so I may have to switch to Python 3, write my own HTTP requests to the API, or both.
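
For comparison, if the switch to Python 3 were made, the trollius example would translate to native asyncio roughly as follows. This is a sketch assuming Python 3.7 or later (for `asyncio.run`); the `delay` parameter is an addition to keep the demonstration short, and the coroutines return their results instead of printing them.

```python
import asyncio


async def factorial(name, number, delay=0.01):
    """Compute number! while yielding to the event loop between steps."""
    result = 1
    for i in range(2, number + 1):
        await asyncio.sleep(delay)  # stands in for waiting on a network call
        result *= i
    return name, result


async def main():
    # gather runs the three coroutines concurrently and keeps argument order
    return await asyncio.gather(
        factorial("A", 8), factorial("B", 3), factorial("C", 4))


results = asyncio.run(main())
# results is [("A", 40320), ("B", 6), ("C", 24)]
```

Inside a notebook, where an event loop may already be running, `asyncio.run` can raise a `RuntimeError`; awaiting `main()` directly in a cell is the usual workaround there.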

Fortunately, there's another asynchronous programming package for Python: gevent. Below we add to a short example from http://sdiehl.github.io/gevent-tutorial/ which demonstrates the speed-up from non-blocking code!

In [None]:
import gevent
import random
import time

rands = [random.randint(0,2)*0.1 for i_ in range(10)]

tic = lambda t: (time.time() - t)*1000 #time since t in ms 

def task(pid, wait_t):
    """
    Some non-deterministic task
    """
    task_start = time.time()
    gevent.sleep(wait_t)
    print 'Task {0} done at {1:1.2f} ms, took {2:1.2f} ms'.format(pid, tic(start),tic(task_start))

def synchronous():
    for pid, wait_t in enumerate(rands):
        task(pid, wait_t)

def asynchronous():
    threads = [gevent.spawn(task, *(pid,wait_t)) for pid, wait_t in enumerate(rands)]
    gevent.joinall(threads)

start = time.time()  
print 'Synchronous:'
synchronous()
print "All done in {:2.1f} ms".format(tic(start))

start = time.time()
print 'Asynchronous:'
asynchronous()
print "All done in {:2.1f} ms".format(tic(start))

The second set of tasks are clearly happening at the same time, but there's some extra processing time sneaking in somewhere. Note that the individual working time of each task is the same across async and sync, but that async does them all (nearly) at once, as desired!

In [None]:
import gevent.monkey
gevent.monkey.patch_socket()
import time
import gevent
# import urllib2
import requests
# import simplejson as json

tic = lambda t: (time.time() -t)*1000 # time since t in ms

target_url = 'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UCDWIvJwLJsE4LG1Atne2blQ&maxResults=50&type=video&key={0}'.format(DEVELOPER_KEY)
# target_url = 'https://jsonplaceholder.typicode.com/posts/1'

def fetch(pid):
    start = time.time()
    response = requests.get(target_url)
    json_result = response.json()
    print 'Process {}: {:.2f}'.format(pid, tic(start))
    return json_result

def synchronous():
    for i in range(1,5):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,5):
        threads.append(gevent.spawn(fetch, i))
    return gevent.joinall(threads)

p_start = time.time()
print 'Synchronous:'
synchronous()
print "All done in {:.2f}".format(tic(p_start))

p_start = time.time()
print 'Asynchronous:'
responses = asynchronous()
print "All done in {:.2f}".format(tic(p_start))

These requests are sort of being sent asynchronously (they return out of order), but it seems that requests is still blocking on I/O somewhere. Any difference in total completion time between the async and sync runs is just due to the random response time of the server. One possible cause, not verified here, is that only `socket` was monkey-patched, while HTTPS traffic also goes through the `ssl` module.

Fortunately, there is grequests for async requests!

In [None]:
import nbgrequests as grequests   #changed one tiny boolean
import requests
import time 
import os

#put the textfile containing the apikey into the parent folder of your local git repo
apikey_path = os.path.join(os.path.join(os.getcwd(),os.pardir),"apikey.txt")

with open(apikey_path,"r") as f:
    apikey = f.readline().strip() #drop the trailing newline before it reaches the URLs

def tic(t):
    # time since t in ms
    return (time.time() -t)*1000 

# n=2
# urls = [url_maker(20+i) for i in range(0,n)]

##Example 1: 7 http requests, repeating some targets. Works asynchronously
# urls = [
#     'http://www.heroku.com',
#     'http://python-tablib.org',
#     'http://httpbin.org',
#     'http://python-requests.org',
#     'http://python-requests.org',
#     'http://python-requests.org',
#     'http://python-requests.org',
#       ]

###Example 2: 4 https requests, repeating some targets. 
# urls = [
#     'https://www.heroku.com',
#     'https://httpbin.org',
#     'https://httpbin.org',
#     'https://httpbin.org',
#       ]

###Example 3: 3 Youtube v3 API search queries for the same channel, 3 different search terms. These queries all successfully return results, but do not run asynchronously
urls =['https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UCDWIvJwLJsE4LG1Atne2blQ&key={0}&q=1'.format(apikey)
        ,'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UCDWIvJwLJsE4LG1Atne2blQ&key={0}&q=2'.format(apikey)
        ,'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UCDWIvJwLJsE4LG1Atne2blQ&key={0}&q=3'.format(apikey)
        ]

###Example 4: 3 Youtube v3 API search queries for 3 different channels.
# urls = ['https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UCDWIvJwLJsE4LG1Atne2blQ&key={0}&q=1'.format(apikey)
#         ,'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UC4a-Gbdw7vOaccHmFo40b9g&key={0}&q=1'.format(apikey)
#         ,'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId=UCjwOWaOX-c-NeLnj_YGiNEg&key={0}&q=1'.format(apikey)
#         ]


def fetch(url):
    start = time.time()
    response = requests.get(url)
    result = response.status_code
    print 'Process {}: {:.2f}'.format(url, tic(start))
    return result

def async_fetch(unsent_requests):
    start = time.time()
    responses = grequests.map(unsent_requests)
    results = [response.status_code for response in responses]
    print 'Async Process: {:.2f}'.format( tic(start))
    return results

def synchronous(urls):
    responses = [fetch(i) for i in urls]
    return responses
        
def asynchronous(urls):
    unsent_requests = [grequests.get(url) for url in urls]
    responses = async_fetch(unsent_requests)
    return responses

    
p_start = time.time()
print 'Synchronous:'
sync_results = synchronous(urls)
print "All done in {:.2f}".format(tic(p_start))
print sync_results
    
p_start = time.time()
print 'Asynchronous:'
async_results = asynchronous(urls)
print "All done in {:.2f}".format(tic(p_start))   
print async_results

The async queries are returning in about the time it takes to do one sync query, as expected.
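
Since the asynchronous route bypasses the client library, the query URLs have to be assembled by hand. A sketch of a small builder follows, using `urlencode` instead of string formatting so that parameter values are escaped; `search_url` and `SEARCH_ENDPOINT` are made-up names, and the parameter set mirrors the URLs used above plus an optional `pageToken`.

```python
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2

SEARCH_ENDPOINT = "https://www.googleapis.com/youtube/v3/search"


def search_url(channel_id, api_key, page_token=None, max_results=50):
    """Build a Search URL for one page of a channel's videos."""
    params = [
        ("part", "snippet"),
        ("channelId", channel_id),
        ("maxResults", max_results),
        ("type", "video"),
        ("key", api_key),
    ]
    if page_token:
        params.append(("pageToken", page_token))
    return SEARCH_ENDPOINT + "?" + urlencode(params)


# "KEY" is a placeholder, not a real API key
example_url = search_url("UCDWIvJwLJsE4LG1Atne2blQ", "KEY")
```

Note that pages of a single listing cannot be fetched in parallel this way, because each `pageToken` only becomes known once the previous page has returned; the parallelism has to come from querying several channels or date windows at once.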

Finally, let's begin writing the workhorse code that will actually do the things we want with youtube!