# Scraping Strategy
1. Provide a list of general search terms
2. Start n threads, evenly split between the search terms, and query the search terms
3. Loop until a video is chosen with a 50% chance of choosing the video or going to the next page of results

Loop

4. Log the data, but also the name of the channel and the link to the channel
5. With 50% chance, choose either a random suggested video or a random video from the same channel (edge case: if the channel has no more videos, choose a random suggested video)

In [58]:
import os
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
import numpy as np
import time
import string
import threading
from threading import Lock
from collections import namedtuple
import pandas as pd

In [59]:
# Search results URL; the `{}` placeholder is filled with the search term.
YT_SEARCH_URL_TEMPLATE = 'https://www.youtube.com/results?search_query={}'
# Pause applied by action_wait() after browser actions.
ACTION_DELAY_SECONDS = 0.5
# Pause between attempts in run_with_retry().
RETRY_DELAY_SECONDS = 3.0
# Timeout handed to WebDriverWait when waiting for page elements.
LOAD_TIMEOUT_SECONDS = 15.0

# XPath expressions used to locate elements on YouTube pages, keyed by what
# they select. The scraped field names in scrape_vid_data() reuse these keys.
XPATH_PATTERNS = {
  'search_thumbnail': '//a[@id="thumbnail"]',
  'suggested_thumbnail': '//div[@id="related"][contains(@class, "ytd-watch-flexy")]/*/*/*/*/*/a[@id="thumbnail"]',
  'view_count': '//span[contains(@class, "view-count")]', #'//div[@id="count"]/ytd-video-view-count-renderer/span[1]',
  'date': '(//div[@id="date"]|//div[@id="info-strings"])/yt-formatted-string',
  'video_title': '//h1[contains(@class, "title")]/yt-formatted-string',
  'video_description': '//div[@id="description"]/yt-formatted-string',
  # Selects the <a> element; both the channel name (text) and link (href) are read from it.
  'channel_name_link': '//ytd-channel-name[@id="channel-name"]/div/div/yt-formatted-string/a',
  'subscriber_count': '//yt-formatted-string[@id="owner-sub-count"]',
  # Like/dislike counts are read from the aria-label attribute of these elements.
  'likes': '//yt-formatted-string[@id="text"][contains(@aria-label, " likes")][1]',
  'dislikes': '//yt-formatted-string[@id="text"][contains(@aria-label, " dislikes")][1]'
}

In [60]:
# Append the local chromedriver location to the executable search path.
# 'PATH' with os.pathsep works on every OS (Windows environment keys are
# case-insensitive, so this matches the former 'Path' lookup there), and a
# missing or empty PATH no longer raises KeyError or leaves a stray separator.
os.environ['PATH'] = os.pathsep.join(
  entry
  for entry in (os.environ.get('PATH', ''), os.path.join('.', 'chromedriver'))
  if entry)

In [61]:
def action_wait():
  """Sleep for ACTION_DELAY_SECONDS; called after browser actions such as page loads and scrolls."""
  pause_seconds = ACTION_DELAY_SECONDS
  time.sleep(pause_seconds)

In [62]:
# driver = webdriver.Chrome()
# driver.get("http://www.python.org")
# assert "Youtube" in driver.title
# elem = driver.find_element_by_name("q")
# elem.clear()
# elem.send_keys("pycon")
# elem.send_keys(Keys.RETURN)
# assert "No results found." not in driver.page_source
# # driver.close()

In [63]:
def load_search_terms(file_path):
  """Read search terms from a text file, one term per line.

  Each line is stripped of surrounding whitespace (including the newline).
  Blank lines are kept and come back as empty strings.
  """
  with open(file_path, 'r') as handle:
    return [raw_line.strip() for raw_line in handle]

