In [1524]:
import numpy as np
import math
import random
random.seed(1337)
np.random.seed(1337)
np.set_printoptions(suppress=True)

In [1525]:
class Value:
    """ stores a single scalar value and its gradient """

    def __init__(self, data, _children=(), _op=''):
        self.data = data
        self.grad = 0
        # internal variables used for autograd graph construction
        self._backward = lambda: None
        self._prev = set(_children)
        self._op = _op  # the op that produced this node, for graphviz / debugging / etc



In [1526]:
class Value:
    def __init__(self, data, _children=(), _op=''):
        self.data = np.asarray(data, dtype=float) if not (isinstance(data, np.ndarray)) else data
        self.grad = np.zeros_like(self.data)
        self._backward = lambda: None
        self._prev = set(_children)
        self._op = _op  # the op that produced this node, for graphviz / debugging / etc
    
      
    def unbroadcast(self, grad, shape):
        grad = np.asarray(grad)
        # remove leading dims
        while grad.ndim > len(shape):
            grad = grad.sum(axis=0)
        # sum over axes where original had dim 1
        for i, (g, s) in enumerate(zip(grad.shape, shape)):
            if s == 1 and g != 1:
                grad = grad.sum(axis=i, keepdims=True)
        return grad
            
    
    def __add__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data + other.data, (self, other), '+')
        def _backward():
            self.grad += self.unbroadcast(out.grad, self.data.shape)
            other.grad += self.unbroadcast(out.grad, other.data.shape)
        out._backward = _backward
        return out
        
        
    def __mul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data * other.data, (self, other), '*')
        def _backward():
            self.grad += self.unbroadcast(other.data * out.grad, self.data.shape)
            other.grad += self.unbroadcast(self.data * out.grad, other.data.shape)
        out._backward = _backward
        return out
    

    
    def __pow__(self, other):
        assert isinstance(other, (int, float)
                          ), "only supporting int/float powers for now"
        out = Value(self.data**other, (self,), f'**{other}')

        def _backward():
            self.grad += (other * self.data**(other-1)) * out.grad
        out._backward = _backward

        return out

    def relu(self):
        out = Value(np.maximum(0, self.data), (self,), 'relu')
        def _backward():
            self.grad += (self.data > 0) * out.grad
        out._backward = _backward
        return out
    
    def log(self):
        EPSILON = 1e-7
        clipped_data = np.maximum(EPSILON, self.data)

        out = Value(np.log(clipped_data), (self, ), 'log')

        def _backward():
            self.grad += (1 / clipped_data) * out.grad
        out._backward = _backward

        return out
    
    def exp(self):
        x = self.data
        out = Value(np.exp(x), (self, ), 'exp')

        def _backward():
            self.grad += out.data * out.grad
        out._backward = _backward

        return out
    
    def sigmoid(self):
        x = self.data
        t = 1 / (1 + (np.exp(-x)))

        out = Value(t, (self, ), 'sigmoid')

        def _backward():
            self.grad += (out.data * (1 - out.data)) * out.grad
        out._backward = _backward

        return out
   
    def sum(self):
        out = Value(np.sum(self.data), (self,), 'sum')
        
        def _backward():
            # grad of sum is 1, broadcasted to the input shape
            self.grad += np.ones_like(self.data) * out.grad
        out._backward = _backward
        return out
    
    def __matmul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data @ other.data, (self, other), '@')
        def _backward():
            # handle vectors and matrices by relying on numpy broadcasting 
            self.grad += out.grad @ other.data.T
            other.grad += self.data.T @ out.grad
        out._backward = _backward
        return out
   
    def backward(self):
    
        # topological order all of the children in the graph
        topo = []
        visited = set()

        def build_topo(v):
            if v not in visited:
                visited.add(v)
                for child in v._prev:
                    build_topo(child)
                topo.append(v)
        build_topo(self)

        # go one variable at a time and apply the chain rule to get its gradient
        self.grad = np.ones_like(self.data)
        for v in reversed(topo):
            v._backward()
            
    def __ge__(self, other):
        return self.data >= other.data

    def __le__(self, other):
        return self.data <= other.data

    def __lt__(self, other):
        other_data = other.data if isinstance(other, Value) else other
        return self.data < other_data

    def __gt__(self, other):
        other_data = other.data if isinstance(other, Value) else other
        return self.data > other_data

    def __neg__(self):  # -self
        return self * -1

    def __radd__(self, other):  # other + self
        return self + other

    def __sub__(self, other):  # self - other
        return self + (-other)

    def __rsub__(self, other):  # other - self
        return other + (-self)

    def __rmul__(self, other):  # other * self
        return self * other

    def __truediv__(self, other):  # self / other
        return self * other**-1

    def __rtruediv__(self, other):  # other / self
        return other * self**-1

    def __repr__(self):
        return f"Value(data={self.data}, grad={self.grad})"

