In [0]:
# Databricks Job Notebook: property vs neighbors review issues (writes Delta outputs)
# ------------------------------------------------------------------------------
# Inputs (widgets):
#   delta_path:      Delta path of listings/similarity table
#   out_base_path:   Base path to write Delta outputs (required)
#   pid:             target property_id
#   params_json:     JSON string of parameters (optional; overrides defaults)
#
# Outputs (Delta paths under out_base_path):
#   /runs, /neighbors, /topics, /evidence
#
# Notebook exit value (JSON):
#   {"status":"ok","run_id":"...","property_id":"...","written":{...}} or {"status":"error",...}

from __future__ import annotations

import json
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

import pandas as pd  # kept since your original script imports it (safe)

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.storagelevel import StorageLevel

# Spark NLP (assumed available)
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import (
    SentenceDetectorDLModel,
    Tokenizer,
    ViveknSentimentModel,
    UniversalSentenceEncoder,
)

# -----------------------------
# Widgets (Job parameters)
# -----------------------------
# Register the job parameters (order here is the order the widgets are created).
dbutils.widgets.text("delta_path", "/mnt/lab94290/cluster_19/airbnb_h3_simvector")
dbutils.widgets.text("out_base_path", "")
dbutils.widgets.text("pid", "")
dbutils.widgets.text("params_json", "")

# Read each widget once, trimming surrounding whitespace.
DELTA_PATH, OUT_BASE_PATH, PID, PARAMS_JSON = (
    dbutils.widgets.get(widget_name).strip()
    for widget_name in ("delta_path", "out_base_path", "pid", "params_json")
)

# Fail fast on missing mandatory parameters; PARAMS_JSON may stay empty.
if not DELTA_PATH:
    raise ValueError("delta_path widget is required")
if not OUT_BASE_PATH:
    raise ValueError("out_base_path widget is required (base Delta path for outputs)")
if not PID:
    raise ValueError("pid widget is required")

# -----------------------------
# Column names (adjust only if your schema differs)
# -----------------------------
# Identifier and raw-reviews columns of the listings table.
ID_COL, REV_COL = "property_id", "reviews"

# Similarity-vector column plus optional quality columns; BAD_COL, NAN_COL and
# NORM_COL are only used when the code finds them in df.columns.
VEC_COL, BAD_COL, NAN_COL, NORM_COL = "similarity_vec", "is_bad_vec", "has_nan", "vec_norm"

# -----------------------------
# Topics list (each entry is embedded and compared against review sentences)
# -----------------------------
TOPICS = [
    "cleanliness and hygiene",
    "maintenance and repairs",
    "smell and air quality",
    "noise and sound disturbances",
    "privacy and quietness",
    "safety and security",
    "bed comfort and sleep quality",
    "temperature and climate control",
    "space and layout comfort",
    "kitchen and cooking facilities",
    "bathroom quality",
    "wifi and internet quality",
    "appliances and amenities availability",
    "location and neighborhood",
    "public transportation access",
    "parking availability",
    "host communication and responsiveness",
    "check-in and check-out process",
    "accuracy of listing description",
    "value for money and pricing",
]

# =========================================================
# Helpers
# =========================================================
def _pick_geo_col(df: DataFrame) -> str:
    if "geo_bucket" in df.columns:
        return "geo_bucket"
    if "_geo_bucket_filled" in df.columns:
        return "_geo_bucket_filled"
    raise ValueError("No geo bucket column found: expected 'geo_bucket' or '_geo_bucket_filled'.")

def filter_english_like(df: DataFrame, text_col: str = "sentence_text", non_ascii_ratio_max: float = 0.20) -> DataFrame:
    """Keep rows whose ``text_col`` looks like Latin-script text.

    A row survives when the text contains at least one ASCII letter and the
    share of non-ASCII characters (relative to the text length, floored at 1)
    is at most ``non_ascii_ratio_max``. A NULL text makes the predicate NULL,
    so such rows are dropped.
    """
    text = F.col(text_col)
    ascii_only_length = F.length(F.regexp_replace(text, r"[^\x00-\x7F]", ""))
    ratio = (F.length(text) - ascii_only_length) / F.greatest(F.length(text), F.lit(1))
    keep_row = text.rlike(r"[A-Za-z]") & (ratio <= F.lit(float(non_ascii_ratio_max)))
    return df.filter(keep_row)

def _safe_head1(df: DataFrame):
    rows = df.take(1)
    return rows[0] if rows else None

