# Ames Housing price prediction (OLS, Ridge, LASSO)

This notebook builds a preprocessing pipeline (imputation, scaling, one-hot encoding) and uses 10-fold cross-validation to compare OLS, Ridge, and LASSO on the log of the sale price. Results are printed directly below each cell so they can be reused easily.

In [None]:
import os
import tempfile
import warnings
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression, RidgeCV, LassoCV
from sklearn.model_selection import cross_validate
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Silence every warning for the rest of the session.
# NOTE(review): this blanket filter also hides any convergence or deprecation
# warnings the estimators below may emit — confirm that is intended.
warnings.filterwarnings("ignore")


def ensure_temp_dir():
    """Return the first writable temp directory for joblib, or None.

    Candidates are tried in order: the system temp directory reported by
    ``tempfile.gettempdir()``, then ``./tmp`` under the current working
    directory. Each candidate is created if missing and then probed by
    writing a throwaway file inside it.

    Returns:
        The path of the first candidate that accepted the probe file, or
        ``None`` when no candidate is writable.
    """
    candidates = []
    try:
        candidates.append(tempfile.gettempdir())
    except OSError:
        # gettempdir() raises FileNotFoundError when it finds no usable
        # directory; the ./tmp fallback below should still get its chance.
        pass
    candidates.append(os.path.join(os.getcwd(), "tmp"))

    for cand in candidates:
        try:
            os.makedirs(cand, exist_ok=True)
            # Probe with a uniquely named file that disappears on close, so a
            # pre-existing "sk_tmp_test.txt" is never overwritten or deleted
            # and a name collision cannot disqualify a writable directory.
            with tempfile.TemporaryFile(
                mode="w", encoding="utf-8", dir=cand
            ) as probe:
                probe.write("ok")
        except OSError:
            continue
        return cand
    return None


# Export the chosen directory through the generic temp-dir variables and
# through JOBLIB_TEMP_FOLDER; if nothing was writable, only warn.
tmp_dir = ensure_temp_dir()
if not tmp_dir:
    print("Warning: no writable temp dir found; joblib will use defaults.")
else:
    for env_key in ("TMP", "TEMP", "TMPDIR", "JOBLIB_TEMP_FOLDER"):
        os.environ[env_key] = tmp_dir
    print(f"Using temp dir: {tmp_dir}")

In [None]:
DATA_PATH = "AmesHousing.csv"

# Read the CSV, then trim surrounding whitespace from every column name
df = pd.read_csv(DATA_PATH)
df = df.rename(columns=str.strip)

# Predictors are every column except SalePrice; the target is its natural log
# NOTE(review): if the file carries row-identifier columns they stay in X as
# ordinary features — confirm against the CSV header.
X = df.drop(columns=["SalePrice"])
y = np.log(df["SalePrice"])

print(f"Loaded data: {len(df)} rows, {len(df.columns)} columns")

In [None]:
# Preprocessing: numeric columns get median imputation followed by
# standardization; all remaining columns get most-frequent imputation
# followed by one-hot encoding.
num_cols = X.select_dtypes(include="number").columns
cat_cols = X.columns.difference(num_cols)

num_pipe = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
)

# handle_unknown="ignore" keeps categories unseen during fit from raising
cat_pipe = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore")),
    ]
)

preprocess = ColumnTransformer(
    transformers=[
        ("num", num_pipe, num_cols),
        ("cat", cat_pipe, cat_cols),
    ]
)

In [None]:
# Compare OLS, Ridge, LASSO via 10-fold CV on log-price
models = {
    "ols": LinearRegression(),
    "ridge": RidgeCV(alphas=np.logspace(-3, 3, 25), cv=10),
    "lasso": LassoCV(
        alphas=np.logspace(-3, 1, 25),
        cv=10,
        max_iter=5000,
        n_jobs=-1,
    ),
}

results = {}
for name, model in models.items():
    # Preprocessing sits inside the pipeline that cross_validate receives
    pipe = Pipeline(steps=[("preprocess", preprocess), ("model", model)])
    scores = cross_validate(
        pipe,
        X,
        y,
        cv=10,
        scoring=["neg_root_mean_squared_error", "r2"],
        n_jobs=-1,
    )

    # The scorer reports negated RMSE; flip the sign to get per-fold RMSE
    rmse_folds = -scores["test_neg_root_mean_squared_error"]
    r2_folds = scores["test_r2"]

    summary = {
        "rmse_mean": float(rmse_folds.mean()),
        "rmse_std": float(rmse_folds.std()),
        "r2_mean": float(r2_folds.mean()),
        "r2_std": float(r2_folds.std()),
    }
    results[name] = summary
    print(
        f"{name}: RMSE={summary['rmse_mean']:.3f}±{summary['rmse_std']:.3f}, "
        f"R2={summary['r2_mean']:.3f}±{summary['r2_std']:.3f}"
    )

# Winner is the model with the lowest mean CV RMSE (first one wins ties)
best_name = min(results, key=lambda key: results[key]["rmse_mean"])
best_metrics = results[best_name]
print(f"\nBest model by CV RMSE: {best_name} (RMSE={best_metrics['rmse_mean']:.3f}, R2={best_metrics['r2_mean']:.3f})")

In [None]:
# Fit the best model on the full dataset and inspect key coefficients
def make_model(name: str):
    """Build a new, unfitted estimator for the given model key.

    Args:
        name: ``"ridge"`` or ``"lasso"`` selects the matching cross-validated
            estimator; any other value selects ``LinearRegression``.

    Returns:
        A freshly constructed estimator instance.
    """
    if name == "lasso":
        estimator = LassoCV(
            alphas=np.logspace(-3, 1, 25), cv=10, max_iter=5000, n_jobs=-1
        )
    elif name == "ridge":
        estimator = RidgeCV(alphas=np.logspace(-3, 3, 25), cv=10)
    else:
        # Every other key falls back to ordinary least squares
        estimator = LinearRegression()
    return estimator


final_model = make_model(best_name)
final_pipe = Pipeline(
    steps=[
        ("preprocess", preprocess),
        ("model", final_model),
    ]
)
final_pipe.fit(X, y)

# Pair each post-preprocessing feature name with its fitted coefficient
feature_names = final_pipe.named_steps["preprocess"].get_feature_names_out()
coef = final_pipe.named_steps["model"].coef_

# Rank features by coefficient magnitude, largest first
coef_frame = pd.DataFrame({"feature": feature_names, "coef": coef})
coef_frame["abs_coef"] = coef_frame["coef"].abs()
coef_frame = coef_frame.sort_values("abs_coef", ascending=False)

print(f"Model type: {best_name}")
# alpha_ is looked up on final_model, the same object placed in the pipeline;
# the check skips estimators that do not expose it
if hasattr(final_model, "alpha_"):
    print(f"Chosen alpha: {final_model.alpha_:.4f}")

display(coef_frame.head(15))

# Helper to predict price on the original scale
def predict_price(pipe, X_new: pd.DataFrame):
    """Predict prices on the original (non-log) scale.

    Args:
        pipe: Object whose ``predict`` method returns log-price predictions.
        X_new: Rows to score; passed to ``pipe.predict`` unchanged.

    Returns:
        ``np.exp`` applied to the output of ``pipe.predict(X_new)``.
    """
    return np.exp(pipe.predict(X_new))

# Closing hint: tells the reader how to call predict_price on new rows
print("Top 15 coefficients shown; use `predict_price(final_pipe, new_data)` for new predictions.")