<a href="https://colab.research.google.com/github/desaibhargav/VR/blob/main/Demo_YouTube_Client.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Dependencies & Imports**

In [None]:
!pip install -q youtube_transcript_api
!pip install -q google-api-python-client

In [None]:
import re
import pandas as pd
import numpy as np
import pickle
import googleapiclient.discovery
import googleapiclient.errors
import time

from itertools import chain
from functools import reduce
from typing import Union, Callable, Generator
from youtube_transcript_api import YouTubeTranscriptApi
from tqdm.notebook import tqdm
from pprint import pprint

## **Class `YouTubeClient`**


In [None]:
class YouTubeClient():
  """
  Read-only client that scrapes YouTube metadata and transcripts into DataFrames.

  Metadata queries go through the YouTube Data API v3 service object built by
  `googleapiclient.discovery.build`; transcripts are fetched through a
  `YouTubeTranscriptApi` instance. Only to scrape data. Not to upload or
  delete videos.

  Attributes:
    youtube_metadata_client: service object used for every 'metadata' query.
    youtube_transcript_client: object used for every 'transcript' query.
    state: dict with 'units_consumed' (number of metadata requests executed so
      far, one quota unit each) and 'daily_quota'.
  """

  # Query kinds understood by `_execute_query` and `_process_query`.
  _QUERY_KINDS = ('metadata', 'transcript')
  # Pause after every successful transcript download
  # (presumably to avoid being rate limited -- TODO confirm).
  _TRANSCRIPT_DELAY_SECONDS = 8
  # Captures the value of the `list` query parameter of a YouTube URL.
  _PLAYLIST_ID_PATTERN = re.compile(r'[?&]list=([^&#\s]+)')

  def __init__(self, API_KEY: str):
    """
    Build the metadata and transcript clients.

    Args:
      API_KEY: developer key passed to `googleapiclient.discovery.build`.
    """
    self.__API_KEY = API_KEY
    self._api_service_name = "youtube"
    self._api_version = "v3"
    self.youtube_metadata_client = googleapiclient.discovery.build(self._api_service_name,
                                                                   self._api_version,
                                                                   developerKey=self.__API_KEY)
    self.youtube_transcript_client = YouTubeTranscriptApi()
    self.state = {'units_consumed': 0, 'daily_quota': 10000}

  def _execute_query(self, query_kind: str, query: str, **query_params) -> Generator:
    """
    Lazily execute a query and yield its results as lists of dicts.

    Args:
      query_kind: 'metadata' or 'transcript'.
      query: for 'metadata', "<resource>().<action>" (e.g. "videos().list"),
        resolved on `youtube_metadata_client`; for 'transcript', the name of a
        method of `youtube_transcript_client` (e.g. "get_transcript").
      **query_params: forwarded unchanged to the resolved method.

    Yields:
      'metadata': the 'items' list of every result page, following
      'nextPageToken' until the last page.
      'transcript': one list of transcript entries, or a single all-NaN
      placeholder entry when the transcript could not be fetched.

    Raises:
      AssertionError: if `query_kind` is not supported (raised on first iteration).
    """
    assert query_kind in self._QUERY_KINDS, "'query_kind' must be one of ['metadata', 'transcript']"
    if query_kind == 'metadata':
      resource, action = query.split('.')
      resource = resource.replace("()", "")
      query_params.setdefault("pageToken", "")
      while True:
        request = getattr(getattr(self.youtube_metadata_client, resource)(), action)(**query_params)
        response = request.execute()
        # Every list request (i.e. every page) costs one quota unit,
        # independently of how many results 'maxResults' asks for.
        self.state["units_consumed"] += 1
        yield response.get('items', [])
        next_page_token = response.get("nextPageToken", "")
        if not next_page_token:
          break
        query_params["pageToken"] = next_page_token
    else:
      try:
        response = getattr(self.youtube_transcript_client, query)(**query_params)
      except Exception:
        # Best effort: a video without a retrievable transcript still produces
        # one (placeholder) row. `Exception` instead of a bare `except` keeps
        # KeyboardInterrupt/GeneratorExit working during a long scrape.
        response = [{'duration': np.nan, 'start': np.nan, 'text': np.nan}]
      else:
        time.sleep(self._TRANSCRIPT_DELAY_SECONDS)
      # The yield lives outside the try block so that exceptions thrown into
      # the suspended generator are never mistaken for a failed download.
      yield response

  def _process_query(self, query_kind: str, response: Generator) -> pd.DataFrame:
    """
    A generic processing function that should apply to any query.
    To achieve more specific processing modify code in the specific public methods.

    Args:
      query_kind: 'metadata' or 'transcript'.
      response: iterable of lists of dicts, as yielded by `_execute_query`.

    Returns:
      'metadata': one row per item, without the 'kind' and 'etag' columns.
      'transcript': a single row whose 'subtitles' cell is the list of texts and
      whose 'timestamps' cell is the list of (start, start + duration) tuples.

    Raises:
      AssertionError: if `query_kind` is not supported.
    """
    assert query_kind in self._QUERY_KINDS, "'query_kind' must be one of ['metadata', 'transcript']"
    df = pd.DataFrame(list(chain.from_iterable(response)))
    if query_kind == "metadata":
      # errors='ignore' drops whichever of the two columns exists; a
      # try/except KeyError would drop neither when only one is missing.
      return df.drop(columns=['kind', 'etag'], errors='ignore')
    if df.empty:
      # Keep exactly one row per video even when the transcript has no entries.
      subtitles, timestamps = [], []
    else:
      subtitles = df['text'].to_list()
      timestamps = list(zip(df['start'].to_list(), (df['start'] + df['duration']).to_list()))
    return pd.DataFrame({'subtitles': [subtitles],
                         'timestamps': [timestamps]})

  def _extract_and_add_as_column(self, df: pd.DataFrame, extract_dict: dict, clean_up: bool) -> pd.DataFrame:
    """
    Promote fields of dict-valued columns to top-level columns.

    Args:
      df: frame to extend. It is modified in place and also returned.
      extract_dict: {'extract': [field, ...], 'from': [column, ...]}; the i-th
        field is read from the dicts stored in the i-th column.
      clean_up: when true, the source columns are dropped afterwards.

    Returns:
      `df` with one new column per extracted field; a field missing from a
      cell (or a cell that is not a dict) yields NaN.

    Raises:
      AssertionError: if `extract_dict` is malformed or names unknown columns.
    """
    assert set(extract_dict) == {'extract', 'from'}, "Passed dict should have only two fields: ('extract', 'from')"
    assert len(extract_dict['extract']) == len(extract_dict['from']), "Fields to be extracted are not equal to the columns specified"
    assert pd.Series(extract_dict['from']).isin(df.columns).all(), "Column(s) from which fields are to be extracted, do not exist in the passed pd.DataFrame object"
    # Pair the lists by key, not through `.values()`, so the result does not
    # depend on the order in which the caller wrote the two dict entries.
    for extract, from_column in zip(extract_dict['extract'], extract_dict['from']):
      df[extract] = df[from_column].apply(
          lambda cell, field=extract: cell.get(field, np.nan) if isinstance(cell, dict) else np.nan)
    if clean_up:
      df.drop(columns=list(set(extract_dict['from'])), inplace=True)
    return df

  def _align(self, *list_of_dfs: pd.DataFrame, on: str, how: str) -> pd.DataFrame:
    """
    Merge the given frames pairwise, left to right, on a shared column.

    Args:
      *list_of_dfs: frames to merge.
      on: column name present in every frame.
      how: 'inner' or 'outer', forwarded to `pd.merge`.

    Raises:
      AssertionError: if `how` is neither 'inner' nor 'outer'.
    """
    assert how in ('inner', 'outer'), "The argument 'how' should be one of ['inner', 'outer']"
    return reduce(lambda left, right: pd.merge(left, right, on=on, how=how), list_of_dfs)

  def _query_each(self, query_kind: str, query: str, id_param: str, ids, **query_params) -> pd.DataFrame:
    """
    Run `query` once per id (with a progress bar) and stack the processed results.

    Args:
      query_kind: 'metadata' or 'transcript'.
      query: query string understood by `_execute_query`.
      id_param: name of the keyword argument that receives each id.
      ids: iterable of ids, one query is issued per element.
      **query_params: extra keyword arguments shared by all queries.

    Raises:
      ValueError: from `pd.concat` when `ids` is empty.
    """
    frames = []
    for item_id in tqdm(ids):
      response = self._execute_query(query_kind=query_kind, query=query,
                                     **{id_param: item_id}, **query_params)
      frames.append(self._process_query(query_kind=query_kind, response=response))
    return pd.concat(frames)

  @classmethod
  def _extract_playlist_id(cls, playlist: str) -> str:
    """
    Return the playlist id of a YouTube URL.

    The id is the value of the `list` query parameter, wherever it appears in
    the URL. Input without such a parameter is assumed to already be a bare
    playlist id and is returned stripped of surrounding whitespace.
    """
    match = cls._PLAYLIST_ID_PATTERN.search(playlist)
    return match.group(1) if match else playlist.strip()

  def _from_playlist_ids(self, *playlist_ids: Union[str, list]) -> pd.DataFrame:
    """
    Fetch title, description and item count of each playlist.

    Returns:
      One row per playlist found, with columns 'playlistId', 'playlist_title',
      'playlist_description' and 'itemCount'.
    """
    extract_dict = {'extract': ['title',
                                'description',
                                'itemCount'],
                    'from': ['snippet',
                             'snippet',
                             'contentDetails']}
    metadata = self._query_each("metadata", "playlists().list", "id", playlist_ids,
                                part="contentDetails,snippet")
    metadata = self._extract_and_add_as_column(metadata, extract_dict, clean_up=True)
    return metadata.rename(columns={'id': 'playlistId',
                                    'title': 'playlist_title',
                                    'description': 'playlist_description'})

  def _from_video_ids(self, *video_ids: Union[str, list]) -> pd.DataFrame:
    """
    Fetch snippet, content and statistics fields of each video.

    Returns:
      One row per video found, with 'videoId', 'video_title',
      'video_description' and the remaining extracted fields as columns.
    """
    extract_dict = {'extract': ['definition',
                                'defaultAudioLanguage',
                                'publishedAt',
                                'description',
                                'title',
                                'tags',
                                'commentCount',
                                'dislikeCount',
                                'favoriteCount',
                                'likeCount',
                                'viewCount'],
                    'from': ['contentDetails',
                             'snippet',
                             'snippet',
                             'snippet',
                             'snippet',
                             'snippet',
                             'statistics',
                             'statistics',
                             'statistics',
                             'statistics',
                             'statistics']}
    metadata = self._query_each("metadata", "videos().list", "id", video_ids,
                                part="snippet,contentDetails,statistics")
    metadata = self._extract_and_add_as_column(metadata, extract_dict, clean_up=True)
    return metadata.rename(columns={'id': 'videoId',
                                    'title': 'video_title',
                                    'description': 'video_description'})

  def from_playlist(self, *playlists: Union[str, list]) -> pd.DataFrame:
    """
    Scrape metadata and transcripts of every video of the given playlists.

    Args:
      *playlists: YouTube URLs carrying a `list=<playlist id>` query parameter
        (bare playlist ids are accepted as well).

    Returns:
      One row per distinct video, indexed by ('playlistId', 'itemCount',
      'playlist_title', 'playlist_description', 'videoId'), holding the
      transcript ('subtitles', 'timestamps') and the video metadata.

    Raises:
      ValueError: if no playlist is given.
    """
    if not playlists:
      raise ValueError("At least one playlist is required")
    playlist_ids = [self._extract_playlist_id(playlist) for playlist in playlists]
    playlist_metadata = self._from_playlist_ids(*playlist_ids)

    # playlist -> video mapping
    extract_dict = {'extract': ['playlistId',
                                'videoId'],
                    'from': ['snippet',
                             'contentDetails']}
    metadata = self._query_each("metadata", "playlistItems().list", "playlistId", playlist_ids,
                                part="snippet,contentDetails", maxResults=50)
    metadata = self._extract_and_add_as_column(metadata, extract_dict, clean_up=True)
    metadata = metadata.drop(columns=['id'])

    # A video that belongs to several playlists is fetched only once; the
    # result keeps a single row per video anyway (see drop_duplicates below).
    unique_video_ids = metadata.videoId.drop_duplicates().to_list()
    video_metadata = self._from_video_ids(*unique_video_ids)

    # Transcripts are requested only for videos the metadata query returned,
    # in the same order, so ids can be attached positionally.
    transcript = self._query_each("transcript", "get_transcript", "video_id",
                                  video_metadata.videoId.values)
    transcript["videoId"] = video_metadata.videoId.to_list()

    # merge on 'videoId', 'playlistId' and return
    video_data_merged = self._align(metadata, transcript, video_metadata, on="videoId", how="inner")
    playlist_data_merged = self._align(video_data_merged, playlist_metadata, on="playlistId", how="outer").drop_duplicates(subset='videoId')
    return playlist_data_merged.set_index(['playlistId', 'itemCount', 'playlist_title', 'playlist_description', 'videoId'])

  def from_channel(self, channel_id: str) -> pd.DataFrame:
    """
    Scrape every video of a channel. Not implemented yet.

    Raises:
      NotImplementedError: always; failing loudly avoids handing callers a
        silent `None` where a DataFrame is annotated.
    """
    raise NotImplementedError("YouTubeClient.from_channel is not implemented yet")

