## YouTube Data API

In [None]:
import os

import google_auth_oauthlib.flow
import googleapiclient.discovery
import googleapiclient.errors
def get_youtube_object(client_secret_file):
    """Build a YouTube Data API v3 client from a service-account key file.

    Args:
        client_secret_file: Path to the service-account JSON key file.

    Returns:
        The API client returned by ``googleapiclient.discovery.build`` for
        the ``youtube`` service, version ``v3``.
    """
    # Imported locally because the notebook's import cell does not bring
    # ``service_account`` into scope.
    from google.oauth2 import service_account

    # NOTE(review): this flag is read by oauthlib-based flows; it is probably
    # not needed for service-account credentials. Kept so the environment
    # side effect callers may rely on is unchanged -- confirm before removing.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    api_service_name = "youtube"
    api_version = "v3"
    scopes = [
        "https://www.googleapis.com/auth/youtube.readonly",
        "https://www.googleapis.com/auth/youtubepartner-channel-audit",
    ]

    # The original body read an undefined name (``client_secrets_file``);
    # use the actual parameter.
    credentials = service_account.Credentials.from_service_account_file(
        client_secret_file, scopes=scopes
    )
    return googleapiclient.discovery.build(
        api_service_name, api_version, credentials=credentials
    )

In [None]:
def collect_videos_matching_query(youtubeObject, order, categoryID="20", startDate=None, endDate=None,
                                  total_videos_to_retrieve=500):
    """Collect short-duration video search results, following pagination.

    Args:
        youtubeObject: API client exposing ``search().list(...).execute()``.
        order: Value forwarded as the ``order`` search parameter.
        categoryID: Value forwarded as ``videoCategoryId``.
        startDate: Value forwarded as ``publishedAfter``. Defaults to ``None``
            (the original signature placed this required parameter after a
            defaulted one, which is a syntax error).
            NOTE(review): presumably an RFC 3339 timestamp string -- confirm.
        endDate: Value forwarded as ``publishedBefore``; same remarks apply.
        total_videos_to_retrieve: Upper bound on the number of items returned.

    Returns:
        A list of at most ``total_videos_to_retrieve`` entries taken from the
        ``items`` field of each response page.
    """
    page_limit = 50  # page size the original code requested per call
    videos = []
    next_page_token = None

    while len(videos) < total_videos_to_retrieve:
        # Ask only for as many results as are still needed on the last page.
        page_size = min(page_limit, total_videos_to_retrieve - len(videos))
        response = youtubeObject.search().list(
            order=order,
            publishedAfter=startDate,
            publishedBefore=endDate,
            part="snippet",
            maxResults=page_size,
            videoCategoryId=categoryID,
            type="video",
            videoDuration="short",
            pageToken=next_page_token,
        ).execute()
        videos += response.get('items', [])

        # No token means there are no further pages to fetch.
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break

    return videos[:total_videos_to_retrieve]

In [None]:
def collect_all_videoIDs_from_channel(youtubeObject, channelID):
    """Return every playlist item from a channel's uploads playlist.

    Despite the name, the returned list holds the full ``items`` entries of
    each ``playlistItems().list`` response page, not bare video IDs.

    Args:
        youtubeObject: API client exposing ``channels()`` and
            ``playlistItems()``.
        channelID: Channel identifier forwarded as ``id``.

    Returns:
        A list with the ``items`` of all pages, in the order received.
    """
    # Step 1: look up the uploads playlist of the channel.
    channel_reply = youtubeObject.channels().list(
        part='contentDetails',
        id=channelID
    ).execute()
    first_channel = channel_reply['items'][0]
    uploads_id = first_channel['contentDetails']['relatedPlaylists']['uploads']

    # Step 2: walk the playlist page by page until no token is returned.
    collected = []
    page_token = None
    more_pages = True
    while more_pages:
        page = youtubeObject.playlistItems().list(
            part='snippet',
            playlistId=uploads_id,
            maxResults=50,
            pageToken=page_token
        ).execute()
        collected.extend(page['items'])
        page_token = page.get('nextPageToken')
        more_pages = bool(page_token)

    return collected

