In [4]:
import os
import json
import re
import pandas as pd
import numpy as np
from typing import Tuple, Dict, Any
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
import joblib
from sklearn.impute import SimpleImputer

# CSV file with the raw rental listings; read by load_and_clean() via main().
DATA_PATH = "lagos-rent.csv"
# Output directories, created on demand by ensure_dirs().
MODEL_DIR = "models"
ARTIFACTS_DIR = "artifacts"
# Where main() stores the best fitted pipeline (joblib) and the metrics (JSON).
MODEL_PATH = os.path.join(MODEL_DIR, "best_model.joblib")
METRICS_PATH = os.path.join(ARTIFACTS_DIR, "metrics.json")

# Lowercase keywords that extract_property_type() looks for as substrings of a
# listing's title / "More Info" text.
PROPERTY_TYPES = [
    "duplex",
    "semi-detached",
    "semi detached",
    "detached",
    "apartment",
    "flat",
    "bungalow",
    "terrace",
    "terraced",
    "mansion",
    "studio",
    "penthouse",
    "mini flat",
    "self contain",
    "self-contained",
    "maisonette",
    "terrace duplex",
    "terraced duplex",
]

def ensure_dirs():
    """Create the model and artifact output directories if they do not exist."""
    for directory in (MODEL_DIR, ARTIFACTS_DIR):
        os.makedirs(directory, exist_ok=True)

def parse_price_ngn(text: str) -> float | None:
    if not isinstance(text, str) or not text.strip():
        return None
    # Extract first numeric block; ignore any "/year" etc.
    m = re.search(r"([\d,]+)", text)
    if not m:
        return None
    try:
        return float(m.group(1).replace(",", ""))
    except Exception:
        return None

def extract_price_period(text: str) -> str | None:
    if not isinstance(text, str):
        return None
    t = text.lower()
    if "/year" in t or "per year" in t or "/ annum" in t or "/annum" in t:
        return "per_year"
    if "/month" in t or "per month" in t:
        return "per_month"
    if "/day" in t or "per day" in t:
        return "per_day"
    if "/sqm" in t or "per sqm" in t:
        return "per_sqm"
    return "unspecified"

def parse_count(text: str) -> float | None:
    if not isinstance(text, str):
        return None
    m = re.search(r"(\d+)", text)
    return float(m.group(1)) if m else None

def extract_property_type(title: str, more_info: str) -> str:
    blob = f"{title or ''} {more_info or ''}".lower()
    for t in PROPERTY_TYPES:
        if t in blob:
            # Normalize
            return t.replace("-", " ").title()
    # Fallback for common phrases
    if "house" in blob:
        return "House"
    return "Other"

def _coerce_flag(value) -> int:
    """Map a raw flag cell to 0/1, accepting numeric and textual spellings."""
    if isinstance(value, str):
        return 1 if value.strip().lower() in {"1", "1.0", "true"} else 0
    try:
        # Covers bool, int, float and NumPy scalars. A float column (which
        # pandas produces when a flag column has gaps) holds 1.0, not 1;
        # NaN compares unequal to 1.0 and therefore maps to 0.
        return 1 if float(value) == 1.0 else 0
    except (TypeError, ValueError):
        return 0


def load_and_clean(path: str) -> pd.DataFrame:
    """Read the listings CSV and return a cleaned modelling frame.

    Steps: strip whitespace from column names, parse the price and room
    counts, coerce the boolean flags to 0/1, derive ``property_type`` from the
    ``Title`` / ``More Info`` text, add missing optional columns, and drop
    rows lacking a price, bedroom count or bathroom count.

    Args:
        path: Path of the CSV file to load.

    Returns:
        A new DataFrame with the columns ``price_ngn``, ``bedrooms``,
        ``bathrooms``, ``toilets``, ``Serviced``, ``Newly Built``,
        ``Furnished``, ``property_type``, ``City`` and ``Neighborhood``.

    Raises:
        KeyError: If the CSV lacks a ``Price``, ``Bedrooms``, ``Bathrooms`` or
            ``Toilets`` column.
    """
    df = pd.read_csv(path)
    # Standardize column names (strip spaces)
    df.columns = [c.strip() for c in df.columns]

    # Parse numeric target
    df["price_ngn"] = df["Price"].apply(parse_price_ngn)
    # NOTE(review): the period is parsed here but is not part of the returned
    # frame, so per-year, per-month, per-day and per-sqm amounts all end up in
    # the same target column. Confirm whether rows should be filtered or
    # rescaled by period before training.
    df["price_period"] = df["Price"].apply(extract_price_period)

    # Parse room counts; to_numeric guarantees a numeric dtype even when every
    # parsed value is missing.
    for raw_col, clean_col in (
        ("Bedrooms", "bedrooms"),
        ("Bathrooms", "bathrooms"),
        ("Toilets", "toilets"),
    ):
        df[clean_col] = pd.to_numeric(df[raw_col].apply(parse_count), errors="coerce")

    # Boolean flags: coerce existing columns to 0/1, default missing ones to 0.
    for flag in ("Serviced", "Newly Built", "Furnished"):
        df[flag] = df[flag].apply(_coerce_flag) if flag in df.columns else 0

    # Property type from Title/More Info. Iterating the two columns directly
    # avoids a row-wise DataFrame.apply and also works on an empty frame.
    titles = df["Title"] if "Title" in df.columns else [""] * len(df)
    infos = df["More Info"] if "More Info" in df.columns else [""] * len(df)
    df["property_type"] = [
        extract_property_type(title, info) for title, info in zip(titles, infos)
    ]

    # Basic location normalization
    for location_col in ("City", "Neighborhood"):
        if location_col not in df.columns:
            df[location_col] = np.nan

    # Keep a clean working set of columns
    keep_cols = [
        "price_ngn", "bedrooms", "bathrooms", "toilets",
        "Serviced", "Newly Built", "Furnished",
        "property_type", "City", "Neighborhood"
    ]

    # Drop rows with missing essential values. All column assignments happen
    # above, on the full frame, so nothing is written into a filtered slice.
    cleaned = df.dropna(subset=["price_ngn", "bedrooms", "bathrooms"])
    return cleaned[keep_cols].copy()

