### Setup

In [1]:
import logging
import math
import os
from typing import Any, Iterable, Iterator, List, Optional, Tuple

import googleapiclient.discovery
import pyspark.sql.functions as F
import pyspark.sql.types as T
from delta import DeltaTable
from googleapiclient.errors import HttpError
from pyspark.rdd import RDD
from pyspark.sql import DataFrame

StatementMeta(, , -1, SessionStarting, , SessionStarting)

In [11]:
%run Google-Helpers

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 19, Finished, Available, Finished)

In [12]:
# Youtube API
API_SERVICE_NAME = "youtube"
API_VERSION = "v3"
# Never commit API keys to a notebook: read the key from the environment instead.
# Any key that was previously hard-coded in this cell must be treated as leaked and rotated.
# NOTE(review): in Fabric, a Key Vault secret via notebookutils.credentials.getSecret may be a
# better source than an environment variable — confirm for this workspace.
DEVELOPER_KEY = os.environ.get("YOUTUBE_DEVELOPER_KEY", "")
if not DEVELOPER_KEY:
    raise RuntimeError("Set the YOUTUBE_DEVELOPER_KEY environment variable to a YouTube Data API key.")

# Delta table names
VIDEOS_TABLE = "abfss://fd12376e-2797-4027-bb8e-42a3a8228a70@onelake.dfs.fabric.microsoft.com/77b89b44-1bcf-42fa-a9ac-7d0593123d3d/Tables/videos"
COMMENTS_TABLE = "abfss://fd12376e-2797-4027-bb8e-42a3a8228a70@onelake.dfs.fabric.microsoft.com/77b89b44-1bcf-42fa-a9ac-7d0593123d3d/Tables/comments"
TEMP_TABLE = "abfss://fd12376e-2797-4027-bb8e-42a3a8228a70@onelake.dfs.fabric.microsoft.com/77b89b44-1bcf-42fa-a9ac-7d0593123d3d/Tables/temp_comments"

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 20, Finished, Available, Finished)

In [13]:
logger = setup_logger()

# Driver-side YouTube client; the Spark partitions further down construct their own clients.
youtube_client = build_youtube_client(
    API_SERVICE_NAME,
    API_VERSION,
    DEVELOPER_KEY,
)

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 21, Finished, Available, Finished)

2025-03-16 22:27:26,424 - INFO - Building YouTube API client


### API Requests

In [14]:
def fetch_comments_for_video_partition(rows_iterator: Iterable[str], comment_count: int = 100, batch_size: int = 100) -> Iterator[Tuple]:
    """
    Fetch comments for every video ID in one Spark partition.

    Designed for ``RDD.mapPartitions``: a single YouTube client is built per partition and reused
    for every video in it, rather than building one client per video.

    Args:
        rows_iterator: Iterator over the video IDs in this partition.
        comment_count: Max number of comments to fetch per video.
        batch_size: Page size of each API request (the per-video fetcher caps it at 100).

    Yields:
        One tuple per comment: (id, textDisplay, publishedAt, likeCount, videoId).
    """
    youtube_client = build_youtube_client(API_SERVICE_NAME, API_VERSION, DEVELOPER_KEY)
    for video_id in rows_iterator:
        comments = fetch_comments_for_video_with_client(video_id, youtube_client, comment_count, batch_size)
        # Defensive: the fetcher returns a list or raises, but never iterate over None.
        if comments is None:
            logger.warning(f"No comments returned for video {video_id}. Skipping.")
            continue
        yield from comments

def _comment_thread_to_row(item: dict, video_id: str) -> Tuple:
    """Flatten one commentThreads API item into (id, textDisplay, publishedAt, likeCount, videoId)."""
    top_level = item["snippet"]["topLevelComment"]["snippet"]
    return (
        item["id"],
        top_level["textDisplay"],
        top_level["publishedAt"],
        top_level["likeCount"],
        video_id,
    )


def _is_comments_disabled(httpe: Exception) -> bool:
    """Return True when an HttpError response body carries YouTube's `commentsDisabled` reason."""
    content = getattr(httpe, "content", b"") or b""
    if isinstance(content, bytes):
        content = content.decode("utf-8", errors="replace")
    return "commentsDisabled" in str(content)


