In [1]:
import heapq
import weakref
import functools
from numbers import Number
from collections import namedtuple
from collections.abc import Iterable

import numpy as np
from typing import NoReturn

In [2]:
class Config:
    enable_backprop = True

In [3]:
import contextlib

@contextlib.contextmanager
def using_config(name, value):
    old_value = getattr(Config, name)
    setattr(Config, name, value)
    try:
        yield
    finally:
        setattr(Config, name, old_value)

In [4]:
def no_grad():
    return using_config('enable_backprop', False)

In [5]:
class Variable:
    def __init__(self, data, name=None):
        if data is not None:
            if not isinstance(data, np.ndarray):
                raise TypeError(f"{type(data)} is not supported")

        self.data = data
        self.name = name
        self.grad = None
        self.creator = None
        self.generation = 0
        
    def __len__(self):
        return len(self.data)
    
    def __repr__(self):
        if self.data is None:
            return "variable(None)"
        p = str(self.data).replace("\n", "\n" + " "*9)
        return f"variable({p})"
        
    @property
    def shape(self):
        return self.data.shape
    
    @property
    def ndim(self):
        return self.data.ndim
    
    @property
    def size(self):
        return self.data.size
    
    @property
    def dtype(self):
        return self.data.dtype

    def set_creator(self, function) -> NoReturn:
        self.creator = function
        self.generation = function.generation + 1
        return None

    def backward(self, retain_grad=False) -> NoReturn:
        if self.grad is None:
            self.grad = np.ones_like(self.data)
            
        def priority_set(iterable_queue):
            return PrioritySet()(iterable_queue)
        
        functions_list = priority_set([self.creator])

        while functions_list:
            function = functions_list.pop()
            
            gys = [output().grad for output in function.outputs]
            gxs = function.backward(*gys)
            
            if not isinstance(gxs, tuple):
                gxs = (gxs,)
            
            for x, gx in zip(function.inputs, gxs):
                if x.grad is None:
                    x.grad = gx
                else:
                    x.grad = x.grad + gx

                if x.creator is not None:
                    functions_list.add(PriorityItem(x.creator))

            if not retain_grad:
                for y in function.outputs:
                    y().grad = None

        return None

    def cleargrad(self) -> NoReturn:
        self.grad = None
        return None


class Function:
    def __call__(self, *inputs):
        xs = [x.data for x in inputs]
        ys = self.forward(*xs)
        if not isinstance(ys, tuple):
            ys = (ys,)
        outputs = [Variable(as_array(y)) for y in ys]

        
        if Config.enable_backprop:
            self.generation = max([x.generation for x in inputs])
            for output in outputs:
                output.set_creator(self)
            
            self.inputs = inputs
            self.outputs = [weakref.ref(output) for output in outputs]
        
        return outputs if len(outputs) > 1 else outputs[0]
    
    def __lt__(self, other):
        return self.generation < other.generation
    
    def __eq__(self, other):
        return self.generation == other.generation

    def forward(self, xs):
        raise NotImplementedError()

    def backward(self, gys):
        raise NotImplementedError()


class Add(Function):
    def forward(self, x0: Number, x1: Number) -> Number:
        y = x0 + x1
        return y
    
    def backward(self, gy: Number) -> tuple:
        return gy, gy
    

class Mul(Function):
    def forward(self, x0: Number, x1: Number) -> Number:
        y = x0 * x1
        return y

    def backward(self, gy: Number) -> tuple:
        x0, x1 = self.inputs[0].data, self.inputs[1].data
        return gy * x1, gy * x0


class Square(Function):
    def forward(self, x: Number) -> Number:
        y = x ** 2
        return y

    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = 2 * x * gy
        return gx
    

class Cube(Function):
    def forward(self, x: Number) -> Number:
        y = x ** 3
        return y

    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = 3 * (x ** 2) * gy
        return gx


class Exp(Function):
    def forward(self, x: Number) -> Number:
        y = np.exp(x)
        return y
    
    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = np.exp(x) * gy
        return gx


def add(x0: Number ,x1: Number) -> Number:
    return Add()(x0, x1)


def mul(x0: Number ,x1: Number) -> Number:
    return Mul()(x0, x1)


def square(x: Number) -> Number:
    return Square()(x)


def cube(x: Number) -> Number:
    return Cube()(x)


def exp(x: Number) -> Number:
    return Exp()(x)


def as_array(x) -> np.ndarray:
    if np.isscalar(x):
        return np.array(x)
    return x


