- chromedriver: https://sites.google.com/a/chromium.org/chromedriver/ / https://chromedriver.chromium.org/
- chromium command line switches: https://peter.sh/experiments/chromium-command-line-switches/#net-log-capture-mode
    - important flags: "--log-net-log", "--net-log-capture-mode" / "--enable-logging --v=1"
    - How to capture a NetLog dump: https://www.chromium.org/for-testers/providing-network-details

- canvas tutorial: https://towardsdatascience.com/controlling-the-web-with-python-6fceb22c5f08

In [1]:
# basic webdriver imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
# imports for waiting
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

import os

from bs4 import BeautifulSoup

import json
import requests
import re
from tqdm import tqdm

import time


# get credentials
# note: os.getenv returns None when a variable is unset, so either value may be None here
CANVAS_USERNAME = os.getenv('CANVAS_USERNAME')
CANVAS_PASSWORD = os.getenv('CANVAS_PASSWORD')

# set paths
# note: the relative path is resolved against the working directory at the time this cell runs
DATA_PATH = os.path.abspath('./../data')

LOG_PATH = os.path.join(DATA_PATH, 'tmp/net_log.json')  # file chrome is told to write its net log to
DRIVER_PATH = os.path.join(DATA_PATH, 'drivers/chromedriver')  # chromedriver executable handed to selenium
VIDEO_PATH = os.path.join(DATA_PATH, 'videos')  # directory downloaded lectures are written into

In [2]:
def generate_driver():
    """
    build and start a chrome driver that is set up for this project

    return: a running selenium chrome driver
    """

    # the "--log-net-log" switch is what makes chrome record its network activity to LOG_PATH;
    # every later step of this project reads that log, so it must always be present
    net_log_switch = '--log-net-log={}'.format(LOG_PATH)

    opts = ChromeOptions()
    opts.add_argument(net_log_switch)

    # launch chrome through the chromedriver binary stored at DRIVER_PATH
    return webdriver.Chrome(options=opts, executable_path=DRIVER_PATH)


def setup_and_login(default_2FA=True):
    """
    automatically walk through the process of starting Canvas and passing 2FA

    default_2FA (bool) : if True, automatically "call" the first 2FA method presented. otherwise the user
                         must select and trigger a 2FA method manually in the browser window

    return: the authenticated driver (returned once an element with id "dashboard" is present)

    raises: ValueError if CANVAS_USERNAME / CANVAS_PASSWORD are not set
            selenium TimeoutException if an expected element does not show up in time
            (on any failure the browser is closed before the error propagates)
    """

    # fail early (before chrome is launched) when the credentials were never exported
    if not CANVAS_USERNAME or not CANVAS_PASSWORD:
        raise ValueError('the CANVAS_USERNAME and CANVAS_PASSWORD environment variables must both be set')

    # step 0: start a configured driver
    driver = generate_driver()

    try:
        # step 0.5: open the base canvas url --> triggers a login screen
        driver.get('https://canvas.harvard.edu/')

        #-------------------------------------------------------
        #-------------------- step 1: login --------------------
        #-------------------------------------------------------

        # wait for the "username" element to indicate the login page has loaded (the wait returns the element)
        username_box = WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.ID, "username")))
        username_box.send_keys(CANVAS_USERNAME)

        # note: find_element(By.<strategy>, ...) is used instead of the find_element_by_* helpers because
        #       the helpers were removed from newer selenium releases while this form works on old and new
        driver.find_element(By.ID, 'password').send_keys(CANVAS_PASSWORD)
        driver.find_element(By.ID, 'submitLogin').click()

        #--------------------------------------------------------------
        #-------------------- step 2: get past 2FA --------------------
        #--------------------------------------------------------------

        # select the default 2FA method
        # note: if this is not used, the user must select and trigger the 2FA manually
        if default_2FA is True:
            # wait for 2FA iframe to load and switch to it
            WebDriverWait(driver, 15).until(EC.frame_to_be_available_and_switch_to_it('duo_iframe'))

            # click the "call" button
            driver.find_element(By.CSS_SELECTOR, '.positive.auth-button').click()

            # leave the iframe again so the dashboard wait below searches the top-level document
            driver.switch_to.default_content()

        # wait for dashboard to load
        print('INFO: if 2FA is not completed within 120s, this program will exit automatically')
        WebDriverWait(driver, 120).until(EC.presence_of_element_located((By.ID, "dashboard")))
    except BaseException:
        # do not leave an orphaned chrome / chromedriver process behind when login fails
        driver.quit()
        raise

    # return the authenticated driver object
    return driver

