In [8]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window
from pyspark.sql import SparkSession

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 10, Finished, Available, Finished)

In [13]:
# Efficient null-counts utility
def null_counts_fast(df):
    exprs = [
        F.count(F.when(F.col(c).isNull() | (F.trim(F.col(c)) == ""), c)).alias(c)
        for c in df.columns
    ]
    row = df.select(*exprs).collect()[0]
    return row.asDict()

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 15, Finished, Available, Finished)

In [17]:
# Read raw CSV

raw_path = "abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Files/rawdata"
cleaned_files_path = "abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Files/cleaneddata"

cleaned_table = "cleaned_house_rent"

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 19, Finished, Available, Finished)

In [18]:
#Load CSV
df_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(raw_path)
)

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 20, Finished, Available, Finished)

In [20]:
# Counts, schema, sample and null counts
print("Raw file path:", raw_path)
print("Total rows (raw):", df_raw.count())   # OK for small lab dataset
print("\nSchema:")
df_raw.printSchema()

print("\nSample rows:")
display(df_raw.limit(10))

print("\nNull / empty counts by column:")
try:
    print(null_counts_fast(df_raw))
except Exception as e:
    print("Could not compute null counts:", e)

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 22, Finished, Available, Finished)

Raw file path: abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Files/rawdata
Total rows (raw): 4746

Schema:
root
 |-- Posted On: date (nullable = true)
 |-- BHK: integer (nullable = true)
 |-- Rent: integer (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Floor: string (nullable = true)
 |-- Area Type: string (nullable = true)
 |-- Area Locality: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Furnishing Status: string (nullable = true)
 |-- Tenant Preferred: string (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- Point of Contact: string (nullable = true)


Sample rows:


SynapseWidget(Synapse.DataFrame, 5be35f48-2080-4fae-8277-61cd350ce1c5)


Null / empty counts by column:
{'Posted On': 0, 'BHK': 0, 'Rent': 0, 'Size': 0, 'Floor': 0, 'Area Type': 0, 'Area Locality': 0, 'City': 0, 'Furnishing Status': 0, 'Tenant Preferred': 0, 'Bathroom': 0, 'Point of Contact': 0}


In [23]:
# Cleaning the data
df = df_raw

df = (
    df
    .withColumn("City", F.trim(F.col("City")))
    .withColumn("Area_Type", F.trim(F.col("Area Type")))
    .withColumn("Area_Locality", F.trim(F.col("Area Locality")))
    .withColumn("Tenant_Preferred_raw", F.trim(F.col("Tenant Preferred")))
    .withColumn("Furnishing_Status_raw", F.trim(F.col("Furnishing Status")))
    .withColumn("Point_of_Contact", F.trim(F.col("Point of Contact")))

    .withColumn(
        "Rent_clean",
        F.regexp_replace(F.col("Rent").cast("string"), "[^0-9.]", "").cast("double")
    )

    .withColumn(
        "Size_sqft",
        F.regexp_replace(F.col("Size").cast("string"), "[^0-9.]", "").cast("double")
    )

    .withColumn(
        "BHK_clean",
        F.regexp_extract(F.col("BHK").cast("string"), r"([0-9]+)", 1).cast("int")
    )

    .withColumn(
        "Bathroom_clean",
        F.regexp_extract(F.col("Bathroom").cast("string"), r"([0-9]+)", 1).cast("int")
    )
)

df = df.withColumn(
    "Tenant_Preferred",
    F.when(F.lower(F.col("Tenant_Preferred_raw")).rlike("bachelor.*family|family.*bachelor|bachelors/family|bachelors & family|bachelors and family|bachelor/family"), "Bachelors/Family")
     .when(F.lower(F.col("Tenant_Preferred_raw")).rlike("bachelor"), "Bachelors")
     .when(F.lower(F.col("Tenant_Preferred_raw")).rlike("family"), "Family")
     .when(F.col("Tenant_Preferred_raw").isNull() | (F.trim(F.col("Tenant_Preferred_raw")) == ""), "Bachelors/Family")
     .otherwise("Bachelors/Family")  # default to Bachelors/Family to keep exactly 3 categories
)

df = df.withColumn(
    "Furnishing_Status",
    F.when(F.lower(F.col("Furnishing_Status_raw")).rlike("furnishe"), "Furnished")
     .when(F.lower(F.col("Furnishing_Status_raw")).rlike("semi"), "Semi-Furnished")
     .when(F.lower(F.col("Furnishing_Status_raw")).rlike("unfurnishe|not furnished|nil|none"), "Unfurnished")
     .otherwise(F.col("Furnishing_Status_raw"))
)

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 25, Finished, Available, Finished)

In [25]:
# Selecting only the required columns
df_clean = df.select(
    F.col("BHK_clean").alias("BHK"),
    F.col("Rent_clean").alias("Rent"),
    F.col("Size_sqft"),
    F.col("Bathroom_clean").alias("Bathroom"),
    F.col("City"),
    F.col("Furnishing_Status"),
    F.col("Tenant_Preferred"),
    F.col("Point_of_Contact").alias("contact"),
    F.col("Area_Type"),
    F.col("Area_Locality")
)

rows_before = df_clean.count()
print(f"Rows in df_clean before dropping critical nulls: {rows_before}")
print("Null / empty counts (before):")
print(null_counts_fast(df_clean))

critical_missing_df = df_clean.filter(
    F.col("Rent").isNull() | F.col("City").isNull() | F.col("Size_sqft").isNull()
)
rows_missing = critical_missing_df.count()
print(f"Rows missing critical fields (Rent/City/Size_sqft): {rows_missing} ({round(rows_missing/rows_before*100,2)}%)")

if rows_missing > 0:
    print("Sample rows that will be dropped due to missing critical fields:")
    display(critical_missing_df.limit(10))

df_clean = df_clean.na.drop(subset=["Rent", "City", "Size_sqft"])
rows_after_drop = df_clean.count()
print(f"Rows after dropping critical nulls: {rows_after_drop} (dropped {rows_before - rows_after_drop})")

before_dedup = df_clean.count()
df_clean = df_clean.dropDuplicates()
after_dedup = df_clean.count()
print(f"Duplicates removed: {before_dedup - after_dedup}")

invalid_vals = df_clean.filter((F.col("Rent") <= 0) | (F.col("Size_sqft") <= 0)).count()
print(f"Rows with non-positive Rent or Size: {invalid_vals}")
if invalid_vals > 0:
    print("Dropping rows with non-positive Rent or Size (lab decision).")
    df_clean = df_clean.filter((F.col("Rent") > 0) & (F.col("Size_sqft") > 0))

avg_rent = df_clean.agg(F.round(F.avg("Rent"), 2).alias("avg_rent")).collect()[0]["avg_rent"]
median_size = df_clean.approxQuantile("Size_sqft", [0.5], 0.01)[0]  # approximate median
print(f"Average Rent (clean): {avg_rent}")
print(f"Median Size (approx, sqft): {round(median_size,2)}")

print("\nCleaned DataFrame schema:")
df_clean.printSchema()

print("\nSample cleaned rows:")
display(df_clean.limit(10))

print("\nNull / empty counts (after cleaning):")
print(null_counts_fast(df_clean))

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 27, Finished, Available, Finished)