@functools.total_ordering
class PriorityItem(object):
    def __init__(self, obj):
        self.obj = obj
        
    def __lt__(self, other):
        return isinstance(other, PriorityItem) and self.obj > other.obj
    

class PrioritySet(object):
    Item = namedtuple("Item", ["priority", "item"])
    
    def __call__(self, queue=None):
        self.maxheap = []
        self.heapset = set()
        if queue is None:
            return self
        
        if not isinstance(queue, Iterable):
            print(f"{type(queue)} is not iterable")
            return None
        
        queue = map(PriorityItem, queue)
        for reverse_obj in queue:
            self.add(reverse_obj)
        return self

    def add(self, x: PriorityItem) -> None:
        if id(x.obj) not in self.heapset:
            heapq.heappush(self.maxheap, x)
            self.heapset.add(id(x.obj))
        
    def pop(self):
        x = heapq.heappop(self.maxheap)
        self.heapset.remove(id(x.obj))
        return x.obj
    
    def __len__(self):
        return len(self.maxheap)


In [6]:
a = Variable(np.array(3.0))
b = Variable(np.array(2.0))
c = Variable(np.array(1.0))

y = add(mul(a,b), c)
y.backward()

print(y)
print(a.grad)
print(b.grad)

variable(7.0)
2.0
3.0


# define multiplication and addition operators

In [13]:
class Variable:
    def __init__(self, data, name=None):
        if data is not None:
            if not isinstance(data, np.ndarray):
                raise TypeError(f"{type(data)} is not supported")

        self.data = data
        self.name = name
        self.grad = None
        self.creator = None
        self.generation = 0
        
    def __len__(self):
        return len(self.data)
    
    def __repr__(self):
        if self.data is None:
            return "variable(None)"
        p = str(self.data).replace("\n", "\n" + " "*9)
        return f"variable({p})"
    
    # def __mul__(self, other):
    #     return mul(self, other)
        
    @property
    def shape(self):
        return self.data.shape
    
    @property
    def ndim(self):
        return self.data.ndim
    
    @property
    def size(self):
        return self.data.size
    
    @property
    def dtype(self):
        return self.data.dtype

    def set_creator(self, function) -> NoReturn:
        self.creator = function
        self.generation = function.generation + 1
        return None

    def backward(self, retain_grad=False) -> NoReturn:
        if self.grad is None:
            self.grad = np.ones_like(self.data)
            
        def priority_set(iterable_queue):
            return PrioritySet()(iterable_queue)
        
        functions_list = priority_set([self.creator])

        while functions_list:
            function = functions_list.pop()
            
            gys = [output().grad for output in function.outputs]
            gxs = function.backward(*gys)
            
            if not isinstance(gxs, tuple):
                gxs = (gxs,)
            
            for x, gx in zip(function.inputs, gxs):
                if x.grad is None:
                    x.grad = gx
                else:
                    x.grad = x.grad + gx

                if x.creator is not None:
                    functions_list.add(PriorityItem(x.creator))

            if not retain_grad:
                for y in function.outputs:
                    y().grad = None

        return None

    def cleargrad(self) -> NoReturn:
        self.grad = None
        return None


class Function:
    def __call__(self, *inputs):
        xs = [x.data for x in inputs]
        ys = self.forward(*xs)
        if not isinstance(ys, tuple):
            ys = (ys,)
        outputs = [Variable(as_array(y)) for y in ys]

        
        if Config.enable_backprop:
            self.generation = max([x.generation for x in inputs])
            for output in outputs:
                output.set_creator(self)
            
            self.inputs = inputs
            self.outputs = [weakref.ref(output) for output in outputs]
        
        return outputs if len(outputs) > 1 else outputs[0]
    
    def __lt__(self, other):
        return self.generation < other.generation
    
    def __eq__(self, other):
        return self.generation == other.generation

    def forward(self, xs):
        raise NotImplementedError()

    def backward(self, gys):
        raise NotImplementedError()


class Add(Function):
    def forward(self, x0: Number, x1: Number) -> Number:
        y = x0 + x1
        return y
    
    def backward(self, gy: Number) -> tuple:
        return gy, gy
    

class Mul(Function):
    def forward(self, x0: Number, x1: Number) -> Number:
        y = x0 * x1
        return y

    def backward(self, gy: Number) -> tuple:
        x0, x1 = self.inputs[0].data, self.inputs[1].data
        return gy * x1, gy * x0


class Square(Function):
    def forward(self, x: Number) -> Number:
        y = x ** 2
        return y

    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = 2 * x * gy
        return gx
    

class Cube(Function):
    def forward(self, x: Number) -> Number:
        y = x ** 3
        return y

    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = 3 * (x ** 2) * gy
        return gx


