In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
def add_intercept(X):
    """
    Return X with a leading column of ones (bias term).
    X: (n_samples, n_features) -> (n_samples, n_features + 1)
    Notes: ensure float dtype; do not modify X in-place.
    """
    new_X = np.asarray(X, dtype=float)
    ones = np.ones((new_X.shape[0], 1))
    return np.hstack((ones, new_X))


def standardize_fit(X_train, eps=1e-12):
    """
    Standardize TRAIN features to zero mean / unit variance.
    1
    Return (Xs, mean, std). Use only TRAIN stats.
    """
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    Xs = (X_train - mean) / (std + eps)
    return Xs, mean, std

def standardize_apply(X, mean, std, eps=1e-12):
    """
    Apply TRAIN (mean, std) to any new matrix X.
    Do not recompute mean/std here.
    """
    return (X - mean) / (std + eps)


def train_test_split_idx(n, test_size, seed=0):
    """
    Return (train_idx, test_idx) with fixed RNG.
    You may also use sklearn’s splitter to obtain indices only.
    """
    rng = np.random.default_rng(seed)
    indices = np.arange(n)
    rng.shuffle(indices)

    test_count = int(np.floor(n * test_size))
    test_idx = indices[:test_count]
    train_idx = indices[test_count:]
    return train_idx, test_idx

In [None]:
training_data = pd.read_csv('train.csv')

print("Shape of Dataset:", training_data.shape)
print("\nColumn Types:")
print(training_data.dtypes)

print("\nMissing Values:")
print(training_data.isna().sum())

print("\nSummary Statistics:")
print(training_data.describe())

# %%
print("\nFirst 5 rows:")
training_data.head()

In [None]:
plt.figure(figsize=(10,5))
plt.hist(training_data["trip_duration"], bins=50, edgecolor='black')
plt.title("Trip Duration Distribution")
plt.xlabel("Duration (seconds)")
plt.ylabel("Frequency")
plt.show()

plt.figure(figsize=(10,5))
plt.hist(np.log1p(training_data["trip_duration"]), bins=50, edgecolor='black')
plt.title("Trip Duration Distribution (Log-Transformed)")
plt.xlabel("log(1 + duration)")
plt.ylabel("Frequency")
plt.show()


In [None]:
plt.figure(figsize=(7,6))
plt.scatter(training_data["pickup_longitude"], training_data["pickup_latitude"], s=1, alpha=0.3, color='green')
plt.scatter(training_data["dropoff_longitude"], training_data["dropoff_latitude"], s=1, alpha=0.3, color='red')
plt.title("Pickup and Dropoff Locations (Lon vs Lat)")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.show()


In [None]:
# %%
def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat/2)**2 + np.cos(lat1)*np.cos(lat2)*np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    return 6371 * c  # km

training_data["distance_km"] = haversine(
    training_data["pickup_longitude"], training_data["pickup_latitude"],
    training_data["dropoff_longitude"], training_data["dropoff_latitude"]
)

print("\nDistance feature added.")
print(training_data["distance_km"].describe())

In [None]:
plt.figure(figsize=(8,6))
plt.scatter(training_data["distance_km"], training_data["trip_duration"], s=3, alpha=0.3)
plt.title("Distance vs Trip Duration")
plt.xlabel("Distance (km)")
plt.ylabel("Duration (sec)")
plt.show()

plt.figure(figsize=(8,6))
plt.scatter(np.log1p(training_data["distance_km"]), np.log1p(training_data["trip_duration"]), s=3, alpha=0.3)
plt.title("log(Distance) vs log(Duration)")
plt.xlabel("log(1 + distance)")
plt.ylabel("log(1 + duration)")
plt.show()


In [None]:
# %%
training_data["pickup_datetime"] = pd.to_datetime(training_data["pickup_datetime"])
training_data["hour"] = training_data["pickup_datetime"].dt.hour
training_data["day_of_week"] = training_data["pickup_datetime"].dt.dayofweek
training_data["month"] = training_data["pickup_datetime"].dt.month