Rows in df_clean before dropping critical nulls: 4746
Null / empty counts (before):
{'BHK': 0, 'Rent': 0, 'Size_sqft': 0, 'Bathroom': 0, 'City': 0, 'Furnishing_Status': 0, 'Tenant_Preferred': 0, 'contact': 0, 'Area_Type': 0, 'Area_Locality': 0}
Rows missing critical fields (Rent/City/Size_sqft): 0 (0.0%)
Rows after dropping critical nulls: 4746 (dropped 0)
Duplicates removed: 34
Rows with non-positive Rent or Size: 0
Average Rent (clean): 34951.3
Median Size (approx, sqft): 850.0

Cleaned DataFrame schema:
root
 |-- BHK: integer (nullable = true)
 |-- Rent: double (nullable = true)
 |-- Size_sqft: double (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Furnishing_Status: string (nullable = true)
 |-- Tenant_Preferred: string (nullable = false)
 |-- contact: string (nullable = true)
 |-- Area_Type: string (nullable = true)
 |-- Area_Locality: string (nullable = true)


Sample cleaned rows:


SynapseWidget(Synapse.DataFrame, fa048534-e6a4-4471-aa13-feedca1db29e)


Null / empty counts (after cleaning):
{'BHK': 0, 'Rent': 0, 'Size_sqft': 0, 'Bathroom': 0, 'City': 0, 'Furnishing_Status': 0, 'Tenant_Preferred': 0, 'contact': 0, 'Area_Type': 0, 'Area_Locality': 0}


In [33]:
# Creating cleaned table
cleaned_table = "cleaned_house_rent"
cleaned_path = cleaned_files_path.rstrip("/") + "/cleaned_house_rent"

print("Writing cleaned DataFrame to Delta path:", cleaned_path)
(
    df_clean
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", cleaned_path)
    .saveAsTable(cleaned_table)
)

# Verify table
print("Row count (cleaned):", spark.table(cleaned_table).count())
print("Schema (cleaned):")
spark.table(cleaned_table).printSchema()
print("Sample rows:")
display(spark.table(cleaned_table).limit(10))


StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 35, Finished, Available, Finished)