In [3]:
def get_player_page_source(driver, lecture_URL, player=None):
    """
    given an authenticated driver, a lecture URL, and (optionally) the player type: load the page, switch into
    the 'tool_content' iframe, wait for the video list to appear, and return the resulting page source

    driver      : an authenticated selenium driver
    lecture_URL : URL of the Canvas page that embeds the video player
    player      : 'matterhorn', 'panopto', or None to auto detect the player from the page HTML

    return: player_page_source, player_name

    raises: ValueError if an unknown player is passed in
            Exception if the player could not be auto detected
            selenium TimeoutException if the iframe or the video list does not appear within 15s

    note: the driver is left switched into the 'tool_content' iframe when this returns
    """

    # css selector that matches one video entry in each player's listing
    video_selectors = {
        "matterhorn": ".item.ng-scope",
        "panopto": ".thumbnail-row.draggable",
    }

    # <meta name="description"> content that identifies each player
    description_to_player = {
        "HUDCE Publication Listing": "matterhorn",
        "Capture, manage, and search all your video content.": "panopto",
    }

    # if a player was selected, make sure it is valid
    if player is not None and player not in ('matterhorn', 'panopto'):
        raise ValueError(f"invalid player selected. player '{player}' is not in ('matterhorn', 'panopto')")

    # load the url in the driver
    driver.get(lecture_URL)

    # wait for the video iframe to become available and switch to it
    WebDriverWait(driver, 15).until(EC.frame_to_be_available_and_switch_to_it('tool_content'))

    # try to figure out what player is being used based on HTML clues (if no player given)
    # note: this is probably very unstable...
    if player is None:
        # name the parser explicitly so the result does not depend on which parsers happen to be installed
        initial_page = BeautifulSoup(driver.page_source, 'html.parser')

        # a missing <head> or description <meta> must end in the "auto detection is broken" error below,
        # not in a TypeError / AttributeError raised while digging through the page
        head_element = initial_page.find("head")
        description_element = None
        if head_element is not None:
            description_element = head_element.find("meta", attrs={"name": "description"})

        description = None
        if description_element is not None:
            description = description_element.get('content')

        player = description_to_player.get(description)
        if player is None:
            raise Exception("looks like the HTML changed and player auto detection is broken")

    # wait for the videos to load into the frame (class depends on player)
    # note: "EC.presence_of_all_elements_located" has same wait effect because it only waits for first element
    WebDriverWait(driver, 15).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, video_selectors[player]))
    )

    # return the player page source
    return driver.page_source, player


def extract_lecture_links(player_page_source, player):
    """
    given a player_page_source and the type of player it came from, extract a 'lecture: url' dict

    player_page_source : HTML of the player listing page
    player             : 'matterhorn' or 'panopto'

    return: dict mapping each lecture title to its link
            note: titles are the dict keys, so a repeated title keeps only the last link seen

    raises: ValueError if the player is not one of the two supported players
    """

    # validate first so that a typo in the player name fails loudly instead of returning an empty dict
    if player not in ('matterhorn', 'panopto'):
        raise ValueError(f"invalid player selected. player '{player}' is not in ('matterhorn', 'panopto')")

    # name the parser explicitly so the result does not depend on which parsers happen to be installed
    player_page = BeautifulSoup(player_page_source, 'html.parser')

    lecture_to_url = {}

    if player == "matterhorn":
        items_container = player_page.find("div", "items-container ng-scope") # lowest level to contain list of vids
        # note: no need to scope to "items-container ng-scope" as other tag is specific enough
        #       but it makes me happy :p

        for video in items_container.find_all("div", "item ng-scope"):
            # the title lives in its own div, the link in the anchor
            title_element = video.find("div", "publication-title auto-launch")
            link_element = video.find("a", "live-event item-link")

            title = title_element.text.strip()
            # 'https:' is prepended to the href to build the full link
            lecture_to_url[title] = 'https:' + link_element['href']

    else:  # panopto
        details_table = player_page.find("table", "details-table") # lowest level to contain list of vids
        # note: no need to scope to "details-table" as other tag is specific enough
        #       but it makes me happy :p

        for video in details_table.find_all("tr", "thumbnail-row draggable"):
            # a single anchor carries both the title (its text) and the link (its href)
            title_element = video.find("a", "detail-title")

            title = title_element.text.strip()
            lecture_to_url[title] = title_element['href']

    return lecture_to_url

