In [1]:
import numpy as np

In [25]:
np.random.seed(0)
n = 200_000
data_X = np.random.normal(0, 1, size=(3, n))
x1, x2, x3 = data_X
data_y = 0.5 * x1 + 0.2 * x2 + 0.3 * x3 + np.random.normal(0, 0.5, size=n)

train_X, test_X = data_X[:,:3*n//4], data_X[:,3*n//4:]
train_y, test_y = data_y[:3*n//4], data_y[3*n//4:]

In [28]:
class Tracker:
    def __init__(self, size):
        self.loss_buffer = [np.nan for _ in range(size)]
        self.var_y_buffer = [np.nan for _ in range(size)]
        self.i = 0
        self.size = size

    def update(self, loss, var_y):
        self.loss_buffer[self.i] = loss
        self.var_y_buffer[self.i] = var_y
        self.i = (self.i + 1) % self.size

    def summary(self):
        return f"loss: {np.nanmean(self.loss_buffer)}, r2: {1 - np.nanmean(self.loss_buffer) / np.nanmean(self.var_y_buffer)}"

In [35]:
class ReLU:
    def __init__(self):
        self.has_params = False
        self.x = None

    def forward(self, x):
        self.x = x
        z = np.maximum(x, 0)
        return z

    def backward(self, grad):
        # d_loss / d_za * d_za / d_z
        grad_out = np.einsum("bi,bi->bi", grad, self.x > 0)
        self.x = None
        return grad_out

    def step_fn(self, lr):
        raise NotImplementedError()


class Linear:
    def __init__(self, input_dim, output_dim, seed=0):
        np.random.seed(seed)
        self.has_params = True
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.weights = np.random.normal(
            0, np.sqrt(2 / self.input_dim), size=(self.output_dim, self.input_dim)
        )
        self.bias = np.zeros((self.output_dim, 1))
        self.x = None
        self.grad_cache = None

    def forward(self, x):
        # print(self.input_dim, self.output_dim, x)
        B, x_dim = x.shape
        # print("forward", B, x_dim)
        assert x_dim == self.input_dim, (x_dim, self.input_dim)
        self.x = x
        # (out, in) * (B, in) -> (B, out)
        z = np.einsum("ji,bi->bj", self.weights, self.x) + self.bias.T
        # print("forward", x.shape, self.weights.shape, self.bias.shape, z.shape)
        return z

    def backward(self, grad):
        # d_loss / d_z * d_z / d_w, d_loss / d_z * d_z / d_bias
        # print("backward", grad.shape, self.x.shape)
        self.grad_cache = {
            "weights": np.einsum("bj,bi->ji", grad, self.x),
            "bias": np.einsum("bj->j", grad).reshape(-1, 1),
        }

        # d_loss / d_z * d_z / d_x
        # print(grad.shape, self.weights.T.shape)
        grad_out = np.einsum("bj,ji->bi", grad, self.weights)
        self.x = None
        # print("backward1", grad_out.shape)
        return grad_out

    def step_fn(self, lr):
        self.weights -= lr * self.grad_cache["weights"]
        # print("step_fn", self.bias.shape, self.grad_cache["bias"].shape)
        self.bias -= lr * self.grad_cache["bias"]
        self.grad_cache = None


class MLP:
    def __init__(self, input_dim, hidden_dim, output_dim):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.layers = [
            Linear(input_dim, hidden_dim),
            ReLU(),
            Linear(hidden_dim, hidden_dim),
            ReLU(),
            Linear(hidden_dim, output_dim),
        ]

    def forward(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, grad):
        for layer in self.layers[::-1]:
            grad = layer.backward(grad)

    def step_fn(self, lr):
        for layer in self.layers:
            if layer.has_params:
                layer.step_fn(lr)


def loss_fn(x, y):
    return np.mean((y - x) ** 2), -2 * (y - x) / x.shape[0]


def train():
    tracker = Tracker(size=1000)
    model = MLP(3, 64, 1)

    batch_size = 128
    for i, start in enumerate(range(0, train_X.shape[1], batch_size)):
        end = start + batch_size
        x = train_X[:, start:end].T
        y = train_y[start:end].reshape(-1, 1)

        # for i, (x, y) in enumerate(zip(train_X.T, train_y)):
        # print(x.shape, y.shape)
        z = model.forward(x)
        loss, grad = loss_fn(z, y)
        model.backward(grad)
        model.step_fn(lr=0.01)

        tracker.update(loss=loss, var_y=np.mean(y**2))
        if i % 100 == 0:
            test_z = model.forward(test_X.T)
            val_loss, _ = loss_fn(test_z, test_y.reshape(-1, 1))
            print(
                tracker.summary(),
                f", val loss: {val_loss}, val r2: {1 - val_loss / np.mean(test_y**2)}",
            )


train()

loss: 1.6889609417519895, r2: -1.8467685390179258 , val loss: 0.9768736640470798, val r2: -0.5528011953304315
loss: 0.31879432623994963, r2: 0.4960629322566791 , val loss: 0.26286516086407496, val r2: 0.5821595452687013
loss: 0.2921728750800388, r2: 0.5346411097751934 , val loss: 0.2612449967291477, val r2: 0.5847348965120994
loss: 0.28036644584378007, r2: 0.5537739855076422 , val loss: 0.2593113766197729, val r2: 0.5878085054419513
loss: 0.274478370629519, r2: 0.5632914433364371 , val loss: 0.2631309908157414, val r2: 0.5817369920953542
loss: 0.2714727447275986, r2: 0.5676584041538754 , val loss: 0.2527701086790954, val r2: 0.5982062559915772
loss: 0.2693575165466226, r2: 0.5719487600701045 , val loss: 0.25511114773803767, val r2: 0.5944850294063699
loss: 0.26699687092998103, r2: 0.5755391180933324 , val loss: 0.2563190096891101, val r2: 0.5925650580216884
loss: 0.2660645029390302, r2: 0.5784620400168048 , val loss: 0.25244711530640174, val r2: 0.5987196739632775
loss: 0.2643269104893

In [37]:
import numpy as np

# Closed-form solution w = (X X^T)^-1 X y^T
# train_X: (features, samples), train_y: (samples,)
XtX = train_X @ train_X.T          # (3,3)
Xty = train_X @ train_y            # (3,)
weights = np.linalg.solve(XtX, Xty)  # (3,)

print("Linear regression weights:", weights)

# Predictions
preds = weights @ test_X  # shape: (10000,)
# weights: (3,), train_X: (3,10000) -> (10000,)

# Compute loss and R^2
mse = np.mean((test_y - preds)**2)
r2 = 1 - np.sum((test_y - preds)**2) / np.sum((test_y - np.mean(test_y))**2)

print(f"MSE: {mse:.4f}, R^2: {r2:.4f}")

Linear regression weights: [0.49957827 0.20149033 0.30107705]
MSE: 0.2484, R^2: 0.6051
