In [0]:
import json
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, Imputer, StandardScaler
)
from pyspark.ml.regression import LinearRegression
from pyspark.sql import DataFrame
from pyspark.ml.functions import vector_to_array

In [0]:
# ==========================================
# CONFIGURATION
# ==========================================

# 1. File Paths & IDs
INPUT_PATH = "/mnt/lab94290/cluster_19/airbnb_data_updateddd"
# Same widget name as the output cell ("property_id"), so a single job parameter drives
# both the neighbourhood/training step and the explanation step.
dbutils.widgets.text("property_id", "40458495")
TARGET_PROPERTY_ID = dbutils.widgets.get("property_id").strip()

# 2. Column Names
ID_COL = "property_id"
LABEL_COL = "ratings"
AMENITIES_RAW_COL = "amenities"

# 3. Currency Conversion (static lookup table; price_usd = price * to_usd)
FX_RATES_DATA = [("USD", 1.0), ("EUR", 1.05)]
FX_RATES_DF = spark.createDataFrame(FX_RATES_DATA, ["currency", "to_usd"])

# 4. Features Lists
NUMERIC_FEATURES_BASE = [
    "price_per_guest"
]

BOOL_FEATURES_BASE = [
    "is_super_host"  # 0/1
]

CATEGORICAL_FEATURES = [
    "cancellation_bucket"
]

CATEGORY_RATING_COLS = [
    "cat_cleanliness",
    "cat_checkin",
    "cat_communication"
]

# 5. Global Amenities to always include
GLOBAL_IMPORTANT_AMENITIES = [
    "Wifi", "Air conditioning", "Heating", "Washer", "Dryer",
    "Free parking on premises", "Pool", "Hot tub", "Kitchen", "TV", "Hair dryer"
]

# 6. Schema for Amenities parsing: a list of groups, each with a list of {name, value} items.
AMENITIES_SCHEMA = T.ArrayType(
    T.StructType([
        T.StructField("group_name", T.StringType(), True),
        T.StructField("items", T.ArrayType(
            T.StructType([
                T.StructField("name",  T.StringType(), True),
                T.StructField("value", T.StringType(), True)
            ])
        ), True)
    ])
)

In [0]:
def get_neighbors_by_geo_bucket(df, property_id, id_col="property_id", res_col="geo_bucket_res", bucket_col="geo_bucket", h3_prefix="h3_"):
    """Return every row that shares the target property's geo bucket.

    The target row's `res_col` selects which H3 column to compare on
    (`f"{h3_prefix}{res}"`), and its `bucket_col` value is the bucket to match.

    Args:
        df: input Spark DataFrame containing `id_col`, `res_col`, `bucket_col`
            and the `h3_<res>` columns.
        property_id: id of the target property.
        id_col, res_col, bucket_col, h3_prefix: column naming.

    Returns:
        Spark DataFrame of the rows whose `h3_<res>` equals the target's bucket.

    Raises:
        ValueError: if the property is missing, has a null resolution/bucket,
            or the `h3_<res>` column does not exist.
    """
    target = (
        df.filter(F.col(id_col) == F.lit(property_id))
        .select(res_col, bucket_col)
        .limit(1)
        .collect()
    )
    if not target:
        raise ValueError(f"property_id {property_id} not found")

    res_value = target[0][res_col]
    bucket = target[0][bucket_col]
    # A null bucket would otherwise silently match no rows; a null resolution would crash int().
    if res_value is None or bucket is None:
        raise ValueError(
            f"property_id {property_id} has no geo bucket "
            f"({res_col}={res_value!r}, {bucket_col}={bucket!r})"
        )

    res = int(res_value)
    h3_col = f"{h3_prefix}{res}"
    if h3_col not in df.columns:
        raise ValueError(f"column {h3_col!r} (for {res_col}={res}) not found in input DataFrame")

    # Filter neighbors
    return df.filter(F.col(h3_col) == F.lit(bucket))