In [4]:
def open_lecture_links(driver, lecture_to_url, player):
    """
    given a driver, a lecture_to_url dict, and a player: open each link in lecture_to_url

    this allows the driver to track the network activity generated from each lecture page

    driver         : an authenticated selenium driver (it is always quit before this function returns)
    lecture_to_url : dict mapping lecture titles to lecture page links
    player         : 'matterhorn' or 'panopto'

    raises: ValueError if the player is not supported (the driver is left untouched in that case)
            selenium TimeoutException if a lecture page does not finish loading within 15s
    """

    # make sure the player is valid
    # note: a real exception is used because "assert" statements are removed when python runs with -O
    if player not in ('panopto', 'matterhorn'):
        raise ValueError(f"invalid player selected. player '{player}' is not in ('matterhorn', 'panopto')")

    try:
        # open each link (the titles are not needed here)
        for link in lecture_to_url.values():
            # go to the link
            driver.get(link)

            # wait for the page to fully load
            if player == 'matterhorn':
                # wait for the play button to appear
                WebDriverWait(driver, 15).until(
                    EC.presence_of_element_located((By.ID, 'paella_plugin_PlayButtonOnScreen'))
                )
            else:  # panopto
                # wait for the loading image to appear
                WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.ID, 'loadingMessage')))
                # wait for the loading image to disappear (finished loading)
                WebDriverWait(driver, 15).until(EC.invisibility_of_element_located((By.ID, 'loadingMessage')))
    finally:
        # we are done with driver so we can "quit" it
        # note: done in "finally" so the browser is also closed when one of the waits above times out
        driver.quit()

In [5]:
def extract_m3u8s_from_netlog(log_path=None):
    """
    extract all .m3u8 links from network log

    log_path : path of the chrome net log to read (defaults to LOG_PATH)

    return: list of every url containing '.m3u8' that was requested under the matterhorn / panopto
            network isolation keys, in the order the events appear in the log

    raises: json.JSONDecodeError if the log cannot be parsed even after patching its end
    """

    if log_path is None:
        log_path = LOG_PATH

    # read the json file to a string
    with open(log_path, 'r', encoding='utf-8') as log_file:
        json_str = log_file.read()

    # try to parse the json string normally
    try:
        json_data = json.loads(json_str)
    except json.JSONDecodeError:
        # try to load the data by patching the end of the file
        # NOTE: the reason you often have to patch the net log is because calling driver.quit() will kill
        # chrome without writing the closing tags on the net log. the file then stops after an event, so
        # trailing whitespace and the dangling comma are dropped and the events list / root object closed
        print('INFO: in JSON parse exception. trying to patch file...')
        json_data = json.loads(json_str.rstrip().rstrip(',') + ']}')
        print('INFO: file patched')

    lecture_hosts = ('https://matterhorn.dce.harvard.edu',
                     'https://harvard.hosted.panopto.com')

    all_lecture_m3u8s = []
    for event in json_data['events']:
        if 'params' not in event:
            continue
        params = event['params']

        # only look at requests that were made on behalf of one of the lecture players
        if params.get('network_isolation_key', None) not in lecture_hosts:
            continue

        # not every event carries a url, so it has to be looked up defensively
        url = params.get('url')
        if isinstance(url, str) and '.m3u8' in url:
            all_lecture_m3u8s.append(url)

    return all_lecture_m3u8s

