
# YouTube - Scraper



In [1]:
import asyncio
import os
import time

from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from pyppeteer import launch

# for extended documentation visit --> https://miyakogi.github.io/pyppeteer/
# !!! scrape() is a coroutine: it only runs when awaited !!!
async def scrape(url_: str, selector_: str, page_function_ = "(element) => element.outerHTML",
                 bypass_google_anti_scrape_algorithm_ = False, log_ = True):
    """Open ``url_`` in a headless browser and evaluate a JS function on one element.

    A fresh browser with an incognito context and a random user agent (from
    ``fake_useragent``) is created for every call and closed again before
    returning, even when a step in between fails.

    Args:
        url_: Address to navigate to.
        selector_: CSS selector of the element to wait for, click and evaluate.
        page_function_: JavaScript function source that receives the matched
            element; its return value is the result of this coroutine.
        bypass_google_anti_scrape_algorithm_: When true, first wait for and
            click ``h1.title``, pause 5 seconds, press the ``End`` key, and
            pause 3 more seconds between the two clicks on ``selector_``.
        log_: When true, print progress messages.

    Returns:
        Whatever ``page.querySelectorEval(selector_, page_function_)`` yields.

    Raises:
        Any exception raised by pyppeteer (e.g. a selector wait timing out) is
        propagated after the browser has been closed.
    """
    if log_ : print("-------------------------Scrape Log Begin--------------------------", "\n")
    # create random user agent so YouTube's algorithm gets bypassed
    ua = UserAgent()
    agent = ua.random

    browser = await launch()
    try:
        # create incognito context and page
        context = await browser.createIncognitoBrowserContext()
        page = await context.newPage()
        if log_ : print("Browser, Incognito Context and Page created")

        # set user agent
        await page.setUserAgent(agent)
        if log_ : print("User Agent:", agent)

        # open url
        await page.goto(url_)
        if log_ : print("Url opened:", url_)

        if bypass_google_anti_scrape_algorithm_:
            await page.waitForSelector("h1.title")
            await page.click("h1.title")
            # asyncio.sleep (not time.sleep) so the event loop keeps running
            await asyncio.sleep(5)
            await page.keyboard.press("End")

        # wait until page gets loaded
        await page.waitForSelector(selector_)
        if log_ : print("Selector loaded:", selector_)

        await page.click(selector_)

        if bypass_google_anti_scrape_algorithm_:
            await asyncio.sleep(3)

        await page.click(selector_)

        # get element from query selector and relating function
        request_result = await page.querySelectorEval(selector_, page_function_)
        if log_ : print("Request finished")
    finally:
        # always release the browser process, also on timeouts/errors
        await browser.close()
        if log_ : print("Browser closed", "\n")

    if log_ : print("-------------------------Scrape Log End----------------------------", "\n")

    return request_result

## Let's test the "scrape" method: ##

In [2]:
# Demo inputs: read the title of a YouTube video.
# `function` receives the <h1 class="title"> element and returns the inner
# HTML of its first child.
url, query_selector, function = (
    "https://www.youtube.com/watch?v=dyN_WtjdfpA&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo",
    "h1.title",
    "(element) => element.firstChild.innerHTML",
)

#title = await scrape(url, query_selector, function)                    
#print(title)

## Let's play around with BeautifulSoup for html parsing: ##

In [3]:
# Demo inputs: fetch the whole comment section (comments plus authors) as HTML.
# `function` returns the outer HTML of the <ytd-comments> element.
url, query_selector, function = (
    "https://www.youtube.com/watch?v=dyN_WtjdfpA&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo",
    "ytd-comments",
    "(element) => element.outerHTML",
)

#html = await scrape(url, query_selector, function, True)

In [4]:
# parse html and assign them

