# ETL of YouTube Video Transcripts

## EXTRACT

In [8]:
# Importing Libraries
import requests
import json
import polars as pl
from my_sk import my_key

from youtube_transcript_api import YouTubeTranscriptApi

#### Functions

In [None]:


def getVideoRecords(response: requests.models.Response) -> list:
    """
        Function to pull video records out of a search GET request response

        The response body is parsed as JSON and its 'items' list is scanned;
        items whose id kind is not "youtube#video" are skipped.

        Returns a list of dicts with keys 'video_id', 'datetime' and 'title'.
    """

    payload = json.loads(response.text)

    # one record per video item, in the order the items appear in the response
    return [
        {
            'video_id': item['id']['videoId'],
            'datetime': item['snippet']['publishedAt'],
            'title': item['snippet']['title'],
        }
        for item in payload['items']
        if item['id']['kind'] == "youtube#video"
    ]

In [None]:
def extract_text(transcript: list) -> str:
    """
        Function to extract text from transcript dictionary

        Args:
            transcript: list of caption segments, each a dict with a 'text' key.

        Returns:
            The segment texts in their original order, separated by single
            spaces ('' for an empty transcript).
    """

    # join with a space so the last word of one segment does not fuse with
    # the first word of the next one
    return ' '.join(segment['text'] for segment in transcript)

#### Extract video IDs (+ datetime, title)

In [7]:
# URL of the search endpoint that the GET requests are sent to
url = 'https://www.googleapis.com/youtube/v3/search'

# ID of the channel whose videos are listed
channel_id = 'UCa9gErQ9AE5jT2DZLjXBIdA'

# accumulator for the video records, and the page token (none yet)
video_record_list, page_token = [], None

In [None]:
# extract video data across multiple search result pages

# page_token == 0 is the "no more pages" sentinel that ends the loop
while page_token != 0:
    # define parameters for API call
    params = {
        'key': my_key,
        'channelId': channel_id,
        'part': ["snippet", "id"],
        'order': "date",
        'maxResults': 50,
        'pageToken': page_token,
    }

    # make get request
    response = requests.get(url, params=params)

    # surface HTTP errors here instead of as a KeyError on 'items' later on
    response.raise_for_status()

    # append video data from page results to list
    video_record_list += getVideoRecords(response)

    # get next page token; when the response has none, set the sentinel
    # so the while loop stops
    page_token = json.loads(response.text).get('nextPageToken', 0)
        

#### Code

In [None]:
# build a polars dataframe from the collected video records and preview it
df = pl.DataFrame(data=video_record_list)
print(df.head(5))

In [None]:
# initialize list to store video captions
transcript_text_list = []

# loop through each video ID in the dataframe
for video_id in df['video_id']:

    # try to extract captions
    try:
        # get transcript
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        # extract text transcript
        transcript_text = extract_text(transcript)
    # best effort: any failure to get captions is recorded as n/a.
    # Exception (not a bare except) so that KeyboardInterrupt / SystemExit
    # can still stop this loop.
    except Exception:
        transcript_text = "n/a"

    # append transcript text to list (one entry per dataframe row, in order)
    transcript_text_list.append(transcript_text)

In [None]:
# attach the collected transcript texts as a "transcript" column and preview
df = df.with_columns([pl.Series("transcript", transcript_text_list)])
print(df.head(5))

## TRANSFORM

#### Check for duplicates

In [None]:
# report the dataframe shape, the number of unique rows,
# and the number of unique values in each column
print("shape:", df.shape)
print("n unique rows:", df.n_unique())
for column_name in df.columns:
    print(f"n unique elements ({column_name}):", df[column_name].n_unique())

In [None]:
# cast the 'datetime' column to the polars Datetime dtype and preview
df = df.with_columns([pl.col('datetime').cast(pl.Datetime)])
print(df.head(5))

#### Handling special characters

In [None]:
# list all special strings and their replacements
special_strings = ['&#39;', '&amp;', 'sha ']
special_strings_replacements = ["'", "&", "Shaw "]

# replace every occurrence of each special string in the title and transcript
# columns. str.replace would only touch the first match in each string, so
# replace_all is used; literal=True matches the strings as plain text.
# NOTE(review): 'sha ' also matches at the end of longer words — confirm that
# replacing those occurrences is acceptable.
for special_string, replacement in zip(special_strings,
                                       special_strings_replacements):
    df = df.with_columns([
        pl.col('title').str.replace_all(special_string, replacement,
                                        literal=True),
        pl.col('transcript').str.replace_all(special_string, replacement,
                                             literal=True),
    ])

## LOAD

In [None]:
# write data to file (parquet and csv)
# NOTE(review): both writes build a fresh dataframe from the raw
# video_record_list, so the transcript column and the transformed values
# held in df are not what gets saved here — confirm that saving only the
# video records is intended.
pl.DataFrame(video_record_list).write_parquet('video-ids.parquet')
pl.DataFrame(video_record_list).write_csv('video-ids.csv')