def yt_label_to_num(label):
  """Converts YT formatted numbers with added text into integers.

  Parsing starts at the first character and stops at the first character that
  is not part of the number, so '16,845 likes' -> 16845, '13.5M subscribers'
  -> 13500000 and 'No views' -> 0. Commas are ignored; a 'k', 'm' or 'b'
  directly after the digits scales the value by 1e3, 1e6 or 1e9.

  Args:
    label: Text such as '19K views'. None or an empty string yields 0.

  Returns:
    The parsed value as an int (any remaining fraction is truncated).
  """
  # Local import keeps this function self-contained; decimal is stdlib.
  from decimal import Decimal

  if not label:
    return 0

  multipliers = {'k': 10 ** 3, 'm': 10 ** 6, 'b': 10 ** 9}
  multiplier = 1
  num_chars = ['0']  # Leading zero keeps labels without digits parseable.
  seen_point = False

  for c in label.lower():
    if c in string.digits:
      num_chars.append(c)
    elif c == '.':
      if seen_point:
        # A second decimal point cannot belong to the number; stop here
        # rather than failing to parse something like '1.2.3'.
        break
      seen_point = True
      num_chars.append(c)
    elif c == ',':
      continue
    elif c in multipliers:
      multiplier = multipliers[c]
      break
    else:
      break

  # Decimal arithmetic is exact for these inputs. With binary floats the
  # product can land just below the intended integer and be truncated down
  # (e.g. '2.01K' came out as 2009).
  return int(Decimal(''.join(num_chars)) * multiplier)
  
# Seed search terms for the scrapers, one per line of the text file.
search_terms = load_search_terms('priming_search_terms.txt')

In [64]:
# Exceptions passed to WebDriverWait as `ignored_exceptions` while waiting for elements.
SELENIUM_WAIT_EXCEPTIONS = (NoSuchElementException, StaleElementReferenceException)

def run_with_retry(func, times=3, refresh_driver=None):
  """Call `func` until it succeeds, making at most `times` attempts.

  Args:
    func: Zero-argument callable to run.
    times: Maximum number of attempts. Values below 1 are treated as 1, so
      `func` is always called at least once.
    refresh_driver: Optional Selenium WebDriver whose page is refreshed
      before each retry.

  Returns:
    Whatever `func` returns on its first successful call.

  Raises:
    Exception: The error from the final attempt is re-raised unchanged.
  """
  attempts = max(1, times)
  for attempt in range(1, attempts + 1):
    try:
      return func()
    except Exception:
      # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit
      # stop the scraper instead of being retried.
      if attempt == attempts:
        raise
      print('Function call {} failed, retrying...'.format(attempt))
      if refresh_driver:
        # The Python WebDriver exposes refresh() directly.
        refresh_driver.refresh()
      time.sleep(RETRY_DELAY_SECONDS)
      