def extract_features(df, fx_rates_df):
    """Derive the model's engineered columns from the raw listing columns.

    Adds/overwrites: currency (normalised), to_usd, price_usd, guests (>= 1),
    price_per_guest, cp_text, cancellation_bucket, cat_cleanliness,
    cat_checkin, cat_communication; casts NUMERIC_FEATURES_BASE,
    BOOL_FEATURES_BASE and LABEL_COL to double when present.

    Args:
        df: Spark DataFrame with price, currency, guests, cancellation_policy
            and category_rating columns.
        fx_rates_df: Spark DataFrame with columns (currency, to_usd).

    Returns:
        The enriched Spark DataFrame.
    """
    # 1. Currency Conversion. Trim/upper-case codes so " usd" or "eur" still match the
    #    FX table; null or blank codes are treated as USD. Codes missing from the table
    #    get a null to_usd and therefore a null price_usd.
    currency_norm = F.upper(F.trim(F.col("currency").cast("string")))
    df = df.withColumn(
        "currency",
        F.when(currency_norm.isNull() | (currency_norm == ""), F.lit("USD")).otherwise(currency_norm),
    )
    df = df.join(fx_rates_df, on="currency", how="left")
    df = df.withColumn("price_usd", F.col("price") * F.col("to_usd"))

    # 2. Price per Guest (a missing or non-positive guest count is treated as 1)
    guests = F.col("guests")
    df = df.withColumn("guests", F.when(guests.isNull() | (guests <= 0), F.lit(1.0)).otherwise(guests))
    df = df.withColumn(
        "price_per_guest",
        F.when(F.col("price_usd").isNotNull(), F.col("price_usd") / F.col("guests")).otherwise(None),
    )

    # 3. Cancellation Policy Parsing (keyword match on the lower-cased text)
    df = df.withColumn("cp_text", F.lower(F.col("cancellation_policy").cast("string")))
    df = df.withColumn("cancellation_bucket",
        F.when(F.col("cp_text").contains("no refund"), "strict")
        .when(F.col("cp_text").contains("partial refund"), "moderate")
        .when(F.col("cp_text").contains("full refund"), "flexible")
        .otherwise("unknown")
    )

    # 4. Extract Category Ratings from the serialized category_rating text
    def extract_cat(name):
        raw = F.regexp_extract(
            F.col("category_rating").cast("string"),
            f'"name":"{name}".*?"value":"([0-9\\.]+)"',
            1,
        )
        # regexp_extract yields "" when nothing matches; casting "" to double raises under
        # ANSI mode, so map it to null explicitly.
        return F.when(raw != "", raw.cast("double"))

    df = df.withColumn("cat_cleanliness", extract_cat("Cleanliness"))
    df = df.withColumn("cat_checkin", extract_cat("Check-in"))
    df = df.withColumn("cat_communication", extract_cat("Communication"))

    # 5. Type Casting for Numerics (Ensuring Double Type for ML)
    for c in NUMERIC_FEATURES_BASE + BOOL_FEATURES_BASE:
        if c in df.columns:
            df = df.withColumn(c, F.col(c).cast("double"))

    # Target Label Casting
    if LABEL_COL in df.columns:
        df = df.withColumn(LABEL_COL, F.col(LABEL_COL).cast("double"))

    return df