class Exp(Function):
    def forward(self, x: Number) -> Number:
        y = np.exp(x)
        return y
    
    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = np.exp(x) * gy
        return gx


def add(x0: Number ,x1: Number) -> Number:
    return Add()(x0, x1)


def mul(x0: Number ,x1: Number) -> Number:
    return Mul()(x0, x1)


def square(x: Number) -> Number:
    return Square()(x)


def cube(x: Number) -> Number:
    return Cube()(x)


def exp(x: Number) -> Number:
    return Exp()(x)


def as_array(x) -> np.ndarray:
    if np.isscalar(x):
        return np.array(x)
    return x


@functools.total_ordering
class PriorityItem(object):
    def __init__(self, obj):
        self.obj = obj
        
    def __lt__(self, other):
        return isinstance(other, PriorityItem) and self.obj > other.obj
    

class PrioritySet(object):
    Item = namedtuple("Item", ["priority", "item"])
    
    def __call__(self, queue=None):
        self.maxheap = []
        self.heapset = set()
        if queue is None:
            return self
        
        if not isinstance(queue, Iterable):
            print(f"{type(queue)} is not iterable")
            return None
        
        queue = map(PriorityItem, queue)
        for reverse_obj in queue:
            self.add(reverse_obj)
        return self

    def add(self, x: PriorityItem) -> None:
        if id(x.obj) not in self.heapset:
            heapq.heappush(self.maxheap, x)
            self.heapset.add(id(x.obj))
        
    def pop(self):
        x = heapq.heappop(self.maxheap)
        self.heapset.remove(id(x.obj))
        return x.obj
    
    def __len__(self):
        return len(self.maxheap)


In [14]:
Variable.__mul__ = mul
Variable.__add__ = add

In [15]:
a = Variable(np.array(3.0))
b = Variable(np.array(2.0))
c = Variable(np.array(1.0))

# y = add(mul(a,b), c)
y = a * b + c
y.backward()

print(y)
print(a.grad)
print(b.grad)

variable(7.0)
2.0
3.0


## implicitly transform non-Variable instance to be Variable instance

In [21]:
try:
    x0 = Variable(np.array(2.0))
    x1 = np.array(3.0)
    y = x0 + x1
    print(y)
except AttributeError:
    s = "'+' is not supported between instance of '{var0}' and '{var1}'"
    print(s.format(var0=type(x0).__name__, var1=type(x1).__name__))

'+' is not supported between instance of 'Variable' and 'ndarray'


In [22]:
def as_variable(obj):
    if isinstance(obj, Variable):
        return obj
    return Variable(obj)

In [41]:
class Variable:
    __array_priority__ = 200
    
    def __init__(self, data, name=None):
        if data is not None:
            if not isinstance(data, np.ndarray):
                raise TypeError(f"{type(data)} is not supported")

        self.data = data
        self.name = name
        self.grad = None
        self.creator = None
        self.generation = 0
        
    def __len__(self):
        return len(self.data)
    
    def __repr__(self):
        if self.data is None:
            return "variable(None)"
        p = str(self.data).replace("\n", "\n" + " "*9)
        return f"variable({p})"
    
    def __mul__(self, other):
        return mul(self, other)
    
    def __add__(self, other):
        return add(self, other)
        
    @property
    def shape(self):
        return self.data.shape
    
    @property
    def ndim(self):
        return self.data.ndim
    
    @property
    def size(self):
        return self.data.size
    
    @property
    def dtype(self):
        return self.data.dtype

    def set_creator(self, function) -> NoReturn:
        self.creator = function
        self.generation = function.generation + 1
        return None

    def backward(self, retain_grad=False) -> NoReturn:
        if self.grad is None:
            self.grad = np.ones_like(self.data)
            
        def priority_set(iterable_queue):
            return PrioritySet()(iterable_queue)
        
        functions_list = priority_set([self.creator])

        while functions_list:
            function = functions_list.pop()
            
            gys = [output().grad for output in function.outputs]
            gxs = function.backward(*gys)
            
            if not isinstance(gxs, tuple):
                gxs = (gxs,)
            
            for x, gx in zip(function.inputs, gxs):
                if x.grad is None:
                    x.grad = gx
                else:
                    x.grad = x.grad + gx

                if x.creator is not None:
                    functions_list.add(PriorityItem(x.creator))

            if not retain_grad:
                for y in function.outputs:
                    y().grad = None

        return None

    def cleargrad(self) -> NoReturn:
        self.grad = None
        return None


