In [0]:
%run ../00_CONFIG/env

In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
# Connection settings. The env_* names are presumably defined by the
# `%run ../00_CONFIG/env` cell; check that notebook if a NameError appears here.
API_KEY = env_api_key    # sent as the `key` query parameter by get_youtube_comments
VIDEO_ID = env_video_id  # video whose comments are ingested
BASE_URL = env_base_url  # endpoint URL requested by get_youtube_comments

In [0]:
def get_youtube_comments(VIDEO_ID, API_KEY, max_results=100, base_url=None, timeout=30):
    """Fetch every top-level comment on a video, following API pagination.

    Requests ``part=snippet`` with ``textFormat=plainText`` from ``base_url``
    (presumably the YouTube Data API ``commentThreads`` endpoint — confirm in the
    config notebook), asking for up to ``max_results`` threads per page and
    following ``nextPageToken`` until the API stops returning one.

    Args:
        VIDEO_ID: ID of the video whose comments are fetched (``videoId`` param).
        API_KEY: API key sent as the ``key`` query parameter.
        max_results: Page size sent as ``maxResults``.
        base_url: Endpoint URL; ``None`` falls back to the notebook-level ``BASE_URL``.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        A list of ``[author, comment, published_at, like_count]`` rows, one per
        top-level comment, in the order the API returned them.

    Raises:
        requests.HTTPError: If a page request returns an HTTP error status
            (e.g. invalid key, exhausted quota, unknown video).
        requests.Timeout: If a request exceeds ``timeout`` seconds.
        RuntimeError: If a response body carries an ``error`` object.
    """
    url = BASE_URL if base_url is None else base_url
    comments = []
    next_page_token = None

    while True:
        params = {
            "part": "snippet",
            "videoId": VIDEO_ID,
            "key": API_KEY,
            "maxResults": max_results,
            "textFormat": "plainText",
        }
        if next_page_token:
            params["pageToken"] = next_page_token

        # Without a timeout a stalled connection would hang the notebook forever.
        response = requests.get(url, params=params, timeout=timeout)
        # Surface API failures instead of silently returning an empty/truncated
        # list: an error body has neither "items" nor "nextPageToken".
        response.raise_for_status()
        data = response.json()
        if "error" in data:
            raise RuntimeError(f"YouTube API error: {data['error']}")

        for item in data.get("items", []):
            snippet = item["snippet"]["topLevelComment"]["snippet"]
            comments.append([
                snippet["authorDisplayName"],
                snippet["textDisplay"],
                snippet["publishedAt"],
                snippet["likeCount"],
            ])

        next_page_token = data.get("nextPageToken")
        if not next_page_token:
            break  # No more comments

    return comments

In [0]:
# Fetch every top-level comment for the configured video (network call).
comments_data = get_youtube_comments(VIDEO_ID, API_KEY)

# Explicit, all-nullable schema; column order matches the
# [author, comment, published_at, like_count] rows built by get_youtube_comments.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("Author", StringType()),
            ("Comment", StringType()),
            ("Published_At", StringType()),
            ("Likes", IntegerType()),
        )
    ]
)

df_comments_data = spark.createDataFrame(comments_data, schema=schema)

# Land the raw comments in the ingestion layer as CSV with a header row,
# replacing whatever a previous run wrote to this location.
INGESTION_PATH = "dbfs:/mnt/youtube_project/ingestion_layer/"
(
    df_comments_data.write
    .mode("overwrite")
    .option("header", "true")
    .csv(INGESTION_PATH)
)