# =========================================================
# 1) Neighbors selection by cosine similarity in same geo bucket
#    REQUIRED:
#    - Identify target geo_bucket_res
#    - Use column h3_{geo_bucket_res}
#    - Filter candidates where h3_{geo_bucket_res} == bucket value
# =========================================================
def top_similar_properties_with_metadata(
    df: DataFrame,
    target_property_id,
    *,
    max_candidates: int = 50,
    top_k: int = 20,
    seed: int = 42,
    oversample: float = 1.7,
) -> Tuple[DataFrame, Dict[str, Any]]:
    """Select the most cosine-similar properties to ``target_property_id`` in its geo bucket.

    The target row provides ``geo_bucket_res`` (H3 resolution) and the bucket
    value (``geo_bucket`` or ``_geo_bucket_filled``, see ``_pick_geo_col``).
    Candidates are the rows whose ``h3_{geo_bucket_res}`` column equals that
    value. The following candidates are excluded:

    * the target itself;
    * rows with a NULL vector;
    * rows flagged by BAD_COL/NAN_COL (when those columns exist);
    * rows with a NULL or non-positive norm.

    A seeded sample of roughly ``max_candidates * oversample`` rows is drawn
    and capped at ``max_candidates``. The ``top_k`` rows by cosine similarity
    are then returned.

    Returns:
        ``(neighbors_df, meta)``. ``neighbors_df`` has columns ID_COL (string)
        and ``similarity`` (double), ordered by similarity desc then id.
        ``meta["status"]`` is one of ``ok``, ``target_not_found`` or
        ``target_vector_invalid``.

    Raises:
        ValueError: ``geo_bucket_res``, the geo bucket column or the
            ``h3_<res>`` column is missing, or the target has a NULL
            resolution or bucket value.
    """
    import math

    spark = df.sparkSession
    bucket_val_col = _pick_geo_col(df)

    if "geo_bucket_res" not in df.columns:
        raise ValueError("Missing required column 'geo_bucket_res' in df.")

    target_id = str(target_property_id)
    # Optional boolean quality flags and an optional precomputed norm.
    flag_cols = [c for c in (BAD_COL, NAN_COL) if c in df.columns]
    has_norm_col = NORM_COL in df.columns

    target_row = _safe_head1(
        df.filter(F.col(ID_COL).cast("string") == F.lit(target_id))
          .select(
              F.col(bucket_val_col).alias("bucket_value"),
              F.col("geo_bucket_res").cast("int").alias("bucket_res"),
              vector_to_array(F.col(VEC_COL)).alias("vec_arr"),
              (F.col(NORM_COL).cast("double") if has_norm_col else F.lit(None)).alias("vec_norm"),
              *(F.col(c).alias(c) for c in flag_cols),
          )
          .limit(1)
    )

    empty_neighbors = spark.createDataFrame(
        [], schema=T.StructType([
            T.StructField(ID_COL, T.StringType(), True),
            T.StructField("similarity", T.DoubleType(), True),
        ])
    )

    if target_row is None:
        return empty_neighbors, {"target_property_id": target_id, "status": "target_not_found"}

    bucket_value = target_row["bucket_value"]
    bucket_res = target_row["bucket_res"]
    target_arr = target_row["vec_arr"]
    target_norm = target_row["vec_norm"]

    if bucket_res is None:
        raise ValueError(f"Target property_id={target_property_id} has null geo_bucket_res.")
    if bucket_value is None:
        raise ValueError(f"Target property_id={target_property_id} has null {bucket_val_col} (bucket value).")

    h3_col = f"h3_{int(bucket_res)}"
    if h3_col not in df.columns:
        raise ValueError(f"Expected column '{h3_col}' (derived from geo_bucket_res={bucket_res}) not found in df.")

    # The target must pass the same quality gates as the candidates:
    # - not flagged;
    # - non-empty;
    # - every component finite;
    # - finite positive norm.
    # Otherwise NaN similarities would result, and Spark sorts NaN above every
    # other value in a descending order.
    target_flagged = any(bool(target_row[c]) for c in flag_cols)
    vector_ok = (
        target_arr is not None
        and len(target_arr) > 0
        and not target_flagged
        and all(math.isfinite(float(x)) for x in target_arr)
    )
    if vector_ok and target_norm is None:
        target_norm = math.sqrt(math.fsum(float(x) * float(x) for x in target_arr))

    if (
        not vector_ok
        or target_norm is None
        or not math.isfinite(float(target_norm))
        or float(target_norm) <= 0.0
    ):
        return empty_neighbors, {
            "target_property_id": target_id,
            "status": "target_vector_invalid",
            "bucket_res": int(bucket_res),
            "bucket_value": bucket_value,
            "h3_filter_column": h3_col,
        }

    cand = (
        df.filter(F.col(h3_col) == F.lit(bucket_value))
          .filter(F.col(ID_COL).cast("string") != F.lit(target_id))
          .filter(F.col(VEC_COL).isNotNull())
    )
    for flag in flag_cols:
        # NOTE: ~NULL is NULL, so rows whose flag is NULL are excluded as well.
        cand = cand.filter(~F.col(flag))

    cand = cand.withColumn("cand_arr", vector_to_array(F.col(VEC_COL)))
    if has_norm_col:
        cand = cand.withColumn("cand_norm", F.col(NORM_COL).cast("double"))
    else:
        cand = cand.withColumn(
            "cand_norm",
            F.sqrt(
                F.expr(
                    "aggregate("
                    "  transform(cand_arr, x -> x * x),"
                    "  cast(0.0 as double),"
                    "  (acc, y) -> acc + y"
                    ")"
                )
            ),
        )
    # Remove NULL / zero norms for BOTH norm sources, before counting, so the
    # cosine division below never divides by zero.
    cand = cand.filter(F.col("cand_norm").isNotNull() & (F.col("cand_norm") > 0))

    n_bucket = cand.count()
    frac = min(1.0, (float(max_candidates) * float(oversample)) / max(float(n_bucket), 1.0))

    cand = (
        cand.sample(withReplacement=False, fraction=frac, seed=int(seed))
            .limit(int(max_candidates))
            .withColumn("target_arr", F.array(*[F.lit(float(x)) for x in target_arr]))
    )

    # arrays_zip names the struct fields after the input columns.
    dot = F.expr(
        "aggregate("
        "  arrays_zip(cand_arr, target_arr),"
        "  cast(0.0 as double),"
        "  (acc, x) -> acc + (x['cand_arr'] * x['target_arr'])"
        ")"
    )
    sim = (dot / (F.col("cand_norm") * F.lit(float(target_norm)))).alias("similarity")

    neighbors_df = (
        cand.select(F.col(ID_COL).cast("string").alias(ID_COL), sim)
            # A NaN would outrank every real score under desc(); NULLs (e.g.
            # dimension mismatch) are kept and sort last, as before.
            .filter(~F.isnan(F.col("similarity")))
            # The id tie-breaker makes the top_k cut deterministic.
            .orderBy(F.desc("similarity"), F.asc(ID_COL))
            .limit(int(top_k))
    )

    meta = {
        "target_property_id": target_id,
        "status": "ok",
        "bucket_res": int(bucket_res),
        "bucket_value": bucket_value,
        "h3_filter_column": h3_col,
        "bucket_candidate_count": int(n_bucket),
        "candidate_sampling_fraction": float(frac),
        "max_candidates_cap": int(max_candidates),
        "oversample_factor": float(oversample),
        "top_k_neighbors": int(top_k),
        "random_seed": int(seed),
    }

    return neighbors_df, meta