Writing cleaned DataFrame to Delta path: abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Files/cleaneddata/cleaned_house_rent
Row count (cleaned): 4712
Schema (cleaned):
root
 |-- BHK: integer (nullable = true)
 |-- Rent: double (nullable = true)
 |-- Size_sqft: double (nullable = true)
 |-- Area_Type: string (nullable = true)
 |-- Area_Locality: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Furnishing_Status: string (nullable = true)
 |-- Tenant_Preferred: string (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- contact: string (nullable = true)

Sample rows:


SynapseWidget(Synapse.DataFrame, 2748a24b-db9d-41be-b696-36514e6a131b)

In [40]:
# Deleting existing old ml table

ml_table = "cleaned_house_rent_ml"

candidate_paths = [
    cleaned_files_path.rstrip("/") + "/cleaned_house_rent_ml",  # e.g. /Files/cleaneddata/cleaned_house_rent_ml
    "abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Tables/dbo/" + ml_table  # path reported in AnalysisException
]

print(f"Attempting to DROP TABLE IF EXISTS {ml_table} ...")
try:
    spark.sql(f"DROP TABLE IF EXISTS {ml_table}")
    print("Dropped table from metastore (if it existed).")
except Exception as e:
    print("Warning: could not drop table via spark.sql:", e)

for p in candidate_paths:
    try:
        print("Attempting to remove path:", p)
        dbutils.fs.rm(p, recurse=True)
        print("Removed:", p)
    except NameError:
        # dbutils not available in some environments
        print("dbutils not available; cannot delete path:", p)
    except Exception as e:
        print("Could not delete path (maybe it didn't exist) -", p, " error:", e)

# 3) Final verification: print whether table still appears in catalog
try:
    exists = any(t.name == ml_table for t in spark.catalog.listTables())
    print(f"Table exists after drop? {exists}")
except Exception as e:
    print("Could not list tables for verification:", e)

print("Cleanup cell finished. If no errors printed above (except dbutils message), you're clear to recreate the ML table.")


StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 42, Finished, Available, Finished)

Attempting to DROP TABLE IF EXISTS cleaned_house_rent_ml ...
Dropped table from metastore (if it existed).
Attempting to remove path: abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Files/cleaneddata/cleaned_house_rent_ml
dbutils not available; cannot delete path: abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Files/cleaneddata/cleaned_house_rent_ml
Attempting to remove path: abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Tables/dbo/cleaned_house_rent_ml
dbutils not available; cannot delete path: abfss://d47ba78a-8bf0-452e-a7ad-00dcfbe54401@onelake.dfs.fabric.microsoft.com/9c78da36-6763-4715-b3d9-440a066d7e52/Tables/dbo/cleaned_house_rent_ml
Table exists after drop? False
Cleanup cell finished. If no errors printed above (except dbutils message), you're clear to recreate the ML table.


In [41]:
# Creating ML-focused table cleaned_house_rent_ml
ml_table = "cleaned_house_rent_ml"

df_ml = (
    df_clean
    .select(
        "BHK",
        "Bathroom",
        "Size_sqft",
        "Rent",
        "City",
        "Furnishing_Status",
        "Tenant_Preferred"
    )
    .na.drop(subset=["BHK", "Bathroom", "Size_sqft", "Rent", "City"])
)

print("Writing ML table to metastore-managed location:", ml_table)
(
    df_ml
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(ml_table)
)

# Verify
print(f"Table {ml_table} created. Row count:", spark.table(ml_table).count())
print("Schema (ML):")
spark.table(ml_table).printSchema()
display(spark.table(ml_table).limit(10))


StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 43, Finished, Available, Finished)

