In [1]:
import math
import numpy as np 
import matplotlib.pyplot as plt 
import random
import struct
from array import array
from os.path  import join
from tqdm import tqdm

%matplotlib inline

In [2]:
class MnistDataloader(object):
    def __init__(self, training_images_filepath,training_labels_filepath,
                 test_images_filepath, test_labels_filepath):
        self.training_images_filepath = training_images_filepath
        self.training_labels_filepath = training_labels_filepath
        self.test_images_filepath = test_images_filepath
        self.test_labels_filepath = test_labels_filepath

    def read_images_labels(self, images_filepath, labels_filepath):
        labels = []
        with open(labels_filepath, 'rb') as file:
            magic, size = struct.unpack(">II", file.read(8))
            if magic != 2049:
                raise ValueError('Magic number mismatch, expected 2049, got {}'.format(magic))
            labels = array("B", file.read())

        with open(images_filepath, 'rb') as file:
            magic, size, rows, cols = struct.unpack(">IIII", file.read(16))
            if magic != 2051:
                raise ValueError('Magic number mismatch, expected 2051, got {}'.format(magic))
            image_data = array("B", file.read())

        images = []
        for i in range(size):
            images.append([0] * rows * cols)
        for i in range(size):
            img = np.array(image_data[i * rows * cols:(i + 1) * rows * cols])
            img = img.reshape(28, 28)
            images[i][:] = img

        return images, labels

            
    def load_data(self):
        x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
        x_test, y_test = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
        return (x_train, y_train), (x_test, y_test)

In [3]:
# Verify reading dataset via MnistDataloader class

input_path = 'data/mnist'
training_images_filepath = join(input_path, 'train-images-idx3-ubyte/train-images-idx3-ubyte')
training_labels_filepath = join(input_path, 'train-labels-idx1-ubyte/train-labels-idx1-ubyte')
test_images_filepath = join(input_path, 't10k-images-idx3-ubyte/t10k-images-idx3-ubyte')
test_labels_filepath = join(input_path, 't10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte')

# Helper function to show a list of images with their relating titles

def show_images(images, title_texts):
    cols = 5
    rows = int(len(images)/cols) + 1
    plt.figure(figsize=(30,20))
    index = 1
    for x in zip(images, title_texts):
        image = x[0]
        title_text = x[1]
        plt.subplot(rows, cols, index)
        plt.imshow(image, cmap=plt.cm.gray)
        if (title_text != ''):
            plt.title(title_text, fontsize = 15);  
        index += 1

# Load MNIST dataset
mnist_dataloader = MnistDataloader(training_images_filepath, training_labels_filepath, 
                                   test_images_filepath, test_labels_filepath)
(x_train, y_train), (x_test, y_test) = mnist_dataloader.load_data()


# Show some random training and test images
images_2_show = []
titles_2_show = []
for i in range(0, 10):
    r = random.randint(1, 60000)
    images_2_show.append(x_train[r])
    titles_2_show.append('training image [' + str(r) + '] = ' + str(y_train[r]))

for i in range(0, 5):
    r = random.randint(1, 10000)
    images_2_show.append(x_test[r])        
    titles_2_show.append('test image [' + str(r) + '] = ' + str(y_test[r]))    

#show_images(images_2_show, titles_2_show) 

In [4]:
# Convert each image to a 784 length vector of uint8
x_train = np.array(x_train, dtype=np.float32).reshape(len(x_train), -1) / 255.0   # directly as float32
x_train = x_train.reshape(len(x_train), -1)

In [5]:
# Convert labels to expected output layer activation
gt_output = np.zeros((len(y_train), 10), dtype=np.float32)
for label_ind in range(len(y_train)):
    gt_output[label_ind][y_train[label_ind]] = 1.0

In [None]:
# Helpers: slice selector and accuracy tester

def select_training_slice(x, y, start=0, end=None, set_global=True):
    """Return (x_slice, y_slice) for x[start:end].
    If set_global=True the function will also set `training_subset` and
    `training_slice_start` in the notebook globals for convenience.
    """
    if end is None:
        end = len(x)
    if not (0 <= start <= len(x)) or not (0 <= end <= len(x)):
        raise IndexError("start/end out of range")
    if start >= end:
        raise ValueError("start must be < end")
    x_slice = x[start:end]
    y_slice = y[start:end]
    if set_global:
        globals()['training_subset'] = x_slice
        globals()['training_slice_start'] = start
    return x_slice, y_slice