# =========================================================
# 2) Spark NLP models (fit once)
# =========================================================
# Lazily-initialised pipeline models (set by init_models / init_embedding_model).
_SPLIT_MODEL = _SENT_MODEL = _EMB_MODEL = None
# Memoised topic-embedding DataFrame and the encoder name it was built for.
_TOPICS_EMB_DF = _TOPICS_EMB_MODEL_NAME = None

def init_models(spark) -> None:
    """Fit the sentence-splitting and sentiment pipelines once per session.

    The pipelines are stored in ``_SPLIT_MODEL`` and ``_SENT_MODEL``. The call
    is a no-op once both are set. Fitting uses one-row dummy DataFrames. The
    stages are pretrained models and plain transformers, so the fit presumably
    only serves to obtain a ``PipelineModel``.
    """
    global _SPLIT_MODEL, _SENT_MODEL
    if all(model is not None for model in (_SPLIT_MODEL, _SENT_MODEL)):
        return

    review_seed = spark.createDataFrame([("x", "hello world. great stay!")], [ID_COL, "review_text"])
    sentence_seed = spark.createDataFrame([("x", "great stay!")], [ID_COL, "sentence_text"])

    splitter = Pipeline(stages=[
        DocumentAssembler().setInputCol("review_text").setOutputCol("document"),
        SentenceDetectorDLModel.pretrained("sentence_detector_dl", "en")
        .setInputCols(["document"])
        .setOutputCol("sentences"),
    ])
    _SPLIT_MODEL = splitter.fit(review_seed)

    sentiment = Pipeline(stages=[
        DocumentAssembler().setInputCol("sentence_text").setOutputCol("document"),
        Tokenizer().setInputCols(["document"]).setOutputCol("token"),
        ViveknSentimentModel.pretrained()
        .setInputCols(["document", "token"])
        .setOutputCol("sentiment"),
    ])
    _SENT_MODEL = sentiment.fit(sentence_seed)

def init_embedding_model(spark, use_model_name: str = "tfhub_use") -> None:
    """Fit the Universal Sentence Encoder pipeline for ``use_model_name``.

    This is a no-op when a model for the same name is already loaded. Loading
    a different name replaces ``_EMB_MODEL`` and clears the memoised topic
    embeddings, so they get rebuilt with the new encoder.
    """
    global _EMB_MODEL, _TOPICS_EMB_DF, _TOPICS_EMB_MODEL_NAME
    already_loaded = _EMB_MODEL is not None and _TOPICS_EMB_MODEL_NAME == use_model_name
    if already_loaded:
        return

    seed_df = spark.createDataFrame([("x", "great stay")], [ID_COL, "sentence_text"])
    stages = [
        DocumentAssembler().setInputCol("sentence_text").setOutputCol("document"),
        UniversalSentenceEncoder.pretrained(use_model_name, "en")
        .setInputCols(["document"])
        .setOutputCol("sentence_embeddings"),
    ]
    _EMB_MODEL = Pipeline(stages=stages).fit(seed_df)

    # Topic vectors depend on the encoder, so invalidate any cached ones.
    _TOPICS_EMB_DF, _TOPICS_EMB_MODEL_NAME = None, use_model_name

# =========================================================
# 3) Sentence extraction + sentiment
# =========================================================
def build_sentences_with_sentiment(
    df_subset: DataFrame,
    *,
    max_reviews_per_property: int = 50,
) -> DataFrame:
    """Split every property's reviews into sentences and label their sentiment.

    Processing steps:

    * ``REV_COL`` is parsed with ``from_json`` as an array of strings.
    * Rows where the parse yields NULL get an empty array.
    * Only the first ``max_reviews_per_property`` reviews of each row are kept.
    * Reviews are lower-cased and whitespace-collapsed before sentence
      detection.

    Returns:
        One row per non-empty sentence with columns ID_COL, review_idx,
        sent_idx, sentence_id (SHA-256 of id, indices and text),
        sentence_text and sentiment_label (the first sentiment result).

    Raises:
        RuntimeError: if ``init_models`` has not been called.
    """
    if _SPLIT_MODEL is None or _SENT_MODEL is None:
        raise RuntimeError("Models not initialized. Call init_models(spark) before running analysis.")

    parsed_reviews = F.from_json(F.col(REV_COL), T.ArrayType(T.StringType()))
    with_reviews = df_subset.withColumn("reviews_array", parsed_reviews)
    with_reviews = with_reviews.withColumn(
        "reviews_array",
        F.when(
            F.col("reviews_array").isNotNull(),
            F.expr(f"slice(reviews_array, 1, {int(max_reviews_per_property)})"),
        ).otherwise(F.array()),
    )

    review_rows = (
        with_reviews
        .select(
            F.col(ID_COL).cast("string").alias(ID_COL),
            F.posexplode_outer("reviews_array").alias("review_idx", "review_text"),
        )
        .where(F.col("review_text").isNotNull())
        .withColumn("review_text", F.regexp_replace(F.lower(F.col("review_text")), r"\s+", " "))
    )

    sentence_rows = (
        _SPLIT_MODEL.transform(review_rows)
        .select(ID_COL, "review_idx", F.posexplode("sentences").alias("sent_idx", "sent_annot"))
        .withColumn("sentence_text", F.col("sent_annot.result"))
        .drop("sent_annot")
        .where(F.col("sentence_text").isNotNull() & (F.length("sentence_text") > 0))
    )

    id_material = F.concat_ws(
        "||",
        F.col(ID_COL),
        F.col("review_idx").cast("string"),
        F.col("sent_idx").cast("string"),
        F.col("sentence_text"),
    )
    sentence_rows = sentence_rows.withColumn("sentence_id", F.sha2(id_material, 256))

    first_label = F.col("sentiment.result").getItem(0).alias("sentiment_label")
    return _SENT_MODEL.transform(sentence_rows).select(
        ID_COL, "review_idx", "sent_idx", "sentence_id", "sentence_text", first_label
    )