Writing ML table to metastore-managed location: cleaned_house_rent_ml
Table cleaned_house_rent_ml created. Row count: 4712
Schema (ML):
root
 |-- BHK: integer (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- Size_sqft: double (nullable = true)
 |-- Rent: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Furnishing_Status: string (nullable = true)
 |-- Tenant_Preferred: string (nullable = true)



SynapseWidget(Synapse.DataFrame, f3cc0ccb-838b-4422-8485-a70911b8eecd)

In [56]:
# Build aggregated DataFrame per city
df_city_agg = (
    df_clean
    .groupBy("City")
    .agg(
        F.count("*").alias("num_listings"),
        F.round(F.avg("Rent"), 2).alias("avg_rent"),
        # percentile_approx is a spark SQL function that gives an approximate median
        F.round(F.expr("percentile_approx(Rent, 0.5)"), 2).alias("median_rent")
    )
)

# Preview
print("Sample of aggregated city-level table (top 20 by avg_rent):")
display(df_city_agg.orderBy(F.desc("avg_rent")).limit(20))

# Write as managed Delta table (name: agg_rent_by_city) - metastore-managed location
agg_city_table = "agg_rent_by_city"
(
    df_city_agg
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(agg_city_table)
)

# Verification
print(f"Table {agg_city_table} written. Row count:", spark.table(agg_city_table).count())
print("Preview from metastore table:")
display(spark.table(agg_city_table).orderBy(F.desc("avg_rent")).limit(20))


StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 58, Finished, Available, Finished)

Sample of aggregated city-level table (top 20 by avg_rent):


SynapseWidget(Synapse.DataFrame, 0e4d9c53-ae60-4e06-9830-06d882625369)

Table agg_rent_by_city written. Row count: 6
Preview from metastore table:


SynapseWidget(Synapse.DataFrame, 0892d9ab-4b7f-47e3-90a9-ef96ec01c7ce)

In [57]:
# avg_rent_by_city
df_avg_rent_by_city = (
    df_clean
    .groupBy("City")
    .agg(
        F.count("*").alias("num_listings"),
        F.round(F.avg("Rent"), 2).alias("avg_rent"),
        F.round(F.expr("percentile_approx(Rent, 0.5)"), 2).alias("median_rent")
    )
)

df_avg_rent_by_city.write.mode("overwrite").saveAsTable("avg_rent_by_city")

print("Delta table 'avg_rent_by_city' created.")
display(spark.table("avg_rent_by_city").limit(10))

StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 59, Finished, Available, Finished)

Delta table 'avg_rent_by_city' created.


SynapseWidget(Synapse.DataFrame, af009bd0-e73a-46cd-a0d2-56151f1bc380)

In [58]:
# Create rent_by_bhk_top_cities
from pyspark.sql import functions as F

top_cities = (
    df_clean.groupBy("City")
    .agg(F.count("*").alias("cnt"))
    .orderBy(F.desc("cnt"))
    .limit(5)
    .select("City")
    .rdd.flatMap(lambda x: x)
    .collect()
)

df_bhk = (
    df_clean.filter(F.col("City").isin(top_cities))
    .groupBy("City", "BHK")
    .agg(
        F.count("*").alias("listings"),
        F.round(F.avg("Rent"), 2).alias("avg_rent")
    )
)

df_bhk.write.mode("overwrite").saveAsTable("rent_by_bhk_top_cities")

print("Delta table 'rent_by_bhk_top_cities' created.")
display(spark.table("rent_by_bhk_top_cities").limit(20))


StatementMeta(, 3577672f-387b-4ea3-a96f-9ad46b9cceb6, 60, Finished, Available, Finished)

Delta table 'rent_by_bhk_top_cities' created.


SynapseWidget(Synapse.DataFrame, 49e4597c-280a-4072-8383-9cb8930e7e0c)

Machine Learning Section

In [1]:
# ML — Prepare features and train/test split for RandomForest baseline

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.linalg import Vectors

# 1) Load the ML table
ml_table = "cleaned_house_rent_ml"
print("Loading table:", ml_table)
df = spark.table(ml_table)

print("Row count:", df.count())
print("Schema:")
df.printSchema()
display(df.limit(5))

# 2) Defensive cleaning: drop rows with nulls in features
required_cols = ["BHK", "Bathroom", "Size_sqft", "Rent", "City", "Furnishing_Status", "Tenant_Preferred"]
df = df.na.drop(subset=required_cols)
print("Row count after dropping nulls in required columns:", df.count())

# 3) Categorical columns to index & encode
cat_cols = ["City", "Furnishing_Status", "Tenant_Preferred"]
index_cols = [c + "_idx" for c in cat_cols]
ohe_cols = [c + "_ohe" for c in cat_cols]

# 4) Create StringIndexers
indexers = [
    StringIndexer(inputCol=c, outputCol=out, handleInvalid="keep")
    for c, out in zip(cat_cols, index_cols)
]

# 5) OneHotEncoders
encoders = [
    OneHotEncoder(inputCol=idx, outputCol=ohe, handleInvalid="keep")
    for idx, ohe in zip(index_cols, ohe_cols)
]

# 6) Numeric feature list
numeric_cols = ["BHK", "Bathroom", "Size_sqft"]

# 7) VectorAssembler to assemble all features
assembler_inputs = numeric_cols + ohe_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features", handleInvalid="skip")