In [1527]:
class Module:
    
    def zero_grad(self):
        for p in self.parameters():
            p.grad = np.zeros_like(p.data)
            
    def parameters(self):
        return []
    
class Linear(Module):
    def __init__(self, nin, nout):
        k = np.sqrt(2.0 / nin)
        self.W = Value(np.random.uniform(-k, k, (nin, nout)))
        self.b = Value(np.full((nout,), 0.1))

    def __call__(self, x):
        # x: (B,nin) or (nin,)
        return (x @ self.W) + self.b

    def parameters(self):
        return [self.W, self.b]

class MLP(Module):
    def __init__(self, nin, nouts, act_func=None):
        sz = [nin] + nouts
        self.layers = [Linear(sz[i], sz[i+1]) for i in range(len(nouts))]
        self.act_func = act_func

    def __call__(self, x):
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i != len(self.layers)-1:
                x = x.relu()
            else:
                if self.act_func is not None:
                    x = self.act_func(x)
        return x

    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]

class AutoEncoder(Module):
    def __init__(self, in_embeds=1, hidden_layers=[], latent_dim=1, act_func=None):
        self.latent_dim = latent_dim
        self.act_func = act_func
        self.encoder = MLP(in_embeds, hidden_layers + [latent_dim])
        self.decoder = MLP(latent_dim, list(
            reversed(hidden_layers)) + [in_embeds], act_func=act_func)

    def __call__(self, x):
        compressed = self.encoder(x)
        out = self.decoder(compressed)
        return out

    def encode(self, x):
        encoded = self.encoder(x)
        return encoded

    def decode(self, x):
        decoded = self.decoder(x)
        return decoded

    def parameters(self):
        return self.encoder.parameters() + self.decoder.parameters()

    def layers(self):
        return self.encoder.layers + self.decoder.layers

    def pretty(self):
        if self.act_func != None:
            hey = str(self.act_func)
            return hey.split()[1][6:]
        else:
            return "no function"

    def __repr__(self):
        return f"encoder has {len(self.encoder.layers)}, decoder has {len(self.decoder.layers)}, latent dim is {self.latent_dim} activated with {self.pretty()}"
    

class Optimizer:
    """Base class for optimizers"""

    def __init__(self, parameters):
        self.parameters = parameters

    def zero_grad(self):
        for p in self.parameters:
            p.grad = np.zeros_like(p.data)
        
        
    def step(self):
        """Take a step of gradient descent"""

        raise NotImplementedError


class SGD(Optimizer):
    def __init__(self, parameters, learning_rate=0.01):
        super().__init__(parameters)
        self.learning_rate = learning_rate 

    def step(self):
        for p in self.parameters:
            p.data = p.data - (self.learning_rate * p.grad)

# def mean_squared_error(y_true, y_pred):
#     total_loss = sum([(true - pred)**2 for true, pred in zip(y_true, y_pred)])

#     return total_loss

def mean_squared_error(target, pred):
    loss = ((pred - target) ** 2).sum()
    
    return loss

def mse_mean(target, pred):
    diff = pred - target
    return (diff*diff).sum() * (1.0 / diff.data.size)


def vae_loss(reconstruction, target, mu, log_var, beta=1.0):
    """
    Variational Autoencoder loss = Reconstruction Loss + beta * KL Divergence

    Args:
        reconstruction: Value or List of Value objects (reconstructed output)
        target: List of Value objects (original input)
        mu: List of Value objects (mean of latent distribution)
        log_var: List of Value objects (log variance of latent distribution)
        beta: Weight for KL divergence term (default 1.0)

    Returns:
        Total VAE loss as a Value object
    """
    # Ensure reconstruction is a list
    if not isinstance(reconstruction, list):
        reconstruction = [reconstruction]
    if not isinstance(target, list):
        target = [target]

    # Reconstruction loss (MSE)
    recon_loss = mean_squared_error(target, reconstruction)

    # KL divergence: -0.5 * sum(1 + log_var - mu^2 - exp(log_var))
    # This encourages the latent distribution to be close to N(0,1)
    kl_terms = []
    for mu_i, log_var_i in zip(mu, log_var):
        # KL term for one dimension: -0.5 * (1 + log_var - mu^2 - exp(log_var))
        kl_term = -0.5 * (Value(1.0) + log_var_i - mu_i**2 - log_var_i.exp())
        kl_terms.append(kl_term)

    kl_loss = sum(kl_terms) if kl_terms else Value(0.0)

    # Total loss
    total_loss = recon_loss + beta * kl_loss

    return total_loss


