<a href="https://colab.research.google.com/github/giorking/Backprop_practice_colab/blob/main/practice_backprop_implem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [89]:
!pip install psutil

import numpy as np
import torch
import torch.nn.functional as F
from abc import ABC, abstractmethod
import multiprocessing as mp
import threading
import psutil
import os

class Operation(ABC):
    @abstractmethod
    def forward(self, *inputs):
        pass

    @abstractmethod
    def backward(self, dout):
        pass

    @abstractmethod
    def forward_torch(self, *inputs):
        pass




In [90]:
def numerical_gradient(func, inputs, epsilon=1e-5):
    gradients = []
    for i, input in enumerate(inputs):
        grad = np.zeros_like(input)
        it = np.nditer(input, flags=['multi_index'], op_flags=['readwrite'])
        while not it.finished:
            idx = it.multi_index

            original_value = input[idx]

            # f(x + epsilon)
            input[idx] = original_value + epsilon
            plus_epsilon = func(*inputs)

            # f(x - epsilon)
            input[idx] = original_value - epsilon
            minus_epsilon = func(*inputs)

            # Numerical gradient
            grad[idx] = (plus_epsilon - minus_epsilon) / (2 * epsilon)

            # Restore original value
            input[idx] = original_value
            it.iternext()

        gradients.append(grad)
    return gradients

def numerical_gradient_loss(func, X, y, epsilon=1e-5):
    gradients = []
    for i, input in enumerate(X):
        grad = np.zeros_like(input)
        it = np.nditer(input, flags=['multi_index'], op_flags=['readwrite'])
        while not it.finished:
            idx = it.multi_index

            original_value = input[idx]

            # f(x + epsilon)
            input[idx] = original_value + epsilon
            plus_epsilon = func(X, y)

            # f(x - epsilon)
            input[idx] = original_value - epsilon
            minus_epsilon = func(X, y)

            # Numerical gradient
            grad[idx] = (plus_epsilon - minus_epsilon) / (2 * epsilon)

            # Restore original value
            input[idx] = original_value
            it.iternext()

        gradients.append(grad)
    return gradients

In [91]:
def dot_mt(X, W, num_threads=4):
    if W.ndim == 2:
        result = np.zeros((X.shape[0], W.shape[1]))
    else:
        result = np.zeros((X.shape[0], ))
    rows_per_thread = X.shape[0] // num_threads

    def worker(A, B, result, start_row, end_row):
        if W.ndim == 2:
            result[start_row:end_row, :] = np.dot(A[start_row:end_row, :], B)
        else:
            result[start_row:end_row] = np.dot(A[start_row:end_row, :], B)

    threads = []
    for i in range(num_threads):
        start_row = i * rows_per_thread
        end_row = (i + 1) * rows_per_thread if i != num_threads - 1 else X.shape[0]

        thread = threading.Thread(target=worker, args=(X, W, result, start_row, end_row))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return result

def dot_mp(X, W, num_threads=4):
    if W.ndim == 2:
        result = mp.Array('d', X.shape[0] * W.shape[1])
    else:
        result = mp.Array('d', X.shape[0])

    if W.ndim == 2:
        result_np = np.frombuffer(result.get_obj()).reshape(X.shape[0], W.shape[1])
    else:
        result_np = np.frombuffer(result.get_obj()).reshape(X.shape[0], )
    rows_per_process = X.shape[0] // num_threads

    def worker(A, B, result, start_row, end_row):
        if W.ndim == 2:
            result[start_row:end_row, :] = np.dot(A[start_row:end_row, :], B)
        else:
            result[start_row:end_row, ] = np.dot(A[start_row:end_row, :], B)

    processes = []
    for i in range(num_threads):
        start_row = i * rows_per_process
        end_row = (i + 1) * rows_per_process if i != num_threads - 1 else X.shape[0]

        process = mp.Process(target=worker, args=(X, W, result_np, start_row, end_row))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    return result_np



In [92]:

