In [1]:
# neural_from_scratch_experiments.py
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from collections import defaultdict
import time
import os
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# ----------------------------
# Config
# ----------------------------
CSV_PATH = "Housing.csv"   # ubah ke path file jika perlu
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# NN hyperparams (default, bisa diubah)
INPUT_NEURONS = None   # diisi setelah preprocessing (fitur)
HIDDEN_NEURONS = 64
OUTPUT_NEURONS = 1
LEARNING_RATE = 0.01
MAX_EPOCHS = 100       # batasi supaya cepat
MINIBATCH_SIZE = 32


In [2]:
# ----------------------------
# Utility metrics
# ----------------------------
def mse_loss(y_pred, y_true):
    # y_pred, y_true shape (n_samples, 1)
    return np.mean((y_pred - y_true) ** 2)

def mae_loss(y_pred, y_true):
    return np.mean(np.abs(y_pred - y_true))

def mse_grad(y_pred, y_true):
    # derivative per sample: 2*(y_pred - y_true) / n
    n = y_true.shape[0]
    return (2.0 / n) * (y_pred - y_true)

def mae_grad(y_pred, y_true, eps=1e-8):
    # derivative of |e| is sign(e). To avoid nan at zero, add eps
    n = y_true.shape[0]
    diff = y_pred - y_true
    grad = np.sign(diff)
    # if diff == 0, sign gives 0 — it's fine. normalize by n
    return (1.0 / n) * grad

In [None]:
# ----------------------------
# Neural network (from scratch)
# ----------------------------
class SimpleMLP:
    def __init__(self, input_dim, hidden_dim=64, output_dim=1, init_scale=0.01, seed=None):
        if seed is not None:
            np.random.seed(seed)
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        # weights: random normal (user requested random normal)
        # W1 shape (hidden_dim, input_dim)
        #(baris, kolom)
        self.W1 = np.random.randn(self.hidden_dim, self.input_dim) * init_scale
        # b1 shape (hidden_dim, 1)
        self.b1 = np.zeros((self.hidden_dim, 1))

        # W2 shape (output_dim, hidden_dim)
        self.W2 = np.random.randn(self.output_dim, self.hidden_dim) * init_scale
        # b2 shape (output_dim, 1)
        self.b2 = np.zeros((self.output_dim, 1))

    # activation: ReLU & its derivative
    @staticmethod
    def relu(z):
        return np.maximum(0, z)

    @staticmethod
    def relu_grad(z):
        return (z > 0).astype(float)

    # forward pass: x shape (n_samples, input_dim)
    def forward(self, x):
        self.x = x.T

        self.z1 = self.W1.dot(self.x) + self.b1 #sum dari masing masing neuron hidden layer, sum h1,sum h2...
        self.a1 = self.relu(self.z1)   # output dari masing masing hidden neuron, ex: output h1,output h2...

        self.z2 = self.W2.dot(self.a1) + self.b2 #sum dari masing masing neuron output layer, sum 01,sum 02...
        self.a2 = self.z2.T # output dari masing masing output neuron, ex: output 01,output 02...
        # we'll return (n, output_dim) as predictions (linear output)

        return self.a2 #a2 ini udah hasil prediksi

    # backward: compute gradients given y_true and loss type
    # y_true shape (n_samples, output_dim)
    def backward(self, y_true, loss_type="mse"):
        n = y_true.shape[0]
        # predictions shape (n, output_dim)
        y_pred = self.a2  # (n, output_dim)
        # Convert to shape (output_dim, n)
        y_pred_T = y_pred.T  # (output_dim, n)
        y_true_T = y_true.T  # (output_dim, n)

        # compute dL/dy_pred / error di output
        if loss_type == "mse":
            dL_da2 = (2.0 / n) * (y_pred_T - y_true_T)
        elif loss_type == "mae":
            dL_da2 = (1.0 / n) * np.sign(y_pred_T - y_true_T)
        else:
            raise ValueError("Unknown loss_type")

        # For linear output, derivative of z2 is 1: dL/dz2 = dL/da2
        dZ2 = dL_da2   # shape (output_dim, n)

        # gradients for W2 and b2
        dW2 = dZ2.dot(self.a1.T)   # (output_dim, n) @ (n, hidden_dim) -> (output_dim, hidden_dim)
        db2 = np.sum(dZ2, axis=1, keepdims=True)  # (output_dim, 1)

        # propagate to hidden: dA1 = W2^T @ dZ2
        dA1 = self.W2.T.dot(dZ2)   # (hidden_dim, n)    

        # dZ1 = dA1 * relu_grad(z1)
        dZ1 = dA1 * self.relu_grad(self.z1)   # (hidden_dim, n)

        # gradients for W1 and b1
        dW1 = dZ1.dot(self.x.T)  # (hidden_dim, n) @ (n, input_dim) -> (hidden_dim, input_dim)
        db1 = np.sum(dZ1, axis=1, keepdims=True)  # (hidden_dim, 1)

        # store grads
        grads = {"dW1": dW1, "db1": db1, "dW2": dW2, "db2": db2}
        return grads

    # update weights with grads
    def apply_grads(self, grads, lr=0.01):
        self.W1 -= lr * grads["dW1"]
        self.b1 -= lr * grads["db1"]
        self.W2 -= lr * grads["dW2"]
        self.b2 -= lr * grads["db2"]

    # predict wrapper (array shape (n_samples, output_dim))
    def predict(self, X):
        return self.forward(X)


