In [5]:
class PlaylistItem:
    """One entry of a playlist, holding its title, `updated_at` value and URL."""

    def __init__(self, title, updated_at, url):
        self.title, self.updated_at, self.url = title, updated_at, url

    def marshall(self):
        """Return the item's fields as a plain dict keyed by field name."""
        return dict(
            title=self.title,
            updated_at=self.updated_at,
            url=self.url,
        )

In [93]:
from urllib.parse import urljoin, urlparse, urlencode, ParseResult


BASE_URL = 'https://www.youtube.com'
# Despite the name, this is the URL *path* of the watch page, not a sub-domain
WATCH_SUB_DOMAIN = 'watch'


def video_url_from_id(video_id):
    """Construct the YouTube video location given the extracted video id.

    The result has the form ``https://www.youtube.com/watch?v={video_id}``;
    the id is encoded into the query string by ``urlencode``.
    """
    watch_page = urlparse(urljoin(BASE_URL, WATCH_SUB_DOMAIN))

    # Only the query differs from the parsed watch-page URL. Deriving the
    # result with `_replace` keeps every component a str (including `params`),
    # which is what ParseResult.geturl() is specified to work with.
    with_query = watch_page._replace(query=urlencode({'v': video_id}))
    return with_query.geturl()


In [94]:
def _extract_playlist_ids_from_response(resp):
    collect = []
    for i in resp['items']:
        collect.append(i['id'])
    
    return collect

In [6]:
import os
import json
import requests as req
from typing import List
from urllib.parse import urljoin

import googleapiclient.discovery

# Service name and API version handed to googleapiclient.discovery.build()
SERVICE = 'youtube'
API_VERSION = 'v3'

# Both values are read from the environment when this cell runs; a missing
# variable raises KeyError right here. CHANNEL_ID is the channel whose
# playlists are listed, API_KEY is passed to the client as `developerKey`.
CHANNEL_ID = os.environ['CHANNEL_ID']
API_KEY = os.environ['API_KEY']


def get_authenticated_service():
    """Return a service session to be used in future API requests.

    E.g., ``youtube_session = get_authenticated_service()``.

    The session is built for ``SERVICE``/``API_VERSION`` and identifies itself
    with ``API_KEY`` as the developer key. An API key needs to be created as
    per: https://developers.google.com/youtube/v3/getting-started
    """
    youtube_client = googleapiclient.discovery.build(
        SERVICE,
        API_VERSION,
        developerKey=API_KEY,
    )
    return youtube_client


def retrieve_playlist_ids(youtube):
    """Return the ids of all playlists of ``CHANNEL_ID``, across every page.

    Only public playlists (excludes unlisted and private) belonging to the
    account are listed.
    """
    collected_ids = []
    page_token = None

    while True:
        request = youtube.playlists().list(
            part='snippet',
            channelId=CHANNEL_ID,
            pageToken=page_token,
            maxResults=50,
        )
        page = request.execute()
        collected_ids.extend(_extract_playlist_ids_from_response(page))

        # A page without a 'nextPageToken' key is the last one
        if 'nextPageToken' not in page:
            break
        page_token = page['nextPageToken']

    return collected_ids
    

def dump_channel_playlist_to_file():
    """Write the channel's playlist ids to ``playlists.json`` as indented JSON.

    Builds a service session, retrieves the playlist ids of the configured
    channel and writes them (followed by a newline) to ``playlists.json`` in
    the current working directory, overwriting any existing file.
    """
    youtube_session = get_authenticated_service()
    # NOTE(review): this used to call `retrieve_playlists`, which is not
    # defined in the visible cells; `retrieve_playlist_ids` is the playlist
    # retrieval function that exists - confirm this is the intended data.
    playlist_ids = retrieve_playlist_ids(youtube_session)
    with open('playlists.json', 'w') as fp:
        js = json.dumps(playlist_ids, sort_keys=True, indent=2)
        print(js, file=fp)


def dump_from_playlist(youtube, playlist_id):
    """Request a single page of a playlist's items and return the raw response.

    The extracted id (from the playlist dump) is equivalent to the playlist
    url id in the form: youtube.com/watch?v={video}&list={playlist_id}.
    The request asks for the 'snippet' part with maxResults=50 and does not
    follow pagination.
    """
    playlist_items_api = youtube.playlistItems()
    request = playlist_items_api.list(
        part='snippet', playlistId=playlist_id, maxResults=50
    )
    response = request.execute()
    return response


def _extract_playlist_items_from_response(response) -> List[PlaylistItem]:
    """Extract selected data from an API response as PlaylistItem objects.

    For every entry of ``response['items']`` the snippet's title, its
    'publishedAt' value and the watch URL built from the video id are kept.
    The snippet's description is intentionally not extracted.
    """
    def to_playlist_item(entry):
        snippet = entry['snippet']
        return PlaylistItem(
            snippet['title'],
            snippet['publishedAt'],
            video_url_from_id(snippet['resourceId']['videoId']),
        )

    return [to_playlist_item(entry) for entry in response['items']]