class YouTubeScraper():
  """Drives one Chrome browser through YouTube and buffers scraped video data.

  Each instance owns a single Selenium WebDriver. `_scrape_loop` is intended
  to run on its own thread; scraped records accumulate in a lock-protected
  buffer until another thread collects them with `flush_video_data`.
  """

  # JavaScript that scrolls the window to the bottom of the page content.
  _SCROLL_TO_BOTTOM_JS = 'window.scrollTo(0, document.getElementById("content").scrollHeight)'

  def __init__(self, headless=True):
    """Start a Chrome WebDriver, headless unless `headless` is False."""
    chrome_options = Options()
    if headless:
      chrome_options.add_argument('--headless')
    self.driver = webdriver.Chrome(options=chrome_options)

    # Scraped records wait here until flush_video_data() hands them over.
    self._video_data_buffer = []
    self._vdb_lock = Lock()

  def terminate(self):
    """End the WebDriver session."""
    # quit() ends the whole session; close() would only close the current window.
    self.driver.quit()

  def perform_yt_search(self, search_term):
    """Opens up YouTube and performs a search for the specified term.

    Returns:
      True if the loaded page's title mentions YouTube, False otherwise.
    """
    # Local import keeps this class self-contained; urllib is stdlib.
    from urllib.parse import quote_plus

    # Encode the term so spaces, '&', '#', etc. cannot break the query string.
    self.driver.get(YT_SEARCH_URL_TEMPLATE.format(quote_plus(search_term)))
    action_wait()
    return 'youtube' in self.driver.title.lower()

  def _retrieve_video_links(self, pattern_key):
    """Waits for the elements matching XPATH_PATTERNS[pattern_key] and keeps those linking to youtube.com."""
    candidates = WebDriverWait(
      self.driver,
      LOAD_TIMEOUT_SECONDS,
      ignored_exceptions=SELENIUM_WAIT_EXCEPTIONS
      ).until(
        EC.presence_of_all_elements_located((
          By.XPATH,
          XPATH_PATTERNS[pattern_key])))

    valid_videos = []
    for video in candidates:
      try:
        link = video.get_attribute('href')
      except StaleElementReferenceException:
        # The element was replaced after it was located; skip it.
        continue
      if link is not None and 'youtube.com' in link.lower():
        valid_videos.append(video)
    return valid_videos

  def _retrieve_search_videos(self):
    """Returns all video link elements from a YouTube page."""
    return self._retrieve_video_links('search_thumbnail')

  def _retrieve_suggested_videos(self):
    """Returns all video link elements from a YouTube suggested bar."""
    return self._retrieve_video_links('suggested_thumbnail')

  def _scroll_randomly(self, scroll_chance, max_scrolls):
    """Scrolls to the page bottom a random number of times.

    Each round scrolls with probability `scroll_chance`; the first round that
    does not scroll ends the loop.

    Returns:
      The index of the last round entered (0 when `max_scrolls` is 0).
    """
    n_scrolls = 0  # Stays defined even when the loop body never runs.
    for n_scrolls in range(max_scrolls):
      keep_scrolling = np.random.rand() < scroll_chance
      if keep_scrolling:
        self.driver.execute_script(self._SCROLL_TO_BOTTOM_JS)
      action_wait()
      if not keep_scrolling:
        break
    return n_scrolls

  def _pick_bottom_video(self, all_vids, n_scrolls):
    """Picks a random element from the last 1/(n_scrolls + 1) share of `all_vids`."""
    start = int(np.ceil(-len(all_vids) / (n_scrolls + 1)))
    return np.random.choice(all_vids[start:])

  def _find_thumbnail(self, video):
    """Returns the <img> element inside `video`, or None if there is none."""
    try:
      return video.find_element(By.XPATH, './/img')
    except NoSuchElementException:
      return None

  def choose_vid_from_search(self, scroll_chance=0.5, max_scrolls=15):
    """Selects a random YouTube video and clicks on the link. Should only be used on the search page.

    Returns:
      {'thumbnail_link': <img src>} for the clicked video, or None when no
      video (or no thumbnail for the chosen video) was found.
    """
    n_scrolls = self._scroll_randomly(scroll_chance, max_scrolls)

    all_vids = self._retrieve_search_videos()
    if not all_vids:
      return None

    selected_vid = self._pick_bottom_video(all_vids, n_scrolls)
    thumbnail_element = self._find_thumbnail(selected_vid)
    if thumbnail_element is None:
      return None
    thumbnail_link = thumbnail_element.get_property('src')

    selected_vid.click()

    # Return a link to the thumbnail
    return {'thumbnail_link': thumbnail_link}

  def choose_vid_from_suggested(self, scroll_chance=0.5, max_scrolls=5):
    """Selects a random YouTube video and clicks on the link. Should only be used on the suggested bar page.

    Returns:
      {'thumbnail_link': <img src>} for the clicked video, or None when no
      video (or no thumbnail for the chosen video) was found.
    """
    n_scrolls = self._scroll_randomly(scroll_chance, max_scrolls)

    all_vids = self._retrieve_suggested_videos()
    if not all_vids:
      return None

    selected_vid = self._pick_bottom_video(all_vids, n_scrolls)

    # Bring the chosen video into view, then back off 50px before clicking.
    self.driver.execute_script('arguments[0].scrollIntoView(true)', selected_vid)
    self.driver.execute_script('window.scrollBy(0, -50)')
    action_wait()

    thumbnail_element = self._find_thumbnail(selected_vid)
    if thumbnail_element is None:
      return None
    thumbnail_link = thumbnail_element.get_property('src')

    run_with_retry(selected_vid.click)

    # Return a link to the thumbnail
    return {'thumbnail_link': thumbnail_link}

  def scrape_vid_data(self):
    """Scrapes video data from a YT video page.

    Returns:
      A dict with keys view_count, date, video_title, video_description,
      subscriber_count, likes, dislikes, channel_name and channel_link.
      Counts are converted to ints with yt_label_to_num.
    """
    target_items = ('view_count', 'date', 'video_title', 'video_description',
                    'channel_name_link', 'subscriber_count', 'likes', 'dislikes')
    found = {}
    for item in target_items:
      found[item] = WebDriverWait(
        self.driver, LOAD_TIMEOUT_SECONDS,
        ignored_exceptions=SELENIUM_WAIT_EXCEPTIONS).until(
          EC.presence_of_all_elements_located((By.XPATH, XPATH_PATTERNS[item])))

    # The last matching anchor supplies both the channel name and its link.
    channel_anchor = found['channel_name_link'][-1]
    return {
      'view_count': yt_label_to_num(found['view_count'][0].text),
      'date': found['date'][0].text,
      'video_title': found['video_title'][0].text,
      'video_description': found['video_description'][0].text,
      'subscriber_count': yt_label_to_num(found['subscriber_count'][-1].text),
      'likes': yt_label_to_num(found['likes'][0].get_attribute('aria-label')),
      'dislikes': yt_label_to_num(found['dislikes'][0].get_attribute('aria-label')),
      'channel_name': channel_anchor.text,
      'channel_link': channel_anchor.get_property('href'),
    }

  def _add_to_video_data_buffer(self, data):
    """Appends one scraped record to the buffer under the buffer lock."""
    with self._vdb_lock:
      self._video_data_buffer.append(data)

  def flush_video_data(self):
    """Returns all buffered records and leaves the buffer empty."""
    with self._vdb_lock:
      video_data = self._video_data_buffer
      self._video_data_buffer = []
    return video_data

  def _scrape_loop(self, start_term, stop_check):
    """Searches for `start_term`, then follows suggested videos until told to stop.

    Args:
      start_term: Search term used to find the first video.
      stop_check: Zero-argument callable; the loop ends once it returns True.
        It is consulted after every scraped video.
    """
    # Start initial scrape
    self.perform_yt_search(start_term)
    video_data = self.choose_vid_from_search()

    while True:
      if video_data is None:
        # The chooser found nothing usable; end the loop explicitly instead
        # of failing on None.update().
        print('No video could be selected for "{}", ending scrape loop.'.format(start_term))
        return

      video_data.update(self.scrape_vid_data())
      self._add_to_video_data_buffer(video_data)

      # Stop thread when variable set to true
      if stop_check():
        return

      video_data = run_with_retry(self.choose_vid_from_suggested)

