In [None]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

# Reuse the active Spark session when one exists; otherwise create one.
spark = SparkSession.builder.getOrCreate()

# Job parameters arrive as notebook widgets. Each one defaults to an empty
# string so that an unset parameter is caught by the validation below.
dbutils.widgets.text("catalog", "")
dbutils.widgets.text("schema", "")
dbutils.widgets.text("jobs_schema", "")

# Strip surrounding whitespace so a blank-looking value (e.g. "  ") counts as
# missing instead of silently producing a malformed table name further down.
catalog = dbutils.widgets.get("catalog").strip()
schema = dbutils.widgets.get("schema").strip()
jobs_schema = dbutils.widgets.get("jobs_schema").strip()

# Name exactly which widgets are empty so a failed run is quick to diagnose.
missing_widgets = [
    widget_name
    for widget_name, widget_value in (
        ("catalog", catalog),
        ("schema", schema),
        ("jobs_schema", jobs_schema),
    )
    if not widget_value
]
if missing_widgets:
    raise ValueError(f"Missing required widget: {', '.join(missing_widgets)}")

# Source table, qualified as <catalog>.<schema>.<table>.
trend_table = f"{catalog}.{schema}.go_neighborhood_trends"
# NOTE(review): the report table is qualified with jobs_schema only (no catalog
# prefix), unlike trend_table. Presumably jobs_schema either carries its own
# catalog or is meant to resolve against the session default -- confirm.
report_table = f"{jobs_schema}.neighborhood_insight_reports"

class InsightReportProcessor:
    """Aggregate neighborhood trend rows per borough and append them to a report table."""

    def __init__(self, spark: SparkSession, trend_table: str, report_table: str):
        """Keep the session and table names; nothing is read or written here.

        Args:
            spark: Spark session stored on the instance.
            trend_table: Name of the trend table, stored as-is.
            report_table: Name of the table ``save_insight_report`` appends to.
        """
        self.spark, self.trend_table, self.report_table = spark, trend_table, report_table

    def generate_neighborhood_insights(self, trend_df: DataFrame) -> DataFrame:
        """Return one aggregated row per (borough, report_month).

        ``report_month`` is the current timestamp truncated to the month, i.e.
        it reflects when the job runs rather than any date column in the data.

        Args:
            trend_df: Input rows; must expose ``borough``,
                ``avg_median_income_eur`` and ``avg_population_density_per_sqm``.

        Returns:
            A DataFrame with ``borough``, ``report_month``,
            ``max_median_income_eur`` (maximum, rounded to 2 decimals) and
            ``avg_population_density_per_sqm`` (mean, rounded to 2 decimals).
        """
        current_month = F.date_trunc("month", F.current_timestamp())
        stamped_df = trend_df.withColumn("report_month", current_month)

        metrics = [
            F.round(F.max("avg_median_income_eur"), 2).alias("max_median_income_eur"),
            F.round(F.avg("avg_population_density_per_sqm"), 2).alias(
                "avg_population_density_per_sqm"
            ),
        ]
        return stamped_df.groupBy("borough", "report_month").agg(*metrics)

    def save_insight_report(self, df: DataFrame) -> None:
        """Append ``df`` to ``self.report_table`` in Delta format.

        Append mode means rows already in the table are kept, so calling this
        twice with the same data writes those rows twice.
        """
        delta_appender = df.write.format("delta").mode("append")
        delta_appender.saveAsTable(self.report_table)

# Wire the processor up with the session and the resolved table names.
processor = InsightReportProcessor(spark, trend_table, report_table)

# Read the trend table through the processor's own session and table name
# (the same objects passed in above), aggregate per borough, then append the
# result to the report table.
trend_df = processor.spark.table(processor.trend_table)
report_df = processor.generate_neighborhood_insights(trend_df)
processor.save_insight_report(report_df)