def fetch_comments_for_video_with_client(video_id: str, youtube_client: "googleapiclient.discovery.Resource", comment_count: int = 100, batch_size: int = 100) -> List[Tuple]:
    """
    Fetch up to `comment_count` top-level comments for a video, paging through the API.

    Args:
        video_id: Id for video
        youtube_client: Youtube API resource object
        comment_count: Max number of comments to return (<= 0 returns [] without calling the API)
        batch_size: Size of API batches, clamped to the range 1..100

    Returns:
        A list of (id, textDisplay, publishedAt, likeCount, videoId) tuples, at most
        `comment_count` long. Returns [] when the video is not found (HTTP 404) or when the
        API reports that comments are disabled (HTTP 403 `commentsDisabled`).

    Raises:
        HttpError: for any other API error (logged first).
        Exception: any other failure is logged and re-raised.
    """
    if comment_count <= 0:
        return []
    if batch_size > 100:
        logger.warning("Batch over 100, setting to 100")
        batch_size = 100
    elif batch_size < 1:
        logger.warning("Batch below 1, setting to 1")
        batch_size = 1

    rows: List[Tuple] = []
    page_token = None
    try:
        while True:
            request_kwargs = dict(
                part="id,snippet",
                maxResults=batch_size,
                moderationStatus="published",
                order="relevance",
                videoId=video_id,
            )
            # The first request has no pageToken; later requests continue from the previous page.
            if page_token:
                request_kwargs["pageToken"] = page_token
            response = youtube_client.commentThreads().list(**request_kwargs).execute()
            rows.extend(_comment_thread_to_row(item, video_id) for item in response.get("items", []))
            page_token = response.get("nextPageToken")
            if not page_token or len(rows) >= comment_count:
                break
    except HttpError as httpe:
        status = httpe.resp.status
        # When the video is not found, a 404 error is usually thrown.
        if status == 404:
            logger.warning(f"Video {video_id} not found. Skipping.")
            return []
        # Videos with comments turned off return 403 commentsDisabled; skip them instead of failing the job.
        if status == 403 and _is_comments_disabled(httpe):
            logger.warning(f"Comments are disabled for video {video_id}. Skipping.")
            return []
        logger.exception(f"HTTP error fetching comments for video {video_id}: {str(httpe)}")
        raise
    except Exception as e:
        logger.exception(f"Error fetching comments for video {video_id}: {str(e)}")
        raise

    # A full final page can overshoot comment_count; trim to the documented maximum.
    return rows[:comment_count]

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 22, Finished, Available, Finished)

In [15]:
def fetch_video_ids(video_abfs: str, new_videos_only: bool = False, comments_abfs: Optional[str] = None) -> RDD:
    """
    Fetch video IDs from a Delta table located at the given ABFS path. Optionally, return only
    those videos that do not appear in the comments table.

    Args:
        video_abfs: ABFS path for the videos table (its `id` column is read).
        new_videos_only: If True, only returns videos with no associated comments.
        comments_abfs: ABFS path for the comments table (its `videoId` column is read).
            Required when `new_videos_only` is True.

    Returns:
        An RDD containing the video IDs (an empty RDD when none are found).

    Raises:
        ValueError: If `new_videos_only` is True but `comments_abfs` is not provided.
    """
    if new_videos_only and comments_abfs is None:
        message = "No comments_abfs specified while new_videos_only is True."
        logger.error(message)
        raise ValueError(message)

    logger.info("Reading video IDs from Raw.videos table")
    video_ids = spark.read.format("delta").load(video_abfs).select("id")

    if new_videos_only:
        logger.debug("Filtering to only new videos")
        comment_video_ids = spark.read.format("delta").load(comments_abfs).select("videoId")
        # Left anti join keeps only video IDs that have no row in the comments table.
        video_ids = video_ids.join(
            comment_video_ids, video_ids.id == comment_video_ids.videoId, "left_anti"
        )

    # A single count() replaces isEmpty() followed by count(), saving one Spark job.
    video_count = video_ids.count()
    logger.info(f"Read in {video_count} video IDs")
    if video_count == 0:
        return spark.sparkContext.emptyRDD()
    return video_ids.rdd.map(lambda row: row.id)

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 23, Finished, Available, Finished)