class Function:
    def __call__(self, *inputs):
        inputs = [as_variable(x) for x in inputs]
        
        xs = [x.data for x in inputs]
        ys = self.forward(*xs)
        if not isinstance(ys, tuple):
            ys = (ys,)
        outputs = [Variable(as_array(y)) for y in ys]

        
        if Config.enable_backprop:
            self.generation = max([x.generation for x in inputs])
            for output in outputs:
                output.set_creator(self)
            
            self.inputs = inputs
            self.outputs = [weakref.ref(output) for output in outputs]
        
        return outputs if len(outputs) > 1 else outputs[0]
    
    def __lt__(self, other):
        return self.generation < other.generation
    
    def __eq__(self, other):
        return self.generation == other.generation

    def forward(self, xs):
        raise NotImplementedError()

    def backward(self, gys):
        raise NotImplementedError()


class Add(Function):
    def forward(self, x0: Number, x1: Number) -> Number:
        y = x0 + x1
        return y
    
    def backward(self, gy: Number) -> tuple:
        return gy, gy
    

class Mul(Function):
    def forward(self, x0: Number, x1: Number) -> Number:
        y = x0 * x1
        return y

    def backward(self, gy: Number) -> tuple:
        x0, x1 = self.inputs[0].data, self.inputs[1].data
        return gy * x1, gy * x0


class Square(Function):
    def forward(self, x: Number) -> Number:
        y = x ** 2
        return y

    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = 2 * x * gy
        return gx
    

class Cube(Function):
    def forward(self, x: Number) -> Number:
        y = x ** 3
        return y

    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = 3 * (x ** 2) * gy
        return gx


class Exp(Function):
    def forward(self, x: Number) -> Number:
        y = np.exp(x)
        return y
    
    def backward(self, gy: Number) -> Number:
        x = self.inputs[0].data
        gx = np.exp(x) * gy
        return gx


def add(x0: Number ,x1: Number) -> Number:
    x1 = as_array(x1)
    return Add()(x0, x1)


def mul(x0: Number ,x1: Number) -> Number:
    x1 = as_array(x1)
    return Mul()(x0, x1)


def square(x: Number) -> Number:
    return Square()(x)


def cube(x: Number) -> Number:
    return Cube()(x)


def exp(x: Number) -> Number:
    return Exp()(x)


def as_array(x) -> np.ndarray:
    if np.isscalar(x):
        return np.array(x)
    return x


@functools.total_ordering
class PriorityItem(object):
    def __init__(self, obj):
        self.obj = obj
        
    def __lt__(self, other):
        return isinstance(other, PriorityItem) and self.obj > other.obj
    

class PrioritySet(object):
    Item = namedtuple("Item", ["priority", "item"])
    
    def __call__(self, queue=None):
        self.maxheap = []
        self.heapset = set()
        if queue is None:
            return self
        
        if not isinstance(queue, Iterable):
            print(f"{type(queue)} is not iterable")
            return None
        
        queue = map(PriorityItem, queue)
        for reverse_obj in queue:
            self.add(reverse_obj)
        return self

    def add(self, x: PriorityItem) -> None:
        if id(x.obj) not in self.heapset:
            heapq.heappush(self.maxheap, x)
            self.heapset.add(id(x.obj))
        
    def pop(self):
        x = heapq.heappop(self.maxheap)
        self.heapset.remove(id(x.obj))
        return x.obj
    
    def __len__(self):
        return len(self.maxheap)


In [27]:
x = Variable(np.array(2.0))
y = x + np.array(3.0)
print(y)

variable(5.0)


In [28]:
x = Variable(np.array(2.0))
y = x + 3.0
print(y)

variable(5.0)


# add __rmul__ to handle left side of operators

In [29]:
x = Variable(np.array(2.0))
y = 3.0 + x
print(y)

TypeError: unsupported operand type(s) for +: 'float' and 'Variable'

In [42]:
Variable.__add__ = add
Variable.__radd__ = add
Variable.__mul__ = mul
Variable.__rmul__ = mul

In [31]:
x = Variable(np.array(2.0))
y = 3.0 * x + 1.0
print(y)

x = Variable(np.array(2.0))
y = 3.0 + x
print(y)

variable(7.0)
variable(5.0)


# set operator priority

In [40]:
x = Variable(np.array(2.0))
y = np.array([2.0]) + x
print(y)

[variable(4.0)]


In [None]:
### set Variable operation priority
# class Variable:
#     __array_priority__ = 200

In [43]:
x = Variable(np.array(2.0))
y = np.array([2.0]) + x
print(y)

variable([4.])