def test_operation(operation_class, *inputs):
    print(f"Testing {operation_class.__name__}...", "  numpy vs pytorch")
    operation = operation_class()

    out = operation.forward(*inputs)
    dout = np.random.randn(*out.shape)
    grads = operation.backward(dout)

    inputs_torch = [torch.tensor(inp, requires_grad=True, dtype=torch.float32) for inp in inputs]
    out_torch = operation.forward_torch(*inputs_torch)
    dout_torch = torch.tensor(dout, dtype=torch.float32)
    out_torch.backward(dout_torch)

    grads_torch = [inp.grad.detach().numpy() for inp in inputs_torch]


    assert len(grads) == len(grads_torch), "Number of gradients does not match!"


    for grad, grad_torch in zip(grads, grads_torch):
        print(f"Difference in gradients: {np.linalg.norm(grad - grad_torch)}")
        assert np.allclose(grad, grad_torch, atol=1e-5), "Gradients do not match!"

def test_loss_operation(operation_class, X, y):
    print(f"Testing {operation_class.__name__}...", "  numpy vs pytorch")

    operation = operation_class()


    out = operation.forward(X, y)
    dout = 1.0
    grad = operation.backward(dout)

    X_torch = torch.tensor(X, requires_grad=True, dtype=torch.float32)
    y_torch = torch.tensor(y, dtype=torch.long if operation_class == CrossEntropy else torch.float32)
    out_torch = operation.forward_torch(X_torch, y_torch)
    out_torch.backward()

    # 获取 PyTorch 的梯度
    grad_torch = X_torch.grad.detach().numpy()

    assert len(grad) == len(grad_torch), "Number of gradients does not match!"

    print(f"Difference in gradients: {np.linalg.norm(grad - grad_torch)}")
    assert np.allclose(grad, grad_torch, atol=1e-5), "Gradients do not match!"

def test_loss_numerical_gradient(operation_class, X, y, epsilon=1e-5):
    print(f"Testing {operation_class.__name__}...", "  backprop vs numerical")

    operation = operation_class()


    output = operation.forward(X, y)
    dout = 1.0


    def func(X, y):
        return np.sum(operation.forward(X, y))


    numerical_grads = numerical_gradient_loss(func, X, y, epsilon)


    analytical_grads = operation.backward(dout)


    assert len(numerical_grads) == len(analytical_grads), "Number of gradients does not match!"


    for num_grad, ana_grad in zip(numerical_grads, analytical_grads):
        difference = np.linalg.norm(num_grad - ana_grad)
        print(f"Difference in gradients: {difference}")
        assert np.allclose(num_grad, ana_grad, atol=1e-5), f"Gradients do not match! Difference: {difference}"

def test_numerical_gradient(operation_class, *inputs, epsilon=1e-1):
    print(f"Testing {operation_class.__name__}...", "  backprop vs numerical")

    operation = operation_class()


    output = operation.forward(*inputs)


    def func(*inputs):
        return np.sum(operation.forward(*inputs))

    numerical_grads = numerical_gradient(func, list(inputs), epsilon)


    dout = np.ones_like(output)
    analytical_grads = operation.backward(dout)


    assert len(numerical_grads) == len(analytical_grads), "Number of gradients does not match!"


    for num_grad, ana_grad in zip(numerical_grads, analytical_grads):
        difference = np.linalg.norm(num_grad - ana_grad)
        print(f"Difference in gradients: {difference}")
        assert np.allclose(num_grad, ana_grad, atol=1e-5), f"Gradients do not match! Difference: {difference},  {num_grad},   {ana_grad}"


