# Script to automate collection of YouTube data for DiSC lessons

This script uses the YouTube API to download viewing stats (an alternative to manual downloading using the YouTube Studio Analytics), then uses the GitHub API to update CSV files archiving the data.

In [1]:
# import, configuration, functions, etc.

# Notes: you will have to use PIP to install the GitHub and Google SDKs before these import statements will work
# The YouTube API is part of the family of Google APIs and uses Google's generic SDK

import os
import json
import requests
from time import sleep
import csv
import io
import datetime
from pathlib import Path
from github import Github # GitHub SDK

# Google API SDKs:
import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow

accept_media_type = 'application/json' # Not sure if I actually use this anywhere

# Set configuration details necessary for interacting with the GitHub API

# the access token should be generated for read/write access to public repos
# see https://developer.github.com/v3/auth/#working-with-two-factor-authentication
# see https://github.com/settings/tokens/new
# select public_repo

# reference on PyGithub: https://pygithub.readthedocs.io/en/latest/github_objects/Repository.html
# reference on GitHub API: https://developer.github.com/v3/guides/getting-started/

github_username = ''  # set to empty string if using a token (for 2FA)
organization_name = 'heardlibrary'
organization_is_user = False
repo_name = 'dashboard'
cred_directory = 'home' # set to 'home' if the credential is in the home directory, otherwise working directory
path_to_directory = 'disc/youtube/'

# Set configuration details necessary for interacting with the YouTube Analytics API
scopes = ['https://www.googleapis.com/auth/yt-analytics.readonly']
# Disable OAuthlib's HTTPs verification when running locally.
# *DO NOT* leave this option enabled when running in production.
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
api_service_name = 'youtubeAnalytics'
api_version = 'v2'
# You will need to modify this line according to how you named your secrets file and where you put it on your drive.
# Do NOT store the file within the path of a GitHub repo since you don't want to expose it publicly!!!
client_secrets_filename = 'client_secret_youtube_analytics_download.json'
if cred_directory == 'home':
    home = str(Path.home()) #gets path to home directory; supposed to work for Win and Mac
    client_secrets_file = home + '/' + client_secrets_filename
else:
    cred_directory = 'working'
    client_secrets_file = client_secrets_filename

# -----------------
# utility functions
# -----------------

# NOTE: change the user_agent_header string to something appropriate for your project
# Sending a user-agent header is necessary for public Wikidata endpoints, but probably not needed here
# since particular API credentials are needed for the GitHub and YouTube APIs
def generate_header_dictionary(accept_media_type):
    user_agent_header = 'VanderDataBot/0.1 (https://github.com/HeardLibrary/linked-data/tree/master/publications/data; mailto:steve.baskauf@vanderbilt.edu)'
    request_header_dictionary = {
        'Accept' : accept_media_type,
        'User-Agent': user_agent_header
    }
    return request_header_dictionary

def generate_utc_date():
    whole_time_string_z = datetime.datetime.utcnow().isoformat() # form: 2019-12-05T15:35:04.959311
    date_z = whole_time_string_z.split('T')[0] # form 2019-12-05
    return date_z

# RAW FILE FUNCTIONS

# read raw string from a file in GitHub
def read_string_from_github_file(organization_name, repo_name, path_to_directory, filename):
    path = path_to_directory + filename
    r = requests.get('https://raw.githubusercontent.com/' + organization_name + '/' + repo_name + '/master/' + path)
    return r.text

# LIST OF DICTIONARIES FUNCTIONS

# read from a CSV file on disk into a list of dictionaries (representing a table)
def read_dicts_from_csv(filename):
    with open(filename, 'r', newline='', encoding='utf-8') as file_object:
        dict_object = csv.DictReader(file_object)
        table = []
        for row in dict_object:
            table.append(row)
    return table

# write a list of dictionaries to a CSV file on disk
def write_dicts_to_csv(table, filename, fieldnames):
    with open(filename, 'w', newline='', encoding='utf-8') as csv_file_object:
        writer = csv.DictWriter(csv_file_object, fieldnames=fieldnames)
        writer.writeheader()
        for row in table:
            writer.writerow(row)

