In [6]:
spark.stop()

In [1]:
# Notebook ph√¢n t√≠ch & x√¢y d·ª±ng mapping cho d·ªØ li·ªáu b·∫•t ƒë·ªông s·∫£n
import os
import sys
from datetime import datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, to_timestamp, current_timestamp, lit, regexp_replace, trim,
    when, upper, lower, split, element_at, round as spark_round,
    avg, count, percentile_approx, stddev, min as spark_min, max as spark_max,
    udf, length, expr
)
from pyspark.sql.types import StringType, DoubleType, BooleanType
from pyspark.sql.window import Window

# Th√™m th∆∞ m·ª•c g·ªëc v√†o sys.path
project_root = os.path.abspath(os.path.join(os.getcwd(), "../.."))
sys.path.append(project_root)

# T·∫°o Spark Session
spark = SparkSession.builder \
    .appName("BatDongSan Mapping Analysis") \
    .config("spark.ui.port", "4050") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

print("Spark session created successfully")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/24 19:07:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session created successfully


In [2]:
# H√†m ti·ªán √≠ch ph√¢n t√≠ch
def get_date_format(date_obj=None):
    """Tr·∫£ v·ªÅ ng√†y theo ƒë·ªãnh d·∫°ng YYYY-MM-DD"""
    if date_obj is None:
        date_obj = datetime.now()
    return date_obj.strftime("%Y-%m-%d")

def log_dataframe_info(df, name="dataframe"):
    """In th√¥ng tin v·ªÅ DataFrame"""
    print(f"\n===== Th√¥ng tin v·ªÅ {name} =====")
    print(f"S·ªë l∆∞·ª£ng b·∫£n ghi: {df.count()}")
    print(f"Schema:")
    df.printSchema()
    print("\nM·∫´u d·ªØ li·ªáu:")
    df.show(5, truncate=False)

    # Th·ªëng k√™ null values
    null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
    print("\nS·ªë l∆∞·ª£ng gi√° tr·ªã NULL trong t·ª´ng c·ªôt:")
    null_counts.show(truncate=False)

def get_unique_values(df, column_name, limit=100):
    """Hi·ªÉn th·ªã c√°c gi√° tr·ªã duy nh·∫•t c·ªßa m·ªôt c·ªôt v√† s·ªë l∆∞·ª£ng c·ªßa ch√∫ng"""
    value_counts = df.groupBy(column_name).count().orderBy("count", ascending=False)
    print(f"\n===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt {column_name} =====")
    value_counts.show(limit, truncate=False)
    return value_counts

def plot_histogram(df, column_name, bins=20, title=None):
    """V·∫Ω bi·ªÉu ƒë·ªì histogram cho m·ªôt c·ªôt s·ªë"""
    if not title:
        title = f"Histogram c·ªßa {column_name}"
    
    # Chuy·ªÉn c·ªôt th√†nh pandas ƒë·ªÉ v·∫Ω bi·ªÉu ƒë·ªì
    data = df.select(column_name).na.drop().toPandas()
    plt.figure(figsize=(10, 6))
    plt.hist(data[column_name], bins=bins, alpha=0.7)
    plt.title(title)
    plt.xlabel(column_name)
    plt.ylabel('Frequency')
    plt.grid(True, alpha=0.3)
    plt.show()

def plot_boxplot(df, column_name, title=None):
    """V·∫Ω bi·ªÉu ƒë·ªì boxplot cho m·ªôt c·ªôt s·ªë"""
    if not title:
        title = f"Boxplot c·ªßa {column_name}"
    
    # Chuy·ªÉn c·ªôt th√†nh pandas ƒë·ªÉ v·∫Ω bi·ªÉu ƒë·ªì
    data = df.select(column_name).na.drop().toPandas()
    plt.figure(figsize=(10, 6))
    sns.boxplot(x=data[column_name])
    plt.title(title)
    plt.xlabel(column_name)
    plt.grid(True, alpha=0.3)
    plt.show()

def detect_outliers(df, column_name, method='iqr', threshold=1.5):
    """Ph√°t hi·ªán outlier d√πng ph∆∞∆°ng ph√°p IQR"""
    
    result = {}
    
    # T√≠nh to√°n c√°c th·ªëng k√™
    stats = df.select(
        spark_min(column_name).alias("min"),
        percentile_approx(column_name, 0.25).alias("q1"),
        percentile_approx(column_name, 0.5).alias("median"),
        percentile_approx(column_name, 0.75).alias("q3"),
        spark_max(column_name).alias("max"),
        avg(column_name).alias("mean"),
        stddev(column_name).alias("stddev"),
        count(column_name).alias("count")
    ).collect()[0]
    
    # In c√°c th·ªëng k√™ c∆° b·∫£n
    print(f"\n===== Th·ªëng k√™ cho c·ªôt {column_name} =====")
    print(f"Minimum: {stats['min']}")
    print(f"Q1 (25%): {stats['q1']}")
    print(f"Median: {stats['median']}")
    print(f"Q3 (75%): {stats['q3']}")
    print(f"Maximum: {stats['max']}")
    print(f"Mean: {stats['mean']}")
    print(f"Standard Deviation: {stats['stddev']}")
    print(f"Count: {stats['count']}")
    
    # T√≠nh IQR
    iqr = stats['q3'] - stats['q1']
    
    # X√°c ƒë·ªãnh ng∆∞·ª°ng outlier
    lower_bound = stats['q1'] - threshold * iqr
    upper_bound = stats['q3'] + threshold * iqr
    
    print(f"IQR: {iqr}")
    print(f"Lower Bound: {lower_bound}")
    print(f"Upper Bound: {upper_bound}")
    
    # ƒê·∫øm s·ªë l∆∞·ª£ng outlier
    outliers_count = df.filter(
        (col(column_name) < lower_bound) | 
        (col(column_name) > upper_bound)
    ).count()
    
    outliers_percentage = (outliers_count / stats['count']) * 100 if stats['count'] > 0 else 0
    
    print(f"S·ªë l∆∞·ª£ng outlier: {outliers_count} ({outliers_percentage:.2f}%)")
    
    result["lower_bound"] = lower_bound
    result["upper_bound"] = upper_bound
    result["outliers_count"] = outliers_count
    result["outliers_percentage"] = outliers_percentage
    result["stats"] = stats
    
    return result

In [35]:
# ƒê·ªçc d·ªØ li·ªáu
# ∆Øu ti√™n s·ª≠ d·ª•ng file CSV trong th∆∞ m·ª•c tmp n·∫øu c√≥
csv_file = "/home/fer/data/real_estate_project/tmp/csv_files/bds_data_may2025.csv"
json_path = "hdfs://namenode:9000/data/realestate/raw/batdongsan/house/2025/05/*"

try:
    # Th·ª≠ ƒë·ªçc t·ª´ file CSV local
    if os.path.exists(csv_file):
        df = spark.read.option("header", "true").csv(csv_file)
        print(f"ƒê√£ ƒë·ªçc d·ªØ li·ªáu t·ª´ CSV local: {csv_file}")
    else:
        # N·∫øu kh√¥ng c√≥ file CSV, ƒë·ªçc t·ª´ JSON tr√™n HDFS
        df = spark.read.option("multiline", "false").json(json_path)
        print(f"ƒê√£ ƒë·ªçc d·ªØ li·ªáu JSON t·ª´: {json_path}")
    
    # Ki·ªÉm tra d·ªØ li·ªáu ƒë·ªçc ƒë∆∞·ª£c
    log_dataframe_info(df, "raw_data")
    
