In [0]:
import dlt
from pyspark.sql.functions import round
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType

# Explicit column layout applied to the CSV volume files; every field is nullable.
schema = (
    StructType()
    .add("record_type", StringType(), True)
    .add("author", StringType(), True)
    .add("author_channel_url", StringType(), True)
    .add("author_profile_image_url", StringType(), True)
    .add("date", TimestampType(), True)
    .add("like_count", IntegerType(), True)
    .add("reply_count", FloatType(), True)
    .add("text", StringType(), True)
    .add("video_id", StringType(), True)
    .add("cluster_sentiment", StringType(), True)
    .add("cluster_sentiment_reasoning", StringType(), True)
    .add("id", StringType(), True)
    .add("creator", StringType(), True)
    .add("video_title", StringType(), True)
    .add("video_thumbnail_url", StringType(), True)
    .add("channel_name", StringType(), True)
    .add("ingestion_timestamp", TimestampType(), True)
)

# Bronze layer: stream the raw API records from the existing Delta table
@dlt.table(name="bronze_raw_api_data")
def bronze_table():
    """Return a streaming DataFrame over ``youtube2.bronze.raw_api_data``."""
    delta_reader = spark.readStream.format("delta")
    return delta_reader.table("youtube2.bronze.raw_api_data")

# Bronze layer: stream the CSV files dropped into the bronze volume
@dlt.table(name="bronze_volume_data")
def bronze_volume():
    """Return a streaming DataFrame over the CSV files in the bronze volume.

    The files are read with a header row and the module-level ``schema``
    rather than an inferred one.
    """
    csv_glob = "/Volumes/youtube2/bronze/volume1/*.csv"
    csv_reader = spark.readStream.format("csv")
    csv_reader = csv_reader.option("header", "true")
    csv_reader = csv_reader.schema(schema)
    return csv_reader.load(csv_glob)

# Silver transformation: Union, clean, and enforce schema
@dlt.table(
    name="youtube2.silver.cleaned_api_data",
    comment="Silver table with cleaned and unioned data from bronze table and volume",
    table_properties={"delta.feature.allowColumnDefaults": "enabled"}
)
def silver_table():
    """Combine both bronze streams into one cleaned silver stream.

    Steps:
      1. Project each bronze stream onto the same explicit column list.
      2. Union the two streams by column *name* (duplicates are kept).
      3. Drop rows where ``record_type``, ``author``, ``date`` or ``text``
         is NULL.
      4. Round ``reply_count`` to 2 decimal places.

    Returns:
        A streaming DataFrame containing exactly the columns listed in
        ``silver_columns``, in that order.
    """
    # Single source of truth for the silver column set and its order.
    silver_columns = [
        "record_type", "author", "author_channel_url", "author_profile_image_url",
        "date", "like_count", "reply_count", "text", "video_id",
        "cluster_sentiment", "cluster_sentiment_reasoning", "id",
        "creator", "video_title", "video_thumbnail_url", "channel_name",
        "ingestion_timestamp",
    ]

    # Project both inputs onto the same columns *before* combining them.
    # DataFrame.union() matches columns by position, so any difference in
    # column order between the Delta table and the CSV schema would silently
    # place values under the wrong column names.
    bronze_table_df = dlt.read_stream("bronze_raw_api_data").select(*silver_columns)
    bronze_volume_df = dlt.read_stream("bronze_volume_data").select(*silver_columns)

    # Name-based union; like union(), it keeps duplicate rows (UNION ALL).
    unioned_df = bronze_table_df.unionByName(bronze_volume_df)

    # NOTE: `round` here is pyspark.sql.functions.round (imported at the top
    # of the file), not the Python builtin.
    return (unioned_df
            .filter("record_type IS NOT NULL AND author IS NOT NULL AND date IS NOT NULL AND text IS NOT NULL")
            .withColumn("reply_count", round("reply_count", 2))
            .select(*silver_columns))