In [65]:
# yts = YouTubeScraper()
# d = yts.driver
# yts.perform_yt_search(search_terms[0])
# vide = yts.choose_vid_from_search()

In [66]:
# print(yts.scrape_vid_data())
# vide = yts.choose_vid_from_suggested()
# vide

In [67]:
class YTSManager():
  """Runs YouTubeScraper instances on worker threads and collects their data.

  A background checking thread periodically moves each scraper's buffered
  records into `video_data` and restarts scrapers whose thread has died,
  reusing the same start term.
  """

  def __init__(self):
    self.video_data = []
    # Maps each worker thread to its (start_term, scraper) pair.
    self._threads = {}
    self._stop_scrape_thread = False
    # Re-entrant: the checking thread calls start_scrape_loops() while it
    # already holds this lock.
    self._thread_lock = threading.RLock()
    self._video_flush_interval = 2 # Flush video data every x seconds
    self.checking_thread = None

  def start_scrape_loops(self, start_terms):
    """Starts one scraper thread per start term.

    Args:
      start_terms: A single search term string or an iterable of them. An
        empty string or empty collection is a no-op.
    """
    if hasattr(start_terms, '__len__') and len(start_terms) == 0:
      return

    if isinstance(start_terms, str):
      start_terms = (start_terms,)

    # Hold the lock so the checking thread never iterates `_threads` while
    # it is being modified here.
    with self._thread_lock:
      for start_term in start_terms:
        yts = YouTubeScraper()
        thread = threading.Thread(target=yts._scrape_loop, args=(start_term, self._stop_check))
        self._threads[thread] = (start_term, yts)
        thread.start()

      if not self.is_thread_checking_active():
        self._start_check_thread()

  def _check_threads(self):
    """Check to renew dead threads and flush video data buffers on a regular interval."""
    while True:
      time.sleep(self._video_flush_interval)

      with self._thread_lock:
        # Flush video data on all threads
        for _, yts in self._threads.values():
          self.video_data.extend(yts.flush_video_data())

        # Remove deleted threads, but keep the start words
        start_words_refresh_list = []
        updated_threads = {}
        for thread, (sw, yts) in self._threads.items():
          if thread.is_alive():
            updated_threads[thread] = (sw, yts)
          else:
            yts.terminate()
            start_words_refresh_list.append(sw)
        self._threads = updated_threads

        # Refresh any removed threads
        self.start_scrape_loops(start_words_refresh_list)

        if not self._threads:
          # Nothing left to watch. Clear the handle while still holding the
          # lock so a later start_scrape_loops() starts a new checker
          # instead of relying on this one, which is about to exit.
          self.checking_thread = None
          return

  def is_thread_checking_active(self):
    """Returns True while the checking thread exists and is alive."""
    checker = self.checking_thread
    return checker is not None and checker.is_alive()

  def _start_check_thread(self):
    """Starts the background thread that runs _check_threads."""
    self.checking_thread = threading.Thread(target=self._check_threads)
    self.checking_thread.start()

  def _stop_check(self):
    """Stop predicate handed to every scraper loop."""
    return self._stop_scrape_thread

  def stop_scraping(self):
    """Signals all scrapers to stop, waits for them, and collects their remaining data."""
    with self._thread_lock:
      self._stop_scrape_thread = True
      for thread, (_, yts) in self._threads.items():
        thread.join()
        # Collect whatever was scraped since the last periodic flush before
        # the scraper is discarded.
        self.video_data.extend(yts.flush_video_data())
        yts.terminate()
      self._stop_scrape_thread = False
      self._threads = {}

  def print_status(self):
    """Prints how many videos have been collected and how many threads are tracked."""
    print('# Videos Scraped: {}'.format(len(self.video_data)))
    print('# Threads Running: {}'.format(len(self._threads)))

  def get_dataframe(self):
    """Returns the collected video records as a pandas DataFrame."""
    return pd.DataFrame(self.video_data)

