In [2]:
import pandas as pd
import numpy as np
import warnings
# We only use train_test_split from sklearn for a simple data split
from sklearn.model_selection import train_test_split 

warnings.filterwarnings('ignore')

# -----------------------------------------------------------------
# PART 1: METRIC FUNCTIONS (FROM SCRATCH)
# -----------------------------------------------------------------

def my_rmse(y_true, y_pred):
    """Calculates Root Mean Squared Error."""
    return np.sqrt(np.mean((y_true - y_pred)**2))

def my_r2_score(y_true, y_pred):
    """Calculates R-Squared (R²)."""
    # Sum of squares of residuals
    ss_res = np.sum((y_true - y_pred)**2)
    # Total sum of squares
    ss_tot = np.sum((y_true - np.mean(y_true))**2)
    
    if ss_tot == 0:
        return 1.0 # Perfect model or no variance
    
    r2 = 1 - (ss_res / ss_tot)
    return r2

# -----------------------------------------------------------------
# PART 2: PREPROCESSING CLASS (FROM SCRATCH)
# -----------------------------------------------------------------

class MyStandardScaler:
    """Implements StandardScaler from scratch."""
    def __init__(self):
        self.mean_ = None
        self.std_ = None

    def fit(self, X):
        """Calculates and stores the mean and std dev of each feature."""
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        # Avoid division by zero for constant features
        self.std_[self.std_ == 0] = 1.0

    def transform(self, X):
        """Applies the standardization using the stored mean and std."""
        if self.mean_ is None or self.std_ is None:
            raise ValueError("Must call fit() before transform()")
        return (X - self.mean_) / self.std_

    def fit_transform(self, X):
        """A helper function to do both fit and transform."""
        self.fit(X)
        return self.transform(X)

# -----------------------------------------------------------------
# PART 3: MODEL CLASSES (FROM SCRATCH)
# -----------------------------------------------------------------

class MyLinearRegressionBase:
    """
    Base class for Linear Regression models, using Gradient Descent.
    This class is not meant to be used directly.
    """
    def __init__(self, learning_rate=0.01, n_iters=1000, alpha=0.1):
        self.lr = learning_rate
        self.n_iters = n_iters
        self.alpha = alpha  # Regularization parameter
        self.weights = None
        self.bias = None

    def _init_params(self, n_features):
        """Initialize weights and bias."""
        self.weights = np.zeros(n_features)
        self.bias = 0

    def predict(self, X):
        """Predict y values for new X."""
        return np.dot(X, self.weights) + self.bias

    def _base_cost_gradient(self, X, y_true, y_pred):
        """Gradients for Mean Squared Error."""
        n_samples = X.shape[0]
        # Gradient of MSE w.r.t weights (dw) and bias (db)
        dw = (2 / n_samples) * np.dot(X.T, (y_pred - y_true))
        db = (2 / n_samples) * np.sum(y_pred - y_true)
        return dw, db

    # --- Methods to be overridden by child classes ---
    def _regularization_gradient(self):
        """Placeholder for regularization penalty on gradient."""
        return 0

    def fit(self, X, y):
        """Trains the model using gradient descent."""
        n_samples, n_features = X.shape
        self._init_params(n_features)

        for _ in range(self.n_iters):
            # 1. Get current predictions
            y_pred = self.predict(X)
            
            # 2. Get base MSE gradients
            dw, db = self._base_cost_gradient(X, y, y_pred)
            
            # 3. Add regularization gradients (from child class)
            dw += self._regularization_gradient()
            
            # 4. Update weights and bias
            self.weights -= self.lr * dw
            self.bias -= self.lr * db
            
class MyRidge(MyLinearRegressionBase):
    """Implements Ridge Regression (L2 Penalty) from scratch."""
    
    def __init__(self, learning_rate=0.01, n_iters=1000, alpha=0.1):
        super().__init__(learning_rate, n_iters, alpha)
        
    def _regularization_gradient(self):
        """L2 penalty for gradient."""
        # Gradient of L2 = (alpha / N) * w
        # We add regularization to the gradient
        return (self.alpha / len(self.weights)) * self.weights

class MyLasso(MyLinearRegressionBase):
    """Implements Lasso Regression (L1 Penalty) from scratch."""
    
    def __init__(self, learning_rate=0.01, n_iters=1000, alpha=0.1):
        super().__init__(learning_rate, n_iters, alpha)
        
    def _regularization_gradient(self):
        """
        L1 penalty for gradient (sub-gradient).
        Gradient of L1 = (alpha / N) * sign(w)
        """
        return (self.alpha / len(self.weights)) * np.sign(self.weights)

# -----------------------------------------------------------------
# PART 4: HYPERPARAMETER TUNING (FROM SCRATCH)
# -----------------------------------------------------------------

def my_k_fold_indices(n_samples, k_folds):
    """
    From-scratch K-Fold index generator.
    Yields (train_indices, validation_indices) for each fold.
    """
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    fold_sizes = np.full(k_folds, n_samples // k_folds, dtype=int)
    fold_sizes[:n_samples % k_folds] += 1
    
    current = 0
    for fold_size in fold_sizes:
        start, end = current, current + fold_size
        val_idx = indices[start:end]
        train_idx = np.concatenate((indices[:start], indices[end:]))
        yield train_idx, val_idx
        current = end

def my_grid_search_cv(model_class, param_grid, X, y, cv=5, scoring='r2'):
    """
    Implements GridSearchCV from scratch.
    
    model_class: The class to instantiate (e.g., MyRidge, MyLasso)
    param_grid: A dict, e.g., {'alpha': [0.1, 1, 10]}
    X, y: *Already scaled* full training data
    """
    print(f"Running from-scratch GridSearch for {model_class.__name__}...")
    
    alphas = param_grid['alpha']
    best_score = -np.inf
    best_alpha = None
    
    # We use lower iters/lr for grid search to speed it up
    search_lr = 0.01
    search_iters = 500

    for alpha in alphas:
        fold_scores = []
        
        # We use the *already scaled* data and split it
        for train_idx, val_idx in my_k_fold_indices(len(X), cv):
            # 1. Get fold data
            X_train_fold, X_val_fold = X[train_idx], X[val_idx]
            y_train_fold, y_val_fold = y[train_idx], y[val_idx]
            
            # 2. Instantiate and train model
            model = model_class(learning_rate=search_lr, n_iters=search_iters, alpha=alpha)
            model.fit(X_train_fold, y_train_fold)
            
            # 3. Predict and score
            y_pred_fold = model.predict(X_val_fold)
            
            # 4. Reverse log-transform for scoring
            y_pred_orig = np.expm1(y_pred_fold)
            y_val_orig = np.expm1(y_val_fold)
            
            if scoring == 'r2':
                score = my_r2_score(y_val_orig, y_pred_orig)
            else: # Default to RMSE (lower is better)
                score = -my_rmse(y_val_orig, y_pred_orig) # Negate so higher is better
                
            fold_scores.append(score)
        
        # 5. Average score for this alpha
        avg_score = np.mean(fold_scores)
        
        if avg_score > best_score:
            best_score = avg_score
            best_alpha = alpha
            
    print(f"GridSearch complete. Best alpha: {best_alpha} with score: {best_score:.4f}")
    return {'alpha': best_alpha}

# -----------------------------------------------------------------
# PART 5: MAIN SCRIPT (PIPELINE)
# -----------------------------------------------------------------

# --- 1. Load and Prepare Data (Using pandas for this part) ---
try:
    df = pd.read_csv('train.csv')
except FileNotFoundError:
    print("Error: train.csv not found.")
    # In a real script, you'd exit() here
    # For this example, we'll assume df is loaded.
    pass

# Fill 'NA' = 'None' for specific columns
na_means_none_cols = [
    'Alley', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1',
    'BsmtFinType2', 'FireplaceQu', 'GarageType', 'GarageFinish',
    'GarageQual', 'GarageCond', 'PoolQC', 'Fence', 'MiscFeature'
]
for col in na_means_none_cols:
    if col in df.columns:
        df[col] = df[col].fillna('None')

X = df.drop('SalePrice', axis=1)
y = df['SalePrice']

# Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# --- 2. Log-transform target variable (improves model stability) ---
y_train_log = np.log1p(y_train)
# y_test (original) is kept for final scoring
# y_test_log is not needed since we don't train on it

# --- 3. Preprocessing (pandas for imputation/dummies) ---
X_train['MSSubClass'] = X_train['MSSubClass'].astype(str)
X_test['MSSubClass'] = X_test['MSSubClass'].astype(str)

numeric_cols = X_train.select_dtypes(include=np.number).columns.drop('Id')
categorical_cols = X_train.select_dtypes(include='object').columns

# Imputation
for col in numeric_cols:
    if col in X_train.columns:
        median_val = X_train[col].median()
        X_train.loc[:, col] = X_train[col].fillna(median_val)
        X_test.loc[:, col] = X_test[col].fillna(median_val)
for col in categorical_cols:
    if col in X_train.columns:
        mode_val = X_train[col].mode()[0]
        X_train.loc[:, col] = X_train[col].fillna(mode_val)
        X_test.loc[:, col] = X_test[col].fillna(mode_val)

X_train_dummified = pd.get_dummies(X_train.drop('Id', axis=1, errors='ignore'), drop_first=True)
X_test_dummified = pd.get_dummies(X_test.drop('Id', axis=1, errors='ignore'), drop_first=True)
X_train_final, X_test_final = X_train_dummified.align(X_test_dummified, join='left', axis=1, fill_value=0)

feature_names = X_train_final.columns

# --- 4. Convert to NumPy arrays for our from-scratch classes ---
#
# ----- THIS IS THE FIX -----
# Force the dtype to float64 to ensure np.std and np.mean work
X_train_np = X_train_final.to_numpy(dtype=np.float64)
X_test_np = X_test_final.to_numpy(dtype=np.float64)
# -------------------------
y_train_log_np = y_train_log.to_numpy()

# --- 5. Scale Data (Using our from-scratch scaler) ---
# We fit on the training data
scaler_final = MyStandardScaler()
X_train_scaled = scaler_final.fit_transform(X_train_np)
# We transform the test data
X_test_scaled = scaler_final.transform(X_test_np)

# --- 6. Find Optimal Alphas (From Scratch) ---
alphas_to_test = [0.1, 1, 10, 100, 200] # Smaller list for speed
param_grid = {'alpha': alphas_to_test}

# Pass the *scaled* training data to the grid search
ridge_params = my_grid_search_cv(MyRidge, param_grid, X_train_scaled, y_train_log_np, cv=3)
lasso_params = my_grid_search_cv(MyLasso, param_grid, X_train_scaled, y_train_log_np, cv=3)

best_alpha_ridge = ridge_params['alpha']
best_alpha_lasso = lasso_params['alpha']

# --- 7. Train Final Models (From Scratch) ---
final_iters = 2000
final_lr = 0.01

print("\nTraining final Ridge model...")
best_ridge = MyRidge(learning_rate=final_lr, n_iters=final_iters, alpha=best_alpha_ridge)
best_ridge.fit(X_train_scaled, y_train_log_np)

print("Training final Lasso model...")
best_lasso = MyLasso(learning_rate=final_lr, n_iters=final_iters, alpha=best_alpha_lasso)
best_lasso.fit(X_train_scaled, y_train_log_np)

# Train doubled-alpha models
print("Training doubled-alpha models...")
doubled_ridge = MyRidge(learning_rate=final_lr, n_iters=final_iters, alpha=best_alpha_ridge * 2)
doubled_ridge.fit(X_train_scaled, y_train_log_np)

doubled_lasso = MyLasso(learning_rate=final_lr, n_iters=final_iters, alpha=best_alpha_lasso * 2)
doubled_lasso.fit(X_train_scaled, y_train_log_np)

# --- 8. Final Evaluation and Answers ---
print("\n" + "="*50)
print("           FINAL RESULTS (FROM SCRATCH)           ")
print("="*50 + "\n")

# --- Question 1: Optimal Alphas ---
print(f"Question 1: Optimal Alphas (from scratch GridSearch)")
print(f"  - Optimal alpha for Ridge: {best_alpha_ridge}")
print(f"  - Optimal alpha for Lasso: {best_alpha_lasso}")
print("-" * 50)

# --- Question 2: Effect of Doubling Alpha ---
print("Question 2: Effect of Doubling Alpha")

def get_from_scratch_metrics(model, X_test_scaled, y_test_original, model_name):
    # Predict on scaled test data
    y_pred_log = model.predict(X_test_scaled)
    # Reverse the log transform
    y_pred_orig = np.expm1(y_pred_log) 
    
    # Use original y_test for scoring
    r2 = my_r2_score(y_test_original, y_pred_orig)
    rmse = my_rmse(y_test_original, y_pred_orig)
    num_coefs = np.sum(model.weights != 0)
    return f"  - {model_name}:\n    R²: {r2:.4f} | RMSE: ${rmse:,.2f} | Features Used: {num_coefs}"

print("\n--- Performance on Test Set (using original SalePrice) ---")
# Pass the original y_test (not log-transformed) for the final report
print(get_from_scratch_metrics(best_ridge, X_test_scaled, y_test, f"Ridge (alpha={best_alpha_ridge})"))
print(get_from_scratch_metrics(doubled_ridge, X_test_scaled, y_test, f"Ridge (alpha={best_alpha_ridge*2})"))
print(get_from_scratch_metrics(best_lasso, X_test_scaled, y_test, f"Lasso (alpha={best_alpha_lasso})"))
print(get_from_scratch_metrics(doubled_lasso, X_test_scaled, y_test, f"Lasso (alpha={best_alpha_lasso*2})"))

print("\n  Analysis of Doubling Alpha:")
print("  - For Ridge, doubling alpha increases the penalty, shrinking all coefficients further.")
print("  - For Lasso, doubling alpha also increases the penalty, forcing more coefficients to exactly zero (see 'Features Used').")
print("-" * 50)

# --- Question 3: Significant Variables ---
print("Question 3: Significant Variables (from optimal Lasso model)")
lasso_coefs = best_lasso.weights
coef_series = pd.Series(lasso_coefs, index=feature_names)
sorted_coefs = coef_series.abs().sort_values(ascending=False)
significant_predictors = coef_series[sorted_coefs.index].loc[lambda x: x != 0]

print(f"  - Lasso (alpha={best_alpha_lasso}) selected {len(significant_predictors)} out of {len(feature_names)} features.")
print("\n  - Top 15 Most Important Predictors:")
print(significant_predictors.head(15).to_string())
print("-" * 50)

# --- Question 4: How well do they describe the price? ---
print("Question 4: How well do the variables describe the price?")
print("  This is answered by the R² and RMSE metrics on our test set.")
r2_lasso = my_r2_score(y_test, np.expm1(best_lasso.predict(X_test_scaled)))
rmse_lasso = my_rmse(y_test, np.expm1(best_lasso.predict(X_test_scaled)))
print(f"  - Our best from-scratch Lasso model explains {r2_lasso:.1%} of the variance in house prices.")
print(f"  - Its average prediction error (RMSE) is ${rmse_lasso:,.2f}.")
print("\n" + "="*50)

Running from-scratch GridSearch for MyRidge...
GridSearch complete. Best alpha: 100 with score: 0.6504
Running from-scratch GridSearch for MyLasso...
GridSearch complete. Best alpha: 10 with score: 0.6577

Training final Ridge model...
Training final Lasso model...
Training doubled-alpha models...

           FINAL RESULTS (FROM SCRATCH)           

Question 1: Optimal Alphas (from scratch GridSearch)
  - Optimal alpha for Ridge: 100
  - Optimal alpha for Lasso: 10
--------------------------------------------------
Question 2: Effect of Doubling Alpha

--- Performance on Test Set (using original SalePrice) ---
  - Ridge (alpha=100):
    R²: 0.8500 | RMSE: $32,355.73 | Features Used: 266
  - Ridge (alpha=200):
    R²: 0.8759 | RMSE: $29,424.61 | Features Used: 266
  - Lasso (alpha=10):
    R²: 0.8604 | RMSE: $31,207.11 | Features Used: 266
  - Lasso (alpha=20):
    R²: 0.8043 | RMSE: $36,951.88 | Features Used: 266

  Analysis of Doubling Alpha:
  - For Ridge, doubling alpha increases t