In [93]:
class Conv2D(Operation):
    def __init__(self, stride=1, padding=0):
        self.stride = stride
        self.padding = padding

    def forward(self, X, W, b):
        self.X, self.W, self.b = X, W, b
        # Get dimensions
        N, C, H, W = self.X.shape
        F, _, HH, WW = self.W.shape
        out_height = (H - HH + 2 * self.padding) // self.stride + 1
        out_width = (W - WW + 2 * self.padding) // self.stride + 1

        # Pad the input
        X_padded = np.pad(self.X, ((0,), (0,), (self.padding,), (self.padding,)), mode='constant')

        # Initialize output
        out = np.zeros((N, F, out_height, out_width))

        for n in range(N):
            for f in range(F):
                for i in range(0, out_height):
                    for j in range(0, out_width):
                        h_start = i * self.stride
                        h_end = h_start + HH
                        w_start = j * self.stride
                        w_end = w_start + WW

                        out[n, f, i, j] = np.sum(X_padded[n, :, h_start:h_end, w_start:w_end] * self.W[f, :, :, :]) + self.b[f]

        return out

    def backward(self, dout):
        X, W, b = self.X, self.W, self.b
        N, C, H, W = self.X.shape
        F, _, HH, WW = self.W.shape
        out_height = (H - HH + 2 * self.padding) // self.stride + 1
        out_width = (W - WW + 2 * self.padding) // self.stride + 1

        # Pad the input and gradients
        X_padded = np.pad(self.X, ((0,), (0,), (self.padding,), (self.padding,)), mode='constant')
        dX_padded = np.pad(np.zeros_like(X), ((0,), (0,), (self.padding,), (self.padding,)), mode='constant')
        dW = np.zeros_like(self.W)
        db = np.zeros_like(self.b)

        for n in range(N):
            for f in range(F):
                for i in range(out_height):
                    for j in range(out_width):
                        h_start = i * self.stride
                        h_end = h_start + HH
                        w_start = j * self.stride
                        w_end = w_start + WW

                        db[f] += dout[n, f, i, j]
                        dW[f] += X_padded[n, :, h_start:h_end, w_start:w_end] * dout[n, f, i, j]
                        dX_padded[n, :, h_start:h_end, w_start:w_end] += self.W[f] * dout[n, f, i, j]

        # Remove padding from dX
        if self.padding > 0:
            dX = dX_padded[:, :, self.padding:-self.padding, self.padding:-self.padding]
        else:
            dX = dX_padded

        return dX, dW, db

    def forward_torch(self, X, W, b):
        return F.conv2d(X, W, bias=b, stride=self.stride, padding=self.padding)

class MatMul(Operation):
    def forward(self, X, W):
        self.X, self.W = X, W
        return dot.dot(X, W)

    def backward(self, dout):
        dX = dot.dot(dout, self.W.T)
        dW = dot.dot(self.X.T, dout)
        return dX, dW

    def forward_torch(self, X, W):
        return torch.matmul(X, W)

class FullyConnected(Operation):
    def forward(self, X, W, b):
        self.X, self.W, self.b = X, W, b
        return dot.dot(X, W) + b

    def backward(self, dout):
        dX = dot.dot(dout, self.W.T)
        dW = dot.dot(self.X.T, dout)
        db = np.sum(dout, axis=0)
        return dX, dW, db

    def forward_torch(self, X, W, b):
        return torch.matmul(X, W) + b

class Softmax(Operation):
    def forward(self, X):
        self.X = X
        e_X = np.exp(X - np.max(X, axis=1, keepdims=True))
        self.Y = e_X / e_X.sum(axis=1, keepdims=True)
        return self.Y

    def backward(self, dout):
        dX = np.zeros_like(self.Y)
        for i, (y, dy) in enumerate(zip(self.Y, dout)):
            y = y.reshape(-1, 1)
            jacobian = np.diagflat(y) - dot.dot(y, y.T)
            dX[i] = dot.dot(jacobian, dy)
        return [dX]

    def forward_torch(self, X):
        return torch.softmax(X, dim=1)


class CrossEntropy(Operation):
    def forward(self, X, y):
        self.X, self.y = X, y
        m = y.shape[0]
        p = np.exp(X - np.max(X, axis=1, keepdims=True))
        p = p / np.sum(p, axis=1, keepdims=True)
        log_likelihood = -np.log(p[range(m), y])
        self.loss = np.sum(log_likelihood) / m
        return self.loss

    def backward(self, dout=1):
        m = self.y.shape[0]
        grad = np.exp(self.X - np.max(self.X, axis=1, keepdims=True))
        grad = grad / np.sum(grad, axis=1, keepdims=True)
        grad[range(m), self.y] -= 1
        grad = grad / m
        return grad * dout

    def forward_torch(self, X, y):
        return torch.nn.functional.cross_entropy(X, y.type(torch.LongTensor))

class MSE(Operation):
    def forward(self, X, y):
        self.X, self.y = X, y
        self.loss = np.mean((X - y) ** 2)
        return self.loss

    def backward(self, dout=1):
        dX = (2 * (self.X - self.y) / self.X.size) * dout
        return dX

    def forward_torch(self, X, y):
        return torch.nn.functional.mse_loss(X, y)


class ReLU(Operation):
    def forward(self, X):
        self.X = X
        return np.maximum(0, X)

    def backward(self, dout):
        dX = dout.copy()
        dX[self.X <= 0] = 0
        return [dX]

    def forward_torch(self, X):
        return torch.nn.functional.relu(X)