# read from a CSV file in GitHub into a list of dictionaries (representing a table)
def read_dicts_from_github_csv(organization_name, repo_name, path_to_directory, filename):
    path = path_to_directory + filename
    r = requests.get('https://raw.githubusercontent.com/' + organization_name + '/' + repo_name + '/master/' + path)
    file_text = r.text.split('\n')
    file_rows = csv.DictReader(file_text)
    table = []
    for row in file_rows:
        table.append(row)
    return table

# write a list of dictionaries to a CSV file using filestream
def write_dicts_to_string(table, fieldnames):
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    for row in table:
        writer.writerow(row)
    return output.getvalue()

# LIST OF LISTS FUNCTIONS

# read from a CSV file in GitHub into a list of lists (representing a table)
def read_lists_from_github_csv(organization_name, repo_name, path_to_directory, filename):
    path = path_to_directory + filename
    r = requests.get('https://raw.githubusercontent.com/' + organization_name + '/' + repo_name + '/master/' + path)
    file_text = r.text.split('\n')
    # remove any trailing newlines
    if file_text[len(file_text)-1] == '':
        file_text = file_text[0:len(file_text)-1]
    file_rows = csv.reader(file_text)
    table = []
    for row in file_rows:
        table.append(row)
    return table

# write a list of lists to a CSV file on disk
def write_lists_to_csv(file_name, array):
    with open(file_name, 'w', newline='', encoding='utf-8') as file_object:
        writer_object = csv.writer(file_object)
        for row in array:
            writer_object.writerow(row)

# write a list of lists to a CSV file using filestream
def write_lists_to_string(table):
    output = io.StringIO()
    writer = csv.writer(output)
    for row in table:
        writer.writerow(row)
    return output.getvalue()

# -----------------
# functions for interacting with GitHub
# -----------------

# value of directory should be either 'home' or 'working'
def load_credential(filename, directory):
    cred = ''
    # to change the script to look for the credential in the working directory, change the value of home to empty string
    if directory == 'home':
        home = str(Path.home()) #gets path to home directory; supposed to work for Win and Mac
        credential_path = home + '/' + filename
    else:
        directory = 'working'
        credential_path = filename
    try:
        with open(credential_path, 'rt', encoding='utf-8') as file_object:
            cred = file_object.read()
    except:
        print(filename + ' file not found - is it in your ' + directory + ' directory?')
        exit()
    return(cred)

# pass in an empty string for organization_name to use an individual account
# pass in an empty string for github_username to use a token instead of username login
def login_get_repo(repo_name, github_username, organization_name, organization_is_user, cred_directory):
    if github_username == '':
        token = load_credential('linked-data_github_token.txt', cred_directory)
        g = Github(login_or_token = token)
    else:
        pwd = load_credential('pwd.txt', cred_directory)
        g = Github(github_username, pwd)
    
    if organization_is_user:
        # this option accesses a user's repo instead of an organizational one
        # In this case, the value of organization_name is not used.
        user = g.get_user()
        repo = user.get_repo(repo_name)
    else:
        # this option creates an instance of a repo in an organization
        # to which the token creator has push access
        organization = g.get_organization(organization_name)
        repo = organization.get_repo(repo_name)
    return(repo)

def get_user_list(repo):
    person_list = []
    people = repo.get_collaborators()
    for person in people:
        person_list.append(person.login)
    return person_list

def get_file_sha(account, repo, file_path):
    # get the data about the file to get its blob SHA

    r = requests.get('https://api.github.com/repos/' + account + '/' + repo + '/contents/' + file_path)
    file_data = r.json()
    try:
        sha = file_data['sha']
    except:
        # if the file doesn't already exist on GitHub, no sha will be returned
        sha = ''
    return sha

# use this function to update an existing text file
def update_file(repo, account, repo_name, path_to_directory, filename, content):
    path = path_to_directory + filename
    commit_message = 'Update ' + filename + ' file via API'
    sha = get_file_sha(account, repo_name, path)
    if sha == '':
        response = repo.create_file(path, commit_message, content)
    else:
        response = repo.update_file(path, commit_message, content, sha)
    return response