# ----------------------------------------------------------------------------------------------------

In [6]:
# one Canvas URL per player type (these are the pages handed to get_player_page_source below)
matterhorn_URL = "https://canvas.harvard.edu/courses/69559/external_tools/22940"
panopto_URL = "https://canvas.harvard.edu/courses/69902/external_tools/61579"

# do setup
driver = setup_and_login()

# get sources
# note: no player is passed in, so each call auto detects the player and returns its name
matterhorn_source, matterhorn_player = get_player_page_source(driver, matterhorn_URL)
panopto_source, panopto_player = get_player_page_source(driver, panopto_URL)

# extract the 'lecture: url' dicts from the page sources (the links are not opened here)
matterhorn_video_dict = extract_lecture_links(matterhorn_source, player=matterhorn_player)
panopto_video_dict = extract_lecture_links(panopto_source, player=panopto_player)

# matterhorn_video_dict, panopto_video_dict

INFO: if 2FA is not completed within 120s, this program will exit automatically


In [7]:
import pickle

In [8]:
# open_lecture_links(driver, matterhorn_video_dict, matterhorn_player)

# all_lecture_m3u8s = extract_m3u8s_from_netlog()

# with open('./matterhorn_m3u8s.pkl', 'wb') as f:
#     pickle.dump(all_lecture_m3u8s, f)

In [9]:
# open_lecture_links(driver, panopto_video_dict, panopto_player)

# all_lecture_m3u8s = extract_m3u8s_from_netlog()

# with open('./panopto_m3u8s.pkl', 'wb') as f:
#     pickle.dump(all_lecture_m3u8s, f)

In [10]:
# choose which pickled m3u8 list to load
# note: the second assignment overrides the first, so only the panopto file is actually loaded
fname = './matterhorn_m3u8s.pkl'
fname = './panopto_m3u8s.pkl'

# note: unpickling can execute arbitrary code, so only load pickle files this notebook wrote itself
with open(fname, 'rb') as f:
    all_lecture_m3u8s = pickle.load(f)

# display the loaded list
all_lecture_m3u8s