In [4]:
# ----------------------------
# Training function supporting batch/stochastic/minibatch
# ----------------------------
def train_model(model, X_train, y_train, X_val, y_val,
                loss_type="mse",
                mode="batch",
                lr=0.01,
                max_epochs=100,
                minibatch_size=32,
                verbose=False):
    n_samples = X_train.shape[0]
    history = defaultdict(list)

    for epoch in range(1, max_epochs + 1):
        # For stochastic and minibatch, shuffle each epoch
        perm = np.arange(n_samples)
        if mode in ("stochastic", "minibatch"):
            np.random.shuffle(perm)

        if mode == "batch":
            # forward on whole train set
            preds = model.forward(X_train)   # (n, 1)
            # compute grads from entire batch
            grads = model.backward(y_train, loss_type=loss_type)
            model.apply_grads(grads, lr=lr)

            # compute train loss
            if loss_type == "mse":
                train_loss = mse_loss(preds, y_train)
            else:
                train_loss = mae_loss(preds, y_train)

        elif mode == "stochastic":
            # update per sample
            total_loss = 0.0
            for i in perm:
                x_i = X_train[i:i+1]    # shape (1, d)
                y_i = y_train[i:i+1]    # shape (1, 1)
                preds_i = model.forward(x_i)
                grads = model.backward(y_i, loss_type=loss_type)
                model.apply_grads(grads, lr=lr)
                # accumulate loss
                if loss_type == "mse":
                    total_loss += mse_loss(preds_i, y_i)
                else:
                    total_loss += mae_loss(preds_i, y_i)
            train_loss = total_loss / n_samples

        elif mode == "minibatch":
            total_loss = 0.0
            for start in range(0, n_samples, minibatch_size):
                idx = perm[start:start+minibatch_size]
                x_b = X_train[idx]
                y_b = y_train[idx]
                preds_b = model.forward(x_b)
                grads = model.backward(y_b, loss_type=loss_type)
                model.apply_grads(grads, lr=lr)
                if loss_type == "mse":
                    total_loss += mse_loss(preds_b, y_b) * len(idx)
                else:
                    total_loss += mae_loss(preds_b, y_b) * len(idx)
            train_loss = total_loss / n_samples
        else:
            raise ValueError("Unknown training mode")

        # validation loss (MSE and MAE for monitoring)
        val_pred = model.predict(X_val)
        val_mse = mse_loss(val_pred, y_val)
        val_mae = mae_loss(val_pred, y_val)

        history["epoch"].append(epoch)
        history["train_loss"].append(train_loss)
        history["val_mse"].append(val_mse)
        history["val_mae"].append(val_mae)

        if verbose and (epoch % max(1, max_epochs // 5) == 0 or epoch == 1):
            print(f"Epoch {epoch}/{max_epochs} | train({loss_type})={train_loss:.4f} | val_mse={val_mse:.4f} val_mae={val_mae:.4f}")

    return history

In [14]:
# ----------------------------
# Data loading & preprocessing
# ----------------------------
def load_and_preprocess(csv_path, scale_target=True):
    if not os.path.exists(csv_path):
        raise FileNotFoundError(
            f"CSV not found at {csv_path}. Please place the Kaggle 'Housing.csv' there."
        )

    df = pd.read_csv(csv_path)

    # pastikan kolom target ada
    if "price" not in df.columns and "Price" in df.columns:
        df = df.rename(columns={"Price": "price"})
    if "price" not in df.columns:
        raise ValueError("Target column 'price' not found in CSV.")

    df = df.dropna().reset_index(drop=True)

    # --- Pisahkan target & fitur ---
    y = df[["price"]].astype(float).values
    X = df.drop(columns=["price"]).copy()

    # --- Definisi kolom ---
    binary_cols = [
        "mainroad",
        "guestroom",
        "basement",
        "hotwaterheating",
        "airconditioning",
        "prefarea",
    ]
    multi_cols = ["furnishingstatus"]
    # kolom numerik asli (pastikan sesuai dataset)
    num_cols = ["area", "bedrooms", "bathrooms", "stories", "parking"]

    # --- Encode kolom binary (yes/no -> 1/0) ---
    for col in binary_cols:
        if col in X.columns:
            X[col] = X[col].map({"yes": 1, "no": 0})

    # --- One-hot furnishingstatus ---
    if all(col in X.columns for col in multi_cols):
        enc = OneHotEncoder(sparse_output=False, drop="first")
        onehot = enc.fit_transform(X[multi_cols])
        onehot_df = pd.DataFrame(
            onehot, columns=enc.get_feature_names_out(multi_cols), index=X.index
        )
        X = pd.concat([X.drop(columns=multi_cols), onehot_df], axis=1)

    # --- Scaling numerik ---
    scaler_X = StandardScaler()
    X[num_cols] = scaler_X.fit_transform(X[num_cols])

    # --- Scaling target (opsional) ---
    if scale_target:
        scaler_y = StandardScaler()
        y = scaler_y.fit_transform(y)
    else:
        scaler_y = None

    print("INI Y: ",y)

    return X.values.astype(float), y.astype(float)


In [15]:
# ----------------------------
# Running experiments (2 losses x 3 modes)
# ----------------------------
def run_experiments(csv_path,
                    hidden_neurons=HIDDEN_NEURONS,
                    lr=LEARNING_RATE,
                    max_epochs=MAX_EPOCHS,
                    minibatch_size=MINIBATCH_SIZE,
                    random_seed=RANDOM_SEED):
    X, y = load_and_preprocess(csv_path)
    # split 80:20
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=random_seed)

    # also split validation from train (10% of train)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=random_seed)

    input_dim = X_train.shape[1]

    experiments = []
    modes = ["batch", "stochastic", "minibatch"]
    losses = ["mse", "mae"]

    for loss_type in losses:
        for mode in modes:
            print(f"\n=== Running: loss={loss_type} | mode={mode} ===")
            # recreate model per experiment (fresh weights)
            model = SimpleMLP(input_dim=input_dim, hidden_dim=hidden_neurons, output_dim=1, init_scale=0.1, seed=random_seed)

            t0 = time.time()
            history = train_model(model, X_train, y_train, X_val, y_val,
                                  loss_type=loss_type,
                                  mode=mode,
                                  lr=lr,
                                  max_epochs=max_epochs,
                                  minibatch_size=minibatch_size,
                                  verbose=True)
            t1 = time.time()

            # evaluate on test set
            y_pred_test = model.predict(X_test)
            test_mse = mse_loss(y_pred_test, y_test)
            test_mae = mae_loss(y_pred_test, y_test)

            experiments.append({
                "loss_used_for_training": loss_type,
                "mode": mode,
                "test_mse": float(test_mse),
                "test_mae": float(test_mae),
                "train_time_sec": round(t1 - t0, 2)
            })

            # optional: store final metrics
            print(f"Finished: loss={loss_type} mode={mode} | test_mse={test_mse:.4f} test_mae={test_mae:.4f} time={t1-t0:.2f}s")

    results_df = pd.DataFrame(experiments)
    return results_df

In [16]:
# ----------------------------
# Main runner
# ----------------------------
if __name__ == "__main__":
    print("Starting experiments...")

    results = run_experiments(CSV_PATH,
                              hidden_neurons=HIDDEN_NEURONS,
                              lr=LEARNING_RATE,
                              max_epochs=MAX_EPOCHS,
                              minibatch_size=MINIBATCH_SIZE,
                              random_seed=RANDOM_SEED)

    # save results
    results_csv = "results.csv"
    results.to_csv(results_csv, index=False)
    print("\nAll experiments done. Results:")
    print(results)
    print(f"\nSaved results to {results_csv}")

Starting experiments...
INI Y:  [[ 4.56636513e+00]
 [ 4.00448405e+00]
 [ 4.00448405e+00]
 [ 3.98575468e+00]
 [ 3.55497918e+00]
 [ 3.25530927e+00]
 [ 2.88072189e+00]
 [ 2.88072189e+00]
 [ 2.73088693e+00]
 [ 2.69342819e+00]
 [ 2.69342819e+00]
 [ 2.62974834e+00]
 [ 2.43121702e+00]
 [ 2.39375829e+00]
 [ 2.39375829e+00]
 [ 2.31884081e+00]
 [ 2.31884081e+00]
 [ 2.24392333e+00]
 [ 2.20646459e+00]
 [ 2.18773522e+00]
 [ 2.13154711e+00]
 [ 2.09408838e+00]
 [ 2.07535901e+00]
 [ 2.07535901e+00]
 [ 2.03790027e+00]
 [ 2.01917090e+00]
 [ 1.97796629e+00]
 [ 1.94425342e+00]
 [ 1.94425342e+00]
 [ 1.94425342e+00]
 [ 1.94425342e+00]
 [ 1.94425342e+00]
 [ 1.88806531e+00]
 [ 1.83187721e+00]
 [ 1.79441847e+00]
 [ 1.77351649e+00]
 [ 1.75321385e+00]
 [ 1.71950099e+00]
 [ 1.71013630e+00]
 [ 1.68204225e+00]
 [ 1.66331288e+00]
 [ 1.64458351e+00]
 [ 1.56966604e+00]
 [ 1.56966604e+00]
 [ 1.49474856e+00]
 [ 1.49474856e+00]
 [ 1.47601919e+00]
 [ 1.45728982e+00]
 [ 1.43856045e+00]
 [ 1.41983108e+00]
 [ 1.41983108e+00]

In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np
scaler = MinMaxScaler()
# Balik ke skala asli
y_pred = scaler.inverse_transform(y_pre)
y_true = scaler.inverse_transform(y_test.reshape(-1, 1))

mae = mean_absolute_error(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)

print("MAE (rupiah):", mae)
print("MSE (rupiah^2):", mse)
print("RMSE (rupiah):", np.sqrt(mse))