class Outer(Operation):
    def forward(self, a, b):
        self.a, self.b = a, b
        return np.outer(a, b)

    def backward(self, dout):
        da = dot.dot(dout, self.b)
        db = dot.dot(dout.T, self.a)
        return da, db

    def forward_torch(self, a, b):
        return torch.ger(a, b)

class Custom(Operation):
    def forward(self, W, X, Y, b):
        self.W, self.X, self.Y, self.b = W, X, Y, b
        return dot.dot(dot.dot(W, X), Y) + b

    def backward(self, dout):
        dW = dot.dot(dout, dot.dot(self.X, self.Y).T)
        dX = dot.dot(dot.dot(self.W.T, dout), self.Y.T)
        dY = dot.dot(dot.dot(self.X.T, self.W.T), dout)
        db = np.sum(dout, axis=0)
        return dW, dX, dY, db

    def forward_torch(self, W, X, Y, b):
        return torch.matmul(torch.matmul(W, X), Y) + b

In [94]:
class dot:
    def __init__(self, method='np'):
        self.method = method

    def dot(self, X, W, num_threads=4):
        if self.method == 'np':
            return np.dot(X, W)
        elif self.method == 'mt':
            return dot_mt(X, W, num_threads)
        elif self.method == 'mp':
            return dot_mp(X, W, num_threads)
        else:
            raise ValueError(f"Unknown method: {self.method}")

DOT_METHOD = 'np'
dot = dot(method=DOT_METHOD)


np.random.seed(0)
X = np.random.randn(4, 5)
W = np.random.randn(5, 3)
b = np.random.randn(3)
y_regression = np.random.randn(4, 5)
y_classification = np.random.randint(0, 3, 4)

#### Testing MatMul ####
test_operation(MatMul, X, W)
test_numerical_gradient(MatMul, X, W)

#### Testing FullyConnected ####
test_operation(FullyConnected, X, W, b)
test_numerical_gradient(FullyConnected, X, W, b)


#### Testing Softmax ####
test_operation(Softmax, X)
test_numerical_gradient(Softmax, X)

#### Testing ReLU ####
test_operation(ReLU, X)
test_numerical_gradient(ReLU, X)

#### Testing Outer ####
test_operation(Outer, X[0], W[:, 0])
test_numerical_gradient(Outer, X[0], W[:, 0])

#### Testing CrossEntropy ####
test_loss_operation(CrossEntropy, X, y_classification)
test_loss_numerical_gradient(CrossEntropy, X, y_classification)

#### Testing MSE ####
test_loss_operation(MSE, X, y_regression)
test_loss_numerical_gradient(MSE, X, y_regression)


#### Testing Conv2D ####
X = np.random.randn(1, 3, 5, 5).astype(np.float32)
W = np.random.randn(2, 3, 3, 3).astype(np.float32)
b = np.random.randn(2).astype(np.float32)
test_operation(Conv2D, X, W, b)
test_numerical_gradient(Conv2D, X, W, b)

#### Custom OP ####
# Z = WXY + b
W = np.random.randn(4, 5)
X = np.random.randn(5, 3)
Y = np.random.randn(3, 2)
b = np.random.randn(2)
test_operation(Custom, W, X, Y, b)
test_numerical_gradient(Custom, W, X, Y, b)

Testing MatMul...   numpy vs pytorch
Difference in gradients: 3.4171469138303537e-07
Difference in gradients: 3.588504285654481e-07
Testing MatMul...   backprop vs numerical
Difference in gradients: 1.6221418424348217e-14
Difference in gradients: 1.0621069129169845e-14
Testing FullyConnected...   numpy vs pytorch
Difference in gradients: 5.832149985818068e-07
Difference in gradients: 5.212060067835709e-07
Difference in gradients: 2.464024254619843e-07
Testing FullyConnected...   backprop vs numerical
Difference in gradients: 2.7328630811257136e-14
Difference in gradients: 2.4237468006307348e-14
Difference in gradients: 1.4540107379219425e-14
Testing Softmax...   numpy vs pytorch
Difference in gradients: 5.350668399524569e-08
Testing Softmax...   backprop vs numerical
Difference in gradients: 9.391328366696729e-15
Testing ReLU...   numpy vs pytorch
Difference in gradients: 1.279301609422155e-07
Testing ReLU...   backprop vs numerical
Difference in gradients: 1.432144669219779e-14
Testin