['https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/976d6033-d105-4bf8-aa4e-ab9d002a8aa2-22445718-807a-49b6-85d0-ab9d00399842.hls/master.m3u8?InvocationID=c06033b4-3089-ea11-a9de-0a8e213f0382&tid=00000000-0000-0000-0000-000000000000&StreamID=20acd79f-e292-4aec-951e-aae0c2cd9dd6&ServerName=harvard.hosted.panopto.com',
 'https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/fd928321-ac38-4b5e-987a-32807b61ca3e.screen.hls/master.m3u8?InvocationID=c06033b4-3089-ea11-a9de-0a8e213f0382&tid=00000000-0000-0000-0000-000000000000&StreamID=fd928321-ac38-4b5e-987a-32807b61ca3e&ServerName=harvard.hosted.panopto.com',
 'https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/976d6033-d105-4bf8-aa4e-ab9d002a8aa2-22445718-807a-49b6-85d0-ab9d00399842.hls/757757/index.m3u8',
 'https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/fd928321-ac38-4b5e-987a-32807b61ca3e.screen.hls/273468

In [20]:
# note: a debug tool...

def print_dict(my_dict):
    """
    debug helper: print every 'key : value' pair of my_dict, each followed by a row of 100 asterisks,
    and finish with the number of entries
    """
    separator = '*'*100

    for name, entry in my_dict.items():
        print(name, ':', entry)
        print(separator)

    entry_count = len(my_dict)
    print('len:', entry_count)

# ----------------------------------------------------------------------------------------------------

In [11]:
# TODO: update this!!

# panopto
# - title_to_lecture_id as title_to_lecture_id2
# - id1_base = all_lecture_m3u8s[N].id1 where all_lecture_m3u8s[N].id2 in title_to_lecture_id2
# - all_m3u8s = all_lecture_m3u8s[i] for all i where all_lecture_m3u8s[i].id1 == id1_base

In [12]:
def get_title_to_m3u8s(lecture_to_url, all_lecture_m3u8s, player):
    """
    link each lecture title to every captured m3u8 url that belongs to that lecture

    lecture_to_url    : dict mapping lecture titles to lecture page links (each link must contain 'id=<lecture id>')
    all_lecture_m3u8s : list of m3u8 urls captured from the network log
    player            : 'matterhorn' or 'panopto'

    m3u8 urls are split on '/': element 4 is "id1" (the base id shared by all m3u8s of one lecture) and, for
    panopto, the first 36 characters of element 5 are "id2"

    return: dict mapping each title to the list of its m3u8 urls (an empty list if none were captured)

    raises: ValueError if the player is not one of the two supported players
            IndexError if a lecture link has no 'id=' or an m3u8 url has too few '/' separated parts
    """

    if player not in ('matterhorn', 'panopto'):
        raise ValueError(f"invalid player selected. player '{player}' is not in ('matterhorn', 'panopto')")

    # build a simple dict linking title to lecture_id
    # note: anything after a following '&' is another query parameter and not part of the id
    title_to_lecture_id = {
        title: link.split('id=')[1].split('&')[0] for title, link in lecture_to_url.items()
    }

    id1_to_m3u8s = {}  # id1 -> all m3u8s that have that id1 (1-many)
    id2_to_id1 = {}    # panopto only: id2 -> id1 (several id2's can point at the same id1)
    for m3u8 in all_lecture_m3u8s:
        url_parts = m3u8.split('/')
        id1 = url_parts[4]
        id1_to_m3u8s.setdefault(id1, []).append(m3u8)

        if player == 'panopto':
            id2_to_id1[url_parts[5][:36]] = id1 # this will overwrite a few times (which is fine)

    title_to_m3u8s = {}
    for title, lecture_id in title_to_lecture_id.items():
        if player == 'panopto':
            # in panopto, the lecture_id is id2 (the second id) so we need to first get the id1 for that id2
            id1 = id2_to_id1.get(lecture_id)
        else:
            # in matterhorn, the lecture_id is id1 (the base id) so we can just grab the m3u8s directly
            id1 = lecture_id

        # a lecture whose page produced no m3u8 traffic gets an empty list instead of raising a KeyError
        title_to_m3u8s[title] = list(id1_to_m3u8s.get(id1, []))

    return title_to_m3u8s

# group the captured m3u8s by lecture title (swap the commented line in to process matterhorn instead)
title_to_m3u8s = get_title_to_m3u8s(panopto_video_dict, all_lecture_m3u8s, 'panopto')
# title_to_m3u8s = get_title_to_m3u8s(matterhorn_video_dict, all_lecture_m3u8s, 'matterhorn')

# display the result
title_to_m3u8s

{'3-5 Part 3: Regulation of Digestion': ['https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/976d6033-d105-4bf8-aa4e-ab9d002a8aa2-22445718-807a-49b6-85d0-ab9d00399842.hls/master.m3u8?InvocationID=c06033b4-3089-ea11-a9de-0a8e213f0382&tid=00000000-0000-0000-0000-000000000000&StreamID=20acd79f-e292-4aec-951e-aae0c2cd9dd6&ServerName=harvard.hosted.panopto.com',
  'https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/fd928321-ac38-4b5e-987a-32807b61ca3e.screen.hls/master.m3u8?InvocationID=c06033b4-3089-ea11-a9de-0a8e213f0382&tid=00000000-0000-0000-0000-000000000000&StreamID=fd928321-ac38-4b5e-987a-32807b61ca3e&ServerName=harvard.hosted.panopto.com',
  'https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/976d6033-d105-4bf8-aa4e-ab9d002a8aa2-22445718-807a-49b6-85d0-ab9d00399842.hls/757757/index.m3u8',
  'https://d2y36twrtb17ty.cloudfront.net/sessions/763e3a97-b8a6-4dc7-b2ce-ab9d002a8a98/fd928321-ac

In [13]:
def _select_max_resolution_variant(m3u8_lines):
    """
    given the lines of a "master" m3u8, return the line that follows the '#EXT-X-STREAM-INF' entry with the
    largest RESOLUTION (width * height), or None if no entry states a resolution
    """

    extension_by_resolution = {}

    # each '#EXT-X-STREAM-INF' line describes a variant and the line after it stores the variant's location
    for info_line, next_line in zip(m3u8_lines, m3u8_lines[1:]):
        if not info_line.startswith('#EXT-X-STREAM-INF'):
            continue

        # anchor on 'RESOLUTION=' so that a stray "x" elsewhere in the line can not be picked up by mistake
        resolution_match = re.search(r'RESOLUTION=(\d+)x(\d+)', info_line)
        if resolution_match is None:
            continue  # e.g. a variant without a stated resolution

        resolution = (int(resolution_match.group(1)), int(resolution_match.group(2)))
        extension_by_resolution[resolution] = next_line

    if not extension_by_resolution:
        return None

    max_resolution = max(extension_by_resolution, key=lambda size: size[0] * size[1])
    return extension_by_resolution[max_resolution]


def get_title_to_download_links(title_to_m3u8s, player, request_timeout=30):
    """
    extract final download links from list of possible m3u8 files

    title_to_m3u8s  : dict mapping lecture titles to lists of m3u8 urls
    player          : 'matterhorn' or 'panopto'
    request_timeout : seconds to wait on each HTTP request before giving up

    every m3u8 is downloaded. only "master" files (files that list resolution variants) are used: the
    variant with the largest resolution is selected and turned into a full link
        - matterhorn: the variant file is downloaded too and the mp4 link found inside it is returned
        - panopto   : the link of the variant m3u8 itself is returned (it lists the .ts segments)
    masters that can not be resolved (no resolution listed, unexpected url layout, no mp4 link) are skipped

    return: dict mapping each title to a list with one link per usable master m3u8

    raises: ValueError if the player is not one of the two supported players
    """

    # the part of a master url that the relative links inside the m3u8 files are appended to
    base_patterns = {
        'matterhorn': r'https://dvgni8clk4vbh\.cloudfront\.net/engage-player/[\w-]*/',
        'panopto': r'https://d2y36twrtb17ty\.cloudfront\.net/sessions/[\w-]*/[.\w-]*/',
    }

    if player not in base_patterns:
        raise ValueError(f"invalid player selected. player '{player}' is not in ('matterhorn', 'panopto')")
    base_re = base_patterns[player]

    title_to_best_m3u8 = {}

    for title, m3u8_list in tqdm(title_to_m3u8s.items()):
        max_resolution_m3u8s = []

        # for each m3u8 url...
        for m3u8 in m3u8_list:

            # get the full content
            m3u8_content = requests.get(m3u8, timeout=request_timeout).content.decode()

            # only a "master" file (a file with links to other files) is of interest
            if '#EXT-X-STREAM-INF' not in m3u8_content:
                continue

            # find the m3u8 variant with the max resolution
            m3u8_extension = _select_max_resolution_variant(m3u8_content.splitlines())
            if m3u8_extension is None:
                continue

            # extract the base from the m3u8 link
            base_match = re.search(base_re, m3u8)
            if base_match is None:
                continue
            base_m3u8 = base_match.group(0)

            if player == 'matterhorn':
                # the first 3 characters of the extension (a leading '../') are dropped before appending
                full_m3u8 = base_m3u8 + m3u8_extension[3:]
                variant_content = requests.get(full_m3u8, timeout=request_timeout).content.decode()

                # extract the mp4 link from the m3u8 content
                mp4_extensions = re.findall(r'\.\./.*\.mp4', variant_content)
                if not mp4_extensions:
                    continue

                # add the mp4 link to the list
                max_resolution_m3u8s.append(base_m3u8 + mp4_extensions[0][3:])
            else:  # panopto
                # add the link of the m3u8 that holds the ts list
                max_resolution_m3u8s.append(base_m3u8 + m3u8_extension)

        # add max_resolution_m3u8 list to the main dict
        title_to_best_m3u8[title] = max_resolution_m3u8s

    return title_to_best_m3u8

# resolve the download links for every panopto lecture (this performs HTTP requests for each m3u8)
download_links = get_title_to_download_links(title_to_m3u8s, 'panopto')

100%|██████████████████████████████████████████████████████████████████████████████████| 25/25 [00:06<00:00,  4.03it/s]


In [16]:
def download_lecture(url, player, lecture_name, timeout_max=None):
    """
    download a lecture to VIDEO_PATH/<lecture_name>.mp4

    url          : matterhorn: link of the video file itself
                   panopto   : link of an 'index.m3u8' that lists the .ts segments of the video
    player       : 'matterhorn' or 'panopto'
    lecture_name : file name (without extension) to store the video under
    timeout_max  : max number of seconds to spend downloading (defaults to 60min). the limit is checked
                   after each chunk / segment, and whatever was written up to that point is kept

    note: for panopto the .ts segments are written back to back into the '.mp4' file

    raises: ValueError if the player is not one of the two supported players
            requests.HTTPError if the server answers a request with an error status
    """

    if player not in ('matterhorn', 'panopto'):
        raise ValueError(f"invalid player selected. player '{player}' is not in ('matterhorn', 'panopto')")

    if timeout_max is None:
        timeout_max = 60*60 # setting a hard cap of 60min for a single download

    mp4_path = os.path.join(VIDEO_PATH, lecture_name + '.mp4')

    if player == 'matterhorn':
        # note: the "with" makes sure the streamed connection is released again
        with requests.get(url, stream=True) as stream:
            # never write an error page into the video file
            stream.raise_for_status()

            start_time = time.time()
            with open(mp4_path, 'wb') as f:
                for chunk in tqdm(stream.iter_content(chunk_size=1048576)):
                    f.write(chunk)

                    # break if over timeout_max
                    time_delta = time.time() - start_time
                    if time_delta > timeout_max:
                        print('broke from loop after {} seconds'.format(time_delta))
                        break

    else:  # panopto
        playlist = requests.get(url)
        playlist.raise_for_status()

        # extract a ts list from the m3u8 content
        # note: each segment is stored next to the index file, so its name replaces 'index.m3u8' in the url
        ts_list = [
            url.replace('index.m3u8', line)
            for line in playlist.content.decode().splitlines()
            if line.endswith('.ts')
        ]

        # download the video by looping over ts files
        with open(mp4_path, 'wb') as mp4:
            start_time = time.time()

            for ts_url in tqdm(ts_list):
                segment = requests.get(ts_url)
                segment.raise_for_status()
                mp4.write(segment.content)

                # break if over timeout_max
                time_delta = time.time() - start_time
                if time_delta > timeout_max:
                    print('broke from loop after {} seconds'.format(time_delta))
                    break

In [18]:
# quick trial run: download the first link of one lecture, giving up after roughly 10 seconds
url = download_links['3-5 Part 3: Regulation of Digestion'][0]
download_lecture(url, 'panopto', '3-5', timeout_max=10)

 32%|█████████████████████████▌                                                       | 36/114 [00:10<00:21,  3.55it/s]

broke from loop after 10.14520001411438 seconds





In [None]:
# def download_all_videos(master_URL, max_time=None):
    
#     # do setup
#     driver = setup_and_login()
    
#     # get sources
#     player_psource, player_type = get_player_psource(driver, master_URL)
    
#     # get video dict
#     video_dict = extract_video_links(player_psource, player=player_type)
    
#     open_video_links(driver, video_dict, player=player_type, URL=master_URL) # pass in original url
    
#     all_video_urls = get_video_urls(LOG_PATH)
    
#     title_to_urls = get_title_to_urls(video_dict, all_video_urls, player=player_type)
    
#     if player_type == 'matterhorn':
#         title_to_urls = get_title_to_urls2(title_to_urls)
    
#     for title, urls in title_to_urls.items():
#         for url_num in range(len(urls)):
#             full_title = title + ' - ' + str(url_num) + ".mp4"
#             download_video(urls[url_num], player=player_type, video_name=full_title, max_time=max_time)


# download_all_videos(matterhorn_URL, max_time=3)