def _parse_comments_with_corresponding_authors(html_, log_ = True):
    """Extract (author, comment, likes) triples from comment-section HTML.

    Args:
        html_: HTML markup to parse with BeautifulSoup's ``html.parser``.
        log_: When true, print the extracted comments and a completion
            message; when false the function prints nothing.

    Returns:
        A list of ``(author, comment, likes)`` string tuples. The three
        columns are selected independently and combined with ``zip``, so the
        list is as long as the shortest of the three selections.
    """
    soup = BeautifulSoup(html_, features="html.parser")

    # get authors of comments and clear html data
    authors = [item.text.strip() for item in soup.select("a[id=author-text] > span")]

    # get comments and clear html data: flatten line breaks and swap double
    # quotes for single quotes
    comments = [
        item.text.strip().replace("\r\n", " ").replace("\n", " ").replace("\"", "'")
        for item in soup.select("yt-formatted-string[id=content-text]")
    ]
    # Previously printed unconditionally; now honours the log_ switch.
    if log_:
        print(comments)

    # like counters, taken from elements such as:
    # <span id="vote-count-middle" class="style-scope ytd-comment-action-buttons-renderer" aria-label="2&nbsp;&quot;Mag ich&quot;-Bewertungen">
    likes = [
        item.text.strip().replace("\r\n", " ").replace("\n", " ")
        for item in soup.select("span[id=vote-count-middle]")
    ]

    comments_with_authors_and_likes = list(zip(authors, comments, likes))

    if log_:
        print("Finished parsing")

    return comments_with_authors_and_likes

# Let's try it:
#_parse_comments_with_corresponding_authors(html, False)

## Let's build parser methods: ##

In [5]:
# scrape and parse video metadata

# returns metadata as dict
# function is asynchronous and therefore it has to be awaited
async def _scrape_and_parse_video_meta_data(url: str, log_ = True):
    """Scrape a YouTube watch page and parse its primary video metadata.

    The ``div#info-contents`` element is fetched with ``scrape``. Two attempts
    are made: the first with the anti-scrape workaround enabled, the second
    without it.

    Args:
        url: Video address; must contain ``"youtube.com"``.
        log_: Forwarded to ``scrape`` to switch its progress output on or off.

    Returns:
        A dict with the keys ``title``, ``date``, ``hashtags`` (list of str),
        ``likes`` and ``dislikes`` (all other values are the raw element
        texts), or ``None`` when ``url`` is not a YouTube address.

    Raises:
        Exception: When both scraping attempts fail; the last underlying error
            is attached as ``__cause__``.
    """
    if "youtube.com" not in url:
        print("Wrong url format given!")
        return None

    html = None
    last_error = None
    # (trial number, use anti-scrape workaround)
    for trial, use_bypass in ((1, True), (2, False)):
        try:
            html = await scrape(
                url,
                "div#info-contents",
                "(element) => element.outerHTML",
                bypass_google_anti_scrape_algorithm_ = use_bypass,
                log_ = log_
            )
            break
        except Exception as e:
            last_error = e
            print('WARNING! : Metadata Scraping trial',trial,'failed for url="',url,'"!')
            print('ERROR:\n',e)
    else:
        # loop ended without a successful scrape
        raise Exception("Meta-Data scraping trials all failed!! :(") from last_error

    soup = BeautifulSoup(html, features="html.parser")

    title = soup.find("h1", {"class": "title"}).find("yt-formatted-string").text
    primary_info = soup.find_all("yt-formatted-string", {"class": "ytd-video-primary-info-renderer"})

    # the last primary-info string is used as the date, the links inside the
    # first one as the hashtags
    date = primary_info[-1].text
    hashtags = [tag.text.strip() for tag in primary_info[0].find_all("a")]

    # first two "#text" strings are read as like and dislike counters
    counters = soup.select("yt-formatted-string[id=text]")
    likes = counters[0].text
    dislikes = counters[1].text

    return {"title": title, "date": date, "hashtags": hashtags, "likes": likes, "dislikes": dislikes}

In [6]:
# scrape and parse comments with authors

