In [1]:
from typing import List, NamedTuple, Callable, Dict, Optional
import numpy as np

_name = 1
def fresh_name():
    global _name
    name = f'v{_name}'
    _name += 1
    return name

`fresh_name` 用于打印跟 `tape` 相关的变量，并用 `_name` 来记录是第几个变量。

为了能够更好滴理解反向模式自动微分的实现，实现代码过程中不依赖PyTorch的autograd。代码中添加了变量类 `Variable` 来跟踪计算梯度，并添加了梯度函数 `grad()` 来计算梯度。

对于标量损失l来说，程序中计算的每个张量 x 的值，都会计算值dl/dX。反向模式从 dl/dl=1 开始，使用偏导数和链式规则向后传播导数，例如：

$$
dl/dx*dx/dy=dl/dy
$$

下面就是具体的实现过程，首先我们所有的操作都是通过Python进行操作符重载的，而操作符重载，通过 `Variable` 来封装跟踪计算的 Tensor。每个变量都有一个全局唯一的名称 `fresh_name`，因此可以在字典中跟踪该变量的梯度。为了便于理解，`__init__` 有时会提供此名称作为参数。否则，每次都会生成一个新的临时值。

为了适配上面图中的简单计算，这里面只提供了 乘、加、减、sin、log 五种计算方式。

In [2]:
class Variable:
    def __init__(self, value, name=None):
        self.value = value
        self.name = name or fresh_name()
    
    def __repr__(self):
        return repr(self.value)
    
    @staticmethod
    def constant(value, name=None):
        var = Variable(value, name=name)
        print(f'{var.name} = {value}')
        return var
    
    def __add__(self, other):
        return ops_add(self, other)
    
    def __mul__(self, other):
        return ops_mul(self, other)
    
    def __sub__(self, other):
        return ops_sub(self, other)
    
    def sin(self):
        return ops_sin(self)
    
    def log(self):
        return ops_log(self)

In [3]:
class Tape(NamedTuple):
    inputs : List[str]
    outputs : List[str]
    propagate : 'Callable[List[Variable], List[Variable]]'

输入 `inputs` 和输出 `outputs` 是原始计算的输入和输出变量的唯一名称。反向传播使用链式规则，将函数的输出梯度传播给输入。其输入为 dL/dOutputs，输出为 dL/dinput。Tape只是一个记录所有计算的累积 List 列表。

下面提供了一种重置 Tape 的方法 `reset_tape`，方便运行多次自动微分，每次自动微分过程都会产生 Tape List。

In [4]:
gradient_tape : List[Tape] = []

def reset_tape():
    global _name
    _name = 1
    gradient_tape.clear()

In [5]:
def ops_mul(self, other):
    x = Variable(self.value * other.value)
    print(f'{x.name} = {self.name} * {other.name}')

    def propagate(dl_doutputs):
        dl_dx, = dl_doutputs
        dx_dself = other
        dx_dother = self
        dl_dself = dl_dx * dx_dself
        dl_dother = dl_dx * dx_dother
        dl_dinputs = [dl_dself, dl_dother]
        return dl_dinputs
    
    tape = Tape(inputs=[self.name, other.name], outputs=[x.name], propagate=propagate)
    gradient_tape.append(tape)
    return x

In [6]:
def ops_add(self, other):
    x = Variable(self.value + other.value)
    print(f'{x.name} = {self.name} + {other.name}')

    def propagate(dl_doutputs):
        dl_dx, = dl_doutputs
        dx_dself = Variable(1.0)
        dx_dother = Variable(1.0)
        dl_dself = dl_dx * dx_dself
        dl_dother = dl_dx * dx_dother
        dl_dinputs = [dl_dself, dl_dother]
        return dl_dinputs
    
    tape = Tape(inputs=[self.name, other.name], outputs=[x.name], propagate=propagate)
    gradient_tape.append(tape)
    return x