def _gt_to_label(gt):
    """Normalize ground-truth `gt` (int or one-hot vector/array) to an int label."""
    # If it's already an integer type, return directly
    if isinstance(gt, (int, np.integer)):
        return int(gt)
    # If it's a Value or has .data, try to extract
    if hasattr(gt, 'data'):
        try:
            return int(gt.data)
        except Exception:
            pass
    # If it's array-like, convert and argmax
    try:
        arr = np.array(gt)
        if arr.size == 1:
            return int(arr.reshape(-1)[0])
        return int(np.argmax(arr))
    except Exception:
        # final fallback
        return int(gt)


def test_accuracy_on_slice(model, x_slice, y_slice, verbose=False, max_display=10):
    """Compute accuracy of `model` on given (x_slice, y_slice).

    Accepts y_slice as integers or one-hot vectors.
    """
    num_correct = 0
    total = len(x_slice)
    for i, (img, gt) in enumerate(zip(x_slice, y_slice)):
        yp = model(img)
        # get numeric outputs
        if isinstance(yp, list):
            outputs = [float(o.data) for o in yp]
        else:
            outputs = [float(yp.data)]
        pred = int(np.argmax(outputs))

        gt_label = _gt_to_label(gt)

        if pred == gt_label:
            num_correct += 1
        else:
            if verbose and max_display > 0:
                print(f"idx={i} pred={pred} gt={gt_label} outputs={outputs}")
                max_display -= 1
    percent = num_correct / total if total > 0 else 0.0
    if verbose:
        print(f"Accuracy: {num_correct}/{total} = {percent:.4f}")
    return num_correct, total, percent

# Example convenience wrapper that uses the notebook globals if present
def test_accuracy_on_global_slice(model, verbose=False):
    if 'training_subset' not in globals() or 'training_slice_start' not in globals():
        raise RuntimeError('Call select_training_slice(...) with set_global=True first')
    start = globals().get('training_slice_start', 0)
    x_slice = globals()['training_subset']
    y_slice = globals().get('y_train', None)
    if y_slice is None:
        # fall back to y_train in globals
        y_slice = globals().get('y_train')
    # if y_slice is full y_train, pick the matching range
    if y_slice is not None and len(y_slice) == len(globals().get('x_train', [])):
        # pick matching labels
        end = start + len(x_slice)
        y_slice = y_slice[start:end]
    return test_accuracy_on_slice(model, x_slice, y_slice, verbose=verbose)


In [6]:
class Value:

    def __init__(self, data, _children=(), _op='', label=''):
        # coerce scalar-ish data to Python float to avoid numpy-scalar surprises
        try:
            if np.isscalar(data):
                self.data = float(data)
            else:
                self.data = data
        except Exception:
            # fallback
            try:
                self.data = float(data)
            except Exception:
                self.data = data

        self.grad = 0.0
        self._backward = lambda: None
        self._prev = set(_children)
        self._op = _op
        self.label = label

    def __repr__(self):
        return f"Value(data={self.data})"
    
    def __add__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data + other.data, (self, other), '+')

        def _backward():
            self.grad += 1.0 * out.grad
            other.grad += 1.0 * out.grad
        out._backward = _backward

        return out
    
    def __mul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data * other.data, (self, other), '*')

        def _backward():
            self.grad += other.data * out.grad
            other.grad += self.data * out.grad 
        out._backward = _backward

        return out
    
    def __pow__(self, other):
        assert isinstance(other, (int, float)), "only supporting int/float powers for now" 
        out = Value(self.data**other, (self, ), f'**{other}')

        def _backward():
            self.grad += other * (self.data**(other-1)) * out.grad 
        out._backward = _backward

        return out

    def __rmul__(self, other): # other * self
        return self * other
    
    def __radd__(self, other): # other + self
        return self + other

    def __truediv__(self, other): # self / other
        return self * other**-1

    def __neg__(self): # -self
        return self * -1
    
    def __sub__(self, other): # self - other
        return self + (-other)

    def tanh(self):
        x = self.data
        t = math.tanh(x)
        out = Value(t, (self, ), 'tanh')

        def _backward():
            self.grad += (1- t*t) * out.grad
        out._backward = _backward

        return out
    
    def exp(self):
        x = self.data
        out = Value(math.exp(x), (self, ), 'exp')

        def _backward():
            self.grad += out.data * out.grad
        out._backward = _backward

        return out

    def backward(self):

        topo = []
        visited = set()
        def build_topo(v):
            if v not in visited:
                visited.add(v)
                for child in v._prev:
                    build_topo(child)
                topo.append(v)
        build_topo(self)
        
        self.grad = 1.0
        for node in reversed(topo):
            node._backward()