except Exception as e:
    print(f"L·ªói khi ƒë·ªçc d·ªØ li·ªáu: {str(e)}")
    # T·∫°o DataFrame tr·ªëng n·∫øu c√≥ l·ªói
    from pyspark.sql.types import StructType, StructField, StringType
    schema = StructType([
        StructField("url", StringType(), True),
        StructField("title", StringType(), True),
        StructField("price", StringType(), True),
        StructField("price_per_m2", StringType(), True),
        StructField("area", StringType(), True),
        StructField("bedroom", StringType(), True),
        StructField("bathroom", StringType(), True),
        StructField("floor_count", StringType(), True),
        StructField("facade_width", StringType(), True),
        StructField("road_width", StringType(), True),
        StructField("house_direction", StringType(), True),
        StructField("legal_status", StringType(), True),
        StructField("interior", StringType(), True),
        StructField("location", StringType(), True),
        StructField("description", StringType(), True),
        StructField("posted_date", StringType(), True),
        StructField("crawl_timestamp", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("seller_info", StringType(), True),
        StructField("source", StringType(), True),
        StructField("data_type", StringType(), True),
    ])
    df = spark.createDataFrame([], schema)

ƒê√£ ƒë·ªçc d·ªØ li·ªáu JSON t·ª´: hdfs://namenode:9000/data/realestate/raw/batdongsan/house/2025/05/*

===== Th√¥ng tin v·ªÅ raw_data =====
S·ªë l∆∞·ª£ng b·∫£n ghi: 17336
Schema:
root
 |-- area: string (nullable = true)
 |-- bathroom: string (nullable = true)
 |-- bedroom: string (nullable = true)
 |-- crawl_timestamp: string (nullable = true)
 |-- data_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- facade_width: string (nullable = true)
 |-- floor_count: string (nullable = true)
 |-- house_direction: string (nullable = true)
 |-- interior: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- legal_status: string (nullable = true)
 |-- location: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- posted_date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- price_per_m2: string (nullable = true)
 |-- road_width: string (nullable = true)
 |-- seller_info: string (nullable = true)
 |-- source: string (

In [6]:
# 1. Ph√¢n t√≠ch v√† x√¢y d·ª±ng mapping cho house_direction v·ªõi ti·∫øng Vi·ªát c√≥ d·∫•u
print("\n===== PH√ÇN T√çCH H∆Ø·ªöNG NH√Ä (house_direction) =====")

# H√†m chu·∫©n h√≥a chu·ªói: lowercase, lo·∫°i b·ªè d·∫•u c√°ch v√† d·∫•u g·∫°ch ngang
def normalize_direction(value):
    if value is None:
        return None
    return value.lower().replace(" ", "").replace("-", "")

# T·∫°o UDF ƒë·ªÉ chu·∫©n h√≥a
normalize_direction_udf = udf(normalize_direction, StringType())

# L√†m s·∫°ch v√† chu·∫©n h√≥a d·ªØ li·ªáu house_direction
cleaned_df = df.withColumn(
    "house_direction_normalized",
    normalize_direction_udf(col("house_direction"))
)

# Ph√¢n t√≠ch gi√° tr·ªã duy nh·∫•t sau khi chu·∫©n h√≥a
direction_counts = get_unique_values(cleaned_df, "house_direction_normalized")

# T·∫°o mapping ƒë·∫ßy ƒë·ªß v·ªõi t·∫•t c·∫£ c√°c bi·∫øn th·ªÉ c√≥ th·ªÉ (c√≥ d·∫•u v√† kh√¥ng d·∫•u)
print("\n===== ƒê·ªÄ XU·∫§T MAPPING CHO H∆Ø·ªöNG NH√Ä =====")
direction_mapping = {
    # H∆∞·ªõng ƒë∆°n - kh√¥ng d·∫•u
    "dong": "EAST",
    "tay": "WEST", 
    "nam": "SOUTH",
    "bac": "NORTH",
    
    # H∆∞·ªõng ƒë∆°n - c√≥ d·∫•u
    "ƒë√¥ng": "EAST",
    "t√¢y": "WEST", 
    "nam": "SOUTH",
    "b·∫Øc": "NORTH",
    
    # H∆∞·ªõng k√©p - kh√¥ng d·∫•u
    "dongnam": "SOUTHEAST",
    "dongbac": "NORTHEAST",
    "taynam": "SOUTHWEST",
    "taybac": "NORTHWEST",
    
    # H∆∞·ªõng k√©p - c√≥ d·∫•u
    "ƒë√¥ngnam": "SOUTHEAST",
    "ƒë√¥ngb·∫Øc": "NORTHEAST", 
    "t√¢ynam": "SOUTHWEST",
    "t√¢yb·∫Øc": "NORTHWEST",
    
    # C√°c bi·∫øn th·ªÉ vi·∫øt kh√°c - kh√¥ng d·∫•u
    "namdong": "SOUTHEAST",
    "bacdong": "NORTHEAST", 
    "namtay": "SOUTHWEST",
    "bactay": "NORTHWEST",
    
    # C√°c bi·∫øn th·ªÉ vi·∫øt kh√°c - c√≥ d·∫•u
    "namƒë√¥ng": "SOUTHEAST",
    "b·∫Øcƒë√¥ng": "NORTHEAST", 
    "namt√¢y": "SOUTHWEST",
    "b·∫Øct√¢y": "NORTHWEST",
    
    # Tr∆∞·ªùng h·ª£p null ho·∫∑c kh√¥ng x√°c ƒë·ªãnh
    None: "UNKNOWN",
    "": "UNKNOWN"
}

print("Mapping ƒë∆∞·ª£c t·∫°o:")
for key, value in direction_mapping.items():
    print(f"  '{key}' -> '{value}'")

# T·∫°o UDF ƒë·ªÉ √°p d·ª•ng mapping
map_direction_udf = udf(lambda x: direction_mapping.get(x, "UNKNOWN"), StringType())

# Th·ª≠ nghi·ªám mapping
mapped_directions = cleaned_df.withColumn(
    "house_direction_mapped", 
    map_direction_udf(col("house_direction_normalized"))
)

# Hi·ªÉn th·ªã k·∫øt qu·∫£ mapping
print("\n===== K·∫æT QU·∫¢ MAPPING =====")
get_unique_values(mapped_directions, "house_direction_mapped")

# Ki·ªÉm tra c√°c gi√° tr·ªã ch∆∞a ƒë∆∞·ª£c map (UNKNOWN)
unknown_directions = mapped_directions.filter(col("house_direction_mapped") == "UNKNOWN")
if unknown_directions.count() > 0:
    print("\n===== C√ÅC GI√Å TR·ªä CH∆ØA ƒê∆Ø·ª¢C MAP =====")
    get_unique_values(unknown_directions, "house_direction_normalized")
    print("C·∫ßn b·ªï sung mapping cho c√°c gi√° tr·ªã n√†y!")


===== PH√ÇN T√çCH H∆Ø·ªöNG NH√Ä (house_direction) =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt house_direction_normalized =====
+--------------------------+-----+
|house_direction_normalized|count|
+--------------------------+-----+
|                          |13729|
|ƒë√¥ngnam                   |668  |
|ƒë√¥ngb·∫Øc                   |515  |
|t√¢yb·∫Øc                    |455  |
|ƒë√¥ng                      |443  |
|t√¢ynam                    |441  |
|nam                       |428  |
|b·∫Øc                       |350  |
|t√¢y                       |307  |
+--------------------------+-----+


===== ƒê·ªÄ XU·∫§T MAPPING CHO H∆Ø·ªöNG NH√Ä =====
Mapping ƒë∆∞·ª£c t·∫°o:
  'dong' -> 'EAST'
  'tay' -> 'WEST'
  'nam' -> 'SOUTH'
  'bac' -> 'NORTH'
  'ƒë√¥ng' -> 'EAST'
  't√¢y' -> 'WEST'
  'b·∫Øc' -> 'NORTH'
  'dongnam' -> 'SOUTHEAST'
  'dongbac' -> 'NORTHEAST'
  'taynam' -> 'SOUTHWEST'
  'taybac' -> 'NORTHWEST'
  'ƒë√¥ngnam' -> 'SOUTHEAST'
  'ƒë√¥ngb·∫Øc' -> 'NORTHEAST'
  't√¢ynam' -> 'SOUT

In [8]:
# 2. Ph√¢n t√≠ch v√† x√¢y d·ª±ng mapping cho interior v·ªõi keyword matching
print("\n===== PH√ÇN T√çCH N·ªòI TH·∫§T (interior) V·ªöI KEYWORD MATCHING =====")

# H√†m chu·∫©n h√≥a cho interior (gi·ªØ nguy√™n ƒë·ªÉ ph√¢n t√≠ch)
def normalize_interior(value):
    if value is None:
        return None
    normalized = value.lower().replace(" ", "").replace("-", "").replace(".", "").replace(",", "")
    return normalized

# H√†m mapping theo keyword
def map_interior_by_keywords(value):
    if value is None or value == "":
        return "UNKNOWN"
    
    value_lower = value.lower()
    
    # Ki·ªÉm tra c√°c keyword cho LUXURY (∆∞u ti√™n cao nh·∫•t)
    luxury_keywords = [
        "caoc·∫•p", "cao c·∫•p", "luxury", "sangtr·ªçng", "sang tr·ªçng", "x·ªãn", "5*", "5 sao",
        "nh·∫≠pkh·∫©u", "nh·∫≠p kh·∫©u", "ch√¢u√¢u", "ch√¢u √¢u", "ti√™uchu·∫©n", "ti√™u chu·∫©n",
        "chu·∫©nkh√°ch", "chu·∫©n kh√°ch", "h·∫°ngsang", "h·∫°ng sang"
    ]
    
    for keyword in luxury_keywords:
        if keyword in value_lower:
            return "LUXURY"
    
    # Ki·ªÉm tra c√°c keyword cho FULLY_FURNISHED
    fully_furnished_keywords = [
        "ƒë·∫ßyƒë·ªß", "ƒë·∫ßy ƒë·ªß", "full", "ho√†nthi·ªán", "ho√†n thi·ªán",
        "trangb·ªã", "trang b·ªã", "ƒëi·ªÅuh√≤a", "ƒëi·ªÅu h√≤a", "t·ªßl·∫°nh", "t·ªß l·∫°nh",
        "n·ªôith·∫•t", "n·ªôi th·∫•t", "ƒë·ªÉl·∫°i", "ƒë·ªÉ l·∫°i", "t·∫∑ng"
    ]
    
    for keyword in fully_furnished_keywords:
        if keyword in value_lower:
            return "FULLY_FURNISHED"
    
    # Ki·ªÉm tra c√°c keyword cho BASIC
    basic_keywords = [
        "c∆°b·∫£n", "c∆° b·∫£n", "b√¨nhth∆∞·ªùng", "b√¨nh th∆∞·ªùng", "chu·∫©n"
    ]
    
    for keyword in basic_keywords:
        if keyword in value_lower:
            return "BASIC"
    
    # Ki·ªÉm tra c√°c keyword cho UNFURNISHED
    unfurnished_keywords = [
        "th√¥", "tr·ªëng", "kh√¥ng", "k ", "nt", "nh√†th√¥", "nh√† th√¥"
    ]
    
    for keyword in unfurnished_keywords:
        if keyword in value_lower:
            return "UNFURNISHED"
    
    return "UNKNOWN"

# T·∫°o UDF ƒë·ªÉ √°p d·ª•ng keyword mapping
from pyspark.sql.types import StringType
map_interior_keywords_udf = udf(map_interior_by_keywords, StringType())

# √Åp d·ª•ng mapping m·ªõi
mapped_interiors_keywords = cleaned_df.withColumn(
    "interior_mapped_keywords", 
    map_interior_keywords_udf(col("interior"))
)

print("\n===== K·∫æT QU·∫¢ MAPPING V·ªöI KEYWORD MATCHING =====")
get_unique_values(mapped_interiors_keywords, "interior_mapped_keywords")

# So s√°nh v·ªõi mapping c≈©
print("\n===== SO S√ÅNH K·∫æT QU·∫¢ =====")
comparison_df = mapped_interiors_keywords.groupBy("interior_mapped_keywords").count().orderBy("count", ascending=False)
comparison_df.show()

# Ki·ªÉm tra c√°c gi√° tr·ªã v·∫´n c√≤n UNKNOWN
unknown_interiors_keywords = mapped_interiors_keywords.filter(col("interior_mapped_keywords") == "UNKNOWN")
if unknown_interiors_keywords.count() > 0:
    print(f"\nS·ªë l∆∞·ª£ng gi√° tr·ªã v·∫´n UNKNOWN: {unknown_interiors_keywords.count()}")
    print("\n===== M·ªòT S·ªê GI√Å TR·ªä V·∫™N UNKNOWN =====")
    unknown_samples = unknown_interiors_keywords.select("interior").distinct().limit(20)
    unknown_samples.show(truncate=False)


===== PH√ÇN T√çCH N·ªòI TH·∫§T (interior) V·ªöI KEYWORD MATCHING =====

===== K·∫æT QU·∫¢ MAPPING V·ªöI KEYWORD MATCHING =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt interior_mapped_keywords =====
+------------------------+-----+
|interior_mapped_keywords|count|
+------------------------+-----+
|UNKNOWN                 |9954 |
|FULLY_FURNISHED         |5336 |
|BASIC                   |1854 |
|LUXURY                  |172  |
|UNFURNISHED             |20   |
+------------------------+-----+


===== SO S√ÅNH K·∫æT QU·∫¢ =====
+------------------------+-----+
|interior_mapped_keywords|count|
+------------------------+-----+
|                 UNKNOWN| 9954|
|         FULLY_FURNISHED| 5336|
|                   BASIC| 1854|
|                  LUXURY|  172|
|             UNFURNISHED|   20|
+------------------------+-----+


S·ªë l∆∞·ª£ng gi√° tr·ªã v·∫´n UNKNOWN: 9954

===== M·ªòT S·ªê GI√Å TR·ªä V·∫™N UNKNOWN =====
+---------------------------+
|interior                   |
+--------------

In [22]:
# C·∫£i thi·ªán mapping cho legal_status v·ªõi ph√¢n lo·∫°i chi ti·∫øt h∆°n
def map_legal_status_by_keywords_v2(value):
    if value is None or value == "":
        return "UNKNOWN"
    
    # Normalize: lowercase, lo·∫°i b·ªè t·∫•t c·∫£ d·∫•u c√°ch v√† k√Ω t·ª± ƒë·∫∑c bi·ªát
    value_lower = (
        value.lower()
        .replace(" ", "")
        .replace("-", "")
        .replace(".", "")
        .replace("/", "")
        .replace("\\", "")
        .replace(",", "")
        .replace(":", "")
        .replace(";", "")
        .replace("(", "")
        .replace(")", "")
        .replace("+", "")
    )
    
    # NO_LEGAL - Kh√¥ng ph√°p l√Ω ho·∫∑c kh√¥ng s·ªï (ki·ªÉm tra ƒë·∫ßu ti√™n)
    no_legal_keywords = [
        "kh√¥ngph√°pl√Ω", "khongphap≈Çy", "kh√¥ngs·ªï", "khongso", "kos·ªï", "koso",
        "kh√¥ng", "khong", "ch∆∞ac√≥", "chuaco", "ch∆∞a", "chua", "ko"
    ]
    if any(keyword in value_lower for keyword in no_legal_keywords):
        return "NO_LEGAL"

    # LAND_USE_CERTIFICATE - Th·ªï c∆∞ ri√™ng bi·ªát
    land_use_keywords = [
        "th·ªïc∆∞", "thocu", "th·ªïc∆∞100", "thocu100", "th·ªïc∆∞100%", "thocu100%",
        "ƒë·∫•tth·ªïc∆∞", "datthoju", "cnqsdƒë", "cnqsdd", "s·ª≠d·ª•ngƒë·∫•t", "sudungdat"
    ]
    if any(keyword in value_lower for keyword in land_use_keywords):
        return "LAND_USE_CERTIFICATE"

    # RED_BOOK - S·ªï ƒë·ªè/h·ªìng (thu h·∫πp l·∫°i, ch·ªâ nh·ªØng tr∆∞·ªùng h·ª£p r√µ r√†ng)
    red_book_keywords = [
        # S·ªï ƒë·ªè/h·ªìng c∆° b·∫£n - ch√≠nh x√°c
        "s·ªïƒë·ªè", "sodo", "s·ªïh·ªìng", "sohong", "s·ªïƒë·ªès·ªïh·ªìng", "sƒëcc", "sdhh",
        
        # C√°c bi·∫øn th·ªÉ r√µ r√†ng c·ªßa s·ªï ƒë·ªè/h·ªìng
        "b√¨aƒë·ªè", "biado", "s·ªïch√≠nhch·ªß", "sochinhchu", 
        
        # Ch·ªâ nh·ªØng tr∆∞·ªùng h·ª£p th·ª±c s·ª± c√≥ s·ªï ƒë·ªè/h·ªìng
        "s·ªïƒë·∫πp", "sodep", "s·ªïvu√¥ng", "sovuong", "s·ªïvu√¥ngv·∫Øn", "sovuongvan",
        "s·ªïs·∫°ch", "sosach", "s·ªïƒë", "sod"
    ]
    if any(keyword in value_lower for keyword in red_book_keywords):
        return "RED_BOOK"

    # OWNERSHIP_CERTIFICATE - Ch·ª©ng nh·∫≠n quy·ªÅn s·ªü h·ªØu v√† c√°c lo·∫°i s·ªï kh√°c
    ownership_keywords = [
        # C√°c lo·∫°i ch·ª©ng nh·∫≠n
        "shcc", "shr", "ccqsh", "ch·ª©ngnh·∫≠n", "chungnhan", "s·ªïcc", "socc",
        "c√¥ngc√¥ng", "congcong", "s·ªïc√¥ngnh·∫≠n", "socognhan", "s·ªïc√¥ngnh·∫≠nƒë·ªß", "socognhanƒëu",
        "s·ªïho√†nc√¥ng", "sohoancong", "vbcn", "gi·∫•ych·ª©ngnh·∫≠n", "giaychungnhan",
        
        # Ph√°p l√Ω r√µ r√†ng nh∆∞ng kh√¥ng ch·ªâ r√µ lo·∫°i s·ªï
        "ph√°pl√Ω", "phap≈Çy", "ph√°pl√Ωs·∫°ch", "phaplysach", "ph√°pl√Ωchu·∫©n", "phaplychuan",
        "ph√°pl√Ωc√°nh√¢n", "phaplycanh√¢n", "ph√°pl√Ωr√µr√†ng", "phapl√Ωrorang",
        
        # Quy·ªÅn s·ªü h·ªØu
        "ch√≠nhch·ªß", "chinhchu", "s·ªüh·ªØu", "sohuu", "quy·ªÅns·ªüh·ªØu", "quyensohuu", 
        "ch·ªßquy·ªÅn", "chuquyen", "ƒë·ªìngs·ªüh·ªØu", "dongsohuu",
        
        # C√≥ gi·∫•y t·ªù nh∆∞ng kh√¥ng r√µ lo·∫°i
        "c√≥", "co", "c√≥s·ªë", "coso", "ƒë·∫ßyƒë·ªß", "daydu", "s·ªïs·∫µn", "sosan",
        "gi·∫•yt·ªù", "giayto", "gi·∫•yt·ªùƒë·∫ßyƒë·ªß", "giaytodaydu"
    ]
    if any(keyword in value_lower for keyword in ownership_keywords):
        return "OWNERSHIP_CERTIFICATE"

    # TRANSACTION_READY - S·∫µn s√†ng giao d·ªãch nh∆∞ng kh√¥ng r√µ lo·∫°i gi·∫•y t·ªù
    transaction_keywords = [
        "s·∫µns√†nggiaod·ªãch", "sansanggiaodich", "c√¥ngch·ª©ng", "congchung",
        "giaod·ªãchngay", "giaodichngay", "vu√¥ngv·∫Øn", "vuongvan", "s·∫µns√†n", "sansan",
        "c√¥ngch·ª©ngtrongn√†y", "congchungtrongngay", "giaod·ªãchnhanh", "giaodichhanh",
        "s·∫°ch", "sach", "chu·∫©n", "chuan", "r√µr√†ng", "rorang", "thi·ªánch√≠", "thienchi"
    ]
    if any(keyword in value_lower for keyword in transaction_keywords):
        return "TRANSACTION_READY"

    # PURCHASE_CONTRACT - H·ª£p ƒë·ªìng mua b√°n v√† gi·∫•y t·ªù tay
    contract_keywords = [
        "hƒëmb", "hdmb", "h·ª£pƒë·ªìng", "hopdong", "h·ª£pƒë·ªìngmuab√°n", "hopdongmuaban",
        "vi·∫øts·ªï", "vietso", "gi·∫•ytay", "giaytay", "ccvb", "vib·∫±ng", "vibang",
        "hƒë", "hd"
    ]
    if any(keyword in value_lower for keyword in contract_keywords):
        return "PURCHASE_CONTRACT"

    # PENDING_CERTIFICATE - ƒêang ch·ªù s·ªï ho·∫∑c s·∫Øp c√≥ s·ªï
    pending_keywords = [
        "ƒëangch·ªù", "dangcho", "ch·ªùs·ªï", "choso", "s·∫Øpc·∫•p", "sapcap",
        "ƒëangx·ª≠", "dangxu", "ch·ªùc·∫•p", "chocap", "s·∫Øpc√≥", "sapco",
        "ch·ªùra", "chora", "ƒëangho√†nth√†nh", "danghoanthanh", "l√†ms·ªï", "lamso",
        "nh·∫≠nnh√†v√†l√†ms·ªï", "nhanhanvalamso", "s·ªïg·ª≠ibank", "soguibank"
    ]
    if any(keyword in value_lower for keyword in pending_keywords):
        return "PENDING_CERTIFICATE"

    # INDIVIDUAL_CERTIFICATE - S·ªï ri√™ng v√† c√° nh√¢n
    individual_keywords = [
        "s·ªïri√™ng", "sorieng", "ri√™ng", "rieng", "t√°ch", "tach",
        "ƒë·ªôcl·∫≠p", "doclap", "c√°nh√¢n", "canhan", "c√°", "ca", "nh√¢n", "nhan",
        "th·ª´ak·∫ø", "thuake"
    ]
    if any(keyword in value_lower for keyword in individual_keywords):
        return "INDIVIDUAL_CERTIFICATE"

    return "UNKNOWN"

# √Åp d·ª•ng mapping v2
print("\n===== √ÅP D·ª§NG MAPPING V2 CHO LEGAL_STATUS =====")
map_legal_status_v2_udf = udf(map_legal_status_by_keywords_v2, StringType())

mapped_legal_status_v2 = cleaned_df.withColumn(
    "legal_status_mapped_v2", 
    map_legal_status_v2_udf(col("legal_status"))
)

print("\n===== K·∫æT QU·∫¢ MAPPING V2 =====")
get_unique_values(mapped_legal_status_v2, "legal_status_mapped_v2")

# So s√°nh v1 vs v2
comparison_v1_v2 = mapped_legal_status_v2.withColumn(
    "legal_status_mapped_v1", 
    map_legal_status_improved_udf(col("legal_status"))
)

print("\n===== SO S√ÅNH V1 VS V2 =====")
comparison_v1v2 = comparison_v1_v2.groupBy("legal_status_mapped_v1", "legal_status_mapped_v2").count().orderBy("count", ascending=False)
comparison_v1v2.show(20, truncate=False)


===== √ÅP D·ª§NG MAPPING V2 CHO LEGAL_STATUS =====

===== K·∫æT QU·∫¢ MAPPING V2 =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt legal_status_mapped_v2 =====
+----------------------+-----+
|legal_status_mapped_v2|count|
+----------------------+-----+
|RED_BOOK              |13852|
|UNKNOWN               |3245 |
|OWNERSHIP_CERTIFICATE |112  |
|PURCHASE_CONTRACT     |69   |
|PENDING_CERTIFICATE   |27   |
|NO_LEGAL              |20   |
|INDIVIDUAL_CERTIFICATE|8    |
|LAND_USE_CERTIFICATE  |2    |
|TRANSACTION_READY     |1    |
+----------------------+-----+


===== SO S√ÅNH V1 VS V2 =====
+----------------------+----------------------+-----+
|legal_status_mapped_v1|legal_status_mapped_v2|count|
+----------------------+----------------------+-----+
|RED_BOOK              |RED_BOOK              |13850|
|UNKNOWN               |UNKNOWN               |3243 |
|RED_BOOK              |OWNERSHIP_CERTIFICATE |110  |
|PURCHASE_CONTRACT     |PURCHASE_CONTRACT     |69   |
|PENDING_CERTIFICATE   |PENDI

In [23]:
# 3. Ph√¢n t√≠ch v√† l√†m s·∫°ch d·ªØ li·ªáu s·ªë (area, price, bedroom, bathroom)
print("\n===== PH√ÇN T√çCH V√Ä L√ÄM S·∫†CH D·ªÆ LI·ªÜU S·ªê =====")

# Chuy·ªÉn ƒë·ªïi c√°c c·ªôt s·ªë
def clean_numeric_data(df):
    """L√†m s·∫°ch v√† chuy·ªÉn ƒë·ªïi d·ªØ li·ªáu s·ªë"""
    
    # L√†m s·∫°ch area
    df_cleaned = df.withColumn(
        "area_cleaned", 
        regexp_replace(col("area"), "[^0-9\\.]", "").cast("double")
    )
    
    # L√†m s·∫°ch bedroom
    df_cleaned = df_cleaned.withColumn(
        "bedroom_cleaned", 
        regexp_replace(col("bedroom"), "[^0-9]", "").cast("double")
    )
    
    # L√†m s·∫°ch bathroom
    df_cleaned = df_cleaned.withColumn(
        "bathroom_cleaned", 
        regexp_replace(col("bathroom"), "[^0-9]", "").cast("double")
    )
    
    # L√†m s·∫°ch floor_count
    df_cleaned = df_cleaned.withColumn(
        "floor_count_cleaned", 
        regexp_replace(col("floor_count"), "[^0-9]", "").cast("double")
    )
    
    # L√†m s·∫°ch facade_width
    df_cleaned = df_cleaned.withColumn(
        "facade_width_cleaned", 
        regexp_replace(col("facade_width"), "[^0-9\\.]", "").cast("double")
    )
    
    # L√†m s·∫°ch road_width
    df_cleaned = df_cleaned.withColumn(
        "road_width_cleaned", 
        regexp_replace(col("road_width"), "[^0-9\\.]", "").cast("double")
    )
    
    return df_cleaned

# √Åp d·ª•ng l√†m s·∫°ch
numeric_cleaned_df = clean_numeric_data(mapped_legal_status_v2)

# Ph√¢n t√≠ch area
print("\n===== PH√ÇN T√çCH DI·ªÜN T√çCH (AREA) =====")
area_stats = detect_outliers(numeric_cleaned_df, "area_cleaned")

# L·ªçc area h·ª£p l√Ω (10m¬≤ ƒë·∫øn 1000m¬≤)
reasonable_area_df = numeric_cleaned_df.filter(
    (col("area_cleaned") >= 10) & (col("area_cleaned") <= 1000)
)
print(f"S·ªë b·∫£n ghi sau khi l·ªçc area h·ª£p l√Ω: {reasonable_area_df.count()}")

# Ph√¢n t√≠ch bedroom
print("\n===== PH√ÇN T√çCH S·ªê PH√íNG NG·ª¶ (BEDROOM) =====")
bedroom_stats = get_unique_values(reasonable_area_df, "bedroom_cleaned", 20)

# L·ªçc bedroom h·ª£p l√Ω (1-10 ph√≤ng)
reasonable_bedroom_df = reasonable_area_df.filter(
    (col("bedroom_cleaned") >= 1) & (col("bedroom_cleaned") <= 10)
)
print(f"S·ªë b·∫£n ghi sau khi l·ªçc bedroom h·ª£p l√Ω: {reasonable_bedroom_df.count()}")

# Ph√¢n t√≠ch bathroom
print("\n===== PH√ÇN T√çCH S·ªê PH√íNG T·∫ÆM (BATHROOM) =====")
bathroom_stats = get_unique_values(reasonable_bedroom_df, "bathroom_cleaned", 15)

# L·ªçc bathroom h·ª£p l√Ω (1-5 ph√≤ng)
reasonable_bathroom_df = reasonable_bedroom_df.filter(
    (col("bathroom_cleaned") >= 1) & (col("bathroom_cleaned") <= 5)
)
print(f"S·ªë b·∫£n ghi sau khi l·ªçc bathroom h·ª£p l√Ω: {reasonable_bathroom_df.count()}")


===== PH√ÇN T√çCH V√Ä L√ÄM S·∫†CH D·ªÆ LI·ªÜU S·ªê =====

===== PH√ÇN T√çCH DI·ªÜN T√çCH (AREA) =====

===== Th·ªëng k√™ cho c·ªôt area_cleaned =====
Minimum: 1.0
Q1 (25%): 80.0
Median: 126.0
Q3 (75%): 250.0
Maximum: 66171.0
Mean: 406.6580533289086
Standard Deviation: 1877.6895574470827
Count: 17327
IQR: 170.0
Lower Bound: -175.0
Upper Bound: 505.0
S·ªë l∆∞·ª£ng outlier: 2184 (12.60%)
S·ªë b·∫£n ghi sau khi l·ªçc area h·ª£p l√Ω: 16076

===== PH√ÇN T√çCH S·ªê PH√íNG NG·ª¶ (BEDROOM) =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt bedroom_cleaned =====
+---------------+-----+
|bedroom_cleaned|count|
+---------------+-----+
|null           |7667 |
|4.0            |1874 |
|3.0            |1416 |
|5.0            |1085 |
|2.0            |963  |
|6.0            |777  |
|1.0            |424  |
|8.0            |349  |
|7.0            |283  |
|10.0           |246  |
|9.0            |177  |
|12.0           |109  |
|20.0           |65   |
|11.0           |55   |
|14.0           |52   |
|15.0       

In [27]:
# 4. X·ª≠ l√Ω v√† ph√¢n t√≠ch gi√° (price v√† price_per_m2)
print("\n===== X·ª¨ L√ù V√Ä PH√ÇN T√çCH GI√Å =====")

def clean_price_data(df):
    """L√†m s·∫°ch v√† chuy·ªÉn ƒë·ªïi d·ªØ li·ªáu gi√°"""
    
    # X·ª≠ l√Ω tr∆∞·ªùng gi√° v·ªõi th·ªèa thu·∫≠n
    df_price = df.withColumn("price_text", trim(col("price")))
    
    # ƒê√°nh d·∫•u th·ªèa thu·∫≠n
    df_price = df_price.withColumn(
        "is_negotiable", 
        when(
            lower(col("price_text")).contains("th·ªèa thu·∫≠n") |
            lower(col("price_text")).contains("thoathuan") |
            lower(col("price_text")).contains("thoa thuan"), 
            lit(True)
        ).otherwise(lit(False))
    )
    
    # Chuy·ªÉn ƒë·ªïi gi√°
    df_price = df_price.withColumn(
        "price_cleaned",
        when(
            col("is_negotiable"), 
            lit(None)
        )
        .when(
            lower(col("price_text")).contains("t·ª∑") |
            lower(col("price_text")).contains("ty"),
            regexp_replace(col("price_text"), "[^0-9\\.]", "").cast("double") * 1000000000
        )
        .when(
            lower(col("price_text")).contains("tri·ªáu") |
            lower(col("price_text")).contains("trieu"),
            regexp_replace(col("price_text"), "[^0-9\\.]", "").cast("double") * 1000000
        )
        .otherwise(
            regexp_replace(col("price_text"), "[^0-9\\.]", "").cast("double")
        )
    )
    
    # X·ª≠ l√Ω price_per_m2
    df_price = df_price.withColumn("price_per_m2_text", trim(col("price_per_m2")))
    
    df_price = df_price.withColumn(
        "price_per_m2_cleaned",
        when(
            lower(col("price_per_m2_text")).contains("t·ª∑") |
            lower(col("price_per_m2_text")).contains("ty"),
            regexp_replace(col("price_per_m2_text"), "[^0-9\\.]", "").cast("double") * 1000000000
        )
        .when(
            lower(col("price_per_m2_text")).contains("tri·ªáu") |
            lower(col("price_per_m2_text")).contains("trieu"),
            regexp_replace(col("price_per_m2_text"), "[^0-9\\.]", "").cast("double") * 1000000
        )
        .otherwise(
            regexp_replace(col("price_per_m2_text"), "[^0-9\\.]", "").cast("double")
        )
    )
    
    return df_price.drop("price_text", "price_per_m2_text")

# √Åp d·ª•ng l√†m s·∫°ch gi√°
price_cleaned_df = clean_price_data(df)

# Ph√¢n t√≠ch gi√°
print("\n===== PH√ÇN T√çCH GI√Å (PRICE) =====")
price_with_value = price_cleaned_df.filter(col("price_cleaned").isNotNull())
print(f"S·ªë b·∫£n ghi c√≥ gi√°: {price_with_value.count()}")

if price_with_value.count() > 0:
    price_stats = detect_outliers(price_with_value, "price_cleaned")
    
    # L·ªçc gi√° h·ª£p l√Ω (100 tri·ªáu ƒë·∫øn 100 t·ª∑)
    reasonable_price_df = price_with_value.filter(
        (col("price_cleaned") >= 100000000) & (col("price_cleaned") <= 100000000000)
    )
    print(f"S·ªë b·∫£n ghi sau khi l·ªçc gi√° h·ª£p l√Ω: {reasonable_price_df.count()}")

# Ph√¢n t√≠ch th·ªèa thu·∫≠n
negotiable_count = price_cleaned_df.filter(col("is_negotiable") == True).count()
total_count = price_cleaned_df.count()
negotiable_rate = (negotiable_count / total_count * 100) if total_count > 0 else 0
print(f"\nT·ª∑ l·ªá tin 'th·ªèa thu·∫≠n': {negotiable_rate:.1f}% ({negotiable_count}/{total_count})")

# Ph√¢n t√≠ch price_per_m2
print("\n===== PH√ÇN T√çCH GI√Å/M¬≤ (PRICE_PER_M2) =====")
price_per_m2_with_value = price_cleaned_df.filter(col("price_per_m2_cleaned").isNotNull())
print(f"S·ªë b·∫£n ghi c√≥ gi√°/m¬≤: {price_per_m2_with_value.count()}")

if price_per_m2_with_value.count() > 0:
    price_per_m2_stats = detect_outliers(price_per_m2_with_value, "price_per_m2_cleaned")


===== X·ª¨ L√ù V√Ä PH√ÇN T√çCH GI√Å =====

===== PH√ÇN T√çCH GI√Å (PRICE) =====
S·ªë b·∫£n ghi c√≥ gi√°: 16559

===== Th·ªëng k√™ cho c·ªôt price_cleaned =====
Minimum: 15000000.0
Q1 (25%): 35000000000.0
Median: 78000000000.0
Q3 (75%): 161000000000.0
Maximum: 15999000000000.0
Mean: 156890683918.111
Standard Deviation: 333648160427.5351
Count: 16559
IQR: 126000000000.0
Lower Bound: -154000000000.0
Upper Bound: 350000000000.0
S·ªë l∆∞·ª£ng outlier: 1454 (8.78%)
S·ªë b·∫£n ghi sau khi l·ªçc gi√° h·ª£p l√Ω: 9761

T·ª∑ l·ªá tin 'th·ªèa thu·∫≠n': 4.4% (768/17336)

===== PH√ÇN T√çCH GI√Å/M¬≤ (PRICE_PER_M2) =====
S·ªë b·∫£n ghi c√≥ gi√°/m¬≤: 16559

===== Th·ªëng k√™ cho c·ªôt price_per_m2_cleaned =====
Minimum: 108.0
Q1 (25%): 3125000000.0
Median: 15972000000.0
Q3 (75%): 32857000000.0
Maximum: 92491000000000.0
Mean: 60773573896.30105
Standard Deviation: 986915632178.8951
Count: 16559
IQR: 29732000000.0
Lower Bound: -41473000000.0
Upper Bound: 77455000000.0
S·ªë l∆∞·ª£ng outlier: 655 (3.96%)


In [31]:
# B·ªï sung import thi·∫øu
from pyspark.sql.functions import (
    col, to_timestamp, current_timestamp, lit, regexp_replace, trim,
    when, upper, lower, split, element_at, round as spark_round,
    avg, count, percentile_approx, stddev, min as spark_min, max as spark_max,
    udf, length, expr, regexp_extract  # Th√™m regexp_extract
)

# 6. Ph√¢n t√≠ch v·ªã tr√≠ v√† coordinates (S·ª≠a l·∫°i)
print("\n===== PH√ÇN T√çCH V·ªä TR√ç V√Ä COORDINATES =====")

# L√†m s·∫°ch coordinates
coords_cleaned_df = df.withColumn(
    "latitude_cleaned", col("latitude").cast("double")
).withColumn(
    "longitude_cleaned", col("longitude").cast("double")
)

# Ph√¢n t√≠ch coordinates
valid_coords = coords_cleaned_df.filter(
    col("latitude_cleaned").isNotNull() & 
    col("longitude_cleaned").isNotNull() &
    (col("latitude_cleaned") >= 8.0) & (col("latitude_cleaned") <= 23.5) &  # Vi·ªát Nam lat range
    (col("longitude_cleaned") >= 102.0) & (col("longitude_cleaned") <= 110.0)  # Vi·ªát Nam lon range
)

print(f"S·ªë b·∫£n ghi c√≥ coordinates h·ª£p l·ªá: {valid_coords.count()}")
print(f"T·ª∑ l·ªá c√≥ coordinates: {(valid_coords.count() / coords_cleaned_df.count() * 100):.1f}%")

# Ph√¢n t√≠ch location text
print("\n===== PH√ÇN T√çCH LOCATION TEXT =====")
location_analysis = coords_cleaned_df.withColumn(
    "location_cleaned", trim(col("location"))
).withColumn(
    "location_length", length(col("location"))
)

# Th·ªëng k√™ ƒë·ªô d√†i location
location_stats = location_analysis.select(
    avg("location_length").alias("avg_length"),
    spark_min("location_length").alias("min_length"),
    spark_max("location_length").alias("max_length"),
    count(when(col("location_length") > 10, True)).alias("detailed_locations")
).collect()[0]

print(f"ƒê·ªô d√†i location trung b√¨nh: {location_stats['avg_length']:.1f} k√Ω t·ª±")
print(f"ƒê·ªô d√†i t·ªëi thi·ªÉu: {location_stats['min_length']}")
print(f"ƒê·ªô d√†i t·ªëi ƒëa: {location_stats['max_length']}")
print(f"S·ªë location chi ti·∫øt (>10 k√Ω t·ª±): {location_stats['detailed_locations']}")

# Tr√≠ch xu·∫•t th√¥ng tin t·ª´ location (S·ª≠a l·∫°i)
def extract_location_info(df):
    """Tr√≠ch xu·∫•t th√¥ng tin qu·∫≠n/huy·ªán, th√†nh ph·ªë t·ª´ location"""
    
    # Tr√≠ch xu·∫•t qu·∫≠n/huy·ªán v·ªõi pattern c·∫£i thi·ªán
    df_location = df.withColumn(
        "district_extracted",
        when(
            lower(col("location_cleaned")).contains("qu·∫≠n"),
            regexp_extract(lower(col("location_cleaned")), r"qu·∫≠n\s*(\d+)", 1)
        ).when(
            lower(col("location_cleaned")).contains("huy·ªán"),
            regexp_extract(lower(col("location_cleaned")), r"huy·ªán\s*([^,\s]+)", 1)
        ).when(
            lower(col("location_cleaned")).contains("q."),
            regexp_extract(lower(col("location_cleaned")), r"q\.?\s*(\d+)", 1)
        ).when(
            lower(col("location_cleaned")).contains("qu·∫≠n"),
            regexp_extract(lower(col("location_cleaned")), r"qu·∫≠n\s*([^,\s]+)", 1)
        ).otherwise(lit(None))
    )
    
    # Tr√≠ch xu·∫•t th√†nh ph·ªë v·ªõi nhi·ªÅu pattern h∆°n
    df_location = df_location.withColumn(
        "city_extracted",
        when(
            lower(col("location_cleaned")).contains("h·ªì ch√≠ minh") |
            lower(col("location_cleaned")).contains("tp.hcm") |
            lower(col("location_cleaned")).contains("tphcm") |
            lower(col("location_cleaned")).contains("hcm") |
            lower(col("location_cleaned")).contains("s√†i g√≤n") |
            lower(col("location_cleaned")).contains("saigon"),
            lit("Ho Chi Minh")
        ).when(
            lower(col("location_cleaned")).contains("h√† n·ªôi") |
            lower(col("location_cleaned")).contains("hanoi") |
            lower(col("location_cleaned")).contains("ha noi"),
            lit("Hanoi")
        ).when(
            lower(col("location_cleaned")).contains("ƒë√† n·∫µng") |
            lower(col("location_cleaned")).contains("da nang") |
            lower(col("location_cleaned")).contains("danang"),
            lit("Da Nang")
        ).when(
            lower(col("location_cleaned")).contains("c·∫ßn th∆°") |
            lower(col("location_cleaned")).contains("can tho"),
            lit("Can Tho")
        ).when(
            lower(col("location_cleaned")).contains("h·∫£i ph√≤ng") |
            lower(col("location_cleaned")).contains("hai phong"),
            lit("Hai Phong")
        ).otherwise(lit("Other"))
    )
    
    return df_location

location_info_df = extract_location_info(location_analysis)

# Th·ªëng k√™ theo th√†nh ph·ªë
print("\n===== TH·ªêNG K√ä THEO TH√ÄNH PH·ªê =====")
city_stats = get_unique_values(location_info_df, "city_extracted")

# Th·ªëng k√™ theo qu·∫≠n (cho TPHCM)
hcm_data = location_info_df.filter(col("city_extracted") == "Ho Chi Minh")
if hcm_data.count() > 0:
    print("\n===== TH·ªêNG K√ä QU·∫¨N TPHCM =====")
    district_stats = get_unique_values(hcm_data, "district_extracted", 20)


===== PH√ÇN T√çCH V·ªä TR√ç V√Ä COORDINATES =====
S·ªë b·∫£n ghi c√≥ coordinates h·ª£p l·ªá: 17319
T·ª∑ l·ªá c√≥ coordinates: 99.9%

===== PH√ÇN T√çCH LOCATION TEXT =====
ƒê·ªô d√†i location trung b√¨nh: 53.6 k√Ω t·ª±
ƒê·ªô d√†i t·ªëi thi·ªÉu: 0
ƒê·ªô d√†i t·ªëi ƒëa: 109
S·ªë location chi ti·∫øt (>10 k√Ω t·ª±): 17327

===== TH·ªêNG K√ä THEO TH√ÄNH PH·ªê =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt city_extracted =====
+--------------+-----+
|city_extracted|count|
+--------------+-----+
|Ho Chi Minh   |10009|
|Hanoi         |4177 |
|Other         |1555 |
|Da Nang       |1269 |
|Hai Phong     |255  |
|Can Tho       |71   |
+--------------+-----+


===== TH·ªêNG K√ä QU·∫¨N TPHCM =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt district_extracted =====
+------------------+-----+
|district_extracted|count|
+------------------+-----+
|null              |4981 |
|1                 |1269 |
|3                 |750  |
|7                 |609  |
|10                |448  |
|2                 |384 

In [37]:
# S·ª≠a l·∫°i h√†m smart_fill_bedroom_bathroom v·ªõi concat ƒë√∫ng c√∫ ph√°p
from pyspark.sql.functions import (
    col, to_timestamp, current_timestamp, lit, regexp_replace, trim,
    when, upper, lower, split, element_at, round as spark_round,
    avg, count, percentile_approx, stddev, min as spark_min, max as spark_max,
    udf, length, expr, regexp_extract, concat, isnull, isnan
)

def smart_fill_bedroom_bathroom(df):
    """
    ƒêi·ªÅn bedroom/bathroom null d·ª±a tr√™n:
    1. Di·ªán t√≠ch nh√†
    2. Gi√° nh√†  
    3. Th√¥ng tin t·ª´ title/description
    4. Statistical imputation d·ª±a tr√™n nh√≥m t∆∞∆°ng t·ª±
    """
    
    # ƒê·∫ßu ti√™n, l√†m s·∫°ch c√°c c·ªôt c·∫ßn thi·∫øt
    df_cleaned = df.withColumn(
        "area_cleaned", 
        regexp_replace(col("area"), "[^0-9\\.]", "").cast("double")
    ).withColumn(
        "bedroom_cleaned", 
        regexp_replace(col("bedroom"), "[^0-9]", "").cast("double")
    ).withColumn(
        "bathroom_cleaned", 
        regexp_replace(col("bathroom"), "[^0-9]", "").cast("double")
    ).withColumn(
        "price_cleaned",
        when(
            lower(col("price")).contains("t·ª∑") | lower(col("price")).contains("ty"),
            regexp_replace(col("price"), "[^0-9\\.]", "").cast("double") * 1000000000
        ).when(
            lower(col("price")).contains("tri·ªáu") | lower(col("price")).contains("trieu"),
            regexp_replace(col("price"), "[^0-9\\.]", "").cast("double") * 1000000
        ).otherwise(
            regexp_replace(col("price"), "[^0-9\\.]", "").cast("double")
        )
    )
    
    # Tr√≠ch xu·∫•t th√†nh ph·ªë ƒë∆°n gi·∫£n
    df_with_city = df_cleaned.withColumn(
        "city_extracted",
        when(
            lower(col("location")).contains("h·ªì ch√≠ minh") |
            lower(col("location")).contains("tp.hcm") |
            lower(col("location")).contains("tphcm") |
            lower(col("location")).contains("hcm"),
            lit("Ho Chi Minh")
        ).when(
            lower(col("location")).contains("h√† n·ªôi") |
            lower(col("location")).contains("hanoi"),
            lit("Hanoi")
        ).otherwise(lit("Other"))
    )
    
    # B∆∞·ªõc 1: Tr√≠ch xu·∫•t th√¥ng tin t·ª´ title v√† description
    print("B∆∞·ªõc 1: Tr√≠ch xu·∫•t t·ª´ title/description...")
    
    df_extracted = df_with_city.withColumn(
        "title_desc_combined",
        concat(
            when(col("title").isNotNull(), lower(col("title"))).otherwise(lit("")),
            lit(" "),
            when(col("description").isNotNull(), lower(col("description"))).otherwise(lit(""))
        )
    ).withColumn(
        "bedroom_from_text",
        when(
            col("title_desc_combined").rlike(r"(\d+)\s*(ph√≤ng\s*ng·ªß|pn|bedroom)"),
            regexp_extract(col("title_desc_combined"), r"(\d+)\s*(?:ph√≤ng\s*ng·ªß|pn|bedroom)", 1).cast("double")
        ).otherwise(lit(None))
    ).withColumn(
        "bathroom_from_text", 
        when(
            col("title_desc_combined").rlike(r"(\d+)\s*(ph√≤ng\s*t·∫Øm|wc|toilet|bathroom)"),
            regexp_extract(col("title_desc_combined"), r"(\d+)\s*(?:ph√≤ng\s*t·∫Øm|wc|toilet|bathroom)", 1).cast("double")
        ).otherwise(lit(None))
    )
    
    # B∆∞·ªõc 2: ∆Ø·ªõc l∆∞·ª£ng d·ª±a tr√™n di·ªán t√≠ch
    print("B∆∞·ªõc 2: ∆Ø·ªõc l∆∞·ª£ng d·ª±a tr√™n di·ªán t√≠ch...")
    
    df_area_based = df_extracted.withColumn(
        "bedroom_from_area",
        when(col("area_cleaned").isNotNull(),
            when(col("area_cleaned") <= 30, lit(1))
            .when(col("area_cleaned") <= 50, lit(2))
            .when(col("area_cleaned") <= 80, lit(3))
            .when(col("area_cleaned") <= 120, lit(4))
            .when(col("area_cleaned") <= 200, lit(5))
            .otherwise(lit(6))
        ).otherwise(lit(None))
    ).withColumn(
        "bathroom_from_area",
        when(col("area_cleaned").isNotNull(),
            when(col("area_cleaned") <= 40, lit(1))
            .when(col("area_cleaned") <= 80, lit(2))
            .when(col("area_cleaned") <= 150, lit(3))
            .otherwise(lit(4))
        ).otherwise(lit(None))
    )
    
    # B∆∞·ªõc 3: T√≠nh median theo nh√≥m location v√† price range
    print("B∆∞·ªõc 3: T√≠nh median theo nh√≥m t∆∞∆°ng t·ª±...")
    
    # T·∫°o price range groups
    df_grouped = df_area_based.withColumn(
        "price_range",
        when(col("price_cleaned").isNull(), lit("unknown"))
        .when(col("price_cleaned") < 1000000000, lit("under_1b"))      # D∆∞·ªõi 1 t·ª∑
        .when(col("price_cleaned") < 3000000000, lit("1b_3b"))         # 1-3 t·ª∑
        .when(col("price_cleaned") < 5000000000, lit("3b_5b"))         # 3-5 t·ª∑
        .when(col("price_cleaned") < 10000000000, lit("5b_10b"))       # 5-10 t·ª∑
        .otherwise(lit("over_10b"))                                    # Tr√™n 10 t·ª∑
    ).withColumn(
        "area_range",
        when(col("area_cleaned").isNull(), lit("unknown"))
        .when(col("area_cleaned") < 50, lit("small"))                  # Nh·ªè < 50m¬≤
        .when(col("area_cleaned") < 100, lit("medium"))                # Trung b√¨nh 50-100m¬≤
        .when(col("area_cleaned") < 200, lit("large"))                 # L·ªõn 100-200m¬≤
        .otherwise(lit("very_large"))                                  # R·∫•t l·ªõn > 200m¬≤
    )
    
    # T√≠nh median cho bedroom
    try:
        bedroom_median_by_group = df_grouped.filter(col("bedroom_cleaned").isNotNull()) \
            .select("city_extracted", "price_range", "area_range", "bedroom_cleaned") \
            .groupBy("city_extracted", "price_range", "area_range") \
            .agg(percentile_approx("bedroom_cleaned", 0.5).alias("bedroom_median"),
                 count("*").alias("bedroom_count")) \
            .filter(col("bedroom_count") >= 3)  # Ch·ªâ l·∫•y nh√≥m c√≥ √≠t nh·∫•t 3 samples
    except:
        # Fallback n·∫øu kh√¥ng c√≥ ƒë·ªß d·ªØ li·ªáu
        bedroom_median_by_group = spark.createDataFrame([], "city_extracted string, price_range string, area_range string, bedroom_median double, bedroom_count long")
    
    # T√≠nh median cho bathroom  
    try:
        bathroom_median_by_group = df_grouped.filter(col("bathroom_cleaned").isNotNull()) \
            .select("city_extracted", "price_range", "area_range", "bathroom_cleaned") \
            .groupBy("city_extracted", "price_range", "area_range") \
            .agg(percentile_approx("bathroom_cleaned", 0.5).alias("bathroom_median"),
                 count("*").alias("bathroom_count")) \
            .filter(col("bathroom_count") >= 3)
    except:
        # Fallback n·∫øu kh√¥ng c√≥ ƒë·ªß d·ªØ li·ªáu
        bathroom_median_by_group = spark.createDataFrame([], "city_extracted string, price_range string, area_range string, bathroom_median double, bathroom_count long")
    
    # Join l·∫°i ƒë·ªÉ c√≥ median values
    df_with_median = df_grouped \
        .join(bedroom_median_by_group, 
              ["city_extracted", "price_range", "area_range"], "left") \
        .join(bathroom_median_by_group, 
              ["city_extracted", "price_range", "area_range"], "left")
    
    # B∆∞·ªõc 4: Fallback medians (to√†n dataset)
    try:
        overall_bedroom_median = df_with_median.filter(col("bedroom_cleaned").isNotNull()) \
            .select(percentile_approx("bedroom_cleaned", 0.5)).collect()[0][0]
        overall_bathroom_median = df_with_median.filter(col("bathroom_cleaned").isNotNull()) \
            .select(percentile_approx("bathroom_cleaned", 0.5)).collect()[0][0]
    except:
        overall_bedroom_median = 2.0  # Default fallback
        overall_bathroom_median = 1.0  # Default fallback
    
    print(f"Overall bedroom median: {overall_bedroom_median}")
    print(f"Overall bathroom median: {overall_bathroom_median}")
    
    # B∆∞·ªõc 5: Logic ƒëi·ªÅn th√¥ng minh v·ªõi ƒë·ªô ∆∞u ti√™n
    df_smart_filled = df_with_median.withColumn(
        "bedroom_final",
        # ∆Øu ti√™n 1: Gi√° tr·ªã g·ªëc n·∫øu c√≥
        when(col("bedroom_cleaned").isNotNull(), col("bedroom_cleaned"))
        # ∆Øu ti√™n 2: Tr√≠ch xu·∫•t t·ª´ text
        .when(col("bedroom_from_text").isNotNull() & 
              (col("bedroom_from_text") >= 1) & (col("bedroom_from_text") <= 10), 
              col("bedroom_from_text"))
        # ∆Øu ti√™n 3: Median c·ªßa nh√≥m t∆∞∆°ng t·ª±
        .when(col("bedroom_median").isNotNull(), col("bedroom_median"))
        # ∆Øu ti√™n 4: ∆Ø·ªõc l∆∞·ª£ng t·ª´ di·ªán t√≠ch
        .when(col("bedroom_from_area").isNotNull(), col("bedroom_from_area"))
        # Cu·ªëi c√πng: Overall median
        .otherwise(lit(overall_bedroom_median))
    ).withColumn(
        "bathroom_final",
        # ∆Øu ti√™n 1: Gi√° tr·ªã g·ªëc n·∫øu c√≥
        when(col("bathroom_cleaned").isNotNull(), col("bathroom_cleaned"))
        # ∆Øu ti√™n 2: Tr√≠ch xu·∫•t t·ª´ text
        .when(col("bathroom_from_text").isNotNull() & 
              (col("bathroom_from_text") >= 1) & (col("bathroom_from_text") <= 5), 
              col("bathroom_from_text"))
        # ∆Øu ti√™n 3: Median c·ªßa nh√≥m t∆∞∆°ng t·ª±
        .when(col("bathroom_median").isNotNull(), col("bathroom_median"))
        # ∆Øu ti√™n 4: ∆Ø·ªõc l∆∞·ª£ng t·ª´ di·ªán t√≠ch
        .when(col("bathroom_from_area").isNotNull(), col("bathroom_from_area"))
        # Cu·ªëi c√πng: Overall median
        .otherwise(lit(overall_bathroom_median))
    )
    
    # Th√™m flag ƒë·ªÉ bi·∫øt ngu·ªìn g·ªëc c·ªßa d·ªØ li·ªáu
    df_smart_filled = df_smart_filled.withColumn(
        "bedroom_source",
        when(col("bedroom_cleaned").isNotNull(), lit("original"))
        .when(col("bedroom_from_text").isNotNull() & 
              (col("bedroom_from_text") >= 1) & (col("bedroom_from_text") <= 10), 
              lit("extracted_from_text"))
        .when(col("bedroom_median").isNotNull(), lit("group_median"))
        .when(col("bedroom_from_area").isNotNull(), lit("area_based"))
        .otherwise(lit("overall_median"))
    ).withColumn(
        "bathroom_source",
        when(col("bathroom_cleaned").isNotNull(), lit("original"))
        .when(col("bathroom_from_text").isNotNull() & 
              (col("bathroom_from_text") >= 1) & (col("bathroom_from_text") <= 5), 
              lit("extracted_from_text"))
        .when(col("bathroom_median").isNotNull(), lit("group_median"))
        .when(col("bathroom_from_area").isNotNull(), lit("area_based"))
        .otherwise(lit("overall_median"))
    )
    
    return df_smart_filled

# √Åp d·ª•ng smart filling
print("\n===== B·∫ÆT ƒê·∫¶U SMART FILLING =====")
smart_filled_df = smart_fill_bedroom_bathroom(df)

# Th·ªëng k√™ k·∫øt qu·∫£
print("\n===== TH·ªêNG K√ä K·∫æT QU·∫¢ SMART FILLING =====")

# So s√°nh tr∆∞·ªõc v√† sau
original_bedroom_nulls = smart_filled_df.filter(col("bedroom_cleaned").isNull()).count()
original_bathroom_nulls = smart_filled_df.filter(col("bathroom_cleaned").isNull()).count()
total_records = smart_filled_df.count()

print(f"T·ªïng s·ªë b·∫£n ghi: {total_records}")
print(f"Bedroom null ban ƒë·∫ßu: {original_bedroom_nulls} ({original_bedroom_nulls/total_records*100:.1f}%)")
print(f"Bathroom null ban ƒë·∫ßu: {original_bathroom_nulls} ({original_bathroom_nulls/total_records*100:.1f}%)")

# Sau khi ƒëi·ªÅn
final_bedroom_nulls = smart_filled_df.filter(col("bedroom_final").isNull()).count()
final_bathroom_nulls = smart_filled_df.filter(col("bathroom_final").isNull()).count()

print(f"Bedroom null sau ƒëi·ªÅn: {final_bedroom_nulls} ({final_bedroom_nulls/total_records*100:.1f}%)")
print(f"Bathroom null sau ƒëi·ªÅn: {final_bathroom_nulls} ({final_bathroom_nulls/total_records*100:.1f}%)")

# Th·ªëng k√™ ngu·ªìn g·ªëc d·ªØ li·ªáu
print("\n===== NGU·ªíN G·ªêC D·ªÆ LI·ªÜU BEDROOM =====")
get_unique_values(smart_filled_df, "bedroom_source")

print("\n===== NGU·ªíN G·ªêC D·ªÆ LI·ªÜU BATHROOM =====")
get_unique_values(smart_filled_df, "bathroom_source")

# Ki·ªÉm tra ƒë·ªô h·ª£p l√Ω c·ªßa k·∫øt qu·∫£
print("\n===== KI·ªÇM TRA ƒê·ªò H·ª¢P L√ù =====")
print("Ph√¢n b·ªë bedroom_final:")
get_unique_values(smart_filled_df, "bedroom_final", 15)

print("Ph√¢n b·ªë bathroom_final:")
get_unique_values(smart_filled_df, "bathroom_final", 10)


===== B·∫ÆT ƒê·∫¶U SMART FILLING =====
B∆∞·ªõc 1: Tr√≠ch xu·∫•t t·ª´ title/description...
B∆∞·ªõc 2: ∆Ø·ªõc l∆∞·ª£ng d·ª±a tr√™n di·ªán t√≠ch...
B∆∞·ªõc 3: T√≠nh median theo nh√≥m t∆∞∆°ng t·ª±...
Overall bedroom median: 4.0
Overall bathroom median: 4.0

===== TH·ªêNG K√ä K·∫æT QU·∫¢ SMART FILLING =====
T·ªïng s·ªë b·∫£n ghi: 17336
Bedroom null ban ƒë·∫ßu: 8343 (48.1%)
Bathroom null ban ƒë·∫ßu: 8912 (51.4%)
Bedroom null sau ƒëi·ªÅn: 0 (0.0%)
Bathroom null sau ƒëi·ªÅn: 0 (0.0%)

===== NGU·ªíN G·ªêC D·ªÆ LI·ªÜU BEDROOM =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt bedroom_source =====
+-------------------+-----+
|bedroom_source     |count|
+-------------------+-----+
|original           |8993 |
|group_median       |8090 |
|extracted_from_text|210  |
|area_based         |34   |
|overall_median     |9    |
+-------------------+-----+


===== NGU·ªíN G·ªêC D·ªÆ LI·ªÜU BATHROOM =====

===== Gi√° tr·ªã duy nh·∫•t c·ªßa c·ªôt bathroom_source =====
+-------------------+-----+
|bathroom_source 

DataFrame[bathroom_final: double, count: bigint]

In [39]:
# S·ª≠a l·∫°i ph·∫ßn validation v√† t·∫°o dataset cu·ªëi c√πng
print("\n===== VALIDATION V√Ä FINE-TUNING =====")

def validate_smart_filling(df):
    """Validation v√† ƒëi·ªÅu ch·ªânh k·∫øt qu·∫£ smart filling"""
    
    # Rule 1: Bathroom kh√¥ng ƒë∆∞·ª£c nhi·ªÅu h∆°n bedroom + 1
    df_validated = df.withColumn(
        "bathroom_final_validated",
        when(
            col("bathroom_final") > (col("bedroom_final") + 1),
            col("bedroom_final")  # ƒêi·ªÅu ch·ªânh bathroom = bedroom n·∫øu qu√° nhi·ªÅu
        ).otherwise(col("bathroom_final"))
    )
    
    # Rule 2: Nh√† nh·ªè (< 30m¬≤) kh√¥ng th·ªÉ c√≥ qu√° nhi·ªÅu ph√≤ng
    df_validated = df_validated.withColumn(
        "bedroom_final_validated",
        when(
            (col("area_cleaned") < 30) & (col("bedroom_final") > 2),
            lit(1)  # Nh√† < 30m¬≤ t·ªëi ƒëa 1-2 ph√≤ng ng·ªß
        ).when(
            (col("area_cleaned") < 50) & (col("bedroom_final") > 3),
            lit(2)  # Nh√† < 50m¬≤ t·ªëi ƒëa 2-3 ph√≤ng ng·ªß
        ).otherwise(col("bedroom_final"))
    )
    
    # Rule 3: ƒê·∫£m b·∫£o gi√° tr·ªã h·ª£p l√Ω
    df_validated = df_validated.withColumn(
        "bedroom_final_validated",
        when(col("bedroom_final_validated") < 1, lit(1))
        .when(col("bedroom_final_validated") > 10, lit(6))
        .otherwise(col("bedroom_final_validated"))
    ).withColumn(
        "bathroom_final_validated",
        when(col("bathroom_final_validated") < 1, lit(1))
        .when(col("bathroom_final_validated") > 6, lit(4))
        .otherwise(col("bathroom_final_validated"))
    )
    
    # Rule 4: ƒê√°nh d·∫•u c√°c tr∆∞·ªùng h·ª£p c·∫ßn review
    df_validated = df_validated.withColumn(
        "needs_review",
        when(
            # Nh√† r·∫ª (<2 t·ª∑) nh∆∞ng nhi·ªÅu ph√≤ng (>4)
            (col("price_cleaned") < 2000000000) & (col("bedroom_final_validated") > 4),
            lit(True)
        ).when(
            # Di·ªán t√≠ch v√† s·ªë ph√≤ng kh√¥ng kh·ªõp
            (col("area_cleaned") < 60) & (col("bedroom_final_validated") > 3),
            lit(True)
        ).otherwise(lit(False))
    )
    
    return df_validated

# √Åp d·ª•ng validation
validated_df = validate_smart_filling(smart_filled_df)

# Th·ªëng k√™ c√°c tr∆∞·ªùng h·ª£p c·∫ßn review
print("\n===== C√ÅC TR∆Ø·ªúNG H·ª¢P C·∫¶N REVIEW =====")
need_review = validated_df.filter(col("needs_review") == True)
print(f"S·ªë b·∫£n ghi c·∫ßn review: {need_review.count()}")

if need_review.count() > 0 and need_review.count() < 100:
    print("M·∫´u c√°c tr∆∞·ªùng h·ª£p c·∫ßn review:")
    need_review.select(
        "area_cleaned", "price_cleaned", "bedroom_final_validated", 
        "bathroom_final_validated", "bedroom_source", "title"
    ).show(10, truncate=False)

# T·∫°o dataset cu·ªëi c√πng - lo·∫°i b·ªè c·ªôt g·ªëc tr∆∞·ªõc khi t·∫°o alias m·ªõi
final_room_df = validated_df.drop("bedroom", "bathroom").select(
    "*",
    col("bedroom_final_validated").alias("bedroom_new"),
    col("bathroom_final_validated").alias("bathroom_new")
)

print(f"\n===== DATASET CU·ªêI C√ôNG =====")
print(f"T·ªïng s·ªë b·∫£n ghi: {final_room_df.count()}")
print(f"Bedroom null: {final_room_df.filter(col('bedroom_new').isNull()).count()}")
print(f"Bathroom null: {final_room_df.filter(col('bathroom_new').isNull()).count()}")

# So s√°nh v·ªõi dataset ban ƒë·∫ßu
original_valid = df.filter(
    col("bedroom").isNotNull() & col("bathroom").isNotNull()
).count()

final_valid = final_room_df.filter(
    col("bedroom_new").isNotNull() & col("bathroom_new").isNotNull()
).count()

print(f"Dataset ban ƒë·∫ßu c√≥ bedroom+bathroom: {original_valid}")
print(f"Dataset sau smart filling: {final_valid}")
improvement = final_valid - original_valid
if original_valid > 0:
    improvement_pct = improvement / original_valid * 100
    print(f"TƒÉng th√™m: {improvement} b·∫£n ghi ({improvement_pct:.1f}%)")

print("\n‚úÖ HO√ÄN TH√ÄNH SMART IMPUTATION CHO BEDROOM/BATHROOM")


===== VALIDATION V√Ä FINE-TUNING =====

===== C√ÅC TR∆Ø·ªúNG H·ª¢P C·∫¶N REVIEW =====
S·ªë b·∫£n ghi c·∫ßn review: 595

===== DATASET CU·ªêI C√ôNG =====
T·ªïng s·ªë b·∫£n ghi: 17336
Bedroom null: 0
Bathroom null: 0
Dataset ban ƒë·∫ßu c√≥ bedroom+bathroom: 17336
Dataset sau smart filling: 17336
TƒÉng th√™m: 0 b·∫£n ghi (0.0%)

‚úÖ HO√ÄN TH√ÄNH SMART IMPUTATION CHO BEDROOM/BATHROOM


In [41]:
# Notebook x·ª≠ l√Ω t·ªïng h·ª£p d·ªØ li·ªáu b·∫•t ƒë·ªông s·∫£n t·ª´ A-Z
print("üöÄ B·∫ÆT ƒê·∫¶U QUY TR√åNH X·ª¨ L√ù D·ªÆ LI·ªÜU B·∫§T ƒê·ªòNG SAN")
print("=" * 70)

# Import ƒë·∫ßy ƒë·ªß
from pyspark.sql.functions import (
    col, to_timestamp, current_timestamp, lit, regexp_replace, trim,
    when, upper, lower, split, element_at, round as spark_round,
    avg, count, percentile_approx, stddev, min as spark_min, max as spark_max,
    udf, length, expr, regexp_extract, concat, isnull, isnan
)
from pyspark.sql.types import StringType, DoubleType

# =============================================================================
# B∆Ø·ªöC 1: ƒê·ªåC D·ªÆ LI·ªÜU
# =============================================================================
print("\nüìÇ B∆Ø·ªöC 1: ƒê·ªåC D·ªÆ LI·ªÜU")
csv_file = "/home/fer/data/real_estate_project/tmp/csv_files/bds_data_may2025.csv"
json_path = "hdfs://namenode:9000/data/realestate/raw/batdongsan/house/2025/05/*"

try:
    if os.path.exists(csv_file):
        df = spark.read.option("header", "true").csv(csv_file)
        print(f"‚úÖ ƒê·ªçc th√†nh c√¥ng t·ª´ CSV: {df.count()} b·∫£n ghi")
    else:
        df = spark.read.option("multiline", "false").json(json_path)
        print(f"‚úÖ ƒê·ªçc th√†nh c√¥ng t·ª´ JSON: {df.count()} b·∫£n ghi")
except Exception as e:
    print(f"‚ùå L·ªói ƒë·ªçc d·ªØ li·ªáu: {e}")
    # T·∫°o DataFrame tr·ªëng v·ªõi schema m·∫´u
    from pyspark.sql.types import StructType, StructField
    schema = StructType([
        StructField("url", StringType(), True),
        StructField("title", StringType(), True),
        StructField("price", StringType(), True),
        StructField("area", StringType(), True),
        StructField("bedroom", StringType(), True),
        StructField("bathroom", StringType(), True),
        StructField("house_direction", StringType(), True),
        StructField("interior", StringType(), True),
        StructField("legal_status", StringType(), True),
        StructField("location", StringType(), True),
        StructField("description", StringType(), True),
        StructField("posted_date", StringType(), True),
        StructField("crawl_timestamp", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("seller_info", StringType(), True),
        StructField("source", StringType(), True),
        StructField("data_type", StringType(), True),
    ])
    df = spark.createDataFrame([], schema)

original_count = df.count()
print(f"üìä Dataset g·ªëc: {original_count} b·∫£n ghi")

# =============================================================================
# B∆Ø·ªöC 2: MAPPING V√Ä CHU·∫®N H√ìA D·ªÆ LI·ªÜU CATEGORICAL
# =============================================================================
print("\nüîÑ B∆Ø·ªöC 2: MAPPING V√Ä CHU·∫®N H√ìA D·ªÆ LI·ªÜU CATEGORICAL")

# 2.1. Mapping house_direction
print("   üìç Mapping house_direction...")
def normalize_direction(value):
    if value is None:
        return None
    return value.lower().replace(" ", "").replace("-", "")

direction_mapping = {
    # H∆∞·ªõng ƒë∆°n
    "dong": "EAST", "ƒë√¥ng": "EAST", "tay": "WEST", "t√¢y": "WEST",
    "nam": "SOUTH", "bac": "NORTH", "b·∫Øc": "NORTH",
    # H∆∞·ªõng k√©p
    "dongnam": "SOUTHEAST", "ƒë√¥ngnam": "SOUTHEAST", "namdong": "SOUTHEAST", "namƒë√¥ng": "SOUTHEAST",
    "dongbac": "NORTHEAST", "ƒë√¥ngb·∫Øc": "NORTHEAST", "bacdong": "NORTHEAST", "b·∫Øcƒë√¥ng": "NORTHEAST",
    "taynam": "SOUTHWEST", "t√¢ynam": "SOUTHWEST", "namtay": "SOUTHWEST", "namt√¢y": "SOUTHWEST",
    "taybac": "NORTHWEST", "t√¢yb·∫Øc": "NORTHWEST", "bactay": "NORTHWEST", "b·∫Øct√¢y": "NORTHWEST",
    None: "UNKNOWN", "": "UNKNOWN"
}

normalize_direction_udf = udf(normalize_direction, StringType())
map_direction_udf = udf(lambda x: direction_mapping.get(x, "UNKNOWN"), StringType())

# 2.2. Mapping interior
print("   üè† Mapping interior...")
def map_interior_by_keywords(value):
    if value is None or value == "":
        return "UNKNOWN"
    
    value_lower = value.lower()
    
    # LUXURY keywords
    luxury_keywords = ["caoc·∫•p", "cao c·∫•p", "luxury", "sangtr·ªçng", "sang tr·ªçng", "x·ªãn", "5*", "5 sao",
                      "nh·∫≠pkh·∫©u", "nh·∫≠p kh·∫©u", "ch√¢u√¢u", "ch√¢u √¢u", "ti√™uchu·∫©n", "ti√™u chu·∫©n"]
    if any(keyword in value_lower for keyword in luxury_keywords):
        return "LUXURY"
    
    # FULLY_FURNISHED keywords
    fully_furnished_keywords = ["ƒë·∫ßyƒë·ªß", "ƒë·∫ßy ƒë·ªß", "full", "ho√†nthi·ªán", "ho√†n thi·ªán",
                               "trangb·ªã", "trang b·ªã", "ƒëi·ªÅuh√≤a", "ƒëi·ªÅu h√≤a", "t·ªßl·∫°nh", "t·ªß l·∫°nh",
                               "n·ªôith·∫•t", "n·ªôi th·∫•t", "ƒë·ªÉl·∫°i", "ƒë·ªÉ l·∫°i", "t·∫∑ng"]
    if any(keyword in value_lower for keyword in fully_furnished_keywords):
        return "FULLY_FURNISHED"
    
    # BASIC keywords
    basic_keywords = ["c∆°b·∫£n", "c∆° b·∫£n", "b√¨nhth∆∞·ªùng", "b√¨nh th∆∞·ªùng", "chu·∫©n"]
    if any(keyword in value_lower for keyword in basic_keywords):
        return "BASIC"
    
    # UNFURNISHED keywords
    unfurnished_keywords = ["th√¥", "tr·ªëng", "kh√¥ng", "k ", "nt", "nh√†th√¥", "nh√† th√¥"]
    if any(keyword in value_lower for keyword in unfurnished_keywords):
        return "UNFURNISHED"
    
    return "UNKNOWN"

map_interior_udf = udf(map_interior_by_keywords, StringType())

# 2.3. Mapping legal_status
print("   üìã Mapping legal_status...")
def map_legal_status_v2(value):
    if value is None or value == "":
        return "UNKNOWN"
    
    value_lower = (value.lower().replace(" ", "").replace("-", "").replace(".", "")
                  .replace("/", "").replace("\\", "").replace(",", "").replace(":", "")
                  .replace(";", "").replace("(", "").replace(")", "").replace("+", ""))
    
    # NO_LEGAL
    no_legal_keywords = ["kh√¥ngph√°pl√Ω", "khongphap≈Çy", "kh√¥ngs·ªï", "khongso", "kos·ªï", "koso",
                        "kh√¥ng", "khong", "ch∆∞ac√≥", "chuaco", "ch∆∞a", "chua", "ko"]
    if any(keyword in value_lower for keyword in no_legal_keywords):
        return "NO_LEGAL"

    # LAND_USE_CERTIFICATE
    land_use_keywords = ["th·ªïc∆∞", "thocu", "th·ªïc∆∞100", "thocu100", "th·ªïc∆∞100%", "thocu100%",
                        "ƒë·∫•tth·ªïc∆∞", "datthoju", "cnqsdƒë", "cnqsdd", "s·ª≠d·ª•ngƒë·∫•t", "sudungdat"]
    if any(keyword in value_lower for keyword in land_use_keywords):
        return "LAND_USE_CERTIFICATE"

    # RED_BOOK
    red_book_keywords = ["s·ªïƒë·ªè", "sodo", "s·ªïh·ªìng", "sohong", "s·ªïƒë·ªès·ªïh·ªìng", "sƒëcc", "sdhh",
                        "b√¨aƒë·ªè", "biado", "s·ªïch√≠nhch·ªß", "sochinhchu", "s·ªïƒë·∫πp", "sodep",
                        "s·ªïvu√¥ng", "sovuong", "s·ªïvu√¥ngv·∫Øn", "sovuongvan", "s·ªïs·∫°ch", "sosach"]
    if any(keyword in value_lower for keyword in red_book_keywords):
        return "RED_BOOK"

    # OWNERSHIP_CERTIFICATE
    ownership_keywords = ["shcc", "shr", "ccqsh", "ch·ª©ngnh·∫≠n", "chungnhan", "s·ªïcc", "socc",
                         "c√¥ngc√¥ng", "congcong", "s·ªïc√¥ngnh·∫≠n", "socognhan", "ph√°pl√Ω", "phap≈Çy",
                         "ch√≠nhch·ªß", "chinhchu", "s·ªüh·ªØu", "sohuu", "c√≥", "co", "ƒë·∫ßyƒë·ªß", "daydu"]
    if any(keyword in value_lower for keyword in ownership_keywords):
        return "OWNERSHIP_CERTIFICATE"

    # TRANSACTION_READY
    transaction_keywords = ["s·∫µns√†nggiaod·ªãch", "sansanggiaodich", "c√¥ngch·ª©ng", "congchung",
                           "giaod·ªãchngay", "giaodichngay", "vu√¥ngv·∫Øn", "vuongvan", "s·∫°ch", "sach"]
    if any(keyword in value_lower for keyword in transaction_keywords):
        return "TRANSACTION_READY"

    return "UNKNOWN"

map_legal_status_udf = udf(map_legal_status_v2, StringType())

# =============================================================================
# B∆Ø·ªöC 3: SMART IMPUTATION CHO BEDROOM/BATHROOM
# =============================================================================
print("\nüß† B∆Ø·ªöC 3: SMART IMPUTATION CHO BEDROOM/BATHROOM")

def smart_fill_bedroom_bathroom(df):
    print("   üîç B∆∞·ªõc 3.1: L√†m s·∫°ch d·ªØ li·ªáu s·ªë...")
    
    # L√†m s·∫°ch c√°c c·ªôt s·ªë
    df_cleaned = df.withColumn("area_cleaned", regexp_replace(col("area"), "[^0-9\\.]", "").cast("double")) \
                  .withColumn("bedroom_cleaned", regexp_replace(col("bedroom"), "[^0-9]", "").cast("double")) \
                  .withColumn("bathroom_cleaned", regexp_replace(col("bathroom"), "[^0-9]", "").cast("double")) \
                  .withColumn("price_cleaned",
                             when(lower(col("price")).contains("t·ª∑") | lower(col("price")).contains("ty"),
                                  regexp_replace(col("price"), "[^0-9\\.]", "").cast("double") * 1000000000)
                             .when(lower(col("price")).contains("tri·ªáu") | lower(col("price")).contains("trieu"),
                                   regexp_replace(col("price"), "[^0-9\\.]", "").cast("double") * 1000000)
                             .otherwise(regexp_replace(col("price"), "[^0-9\\.]", "").cast("double")))
    
    # Tr√≠ch xu·∫•t th√†nh ph·ªë
    df_with_city = df_cleaned.withColumn(
        "city_extracted",
        when(lower(col("location")).contains("h·ªì ch√≠ minh") | lower(col("location")).contains("tp.hcm") |
             lower(col("location")).contains("tphcm") | lower(col("location")).contains("hcm"), lit("Ho Chi Minh"))
        .when(lower(col("location")).contains("h√† n·ªôi") | lower(col("location")).contains("hanoi"), lit("Hanoi"))
        .otherwise(lit("Other"))
    )
    
    print("   üìù B∆∞·ªõc 3.2: Tr√≠ch xu·∫•t t·ª´ title/description...")
    
    # Tr√≠ch xu·∫•t th√¥ng tin t·ª´ text
    df_extracted = df_with_city.withColumn(
        "title_desc_combined",
        concat(when(col("title").isNotNull(), lower(col("title"))).otherwise(lit("")),
               lit(" "),
               when(col("description").isNotNull(), lower(col("description"))).otherwise(lit("")))
    ).withColumn(
        "bedroom_from_text",
        when(col("title_desc_combined").rlike(r"(\d+)\s*(ph√≤ng\s*ng·ªß|pn|bedroom)"),
             regexp_extract(col("title_desc_combined"), r"(\d+)\s*(?:ph√≤ng\s*ng·ªß|pn|bedroom)", 1).cast("double"))
        .otherwise(lit(None))
    ).withColumn(
        "bathroom_from_text", 
        when(col("title_desc_combined").rlike(r"(\d+)\s*(ph√≤ng\s*t·∫Øm|wc|toilet|bathroom)"),
             regexp_extract(col("title_desc_combined"), r"(\d+)\s*(?:ph√≤ng\s*t·∫Øm|wc|toilet|bathroom)", 1).cast("double"))
        .otherwise(lit(None))
    )
    
    print("   üìè B∆∞·ªõc 3.3: ∆Ø·ªõc l∆∞·ª£ng t·ª´ di·ªán t√≠ch...")
    
    # ∆Ø·ªõc l∆∞·ª£ng t·ª´ di·ªán t√≠ch
    df_area_based = df_extracted.withColumn(
        "bedroom_from_area",
        when(col("area_cleaned").isNotNull(),
             when(col("area_cleaned") <= 30, lit(1))
             .when(col("area_cleaned") <= 50, lit(2))
             .when(col("area_cleaned") <= 80, lit(3))
             .when(col("area_cleaned") <= 120, lit(4))
             .when(col("area_cleaned") <= 200, lit(5))
             .otherwise(lit(6)))
        .otherwise(lit(None))
    ).withColumn(
        "bathroom_from_area",
        when(col("area_cleaned").isNotNull(),
             when(col("area_cleaned") <= 40, lit(1))
             .when(col("area_cleaned") <= 80, lit(2))
             .when(col("area_cleaned") <= 150, lit(3))
             .otherwise(lit(4)))
        .otherwise(lit(None))
    )
    
    print("   üìä B∆∞·ªõc 3.4: T√≠nh median theo nh√≥m...")
    
    # T·∫°o price v√† area ranges
    df_grouped = df_area_based.withColumn(
        "price_range",
        when(col("price_cleaned").isNull(), lit("unknown"))
        .when(col("price_cleaned") < 1000000000, lit("under_1b"))
        .when(col("price_cleaned") < 3000000000, lit("1b_3b"))
        .when(col("price_cleaned") < 5000000000, lit("3b_5b"))
        .when(col("price_cleaned") < 10000000000, lit("5b_10b"))
        .otherwise(lit("over_10b"))
    ).withColumn(
        "area_range",
        when(col("area_cleaned").isNull(), lit("unknown"))
        .when(col("area_cleaned") < 50, lit("small"))
        .when(col("area_cleaned") < 100, lit("medium"))
        .when(col("area_cleaned") < 200, lit("large"))
        .otherwise(lit("very_large"))
    )
    
    # T√≠nh median cho bedroom v√† bathroom
    try:
        bedroom_median_by_group = df_grouped.filter(col("bedroom_cleaned").isNotNull()) \
            .groupBy("city_extracted", "price_range", "area_range") \
            .agg(percentile_approx("bedroom_cleaned", 0.5).alias("bedroom_median"),
                 count("*").alias("bedroom_count")) \
            .filter(col("bedroom_count") >= 3)
            
        bathroom_median_by_group = df_grouped.filter(col("bathroom_cleaned").isNotNull()) \
            .groupBy("city_extracted", "price_range", "area_range") \
            .agg(percentile_approx("bathroom_cleaned", 0.5).alias("bathroom_median"),
                 count("*").alias("bathroom_count")) \
            .filter(col("bathroom_count") >= 3)
    except:
        bedroom_median_by_group = spark.createDataFrame([], "city_extracted string, price_range string, area_range string, bedroom_median double, bedroom_count long")
        bathroom_median_by_group = spark.createDataFrame([], "city_extracted string, price_range string, area_range string, bathroom_median double, bathroom_count long")
    
    # Join median values
    df_with_median = df_grouped \
        .join(bedroom_median_by_group, ["city_extracted", "price_range", "area_range"], "left") \
        .join(bathroom_median_by_group, ["city_extracted", "price_range", "area_range"], "left")
    
    # T√≠nh overall medians
    try:
        overall_bedroom_median = df_with_median.filter(col("bedroom_cleaned").isNotNull()) \
            .select(percentile_approx("bedroom_cleaned", 0.5)).collect()[0][0]
        overall_bathroom_median = df_with_median.filter(col("bathroom_cleaned").isNotNull()) \
            .select(percentile_approx("bathroom_cleaned", 0.5)).collect()[0][0]
    except:
        overall_bedroom_median = 2.0
        overall_bathroom_median = 1.0
    
    print(f"   üìà Overall medians - Bedroom: {overall_bedroom_median}, Bathroom: {overall_bathroom_median}")
    
    print("   üéØ B∆∞·ªõc 3.5: √Åp d·ª•ng logic ƒëi·ªÅn th√¥ng minh...")
    
    # Logic ƒëi·ªÅn th√¥ng minh theo ƒë·ªô ∆∞u ti√™n
    df_smart_filled = df_with_median.withColumn(
        "bedroom_final",
        when(col("bedroom_cleaned").isNotNull(), col("bedroom_cleaned"))
        .when(col("bedroom_from_text").isNotNull() & 
              (col("bedroom_from_text") >= 1) & (col("bedroom_from_text") <= 10), 
              col("bedroom_from_text"))
        .when(col("bedroom_median").isNotNull(), col("bedroom_median"))
        .when(col("bedroom_from_area").isNotNull(), col("bedroom_from_area"))
        .otherwise(lit(overall_bedroom_median))
    ).withColumn(
        "bathroom_final",
        when(col("bathroom_cleaned").isNotNull(), col("bathroom_cleaned"))
        .when(col("bathroom_from_text").isNotNull() & 
              (col("bathroom_from_text") >= 1) & (col("bathroom_from_text") <= 5), 
              col("bathroom_from_text"))
        .when(col("bathroom_median").isNotNull(), col("bathroom_median"))
        .when(col("bathroom_from_area").isNotNull(), col("bathroom_from_area"))
        .otherwise(lit(overall_bathroom_median))
    )
    
    # Th√™m source tracking
    df_smart_filled = df_smart_filled.withColumn(
        "bedroom_source",
        when(col("bedroom_cleaned").isNotNull(), lit("original"))
        .when(col("bedroom_from_text").isNotNull() & 
              (col("bedroom_from_text") >= 1) & (col("bedroom_from_text") <= 10), 
              lit("extracted_from_text"))
        .when(col("bedroom_median").isNotNull(), lit("group_median"))
        .when(col("bedroom_from_area").isNotNull(), lit("area_based"))
        .otherwise(lit("overall_median"))
    ).withColumn(
        "bathroom_source",
        when(col("bathroom_cleaned").isNotNull(), lit("original"))
        .when(col("bathroom_from_text").isNotNull() & 
              (col("bathroom_from_text") >= 1) & (col("bathroom_from_text") <= 5), 
              lit("extracted_from_text"))
        .when(col("bathroom_median").isNotNull(), lit("group_median"))
        .when(col("bathroom_from_area").isNotNull(), lit("area_based"))
        .otherwise(lit("overall_median"))
    )
    
    return df_smart_filled

# =============================================================================
# B∆Ø·ªöC 4: √ÅP D·ª§NG T·∫§T C·∫¢ C√ÅC X·ª¨ L√ù
# =============================================================================
print("\n‚öôÔ∏è B∆Ø·ªöC 4: √ÅP D·ª§NG T·∫§T C·∫¢ C√ÅC X·ª¨ L√ù")

# √Åp d·ª•ng t·∫•t c·∫£ mappings
processed_df = df.withColumn("house_direction_normalized", normalize_direction_udf(col("house_direction"))) \
                .withColumn("house_direction_mapped", map_direction_udf(col("house_direction_normalized"))) \
                .withColumn("interior_mapped", map_interior_udf(col("interior"))) \
                .withColumn("legal_status_mapped", map_legal_status_udf(col("legal_status")))

print("   ‚úÖ Ho√†n th√†nh mapping categorical data")

# √Åp d·ª•ng smart filling
smart_filled_df = smart_fill_bedroom_bathroom(processed_df)
print("   ‚úÖ Ho√†n th√†nh smart imputation")

# =============================================================================
# B∆Ø·ªöC 5: VALIDATION V√Ä FINE-TUNING
# =============================================================================
print("\nüîç B∆Ø·ªöC 5: VALIDATION V√Ä FINE-TUNING")

# Validation rules
validated_df = smart_filled_df.withColumn(
    "bathroom_final_validated",
    when(col("bathroom_final") > (col("bedroom_final") + 1), col("bedroom_final"))
    .otherwise(col("bathroom_final"))
).withColumn(
    "bedroom_final_validated",
    when((col("area_cleaned") < 30) & (col("bedroom_final") > 2), lit(1))
    .when((col("area_cleaned") < 50) & (col("bedroom_final") > 3), lit(2))
    .when(col("bedroom_final") < 1, lit(1))
    .when(col("bedroom_final") > 10, lit(6))
    .otherwise(col("bedroom_final"))
).withColumn(
    "bathroom_final_validated",
    when(col("bathroom_final_validated") < 1, lit(1))
    .when(col("bathroom_final_validated") > 6, lit(4))
    .otherwise(col("bathroom_final_validated"))
).withColumn(
    "needs_review",
    when((col("price_cleaned") < 2000000000) & (col("bedroom_final_validated") > 4), lit(True))
    .when((col("area_cleaned") < 60) & (col("bedroom_final_validated") > 3), lit(True))
    .otherwise(lit(False))
)

print("   ‚úÖ Ho√†n th√†nh validation")

# =============================================================================
# B∆Ø·ªöC 6: T·∫†O DATASET CU·ªêI C√ôNG
# =============================================================================
print("\nüìã B∆Ø·ªöC 6: T·∫†O DATASET CU·ªêI C√ôNG")

# L√†m s·∫°ch v√† chu·∫©n h√≥a th√™m c√°c c·ªôt kh√°c
final_processed_df = validated_df.withColumn(
    "floor_count_cleaned", regexp_replace(col("floor_count"), "[^0-9]", "").cast("double")
).withColumn(
    "facade_width_cleaned", regexp_replace(col("facade_width"), "[^0-9\\.]", "").cast("double")
).withColumn(
    "road_width_cleaned", regexp_replace(col("road_width"), "[^0-9\\.]", "").cast("double")
).withColumn(
    "latitude_cleaned", col("latitude").cast("double")
).withColumn(
    "longitude_cleaned", col("longitude").cast("double")
).withColumn(
    "location_cleaned", trim(col("location"))
).withColumn(
    "is_negotiable", 
    when(lower(col("price")).contains("th·ªèa thu·∫≠n") | 
         lower(col("price")).contains("thoathuan") | 
         lower(col("price")).contains("thoa thuan"), lit(True))
    .otherwise(lit(False))
).withColumn(
    "price_per_m2_cleaned",
    when(lower(col("price_per_m2")).contains("t·ª∑") | lower(col("price_per_m2")).contains("ty"),
         regexp_replace(col("price_per_m2"), "[^0-9\\.]", "").cast("double") * 1000000000)
    .when(lower(col("price_per_m2")).contains("tri·ªáu") | lower(col("price_per_m2")).contains("trieu"),
          regexp_replace(col("price_per_m2"), "[^0-9\\.]", "").cast("double") * 1000000)
    .otherwise(regexp_replace(col("price_per_m2"), "[^0-9\\.]", "").cast("double"))
).withColumn(
    "district_extracted",
    when(lower(col("location")).contains("qu·∫≠n"),
         regexp_extract(lower(col("location")), r"qu·∫≠n\s*(\d+)", 1))
    .when(lower(col("location")).contains("q."),
          regexp_extract(lower(col("location")), r"q\.?\s*(\d+)", 1))
    .otherwise(lit(None))
)

# T·∫°o dataset cu·ªëi c√πng v·ªõi c·ªôt ƒë√£ ƒë·ªïi t√™n
final_clean_df = final_processed_df.drop("bedroom", "bathroom").select(
    # Th√¥ng tin c∆° b·∫£n
    col("url"), col("title"), col("description"), col("posted_date"), col("crawl_timestamp"),
    col("seller_info"), col("source"), col("data_type"),
    
    # Th√¥ng tin nh√† ƒë·∫•t ƒë√£ ƒë∆∞·ª£c l√†m s·∫°ch
    col("area_cleaned").alias("area"),
    col("bedroom_final_validated").alias("bedroom"),
    col("bathroom_final_validated").alias("bathroom"),
    col("floor_count_cleaned").alias("floor_count"),
    col("facade_width_cleaned").alias("facade_width"),
    col("road_width_cleaned").alias("road_width"),
    
    # Th√¥ng tin gi√°
    col("price_cleaned").alias("price"),
    col("price_per_m2_cleaned").alias("price_per_m2"),
    col("is_negotiable"),
    
    # Th√¥ng tin ƒë√£ ƒë∆∞·ª£c mapped
    col("house_direction_mapped").alias("house_direction"),
    col("interior_mapped").alias("interior"),
    col("legal_status_mapped").alias("legal_status"),
    
    # Th√¥ng tin v·ªã tr√≠
    col("location_cleaned").alias("location"),
    col("latitude_cleaned").alias("latitude"),
    col("longitude_cleaned").alias("longitude"),
    col("city_extracted").alias("city"),
    col("district_extracted").alias("district"),
    
    # Metadata
    col("bedroom_source"), col("bathroom_source"), col("needs_review")
)

print("   ‚úÖ Ho√†n th√†nh t·∫°o dataset cu·ªëi c√πng")

# =============================================================================
# B∆Ø·ªöC 7: PH√ÇN T√çCH CH·∫§T L∆Ø·ª¢NG D·ªÆ LI·ªÜU
# =============================================================================
print("\nüìä B∆Ø·ªöC 7: PH√ÇN T√çCH CH·∫§T L∆Ø·ª¢NG D·ªÆ LI·ªÜU")

# T√≠nh ƒëi·ªÉm ch·∫•t l∆∞·ª£ng
quality_df = final_clean_df.withColumn(
    "data_quality_score",
    (when(col("area").isNotNull() & (col("area") > 0), lit(20)).otherwise(lit(0))) +
    (when(col("price").isNotNull() & (col("price") > 0), lit(20)).otherwise(lit(0))) +
    (when(col("bedroom").isNotNull() & (col("bedroom") > 0), lit(15)).otherwise(lit(0))) +
    (when(col("bathroom").isNotNull() & (col("bathroom") > 0), lit(15)).otherwise(lit(0))) +
    (when(col("location").isNotNull() & (length(col("location")) > 10), lit(15)).otherwise(lit(0))) +
    (when(col("latitude").isNotNull() & col("longitude").isNotNull(), lit(15)).otherwise(lit(0)))
)

# Th·ªëng k√™ ch·∫•t l∆∞·ª£ng
final_count = quality_df.count()
quality_stats = quality_df.select(
    avg("data_quality_score").alias("avg_quality"),
    spark_min("data_quality_score").alias("min_quality"),
    spark_max("data_quality_score").alias("max_quality"),
    count(when(col("data_quality_score") >= 80, True)).alias("high_quality_count"),
    count(when(col("data_quality_score") >= 60, True)).alias("medium_quality_count")
).collect()[0]

# Dataset theo ch·∫•t l∆∞·ª£ng
high_quality_df = quality_df.filter(col("data_quality_score") >= 80)
medium_quality_df = quality_df.filter(col("data_quality_score") >= 60)

print("   ‚úÖ Ho√†n th√†nh ph√¢n t√≠ch ch·∫•t l∆∞·ª£ng")

# =============================================================================
# B∆Ø·ªöC 8: TH·ªêNG K√ä V√Ä B√ÅO C√ÅO K·∫æT QU·∫¢
# =============================================================================
print("\nüìà B∆Ø·ªöC 8: TH·ªêNG K√ä V√Ä B√ÅO C√ÅO K·∫æT QU·∫¢")
print("=" * 70)

# Th·ªëng k√™ t·ªïng quan
print(f"üìä T·ªîNG QUAN:")
print(f"   ‚Ä¢ Dataset g·ªëc: {original_count:,} b·∫£n ghi")
print(f"   ‚Ä¢ Dataset sau x·ª≠ l√Ω: {final_count:,} b·∫£n ghi")
print(f"   ‚Ä¢ T·ª∑ l·ªá gi·ªØ l·∫°i: {(final_count/original_count*100):.1f}%")

print(f"\nüéØ CH·∫§T L∆Ø·ª¢NG D·ªÆ LI·ªÜU:")
print(f"   ‚Ä¢ ƒêi·ªÉm ch·∫•t l∆∞·ª£ng trung b√¨nh: {quality_stats['avg_quality']:.1f}/100")
print(f"   ‚Ä¢ Dataset ch·∫•t l∆∞·ª£ng cao (‚â•80): {quality_stats['high_quality_count']:,} ({quality_stats['high_quality_count']/final_count*100:.1f}%)")
print(f"   ‚Ä¢ Dataset ch·∫•t l∆∞·ª£ng trung b√¨nh (‚â•60): {quality_stats['medium_quality_count']:,} ({quality_stats['medium_quality_count']/final_count*100:.1f}%)")

# Th·ªëng k√™ null values
print(f"\nüîç NULL VALUES TRONG C√ÅC C·ªòT QUAN TR·ªåNG:")
important_cols = ["area", "bedroom", "bathroom", "price", "house_direction", "interior", "legal_status", "location"]
for col_name in important_cols:
    null_count = final_clean_df.filter(col(col_name).isNull()).count()
    null_pct = (null_count / final_count * 100) if final_count > 0 else 0
    print(f"   ‚Ä¢ {col_name}: {null_count:,} null ({null_pct:.1f}%)")

# Th·ªëng k√™ smart imputation
print(f"\nüß† K·∫æT QU·∫¢ SMART IMPUTATION:")
bedroom_sources = final_clean_df.groupBy("bedroom_source").count().orderBy("count", ascending=False).collect()
bathroom_sources = final_clean_df.groupBy("bathroom_source").count().orderBy("count", ascending=False).collect()

print("   üìù Ngu·ªìn g·ªëc Bedroom:")
for row in bedroom_sources:
    pct = (row['count'] / final_count * 100) if final_count > 0 else 0
    print(f"     - {row['bedroom_source']}: {row['count']:,} ({pct:.1f}%)")

print("   üöø Ngu·ªìn g·ªëc Bathroom:")  
for row in bathroom_sources:
    pct = (row['count'] / final_count * 100) if final_count > 0 else 0
    print(f"     - {row['bathroom_source']}: {row['count']:,} ({pct:.1f}%)")

# Th·ªëng k√™ mapping results
print(f"\nüó∫Ô∏è K·∫æT QU·∫¢ MAPPING:")
print("   üß≠ House Direction:")
direction_stats = final_clean_df.groupBy("house_direction").count().orderBy("count", ascending=False).limit(5).collect()
for row in direction_stats:
    pct = (row['count'] / final_count * 100) if final_count > 0 else 0
    print(f"     - {row['house_direction']}: {row['count']:,} ({pct:.1f}%)")

print("   üè† Interior:")
interior_stats = final_clean_df.groupBy("interior").count().orderBy("count", ascending=False).limit(5).collect()
for row in interior_stats:
    pct = (row['count'] / final_count * 100) if final_count > 0 else 0
    print(f"     - {row['interior']}: {row['count']:,} ({pct:.1f}%)")

print("   üìã Legal Status:")
legal_stats = final_clean_df.groupBy("legal_status").count().orderBy("count", ascending=False).limit(5).collect()
for row in legal_stats:
    pct = (row['count'] / final_count * 100) if final_count > 0 else 0
    print(f"     - {row['legal_status']}: {row['count']:,} ({pct:.1f}%)")

# Th·ªëng k√™ theo th√†nh ph·ªë
print(f"\nüåÜ PH√ÇN B·ªê THEO TH√ÄNH PH·ªê:")
city_stats = final_clean_df.groupBy("city").count().orderBy("count", ascending=False).collect()
for row in city_stats:
    pct = (row['count'] / final_count * 100) if final_count > 0 else 0
    print(f"   ‚Ä¢ {row['city']}: {row['count']:,} ({pct:.1f}%)")

# Ph√¢n t√≠ch gi√° (ch·ªâ v·ªõi d·ªØ li·ªáu c√≥ gi√°)
price_data = final_clean_df.filter(col("price").isNotNull() & (col("price") > 0))
if price_data.count() > 0:
    price_stats = price_data.select(
        avg("price").alias("avg_price"),
        percentile_approx("price", 0.5).alias("median_price"),
        spark_min("price").alias("min_price"),
        spark_max("price").alias("max_price")
    ).collect()[0]
    
    print(f"\nüí∞ TH·ªêNG K√ä GI√Å ({price_data.count():,} b·∫£n ghi c√≥ gi√°):")
    print(f"   ‚Ä¢ Gi√° trung b√¨nh: {price_stats['avg_price']/1000000000:.2f} t·ª∑ VNƒê")
    print(f"   ‚Ä¢ Gi√° trung v·ªã: {price_stats['median_price']/1000000000:.2f} t·ª∑ VNƒê")
    print(f"   ‚Ä¢ Gi√° th·∫•p nh·∫•t: {price_stats['min_price']/1000000:.0f} tri·ªáu VNƒê")
    print(f"   ‚Ä¢ Gi√° cao nh·∫•t: {price_stats['max_price']/1000000000:.2f} t·ª∑ VNƒê")

# C·∫ßn review
need_review_count = final_clean_df.filter(col("needs_review") == True).count()
print(f"\n‚ö†Ô∏è C·∫¶N REVIEW: {need_review_count:,} b·∫£n ghi ({need_review_count/final_count*100:.1f}%)")

print("\n" + "=" * 70)
print("üéâ HO√ÄN TH√ÄNH QUY TR√åNH X·ª¨ L√ù D·ªÆ LI·ªÜU!")
print("‚úÖ C√°c th√†nh t·ª±u ch√≠nh:")
print("   ‚Ä¢ Mapping th√†nh c√¥ng house_direction, interior, legal_status")
print("   ‚Ä¢ Smart imputation cho bedroom/bathroom v·ªõi nhi·ªÅu chi·∫øn l∆∞·ª£c")
print("   ‚Ä¢ L√†m s·∫°ch v√† chu·∫©n h√≥a d·ªØ li·ªáu s·ªë")
print("   ‚Ä¢ Tr√≠ch xu·∫•t th√¥ng tin v·ªã tr√≠")
print("   ‚Ä¢ Ph√¢n lo·∫°i ch·∫•t l∆∞·ª£ng d·ªØ li·ªáu")
print("   ‚Ä¢ Validation v√† ki·ªÉm tra t√≠nh h·ª£p l√Ω")
print("=" * 70)

# L∆∞u k·∫øt qu·∫£ (t√πy ch·ªçn)
try:
    output_path = "/home/fer/data/real_estate_project/tmp/processed_data"
    os.makedirs(output_path, exist_ok=True)
    
    # L∆∞u dataset ch·∫•t l∆∞·ª£ng cao
    high_quality_df.drop("data_quality_score").coalesce(1).write.mode("overwrite").parquet(f"{output_path}/high_quality_data.parquet")
    print(f"üíæ ƒê√£ l∆∞u {high_quality_df.count():,} b·∫£n ghi ch·∫•t l∆∞·ª£ng cao v√†o: {output_path}/high_quality_data.parquet")
    
    # L∆∞u dataset ƒë·∫ßy ƒë·ªß
    quality_df.coalesce(1).write.mode("overwrite").parquet(f"{output_path}/full_processed_data.parquet")
    print(f"üíæ ƒê√£ l∆∞u {quality_df.count():,} b·∫£n ghi ƒë·∫ßy ƒë·ªß v√†o: {output_path}/full_processed_data.parquet")
    
except Exception as e:
    print(f"‚ö†Ô∏è Kh√¥ng th·ªÉ l∆∞u file: {e}")

print("\nüöÄ C√≥ th·ªÉ s·ª≠ d·ª•ng c√°c DataFrame sau ƒë·ªÉ ph√¢n t√≠ch ti·∫øp:")
print("   ‚Ä¢ final_clean_df: Dataset ƒë·∫ßy ƒë·ªß ƒë√£ x·ª≠ l√Ω")
print("   ‚Ä¢ high_quality_df: Dataset ch·∫•t l∆∞·ª£ng cao (‚â•80 ƒëi·ªÉm)")
print("   ‚Ä¢ medium_quality_df: Dataset ch·∫•t l∆∞·ª£ng trung b√¨nh (‚â•60 ƒëi·ªÉm)")

üöÄ B·∫ÆT ƒê·∫¶U QUY TR√åNH X·ª¨ L√ù D·ªÆ LI·ªÜU B·∫§T ƒê·ªòNG SAN

üìÇ B∆Ø·ªöC 1: ƒê·ªåC D·ªÆ LI·ªÜU
‚úÖ ƒê·ªçc th√†nh c√¥ng t·ª´ JSON: 17336 b·∫£n ghi
üìä Dataset g·ªëc: 17336 b·∫£n ghi

üîÑ B∆Ø·ªöC 2: MAPPING V√Ä CHU·∫®N H√ìA D·ªÆ LI·ªÜU CATEGORICAL
   üìç Mapping house_direction...
   üè† Mapping interior...
   üìã Mapping legal_status...

üß† B∆Ø·ªöC 3: SMART IMPUTATION CHO BEDROOM/BATHROOM

‚öôÔ∏è B∆Ø·ªöC 4: √ÅP D·ª§NG T·∫§T C·∫¢ C√ÅC X·ª¨ L√ù
   ‚úÖ Ho√†n th√†nh mapping categorical data
   üîç B∆∞·ªõc 3.1: L√†m s·∫°ch d·ªØ li·ªáu s·ªë...
   üìù B∆∞·ªõc 3.2: Tr√≠ch xu·∫•t t·ª´ title/description...
   üìè B∆∞·ªõc 3.3: ∆Ø·ªõc l∆∞·ª£ng t·ª´ di·ªán t√≠ch...
   üìä B∆∞·ªõc 3.4: T√≠nh median theo nh√≥m...
   üìà Overall medians - Bedroom: 4.0, Bathroom: 4.0
   üéØ B∆∞·ªõc 3.5: √Åp d·ª•ng logic ƒëi·ªÅn th√¥ng minh...
   ‚úÖ Ho√†n th√†nh smart imputation

üîç B∆Ø·ªöC 5: VALIDATION V√Ä FINE-TUNING
   ‚úÖ Ho√†n th√†nh validation

üìã B∆Ø·ªöC 6: T·∫†O DATASET CU·ªêI C√ôNG
  

25/05/24 21:11:08 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-223a686d-e2de-4535-9c91-de68178c4ec9. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-223a686d-e2de-4535-9c91-de68178c4ec9
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:177)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:113)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:368)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:364)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach