In [0]:
# Load the posts and comments silver tables used by the cells below.
posts_df = spark.read.table("workspace.default.posts_silver")
comments_df = spark.read.table("workspace.default.comments_silver")

In [0]:
from pyspark.sql.functions import *

In [0]:
# Top posts by engagement: the highest-scoring posts, ranked by score.
# The cutoff is a named constant so it can be tuned in one place.
TOP_POSTS_LIMIT = 3

top_posts_engagement = (
    posts_df
    # Sort before projecting so `id` is still available as a tie-breaker.
    # With score alone, posts tied at the cutoff could be picked arbitrarily,
    # so the saved table could differ between runs on identical input.
    # NOTE(review): assumes `id` is unique per post — confirm.
    .orderBy(col("score").desc(), col("id").asc())
    .limit(TOP_POSTS_LIMIT)
    .select("title", "author", "score")
)

# Persist the result as a Delta table, replacing any previous contents.
(
    top_posts_engagement.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.default.top_posts_engagement_gold")
)


In [0]:
# Join posts with their comments (inner join: posts without any matching
# comment, and comments without a matching post, are dropped).
#
# The comment side is matched on `link_id` with its "t3_" prefix removed, so it
# can be compared with the post `id`. The pattern is anchored with ^ so only a
# leading "t3_" is stripped; an unanchored pattern would also delete any later
# occurrence of "t3_" inside the value and could produce a false match.
comment_post_id = regexp_replace(col("c.link_id"), "^t3_", "")

post_comment_correlation = (
    posts_df.alias("p")
    .join(comments_df.alias("c"), col("p.id") == comment_post_id, "inner")
    .select(
        # Post-side columns, prefixed so they cannot clash with comment columns.
        col("p.id").alias("post_id"),
        col("p.title").alias("post_title"),
        col("p.author").alias("post_author"),
        col("p.score").alias("post_score"),
        # Comment-side columns.
        col("c.id").alias("comment_id"),
        col("c.author").alias("comment_author"),
        col("c.score").alias("comment_score"),
    )
)

# Post score vs. comment performance.
# Produces one row per distinct (post_id, post_title, post_author, post_score)
# with the number of joined comments, their mean score and their summed score.
# Groups with fewer than 3 comments are dropped; rows are ordered by post
# score, highest first.
# Note: `sum` here is the Spark aggregate brought in by the wildcard
# `pyspark.sql.functions` import, not Python's builtin.
post_vs_comment_scores = (
    post_comment_correlation
    .groupBy("post_id", "post_title", "post_author", "post_score")
    .agg(
        count("comment_id").alias("total_comments"),
        avg("comment_score").alias("avg_comment_score"),
        sum("comment_score").alias("total_comment_score"),
    )
    .where(col("total_comments") >= 3)
    .orderBy(col("post_score").desc())
)

# Persist the result as a Delta table, replacing any previous contents.
(
    post_vs_comment_scores.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.default.posts_comments_correlation_gold")
)


In [0]:
# Top commenters: per-author comment statistics.
# For each comment author, compute the comment count and the mean, maximum and
# summed comment score. Authors with fewer than 3 comments are dropped; rows
# are ordered by summed score, highest first.
# Note: `max` and `sum` here are the Spark aggregates brought in by the
# wildcard `pyspark.sql.functions` import, not Python's builtins.
# NOTE(review): authors are grouped as-is; if comments_silver keeps placeholder
# authors (e.g. deleted accounts) they would be aggregated as a single
# commenter — confirm upstream filtering.
top_commenters = (
    comments_df
    .groupBy("author")
    .agg(
        count("id").alias("total_comments"),
        avg("score").alias("avg_comment_score"),
        max("score").alias("max_comment_score"),
        sum("score").alias("total_comment_score"),
    )
    .where(col("total_comments") >= 3)
    .orderBy(col("total_comment_score").desc())
)

# Persist the result as a Delta table, replacing any previous contents.
(
    top_commenters.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.default.top_commenters_gold")
)
