In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, avg, count, date_trunc, expr, concat_ws, to_date, lag
from pyspark.sql.types import StringType, StructField, StructType
import logging

# Initialize logging
logging.basicConfig(level=logging.INFO)

# Set export path
EXPORT_PATH_BASE = "s3a://topic-prediction/dashboard_data"



In [0]:
#Define functions

# Function to load and validate data
def load_data(data_path):
    try:
        df = spark.read.format("delta").load(data_path)
        logging.info(f"Data loaded from {data_path}")
        return df
    except Exception as e:
        logging.error(f"Error loading data: {e}")
        return None

# Aggregation functions
def aggregate_by_time(df, time_col, agg_level):
    if agg_level == 'date':
        return df.groupBy(time_col, "topic", "topic_name") \
                 .agg(
                     avg("score").alias("average"),
                     expr("percentile_approx(score, 0.5)").alias("median"),
                     count("score").alias("count")
                 ) \
                 .orderBy(col(time_col).desc()) \
                 .withColumnRenamed(time_col, agg_level) 
    else:
        return df.withColumn(agg_level, date_trunc(agg_level, col(time_col))) \
                 .groupBy(agg_level, "topic", "topic_name") \
                 .agg(
                     avg("score").alias("average"),
                     expr("percentile_approx(score, 0.5)").alias("median"),
                     count("score").alias("count")
                 ) \
                 .orderBy(col(agg_level).desc())


def aggregate_total(df):
    return df.groupBy( "topic", "topic_name").agg(
        avg("score").alias("average"),
        expr("percentile_approx(score, 0.5)").alias("median"),
        count("score").alias("count")
    )

# Export functions
def export_to_parquet(df, export_path):
    try:
        df.coalesce(1).write.mode("overwrite").parquet(export_path)
        logging.info(f"Data exported to {export_path}")
    except Exception as e:
        logging.error(f"Error exporting data to {export_path}: {e}")

In [0]:

# Main logic
# Load the data
data_path = "s3a://topic-prediction/delta/reviews_predictions"
loaded_df = load_data(data_path)

if loaded_df:
    # Add the necessary transformations after loading the data
    loaded_df = loaded_df.withColumn('topic_name', concat_ws('_', col('topic_name')))
    loaded_df = loaded_df.withColumn("date", to_date(col("review_timestamp")))

    # Aggregation by month with additional transformations
    results_by_month_topic_df = aggregate_by_time(loaded_df, "date", "month")

    # Define the window specification for month-to-month comparison
    window_spec = Window.partitionBy("topic").orderBy("month")

    # Calculate previous month's average, absolute difference, and percentage difference
    results_by_month_topic_df = results_by_month_topic_df \
        .withColumn("prev_month_avg", lag("average", 1).over(window_spec)) \
        .withColumn("abs_diff", col("average") - col("prev_month_avg")) \
        .withColumn("pct_diff", ((col("average") - col("prev_month_avg")) / col("prev_month_avg")) * 100) \
        .orderBy(col('topic_name').desc())

        # Export the updated month topic aggregations
    export_to_parquet(results_by_month_topic_df, f"{EXPORT_PATH_BASE}/agg_by_month_topic")

        # Aggregation by week
    results_by_week_topic_df = aggregate_by_time(loaded_df, "date", "week")
    export_to_parquet(results_by_week_topic_df, f"{EXPORT_PATH_BASE}/agg_by_week_topic")

        # Aggregation by date
    results_by_date_topic_df = aggregate_by_time(loaded_df, "date", "date")
    export_to_parquet(results_by_date_topic_df, f"{EXPORT_PATH_BASE}/agg_by_date_topic")

        # Aggregation by total (for overall data)
    total_aggregations_df = aggregate_total(loaded_df)
    export_to_parquet(total_aggregations_df, f"{EXPORT_PATH_BASE}/total_aggregations")

        # Export top topics by score
    top_topics_by_score_df = loaded_df.groupBy("score","topic", "topic_name").agg(
            count("review_timestamp").alias("count")
    ).orderBy(col('score').asc_nulls_last(),col('count').desc())
    export_to_parquet(top_topics_by_score_df, f"{EXPORT_PATH_BASE}/top_topics_by_score")

        # Export scores count
    scores_count_df = loaded_df.groupBy("score").agg(
            count("topic").alias("count_topics"),
            count("topic_name").alias("count_topic_name")
    ).orderBy(col('count_topics').desc())
    export_to_parquet(scores_count_df, f"{EXPORT_PATH_BASE}/scores_count")

        # Export raw data
    export_to_parquet(loaded_df, f"{EXPORT_PATH_BASE}/raw_data")
        
        # Aggregation by week without topic (for overall weekly data)
    agg_by_week_df = loaded_df.withColumn("week", date_trunc("week", col("date"))) \
                                .groupBy("week") \
                                .agg(
                                    avg("score").alias("average"),
                                    expr("percentile_approx(score, 0.5)").alias("median"),
                                    count("score").alias("count")
                                ).orderBy(col("week").desc())
    export_to_parquet(agg_by_week_df, f"{EXPORT_PATH_BASE}/agg_by_week")

        # Aggregation by month without topic (for overall monthly data)
    agg_by_month_df = loaded_df.withColumn("month", date_trunc("month", col("date"))) \
                                .groupBy("month") \
                                .agg(
                                    avg("score").alias("average"),
                                    expr("percentile_approx(score, 0.5)").alias("median"),
                                    count("score").alias("count")
                                ).orderBy(col("month").desc())
    export_to_parquet(agg_by_month_df, f"{EXPORT_PATH_BASE}/agg_by_month")

        # Aggregation by date without topic (for overall daily data)
    agg_by_date_df = loaded_df.withColumn("date", date_trunc("day", col("date"))) \
                                .groupBy("date") \
                                 .agg(
                                    avg("score").alias("average"),
                                    expr("percentile_approx(score, 0.5)").alias("median"),
                                    count("score").alias("count")
                                ).orderBy(col("date").desc())
    export_to_parquet(agg_by_date_df, f"{EXPORT_PATH_BASE}/agg_by_date")

else:
    logging.error("Data not loaded, skipping processing.")

    