print("\nTemporal features added:")
print(training_data[["hour", "day_of_week", "month"]].head())

In [None]:
plt.figure(figsize=(10,5))
sns.lineplot(data=training_data, x="hour", y="trip_duration", errorbar=None)
plt.title("Average Trip Duration vs Hour of Day")
plt.show()

plt.figure(figsize=(10,5))
sns.boxplot(data=training_data, x="day_of_week", y="trip_duration")
plt.title("Trip Duration vs Day of Week")
plt.show()

plt.figure(figsize=(10,5))
sns.boxplot(data=training_data, x="month", y="trip_duration")
plt.title("Trip Duration vs Month")
plt.show()

In [None]:
plt.figure(figsize=(8,5))
sns.boxplot(x="vendor_id", y="trip_duration", data=training_data)
plt.title("Trip Duration by Vendor")
plt.show()

plt.figure(figsize=(8,5))
sns.boxplot(x="store_and_fwd_flag", y="trip_duration", data=training_data)
plt.title("Trip Duration by store_and_fwd_flag")
plt.show()

In [None]:
plt.figure(figsize=(8,5))
sns.boxplot(x=training_data["trip_duration"])
plt.title("Trip Duration Outlier Detection")
plt.show()

plt.figure(figsize=(8,5))
sns.scatterplot(x=training_data["distance_km"], y=training_data["trip_duration"], alpha=0.3)
plt.title("Outliers in Distance vs Duration")
plt.show()


In [None]:
# %%
corr_features = training_data[[
    "trip_duration", "distance_km", "hour",
    "day_of_week", "month", "passenger_count"
]]

plt.figure(figsize=(8,6))
sns.heatmap(corr_features.corr(), annot=True, cmap="coolwarm")
plt.title("Correlation Heatmap")
plt.show()


In [None]:
taxi = training_data.copy()

taxi = taxi[(taxi["trip_duration"] <= 7200) & (taxi["distance_km"] > 0)]
valid_lat = taxi["pickup_latitude"].between(40.3, 41.2) & taxi["dropoff_latitude"].between(40.3, 41.2)
valid_lon = taxi["pickup_longitude"].between(-74.5, -72.8) & taxi["dropoff_longitude"].between(-74.5, -72.8)

taxi = taxi[valid_lat & valid_lon]

taxi["distance_km"] = haversine(
    taxi["pickup_latitude"], taxi["pickup_longitude"],
    taxi["dropoff_latitude"], taxi["dropoff_longitude"]
)

taxi = taxi[(taxi["distance_km"] > 0) &(taxi["distance_km"]<=100)]


same_point = (
    (taxi["pickup_longitude"] == taxi["dropoff_longitude"]) &
    (taxi["pickup_latitude"] == taxi["dropoff_latitude"])
)

taxi = taxi[~same_point]

taxi = taxi.reset_index(drop=True)

print("\nFinal cleaned dataset shape:", taxi.shape)
taxi.head()

In [None]:
df = taxi.copy()

df["pickup_datetime"] = pd.to_datetime(df["pickup_datetime"])

df["hour"] = df["pickup_datetime"].dt.hour
df["day_of_week"] = df["pickup_datetime"].dt.dayofweek
df["month"] = df["pickup_datetime"].dt.month

df["ch_lat"] = (df["dropoff_latitude"] - df["pickup_latitude"]).abs()
df["ch_lon"] = (df["dropoff_longitude"] - df["pickup_longitude"]).abs()

df["sq_dist"] = df["distance_km"] ** 2  
df["log_distance"] = np.log1p(df["distance_km"])
df["store_and_fwd_flag"] = df["store_and_fwd_flag"].map({'N': 0, 'Y': 1})

df.head()

In [None]:
plt.figure(figsize=(10,5))
plt.hist(df["trip_duration"], bins=50, edgecolor='black')
plt.title("Trip Duration Distribution (Raw)")
plt.xlabel("Duration (seconds)")
plt.ylabel("Frequency")
plt.show()