In [96]:
def playlist_name_from_id(youtube, playlist_id):
    """Get the playlist's name (its localized title) from the playlist id.

    Args:
        youtube: service session exposing ``playlists().list(...)``.
        playlist_id: id of the playlist to look up.

    Returns:
        The ``snippet.localized.title`` of the first returned playlist.

    Raises:
        UnboundLocalError: if the response contains no playlist for
            ``playlist_id``. The exception type is kept for existing callers;
            the message names the id that could not be resolved.
    """
    request = youtube.playlists().list(
        part='snippet',
        id=playlist_id
    )
    resp = request.execute()

    items = resp['items']
    if not items:
        raise UnboundLocalError(
            'no playlist found for id {!r}'.format(playlist_id)
        )
    return items[0]['snippet']['localized']['title']

In [98]:
def extract_all_from_playlist(youtube, playlist_id):
    """Extract all the items from a playlist, following pagination.

    Returns a 2-tuple: (playlist_name, list of PlaylistItem objects).
    """
    playlist_name = playlist_name_from_id(youtube, playlist_id)
    gathered = []
    page_token = None

    while True:
        # If the playlist holds more items than maxResults, the response has a
        # `nextPageToken`; it is passed back as `pageToken` for the next page.
        request = youtube.playlistItems().list(
            part='snippet',
            playlistId=playlist_id,
            pageToken=page_token,
            maxResults=50,
        )
        page = request.execute()
        gathered.extend(_extract_playlist_items_from_response(page))

        # A page without a 'nextPageToken' key is the last one
        if 'nextPageToken' not in page:
            break
        page_token = page['nextPageToken']

    return (playlist_name, gathered)


In [8]:
class ChannelItem:
    """A channel entry, holding its name, `published_at` value and URL."""

    def __init__(self, channel, published_at, url):
        self.channel, self.published_at, self.url = channel, published_at, url

In [14]:
import json

from os import listdir
from os.path import isfile, join

# Despite the name, this is a URL path segment, not a sub-domain
CHANNEL_SUB_DOMAIN = 'channel'

def channel_url_from_id(channel_id) -> str:
    """Construct the YouTube channel location given the extracted channel id.

    The result has the form ``https://www.youtube.com/channel/{channel_id}``.
    """
    # URLs always use '/' as the separator. os.path.join is for filesystem
    # paths: it inserts the platform separator (a backslash on Windows) and
    # discards earlier parts when a later part is absolute, so the segments
    # are joined explicitly instead.
    return '/'.join((BASE_URL.rstrip('/'), CHANNEL_SUB_DOMAIN, channel_id))
    

def _load_from_file(file_path):
    with open(file_path, 'r') as fp:
        js = json.load(fp)
        
    return js
    
    
def load_channels_from_file_to_obj(file_path) -> List[ChannelItem]:
    """Load channels from a JSON file into a list of ChannelItem objects.

    Each entry of the file's top-level ``items`` list contributes one
    ChannelItem built from its snippet: the title, the 'publishedAt' value and
    the channel URL derived from ``snippet['resourceId']['channelId']``.
    """
    # Reuse the shared JSON loader rather than repeating the open/parse code
    js = _load_from_file(file_path)

    collect = []
    for item in js['items']:
        snippet = item['snippet']
        channel = snippet['title']
        # The id under 'resourceId' is used, not the snippet-level 'channelId'
        channel_id = snippet['resourceId']['channelId']
        published_at = snippet['publishedAt']
        collect.append(
            ChannelItem(channel, published_at, channel_url_from_id(channel_id))
        )

    return collect
    

def extract_channels_from_dir(file_path='subs/') -> List[ChannelItem]:
    """Merge the subscriptions of every JSON dump file found in a directory.

    Every regular file directly inside ``file_path`` whose name ends in
    '.json' is loaded, and the resulting ChannelItem objects are concatenated
    in the order the directory listing yields the files.
    """
    merged = []
    for entry in listdir(file_path):
        candidate = join(file_path, entry)
        # Skip sub-directories and anything that is not a .json file
        if not (isfile(candidate) and entry.endswith('.json')):
            continue
        merged.extend(load_channels_from_file_to_obj(candidate))
    return merged

In [12]:
# Display the raw parsed JSON of one dump file from the subs/ directory
_load_from_file('subs/1.json')

{'nextPageToken': 'CDIQAA',
 'items': [{'snippet': {'thumbnails': {'default': {'url': 'https://yt3.ggpht.com/-aViYLfSGHCI/AAAAAAAAAAI/AAAAAAAAAAA/1rfo3C6KGpU/s88-c-k-no-mo-rj-c0xffffff/photo.jpg'},
     'high': {'url': 'https://yt3.ggpht.com/-aViYLfSGHCI/AAAAAAAAAAI/AAAAAAAAAAA/1rfo3C6KGpU/s800-c-k-no-mo-rj-c0xffffff/photo.jpg'},
     'medium': {'url': 'https://yt3.ggpht.com/-aViYLfSGHCI/AAAAAAAAAAI/AAAAAAAAAAA/1rfo3C6KGpU/s240-c-k-no-mo-rj-c0xffffff/photo.jpg'}},
    'title': 'Mike Tornabene',
    'resourceId': {'kind': 'youtube#channel',
     'channelId': 'UCnAP15ug0aWmhEuNFygpRzA'},
    'channelId': 'UC9v4QXjioPoaVW4-fRJFEbw',
    'publishedAt': '2016-04-23T17:22:00Z',
    'description': 'GNARPM is an unscripted automotive series about non- professionals breaking into the world of motorsports. Their goal is to show what is possible when you combine relentless passion and reckless abandon with a determination to explore the limits of what life has to offer. GNARPM combines a rare mix