In [68]:
# 'view_count': '//div[@id="count"]/ytd-video-view-count-renderer/span[1]',
#   'date': '//div[@id="date"]/yt-formatted-string',
#   'video_title': '//h1[contains(@class, "title")]/yt-formatted-string',
#   'video_description': '//div[@id="description"]/yt-formatted-string',
#   'channel_name': '//ytd-channel-name/yt-formatted-string[@id="text"]/a',
#   'channel_link': '//ytd-channel-name/yt-formatted-string[@id="text"]/a',
#   'subscriber_count': '//yt-formatted-string[@id="owner-sub-count"]',
#   'likes': '//yt-formatted-string[@id="text"][contains(@aria-label, " likes")][1]',
#   'dislikes': '//yt-formatted-string[@id="text"][contains(@aria-label, " dislikes")][1]'

In [69]:
manager = YTSManager()

In [70]:
manager.start_scrape_loops(search_terms[4])

In [71]:
# Print scraping progress every 5 seconds. The loop has no exit condition;
# interrupt the cell to stop it.
while True:
  manager.print_status()
  time.sleep(5)

# Videos Scraped: 0
# Threads Running: 1
# Videos Scraped: 0
# Threads Running: 1
# Videos Scraped: 3
# Threads Running: 1


KeyboardInterrupt: 

In [72]:
manager.stop_scraping()

In [76]:
manager.get_dataframe()