# =========================================================
# 4) Topic tagging via USE embeddings + cosine to topic vectors
# =========================================================
def _get_topics_embeddings(spark) -> DataFrame:
    """Return (and memoise) the USE embedding of every entry in ``TOPICS``.

    Columns: ``topic``, ``topic_vector``, ``topic_vector_norm``. Topics whose
    embedding has a zero L2 norm are dropped. Only the DataFrame object is
    memoised in ``_TOPICS_EMB_DF``. It is not persisted here, so Spark may
    recompute it per action.

    Raises:
        RuntimeError: if no memoised result exists and the embedding model is
            not initialised.
    """
    global _TOPICS_EMB_DF
    if _TOPICS_EMB_DF is None:
        if _EMB_MODEL is None:
            raise RuntimeError("Embedding model not initialized. Call init_embedding_model(spark, ...)")

        topic_rows = spark.createDataFrame([(topic,) for topic in TOPICS], ["sentence_text"])
        embedded = (
            _EMB_MODEL.transform(topic_rows)
            .withColumn("topic_vector", F.col("sentence_embeddings")[0]["embeddings"])
            .drop("sentence_embeddings", "document")
            .withColumnRenamed("sentence_text", "topic")
        )
        sum_of_squares = F.expr(
            "aggregate(transform(topic_vector, x -> x * x), cast(0.0 as double), (acc, y) -> acc + y)"
        )
        _TOPICS_EMB_DF = (
            embedded
            .withColumn("topic_vector_norm", F.sqrt(sum_of_squares))
            .filter(F.col("topic_vector_norm") > 0)
            .select("topic", "topic_vector", "topic_vector_norm")
        )
    return _TOPICS_EMB_DF

def classify_topics_with_use(
    sent_df: DataFrame,
    *,
    similarity_threshold: float = 0.35,
    top_topics_per_sentence: Optional[int] = 2,
    non_ascii_ratio_max: float = 0.20,
    topic_partitions: Optional[int] = None,
) -> DataFrame:
    """Tag sentences with topics by cosine similarity of USE embeddings.

    Processing steps:

    * Keep only English-like sentences (``filter_english_like``).
    * Embed them and drop zero-norm vectors.
    * Cross-join them with the broadcast topic embeddings.
    * Keep pairs whose cosine is ``>= similarity_threshold``.
    * Optionally keep only the best ``top_topics_per_sentence`` topics per
      (ID_COL, sentence_id).

    Returns:
        Columns ID_COL, sentence_id, topic, topic_similarity (double).

    Raises:
        RuntimeError: if the embedding model is not initialised.
    """
    if _EMB_MODEL is None:
        raise RuntimeError("Embedding model not initialized. Call init_embedding_model(spark, ...)")

    candidates = filter_english_like(
        sent_df.select(F.col(ID_COL).cast("string").alias(ID_COL), "sentence_id", "sentence_text"),
        text_col="sentence_text",
        non_ascii_ratio_max=float(non_ascii_ratio_max),
    )
    if topic_partitions is not None:
        candidates = candidates.repartition(int(topic_partitions))

    vector_sq_sum = F.expr(
        "aggregate(transform(sentence_vector, x -> x * x), cast(0.0 as double), (acc, y) -> acc + y)"
    )
    embedded = (
        _EMB_MODEL.transform(candidates)
        .withColumn("sentence_vector", F.col("sentence_embeddings")[0]["embeddings"])
        .drop("sentence_embeddings", "document")
        .withColumn("sentence_vector_norm", F.sqrt(vector_sq_sum))
        .filter(F.col("sentence_vector_norm") > 0)
    )

    # arrays_zip names the struct fields after the zipped columns.
    dot_product = F.expr(
        "aggregate("
        "  arrays_zip(sentence_vector, topic_vector),"
        "  cast(0.0 as double),"
        "  (acc, x) -> acc + (x['sentence_vector'] * x['topic_vector'])"
        ")"
    )
    cosine = dot_product / (F.col("sentence_vector_norm") * F.col("topic_vector_norm"))

    topic_vectors = F.broadcast(_get_topics_embeddings(sent_df.sparkSession))
    matched = (
        embedded.crossJoin(topic_vectors)
        .withColumn("topic_similarity", cosine)
        .filter(F.col("topic_similarity") >= F.lit(float(similarity_threshold)))
    )

    if top_topics_per_sentence is not None:
        best_first = Window.partitionBy(ID_COL, "sentence_id").orderBy(F.desc("topic_similarity"))
        matched = (
            matched.withColumn("rn", F.row_number().over(best_first))
            .where(F.col("rn") <= int(top_topics_per_sentence))
            .drop("rn")
        )

    return matched.select(
        ID_COL,
        "sentence_id",
        "topic",
        F.col("topic_similarity").cast("double").alias("topic_similarity"),
    )