# returns list of tuples [(Author, Comment, Likes), (...), ...]
# function is asynchronous and therefore it has to be awaited
async def _scrape_and_parse_youtube_comments(url: str, log_ = True):
    """Scrape the comment section of a YouTube video and parse it.

    The ``ytd-comments`` element is fetched with ``scrape`` (anti-scrape
    workaround enabled) and handed to
    ``_parse_comments_with_corresponding_authors``. Scraping and parsing
    together are attempted at most twice.

    Args:
        url: Video address; must contain ``"youtube.com"``.
        log_: Forwarded to the scraper and the parser.

    Returns:
        The parser's result, or ``None`` (after printing a notice) when
        ``url`` is not a YouTube address.

    Raises:
        Exception: When both attempts fail.
    """
    # Guard clause: reject anything that is not a YouTube address.
    if "youtube.com" not in url:
        print("Wrong url format given!")
        return None

    for attempt in (1, 2):
        try:
            comments_html = await scrape(url, "ytd-comments", "(element) => element.outerHTML", bypass_google_anti_scrape_algorithm_ = True, log_ = log_)
            return _parse_comments_with_corresponding_authors(comments_html, log_ = log_)
        except Exception as error:
            print('WARNING! : Comment scraping trial',attempt,'failed for url="',url,'"!')
            print('ERROR:\n',error)

    # Reaching this line means neither attempt returned a result.
    raise Exception("Comment scraping trials all failed!! :(")
            

# Let's test it:      
#await _scrape_and_parse_youtube_comments("https://www.youtube.com/watch?v=dyN_WtjdfpA&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo", log_ = False)

In [7]:
from pypher import Pypher
# Scraping transaction : 

def _scrape_and_store_video_metadata(tx, httpUrl_, metadata_): # "tx" is a neo4j transaction...
    """Create a ``Video`` node plus one ``Tag`` node per hashtag.

    All scraped values, including the hashtags, are sent as query parameters;
    none of them is spliced into the Cypher text, so quotes or Cypher
    fragments inside a hashtag cannot alter the statement.

    Args:
        tx: Object offering ``run(query, **parameters)`` (a neo4j transaction).
        httpUrl_: Address of the video, stored as ``url`` on the node.
        metadata_: Dict with the keys ``title``, ``date``, ``likes``,
            ``dislikes`` and ``hashtags`` (iterable of tag names).

    Returns:
        Whatever ``tx.run`` returns for the statement.
    """
    result = tx.run(
        "CREATE (v:Video) "
        "SET v = {title: $title, date: $date, likes: $likes, dislikes:$dislikes, url: $url}\n"
        # one MERGE pair per hashtag; an empty list simply does nothing
        "FOREACH (tag_name IN $hashtags |\n"
        "    MERGE (t:Tag {name: tag_name})\n"
        "    MERGE (t)-[:REFERENCES]->(v)\n"
        ")\n"
        "RETURN v.title + ', from node ' + id(v)",
        title=metadata_["title"],
        date=metadata_["date"],
        likes=metadata_["likes"],
        dislikes=metadata_["dislikes"],
        url=httpUrl_,
        hashtags=[str(tag) for tag in metadata_["hashtags"]]
    )
    print('Video Metadata"', metadata_,'" sent to database...')
    return result