In [None]:
df["log_trip_duration"] = np.log1p(df["trip_duration"])

feature_cols = [
    "distance_km",
    "log_distance",
    "sq_dist",
    "hour",
    "day_of_week",
    "month",
    "ch_lat",
    "ch_lon",
    "vendor_id",
    "store_and_fwd_flag",
    "passenger_count"
]

target_col = "log_trip_duration"
print(df.columns)

X = df[feature_cols].values
y = df[target_col].values

print("Feature matrix shape:", X.shape)
print("Target vector shape:", y.shape)

### Preparing the Test Dataset

In [None]:
test_raw = pd.read_csv('test.csv')
test_df = test_raw.copy()

test_df = test_df.copy()
test_df = test_df.dropna()
valid_lat = test_df["pickup_latitude"].between(40.3, 41.2) & test_df["dropoff_latitude"].between(40.3, 41.2)
valid_lon = test_df["pickup_longitude"].between(-74.5, -72.8) & test_df["dropoff_longitude"].between(-74.5, -72.8)

test_df = test_df[valid_lat & valid_lon]

test_df["distance_km"] = haversine(
    test_df["pickup_longitude"], test_df["pickup_latitude"],
    test_df["dropoff_longitude"], test_df["dropoff_latitude"]
)

test_df = test_df[(test_df["distance_km"] > 0) &(test_df["distance_km"]<=100)]
same_point = (
    (test_df["pickup_longitude"] == test_df["dropoff_longitude"]) &
    (test_df["pickup_latitude"] == test_df["dropoff_latitude"])
)

test_df = test_df[~same_point]

test_df = test_df.reset_index(drop=True)

test_df["pickup_datetime"] = pd.to_datetime(test_df["pickup_datetime"])
test_df["hour"] = test_df["pickup_datetime"].dt.hour
test_df["day_of_week"] = test_df["pickup_datetime"].dt.dayofweek
test_df["month"] = test_df["pickup_datetime"].dt.month

test_df["ch_lat"] = (test_df["dropoff_latitude"] - test_df["pickup_latitude"]).abs()
test_df["ch_lon"] = (test_df["dropoff_longitude"] - test_df["pickup_longitude"]).abs()  

test_df["sq_dist"] = test_df["distance_km"] ** 2
test_df["log_distance"] = np.log1p(test_df["distance_km"])
test_df["store_and_fwd_flag"] = test_df["store_and_fwd_flag"].map({'N': 0, 'Y': 1}) 

test_df = test_df.reset_index(drop=True)
print(test_df.columns)
test_X = test_df[feature_cols].values

train_idx, val_idx = train_test_split_idx(len(X), test_size=0.2, seed=42)
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]

print("Training set shape:", X_train.shape)
print("Validation set shape:", X_val.shape)
print("Test set shape:", test_X.shape)

In [None]:
X_train_std, mean, std = standardize_fit(X_train)
X_val_std = standardize_apply(X_val, mean, std)
test_X_std = standardize_apply(test_X, mean, std)

In [None]:
def mse(y, yhat):
    """Mean squared error: average of (y - yhat)^2 (1D arrays)."""
    diff = y - yhat
    diff_sq = diff ** 2
    return np.mean(diff_sq)

def r2_score(y, yhat):
    """1 - SS_res/SS_tot; handle constant-y edge case safely."""
    ss_res = np.sum((y - yhat) ** 2)
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    if ss_tot == 0:
        return 0.0
    return 1 - (ss_res / ss_tot)

def fit_linear_closed_form(X, y, add_bias=True, rcond=None):
    """
    Closed-form least squares (MLE) using pseudoinverse or solve.
    Steps:
    * If add_bias: prepend ones.
    * Prefer pinv for numerical stability when X^T X is ill-conditioned.
    Return w (length p [+1 if bias]).
    """
    X_mat = np.asarray(X, dtype=float)
    if add_bias:
        X_mat = add_intercept(X_mat)
    y_vec = np.asarray(y, dtype=float)

    condition_number = np.linalg.cond(X_mat.T @ X_mat)
    print(f"Condition number: {condition_number:.2e}")
    if condition_number < 1e8:
        w = np.linalg.solve(X_mat.T @ X_mat, X_mat.T @ y_vec)
        return w
    else:
        w = np.linalg.pinv(X_mat, rcond=rcond) @ y_vec
        return w

