# bigboyz - Dubai Real Estate Price Prediction

**CSCI316: Big Data Mining Techniques and Implementation**  
University of Wollongong in Dubai

---

## Project Overview

This notebook implements a large-scale machine learning pipeline to predict real estate transaction prices in Dubai using the Dubai Land Department's transactions dataset (~1.5 million records).

**Business Question:** *Can we accurately predict real estate transaction prices in Dubai based on property characteristics, location, and temporal factors?*

### Key Requirements
- **10-Fold Cross-Validation**: Implemented from scratch (no CrossValidator/TrainValidationSplit)
- **Bagging Ensemble**: Implemented from scratch (no RandomForestRegressor)
- **Apache Spark**: All data processing uses PySpark

---
## Table of Contents

1. [Environment Setup](#1-environment-setup)
2. [Data Loading & Exploration](#2-data-loading--exploration)
3. [Data Cleaning](#3-data-cleaning)
4. [Feature Engineering](#4-feature-engineering)
5. [Train/Test Split](#5-traintest-split)
6. [Model Training with Manual Cross-Validation](#6-model-training-with-manual-cross-validation)
7. [Bagging Ensemble (From Scratch)](#7-bagging-ensemble-from-scratch)
8. [Final Evaluation on Holdout Test Set](#8-final-evaluation-on-holdout-test-set)
9. [Results & Visualizations](#9-results--visualizations)
10. [Conclusion](#10-conclusion)

---
## 1. Environment Setup

In [None]:
# Standard imports
import sys
import os
import warnings
warnings.filterwarnings('ignore')

# Add src to path
sys.path.insert(0, '../src')

# Data processing
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline

# PySpark
from pyspark.sql import SparkSession
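# Namespaced imports: a wildcard import of pyspark.sql.functions would
# shadow Python built-ins such as sum, min, max, round and abs
from pyspark.sql import functions as F
from pyspark.sql import types as T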

# Spark MLlib
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

# Our custom modules
from data_ingestion import create_spark_session, load_transactions, explore_data
from data_cleaning import clean_data, analyze_missing_values
from feature_engineering import engineer_features, train_test_split
from cross_validation import manual_cross_validate, create_folds, compare_models_cv
from bagging_ensemble import BaggingRegressor, bootstrap_sample
from evaluation import (
    calculate_metrics, evaluate_model, create_comparison_table,
    plot_model_comparison, plot_predictions_vs_actual, 
    plot_residuals, plot_feature_importance, plot_price_distribution
)

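# Make sure the figure output directory used by the plotting cells exists
os.makedirs('../outputs/figures', exist_ok=True)

print("All imports successful!")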

In [None]:
# Create Spark Session
spark = create_spark_session("bigboyz-dubai-real-estate")

# Verify Spark is running
print(f"Spark Version: {spark.version}")
print(f"Spark App Name: {spark.sparkContext.appName}")

---
## 2. Data Loading & Exploration

In [None]:
# Load the transactions data
DATA_PATH = '../data/Transactions.csv'

# use_schema=False lets Spark infer column types: convenient for initial
# exploration, but inference costs an extra pass over the CSV
df_raw = load_transactions(spark, DATA_PATH, use_schema=False)

print("\nDataset loaded successfully!")
print(f"Total records: {df_raw.count():,}")
print(f"Number of columns: {len(df_raw.columns)}")

In [None]:
# Display schema
print("Dataset Schema:")
df_raw.printSchema()

In [None]:
# Show sample data
print("Sample Data (5 rows):")
df_raw.show(5, truncate=False)

In [None]:
# Summary statistics
print("Summary Statistics:")
df_raw.describe().show()

In [None]:
# Analyze missing values
print("Missing Values Analysis:")
missing_data = analyze_missing_values(df_raw)
for item in missing_data:
    if item['missing_count'] > 0:
        print(f"  {item['column']}: {item['missing_count']:,} ({item['missing_pct']:.1f}%)")
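The loop above expects `analyze_missing_values` to return one dict per column with `column`, `missing_count` and `missing_pct` keys. That module is not reproduced here; the plain-Python sketch below only illustrates the contract, and `missing_value_summary` is a hypothetical name. In Spark the same counts are usually computed in one pass with `F.count(F.when(F.col(c).isNull(), c))` for each column `c`.

```python
from typing import Any, Dict, List, Optional

def missing_value_summary(rows: List[Dict[str, Optional[Any]]]) -> List[Dict[str, Any]]:
    """Per-column count and percentage of missing (None or absent) values."""
    total = len(rows)
    # Ordered union of column names across all rows
    columns = list(dict.fromkeys(key for row in rows for key in row))
    summary = []
    for column in columns:
        missing = sum(1 for row in rows if row.get(column) is None)
        summary.append({
            "column": column,
            "missing_count": missing,
            "missing_pct": 100.0 * missing / total,
        })
    return summary

rows = [
    {"area_name_en": "Al Karama", "nearest_metro_en": None},
    {"area_name_en": "Mankhool", "nearest_metro_en": "Station X"},  # made-up value
]
for item in missing_value_summary(rows):
    if item["missing_count"] > 0:
        print(f"  {item['column']}: {item['missing_count']:,} ({item['missing_pct']:.1f}%)")
# prints "  nearest_metro_en: 1 (50.0%)"
```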

In [None]:
# Define column names based on actual dataset
PRICE_COL = 'actual_worth'  # Target variable (transaction price in AED)

# Price distribution statistics
df_raw.select(PRICE_COL).describe().show()

---
## 3. Data Cleaning

In [None]:
# Column names for cleaning
PRICE_COL = 'actual_worth'  # Target variable
TRANSACTION_TYPE_COL = 'trans_group_en'  # Transaction type column (Sales, Gifts, Mortgages, etc.)

# Run the cleaning pipeline
df_clean = clean_data(
    df_raw, 
    price_col=PRICE_COL, 
    transaction_type_col=TRANSACTION_TYPE_COL
)
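`clean_data` is not reproduced here. Assuming one of its core steps is to keep only sale transactions (the `trans_group_en` comment above lists Sales, Gifts and Mortgages) that have a usable price, that filter can be sketched in plain Python as below; `keep_market_sales` is a hypothetical name, and outlier handling is deliberately left out. The Spark equivalent would be `df.filter((F.col('trans_group_en') == 'Sales') & (F.col('actual_worth') > 0))`; a null price makes the comparison null, and `filter` drops rows whose condition is null.

```python
from typing import Any, Dict, Iterable, List

def keep_market_sales(rows: Iterable[Dict[str, Any]],
                      price_col: str = "actual_worth",
                      type_col: str = "trans_group_en",
                      keep_type: str = "Sales") -> List[Dict[str, Any]]:
    """Keep sale transactions whose price is present and positive."""
    kept = []
    for row in rows:
        if row.get(type_col) != keep_type:
            continue  # assumption: gifts and mortgages do not record a sale price
        price = row.get(price_col)
        if price is None or price <= 0:
            continue  # a missing or non-positive target cannot be learned from
        kept.append(row)
    return kept

sample = [
    {"trans_group_en": "Sales", "actual_worth": 1_200_000.0},
    {"trans_group_en": "Gifts", "actual_worth": 800_000.0},
    {"trans_group_en": "Sales", "actual_worth": None},
    {"trans_group_en": "Mortgages", "actual_worth": 2_000_000.0},
]
print(len(keep_market_sales(sample)))  # 1
```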

In [None]:
# Verify cleaning results
print(f"\nRows after cleaning: {df_clean.count():,}")
print(f"Columns after cleaning: {len(df_clean.columns)}")
print(f"\nRemaining columns: {df_clean.columns}")

In [None]:
# Plot price distribution after cleaning
plot_price_distribution(df_clean, price_col=PRICE_COL, save_path='../outputs/figures/price_distribution.png')

---
## 4. Feature Engineering

In [None]:
# Feature columns based on actual dataset

# Categorical columns to encode (using English columns with low missing rates)
CATEGORICAL_COLS = [
    'property_type_en',      # Villa, Land, Building, Unit
    'property_usage_en',     # Residential, Commercial, etc.
    'area_name_en',          # Dubai areas (Mankhool, Al Karama, etc.)
    'nearest_metro_en',      # Metro station proximity
    'nearest_mall_en',       # Mall proximity
]

# Numeric columns to include as features
NUMERIC_COLS = [
    'procedure_area',        # Property size in sqm
    'has_parking',           # 0 or 1
]

# Date column for temporal features
DATE_COL = 'instance_date'   # Format: DD-MM-YYYY

# Target column
LABEL_COL = PRICE_COL
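Which temporal features `engineer_features` derives from `instance_date` is not shown. The plain-Python sketch below assumes year, month, quarter and day of week (all hypothetical choices) and shows how a `DD-MM-YYYY` string maps to them.

```python
from datetime import datetime
from typing import Dict

def temporal_features(date_str: str) -> Dict[str, int]:
    """Derive calendar features from a DD-MM-YYYY date string."""
    d = datetime.strptime(date_str, "%d-%m-%Y")
    return {
        "year": d.year,
        "month": d.month,
        "quarter": (d.month - 1) // 3 + 1,
        "day_of_week": d.isoweekday(),  # ISO convention: 1 = Monday ... 7 = Sunday
    }

print(temporal_features("15-03-2021"))
# {'year': 2021, 'month': 3, 'quarter': 1, 'day_of_week': 1}
```

In Spark the parse step would be `F.to_date(F.col(DATE_COL), 'dd-MM-yyyy')` followed by `F.year`, `F.month` and `F.quarter`. Note that Spark's `F.dayofweek` numbers days 1 = Sunday through 7 = Saturday, which differs from the ISO numbering above.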

In [None]:
# Apply feature engineering
df_features, feature_names = engineer_features(
    df_clean,
    date_col=DATE_COL,
    categorical_cols=CATEGORICAL_COLS,
    numeric_cols=NUMERIC_COLS,
    label_col=LABEL_COL
)

print("\nFeature-engineered dataset:")
print(f"  Rows: {df_features.count():,}")
print(f"  Features: {len(feature_names)}")

In [None]:
# Show the feature columns
print("Feature columns:")
for i, name in enumerate(feature_names):
    print(f"  {i}: {name}")
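How `engineer_features` encodes the categorical columns is not shown. If it uses `StringIndexer` (imported in the setup cell), the default `stringOrderType="frequencyDesc"` gives index 0 to the most frequent label and breaks ties alphabetically, and `handleInvalid="keep"` sends unseen labels to one extra index. The plain-Python sketch below mimics that behavior; `frequency_index` and `encode` are hypothetical names.

```python
from collections import Counter
from typing import Dict, List, Sequence

def frequency_index(values: Sequence[str]) -> Dict[str, int]:
    """Index categories by descending frequency, breaking ties alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

def encode(values: Sequence[str], mapping: Dict[str, int]) -> List[int]:
    """Map values to indices; unseen categories share one extra index."""
    unseen = len(mapping)
    return [mapping.get(v, unseen) for v in values]

mapping = frequency_index(["Unit", "Villa", "Unit", "Land", "Villa", "Unit"])
print(mapping)                                 # {'Unit': 0, 'Villa': 1, 'Land': 2}
print(encode(["Villa", "Building"], mapping))  # [1, 3]
```

Plain indices work for tree models, which can split on any threshold. For Linear Regression, however, an index imposes an ordering between areas or property types that does not exist, which is why one-hot encoding is the usual choice for linear models.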

---
## 5. Train/Test Split

In [None]:
# Perform 80/20 train/test split
# The test set will be our HOLDOUT set - never touched during training/CV

df_train, df_holdout = train_test_split(df_features, train_ratio=0.8, seed=42)

# Cache the training data: it is re-read by every fold of every model below.
# count() is an action, so it also materializes the cache.
df_train.cache()
print(f"\nTraining data cached: {df_train.count():,} rows")
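`train_test_split` comes from our `feature_engineering` module, and its internals are not shown. If it wraps `DataFrame.randomSplit([train_ratio, 1 - train_ratio], seed)` (an assumption), each row is assigned independently, so the holdout is about 20% of the data rather than exactly 20%. The plain-Python sketch below (`random_split` is a hypothetical name) mimics that per-row behavior.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")

def random_split(rows: Sequence[T], train_ratio: float = 0.8,
                 seed: int = 42) -> Tuple[List[T], List[T]]:
    """Send each row to train with probability train_ratio, else to holdout."""
    rng = random.Random(seed)
    train: List[T] = []
    holdout: List[T] = []
    for row in rows:
        # Independent per-row draws: the 80/20 ratio holds only approximately
        (train if rng.random() < train_ratio else holdout).append(row)
    return train, holdout

train, holdout = random_split(range(10_000))
print(len(train), len(holdout))  # close to 8000 / 2000, not exactly
```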

---
## 6. Model Training with Manual Cross-Validation

**IMPORTANT:** This section uses our custom `manual_cross_validate()` function, which implements 10-fold cross-validation from scratch WITHOUT using Spark MLlib's `CrossValidator` or `TrainValidationSplit`.
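`manual_cross_validate` and `compare_models_cv` live in our `cross_validation` module and are not reproduced here. The core idea can be sketched in plain Python: give every row a fold id, then for each fold i train a fresh model on the other k-1 folds and score it on fold i. `assign_folds`, `manual_kfold` and `fit_mean` are illustrative names, not the module's API. In Spark a fold id is typically a column such as `F.floor(F.rand(seed) * k)`, which balances folds only approximately; the shuffle-and-deal below keeps fold sizes within one row of each other.

```python
import random
from statistics import mean
from typing import Callable, List, Sequence, Tuple

Row = Tuple[float, float]               # (feature, label), 1-D for brevity
Predictor = Callable[[float], float]

def assign_folds(n: int, k: int, seed: int) -> List[int]:
    """Shuffle row positions, then deal them round-robin into k folds."""
    if n < k:
        raise ValueError("need at least k rows so that no fold is empty")
    order = list(range(n))
    random.Random(seed).shuffle(order)
    folds = [0] * n
    for position, row_idx in enumerate(order):
        folds[row_idx] = position % k   # fold sizes differ by at most one
    return folds

def manual_kfold(rows: Sequence[Row], fit: Callable[[Sequence[Row]], Predictor],
                 k: int = 10, seed: int = 42) -> List[float]:
    """Return the validation RMSE for each of the k folds."""
    folds = assign_folds(len(rows), k, seed)
    scores = []
    for i in range(k):
        train = [r for r, f in zip(rows, folds) if f != i]
        val = [r for r, f in zip(rows, folds) if f == i]
        predict = fit(train)            # fresh model that never sees fold i
        mse = mean((predict(x) - y) ** 2 for x, y in val)
        scores.append(mse ** 0.5)
    return scores

def fit_mean(train: Sequence[Row]) -> Predictor:
    """Toy model: always predict the training-set mean label."""
    avg = mean(y for _, y in train)
    return lambda x: avg

rows = [(float(i), 2.0) for i in range(50)]
print(manual_kfold(rows, fit_mean))     # ten scores, all 0.0 (the label is constant)
```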

In [None]:
# Define model builder functions
# These functions return fresh (untrained) model instances

def linear_regression_builder():
    return LinearRegression(
        featuresCol='features',
        labelCol='label',
        predictionCol='prediction',
        maxIter=100,
        regParam=0.1
    )

def decision_tree_builder():
    return DecisionTreeRegressor(
        featuresCol='features',
        labelCol='label',
        predictionCol='prediction',
        maxDepth=10
    )

def random_forest_builder():
    return RandomForestRegressor(
        featuresCol='features',
        labelCol='label',
        predictionCol='prediction',
        numTrees=20,
        maxDepth=10
    )

In [None]:
# Define all models to compare
model_configs = {
    'Linear Regression': linear_regression_builder,
    'Decision Tree': decision_tree_builder,
    'Random Forest (baseline)': random_forest_builder,
}

# Run manual 10-fold CV for all baseline models
cv_results = compare_models_cv(df_train, model_configs, k=10, seed=42)

---
## 7. Bagging Ensemble (From Scratch)

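**IMPORTANT:** This section uses our custom `BaggingRegressor` class, which implements bagging from scratch WITHOUT using `RandomForestRegressor`. Bagging and a random forest differ in one respect: a random forest also restricts each split to a random subset of features, whereas in plain bagging every tree considers all features. We use `DecisionTreeRegressor` as the base learner and manually implement: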
- Bootstrap sampling
- Training multiple trees
- Averaging predictions
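`BaggingRegressor` lives in our `bagging_ensemble` module and is not reproduced here. The three steps listed can be sketched in plain Python as below, with a one-split stump standing in for `DecisionTreeRegressor`. `draw_bootstrap`, `fit_bagging` and `fit_stump` are illustrative names only; the first is deliberately not called `bootstrap_sample`, so that it does not shadow the function imported in the setup cell.

```python
import random
from statistics import mean
from typing import Callable, List, Sequence, Tuple

Row = Tuple[float, float]               # (feature, label)
Predictor = Callable[[float], float]

def draw_bootstrap(rows: Sequence[Row], rng: random.Random) -> List[Row]:
    """Draw len(rows) rows uniformly at random, with replacement."""
    return [rows[rng.randrange(len(rows))] for _ in range(len(rows))]

def fit_bagging(rows: Sequence[Row], fit_base: Callable[[Sequence[Row]], Predictor],
                n_estimators: int = 10, seed: int = 42) -> Predictor:
    """Train n_estimators base learners, each on its own bootstrap sample."""
    rng = random.Random(seed)
    models = [fit_base(draw_bootstrap(rows, rng)) for _ in range(n_estimators)]
    # Regression bagging: the ensemble prediction is the mean of the members
    return lambda x: mean(m(x) for m in models)

def fit_stump(rows: Sequence[Row]) -> Predictor:
    """Toy base learner: one split at the median x, predict each side's mean label."""
    xs = sorted(x for x, _ in rows)
    threshold = xs[len(xs) // 2]
    right = [y for x, y in rows if x >= threshold]   # never empty
    left = [y for x, y in rows if x < threshold] or right
    left_mean, right_mean = mean(left), mean(right)
    return lambda x: left_mean if x < threshold else right_mean

rows = [(float(x), 10.0 if x >= 50 else 0.0) for x in range(100)]
model = fit_bagging(rows, fit_stump, n_estimators=10, seed=42)
print(round(model(10.0), 2), round(model(90.0), 2))  # near 0 and near 10
```

In Spark, a per-tree bootstrap is commonly drawn with `df.sample(withReplacement=True, fraction=1.0, seed=s)`. Sampling with replacement there draws a Poisson count per row, so each bootstrap has approximately, not exactly, as many rows as the input.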

In [None]:
# Create Bagging model builder
def bagging_builder():
    return BaggingRegressor(
        n_estimators=10,
        max_depth=10,
        seed=42,
        features_col='features',
        label_col='label'
    )

# Run manual 10-fold CV for Bagging
print("Evaluating Custom Bagging Ensemble with 10-Fold CV...")
bagging_cv_results = manual_cross_validate(df_train, bagging_builder, k=10, seed=42)

In [None]:
# Add Bagging results to comparison
cv_results['Bagging (ours)'] = bagging_cv_results

In [None]:
# Display comparison table
print("\n" + "=" * 60)
print("CROSS-VALIDATION RESULTS SUMMARY")
print("=" * 60)
comparison_df = create_comparison_table(cv_results)
print(comparison_df.to_string(index=False))

In [None]:
# Plot CV results comparison
plot_model_comparison(cv_results, save_path='../outputs/figures/cv_results.png')

---
## 8. Final Evaluation on Holdout Test Set

Now we train final models on the full training set (80%) and evaluate on the holdout test set (20%) that was never seen during cross-validation.

In [None]:
# Train final models on full training set
print("Training final models on full training set...")

# Linear Regression
lr_model = linear_regression_builder().fit(df_train)
print("  Linear Regression trained.")

# Decision Tree
dt_model = decision_tree_builder().fit(df_train)
print("  Decision Tree trained.")

# Random Forest (baseline)
rf_model = random_forest_builder().fit(df_train)
print("  Random Forest trained.")

# Bagging (our custom implementation)
bagging_model = bagging_builder()
bagging_model.fit(df_train, verbose=True)

In [None]:
# Evaluate on holdout test set
print("\n" + "=" * 60)
print("FINAL EVALUATION ON HOLDOUT TEST SET")
print("=" * 60)

test_results = {}

# Linear Regression
_, lr_metrics = evaluate_model(lr_model, df_holdout, 'Linear Regression')
test_results['Linear Regression'] = lr_metrics

# Decision Tree
_, dt_metrics = evaluate_model(dt_model, df_holdout, 'Decision Tree')
test_results['Decision Tree'] = dt_metrics

# Random Forest
_, rf_metrics = evaluate_model(rf_model, df_holdout, 'Random Forest (baseline)')
test_results['Random Forest (baseline)'] = rf_metrics

# Bagging (custom)
bagging_predictions = bagging_model.predict(df_holdout)
bagging_test_metrics = calculate_metrics(bagging_predictions)
test_results['Bagging (ours)'] = bagging_test_metrics
print(f"\n{'=' * 50}")
print("EVALUATION RESULTS: Bagging (ours)")
print(f"{'=' * 50}")
print(f"  RMSE: {bagging_test_metrics['rmse']:,.2f}")
print(f"  MAE:  {bagging_test_metrics['mae']:,.2f}")
print(f"  R²:   {bagging_test_metrics['r2']:.4f}")
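`calculate_metrics` (in our `evaluation` module) is not shown, but the printed keys suggest it returns `rmse`, `mae` and `r2`. For reference, the plain-Python sketch below uses the standard definitions, which are also what Spark's `RegressionEvaluator` reports for `metricName` "rmse", "mae" and "r2". `regression_metrics` is a hypothetical name.

```python
from typing import Dict, Sequence

def regression_metrics(y_true: Sequence[float], y_pred: Sequence[float]) -> Dict[str, float]:
    """RMSE, MAE and R^2 (1 - SS_res / SS_tot) for paired labels and predictions."""
    n = len(y_true)
    if n == 0 or n != len(y_pred):
        raise ValueError("need two non-empty sequences of equal length")
    errors = [p - t for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)
    y_mean = sum(y_true) / n
    ss_tot = sum((t - y_mean) ** 2 for t in y_true)
    return {
        "rmse": (ss_res / n) ** 0.5,
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan"),
    }

print(regression_metrics([100.0, 200.0, 300.0], [110.0, 190.0, 330.0]))
# rmse ≈ 19.15, mae ≈ 16.67, r2 ≈ 0.945
```

RMSE and MAE are in AED, the unit of `actual_worth`. Because squaring weights large misses heavily, RMSE on a right-skewed target such as transaction prices tends to be driven by the most expensive properties, so it is worth reading alongside MAE.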

In [None]:
# Display final comparison table
print("\n" + "=" * 60)
print("FINAL TEST SET RESULTS SUMMARY")
print("=" * 60)
test_comparison_df = create_comparison_table(test_results)
print(test_comparison_df.to_string(index=False))

In [None]:
# Plot test results comparison
plot_model_comparison(test_results, save_path='../outputs/figures/model_comparison.png')

---
## 9. Results & Visualizations

In [None]:
# Plot predictions vs actual for our custom Bagging ensemble
print("Predictions vs Actual (Bagging Ensemble):")
plot_predictions_vs_actual(bagging_predictions, sample_size=10000,
                           save_path='../outputs/figures/predictions_vs_actual.png')

In [None]:
# Plot residuals for Bagging model
print("Residual Analysis (Bagging Ensemble):")
plot_residuals(bagging_predictions, sample_size=10000,
               save_path='../outputs/figures/residuals.png')

In [None]:
# Feature importance from Bagging ensemble
print("Feature Importance (Bagging Ensemble):")
importance = bagging_model.get_feature_importance(feature_names)
plot_feature_importance(importance, top_n=15,
                        save_path='../outputs/figures/feature_importance.png')
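How `get_feature_importance` aggregates the ensemble is not shown. A natural approach, sketched below under that assumption, is to average each tree's importance vector and rank features by the mean. In Spark, every fitted `DecisionTreeRegressionModel` exposes `featureImportances`, normalized to sum to 1. The length check matters: `feature_names` must line up one-to-one with the entries of the assembled `features` vector, which will not hold if a categorical column was one-hot encoded into several slots. `average_importance` is a hypothetical helper, and the importances in the usage lines are made up.

```python
from typing import List, Sequence, Tuple

def average_importance(per_tree: Sequence[Sequence[float]], names: Sequence[str],
                       top_n: int = 15) -> List[Tuple[str, float]]:
    """Average per-tree importance vectors and return the top_n (name, score) pairs."""
    if not per_tree:
        raise ValueError("need at least one tree")
    if any(len(vec) != len(names) for vec in per_tree):
        raise ValueError("each importance vector needs one entry per feature name")
    avg = [sum(vec[j] for vec in per_tree) / len(per_tree) for j in range(len(names))]
    ranked = sorted(zip(names, avg), key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]

per_tree = [[0.6, 0.3, 0.1], [0.4, 0.5, 0.1]]   # made-up importances for two trees
names = ["procedure_area", "area_name_en", "has_parking"]
print(average_importance(per_tree, names, top_n=2))  # procedure_area, then area_name_en
```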

---
## 10. Conclusion

### Summary of Results

| Model | CV RMSE | CV R² | Test RMSE | Test R² |
|-------|---------|-------|-----------|--------|
| Linear Regression | ... | ... | ... | ... |
| Decision Tree | ... | ... | ... | ... |
| Random Forest (baseline) | ... | ... | ... | ... |
| **Bagging (ours)** | ... | ... | ... | ... |

### Key Findings

1. **TODO**: Fill in observations about model performance
2. **TODO**: Discuss feature importance insights
3. **TODO**: Compare custom Bagging vs Random Forest baseline

### Lessons Learned

1. **Manual Cross-Validation**: Implementing 10-fold CV from scratch provided deeper understanding of...
2. **Bagging Ensemble**: Building bagging from scratch showed...
3. **Big Data Considerations**: Working with 1.5M records using Spark...

In [None]:
# Clean up
df_train.unpersist()
spark.stop()
print("Spark session stopped. Notebook complete!")