# =========================================================
# 5) Report building (weaknesses + strengths + evidence)
# =========================================================
def _build_topic_report_df(
    enriched: DataFrame,
    *,
    target_property_id: str,
    top_k_weakness_topics: int,
    top_k_strength_topics: int,
    top_k_examples: int,
) -> Tuple[DataFrame, DataFrame]:
    """Compare per-topic negative-sentiment rates of the target vs its neighbours.

    ``enriched`` holds one row per (sentence, topic) assignment. Its columns
    are ID_COL, review_idx, sent_idx, sentence_id, topic, topic_similarity,
    sentiment_label and sentence_text.

    For every topic, the negative rate (negative assignments / assignments)
    is computed for the target and pooled over all neighbours:
    ``negative_rate_gap = target_rate - neighbours_rate``. A topic absent on
    one side gets 0 counts and a 0 rate for that side.

    * Weaknesses: topics with a strictly positive gap (target worse), largest
      gap first.
    * Strengths: topics the target has assignments for AND with a strictly
      negative gap (target better), most negative gap first.

    Ties are broken by topic name. Because the sign conditions are disjoint, a
    topic can no longer be listed as both a weakness and a strength. A topic
    the target never mentions is not reported as a strength.

    Each selected topic gets up to ``top_k_examples`` evidence sentences,
    chosen by highest topic_similarity, in three columns (empty arrays when
    there are none):

    * ``target_negative_examples``
    * ``neighbors_positive_examples``
    * ``neighbors_negative_examples``

    Returns:
        ``(weaknesses_df, strengths_df)``.
    """
    target_id = str(target_property_id)
    my_df = enriched.filter(F.col(ID_COL) == F.lit(target_id))
    nbr_df = enriched.filter(F.col(ID_COL) != F.lit(target_id))

    def agg(df: DataFrame, prefix: str) -> DataFrame:
        """Per-topic assignment count, negative count and negative rate."""
        total_col = f"{prefix}_sentence_topic_assignments"
        neg_col = f"{prefix}_negative_assignments"
        return (
            df.groupBy("topic")
              .agg(
                  F.count("*").alias(total_col),
                  F.sum(F.when(F.col("sentiment_label") == "negative", 1).otherwise(0)).alias(neg_col),
              )
              .withColumn(
                  f"{prefix}_negative_rate",
                  F.when(F.col(total_col) > 0, F.col(neg_col) / F.col(total_col)).otherwise(F.lit(0.0)),
              )
        )

    comp = (
        agg(my_df, "target").join(agg(nbr_df, "neighbors"), on="topic", how="outer")
              .na.fill(
                  0,
                  subset=[
                      "target_sentence_topic_assignments",
                      "target_negative_assignments",
                      "target_negative_rate",
                      "neighbors_sentence_topic_assignments",
                      "neighbors_negative_assignments",
                      "neighbors_negative_rate",
                  ],
              )
              .withColumn("negative_rate_gap", F.col("target_negative_rate") - F.col("neighbors_negative_rate"))
    )

    weaknesses_df = (
        comp.filter(F.col("negative_rate_gap") > 0)
            .orderBy(F.desc("negative_rate_gap"), F.asc("topic"))
            .limit(int(top_k_weakness_topics))
    )
    strengths_df = (
        comp.filter((F.col("negative_rate_gap") < 0) & (F.col("target_sentence_topic_assignments") > 0))
            .orderBy(F.asc("negative_rate_gap"), F.asc("topic"))
            .limit(int(top_k_strength_topics))
    )

    evidence_struct = F.struct(
        F.col(ID_COL).alias("property_id"),
        F.col("review_idx").cast("int").alias("review_idx"),
        F.col("sent_idx").cast("int").alias("sent_idx"),
        F.col("sentence_id").alias("sentence_id"),
        F.col("topic_similarity").cast("double").alias("topic_similarity"),
        F.col("sentiment_label").alias("sentiment_label"),
        F.col("sentence_text").alias("sentence_text"),
    )
    # Best-matching sentences first; sentence_id makes the cut deterministic on ties.
    example_rank = Window.partitionBy("topic").orderBy(F.desc("topic_similarity"), F.asc("sentence_id"))

    def _top_examples(source: DataFrame, topics_only: DataFrame, label: str, out_col: str) -> DataFrame:
        """Collect up to top_k_examples evidence structs per topic for one sentiment label."""
        return (
            source.join(topics_only, on="topic", how="inner")
                  .filter(F.col("sentiment_label") == label)
                  .withColumn("rn", F.row_number().over(example_rank))
                  .filter(F.col("rn") <= int(top_k_examples))
                  .groupBy("topic")
                  .agg(F.collect_list(evidence_struct).alias(out_col))
        )

    def attach_examples(df_topics: DataFrame) -> DataFrame:
        """Left-join the three evidence arrays onto the selected topics."""
        topics_only = df_topics.select("topic").distinct()
        result = df_topics
        for source, label, out_col in (
            (my_df, "negative", "target_negative_examples"),
            (nbr_df, "positive", "neighbors_positive_examples"),
            (nbr_df, "negative", "neighbors_negative_examples"),
        ):
            result = (
                result.join(_top_examples(source, topics_only, label, out_col), on="topic", how="left")
                      .withColumn(out_col, F.coalesce(F.col(out_col), F.array()))
            )
        return result

    return attach_examples(weaknesses_df), attach_examples(strengths_df)