In [None]:
def collect_video_details(youtubeObject, videoID):
    """Fetch the raw ``videos().list`` response for one video.

    Args:
        youtubeObject: API client exposing ``videos().list(...).execute()``.
        videoID: Video identifier forwarded as ``id``.

    Returns:
        The unmodified response returned by ``execute()``.
    """
    wanted_parts = "contentDetails,id,topicDetails,snippet,liveStreamingDetails,localizations,player,recordingDetails,statistics,status"
    videos_resource = youtubeObject.videos()
    return videos_resource.list(part=wanted_parts, id=videoID).execute()

In [None]:
def collect_channel_metrics(youtubeObject, channelID):
    """Fetch the first channel entry returned for ``channelID``.

    Args:
        youtubeObject: API client exposing ``channels().list(...).execute()``.
        channelID: Channel identifier forwarded as ``id``.

    Returns:
        The first element of the response's ``items`` list.
    """
    wanted_parts = "snippet,statistics,topicDetails,status,brandingSettings,auditDetails,contentOwnerDetails,localizations"
    reply = youtubeObject.channels().list(part=wanted_parts, id=channelID).execute()
    channel_entries = reply['items']
    return channel_entries[0]

## Scraping YouTube

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import time

def get_heat_map(videoID):
    """Load a video's watch page in headless Chrome and return its heat-map element.

    Args:
        videoID: Video identifier inserted into the watch URL.

    Returns:
        The ``outerHTML`` of the first element with class
        ``ytp-heat-map-path``. If locating or reading the element raises, the
        string ``"Error finding element:"`` followed by the exception text is
        returned instead, so callers must check for that prefix.
    """
    # Run Chrome without a visible window.
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--headless")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    # The original placed ``driver.quit()`` after two ``return`` statements,
    # so it never ran and every call leaked a browser. ``finally`` ensures
    # the browser is closed on every exit path.
    try:
        driver.get(f"https://www.youtube.com/watch?v={videoID}")

        # Fixed pause to let the page load and its JavaScript run.
        time.sleep(3)  # Adjust the sleep time as necessary

        try:
            element = driver.find_element(By.CLASS_NAME, "ytp-heat-map-path")
            return element.get_attribute('outerHTML')
        except Exception as e:
            return "Error finding element:"+ str(e)
    finally:
        driver.quit()


In [None]:
from svgpathtools import parse_path
import svgpathtools

def process_heatmap(path_data, video_duration):
    """Sample an SVG heat-map path into (time, retention) pairs.

    Args:
        path_data: String containing the path; the first 35 and last 22
            characters are discarded before parsing.
            NOTE(review): presumably the ``outerHTML`` of the heat-map
            ``<path>`` element, with the slice isolating its ``d``
            attribute -- confirm against the caller.
        video_duration: Duration used to scale the x coordinate.

    Returns:
        A list of ``(time_in_video, attention_retention)`` tuples, five per
        cubic Bezier segment, in path order. Other segment types are skipped.
    """
    sample_positions = (0, 0.25, 0.5, 0.75, 1)
    curve = parse_path(path_data[35:-22])

    samples = []
    for piece in curve:
        if not isinstance(piece, (svgpathtools.path.CubicBezier)):
            continue
        for t in sample_positions:
            coord = piece.point(t)
            # x is scaled as if it spans 0..1000; y is flipped as if it spans
            # 0 (top) to 100 (bottom), per the original assumptions.
            moment = coord.real / 1000 * video_duration
            retention = 100 - coord.imag
            samples.append((moment, retention))
    return samples

In [None]:
import requests