In [16]:
video_ids_rdd = fetch_video_ids(VIDEOS_TABLE, new_videos_only=False, comments_abfs=COMMENTS_TABLE)

if video_ids_rdd.isEmpty():
    message = "No video IDs found. Exiting."
    logger.info(message)
    notebookutils.notebook.exit(message)

# mapPartitions lets each partition build one YouTube client and reuse it for all of its video IDs.
comments_rdd = video_ids_rdd.mapPartitions(
    lambda rows: fetch_comments_for_video_partition(rows, comment_count=100, batch_size=100)
)

# Every column is declared as a string; publishedAt and likeCount are converted below.
comments_schema = T.StructType([
    T.StructField("id", T.StringType(), True),
    T.StructField("textDisplay", T.StringType(), True),
    T.StructField("publishedAt", T.StringType(), True),
    T.StructField("likeCount", T.StringType(), True),
    T.StructField("videoId", T.StringType(), True),
])

comments_df = (
    spark.createDataFrame(comments_rdd, schema=comments_schema)
    .withColumn("publishedAt", F.to_timestamp("publishedAt", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("likeCount", F.col("likeCount").cast(T.IntegerType()))
    # Delta MERGE fails if several source rows match the same target row, so keep one row per
    # comment id (paginated API results are not guaranteed to be duplicate-free).
    .dropDuplicates(["id"])
)

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 24, Finished, Available, Finished)

2025-03-16 22:27:27,251 - INFO - Reading video IDs from Raw.videos table
2025-03-16 22:27:36,559 - INFO - Read in 227 video IDs


### Write data

In [17]:
def merge_comments_data(comments_df: DataFrame, table_path: str) -> None:
    """
    Upsert comments into a Delta table keyed on `id`, log the MERGE row counts, then compact the table.

    Matched rows get textDisplay/publishedAt/likeCount/videoId overwritten and `_modified_date`
    set to the current timestamp. Unmatched rows are inserted with `_created_date` and
    `_modified_date` set to the current timestamp.

    Args:
        comments_df: The DataFrame of comments (id, textDisplay, publishedAt, likeCount, videoId).
        table_path: The Delta table ABFS path to merge into.

    Raises:
        Any exception from the merge or the optimize step is logged once and re-raised.
    """
    content_columns = ("textDisplay", "publishedAt", "likeCount", "videoId")
    update_set = {column: f"source.{column}" for column in content_columns}
    update_set["_modified_date"] = "current_timestamp()"
    insert_values = {
        "id": "source.id",
        **{column: f"source.{column}" for column in content_columns},
        "_created_date": "current_timestamp()",
        "_modified_date": "current_timestamp()",
    }

    try:
        target_table = DeltaTable.forPath(spark, table_path)
        logger.info("Merging data started")
        (
            target_table.alias("target")
            .merge(comments_df.alias("source"), "target.id = source.id")
            .whenMatchedUpdate(set=update_set)
            .whenNotMatchedInsert(values=insert_values)
            .execute()
        )
        logger.info("Merging data finished")

        # history(1) is the latest commit; this assumes no other writer committed after the MERGE.
        last_commit = target_table.history(1).collect()[0]
        # Guard against a commit whose operationMetrics is null.
        metrics = last_commit["operationMetrics"] or {}

        num_inserted = int(metrics.get("numTargetRowsInserted", 0))
        num_updated = int(metrics.get("numTargetRowsUpdated", 0))
        num_deleted = int(metrics.get("numTargetRowsDeleted", 0))

        logger.info(f"Rows inserted: {num_inserted}")
        logger.info(f"Rows updated: {num_updated}")
        logger.info(f"Rows deleted: {num_deleted}")
    except Exception as e:
        logger.exception(f"Exception details: {str(e)}")
        raise

    # Compaction runs only after a successful merge; a failure is logged once (with traceback) and re-raised.
    try:
        logger.info("Start optimize")
        target_table.optimize().executeCompaction()
        logger.info("Finished optimize")
    except Exception:
        logger.exception("Failed to optimize")
        raise
        


StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 25, Finished, Available, Finished)