def analyze_property_realtime_payload(
    listings_df: DataFrame,
    sim_df: DataFrame,
    pid: str,
    *,
    similarity_threshold: float = 0.35,
    top_topics_per_sentence: Optional[int] = 2,
    top_k_weakness_topics: int = 8,
    top_k_strength_topics: Optional[int] = None,
    top_k_examples: int = 3,
    max_reviews_per_property: int = 50,
    max_candidates: int = 50,
    top_k_neighbors: int = 20,
    seed: int = 42,
    oversample: float = 1.7,
    use_model_name: str = "tfhub_use",
    non_ascii_ratio_max: float = 0.20,
    topic_partitions: Optional[int] = None,
    as_json_string: bool = True,
) -> Any:
    """Build the "target vs neighbours" review-topic report for one property.

    Steps:
      1. Select neighbours in the target's geo bucket by vector cosine
         similarity (``top_similar_properties_with_metadata`` on ``sim_df``).
      2. Split the reviews of the target and its neighbours (from
         ``listings_df``) into sentiment-labelled sentences.
      3. Tag the sentences with topics using USE embeddings.
      4. Compare per-topic negative rates and attach evidence sentences.

    ``top_k_strength_topics`` defaults to ``top_k_weakness_topics``.
    ``top_topics_per_sentence=None`` keeps every topic above the threshold.

    Returns:
        The payload dict, or its JSON string when ``as_json_string`` is True.
    """
    def _opt(cast, value):
        """Apply ``cast`` unless ``value`` is None."""
        return cast(value) if value is not None else None

    def _struct_list_to_dict_list(struct_list) -> List[Dict[str, Any]]:
        """Convert collected evidence structs into plain JSON-ready dicts."""
        return [
            {
                "property_id": s["property_id"],
                "review_idx": _opt(int, s["review_idx"]),
                "sent_idx": _opt(int, s["sent_idx"]),
                "sentence_id": s["sentence_id"],
                "topic_similarity": _opt(float, s["topic_similarity"]),
                "sentiment_label": s["sentiment_label"],
                "sentence_text": s["sentence_text"],
            }
            for s in (struct_list or [])
        ]

    def _report_rows_to_dicts(report_df: DataFrame) -> List[Dict[str, Any]]:
        """Collect a weakness/strength report DataFrame into JSON-ready dicts."""
        return [
            {
                "topic": r["topic"],
                "target_sentence_topic_assignments": int(r["target_sentence_topic_assignments"]),
                "target_negative_assignments": int(r["target_negative_assignments"]),
                "target_negative_rate": float(r["target_negative_rate"]),
                "neighbors_sentence_topic_assignments": int(r["neighbors_sentence_topic_assignments"]),
                "neighbors_negative_assignments": int(r["neighbors_negative_assignments"]),
                "neighbors_negative_rate": float(r["neighbors_negative_rate"]),
                "negative_rate_gap": float(r["negative_rate_gap"]),
                "target_negative_examples": _struct_list_to_dict_list(r["target_negative_examples"]),
                "neighbors_positive_examples": _struct_list_to_dict_list(r["neighbors_positive_examples"]),
                "neighbors_negative_examples": _struct_list_to_dict_list(r["neighbors_negative_examples"]),
            }
            for r in report_df.collect()
        ]

    spark = listings_df.sparkSession
    target_id = str(pid)

    if top_k_strength_topics is None:
        top_k_strength_topics = int(top_k_weakness_topics)

    init_models(spark)
    init_embedding_model(spark, use_model_name=use_model_name)

    neighbors_df, neighbor_meta = top_similar_properties_with_metadata(
        df=sim_df,
        target_property_id=pid,
        max_candidates=max_candidates,
        top_k=top_k_neighbors,
        seed=seed,
        oversample=oversample,
    )

    # Materialise the neighbour selection exactly once. It involves sampling
    # plus an unordered limit, so re-evaluating neighbors_df later (as the old
    # union did) is not guaranteed to return the same rows that the payload
    # reports.
    neighbors_rows = neighbors_df.orderBy(F.desc("similarity"), F.asc(ID_COL)).collect()
    neighbor_ids: List[str] = [r[ID_COL] for r in neighbors_rows]
    neighbor_sims: List[Optional[float]] = [_opt(float, r["similarity"]) for r in neighbors_rows]

    sim_vals = [s for s in neighbor_sims if s is not None]
    neighbor_similarity_stats = {
        "neighbors_selected_count": len(neighbor_ids),
        "similarity_avg": float(sum(sim_vals) / len(sim_vals)) if sim_vals else None,
        "similarity_min": float(min(sim_vals)) if sim_vals else None,
        "similarity_max": float(max(sim_vals)) if sim_vals else None,
    }
    neighbor_list = [
        {"neighbor_property_id": nid, "similarity": s}
        for nid, s in zip(neighbor_ids, neighbor_sims)
    ]

    # Target first, duplicates removed; filter listings to exactly these ids.
    analysis_ids = list(dict.fromkeys([target_id, *neighbor_ids]))
    df_subset = (
        listings_df.select(F.col(ID_COL).cast("string").alias(ID_COL), F.col(REV_COL))
                   .filter(F.col(ID_COL).isin(analysis_ids))
    )

    # Everything persisted below is released in `finally`, even on failure.
    persisted: List[DataFrame] = []
    try:
        sent_df = build_sentences_with_sentiment(
            df_subset=df_subset,
            max_reviews_per_property=max_reviews_per_property,
        ).persist(StorageLevel.MEMORY_AND_DISK)
        persisted.append(sent_df)

        sent_counts = sent_df.groupBy(ID_COL).agg(F.countDistinct("sentence_id").alias("distinct_sentences"))
        target_sentence_count_row = _safe_head1(sent_counts.filter(F.col(ID_COL) == F.lit(target_id)))
        target_distinct_sentences = (
            int(target_sentence_count_row["distinct_sentences"]) if target_sentence_count_row else 0
        )
        neighbors_distinct_sentences = int(
            sent_counts.filter(F.col(ID_COL) != F.lit(target_id))
                       .agg(F.sum("distinct_sentences").alias("s"))
                       .first()["s"] or 0
        )

        topics_df = classify_topics_with_use(
            sent_df=sent_df,
            similarity_threshold=similarity_threshold,
            top_topics_per_sentence=top_topics_per_sentence,
            non_ascii_ratio_max=non_ascii_ratio_max,
            topic_partitions=topic_partitions,
        ).persist(StorageLevel.MEMORY_AND_DISK)
        persisted.append(topics_df)

        enriched = (
            sent_df.join(topics_df, on=[ID_COL, "sentence_id"], how="inner")
                  .select(
                      F.col(ID_COL).cast("string").alias(ID_COL),
                      F.col("review_idx").cast("int").alias("review_idx"),
                      F.col("sent_idx").cast("int").alias("sent_idx"),
                      F.col("sentence_id").alias("sentence_id"),
                      F.col("topic").alias("topic"),
                      F.col("topic_similarity").cast("double").alias("topic_similarity"),
                      F.col("sentiment_label").alias("sentiment_label"),
                      F.col("sentence_text").alias("sentence_text"),
                  )
                  .persist(StorageLevel.MEMORY_AND_DISK)
        )
        persisted.append(enriched)

        weaknesses_df, strengths_df = _build_topic_report_df(
            enriched=enriched,
            target_property_id=target_id,
            top_k_weakness_topics=int(top_k_weakness_topics),
            top_k_strength_topics=int(top_k_strength_topics),
            top_k_examples=int(top_k_examples),
        )
        weaknesses = _report_rows_to_dicts(weaknesses_df)
        strengths = _report_rows_to_dicts(strengths_df)
    finally:
        for cached in persisted:
            cached.unpersist()

    payload: Dict[str, Any] = {
        "target_property_id": target_id,
        "neighbors": {
            "neighbor_list": neighbor_list,
            "similarity_stats": neighbor_similarity_stats,
            "selection_metadata": neighbor_meta,
        },
        "text_volume": {
            "target_distinct_sentences_used": int(target_distinct_sentences),
            "neighbors_distinct_sentences_used": int(neighbors_distinct_sentences),
            "max_reviews_per_property_cap": int(max_reviews_per_property),
        },
        "topic_tagging_config": {
            "embedding_model": str(use_model_name),
            "similarity_threshold": float(similarity_threshold),
            # None is a valid setting (keep all topics above the threshold).
            "top_topics_per_sentence": _opt(int, top_topics_per_sentence),
            "non_ascii_ratio_max": float(non_ascii_ratio_max),
        },
        "results": {
            "weaknesses_vs_neighbors": weaknesses,
            "strengths_vs_neighbors": strengths,
        },
    }

    return json.dumps(payload, ensure_ascii=False) if as_json_string else payload