def build_amenity_feature_df(df_in, id_col, top_k_local=15, global_list=None, raw_col="amenities"):
    """Add one 0/1 column per selected amenity, using manual aggregation instead of pivot.

    The selected amenities are the `top_k_local` most frequent (lower-cased,
    lightly normalised) amenity names in `df_in`, plus `global_list`.
    Column names are `amen_<name>` with every run of non `[0-9a-z]` characters
    replaced by "_". Amenities that map to the same column share it (the flag is 1
    if any of them is present).

    Args:
        df_in: Spark DataFrame with `id_col` and the raw amenities JSON in `raw_col`.
        id_col: property id column.
        top_k_local: how many of the most frequent local amenities to include.
        global_list: amenity names that are always included (case-insensitive).
        raw_col: column holding the amenities JSON (parsed with AMENITIES_SCHEMA).

    Returns:
        (df_out, amen_cols): `df_in` left-joined with the flag columns (nulls
        filled with 0.0; a helper `pid` column is also added) and the list of
        flag column names in a deterministic, sorted order.
    """
    import re  # local import keeps this function independent of any later `re` rebinding

    if global_list is None:
        global_list = []

    # 1. Parse JSON
    df_parsed = df_in.withColumn("amenities_parsed", F.from_json(F.col(raw_col).cast("string"), AMENITIES_SCHEMA))

    # 2. Long format: explode groups first, then the items inside each group.
    amenities_long = (
        df_parsed.select(F.col(id_col).alias("pid"), F.explode_outer("amenities_parsed").alias("group"))  # one row per group
        .select("pid", F.explode_outer("group.items").alias("item"))  # one row per item in the group
        .select("pid", F.lower(F.col("item.name")).alias("name"))  # the item name as a plain string
        .filter(F.col("name").isNotNull())
    )

    # Simple normalization (tv variants, etc.)
    amenities_long = amenities_long.withColumn(
        "name",
        F.when(F.col("name").contains("tv"), "tv")
        .when(F.col("name").contains("hair dryer"), "hair dryer")
        .otherwise(F.col("name")),
    )

    # 3. Determine Chosen Amenities (Local Top K + Global). Ties are broken by name and the
    #    final list is sorted, so the feature order does not depend on string hash seeds.
    local_top = (
        amenities_long.groupBy("name").count()
        .orderBy(F.desc("count"), F.asc("name"))
        .limit(top_k_local)
    )
    local_list = [r["name"] for r in local_top.collect()]
    global_norm = [g.lower() for g in global_list]
    chosen_amenities = sorted(set(local_list + global_norm))

    # 4. Map amenities to SQL-safe column names; group amenities whose names collide.
    names_by_col = {}
    for amen in chosen_amenities:
        safe_name = "amen_" + (re.sub(r"[^0-9a-z]+", "_", amen).strip("_") or "unnamed")
        names_by_col.setdefault(safe_name, []).append(amen)

    amen_cols = list(names_by_col)
    if not amen_cols:
        # Nothing to encode; groupBy().agg() rejects an empty expression list.
        return df_in, []

    # Manual pivot: flag = 1.0 if any of the column's amenity names appears for the property.
    aggs = [
        F.max(F.when(F.col("name").isin(names), 1.0).otherwise(0.0)).alias(col_name)
        for col_name, names in names_by_col.items()
    ]

    # Group by PID and calculate all flags at once
    features_df = amenities_long.groupBy("pid").agg(*aggs)

    # Join back to main DF; properties without any parsed amenity get 0.0 everywhere.
    df_out = df_in.join(features_df, df_in[id_col] == features_df["pid"], "left")
    df_out = df_out.fillna(0.0, subset=amen_cols)

    return df_out, amen_cols

In [0]:
# Load the raw listings table.
raw_df = spark.read.format("delta").load(INPUT_PATH)

# Restrict to the target's geo bucket, then derive the engineered feature columns.
df_similar = extract_features(
    get_neighbors_by_geo_bucket(raw_df, TARGET_PROPERTY_ID),
    FX_RATES_DF,
)

# Append the 0/1 amenity flags.
df_final, amenity_cols = build_amenity_feature_df(
    df_similar,
    ID_COL,
    top_k_local=20,
    global_list=GLOBAL_IMPORTANT_AMENITIES,
)

# Keep only configured features that actually exist after feature extraction.
_available_cols = set(df_final.columns)
FINAL_NUMERIC_COLS = [c for c in NUMERIC_FEATURES_BASE + CATEGORY_RATING_COLS if c in _available_cols]
FINAL_BOOL_COLS = [c for c in BOOL_FEATURES_BASE if c in _available_cols]
FINAL_AMENITY_COLS = amenity_cols
FINAL_CATEGORICAL_COLS = [c for c in CATEGORICAL_FEATURES if c in _available_cols]

# Columns that go through the Imputer (categoricals are one-hot encoded separately).
ALL_FEATURES_RAW = [*FINAL_NUMERIC_COLS, *FINAL_BOOL_COLS, *FINAL_AMENITY_COLS]

_n_features = len(ALL_FEATURES_RAW) + len(FINAL_CATEGORICAL_COLS)
print(f"Total features: {_n_features}")