In [18]:
# Stage the comments in a temporary Delta table first. This materialises the API results once
# (so repeat API calls are avoided if a later step fails) and gives the merge a file-backed source.
try:
    logger.info("Writing out comments to temp table")
    temp_writer = comments_df.write.format("delta").mode("overwrite")
    temp_writer.save(TEMP_TABLE)
    comments_delta = spark.read.format("delta").load(TEMP_TABLE)
    staged_row_count = comments_delta.count()
    logger.info(f"Finished writing temp table: {staged_row_count} rows")
except Exception as write_error:
    logger.error(f"Error writing to temporary Delta table at {TEMP_TABLE}: {str(write_error)}")
    raise

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 26, Finished, Available, Finished)

2025-03-16 22:27:39,054 - INFO - Writing out comments to temp table
2025-03-16 22:28:55,848 - INFO - Finished writing temp table: 22700 rows


In [19]:
# Merge the staged comments (read back from the temp Delta table) into the comments table.
merge_comments_data(comments_df=comments_delta, table_path=COMMENTS_TABLE)

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 27, Finished, Available, Finished)

2025-03-16 22:28:58,235 - INFO - Merging data started
2025-03-16 22:29:05,677 - INFO - Merging data finished
2025-03-16 22:29:06,447 - INFO - Rows inserted: 5984
2025-03-16 22:29:06,448 - INFO - Rows updated: 16716
2025-03-16 22:29:06,448 - INFO - Rows deleted: 0
2025-03-16 22:29:06,449 - INFO - Start optimize
2025-03-16 22:29:09,773 - INFO - Finished optimize


### Clean up temp table

In [20]:
def resolve_staging_table(temp_path: str, databases) -> Tuple[str, str]:
    """
    Map a table path to (database name, table name) using the catalog's database locations.

    Args:
        temp_path: Path of the table, e.g. ``abfss://.../Tables/temp_comments``.
        databases: Catalog database entries exposing ``name`` and ``locationUri``
            (as returned by ``spark.catalog.listDatabases()``).

    Raises:
        ValueError: If no database's location matches the table's parent directory.
    """
    directory, table_name = temp_path.rstrip("/").rsplit("/", 1)
    wanted = directory.rstrip("/")
    for database in databases:
        # Compare without trailing slashes so ".../Tables" and ".../Tables/" match.
        if (database.locationUri or "").rstrip("/") == wanted:
            return database.name, table_name
    raise ValueError(f"No catalog database has location {directory}")


def quote_identifier(name: str) -> str:
    """Backtick-quote a Spark SQL identifier, escaping embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def cleanup_delta_staging_table(temp_path: str) -> None:
    """
    Drop a staging Delta table, located by matching its parent directory to a catalog database.

    Args:
        temp_path (str): The Azure Data Lake path (abfss:// URI) of the staging Delta table.

    Raises:
        ValueError: If no catalog database matches the table's directory.
        Exception: Any failure from the catalog lookup or DROP TABLE is logged and re-raised.
    """
    # Report the raw path until the catalog name is resolved, so the error handler never
    # references names that were not yet assigned.
    target = temp_path
    try:
        database, table_name = resolve_staging_table(temp_path, spark.catalog.listDatabases())
        target = f"{quote_identifier(database)}.{quote_identifier(table_name)}"
        spark.sql(f"DROP TABLE IF EXISTS {target}")
        logger.info("Dropped temp table")
    except Exception as e:
        logger.error(f"Warning: Could not delete Delta table at {target}. Error: {str(e)}")
        raise
    


StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 28, Finished, Available, Finished)

In [21]:
# Drop the staging table now that its rows have been merged.
cleanup_delta_staging_table(temp_path=TEMP_TABLE)

StatementMeta(, 155b6c3c-1030-40e0-8c51-5cc53b535e52, 29, Finished, Available, Finished)

2025-03-16 22:29:26,116 - INFO - Dropped temp table