# -----------------
# functions for interacting with the YouTube Analytics API (not the Data API)
# -----------------

# See sample code at https://developers.google.com/youtube/analytics/reference/reports/query#python
# That's the source of these functions (with some minor modifications)

def get_service():
    flow = InstalledAppFlow.from_client_secrets_file(client_secrets_file, scopes)
    credentials = flow.run_console()
    return build(api_service_name, api_version, credentials = credentials)

def execute_api_request(client_library_function, **kwargs):
    response = client_library_function(
        **kwargs
        ).execute()
    return response
    
# -----------------
# High level functions
# -----------------

# Loads video data and builds the filter string from video IDs.
# Note: the video filter can send up to 500 IDs. 
# As of 2021-02-02, I have 268 videos I'm tracking, so at some point, 
# this may have to be broken into two or more API calls.
# Load data about videos to monitor.
def built_video_filter_string(video_metadata_filename):
    metadata = read_dicts_from_github_csv(organization_name, repo_name, path_to_directory, video_metadata_filename)
    filter_string = 'video=='
    output_header_list = ['date']
    count = 0
    for video in metadata:
    #for video in metadata[0:5]: # switch to this line for testing
        count += 1
        if count > 500:
            print('Warning: limit of 500 videos exceeded! Modify the script.')
            break
        filter_string += video['id'].strip() + ','
        output_header_list.append(video['id'].strip())
    # remove final trailing comma
    filter_string = filter_string[:len(filter_string)-1]
    return filter_string, output_header_list

def get_youtube_usage_stats(todays_date_utc, filter_string):
    print('sending request to YouTube Analytics API')
    result = execute_api_request(
        youtubeAnalytics.reports().query,
        ids='channel==MINE',
        startDate='2013-01-01', # don't have any videos dated earlier than that
        endDate=todays_date_utc,
        metrics='estimatedMinutesWatched,views',
        filters=filter_string,
        dimensions='video'
        )
    #print(json.dumps(result, indent=2))
    print('done retrieving data from YouTube API')
    return result

# This retrieves data for counts and minutes, then appends the results as a new row in the table.
# The revised tables are then pushed to GitHub
# The first column must be the date.
def add_data_to_tables():
    todays_date_utc = generate_utc_date()
    filter_string, output_header_list = built_video_filter_string('video-metadata.csv')
    
    minutes_table = read_lists_from_github_csv(organization_name, repo_name, path_to_directory, 'total_minutes_watched.csv')
    views_table = read_lists_from_github_csv(organization_name, repo_name, path_to_directory, 'total_views.csv')

    # Check to make sure that there are the same number of videos in the metadata list and the tables
    # If not, nothing happens
    if len(minutes_table[0]) != len(output_header_list):
        print('minutes table:', len(minutes_table[0]), ' header list:', len(output_header_list))
        print('Warning! Minutes table does not have the same number of videos as the videos metadata table!')
        return
    
    if len(views_table[0]) != len(output_header_list):
        print('views table:', len(views_table[0]), ' header list:', len(output_header_list))
        print('Warning! Views table does not have the same number of videos as the videos metadata table!')
        return
    
    tries = 0
    success = False

    # try to acquire the data for an hour
    while (success == False) and (tries < 12):
        try:
            results = get_youtube_usage_stats(todays_date_utc, filter_string)
            api_data = results['rows']
            #print(api_data)

            #dictionary = get_unit_counts(query)
            success = True
            
            # We start the new row with the date (first column)
            minutes_row = [todays_date_utc]
            views_row = [todays_date_utc]

            # The video IDs from the API data are compared with the column headers from the minutes table.
            # We are assuming that all videos in the video metadata table are found in the column headers.
            for header in output_header_list[1:]: # skip the first item (date)
                found = False
                
                # Step through the API records and match with the header
                for video in api_data:
                    if video[0] == header:
                        found = True
                        minutes_row.append(str(video[1]))
                        views_row.append(str(video[2]))
                        break
                # In the case where the videos metadata table has a video not in the API results, it's added as a blank cell
                if not found:
                    minutes_row.append('0')
                    views_row.append('0')
        except:
            tries += 1
            sleep(300) # wait 5 minutes and try again

    if success:
        # log into the GitHub API and create a repo instance
        repo = login_get_repo(repo_name, github_username, organization_name, organization_is_user, cred_directory)
        
        minutes_table.append(minutes_row)
        rawCsvText = write_lists_to_string(minutes_table)
        response = update_file(repo, organization_name, repo_name, path_to_directory, 'total_minutes_watched.csv', rawCsvText)
        print('minutes response: ')
        print(response)

        views_table.append(views_row)
        rawCsvText = write_lists_to_string(views_table)
        response = update_file(repo, organization_name, repo_name, path_to_directory, 'total_views.csv', rawCsvText)
        print('views response: ')
        print(response)

        # Update the date last run
        response = update_file(repo, organization_name, repo_name, path_to_directory, 'last_run.txt', generate_utc_date() )
        print('done')

        #write_lists_to_csv('total_minutes_watched_test.csv', minutes_table)
        #write_lists_to_csv('total_views_test.csv', views_table)
    return


