# Medlfuencer Scraping


## Setup


In [None]:
!wget http://archive.ubuntu.com/ubuntu/pool/main/libu/libu2f-host/libu2f-udev_1.1.4-1_all.deb
!dpkg -i libu2f-udev_1.1.4-1_all.deb
!wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
!dpkg -i google-chrome-stable_current_amd64.deb

!wget -N https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/118.0.5993.70/linux64/chromedriver-linux64.zip -P /tmp/
!unzip -o /tmp/chromedriver-linux64.zip -d /tmp/
!unzip -o /tmp/chromedriver-linux64.zip -d /tmp/
!mv /tmp/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver
%pip install selenium chromedriver_autoinstaller

In [2]:
import sys

sys.path.insert(0, "/usr/lib/chromium-browser/chromedriver")
from selenium import webdriver
import chromedriver_autoinstaller

## Scraping


In [41]:
import json
from tqdm.auto import tqdm
import requests
import os
from selenium.webdriver.common.by import By
import json
from selenium.webdriver.common.by import By
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import requests
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import time
from dotenv import load_dotenv
import os
from itertools import islice
from youtube_comment_downloader import YoutubeCommentDownloader, SORT_BY_POPULAR


class bcolors:
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"


class Scraping:
    """Class for managing the YouTube scraping"""

    def __init__(self):
        load_dotenv()

        self.yt_api_key = os.getenv("YT_API_KEY")

        print(
            f"> Using YouTube API key: {bcolors.OKCYAN}{self.yt_api_key[:10]}...{bcolors.ENDC}"
        )

        self.channels = {}
        self.videos = {}
        self.comments = {}
        self.playlists = {}

    def save_json(self, path="scraping"):
        channels_file = open(os.path.join(path, "channels_scraping.json"), "w")
        json.dump(self.channels, channels_file)
        channels_file.close()

        videos_file = open(os.path.join(path, "videos_scraping.json"), "w")
        json.dump(self.videos, videos_file)
        videos_file.close()

        comments_file = open(os.path.join(path, "comments_scraping.json"), "w")
        json.dump(self.comments, comments_file)
        comments_file.close()

        playlists_file = open(os.path.join(path, "playlists_scraping.json"), "w")
        json.dump(self.playlists, playlists_file)
        playlists_file.close()

    def load_json(self, path="scraping"):
        channels_file = open(os.path.join(path, "channels_scraping.json"), "r")
        self.channels = json.load(channels_file)
        channels_file.close()

        videos_file = open(os.path.join(path, "videos_scraping.json"), "r")
        self.videos = json.load(videos_file)
        videos_file.close()

        comments_file = open(os.path.join(path, "comments_scraping.json"), "r")
        self.comments = json.load(comments_file)
        comments_file.close()

        playlists_file = open(os.path.join(path, "playlists_scraping.json"), "r")
        self.playlists = json.load(playlists_file)
        playlists_file.close()

    def _get_videos_without_metadata(self):
        ids = []
        for video_id in self.videos:
            if "title" not in self.videos[video_id]:
                ids.append(video_id)
            if "description" not in self.videos[video_id]:
                ids.append(video_id)
            if "tags" not in self.videos[video_id]:
                ids.append(video_id)
        return ids

    def _get_videos_without_transcription(self):
        ids = []
        for video_id in self.videos:
            if "transcription" not in self.videos[video_id]:
                ids.append(video_id)
        return ids

    def _get_channels_without_playlist(self):
        names = []
        for name in self.channels:
            if name not in self.playlists:
                names.append(name)
        return names

    def _get_videos_without_comments(self):
        videos_with_comments = []
        for comment_id, comment in self.comments.items():
            video_id = comment["video_id"]
            if video_id not in videos_with_comments:
                videos_with_comments.append(video_id)

        videos_without_comments = []
        for video_id in self.videos:
            if video_id not in videos_with_comments:
                if "no_comments" in self.videos[video_id]:
                    continue
                videos_without_comments.append(video_id)
        return videos_without_comments

    def print_status(self):

        missing_transcription = self._get_videos_without_transcription()
        missing_playlists = self._get_channels_without_playlist()
        missing_comments = self._get_videos_without_comments()
        missing_metadata = self._get_videos_without_metadata()

        out = f"""
        Scraping overview
        -----------------

        Current number of channels:\t{len(self.channels)}
        Current number of videos:\t{len(self.videos)}
        Current number of comments:\t{len(self.comments)}

        ----

        Current number of unscraped playlist:\t{len(missing_playlists)}
        Current number of videos with missing transcription:\t{len(missing_transcription)}
        Current number of videos without comments:\t{len(missing_comments)}
        Current number of videos without metadata:\t{len(missing_metadata)}
        """
        print(out)

    def load_channels(
        self,
        path="./medfluencer_channel_names.json",
    ):
        """Load all channel names which should be scraped"""
        with open(path, encoding="utf-8") as f:
            channel_names = json.load(f)
            for channel_name in channel_names:
                if channel_name not in self.channels:
                    self.channels[channel_name] = {
                        "id": None,
                        "playlist_id": None,
                        "description": None,
                    }
        self.save_json()

        self.load_channel_info()

    def load_channel_info(self):
        """Load metadata of channel"""
        print(
            f"> Querying channel ids from YouTube Data API ({bcolors.WARNING}Warning: cost=1/channel{bcolors.ENDC})"
        )
        cost = 0
        for channel_name in tqdm(list(self.channels.keys())):
            if (
                self.channels[channel_name]["id"] is not None
                and self.channels[channel_name]["playlist_id"] is not None
                and self.channels[channel_name]["description"] is not None
            ):
                continue

            cost += 1
            base_url = "https://www.googleapis.com/youtube/v3/channels"
            params = {
                "key": self.yt_api_key,
                "forHandle": channel_name,
                "part": "id, snippet, contentDetails",
            }
            response = requests.get(base_url, params=params, timeout=10)
            response = response.json()

            if "items" not in response:
                continue

            item = response["items"][0]
            channel_id = item["id"]
            description = item["snippet"]["description"]
            playlist_id = item["contentDetails"]["relatedPlaylists"]["uploads"]

            self.channels[channel_name] = {
                "id": channel_id,
                "playlist_id": playlist_id,
                "description": description,
            }

        self.save_json()

    def get_videos_from_upload_playlist(self):
        """
        Retrieve video IDs from upload playlists for all channels.
        """
        print(
            f"> Querying video ids from upload playlists from YouTube Data API ({bcolors.WARNING}Warning: cost=1/channel{bcolors.ENDC})"
        )

        for channel_name, channel in tqdm(self.channels.items()):
            if not channel["playlist_id"]:
                continue

            if channel_name in self.playlists:
                continue

            base_url = "https://www.googleapis.com/youtube/v3/playlistItems"
            params = {
                "key": self.yt_api_key,
                "playlistId": channel["playlist_id"],
                "part": "contentDetails",
                "maxResults": 50,
            }

            next_page_token = None
            while True:
                if next_page_token:
                    params["pageToken"] = next_page_token

                response = requests.get(base_url, params=params, timeout=10)

                response = response.json()

                if "items" not in response:
                    break

                self.playlists[channel_name] = []

                for video in response["items"]:

                    video_id = video["contentDetails"]["videoId"]

                    self.playlists[channel_name].append(video_id)

                    if video_id not in self.videos:
                        self.videos[video_id] = {
                            "channel_name": channel_name,
                            "description": None,
                            "transcription": None,
                        }

                if "nextPageToken" in response:
                    next_page_token = response["nextPageToken"]
                else:
                    break
        self.save_json()

    def get_video_infos(self):
        """
        Retrieve additional metadata of videos
        """
        print(
            f"> Querying video ids from upload playlists from YouTube Data API ({bcolors.WARNING}Warning: cost=1/channel{bcolors.ENDC})"
        )

        video_ids = self._get_videos_without_metadata()
        batches = [video_ids[i : i + 50] for i in range(0, len(video_ids), 50)]

        for batch in tqdm(batches):
            time.sleep(0.1)
            base_url = "https://www.googleapis.com/youtube/v3/videos"
            params = {
                "key": self.yt_api_key,
                "id": ",".join(batch),
                "part": "snippet",
                "maxResults": 50,
            }

            response = requests.get(base_url, params=params, timeout=10)
            response = response.json()

            if "items" not in response:
                break

            for item in response["items"]:

                video_id = item["id"]

                snippet = item["snippet"]

                title = snippet["title"]

                if "description" in snippet:
                    description = snippet["description"]
                else:
                    description = ""

                if "tags" in snippet:
                    tags = snippet["tags"]
                else:
                    tags = []

                self.videos[video_id]["title"] = title
                self.videos[video_id]["description"] = description
                self.videos[video_id]["tags"] = tags

        self.save_json()

    def get_driver(self):

        capabilities = DesiredCapabilities.CHROME
        capabilities["goog:loggingPrefs"] = {"performance": "ALL"}

        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chromedriver_autoinstaller.install()

        driver = webdriver.Chrome(options=chrome_options)

        return driver

    def get_video_captions_scraped(self, driver):

        videos_without_transcription = self._get_videos_without_transcription()

        for video_id in tqdm(videos_without_transcription):

            video = self.videos[video_id]

            if video["transcription"] is not None:
                continue

            try:
                driver.get(f"https://www.youtube.com/watch?v={video_id}")

                driver.find_element(By.CLASS_NAME, "ytp-subtitles-button").click()

                logs = driver.get_log("performance")
                relevant_logs = []
                for log in logs:
                    if "timedtext" in str(log):
                        relevant_logs.append(log)

                transcription_found = False
                for log in relevant_logs:
                    if video_id in str(log) and '"Network.requestWillBeSent"' in str(
                        log
                    ):
                        url = json.loads(log["message"])["message"]["params"][
                            "request"
                        ]["url"]
                        if video_id in url:
                            res = requests.get(url, timeout=10)
                            data = res.content.decode("utf-8")
                            data = json.loads(data)
                            text = ""
                            for elem in data["events"]:
                                if "segs" in elem.keys():
                                    for seg in elem["segs"]:
                                        text += seg["utf8"]
                            video["transcription"] = text
                            tqdm.write(f"{video_id}: {repr(text[:50])} ...")
                            transcription_found = True
                            break
                if not transcription_found:
                    tqdm.write(f"{video_id}: No transcription found.")
            except Exception as e:
                tqdm.write(f"{video_id}: Error occured while retrieving transcription.")
        self.save_json()

    def get_comments_for_video(self):
        """
        Retrieve video IDs from upload playlists for all channels.
        """
        print(
            f"> Querying video ids from upload playlists from YouTube Data API ({bcolors.WARNING}Warning: cost=1/video{bcolors.ENDC})"
        )

        videos_without_comments = self._get_videos_without_comments()

        for video_id in tqdm(videos_without_comments):
            base_url = (
                "https://content-youtube.googleapis.com/youtube/v3/commentThreads"
            )

            params = {
                "key": self.yt_api_key,
                "videoId": video_id,
                "part": "snippet, replies",
                "maxResults": 100,
            }

            next_page_token = None
            while True:
                time.sleep(0.1)
                if next_page_token:
                    params["pageToken"] = next_page_token

                response = requests.get(base_url, params=params, timeout=10)

                response = response.json()

                if "items" not in response:
                    break

                if len(response["items"]) == 0:
                    self.videos[video_id]["no_comments"] = True

                for comment in response["items"]:

                    comment_id = comment["snippet"]["topLevelComment"]["id"]
                    text = comment["snippet"]["topLevelComment"]["snippet"][
                        "textDisplay"
                    ]
                    author_display_name = comment["snippet"]["topLevelComment"][
                        "snippet"
                    ]["authorDisplayName"]

                    replies = []

                    if "replies" in comment:
                        for reply in comment["replies"]["comments"]:
                            replies.append(reply["snippet"]["textDisplay"])

                    self.comments[comment_id] = {
                        "text": text,
                        "authorDisplayName": author_display_name,
                        "video_id": video_id,
                        "replies": replies,
                    }

                    tqdm.write(f"{video_id}: {text[:50]}")

                # if "nextPageToken" in response:
                #    next_page_token = response["nextPageToken"]    # only use first 50 comments
                # else:
                #    break
                break
        self.save_json()

    def get_comments_for_video_scraped(self):
        print(
            f"> Querying video ids from upload playlists from YouTube Data API ({bcolors.WARNING}Warning: cost=1/video{bcolors.ENDC})"
        )

        downloader = YoutubeCommentDownloader()

        videos_without_comments = self._get_videos_without_comments()

        for video_id in tqdm(videos_without_comments):

            comments = downloader.get_comments_from_url(
                f"https://www.youtube.com/watch?v={video_id}",
                sort_by=SORT_BY_POPULAR,
            )

            no_comments = True
            for comment in islice(comments, 100):
                no_comments = False
                self.comments[comment["cid"]] = {
                    "text": comment["text"],
                    "authorDisplayName": comment["author"],
                    "video_id": video_id,
                    "replies": [],  # unable to get replies, possible to do manually later
                }
                tqdm.write(f"{video_id}: {comment['text'][:50]}")

            if no_comments:
                self.videos[video_id]["no_comments"] = True