## **Scrape YouTube**

In [None]:
# Playlists to scrape, one watch URL per line; the playlist id is carried by
# the `list=` query parameter of each URL. Transcripts are downloaded for the
# videos of these playlists.
playlists = """
https://www.youtube.com/watch?v=Dl8MUnLfEsk&list=PL3uDtbb3OvDOWpCZ8ERCXHMcslGaBEOBT
https://www.youtube.com/watch?v=cxoQdEhHaT8&list=PL3uDtbb3OvDNXmmy_3Q7SCHIZdz9ja4SG
https://www.youtube.com/watch?v=yL_fgyXXnSM&list=PL3uDtbb3OvDPHh7DWhekbw-ywA7SCnstr
https://www.youtube.com/watch?v=O1B0lDS1Jnw&list=PL3uDtbb3OvDNsDLMnmyR94MTfGHQh6HtP
https://www.youtube.com/watch?v=zO8QzMWZbN4&list=PL3uDtbb3OvDMpNqWoWfsY9qqT7UZijw0w
https://www.youtube.com/watch?v=4OBLAW7oQYo&list=PL3uDtbb3OvDPup8tDy1viWElFkPZcL4pM
https://www.youtube.com/watch?v=GM0lU5Dq7eA&list=PL3uDtbb3OvDPZG2coablWM-9XX6JQtSQT
https://www.youtube.com/watch?v=vvntRXe6YcU&list=PL3uDtbb3OvDPBGzSYKBeEFlrG48_0DBC4
https://www.youtube.com/watch?v=DTWMwHtF-UA&list=PL3uDtbb3OvDNnH5j_UFzZwR2KWg4TShJn
https://www.youtube.com/watch?v=kAMvYHqTWs0&list=PL3uDtbb3OvDPt8Ayn5QQ_13Juo98-EDxP
https://www.youtube.com/watch?v=xswUGZOVdc4&list=PL3uDtbb3OvDPLLMGlDi3C3-uAwyTBXtnR
https://www.youtube.com/watch?v=3J-cYxxHQGQ&list=PL3uDtbb3OvDNxpFp3baiPKRM4tGtD3_Me
https://www.youtube.com/watch?v=f7-lwz_FacE&list=PL3uDtbb3OvDMjs6pYa27tCweTBKBgxUij
https://www.youtube.com/watch?v=a6danRWYxpo&list=PL3uDtbb3OvDNVQJSz1__CuW-IS2s2kw2A
https://www.youtube.com/watch?v=uoIXz3KcwME&list=PL3uDtbb3OvDMgLTgfZe4fDN48SYfEtesX
https://www.youtube.com/watch?v=bJggjXvB52c&list=PL3uDtbb3OvDMHDwKA8sPrEi2SV3IKnT0S
https://www.youtube.com/watch?v=4OBLAW7oQYo&list=PL3uDtbb3OvDPAcaMIq68euWqHvZosh8JI
https://www.youtube.com/watch?v=QAsJvKsd2Xk&list=PL3uDtbb3OvDNWKnzD4MJRQRX_wBAT9iDC
https://www.youtube.com/watch?v=UT_nWVLi4Ws&list=PL3uDtbb3OvDMMbCg-hvVjXYZ3osM4rpr2
https://www.youtube.com/watch?v=X_fHa73_nOg&list=PL3uDtbb3OvDNo0TvQIHbB6TLndA7jEMTR
https://www.youtube.com/watch?v=HIkgY0Rz1jU&list=PL3uDtbb3OvDMaNezBWgE_SNQ6QkeYkV1w
https://www.youtube.com/watch?v=AHS1c_vqjxI&list=PL3uDtbb3OvDMBO-NUpWCvV_zhJh1pFlEX
https://www.youtube.com/watch?v=diFkCJ802vY&list=PL3uDtbb3OvDMdjRscdox0QYkcE9cghmvx
https://www.youtube.com/watch?v=rbYdXbEVm6E&list=PL3uDtbb3OvDONMcvq4e82gs33laM4IJ_z
https://www.youtube.com/watch?v=235gIzWOkrM&list=PL3uDtbb3OvDNDjm-mp82KCJB6VDpqZi3I
""".split()  # URLs contain no whitespace, so this yields exactly one entry per line

In [None]:
# The API key is read from the environment so the credential is never stored
# in the notebook; set YOUTUBE_API_KEY before running this cell (a missing
# variable fails fast with a KeyError).
# NOTE(review): the key that used to be hard-coded here has been published
# with the notebook and should be revoked.
import os

client = YouTubeClient(os.environ["YOUTUBE_API_KEY"])
# Scrape playlist metadata, video metadata and transcripts for every playlist.
youtube_scrapped = client.from_playlist(*playlists)

HBox(children=(FloatProgress(value=0.0, max=25.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=25.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=877.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=872.0), HTML(value='')))




In [None]:
# Persist the scraped dataset next to the notebook so it can be reloaded
# without querying YouTube again.
youtube_scrapped.to_pickle(path='youtube_scrapped.pickle')