def _scrape_and_store_video_comments(tx, httpUrl_, comments_with_authors_):
    """Store comments as ``COMMENTED`` relationships from authors to a video.

    For every ``(author, comment, likes)`` triple an ``Author`` node is
    merged (so an author who comments several times is stored once) and a
    ``COMMENTED`` relationship to the video with ``url == httpUrl_`` is
    created. All scraped values are passed as query parameters instead of
    being formatted into the Cypher text.

    Args:
        tx: Object offering ``run(query, **parameters)`` (a neo4j transaction).
        httpUrl_: Address identifying the target ``Video`` node.
        comments_with_authors_: Iterable of ``(author, comment, likes)``.

    Returns:
        A ``zip`` pairing the author-statement result with the
        comment-statement result of every triple.
    """
    author_result = []
    comment_result = []
    for author, comment, likes in comments_with_authors_:
        # MERGE instead of CREATE: duplicate Author nodes would make the MATCH
        # below attach the comment to every copy of the author.
        author_result.append(tx.run(
            "MERGE (a:Author {name: $name}) "
            "RETURN a.name + ', created as Author with id ' + id(a)", name=author))

        # Plain digit strings are stored as numbers (as the former unquoted
        # interpolation did); anything else, e.g. "" or "1.2K", stays text.
        likes_text = str(likes).strip()
        likes_value = int(likes_text) if likes_text.isdecimal() else likes_text

        comment_result.append(tx.run(
            """
                MATCH (v:Video), (a:Author)
                WHERE v.url = $url AND a.name = $name
                CREATE (a) - [r:COMMENTED { text: $text, likes: $likes }] -> (v)
                RETURN v.title, type(r), r.text, a.name
            """, url=httpUrl_, name=author, text=comment, likes=likes_value))
    print('Comments send to database...')
    return zip(author_result, comment_result)

## Using Neo4j for data storage : ##

In [8]:
from neo4j import GraphDatabase

# Connection settings for the Neo4j server. Values are taken from the
# environment (NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD) so real credentials
# need not be committed with the notebook; the fallbacks are the values that
# used to be hard-coded here.
uri = os.environ.get('NEO4J_URI', 'bolt://localhost:7687')
user = os.environ.get('NEO4J_USER', 'neo4j')
password = os.environ.get('NEO4J_PASSWORD', 'neo4j_')


In [9]:
httpUrls = [
    #"https://www.youtube.com/watch?v=dyN_WtjdfpA&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo",
    #"https://www.youtube.com/watch?v=Ul0ZgDoamco&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo",
    "https://www.youtube.com/watch?v=lcgqP8g6i84&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo"
]

driver = GraphDatabase.driver(uri, auth=(user, password))

# resetting database

with driver.session() as session:
    def _q(query) : return session.run(query)
    #---------------------------------------

    _q("MATCH (n) DETACH DELETE n") # remove all graphs and nodes! BE CAREFUL!

    #---------------------------------------
driver.close()

with driver.session() as session:
    for url in httpUrls :
        # run await outside of transaction because asynchronous transactions for Neo4j are not yet available for Python
        print("\n======================================================================================")
        print("| SCRAPING VIDEO : "+url)
        print("======================================================================================")
        metadata = await _scrape_and_parse_video_meta_data(url, log_ = True)
        result = session.write_transaction(_scrape_and_store_video_metadata, url, metadata)
        #print(result)

        comments_with_authors = await _scrape_and_parse_youtube_comments(url, log_ = True)
        if len(comments_with_authors) == 0 :
            print("Video without comments found! This might be wrong!")
            print("Let's try again...")
            comments_with_authors = await _scrape_and_parse_youtube_comments(url, log_ = True)
            
        result = session.write_transaction(_scrape_and_store_video_comments, url, comments_with_authors)
        #print(result)
 

driver.close()


| SCRAPING VIDEO : https://www.youtube.com/watch?v=lcgqP8g6i84&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo
-------------------------Scrape Log Begin-------------------------- 

Browser, Incognito Context and Page created
User Agent: Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2049.0 Safari/537.36
Url opened: https://www.youtube.com/watch?v=lcgqP8g6i84&list=PLhTjy8cBISEoOtB5_nwykvB9wfEDscuEo
Selector loaded: div#info-contents
Request finished
Browser closed 

-------------------------Scrape Log End---------------------------- 

Video Metadata" {'title': 'Sentiment Analysis Python - 3 -  Cleaning Text for Natural Language Processing (NLP)', 'date': '04.03.2020', 'hashtags': ['#python', '#nltk', '#nlp'], 'likes': '119', 'dislikes': '2'} " sent to database...
-------------------------Scrape Log Begin-------------------------- 

Browser, Incognito Context and Page created
User Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHT