In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer

# Start (or reuse) the Spark session that every later cell relies on.
print("Initializing Spark Session...")
_session_builder = SparkSession.builder.appName("Real Estate Analysis")
# Driver memory and the XGBoost4J-Spark package coordinate requested at start-up.
for _conf_key, _conf_value in (
    ("spark.driver.memory", "8g"),
    ("spark.jars.packages", "ml.dmlc:xgboost4j-spark_2.12:1.5.2"),
):
    _session_builder = _session_builder.config(_conf_key, _conf_value)
# Run locally, using all available cores.
spark = _session_builder.master("local[*]").getOrCreate()

# ====================== DATA PRE-PROCESSING ======================
# Load the raw CSV; the first row is the header and column types are inferred.
print("ƒêang ƒë·ªçc d·ªØ li·ªáu...")
_csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = _csv_reader.csv("realtor-data.zip.csv")

# Show the inferred schema of the raw data.
df.printSchema()

# Drop columns that are not used by the rest of the notebook.
for _unused_column in ("city", "zip_code", "prev_sold_date"):
    df = df.drop(_unused_column)

# Count, per remaining column, the rows whose value is NULL or NaN.
_missing_value_counters = []
for _column_name in df.columns:
    _is_missing = col(_column_name).isNull() | isnan(col(_column_name))
    _missing_value_counters.append(
        count(when(_is_missing, _column_name)).alias(_column_name)
    )
null_counts = df.select(_missing_value_counters)
print("\nS·ªë l∆∞·ª£ng gi√° tr·ªã null:")
null_counts.show()



# Keep only rows that have a value in each of the key columns.
for _required_column in ("price", "bed", "bath"):
    df = df.filter(col(_required_column).isNotNull())

# Show the distribution of `status`, then drop the column.
print("\nGi√° tr·ªã trong c·ªôt status:")
_status_distribution = df.groupBy("status").count()
_status_distribution.show()
df = df.drop("status")

# Per-state row counts: displayed first, then used to filter rare states.
state_counts = df.groupBy("state").count()
print("\nGi√° tr·ªã trong c·ªôt state:")
state_counts.show()

# Keep only rows whose state has at least 50 samples.
valid_states = state_counts.where(col("count") >= 50).select("state")
df = df.join(valid_states, on="state", how="inner")

# Encode `state` as a numeric index and drop the original string column.
indexer = StringIndexer(
    inputCol="state",
    outputCol="state_numeric",
    handleInvalid="skip",
)
indexer_model = indexer.fit(df)
df = indexer_model.transform(df).drop("state")

# Build and print the lookup from numeric index back to state name.
state_labels = indexer_model.labels
numeric_to_state = dict(enumerate(state_labels))
print("\nMapping state_numeric -> state:")
for _state_index, _state_name in numeric_to_state.items():
    print(f"{_state_index}: {_state_name}")

# Show descriptive statistics of the pre-processed data.
_summary_statistics = df.describe()
_summary_statistics.show()


In [None]:
def _percent_matching(frame, condition):
    """Return the percentage of rows in `frame` for which `condition` is true.

    Returns 0.0 for an empty frame instead of raising ZeroDivisionError.
    """
    total_rows = frame.count()
    if total_rows == 0:
        return 0.0
    return frame.filter(condition).count() / total_rows * 100


# --- price -------------------------------------------------------------------
# The cut-off is the approximate 95th percentile plus the approximate 25th
# percentile (relative error 0.01), both obtained in a single pass.
# The value printed here is the value actually applied below; previously the
# filter used a hard-coded 3150000.0 that could disagree with the printed one.
q25, q95 = df.approxQuantile("price", [0.25, 0.95], 0.01)
iqrMax = q95 + q25
print(f"\nThreshold cho price: {iqrMax}")
percent_outliers = _percent_matching(df, col("price") > iqrMax)
print(f"T·ª∑ l·ªá outliers trong price: {percent_outliers:.2f}%")
df = df.filter(col("price") <= iqrMax)

# --- acre_lot ----------------------------------------------------------------
# NOTE(review): a comparison against NULL is never true, so this filter also
# removes every row whose acre_lot is NULL. As written, the later acre_lot
# imputation cell therefore finds no missing values to fill. Confirm whether
# NULL rows are meant to survive this step before changing it.
percent_outliers = _percent_matching(df, col("acre_lot") > 200)
print(f"\nT·ª∑ l·ªá outliers trong acre_lot: {percent_outliers:.2f}%")
df = df.filter(col("acre_lot") <= 200)

# --- house_size --------------------------------------------------------------
# NOTE(review): same NULL behaviour as for acre_lot above -- rows with a NULL
# house_size are dropped by this comparison.
percent_outliers = _percent_matching(df, col("house_size") >= 20000)
print(f"\nT·ª∑ l·ªá outliers trong house_size: {percent_outliers:.2f}%")
df = df.filter(col("house_size") < 20000)

# Re-check the number of NULL values per column after outlier removal.
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
print("\nS·ªë l∆∞·ª£ng gi√° tr·ªã null sau khi x·ª≠ l√Ω outliers:")
null_counts.show()

# Show descriptive statistics of the filtered data.
df.describe().show()

In [None]:
import xgboost as xgb
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id, when

print("\nƒêang ƒëi·ªÅn gi√° tr·ªã thi·∫øu cho acre_lot b·∫±ng XGBoost...")

# Tag every row with a unique id so that each prediction can be joined back to
# exactly the row it was computed for. The tagged frame is cached because
# monotonically_increasing_id() is not guaranteed to produce the same ids if
# the plan is re-evaluated between the actions below.
df_with_id = df.withColumn("_row_id", monotonically_increasing_id()).cache()
filled_df = df_with_id.filter(col("acre_lot").isNotNull())
missing_df = df_with_id.filter(col("acre_lot").isNull())

# Only do the (expensive) collection and training when something is missing.
if missing_df.count() > 0:

    # Bring both partitions of the data to the driver as pandas frames.
    filled_pdf = filled_df.toPandas()
    missing_pdf = missing_df.toPandas()

    # Training needs at least one labelled row and one row to predict.
    if not filled_pdf.empty and not missing_pdf.empty:
        # Features are every column except the target and the helper id.
        X_train_fill = filled_pdf.drop(columns=["acre_lot", "_row_id"])
        y_train_fill = filled_pdf["acre_lot"]

        X_test_fill = missing_pdf.drop(columns=["acre_lot", "_row_id"])

        # Train the XGBoost regressor used for imputation.
        model_fill = xgb.XGBRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,
            colsample_bytree=0.8
        )
        model_fill.fit(X_train_fill, y_train_fill)

        # Predict the missing values.
        preds = model_fill.predict(X_test_fill)

        # Pair each prediction with the id of the row it belongs to. The
        # prediction is cast to float64 so it matches Spark's double type.
        preds_df = pd.DataFrame({
            "_row_id": missing_pdf["_row_id"].astype("int64").to_numpy(),
            "prediction": preds.astype("float64"),
        })
        preds_spark = spark.createDataFrame(preds_df)

        # Join the predictions back by id and use them only where acre_lot is
        # NULL; the helper columns are removed afterwards.
        df = (
            df_with_id.join(preds_spark, "_row_id", "left_outer")
            .withColumn(
                "acre_lot",
                when(col("acre_lot").isNull(), col("prediction")).otherwise(col("acre_lot")),
            )
            .drop("prediction", "_row_id")
        )

        print(f"ƒê√£ ƒëi·ªÅn {len(preds)} gi√° tr·ªã thi·∫øu cho acre_lot b·∫±ng XGBoost!")

    else:
        # `df` is left untouched, so the cached helper frame is not needed.
        df_with_id.unpersist()
        print("Kh√¥ng c√≥ ƒë·ªß d·ªØ li·ªáu ƒë·ªÉ hu·∫•n luy·ªán XGBoost. B·ªè qua vi·ªác ƒëi·ªÅn gi√° tr·ªã thi·∫øu.")

else:
    # Nothing to impute: `df` is left untouched, release the helper frame.
    df_with_id.unpersist()
    print("Kh√¥ng c√≥ gi√° tr·ªã thi·∫øu trong acre_lot. Kh√¥ng c·∫ßn ƒëi·ªÅn.")



In [None]:
import xgboost as xgb
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id, when

print("\nƒêang ƒëi·ªÅn gi√° tr·ªã thi·∫øu cho house_size b·∫±ng XGBoost...")

# Tag every row with a unique id so that each prediction can be joined back to
# exactly the row it was computed for. The tagged frame is cached because
# monotonically_increasing_id() is not guaranteed to produce the same ids if
# the plan is re-evaluated between the actions below.
df_with_id = df.withColumn("_row_id", monotonically_increasing_id()).cache()
filled_df = df_with_id.filter(col("house_size").isNotNull())
missing_df = df_with_id.filter(col("house_size").isNull())

# Only do the (expensive) collection and training when something is missing.
if missing_df.count() > 0:

    # Bring both partitions of the data to the driver as pandas frames.
    filled_pdf = filled_df.toPandas()
    missing_pdf = missing_df.toPandas()

    # Training needs at least one labelled row and one row to predict.
    if not filled_pdf.empty and not missing_pdf.empty:
        # Features are every column except the target and the helper id.
        X_train_fill = filled_pdf.drop(columns=["house_size", "_row_id"])
        y_train_fill = filled_pdf["house_size"]

        X_test_fill = missing_pdf.drop(columns=["house_size", "_row_id"])

        # Replace NaN features with the training-set column means (the same
        # means are used for the rows to predict).
        train_means = X_train_fill.mean()
        X_train_fill = X_train_fill.fillna(train_means)
        X_test_fill = X_test_fill.fillna(train_means)

        # Train the XGBoost regressor used for imputation.
        model_fill = xgb.XGBRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,
            colsample_bytree=0.8,
            tree_method='hist'
        )
        model_fill.fit(X_train_fill, y_train_fill)

        # Predict the missing values.
        preds = model_fill.predict(X_test_fill)

        # Pair each prediction with the id of the row it belongs to. The
        # prediction is cast to float64 so it matches Spark's double type.
        preds_df = pd.DataFrame({
            "_row_id": missing_pdf["_row_id"].astype("int64").to_numpy(),
            "prediction": preds.astype("float64"),
        })
        preds_spark = spark.createDataFrame(preds_df)

        # Join the predictions back by id and use them only where house_size
        # is NULL; the helper columns are removed afterwards.
        df = (
            df_with_id.join(preds_spark, "_row_id", "left_outer")
            .withColumn(
                "house_size",
                when(col("house_size").isNull(), col("prediction")).otherwise(col("house_size")),
            )
            .drop("prediction", "_row_id")
        )

        print(f"ƒê√£ ƒëi·ªÅn {len(preds)} gi√° tr·ªã thi·∫øu cho house_size b·∫±ng XGBoost!")

    else:
        # `df` is left untouched, so the cached helper frame is not needed.
        df_with_id.unpersist()
        print("Kh√¥ng c√≥ ƒë·ªß d·ªØ li·ªáu ƒë·ªÉ hu·∫•n luy·ªán XGBoost. B·ªè qua vi·ªác ƒëi·ªÅn gi√° tr·ªã thi·∫øu.")

else:
    # Nothing to impute: `df` is left untouched, release the helper frame.
    df_with_id.unpersist()
    print("Kh√¥ng c√≥ gi√° tr·ªã thi·∫øu trong house_size. Kh√¥ng c·∫ßn ƒëi·ªÅn.")


In [None]:
from pyspark.sql.functions import col, when, expr

print("ƒêang t·∫°o c√°c ƒë·∫∑c tr∆∞ng m·ªõi...")


def _one_if_zero(column_name):
    """Return the named column with zeros replaced by 1, for use as a divisor."""
    return when(col(column_name) == 0, 1).otherwise(col(column_name))


# Derived features, added in this order because later entries use
# `total_rooms`, which is created by an earlier entry.
_derived_features = [
    ("bed_bath_ratio", col("bed") / _one_if_zero("bath")),
    ("total_rooms", col("bed") + col("bath")),
    ("room_density", col("total_rooms") / _one_if_zero("house_size")),
    ("house_size_per_bed", col("house_size") / _one_if_zero("bed")),
    ("house_size_per_bath", col("house_size") / _one_if_zero("bath")),
    ("lot_to_house_ratio", col("acre_lot") / _one_if_zero("house_size")),
    ("size_by_state", col("house_size") * col("state_numeric")),
    ("rooms_by_state", col("total_rooms") * col("state_numeric")),
]
for _feature_name, _feature_expression in _derived_features:
    df = df.withColumn(_feature_name, _feature_expression)

# Replace remaining NULLs in int/double columns with 0.
numeric_columns = [
    name for name, dtype in df.dtypes if dtype == 'int' or dtype == 'double'
]
df = df.na.fill(0, subset=numeric_columns)

print("Ho√†n t·∫•t t·∫°o ƒë·∫∑c tr∆∞ng m·ªõi!")


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

print("\nChu·∫©n b·ªã d·ªØ li·ªáu cho m√¥ h√¨nh...")

# Every column except the label `price` is used as a feature.
feature_cols = list(filter(lambda name: name != "price", df.columns))

# Pack all feature columns into a single vector column named `features`.
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
_assembled = assembler.transform(df)
df = _assembled.select("features", "price")

# 80/20 train/test split with a fixed seed.
_splits = df.randomSplit([0.8, 0.2], seed=42)
train_df, test_df = _splits

_train_size = train_df.count()
_test_size = test_df.count()
print(f"K√≠ch th∆∞·ªõc t·∫≠p hu·∫•n luy·ªán: {_train_size} m·∫´u, t·∫≠p ki·ªÉm tra: {_test_size} m·∫´u")


In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

print("\nB·∫Øt ƒë·∫ßu t√¨m ki·∫øm si√™u tham s·ªë t·ªëi ∆∞u...")

# Base estimator settings; maxDepth and stepSize are overridden by the grid.
_gbt_settings = dict(
    featuresCol="features",
    labelCol="price",
    maxBins=32,           # bins used when discretising continuous features
    maxIter=100,          # number of boosting iterations
    stepSize=0.1,         # learning rate
    maxDepth=5,           # tree depth
    subsamplingRate=0.8,  # fraction of the data used per tree
)
gbt = GBTRegressor(**_gbt_settings)

# Deliberately tiny search grid (a single combination) to keep run time low.
_grid_builder = ParamGridBuilder()
_grid_builder.addGrid(gbt.maxDepth, [3])
_grid_builder.addGrid(gbt.stepSize, [0.1])
param_grid = _grid_builder.build()

# Candidate models are scored by R^2 on the validation part.
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="price",
    predictionCol="prediction",
)

# 90% of train_df is used for fitting, 10% for validation.
tvs = TrainValidationSplit(
    estimator=gbt,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=0.9,
    seed=42,
)

# Fit the search on the training split.
print("ƒêang hu·∫•n luy·ªán m√¥ h√¨nh, vui l√≤ng ƒë·ª£i...")
model = tvs.fit(train_df)

# Report the hyper-parameters of the best model found.
best_model = model.bestModel
print("\nƒê√£ t√¨m th·∫•y m√¥ h√¨nh t·ªët nh·∫•t!")
for _param_label, _param_value in (
    ("maxDepth", best_model.getMaxDepth()),
    ("maxIter", best_model.getMaxIter()),
    ("stepSize", best_model.getStepSize()),
):
    print(f"Best {_param_label}: {_param_value}")



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

print("\nƒê√°nh gi√° m√¥ h√¨nh tr√™n t·∫≠p ki·ªÉm tra...")

# Score the held-out test split with the best model.
predictions = best_model.transform(test_df)


def _price_evaluator(metric_name):
    """Build a RegressionEvaluator comparing `prediction` to `price` for one metric."""
    return RegressionEvaluator(
        labelCol="price", predictionCol="prediction", metricName=metric_name
    )


# One evaluator per reported metric.
evaluator_r2, evaluator_rmse, evaluator_mae = (
    _price_evaluator(metric) for metric in ("r2", "rmse", "mae")
)

# Compute the metrics in the same order as the evaluators above.
r2, rmse, mae = (
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (evaluator_r2, evaluator_rmse, evaluator_mae)
)

print("\nüîç ƒê√°nh gi√° m√¥ h√¨nh tr√™n d·ªØ li·ªáu th·ª±c t·∫ø:")
print(f"‚úÖ R¬≤ Score: {r2:.4f}")
print(f"‚úÖ RMSE: ${rmse:.2f}")
print(f"‚úÖ MAE: ${mae:.2f}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

print("ƒêang v·∫Ω bi·ªÉu ƒë·ªì...")

# Collect only the two columns needed for plotting to the driver.
predictions_pd = predictions.select("price", "prediction").toPandas()

# 1. Predicted vs. actual price, with the y = x reference line in dashed red.
_actual_prices = predictions_pd["price"]
_predicted_prices = predictions_pd["prediction"]
_diagonal_ends = [_actual_prices.min(), _actual_prices.max()]

fig, ax = plt.subplots(figsize=(12, 8), clear=True)
ax.scatter(_actual_prices, _predicted_prices, alpha=0.5)
ax.plot(_diagonal_ends, _diagonal_ends, 'r--')
ax.set_title('Gi√° tr·ªã d·ª± ƒëo√°n vs Th·ª±c t·∫ø', fontsize=15)
ax.set_xlabel('Gi√° tr·ªã th·ª±c t·∫ø ($)', fontsize=12)
ax.set_ylabel('Gi√° tr·ªã d·ª± ƒëo√°n ($)', fontsize=12)
ax.grid(True)
fig.tight_layout()
plt.show()

# 2. Feature-importance chart (top 10 features of the best model).
plt.figure(figsize=(12, 8), clear=True)
feature_importance = best_model.featureImportances.toArray()  # convert to a numpy array
# `feature_cols` is the ordered list of input columns given to the
# VectorAssembler, so position i here names entry i of the importance vector.
# (The previous code read `X.columns`, but no `X` is defined in this notebook.)
feature_names = list(feature_cols)
indices = np.argsort(feature_importance)[::-1]  # most important first
top_n = min(10, len(feature_names))
top_indices = indices[:top_n]

plt.barh(range(top_n), feature_importance[top_indices], align='center')
plt.yticks(range(top_n), [feature_names[i] for i in top_indices])
plt.xlabel('T·∫ßm quan tr·ªçng', fontsize=12)
plt.title('Top 10 ƒë·∫∑c tr∆∞ng quan tr·ªçng nh·∫•t', fontsize=15)
plt.gca().invert_yaxis()  # flip the axis so the most important feature is on top
plt.tight_layout()
plt.show()