In [1528]:
x = [
    [1, 0, 0, 0, 0],
    [0, 1, 0, 0, 0],
    [0, 0, 1, 0, 0],
    [0, 0, 0, 1, 0],
    [0, 0, 0, 0, 1],
]
targ_x = Value(np.array(x, dtype=float))        # shape (5, 5)
target = Value(np.array(x, dtype=float))

In [1531]:
auto = AutoEncoder(
    in_embeds=5,# input dimension 
    hidden_layers=[3],# len = number of layers, i = size of layer
    latent_dim=2, # compressed layer dimensions
    act_func=Value.sigmoid, # activation function for final decoder layer
)


optimizer = optimizer = SGD(auto.parameters(), learning_rate=0.1)


for p in auto.parameters():
    print(p.data.shape, np.isnan(p.data).any(), p.data.min(), p.data.max())

(5, 3) False -0.6287859868760816 0.6291530304070804
(3,) False 0.1 0.1
(3, 2) False -0.7987022424909085 0.773155961583631
(2,) False 0.1 0.1
(2, 3) False -0.6757651484800478 0.8222448031206269
(3,) False 0.1 0.1
(3, 5) False -0.5266925751676413 0.6704715830524097
(5,) False 0.1 0.1


In [1532]:
for i in range(20000):
    optimizer.zero_grad()
    reconstruction = auto(targ_x)
    loss = mean_squared_error(target, reconstruction)
    loss.backward()
    optimizer.step()
    if i % 10 == 0:
        print(f"Iteration: {i}, AutoEncoder Loss: {loss.data:.12f}")

Iteration: 0, AutoEncoder Loss: 6.753892859672
Iteration: 10, AutoEncoder Loss: 4.435918171812
Iteration: 20, AutoEncoder Loss: 4.008833027870
Iteration: 30, AutoEncoder Loss: 3.951953483310
Iteration: 40, AutoEncoder Loss: 3.895236167980
Iteration: 50, AutoEncoder Loss: 3.797511803108
Iteration: 60, AutoEncoder Loss: 3.672095296078
Iteration: 70, AutoEncoder Loss: 3.477095240012
Iteration: 80, AutoEncoder Loss: 3.172970719593
Iteration: 90, AutoEncoder Loss: 2.910107839118
Iteration: 100, AutoEncoder Loss: 2.732536033329
Iteration: 110, AutoEncoder Loss: 2.608587015995
Iteration: 120, AutoEncoder Loss: 2.517729244166
Iteration: 130, AutoEncoder Loss: 2.494025377702
Iteration: 140, AutoEncoder Loss: 2.345426306355
Iteration: 150, AutoEncoder Loss: 2.343552359123
Iteration: 160, AutoEncoder Loss: 2.250331650139
Iteration: 170, AutoEncoder Loss: 2.104093812627
Iteration: 180, AutoEncoder Loss: 2.055092089597
Iteration: 190, AutoEncoder Loss: 2.022239615661
Iteration: 200, AutoEncoder Los

In [1533]:
reconstruction.data[0]

array([0.99372441, 0.        , 0.00188588, 0.        , 0.00532508])

In [1534]:
for i in range(len(x[0])):
    for j in range(len(x)):
        print(f"expected {x[j][i]}, result, {auto(targ_x).data[j][i]}")
    print("--------------------------------")

expected 1, result, 0.9937245240333988
expected 0, result, 4.105767168258025e-05
expected 0, result, 0.013248325385966863
expected 0, result, 0.0008177478201871155
expected 0, result, 0.002828088838075048
--------------------------------
expected 0, result, 1.6283061438051871e-19
expected 1, result, 0.9957851096311384
expected 0, result, 1.4884960651020122e-06
expected 0, result, 0.012124381112724729
expected 0, result, 0.0013718469096606204
--------------------------------
expected 0, result, 0.0018852724066549596
expected 0, result, 7.381796433418657e-18
expected 1, result, 0.99380081599181
expected 0, result, 0.01322322805113037
expected 0, result, 1.0308214588240528e-90
--------------------------------
expected 0, result, 3.2942816057357597e-34
expected 0, result, 0.0008143493135999044
expected 0, result, 0.0036209043339512537
expected 1, result, 0.9878373069377615
expected 0, result, 2.1867664888037745e-80
--------------------------------
expected 0, result, 0.005325048845978279
e