def fit_linear_gd(X, y, lr=0.05, iters=5000, add_bias=True):
    """
    Batch gradient descent for least squares.
    Return (w, losses).
    """
    X_mat = np.asarray(X, dtype=float)
    if add_bias:
        X_mat = add_intercept(X_mat)
    y_vec = np.asarray(y, dtype=float)
    n_samples, n_features = X_mat.shape
    w = np.zeros(n_features)
    losses = []

    for i in range(iters):
        yhat = X_mat @ w
        loss = mse(y_vec, yhat)
        losses.append(loss)

        gradient = -2 * (X_mat.T @ (y_vec - yhat)) / n_samples
        w -= lr * gradient

    return w, np.array(losses)

def predict_linear(X, w, has_bias=True):
    """Predict with linear model; add bias column if has_bias. Return 1D yhat."""
    X_mat = np.asarray(X, dtype=float)
    if has_bias:
        X_mat = add_intercept(X_mat)
    w_vec = np.asarray(w, dtype=float)
    return X_mat @ w_vec


In [None]:
w_closed = fit_linear_closed_form(X_train_std, y_train, add_bias=True)

y_train_pred = predict_linear(X_train_std, w_closed, has_bias=True)
y_val_pred = predict_linear(X_val_std, w_closed, has_bias=True)

print("\nClosed-Form Linear Regression Results:")
print("Training MSE:", mse(y_train, y_train_pred))
print("Validation MSE:", mse(y_val, y_val_pred))

print("Training R²:", r2_score(y_train, y_train_pred))
print("Validation R²:", r2_score(y_val, y_val_pred))

In [None]:
# %%
# Predict on test data
test_pred_log = predict_linear(test_X_std, w_closed, has_bias=True)

# Convert back from log scale
test_pred_duration = np.expm1(test_pred_log)

test_pred_duration[:10]


In [None]:
# %%
def polynomial_features_degree2(X):
    """
    Generate polynomial features (degree 2) from matrix X.
    Includes:
      - all original features
      - all squared terms
      - all pairwise interaction terms
    Input:
        X : numpy array (n_samples, n_features)
    Output:
        X_poly : array (n_samples, n_output_features)
    """
    X = np.asarray(X, dtype=float)
    n_samples, n_features = X.shape
    poly_list = []

    # 1. original features
    poly_list.append(X)

    # 2. squared terms
    poly_list.append(X ** 2)

    # 3. interaction terms x_i * x_j (i < j)
    interaction_terms = []
    for i in range(n_features):
        for j in range(i + 1, n_features):
            interaction_terms.append((X[:, i] * X[:, j]).reshape(-1, 1))

    if interaction_terms:
        poly_list.append(np.hstack(interaction_terms))

    # Final polynomial matrix
    X_poly = np.hstack(poly_list)
    return X_poly

def polynomial_features(X, degree):
    """
    Generate polynomial features up to a given degree.
    Includes ONLY monomials (x_i, x_i^2, ..., x_i^degree)
    for stability and speed.
    
    X: (n_samples, n_features)
    degree: integer >= 1
    """
    X = np.asarray(X, dtype=float)
    n_samples, n_features = X.shape
    
    poly_list = [X]  # degree 1
    
    # degrees 2..d (monomials only)
    for deg in range(2, degree + 1):
        poly_list.append(X ** deg)
        
    return np.hstack(poly_list)