# Cached lazily: the first action (training) materialises it for later reuse.
df_final.cache()
print("Data ready and cached.")

In [0]:
# Training rows: labelled neighbours, excluding the property being explained.
train_df = df_final.filter((F.col(LABEL_COL).isNotNull()) & (F.col(ID_COL) != F.lit(TARGET_PROPERTY_ID)))

# One pass over the training data: row count plus non-missing (non-null, non-NaN) counts
# per raw feature. Imputer(median) cannot compute a surrogate for a column with no observed
# values, which happens easily in small geo buckets, so such columns are left out of the model.
_feature_stats = train_df.agg(
    F.count(F.lit(1)).alias("__rows"),
    *[
        F.count(F.when(F.col(c).isNotNull() & ~F.isnan(F.col(c)), 1)).alias(f"__nonmissing_{i}")
        for i, c in enumerate(ALL_FEATURES_RAW)
    ],
).collect()[0]

if _feature_stats["__rows"] == 0:
    raise ValueError(f"No labelled neighbours found for property {TARGET_PROPERTY_ID}; cannot train the model")

MODEL_FEATURES_RAW = [
    c for i, c in enumerate(ALL_FEATURES_RAW) if _feature_stats[f"__nonmissing_{i}"] > 0
]
_skipped_features = [c for c in ALL_FEATURES_RAW if c not in MODEL_FEATURES_RAW]
if _skipped_features:
    print(f"Skipping features with no observed values in the training data: {_skipped_features}")

# Stages setup
stages = []

# 1. Imputer (Median) -> "<col>__imp"
imp_cols = [f"{c}__imp" for c in MODEL_FEATURES_RAW]
if MODEL_FEATURES_RAW:
    imputer = Imputer(inputCols=MODEL_FEATURES_RAW, outputCols=imp_cols).setStrategy("median")
    stages.append(imputer)

# 2. OHE for Categoricals -> "<col>__idx" -> "<col>__ohe"
ohe_cols = []
if FINAL_CATEGORICAL_COLS:
    idx_cols = [f"{c}__idx" for c in FINAL_CATEGORICAL_COLS]
    ohe_cols = [f"{c}__ohe" for c in FINAL_CATEGORICAL_COLS]
    indexer = StringIndexer(inputCols=FINAL_CATEGORICAL_COLS, outputCols=idx_cols, handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols, handleInvalid="keep")
    stages.extend([indexer, encoder])

# 3. Assembler & Scaler
assembler = VectorAssembler(inputCols=imp_cols + ohe_cols, outputCol="features_vec", handleInvalid="keep")
scaler = StandardScaler(inputCol="features_vec", outputCol="features_scaled", withMean=True, withStd=True)

# 4. Model (Linear Regression, elastic-net regularised)
lr = LinearRegression(featuresCol="features_scaled", labelCol=LABEL_COL, maxIter=50, regParam=0.1, elasticNetParam=0.5)

stages.extend([assembler, scaler, lr])
pipeline = Pipeline(stages=stages)

# Train
model = pipeline.fit(train_df)

print("Model trained successfully.")

In [0]:
import io
import base64
import json
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array
import regex as re

# --- 1. Chart Generation Helper (Base64) ---
def fig_to_base64(fig):
    """Encode a matplotlib figure as a Base64 PNG string and close the figure.

    The figure is closed even when rendering fails, so failed renders do not
    leave figures open.

    Args:
        fig: a matplotlib Figure.

    Returns:
        The PNG bytes, Base64-encoded and decoded to a UTF-8 str.
    """
    try:
        with io.BytesIO() as buf:
            fig.savefig(buf, format='png', bbox_inches='tight')
            return base64.b64encode(buf.getvalue()).decode('utf-8')
    finally:
        plt.close(fig)