# =========================================================
# 6) Write payload to Delta output paths
# =========================================================
def _truncate_evidence_text(txt: Optional[str], max_chars: int = 600) -> Optional[str]:
    """Return *txt* bounded to *max_chars* characters.

    Text longer than *max_chars* is cut to its first *max_chars* characters and
    suffixed with one U+2026 HORIZONTAL ELLIPSIS character. ``None``, ``""`` and
    text within the limit are returned unchanged.
    """
    if txt and len(txt) > max_chars:
        # Written as an escape so the literal cannot be re-encoded into mojibake
        # (the previous inline literal had been mangled into three cp1252 chars).
        return txt[:max_chars] + "\u2026"
    return txt


def write_payload_to_delta_paths(
    *,
    spark: SparkSession,
    payload: dict,
    out_base_path: str,
    run_id: str,
    run_utc_ts: Optional[str] = None,
    max_evidence_chars: int = 600,
) -> dict:
    """Append the analysis *payload* to four Delta tables under *out_base_path*.

    Tables (all written with ``format("delta").mode("append")``):
      <base>/runs       one row per call: status "ok", summary counts, full payload JSON
      <base>/neighbors  one row per entry of payload["neighbors"]["neighbor_list"]
      <base>/topics     one row per weakness/strength topic in payload["results"]
      <base>/evidence   one row per example sentence attached to those topics
    The neighbors/topics/evidence tables are only written when they have rows.

    Args:
        spark: session used to build and write the DataFrames.
        payload: dict with the keys produced by analyze_property_realtime_payload
            (as_json_string=False): "target_property_id", "neighbors",
            "text_volume", "results".
        out_base_path: base directory; a trailing "/" is ignored.
        run_id: identifier stamped on every written row.
        run_utc_ts: timestamp string stamped on every row. When omitted or empty,
            the current UTC time is used, formatted "YYYY-MM-DDTHH:MM:SSZ".
        max_evidence_chars: evidence sentences longer than this are truncated
            (with a trailing ellipsis) before being written.

    Returns:
        Mapping of "runs_path", "neighbors_path", "topics_path", "evidence_path"
        to the Delta path each table was (or would be) written to.

    Raises:
        ValueError: if out_base_path is empty.
    """
    if not out_base_path:
        raise ValueError("out_base_path is required")

    ts = run_utc_ts or (datetime.utcnow().isoformat(timespec="seconds") + "Z")
    pid = payload.get("target_property_id")

    base = out_base_path.rstrip("/")
    runs_path = f"{base}/runs"
    neighbors_path = f"{base}/neighbors"
    topics_path = f"{base}/topics"
    evidence_path = f"{base}/evidence"

    def _append(rows: list, path: str) -> None:
        """Append *rows* (a list of dicts) to the Delta table at *path*; no-op if empty."""
        # NOTE(review): the schema is inferred from the dict values; a column that is
        # None in every row cannot be typed and createDataFrame will raise.
        if rows:
            spark.createDataFrame(rows).write.format("delta").mode("append").save(path)

    # Columns shared by every row written by this call.
    common = {"run_id": run_id, "run_utc_ts": ts, "property_id": pid}

    # `or {}` also tolerates sections that are present but None.
    neighbors_section = payload.get("neighbors") or {}
    similarity_stats = neighbors_section.get("similarity_stats") or {}
    text_volume = payload.get("text_volume") or {}
    results = payload.get("results") or {}

    # Runs row (payload_json kept for debugging; can be dropped later).
    runs_row = {
        **common,
        "status": "ok",
        "neighbors_selected_count": similarity_stats.get("neighbors_selected_count"),
        "target_distinct_sentences_used": text_volume.get("target_distinct_sentences_used"),
        "neighbors_distinct_sentences_used": text_volume.get("neighbors_distinct_sentences_used"),
        "payload_json": json.dumps(payload, ensure_ascii=False),
    }
    _append([runs_row], runs_path)

    # Neighbors
    neighbors_rows = [
        {
            **common,
            "neighbor_property_id": x.get("neighbor_property_id"),
            "similarity": x.get("similarity"),
        }
        for x in (neighbors_section.get("neighbor_list") or [])
    ]
    _append(neighbors_rows, neighbors_path)

    # Topics: weaknesses first, then strengths (same order used for evidence).
    topic_groups = [
        ("weakness", results.get("weaknesses_vs_neighbors") or []),
        ("strength", results.get("strengths_vs_neighbors") or []),
    ]
    topics_rows = [
        {
            **common,
            "kind": kind,
            "topic": t.get("topic"),
            "negative_rate_gap": t.get("negative_rate_gap"),
            "target_negative_rate": t.get("target_negative_rate"),
            "neighbors_negative_rate": t.get("neighbors_negative_rate"),
            "target_sentence_topic_assignments": t.get("target_sentence_topic_assignments"),
            "neighbors_sentence_topic_assignments": t.get("neighbors_sentence_topic_assignments"),
        }
        for kind, arr in topic_groups
        for t in arr
    ]
    _append(topics_rows, topics_path)

    # Evidence (bounded text): per topic, one row per example in each list below.
    evidence_sources = (
        ("target_negative_examples", "target_negative"),
        ("neighbors_positive_examples", "neighbors_positive"),
        ("neighbors_negative_examples", "neighbors_negative"),
    )
    evidence_rows = []
    for kind, arr in topic_groups:
        for t in arr:
            topic = t.get("topic")
            for payload_key, role in evidence_sources:
                for e in (t.get(payload_key) or []):
                    evidence_rows.append({
                        **common,
                        "kind": kind,
                        "topic": topic,
                        "evidence_role": role,
                        "evidence_property_id": e.get("property_id"),
                        "review_idx": e.get("review_idx"),
                        "sent_idx": e.get("sent_idx"),
                        "sentence_id": e.get("sentence_id"),
                        "topic_similarity": e.get("topic_similarity"),
                        "sentiment_label": e.get("sentiment_label"),
                        "sentence_text": _truncate_evidence_text(
                            e.get("sentence_text"), max_evidence_chars
                        ),
                    })
    _append(evidence_rows, evidence_path)

    return {
        "runs_path": runs_path,
        "neighbors_path": neighbors_path,
        "topics_path": topics_path,
        "evidence_path": evidence_path,
    }

