In [28]:
import requests
import json
import polars as pl
from my_secret_key import my_key

from youtube_transcript_api import YouTubeTranscriptApi

In [29]:
# from apiclient.discovery import build
# 
# # Arguments that need to be passed to the build function
# DEVELOPER_KEY = my_key()
# YOUTUBE_API_SERVICE_NAME = "youtube"
# YOUTUBE_API_VERSION = "v3"
# 
# print("API key is: " + DEVELOPER_KEY)
# 
# # creating Youtube Resource Object
# youtube_object = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
#                        developerKey = DEVELOPER_KEY)
# 
# 
# def youtube_search_keyword(query, max_results):
#     
#     # calling the search.list method to
#     # retrieve youtube search results
#     search_keyword = youtube_object.search().list(q = query, part = "id, snippet",
#                                                   maxResults = max_results).execute()
#     
#     # extracting the results from search response
#     results = search_keyword.get("items", [])
#     
#     # empty list to store video, 
#     # channel, playlist metadata
#     videos = []
#     playlists = []
#     channels = []
#     
#     # extracting required info from each result object
#     for result in results:
#         # video result object
#         if result['id']['kind'] == "youtube# video":
#             videos.append("% s (% s) (% s) (% s)" % (result["snippet"]["title"],
#                                                      result["id"]["videoId"], result['snippet']['description'],
#                                                      result['snippet']['thumbnails']['default']['url']))
#         
#         # playlist result object
#         elif result['id']['kind'] == "youtube# playlist":
#             playlists.append("% s (% s) (% s) (% s)" % (result["snippet"]["title"],
#                                                         result["id"]["playlistId"],
#                                                         result['snippet']['description'],
#                                                         result['snippet']['thumbnails']['default']['url']))
#         
#         # channel result object
#         elif result['id']['kind'] == "youtube# channel":
#             channels.append("% s (% s) (% s) (% s)" % (result["snippet"]["title"],
#                                                        result["id"]["channelId"],
#                                                        result['snippet']['description'],
#                                                        result['snippet']['thumbnails']['default']['url']))
#     
#     print("Videos:\n", "\n".join(videos), "\n")
#     print("Channels:\n", "\n".join(channels), "\n")
#     print("Playlists:\n", "\n".join(playlists), "\n")
# 
# if __name__ == "__main__":
#     youtube_search_keyword('Geeksforgeeks', max_results = 10)
# 	


In [37]:
# endpoint used for the search requests
url = "https://www.googleapis.com/youtube/v3/search"

# channel ID sent with every search request
# (alternate channel ID kept for reference: 'UC6vl6g6Yo1jh24pduTsMSMg')
channel_id = "UCa9gErQ9AE5jT2DZLjXBIdA"

# accumulator for the video records collected across result pages
video_record_list = []

# pagination token; starts as None because no page has been fetched yet
page_token = None

In [38]:
# extract video data from single search result page

def getVideoRecords(response: requests.models.Response) -> list:
    """
        Pull the video entries out of a single page of search results.

        The JSON body in `response.text` is parsed, items whose id kind is
        not "youtube#video" are skipped, and each remaining item is reduced
        to a dict with the keys 'video_id', 'datetime' and 'title'.
    """

    payload = json.loads(response.text)

    return [
        {
            'video_id': item['id']['videoId'],
            'datetime': item['snippet']['publishedAt'],
            'title': item['snippet']['title'],
        }
        for item in payload['items']
        # channels and playlists also appear in search results; keep videos only
        if item['id']['kind'] == "youtube#video"
    ]

In [39]:
# extract video data across multiple search result pages

# Walk the paginated search results. page_token is None before the first
# request and is set to the sentinel 0 once a response carries no
# 'nextPageToken'; because 0 persists afterwards, re-running this cell does
# not fetch (and append) the same pages a second time.
while page_token != 0:
    # define parameters for API call
    params = {'key': my_key(), 'channelId': channel_id,
              'part': ["snippet","id"], 'order': "date",
              'maxResults':50, 'pageToken': page_token}

    # make get request; the timeout keeps a stalled connection from hanging the cell
    response = requests.get(url, params=params, timeout=30)

    # surface HTTP failures as an explicit HTTPError here, instead of an
    # opaque KeyError on 'items' further down
    response.raise_for_status()

    # append video data from page results to list
    video_record_list += getVideoRecords(response)

    # grab next page token; a missing token means this was the last page.
    # Only that one case ends the loop -- other errors are no longer swallowed.
    page_token = json.loads(response.text).get('nextPageToken', 0)

In [40]:
# build a polars dataframe from the collected records and preview the first rows
df = pl.DataFrame(data=video_record_list)
print(df.head(n=5))

shape: (5, 3)
┌─────────────┬──────────────────────┬─────────────────────────────────┐
│ video_id    ┆ datetime             ┆ title                           │
│ ---         ┆ ---                  ┆ ---                             │
│ str         ┆ str                  ┆ str                             │
╞═════════════╪══════════════════════╪═════════════════════════════════╡
│ X8ZR6yFdg1Q ┆ 2024-06-17T14:12:45Z ┆ AI Explained in 60 Seconds #ai  │
│ sxvyBxLVvKs ┆ 2024-06-13T23:44:59Z ┆ The #1 Skill That Holds (Most)… │
│ XQWhJsXu0sY ┆ 2024-06-07T18:32:43Z ┆ What Nature Can Teach Us About… │
│ wJ794jLP2Tw ┆ 2024-05-30T15:41:30Z ┆ Automating Data Pipelines with… │
│ pJ_nCklQ65w ┆ 2024-05-18T15:24:22Z ┆ How to Deploy ML Solutions wit… │
└─────────────┴──────────────────────┴─────────────────────────────────┘