def humanize_feature_name(s: str) -> str:
    """Turn a pipeline feature name into a readable label.

    Strips the `amen_` prefix, the trailing `_imp`/`__imp` imputer suffix and the
    `__ohe` one-hot marker, turns underscores into spaces and title-cases the result.
    For example, "amen_hair_dryer__imp" becomes "Hair Dryer" and
    "cancellation_bucket__ohe_strict" becomes "Cancellation Bucket Strict".
    None is returned unchanged.
    """
    if s is None:
        return s
    s = str(s)

    # remove leading 'amen_' (only if it's a prefix)
    s = re.sub(r"^amen_", "", s)

    # remove trailing imputer suffix ('_imp' / '__imp')
    s = re.sub(r"_+imp$", "", s)

    # remove the one-hot marker that precedes the category value
    s = re.sub(r"__ohe(?=_|$)", "", s)

    # underscores -> spaces, then collapse extra whitespace
    s = s.replace("_", " ")
    s = re.sub(r"\s+", " ", s).strip()

    # nicer casing
    return s.title()


def generate_charts_data(property_output_df, topN=10):
    """Render the explanation table as horizontal bar charts encoded as Base64 PNGs.

    Args:
        property_output_df: Spark DataFrame with `impact_pct_abs` and
            `potential_gain_scaled` columns and, optionally, `feature`.
        topN: maximum number of bars per chart.

    Returns:
        dict with keys "impacts_chart" and "gain_chart". Each value is a Base64 PNG
        string, or None when there was nothing to plot. Both keys are always present.
    """
    pdf = property_output_df.toPandas().copy()

    # Create a human-readable label column for display
    if "feature" in pdf.columns:
        pdf["feature_label"] = pdf["feature"].apply(humanize_feature_name)
    else:
        pdf["feature_label"] = "Unknown"

    def _barh_chart(data, value_col, color, title, xlabel):
        """Draw one horizontal bar chart and return it as Base64, or None if `data` is empty."""
        if data.empty:
            return None
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.barh(data["feature_label"], data[value_col], color=color)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        fig.tight_layout()
        return fig_to_base64(fig)

    # Chart 1: largest absolute effects (reversed so the biggest bar is drawn on top)
    top_effects = pdf.sort_values("impact_pct_abs", ascending=False).head(topN).iloc[::-1]

    # Chart 2: improvement opportunities with a meaningful estimated gain
    gains = (
        pdf[pdf["potential_gain_scaled"] > 0.01]
        .sort_values("potential_gain_scaled", ascending=False)
        .head(topN)
        .iloc[::-1]
    )

    return {
        "impacts_chart": _barh_chart(
            top_effects, "impact_pct_abs", "skyblue",
            f"Top {topN} Feature Effects on Predicted Score (%)", "Absolute Impact (%)",
        ),
        "gain_chart": _barh_chart(
            gains, "potential_gain_scaled", "lightgreen",
            "Top Improvement Opportunities (Estimated Gain)", "Estimated Gain (score points)",
        ),
    }


# --- 2. Main Logic: Explanation & JSON Builder ---
def _vector_feature_names(vec_field):
    """Return the slot names of an assembled vector column, ordered by vector index.

    Reads the `ml_attr.attrs` metadata groups ("numeric", "binary", "nominal"),
    each a list of dicts with "idx" and "name".
    """
    attr_groups = vec_field.metadata["ml_attr"]["attrs"]
    attrs = [a for kind in ("numeric", "binary", "nominal") for a in attr_groups.get(kind, [])]
    attrs.sort(key=lambda a: a["idx"])
    return [a["name"] for a in attrs]


def _raw_feature_values(row_df, features_raw):
    """Map f"{col}__imp" -> raw (pre-imputation) value of `col` in the first row of `row_df`.

    Keys use the same `__imp` suffix as the vector slot names, so values can be
    looked up per slot. Only columns present in `row_df` are included; non-null
    values are returned as float.
    """
    valid_cols = [c for c in features_raw if c in row_df.columns]
    if not valid_cols:
        return {}
    row = row_df.first().asDict()
    return {f"{c}__imp": (None if row[c] is None else float(row[c])) for c in valid_cols}