The following line performs the YouTube Analytics API authentication. Run it once at the start of the session. Not sure how long the session lasts but it could be something like 30 days?

After running this cell, be sure to clear the cell output before pushing the notebook file to GitHub.

In [2]:
print('Authentication done:', datetime.datetime.utcnow().isoformat())
youtubeAnalytics = get_service()

Authentication done: 2021-02-04T03:09:02.689449
Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=1058294683545-gadov8jftptq5d02rdevgkmv0dtu3ro9.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fyt-analytics.readonly&state=2wuYJr8gUtOt5juE5fmm132JgzXZ3e&prompt=consent&access_type=offline
Enter the authorization code: 4/1AY0e-g4ISpORjEood8FtESIFiY1nS6j6iODBKdpR9TU23wfLdcvX2Ay56is


# Main script body

In [None]:
while True: # infinite loop
    try:
        print('Time checked:', datetime.datetime.utcnow().isoformat())

        date_last_run = read_string_from_github_file(organization_name, repo_name, path_to_directory, 'last_run.txt')
        print('Date last run:', date_last_run)

        date_now_utc = generate_utc_date()
        print('UTC date now is:', date_now_utc)

        if date_now_utc > date_last_run:
            add_data_to_tables()
        print()
        # wait an hour before checking again
        sleep(3600)
    except Exception as ex:
        print('Error occurred, trying again in 10 minutes')
        print(type(ex).__name__, ex.args)
        sleep(600)
        continue

Time checked: 2021-02-04T03:09:30.150553
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T04:09:31.263987
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T07:05:06.406902
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T08:05:06.796864
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T09:05:07.158733
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T10:05:07.514928
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T11:05:07.958905
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T12:05:08.306589
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T13:05:09.200449
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T14:05:09.725736
Date last run: 2021-02-04
UTC date now is: 2021-02-04

Time checked: 2021-02-04T15:05:10.247493

Error occurred, trying again in 10 minutes
ConnectionError (MaxRetryError("HTTPSConnectionPool(host='raw.githubusercontent.com', port=443): Max retries exceeded with url: /heardlibrary/dashboard/master/disc/youtube/last_run.txt (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7fae20a17cf8>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known'))"),)
Time checked: 2021-02-07T13:06:23.588015
Date last run: 2021-02-07
UTC date now is: 2021-02-07

Time checked: 2021-02-07T14:06:23.968257
Date last run: 2021-02-07
UTC date now is: 2021-02-07

Time checked: 2021-02-07T15:06:24.480826
Date last run: 2021-02-07
UTC date now is: 2021-02-07

Time checked: 2021-02-07T16:06:25.055233
Date last run: 2021-02-07
UTC date now is: 2021-02-07

Time checked: 2021-02-07T17:06:25.530549
Date last run: 2021-02-07
UTC date now is: 2021-02-07

Time checked: 2021-02-07T18:06:25.975442
Date last run: 2021-02-07
UTC date now is: 2021-0