def is_youtube_short(videoID):
    """Probe the ``/shorts/`` URL of a video with a HEAD request.

    Args:
        videoID: Video identifier inserted into the shorts URL.

    Returns:
        ``True`` when the response status is 200, ``False`` when it is 303,
        and ``None`` for any other status (which is also printed).
    """
    reply = requests.head(f"https://www.youtube.com/shorts/{videoID}", allow_redirects=False)

    # 200 without redirection is treated as a Short; 303 as a regular video.
    verdict_by_status = {200: True, 303: False}
    status = reply.status_code
    if status in verdict_by_status:
        return verdict_by_status[status]

    # Any other status is inconclusive.
    print(status, reply)
    return None

In [None]:
import requests

def get_youtube_html(videoID, is_short = True):
    """Download the HTML of a video's shorts page or watch page.

    Args:
        videoID: Video identifier inserted into the URL.
        is_short: When true, fetch the ``/shorts/`` URL; otherwise fetch the
            ``/watch?v=`` URL and print a notice.

    Returns:
        The response body when the status is 200, otherwise the string
        ``"Failed to retrieve HTML"``.
    """
    if is_short:
        url = f"https://www.youtube.com/shorts/{videoID}"
    else:
        # The original printed an undefined name (``video_id``), which raised
        # NameError whenever this branch ran.
        print('video, not short:', videoID)
        url = f'https://www.youtube.com/watch?v={videoID}'

    response = requests.get(url)

    if response.status_code == 200:
        return response.text
    return "Failed to retrieve HTML"

In [None]:
from bs4 import BeautifulSoup
import json, requests
def is_youtube_short_and_get_linked_video(videoID):
    """Check whether a video is a Short and, if so, look up its linked video.

    Args:
        videoID: Video identifier inserted into the shorts URL.

    Returns:
        A 2-tuple:
        - ``(True, (videoID, linked_videoID))`` when the status is 200 and a
          linked video ID is found in the page's ``ytInitialData``;
        - ``(True, None)`` when the status is 200 but no linked video is found;
        - ``(False, None)`` when the status is 303;
        - ``(None, None)`` for any other status (which is also printed).
    """
    shorts_url = f"https://www.youtube.com/shorts/{videoID}"
    response = requests.get(shorts_url, allow_redirects=False)

    if response.status_code == 303:
        # A redirect is treated as "not a Short".
        return (False, None)
    if response.status_code != 200:
        # Other status codes are inconclusive.
        print(response.status_code, response)
        return (None, None)

    soup = BeautifulSoup(response.text, 'lxml')

    for node in soup.descendants:
        if 'ytInitialData' not in node:
            continue

        # Drop the first 20 characters and the last one before decoding.
        # NOTE(review): assumes the text looks like
        # ``var ytInitialData = {...};`` -- confirm against a live page.
        try:
            initial_data = json.loads(node[20:-1])
        except (KeyError, TypeError, ValueError):
            # This node mentions ytInitialData but is not the JSON payload;
            # keep looking instead of crashing as the original did.
            continue

        try:
            linked_videoID = initial_data['overlay']['reelPlayerOverlayRenderer']['multiFormatLink']['reelMultiFormatLinkViewModel']['command']['innertubeCommand']['watchEndpoint']['videoId']
        except (KeyError, TypeError):
            # The payload has no linked-video entry at the expected path.
            return (True, None)
        return (True, (videoID, linked_videoID))

    return (True, None)

### YouTube Transcript API

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import NoTranscriptFound

def get_subtitles(videoID):
    """Fetch a video's transcript, falling back to an empty dict on failure.

    Args:
        videoID: Video identifier passed to ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        Whatever ``get_transcript`` returns, or ``{}`` when it raises. Errors
        other than ``NoTranscriptFound`` are printed before returning ``{}``.
    """
    transcript = {}
    try:
        transcript = YouTubeTranscriptApi.get_transcript(videoID)
    except NoTranscriptFound:
        # A missing transcript is an expected outcome; stay silent.
        pass
    except Exception as e:
        print(f"An error occurred: {e}")
    return transcript