In [None]:
class Neuron:
    def __init__(self, nin):
        self.w = [Value(random.uniform(-1,1)) for _ in range(nin)]
        self.b = Value(random.uniform(-1,1))

    def __call__(self, x):
        # w * x + b
        act = sum((wi*xi for wi, xi in zip(self.w, x)), self.b) 
        out = act.tanh()
        return out
    
    def parameters(self):
        return self.w + [self.b]
        
class Layer:

    def __init__(self, nin, nout):
        self.neurons = [Neuron(nin) for _ in range(nout)]

    def __call__(self, x):
        outs = [n(x) for n in self.neurons]
        return outs[0] if len(outs) == 1 else outs

    def parameters(self):
        return [p for neuron in self.neurons for p in neuron.parameters()]
       

class MLP:

    def __init__(self, nin, nouts): # nouts is a now a list, each element defines the size of each layer
        sz = [nin] + nouts
        self.layers = [Layer(sz[i], sz[i+1]) for i in range(len(nouts))]

    def __call__(self, x):
        for layer in self.layers:
            x = layer(x)
        return x
    
    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]

In [8]:
from graphviz import Digraph

def trace(root):
    # builds a set of all nodes and edges in a graph
    nodes, edges = set(), set()
    def build(v):
        if v not in nodes:
            nodes.add(v)
            for child in v._prev:
                edges.add((child, v))
                build(child)
    build(root)
    return nodes, edges

def draw_dot(root):
    dot = Digraph(format='svg', graph_attr={'rankdir': 'LR'}) # LR = left to right

    nodes, edges = trace(root)
    for n in nodes:
        uid = str(id(n))
        # for any value in the graph, create a rectangular ('record') node for it
        dot.node(name = uid, label = "{ %s | data %.4f | grad %.4f}" % (n.label, n.data, n.grad), shape='record')
        if n._op:
            # if this value is the result of some operation, create an op node for it
            dot.node(name = uid + n._op, label  = n._op)
            # and connect this node to it
            dot.edge(uid + n._op, uid)

    for n1, n2 in edges:
        # connect n1 to the op node of n2
        dot.edge(str(id(n1)), str(id(n2)) + n2._op)

    return dot

In [9]:
# Initializing neural network
input_layers = 784
output_layers = 10


n = MLP(input_layers, [16, 16, 16, output_layers])

In [26]:
# Training

x_train_sub, y_train_sub = select_training_slice(x_train, gt_output, 5, 10)

for k in tqdm(range(10)):
    # forward pass
    ypred = [n(image) for image in x_train_sub] # list of output lists or Values

    total_loss = Value(0.0)
    # accumulate loss as Value objects
    for ygt, yp in zip(y_train_sub, ypred): # ygt and yp are matching pairs
        # ensure yp is list of Values
        if not isinstance(yp, list):
            yp = [yp]
        ind_loss = Value(0.0)
        for i in range(len(ygt)):
            # yp[i] is Value, ygt[i] is float
            ind_loss = ind_loss + (yp[i] - ygt[i])**2
        total_loss = total_loss + ind_loss

    # backward pass
    for p in n.parameters():
        p.grad = 0.0

    total_loss.backward()

    # update
    lr = 0.01
    for p in n.parameters():
        # optional gradient clipping
        if p.grad > 1e3:
            p.grad = 1e3
        if p.grad < -1e3:
            p.grad = -1e3
        p.data += -lr * p.grad

    print(k, total_loss.data)

 10%|█         | 1/10 [00:01<00:16,  1.79s/it]