def split(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Split the cleaned frame into train/test features and targets.

    The ``price_ngn`` column is the target; every other column is a feature.
    20% of the rows are held out, with a fixed seed for reproducibility.

    Returns:
        ``X_train, X_test, y_train, y_test`` as produced by
        ``train_test_split``.
    """
    target = df["price_ngn"]
    features = df.drop(columns=["price_ngn"])
    return train_test_split(features, target, random_state=42, test_size=0.2)

def build_preprocessor(num_cols, cat_cols) -> ColumnTransformer:
    """Build the feature preprocessing step.

    Args:
        num_cols: Names of numeric columns; missing values are replaced by the
            column median.
        cat_cols: Names of categorical columns; missing values become
            ``"Unknown"`` and the result is one-hot encoded, ignoring
            categories not seen during fitting.

    Returns:
        An unfitted ``ColumnTransformer`` that drops all other columns.
    """
    median_fill = SimpleImputer(strategy="median")
    unknown_fill = SimpleImputer(strategy="constant", fill_value="Unknown")
    one_hot = OneHotEncoder(handle_unknown="ignore")

    num_pipeline = Pipeline(steps=[("imputer", median_fill)])
    cat_pipeline = Pipeline(steps=[("imputer", unknown_fill), ("ohe", one_hot)])

    column_steps = [
        ("num", num_pipeline, num_cols),
        ("cat", cat_pipeline, cat_cols),
    ]
    return ColumnTransformer(transformers=column_steps, remainder="drop")

def evaluate(y_true, y_pred) -> Dict[str, float]:
    """Compute regression metrics for a set of predictions.

    Returns:
        A dict with plain-float ``"rmse"``, ``"mae"`` and ``"r2"`` entries.
    """
    mse = mean_squared_error(y_true, y_pred)
    return {
        "rmse": float(np.sqrt(mse)),
        "mae": float(mean_absolute_error(y_true, y_pred)),
        "r2": float(r2_score(y_true, y_pred)),
    }

def main():
    """Train the candidate regressors, compare them and persist the best one.

    Loads and cleans the dataset, holds out a test split, fits a
    preprocessing + regressor pipeline for each candidate model, prints and
    records RMSE / MAE / R2 on the test split, then saves the pipeline with
    the lowest RMSE to ``MODEL_PATH`` and all metrics to ``METRICS_PATH``.

    Raises:
        SystemExit: If no rows survive cleaning.
    """
    ensure_dirs()
    df = load_and_clean(DATA_PATH)
    if df.empty:
        raise SystemExit("No usable rows after cleaning. Check your CSV contents.")
    print(f"Rows after cleaning: {len(df)}")

    X_train, X_test, y_train, y_test = split(df)

    num_cols = ["bedrooms", "bathrooms", "toilets", "Serviced", "Newly Built", "Furnished"]
    cat_cols = ["property_type", "City", "Neighborhood"]

    models: Dict[str, Any] = {
        "LinearRegression": LinearRegression(),
        "RandomForest": RandomForestRegressor(
            n_estimators=300, max_depth=None, random_state=42, n_jobs=-1
        ),
        "XGBoost": XGBRegressor(
            n_estimators=600,
            max_depth=8,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=42,
            n_jobs=-1,
        ),
    }

    results: Dict[str, Dict[str, float]] = {}
    fitted_pipes: Dict[str, Any] = {}

    for name, reg in models.items():
        # Pipeline fits its steps in place, so every pipeline gets its own
        # preprocessor; otherwise the saved model would share a transformer
        # that later fits have overwritten.
        pipe = Pipeline(steps=[
            ("prep", build_preprocessor(num_cols, cat_cols)),
            ("reg", reg),
        ])
        pipe.fit(X_train, y_train)
        y_pred = pipe.predict(X_test)
        metrics = evaluate(y_test, y_pred)
        results[name] = metrics
        fitted_pipes[name] = pipe
        print(f"{name} -> RMSE: {metrics['rmse']:.2f}, MAE: {metrics['mae']:.2f}, R2: {metrics['r2']:.4f}")

    # Pick best by RMSE
    best_name = min(results, key=lambda k: results[k]["rmse"])
    best_pipe = fitted_pipes[best_name]
    print(f"Best model: {best_name}")

    # Save model and metrics
    joblib.dump(best_pipe, MODEL_PATH)
    with open(METRICS_PATH, "w", encoding="utf-8") as f:
        json.dump({"results": results, "best": best_name}, f, indent=2)

    print(f"Saved model to {MODEL_PATH}")
    print(f"Saved metrics to {METRICS_PATH}")

# Entry point: run the training workflow only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()

Rows after cleaning: 51252
LinearRegression -> RMSE: 834711571.43, MAE: 148218229.56, R2: -53.4650


RandomForest -> RMSE: 205225497.43, MAE: 15175519.25, R2: -2.2924
XGBoost -> RMSE: 2513477891.14, MAE: 49737936.92, R2: -492.8495
Best model: RandomForest
XGBoost -> RMSE: 2513477891.14, MAE: 49737936.92, R2: -492.8495
Best model: RandomForest
Saved model to models/best_model.joblib
Saved metrics to artifacts/metrics.json
Saved model to models/best_model.joblib
Saved metrics to artifacts/metrics.json