In [None]:
def evaluate_poly_degree(X_train, y_train, X_val, y_val, degree):
    # 1. Build polynomial features
    X_train_poly = polynomial_features_degree2(X_train) if degree == 2 else polynomial_features(X_train, degree)
    X_val_poly   = polynomial_features_degree2(X_val) if degree == 2 else polynomial_features(X_val, degree)
    
    # 2. Standardize
    X_train_poly_std, mean_p, std_p = standardize_fit(X_train_poly)
    X_val_poly_std   = standardize_apply(X_val_poly, mean_p, std_p)
    
    # 3. Fit closed-form linear regression
    w = fit_linear_closed_form(X_train_poly_std, y_train, add_bias=True)
    
    # 4. Predictions
    y_train_pred = predict_linear(X_train_poly_std, w)
    y_val_pred   = predict_linear(X_val_poly_std, w)
    
    # 5. Metrics
    train_mse = mse(y_train, y_train_pred)
    val_mse   = mse(y_val, y_val_pred)
    train_r2  = r2_score(y_train, y_train_pred)
    val_r2    = r2_score(y_val, y_val_pred)
    
    return {
        "degree": degree,
        "w": w,
        "train_mse": train_mse,
        "val_mse": val_mse,
        "train_r2": train_r2,
        "val_r2": val_r2
    }


In [None]:
print("\nEvaluating polynomial degrees 1 to 3...")
results = []

for deg in [1, 2, 3]:
    res = evaluate_poly_degree(X_train_std, y_train, X_val_std, y_val, degree=deg)
    results.append(res)
    print(f"Degree {deg}: Train MSE={res['train_mse']:.4f}, Val MSE={res['val_mse']:.4f}, Train R²={res['train_r2']:.4f}, Val R²={res['val_r2']:.4f}")

In [None]:
def fit_ridge_closed_form(X, y, lam, add_bias=True):
    """
    Closed-form ridge regression.
    Does NOT regularize intercept.
    """
    X = np.asarray(X, float)
    y = np.asarray(y, float)
    
    if add_bias:
        X = add_intercept(X)
    
    n_features = X.shape[1]
    
    # Regularization matrix
    I = np.eye(n_features)
    I[0, 0] = 0.0   # do not regularize bias
    
    # Closed form ridge solution
    A = X.T @ X + lam * I
    b = X.T @ y
    
    w = np.linalg.solve(A, b)
    return w


In [None]:
def fit_ridge_gd(X, y, lam, lr=0.01, iters=3000, add_bias=True):
    X = np.asarray(X, float)
    y = np.asarray(y, float)
    
    if add_bias:
        X = add_intercept(X)
        
    n_samples, n_features = X.shape
    w = np.zeros(n_features)
    
    losses = []
    
    for _ in range(iters):
        yhat = X @ w
        error = yhat - y
        
        grad = (X.T @ error) / n_samples
        grad[1:] += lam * w[1:] / n_samples   # no penalty on bias
        
        w -= lr * grad
        
        losses.append(mse(y, X @ w))
    
    return w, losses


In [None]:
def soft_threshold(z, lam):
    if z > lam:
        return z - lam
    elif z < -lam:
        return z + lam
    else:
        return 0.0


In [None]:
def fit_lasso_cd(X, y, lam, iters=100):
    """
    Lasso Regression using coordinate descent.
    Does NOT regularize intercept.
    """
    X = np.asarray(X, float)
    y = np.asarray(y, float)
    
    X = add_intercept(X)
    n_samples, n_features = X.shape
    
    w = np.zeros(n_features)
    
    for _ in range(iters):
        # update intercept separately (no regularization)
        y_pred = X @ w
        r = y - y_pred
        w[0] += np.mean(r)
        
        # update each feature weight
        for j in range(1, n_features):
            # compute partial residual
            y_pred = X @ w
            r_j = y - (y_pred - X[:, j] * w[j])
            
            # update using soft thresholding
            rho = np.dot(X[:, j], r_j)
            z = np.dot(X[:, j], X[:, j])
            
            w[j] = soft_threshold(rho / z, lam / z)
    
    return w


In [None]:
def evaluate_model(X_train, y_train, X_val, y_val, degree, model_type, lam=0.0):
    
    # 1. Polynomial features
    X_train_poly = polynomial_features_degree2(X_train) if degree == 2 else polynomial_features(X_train, degree)
    X_val_poly   = polynomial_features_degree2(X_val) if degree == 2 else polynomial_features(X_val, degree)
    
    # 2. Standardize
    X_train_std, mean_, std_ = standardize_fit(X_train_poly)
    X_val_std = standardize_apply(X_val_poly, mean_, std_)
    
    # 3. Fit model
    if model_type == "ridge":
        w = fit_ridge_closed_form(X_train_std, y_train, lam, add_bias=True)
        
    elif model_type == "lasso":
        w = fit_lasso_cd(X_train_std, y_train, lam, iters=50)
        
    elif model_type == "linear":
        w = fit_linear_closed_form(X_train_std, y_train, add_bias=True)
    
    else:
        raise ValueError("Invalid model type.")
    
    # 4. Predictions
    y_train_pred = predict_linear(X_train_std, w)
    y_val_pred   = predict_linear(X_val_std, w)
    
    return {
        "degree": degree,
        "lambda": lam,
        "model": model_type,
        "w": w,
        "train_mse": mse(y_train, y_train_pred),
        "val_mse": mse(y_val,  y_val_pred),
        "train_r2": r2_score(y_train, y_train_pred),
        "val_r2": r2_score(y_val,  y_val_pred)
    }