0 4.4123299967703815


 20%|██        | 2/10 [00:03<00:11,  1.47s/it]

1 4.2603954236347725


 30%|███       | 3/10 [00:04<00:11,  1.60s/it]

2 4.1367702267848045


 40%|████      | 4/10 [00:06<00:08,  1.45s/it]

3 4.036272020039516


 50%|█████     | 5/10 [00:07<00:06,  1.35s/it]

4 3.954621496539032


 60%|██████    | 6/10 [00:08<00:05,  1.49s/it]

5 3.8883035796414886


 70%|███████   | 7/10 [00:10<00:04,  1.41s/it]

6 3.8344426590951874


 80%|████████  | 8/10 [00:11<00:02,  1.34s/it]

7 3.790693552417045


 90%|█████████ | 9/10 [00:13<00:01,  1.49s/it]

8 3.7551481116728205


100%|██████████| 10/10 [00:14<00:00,  1.44s/it]

9 3.726256420583094





In [None]:
start_test = random.randint(0, 50000)
num_test_images = 10
end_test = start_test + num_test_images

x_test_sub, y_test_sub = select_training_slice(x_train, gt_output, start_test, end_test)
# y_test_sub may be one-hot; test_accuracy_on_slice handles that
test_accuracy_on_slice(n, x_test_sub, y_test_sub, verbose=True)


TypeError: only length-1 arrays can be converted to Python scalars

In [15]:
# Diagnostic: print parameter stats and run one forward/backward to detect overflow
import math

def param_stats(params):
    vals = [p.data for p in params]
    arr = np.array(vals, dtype=np.float64)
    return arr.min(), arr.max(), arr.mean(), arr.std()

print('Network parameter stats (min, max, mean, std):', param_stats(n.parameters()))

# take one sample
sample = x_train[0]
label = y_train[0]

# forward
out = n(sample)
if isinstance(out, list):
    out_vals = [o.data for o in out]
else:
    out_vals = [out.data]
print('Output values (sample):', out_vals)

# compute simple loss (sum of squares to one-hot target)
target = np.zeros(len(out_vals), dtype=np.float32)
if isinstance(label, (int, np.integer)):
    target[label] = 1.0
loss = 0
for i, o in enumerate(out_vals):
    loss += (o - target[i])**2

# set grads manually and run backward through Values if available
# we expect out to be Value objects; set grad and call backward
try:
    if isinstance(out, list):
        # set gradients on each scalar output
        for o in out:
            o.grad = 1.0
            # run backward for this output
            o.backward()
    else:
        out.grad = 1.0
        out.backward()
    print('Backward completed')
except Exception as e:
    print('Error during backward:', repr(e))

print('Parameter stats after backward (min, max, mean, std):', param_stats(n.parameters()))

# Check for inf or nan in parameters and grads
has_inf = any(math.isinf(float(p.data)) or math.isnan(float(p.data)) for p in n.parameters())
has_grad_inf = any(math.isinf(float(p.grad)) or math.isnan(float(p.grad)) for p in n.parameters())
print('Params contain inf/nan:', has_inf)
print('Grads contain inf/nan:', has_grad_inf)


Network parameter stats (min, max, mean, std): (np.float64(-0.05192177096451992), np.float64(0.2996647856479231), np.float64(5.8419287785319525e-05), np.float64(0.029120956672439614))
Output values (sample): [0.09903678557529123, 0.29504612576973244, 0.09943930194454544, 0.09910253796167223, 0.19777701683305302, 0.09953046418783086, -1.6685875209578334e-05, 0.0005660990776248529, 0.000471181505220096, 0.09919801986988003]
Backward completed
Parameter stats after backward (min, max, mean, std): (np.float64(-0.05192177096451992), np.float64(0.2996647856479231), np.float64(5.8419287785319525e-05), np.float64(0.029120956672439614))
Params contain inf/nan: False
Grads contain inf/nan: False