Unnamed: 0,thumbnail_link,view_count,date,video_title,video_description,subscriber_count,likes,dislikes,channel_name,channel_link
0,,1,"Aug 18, 2021",Bird watching,,2,0,0,Caliplayz,https://www.youtube.com/channel/UCq83vywgpwGaY...
1,https://i.ytimg.com/vi/PFNdIup9kS0/hqdefault.j...,1,"Aug 18, 2021",Bird watching,,2,0,0,Caliplayz,https://www.youtube.com/channel/UCq83vywgpwGaY...
2,,44275420,"Apr 5, 2019",This Ultra Modern Tiny House Will Blow Your Mind,This ultra modern tiny house on wheels is trul...,4110000,817089,19211,Living Big In A Tiny House,https://www.youtube.com/channel/UCoNTMWgGuXtGP...
3,https://i.ytimg.com/vi/TMXWWXB3ddE/hqdefault.j...,736905,"Apr 19, 2021",Modern Tiny Houses with Space Saving Ideas / W...,Modern Tiny Houses with Space Saving Ideas / W...,741000,9790,265,Future Tech,https://www.youtube.com/channel/UCPBIhiri5v4-c...
4,https://i.ytimg.com/vi/cTbqUhL-nK8/hqdefault.j...,585498,"Aug 20, 2020",15 Tiny Houses that will Blow Your Mind,It may be time to join the tiny house revoluti...,1980000,4639,177,Top Fives,https://www.youtube.com/channel/UCbAlVnKhbGLK7...


In [43]:
len(manager.video_data)

41

In [56]:
manager.video_data[9]

{'thumbnail_link': '',
 'view_count': 139028,
 'date': 'Jun 20, 2021',
 'video_title': 'TV for Cats | Backyard Bird and Squirrel Watching | Video 10',
 'video_description': 'Grackles galore!  Please enjoy some backyard birding with your cat.\n10th video for this channel.  Thanks to all that watch, comment, like and subscribe!',
 'subscriber_count': 1060,
 'likes': 389,
 'dislikes': 31,
 'channel_name': 'Blue Wind Creations',
 'channel_link': 'https://www.youtube.com/channel/UCCXKGWNLvhL3YmVA_vUgp_g'}

In [57]:
len(set([x['video_title'] + x['channel_name'] for x in manager.video_data]))

35

## Required

Views: <span class="short-view-count style-scope ytd-video-view-count-renderer">19K views</span>
`//span[contains(@class, "view-count")]` (text)

Date: <yt-formatted-string class="style-scope ytd-video-primary-info-renderer">Jun 16, 2021</yt-formatted-string>
`//div[@id="date"]/yt-formatted-string` (text converted to days passed)

Video Title: <yt-formatted-string force-default-style="" class="style-scope ytd-video-primary-info-renderer"></yt-formatted-string>
`//h1[contains(@class, "title")]/yt-formatted-string` (text from all children)

Description: <yt-formatted-string class="content style-scope ytd-video-secondary-info-renderer" force-default-style="" split-lines=""></yt-formatted-string>
`//div[@id="description"]/yt-formatted-string` (text from all children)

Channel Name: <a class="yt-simple-endpoint style-scope yt-formatted-string" spellcheck="false" href="/channel/UCXuqSBlHAE6Xw-yeJA0Tunw" dir="auto">Linus Tech Tips</a>
`//ytd-channel-name/yt-formatted-string[@id="text"]/a` (text)

Channel Link: <a class="yt-simple-endpoint style-scope yt-formatted-string" spellcheck="false" href="/channel/UCXuqSBlHAE6Xw-yeJA0Tunw" dir="auto">Linus Tech Tips</a>
`//ytd-channel-name/yt-formatted-string[@id="text"]/a` (href)

Subscriber Count: <yt-formatted-string id="owner-sub-count" class="style-scope ytd-video-owner-renderer" aria-label="13.5 million subscribers">13.5M subscribers</yt-formatted-string>
`//yt-formatted-string[@id="owner-sub-count"]` (text converted to int)

## Optional

Likes: <yt-formatted-string id="text" class="style-scope ytd-toggle-button-renderer style-text" aria-label="16,845 likes">16K</yt-formatted-string>
`//yt-formatted-string[@id="text"][contains(@aria-label, " likes")][1]` (text converted to int)

Dislikes: <yt-formatted-string id="text" class="style-scope ytd-toggle-button-renderer style-text" aria-label="273 dislikes">273</yt-formatted-string>
`//yt-formatted-string[@id="text"][contains(@aria-label, " dislikes")][1]` (text converted to int)


convert to int, ("K", "M", ".", ",", "...")