def explain_property_output(df, train_df, model, property_id, id_col, label_col, features_raw, categorical_cols=None, topK=60):
    """Explain one property's predicted score against a top-20% benchmark (AI-agent friendly).

    The benchmark is a synthetic row holding the mean of each raw feature (and the
    mode of each categorical) over training rows whose label is at or above the
    approximate 80th percentile. Both rows are passed through `model`. For every slot
    of the model's feature vector:
      * impact_value          = coef * target_scaled
      * potential_gain_scaled = coef * (top_scaled - target_scaled)
      * impact_pct_abs        = |impact_value| as % of the sum of all |impact_value|

    Args:
        df: DataFrame containing the target property.
        train_df: labelled rows used to define the top-20% benchmark.
        model: fitted PipelineModel; presumably ends with a linear model exposing
            `coefficients`, with "features_vec" and "features_scaled" vector columns.
        property_id, id_col, label_col: target id and column names.
        features_raw: raw numeric feature columns (their vector slots are "<col>__imp").
        categorical_cols: categorical input columns (benchmark uses their mode).
        topK: number of rows in the returned agent payload.

    Returns:
        (final_df, json_list): final_df is a Spark DataFrame with one row per vector
        slot, ordered by abs_impact descending; json_list is a list of up to topK dicts
        with keys name, my_value, market_avg, market_trend, current_impact,
        opportunity and importance_pct (null fields omitted, as in Spark's toJSON).

    Raises:
        ValueError: if the property is not in `df`, or `train_df` has no non-null labels.
    """
    if categorical_cols is None:
        categorical_cols = []

    # --- A. Target row & top-20% benchmark ---
    target_row = df.filter(F.col(id_col) == F.lit(property_id)).limit(1)
    if target_row.count() == 0:
        raise ValueError(f"Property {property_id} not found")

    # approxQuantile returns an empty list when the column has no non-null values.
    quantiles = train_df.approxQuantile(label_col, [0.8], 0.001)
    if not quantiles:
        raise ValueError(f"No non-null '{label_col}' values in the training data; cannot build the top-20% benchmark")
    top_df = train_df.filter(F.col(label_col) >= quantiles[0])

    # Synthetic benchmark row: averages for numeric features ...
    mean_exprs = [F.avg(c).alias(c) for c in features_raw if c in top_df.columns]
    top_mean_row = top_df.agg(*mean_exprs).withColumn(id_col, F.lit("TOP_MEAN"))

    # ... plus the most frequent value of each categorical, so the encoding stages get a value.
    for cat_col in categorical_cols:
        if cat_col in top_df.columns:
            mode_row = top_df.groupBy(cat_col).count().orderBy(F.desc("count")).limit(1).collect()
            mode_val = mode_row[0][cat_col] if mode_row else "unknown"
            top_mean_row = top_mean_row.withColumn(cat_col, F.lit(mode_val))

    # --- B. Run Model Transform ---
    target_scored = model.transform(target_row)
    top_scored = model.transform(top_mean_row)

    # --- C. Slot names, coefficients and scaled vectors ---
    feature_names_vec = _vector_feature_names(target_scored.schema["features_vec"])
    coefs = model.stages[-1].coefficients.toArray().tolist()
    target_vec = target_scored.select(vector_to_array("features_scaled").alias("vec")).first()["vec"]
    top_vec = top_scored.select(vector_to_array("features_scaled").alias("vec")).first()["vec"]

    # --- D. Raw (pre-imputation) values, keyed by slot name ---
    target_raw = _raw_feature_values(target_row, features_raw)
    top_raw = _raw_feature_values(top_mean_row, features_raw)

    # --- E. Per-slot table, built locally (one small row per slot) ---
    rows = []
    for idx, (name, coef, t_scaled, p_scaled) in enumerate(zip(feature_names_vec, coefs, target_vec, top_vec)):
        coef, t_scaled, p_scaled = float(coef), float(t_scaled), float(p_scaled)
        impact = coef * t_scaled
        rows.append([
            name, idx, coef, t_scaled, p_scaled,
            target_raw.get(name), top_raw.get(name),
            impact, coef * (p_scaled - t_scaled), abs(impact),
        ])

    total_impact = sum(r[9] for r in rows) or 1.0
    for r in rows:
        r.append((r[9] / total_impact) * 100)

    schema = T.StructType([
        T.StructField("feature", T.StringType(), True),
        T.StructField("idx", T.LongType(), True),
        T.StructField("coef", T.DoubleType(), True),
        T.StructField("target_scaled", T.DoubleType(), True),
        T.StructField("top_scaled", T.DoubleType(), True),
        T.StructField("target_value", T.DoubleType(), True),
        T.StructField("top_mean_value", T.DoubleType(), True),
        T.StructField("impact_value", T.DoubleType(), True),
        T.StructField("potential_gain_scaled", T.DoubleType(), True),
        T.StructField("abs_impact", T.DoubleType(), True),
        T.StructField("impact_pct_abs", T.DoubleType(), True),
    ])
    final_df = spark.createDataFrame([tuple(r) for r in rows], schema=schema).orderBy(F.desc("abs_impact"))

    # --- F. PREPARE FOR AGENT (Cleanup & Filtering) ---
    # 1. Verbal trend instead of the raw coefficient number
    df_clean = final_df.withColumn(
        "market_trend",
        F.when(F.col("coef") > 0, "Positive (Good to have)")
         .when(F.col("coef") < 0, "Negative (Avoid)")
         .otherwise("Neutral")
    )

    # 2. Select & rename for LLM clarity (the lean payload)
    df_clean = df_clean.select(
        F.col("feature").alias("name"),
        F.col("target_value").alias("my_value"),
        F.col("top_mean_value").alias("market_avg"),
        F.col("market_trend"),
        F.col("impact_value").alias("current_impact"),
        F.col("potential_gain_scaled").alias("opportunity"),
        F.col("impact_pct_abs").alias("importance_pct")
    )

    # 3. Convert to a JSON-ready list of dicts
    top_rows = df_clean.limit(topK).toJSON().collect()
    json_list = [json.loads(r) for r in top_rows]

    # Return both the full DF (for charts) and the list (for LLM)
    return final_df, json_list