In [None]:
def kfold_split(X, y, k=5, seed=42):
    rng = np.random.default_rng(seed)
    n = len(X)
    indices = np.arange(n)
    rng.shuffle(indices)

    fold_sizes = (n // k) * np.ones(k, dtype=int)
    fold_sizes[: n % k] += 1

    folds = []
    start = 0
    for size in fold_sizes:
        end = start + size
        folds.append(indices[start:end])
        start = end

    return folds

def cross_val_evaluate(X, y, degree, model_type, lam=0.0, k=5):
    folds = kfold_split(X, y, k=k)

    mse_scores = []
    r2_scores = []

    for i in range(k):
        val_idx = folds[i]
        train_idx = np.hstack([folds[j] for j in range(k) if j != i])

        X_train_cv, y_train_cv = X[train_idx], y[train_idx]
        X_val_cv, y_val_cv = X[val_idx], y[val_idx]

        # Polynomial expansion
        X_train_poly = polynomial_features(X_train_cv, degree)
        X_val_poly = polynomial_features(X_val_cv, degree)

        # Standardization
        X_train_std, mean_, std_ = standardize_fit(X_train_poly)
        X_val_std = standardize_apply(X_val_poly, mean_, std_)

        # Fit model
        if model_type == "linear":
            w = fit_linear_closed_form(X_train_std, y_train_cv, add_bias=True)

        elif model_type == "ridge":
            w = fit_ridge_closed_form(X_train_std, y_train_cv, lam, add_bias=True)

        elif model_type == "lasso":
            w = fit_lasso_cd(X_train_std, y_train_cv, lam, iters=50)

        # Predict
        y_pred = predict_linear(X_val_std, w)

        # Store metrics
        mse_scores.append(mse(y_val_cv, y_pred))
        r2_scores.append(r2_score(y_val_cv, y_pred))

    return {
        "degree": degree,
        "lambda": lam,
        "model": model_type,
        "cv_mse": np.mean(mse_scores),
        "cv_r2": np.mean(r2_scores)
    }


In [None]:
degrees = [1, 2, 3]
lambdas = [0.01, 0.1, 1, 10]

results = []

for d in degrees:
    for lam in lambdas:
        print(f"CV Ridge: degree={d}, lambda={lam}")
        res = cross_val_evaluate(X_train, y_train, degree=d, model_type="ridge", lam=lam, k=5)
        results.append(res)

for d in degrees:
    for lam in lambdas:
        print(f"CV Lasso: degree={d}, lambda={lam}")
        res = cross_val_evaluate(X_train, y_train, degree=d, model_type="lasso", lam=lam, k=5)
        results.append(res)

for d in degrees:
    print(f"CV Linear: degree={d}")
    res = cross_val_evaluate(X_train, y_train, degree=d, model_type="linear", lam=0.0, k=5)
    results.append(res)


In [None]:
best_model = min(results, key=lambda r: r["cv_mse"])

print("\n======= BEST MODEL (5-fold CV) =======")
print(f"Model  : {best_model['model']}")
print(f"Degree : {best_model['degree']}")
print(f"Lambda : {best_model['lambda']}")
print(f"CV MSE : {best_model['cv_mse']:.4f}")
print(f"CV R²  : {best_model['cv_r2']:.4f}")


In [None]:
best_degree = 3
best_lam = 10
best_type = "ridge"

# Build polynomial features
X_train_best_poly = polynomial_features(X_train, best_degree)
X_test_best_poly  = polynomial_features(test_X, best_degree)

# Standardize
X_train_best_std, mean_b, std_b = standardize_fit(X_train_best_poly)
X_test_best_std = standardize_apply(X_test_best_poly, mean_b, std_b)

# Fit final model
if best_type == "linear":
    w_best = fit_linear_closed_form(X_train_best_std, y_train, add_bias=True)

elif best_type == "ridge":
    w_best = fit_ridge_closed_form(X_train_best_std, y_train, best_lam, add_bias=True)

elif best_type == "lasso":
    w_best = fit_lasso_cd(X_train_best_std, y_train, best_lam, iters=100)

print("Final model trained.")


In [None]:
test_pred_log = predict_linear(X_test_best_std, w_best)
test_pred = np.expm1(test_pred_log)


In [None]:
X_train_poly = polynomial_features(X_train, best_degree)
X_val_poly   = polynomial_features(X_val, best_degree)

X_train_std, mean_best, std_best = standardize_fit(X_train_poly)
X_val_std = standardize_apply(X_val_poly, mean_best, std_best)

y_train_reg = y_train
y_val_reg   = y_val


In [None]:
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor


In [None]:
rf = RandomForestRegressor(
    n_estimators=150,
    max_depth=None,
    random_state=42,
    n_jobs=-1
)
rf.fit(X_train_std, y_train_reg)

rf_train_pred = rf.predict(X_train_std)
rf_val_pred   = rf.predict(X_val_std)

rf_results = {
    "model": "Random Forest",
    "train_mse": mse(y_train_reg, rf_train_pred),
    "val_mse": mse(y_val_reg, rf_val_pred),
    "train_r2": r2_score(y_train_reg, rf_train_pred),
    "val_r2": r2_score(y_val_reg, rf_val_pred)
}


In [None]:
xgb = XGBRegressor(
    n_estimators=300,
    max_depth=6,
    learning_rate=0.08,
    subsample=0.9,
    colsample_bytree=0.8,
    random_state=42,
    tree_method="hist"
)
xgb.fit(X_train_std, y_train_reg)

xgb_train_pred = xgb.predict(X_train_std)
xgb_val_pred   = xgb.predict(X_val_std)

xgb_results = {
    "model": "XGBoost",
    "train_mse": mse(y_train_reg, xgb_train_pred),
    "val_mse": mse(y_val_reg, xgb_val_pred),
    "train_r2": r2_score(y_train_reg, xgb_train_pred),
    "val_r2": r2_score(y_val_reg, xgb_val_pred)
}


In [None]:
y_train_pred_best = predict_linear(X_train_std, w_best)
y_val_pred_best   = predict_linear(X_val_std, w_best)

best_custom_results = {
    "model": f"Custom {best_type} (degree={best_degree}, λ={best_lam})",
    "train_mse": mse(y_train_reg, y_train_pred_best),
    "val_mse": mse(y_val_reg,  y_val_pred_best),
    "train_r2": r2_score(y_train_reg, y_train_pred_best),
    "val_r2": r2_score(y_val_reg,  y_val_pred_best)
}


In [None]:
import pandas as pd

comparison_df = pd.DataFrame([
    best_custom_results,
    rf_results,
    xgb_results
])

print("\n======= Model Comparison =======")
print(comparison_df)