In [7]:
def ops_sub(self, other):
    x = Variable(self.value - other.value)
    print(f'{x.name} = {self.name} - {other.name}')

    def propagate(dl_doutputs):
        dl_dx, = dl_doutputs
        dx_dself = Variable(1.0)
        dx_dother = Variable(-1.0)
        dl_dself = dl_dx * dx_dself
        dl_dother = dl_dx * dx_dother
        dl_dinputs = [dl_dself, dl_dother]
        return dl_dinputs
    
    tape = Tape(inputs=[self.name, other.name], outputs=[x.name], propagate=propagate)
    gradient_tape.append(tape)
    return x

In [8]:
def ops_sin(self):
    x = Variable(np.sin(self.value))
    print(f'{x.name} = sin({self.name})')

    def propagate(dl_doutputs):
        dl_dx, = dl_doutputs
        dx_dself = Variable(np.cos(self.value))
        dl_dself = dl_dx * dx_dself
        dl_dinputs = [dl_dself]
        return dl_dinputs
    
    tape = Tape(inputs=[self.name], outputs=[x.name], propagate=propagate)
    gradient_tape.append(tape)
    return x

In [9]:
def ops_log(self):
    x = Variable(np.log(self.value))
    print(f'{x.name} = log({self.name})')

    def propagate(dl_doutputs):
        dl_dx, = dl_doutputs
        dx_dself = Variable(1.0 / self.value)
        dl_dself = dl_dx * dx_dself
        dl_dinputs = [dl_dself]
        return dl_dinputs

    tape = Tape(inputs=[self.name], outputs=[x.name], propagate=propagate)
    gradient_tape.append(tape)
    return x

In [10]:
def grad(l, results):
    dl_d = {}
    dl_d[l.name] = Variable(1.0)
    print("dl_d", dl_d)

    def gather_grad(entries):
        return [dl_d[entry] if entry in dl_d else None for entry in entries]
    
    for entry in reversed(gradient_tape):
        print(entry)
        dl_doutputs = gather_grad(entry.outputs)
        dl_dinputs = entry.propagate(dl_doutputs)

        for input, dl_dinput in zip(entry.inputs, dl_dinputs):
            if input not in dl_d:
                dl_d[input] = dl_dinput
            else:
                dl_d[input] += dl_dinput

    for name, value in dl_d.items():
        print(f'dl_d{name} = {value.value}')

    return gather_grad(result.name for result in results)

In [11]:
reset_tape()

x = Variable.constant(2.0, name="v-1")
y = Variable.constant(5.0, name="v0")

f = Variable.log(x) + x * y - Variable.sin(y)
print(f)

v-1 = 2.0
v0 = 5.0
v1 = log(v-1)
v2 = v-1 * v0
v3 = v1 + v2
v4 = sin(v0)
v5 = v3 - v4
11.652071455223084


In [12]:
dx, dy = grad(f, [x, y])
print("dx", dx)
print("dy", dy)

dl_d {'v5': 1.0}
Tape(inputs=['v3', 'v4'], outputs=['v5'], propagate=<function ops_sub.<locals>.propagate at 0x7fbd98b24a60>)
v9 = v6 * v7
v10 = v6 * v8
Tape(inputs=['v0'], outputs=['v4'], propagate=<function ops_sin.<locals>.propagate at 0x7fbd98b249d0>)
v12 = v10 * v11
Tape(inputs=['v1', 'v2'], outputs=['v3'], propagate=<function ops_add.<locals>.propagate at 0x7fbd98b24940>)
v15 = v9 * v13
v16 = v9 * v14
Tape(inputs=['v-1', 'v0'], outputs=['v2'], propagate=<function ops_mul.<locals>.propagate at 0x7fbd98b248b0>)
v17 = v16 * v0
v18 = v16 * v-1
v19 = v12 + v18
Tape(inputs=['v-1'], outputs=['v1'], propagate=<function ops_log.<locals>.propagate at 0x7fbd98b24820>)
v21 = v15 * v20
v22 = v17 + v21
dl_dv5 = 1.0
dl_dv3 = 1.0
dl_dv4 = -1.0
dl_dv0 = 1.7163378145367738
dl_dv1 = 1.0
dl_dv2 = 1.0
dl_dv-1 = 5.5
dx 5.5
dy 1.7163378145367738