# 8) Build pipeline 
stages = []
stages.extend(indexers)
stages.extend(encoders)
stages.append(assembler)

pipeline = Pipeline(stages=stages)

print("Fitting pipeline to create feature vectors...")
pipeline_model = pipeline.fit(df)
df_ml_ready = pipeline_model.transform(df).select("features", F.col("Rent").alias("label"))

# 9) Show sample and cache for faster training
print("Sample of features/label:")
display(df_ml_ready.limit(5))
df_ml_ready = df_ml_ready.cache()
print("Total rows ready for ML:", df_ml_ready.count())

# 10) Train/test split
train_df, test_df = df_ml_ready.randomSplit([0.8, 0.2], seed=42)
print("Train rows:", train_df.count(), "Test rows:", test_df.count())

# 11) Persist train/test for next cell
# Keep them in this notebook session as variables `train_df` and `test_df`.
print("ML preparation finished. Ready to train a RandomForest baseline in the next cell.")


StatementMeta(, 2738f4c2-dd0a-4018-8f40-99f411bb7f4f, 3, Finished, Available, Finished)

Loading table: cleaned_house_rent_ml
Row count: 4712
Schema:
root
 |-- BHK: integer (nullable = true)
 |-- Size_sqft: double (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- Rent: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Furnishing_Status: string (nullable = true)
 |-- Tenant_Preferred: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 9c005507-753c-4241-9b88-59b84d970699)

Row count after dropping nulls in required columns: 4712
Fitting pipeline to create feature vectors...


Sample of features/label:


SynapseWidget(Synapse.DataFrame, cd4f7b62-9e92-4906-be1a-8fad0be4a499)

Total rows ready for ML: 4712
Train rows: 3811 Test rows: 901
ML preparation finished. Ready to train a RandomForest baseline in the next cell.


In [2]:
# ML — Training RandomForest baseline, predict, evaluate
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import PipelineModel
import time

# 1) Define the model
rf = RandomForestRegressor(featuresCol="features", labelCol="label", predictionCol="prediction",
                           numTrees=50, maxDepth=8, seed=42)

# 2) Fit the model on the train set
start = time.time()
rf_model = rf.fit(train_df)
train_time = time.time() - start
print(f"Training completed in {train_time:.1f} seconds.")

# 3) Predict on the test set
pred_df = rf_model.transform(test_df).select("prediction", "label")
print("Sample predictions (prediction, label):")
display(pred_df.limit(10))

# 4) Evaluate: RMSE and R2
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(pred_df)
r2 = evaluator_r2.evaluate(pred_df)

print(f"RandomForest baseline results -> RMSE: {rmse:.2f}, R²: {r2:.3f}")

# 6) Store rf_model in a variable for later use if needed
trained_rf_model = rf_model

print("ML baseline step finished.")

StatementMeta(, 2738f4c2-dd0a-4018-8f40-99f411bb7f4f, 4, Finished, Available, Finished)

Training completed in 30.4 seconds.
Sample predictions (prediction, label):


SynapseWidget(Synapse.DataFrame, b27bb86c-d239-46dd-b13c-becb5b90d859)

RandomForest baseline results -> RMSE: 30151.57, R²: 0.704
ML baseline step finished.


In [3]:
# ML Final Step — Persist predictions from the trained model into a Delta table

from pyspark.sql import functions as F

# Re-apply the pipeline to df to get consistent features for the full dataset
df_full_ml_features = pipeline_model.transform(df).select(
    "City",
    "BHK",
    "Bathroom",
    "Size_sqft",
    "Furnishing_Status",
    "Tenant_Preferred",
    F.col("Rent").alias("Rent_actual"),
    "features"
)

# Predict using the trained model
df_full_pred = trained_rf_model.transform(df_full_ml_features) \
    .select(
        "City",
        "BHK",
        "Bathroom",
        "Size_sqft",
        "Furnishing_Status",
        "Tenant_Preferred",
        "Rent_actual",
        F.col("prediction").alias("Rent_pred")
    )

# Save as a Delta table for Power BI
pred_table = "rf_predictions"
df_full_pred.write.mode("overwrite").format("delta").saveAsTable(pred_table)

print(f"Predictions saved as Delta table '{pred_table}'.")
display(spark.table(pred_table).limit(10))

StatementMeta(, 2738f4c2-dd0a-4018-8f40-99f411bb7f4f, 5, Finished, Available, Finished)

Predictions saved as Delta table 'rf_predictions'.


SynapseWidget(Synapse.DataFrame, 07364d39-bc0a-4b15-aeb5-e03567c71e44)