In [1]:
from typing import Generator, List, Dict, Any
from dataclasses import dataclass, asdict
import json
from googleapiclient.discovery import build
import time
import sqlite3
import os

In [2]:
DEVELOPER_KEY = json.load(open('../keys/youtube_key.json'))['api_key']

In [3]:
@dataclass
class Comment:
    id: str
    author: str
    content: str
    date: str
    like_count: int


@dataclass
class Thread:
    video_id: str
    reply_count: int
    top_level_comment: Comment
    replies: List[Comment]


In [4]:
class YoutubeCommentsScraper:

    def __init__(
            self,
            api_key: str,
            api_service_name: str = "youtube",
            api_version: str = "v3"
    ) -> None:
        self.API_KEY = api_key

        self.youtube = build(
            api_service_name,
            api_version,
            developerKey=self.API_KEY)

    def fetch_threads(
            self,
            video_id: str
    ) -> Generator[Thread, None, None]:
        request = self.youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            maxResults=100,
            textFormat="plainText",
            order="relevance",
        )
        while request:
            response = request.execute()
            for thread in response["items"]:
                replies = list()
                snippet = thread["snippet"]
                reply_count = snippet["totalReplyCount"]
                main_comment_data = snippet["topLevelComment"]
                top_level_comment = self._comment_from_resource(main_comment_data)
                for reply in self._fetch_reply_comments(main_comment_data["id"]):
                    replies.append(reply)
                yield Thread(
                    video_id=video_id,
                    reply_count=reply_count,
                    top_level_comment=top_level_comment,
                    replies=replies
                )
            request = self.youtube.commentThreads().list_next(request, response)

    def _fetch_reply_comments(
            self,
            parent_comment_id: str
    ) -> Generator[Comment, None, None]:
        request = self.youtube.comments().list(
            part="snippet",
            parentId=parent_comment_id,
            maxResults=100,
            textFormat="plainText"
        )
        while request:
            response = request.execute()
            time.sleep(0.1)
            for comment in response.get("items", []):
                yield self._comment_from_resource(comment)
            request = self.youtube.comments().list_next(request, response)

    @staticmethod
    def _comment_from_resource(
            resource: Dict[str, Any]
    ) -> Comment:
        snippet = resource["snippet"]
        return Comment(
            id=resource["id"],
            author=snippet["authorDisplayName"],
            content=snippet["textDisplay"],
            date=snippet["publishedAt"],
            like_count=snippet["likeCount"],
        )


In [5]:
class NDJSONThreadSaver:  #Newline Delimited JSON

    def __init__(self, data_dir: str = 'data'):
        os.makedirs(data_dir, exist_ok=True)
        self.data_dir = data_dir

    def save(self, thread: Thread):
        file_path = os.path.join(self.data_dir, f"{thread.video_id}.ndjson")

        with open(file_path, 'a', encoding='utf-8') as f:
            serializable_thread = asdict(thread)
            json.dump(serializable_thread, f, ensure_ascii=False)
            f.write("\n")

In [6]:
class SQLiteThreadSaver:

    def __init__(self, db_name: str = 'threads.db'):
        self.db_name = db_name
        self.conn = sqlite3.connect(self.db_name)
        self.cursor = self.conn.cursor()
        self._create_table()

    def _create_table(self):
        # Create the comments table
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS Comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            yt_id TEXT UNIQUE,
            author TEXT,
            content TEXT,
            date TEXT,
            like_count INTEGER,
            parent_id INTEGER,
            video_id TEXT,
            FOREIGN KEY (parent_id) REFERENCES Comments(id)
        )''')
        self.conn.commit()

    def save(self, thread: Thread):
        # Save Top Level Comment
        top_level_comment_id = self._save_comment(
            thread.top_level_comment,
            video_id=thread.video_id,
        )

        # Save replies
        for reply in thread.replies:
            self._save_comment(
                reply,
                parent_id=top_level_comment_id,
                video_id=thread.video_id,
            )

        self.conn.commit()

    def _save_comment(
            self,
            comment: Comment,
            video_id : str,
            parent_id : int | None = None
    ) -> int:
        # Insert the comment data into the comments table
        self.cursor.execute(
            '''
                INSERT INTO comments (yt_id, author, content, date, like_count, parent_id, video_id)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ''',
            (comment.id, comment.author, comment.content, comment.date, comment.like_count, parent_id, video_id)
        )

        return self.cursor.lastrowid # Get id of the row the cursor is currently at

    def close(self):
        self.conn.close()

In [7]:
# Testing

scraper = YoutubeCommentsScraper(
    api_key=DEVELOPER_KEY
)

ndjson_saver = NDJSONThreadSaver()
sqlite_saver = SQLiteThreadSaver()

for idx, thread in enumerate(scraper.fetch_threads(video_id="qCbfTN-caFI")):
    ndjson_saver.save(thread)
    sqlite_saver.save(thread)
    print(f"[{idx}] Fetched thread of author {thread.top_level_comment.author} with {thread.reply_count} replies")

sqlite_saver.close()

OperationalError: table comments has no column named yt_id