# =========================================================
# 7) Job main: load data, run analysis, write outputs, exit JSON
# =========================================================
# Default keyword arguments for analyze_property_realtime_payload. Keys supplied
# via the params_json widget override these at job start. Key order is kept
# stable because the merged dict is echoed back as "params_used" in the exit JSON.
DEFAULT_PARAMS = {
    # topic tagging
    "similarity_threshold": 0.35,
    "top_topics_per_sentence": 2,
    # result sizes
    "top_k_weakness_topics": 8,
    "top_k_strength_topics": 8,
    "top_k_examples": 3,
    # input volume / neighbor selection
    "max_reviews_per_property": 50,
    "max_candidates": 100,
    "top_k_neighbors": 20,
    "seed": 42,
    "oversample": 1.7,
    # embedding model and text filtering
    "use_model_name": "tfhub_use",
    "non_ascii_ratio_max": 0.20,
    # None leaves the partitioning choice to the analysis function
    "topic_partitions": None,
}

def _parse_params_json(s: str) -> dict:
    """Decode the params_json widget value into a dict of parameter overrides.

    An empty (or otherwise falsy) string yields a fresh empty dict.

    Raises:
        json.JSONDecodeError: if *s* is not valid JSON.
        ValueError: if *s* decodes to something other than a JSON object.
    """
    decoded = json.loads(s) if s else {}
    if isinstance(decoded, dict):
        return decoded
    raise ValueError("params_json must decode to a JSON object (dict).")

spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
run_id = str(uuid.uuid4())
ts = datetime.utcnow().isoformat(timespec="seconds") + "Z"

# Explicit schema for the best-effort error row. A single row whose payload/count
# columns are all None cannot be schema-inferred (createDataFrame raises), which
# previously made the error write fail every time and get silently swallowed.
# Every field is nullable, matching what inference produces on the success path.
# NOTE(review): count columns assumed LongType (Spark's inferred type for Python
# ints on the success path) — confirm against the existing runs table.
_ERROR_RUN_SCHEMA = T.StructType([
    T.StructField("run_id", T.StringType(), True),
    T.StructField("run_utc_ts", T.StringType(), True),
    T.StructField("property_id", T.StringType(), True),
    T.StructField("status", T.StringType(), True),
    T.StructField("error_message", T.StringType(), True),
    T.StructField("payload_json", T.StringType(), True),
    T.StructField("neighbors_selected_count", T.LongType(), True),
    T.StructField("target_distinct_sentences_used", T.LongType(), True),
    T.StructField("neighbors_distinct_sentences_used", T.LongType(), True),
])

# Notebook exit value; built in either branch below and emitted once at the end.
exit_result: Dict[str, Any]

try:
    user_params = _parse_params_json(PARAMS_JSON)
    params = {**DEFAULT_PARAMS, **user_params}

    airbnb_df = spark.read.format("delta").load(DELTA_PATH)
    sim_df = airbnb_df  # the same table provides listings and similarity vectors

    payload = analyze_property_realtime_payload(
        listings_df=airbnb_df,
        sim_df=sim_df,
        pid=str(PID),
        as_json_string=False,
        **params,
    )

    written = write_payload_to_delta_paths(
        spark=spark,
        payload=payload,
        out_base_path=OUT_BASE_PATH,
        run_id=run_id,
    )

    exit_result = {
        "status": "ok",
        "run_id": run_id,
        "run_utc_ts": ts,
        "property_id": str(PID),
        "written": written,
        "params_used": params,
    }

except Exception as e:
    # Include the exception type: str(e) alone can be empty or ambiguous
    # (e.g. a KeyError renders only the key).
    err_msg = f"{type(e).__name__}: {e}"

    # Best-effort: write an error run row so Flask can see failures via SQL.
    runs_path = OUT_BASE_PATH.rstrip("/") + "/runs"
    try:
        error_row = (run_id, ts, str(PID), "error", err_msg[:2000], None, None, None, None)
        (
            spark.createDataFrame([error_row], schema=_ERROR_RUN_SCHEMA)
            .write.format("delta")
            .mode("append")
            # error_message is not among the columns the success path writes, so
            # it may be new to the table; Delta schema enforcement rejects that
            # append unless schema merging is enabled.
            .option("mergeSchema", "true")
            .save(runs_path)
        )
    except Exception as write_err:
        # Still best-effort (must not mask the original failure), but no longer silent.
        print(f"WARNING: failed to record error run row at {runs_path}: {write_err!r}")

    exit_result = {
        "status": "error",
        "run_id": run_id,
        "run_utc_ts": ts,
        "property_id": str(PID),
        "error_message": err_msg[:4000],
    }

# Exit once, outside the try: if exit() signals completion by raising, the
# handler above can no longer catch it and misreport the run as failed.
dbutils.notebook.exit(json.dumps(exit_result, ensure_ascii=False))