In [41]:
def extract_text(transcript: list) -> str:
    """
        Join the 'text' field of every transcript segment into one string.

        Args:
            transcript: sequence (or any iterable) of segment mappings,
                each providing a 'text' key.

        Returns:
            The segment texts separated by single spaces; an empty string
            when there are no segments.

        Raises:
            KeyError: if a segment has no 'text' key.
    """

    # iterate the segments directly; no positional indexing is needed
    return ' '.join(segment['text'] for segment in transcript)

In [42]:
# initialize list to store video captions (one entry per dataframe row, in row order)
transcript_text_list = []

# loop through each video ID in the dataframe
for video_id in df['video_id']:

    # try to extract captions
    try:
        # get transcript
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        # extract text transcript
        transcript_text = extract_text(transcript)
    # Best effort: any failure to obtain or parse captions is recorded as "n/a".
    # `except Exception` (rather than a bare except) lets KeyboardInterrupt and
    # SystemExit propagate, so a long-running loop can still be interrupted.
    except Exception:
        transcript_text = "n/a"

    # append transcript text to list
    transcript_text_list.append(transcript_text)

In [43]:
# attach the collected transcript texts as a new 'transcript' column and preview
df = df.with_columns(pl.Series("transcript", transcript_text_list))
print(df.head(n=5))

shape: (5, 4)
┌─────────────┬──────────────────────┬──────────────────────────────┬──────────────────────────────┐
│ video_id    ┆ datetime             ┆ title                        ┆ transcript                   │
│ ---         ┆ ---                  ┆ ---                          ┆ ---                          │
│ str         ┆ str                  ┆ str                          ┆ str                          │
╞═════════════╪══════════════════════╪══════════════════════════════╪══════════════════════════════╡
│ X8ZR6yFdg1Q ┆ 2024-06-17T14:12:45Z ┆ AI Explained in 60 Seconds   ┆ here's AI explained in 60    │
│             ┆                      ┆ #ai                          ┆ seco…                        │
│ sxvyBxLVvKs ┆ 2024-06-13T23:44:59Z ┆ The #1 Skill That Holds      ┆ let's talk about technical   │
│             ┆                      ┆ (Most)…                      ┆ com…                         │
│ XQWhJsXu0sY ┆ 2024-06-07T18:32:43Z ┆ What Nature Can Teach Us     ┆ if you 

In [44]:
# dimensions of the frame and number of distinct rows
print("shape:", df.shape)
print("n unique rows:", df.n_unique())

# number of distinct values in each column, in column order
for column_name in df.columns:
    print(f"n unique elements ({column_name}):", df[column_name].n_unique())

### output
# shape: (84, 4)
# n unique rows: 84
# n unique elements (video_id): 84
# n unique elements (datetime): 84
# n unique elements (title): 84
# n unique elements (transcript): 82

shape: (91, 4)
n unique rows: 91
n unique elements (video_id): 91
n unique elements (datetime): 91
n unique elements (title): 91
n unique elements (transcript): 89


In [45]:
# convert the 'datetime' column from string to the polars Datetime dtype
df = df.with_columns(datetime=pl.col("datetime").cast(pl.Datetime))
print(df.head(n=5))

shape: (5, 4)
┌─────────────┬─────────────────────┬───────────────────────────────┬──────────────────────────────┐
│ video_id    ┆ datetime            ┆ title                         ┆ transcript                   │
│ ---         ┆ ---                 ┆ ---                           ┆ ---                          │
│ str         ┆ datetime[μs]        ┆ str                           ┆ str                          │
╞═════════════╪═════════════════════╪═══════════════════════════════╪══════════════════════════════╡
│ X8ZR6yFdg1Q ┆ 2024-06-17 14:12:45 ┆ AI Explained in 60 Seconds    ┆ here's AI explained in 60    │
│             ┆                     ┆ #ai                           ┆ seco…                        │
│ sxvyBxLVvKs ┆ 2024-06-13 23:44:59 ┆ The #1 Skill That Holds       ┆ let's talk about technical   │
│             ┆                     ┆ (Most)…                       ┆ com…                         │
│ XQWhJsXu0sY ┆ 2024-06-07 18:32:43 ┆ What Nature Can Teach Us      ┆ if you 

In [46]:
# list all special strings and their replacements (paired by position)
special_strings = ['&#39;', '&amp;', 'sha ']
special_string_replacements = ["'", "&", "Shaw "]

# Replace every occurrence of each special string in the title and transcript
# columns. `str.replace` only substitutes the first match in each string and
# interprets the pattern as a regex; `replace_all(..., literal=True)` handles
# all occurrences and matches the text verbatim.
# NOTE(review): 'sha ' is matched as a plain substring, so it also hits any
# word that merely ends in "sha" -- confirm this is acceptable for the data.
for special_string, replacement in zip(special_strings, special_string_replacements):
    df = df.with_columns(
        pl.col(['title', 'transcript']).str.replace_all(
            special_string, replacement, literal=True
        )
    )

In [47]:
# write the dataframe to a parquet file
# NOTE(review): assumes the 'data/' directory already exists relative to the
# working directory -- TODO confirm
df.write_parquet('data/video-transcripts.parquet')