scraping = Scraping()
scraping.load_json()

> Using YouTube API key: [96mAIzaSyAf09...[0m


In [3]:
scraping.print_status()


        Scraping overview
        -----------------

        Current number of channels:	362
        Current number of videos:	94422
        Current number of comments:	998721

        ----

        Current number of unscraped playlist:	13
        Current number of videos with missing transcription:	9440
        Current number of videos without comments:	0
        


In [None]:
scraping.load_channels()

In [None]:
scraping.get_videos_from_upload_playlist()

In [8]:
driver = scraping.get_driver()

In [None]:
scraping.get_video_captions_scraped(driver)

In [6]:
scraping.save_json("scraping")

In [5]:
scraping.get_comments_for_video_scraped()



  0%|          | 0/13155 [00:00<?, ?it/s]

nhSeT4WrGX0: es gibt zig tausende kliniken. woher willst du wis
nhSeT4WrGX0: Bruder fahr dein Bett hoch
nhSeT4WrGX0: He is cute 🥰
sZAyj8Byr-U: 💞💞💞☺☺☺✌✌✌😘😘
sZAyj8Byr-U: Hallo
dW2wkrWCpHY: Uhh voll der süße boooyy

Gruß angelo merte 007
dW2wkrWCpHY: Die Beschreibung unter dem video ist falsch!!!
dW2wkrWCpHY: lappen
dW2wkrWCpHY: Omg wie unsympathisch ist der.
dW2wkrWCpHY: +nico herm. Vielen Dank für den Hinweis, wir haben
tlS6osb8z4s: Das habt ihr sehr gut gemacht! 

Und an alle die d
tlS6osb8z4s: Bist du noch da am arbeiten ? 😂
zCcKqeWnnb8: Ich war auf der Intensivstation in der Schweiz und
zCcKqeWnnb8: 34 Jahre auf Intensiv.. RESPEKT!!!
zCcKqeWnnb8: "...machen wa nochmal ne BGA..." Standard 😆
zCcKqeWnnb8: Alles klar Dr. House. Du bist der Profi.
zCcKqeWnnb8: Hallo richmaster1343, so sorry, dass Sie eine Woch
zCcKqeWnnb8: palfreyatweb😊😊
zCcKqeWnnb8: naja wenn mann genau hin guckt da denkt mann das e
zCcKqeWnnb8: Wenn Sie nun schon 40 Jahre Intensivmedizin hinter
zCcKqeWnnb8: hannah find 

In [42]:
scraping.get_video_infos()



  0%|          | 0/3777 [00:00<?, ?it/s]

In [45]:
scraping.videos["lCSVPS0PLS8"]

{'channel_name': '@PennStateHealth',
 'description': 'http://pennstatehershey.org  Do you own your own pool? Or maybe one of those inflatable toddler pools? Summertime is a great time to have fun in the water.\nBut a small child can drown in as little as one inch of water. Supervision of your children is very important when around pools.\n\nWhether you have an in-ground or above ground pool, consider that you need four sided fencing. Your home is not that fourth side. Install a door or floating pool alarm.\nWater wings, noodles, those are for fun. They are not personal flotation devices, which need to be fitted properly.\n\nFinally, that rigid or inflatable toddler pool -- empty those every day, every time.\nAlso for in-ground pools, check your drains. A missing or flat drain cover can suck a child down to the bottom and hold them there.\n\nFor more tips to keep your family safe around pools, go to pennstatehershey.org and click on Project Health.',
 'transcription': "Do you own your o