In [0]:
import json
import base64
from IPython.display import Image, display

# 1. Get Parameter (Default for testing)
dbutils.widgets.text("property_id", "40458495")
target_property_id = dbutils.widgets.get("property_id").strip()

print(f"🕵️ Analyzing Property: {target_property_id}...")

try:
    # The neighbourhood and the model above were built for TARGET_PROPERTY_ID (excluded from
    # training). Explaining a different id would silently use the wrong neighbourhood model.
    if target_property_id != TARGET_PROPERTY_ID:
        raise ValueError(
            f"property_id {target_property_id!r} does not match the property the model was built for "
            f"({TARGET_PROPERTY_ID!r}); re-run the notebook with matching parameters"
        )

    # 2. Run Analysis
    df_full, insights_list = explain_property_output(
        df=df_final,
        train_df=train_df,
        model=model,
        property_id=target_property_id,
        id_col=ID_COL,
        label_col=LABEL_COL,
        features_raw=ALL_FEATURES_RAW,
        categorical_cols=FINAL_CATEGORICAL_COLS
    )

    # 3. Generate Charts (Base64 Strings)
    charts_b64 = generate_charts_data(df_full, topN=10)
    print("\n" + "="*40)
    print("📊 VISUAL CHECK (For Developer)")
    print("="*40)

    if not charts_b64:
        print("No charts generated.")

    for chart_name, b64_str in charts_b64.items():
        if b64_str:
            print(f"\nDisplaying: {chart_name}")
            display(Image(data=base64.b64decode(b64_str)))
        else:
            print(f"\n{chart_name}: No data (likely empty)")

    print("="*40 + "\n")

    # 4. Construct Final Response Payload
    final_output = {
        "status": "success",
        "property_id": target_property_id,
        "llm_context": {
            "insights": insights_list,
            "summary_note": "market_trend indicates sentiment. opportunity is potential gain."
        },
        "ui_artifacts": {
            "charts": charts_b64
        }
    }

except Exception as e:
    # Report any failure to the calling agent as a structured error payload.
    print(f"❌ Error occurred: {e}")
    final_output = {
        "status": "error",
        "message": str(e)
    }

# 5. Return to Agent
output_json = json.dumps(final_output, ensure_ascii=False)
print("✅ Output Payload Ready.")
dbutils.notebook.exit(output_json)