In [1]:
from torch import FloatTensor, LongTensor
import math

In [2]:
class Module(object):
    def forward(self, *input):
        raise NotImplementedError

    def backward(self, *gradwrtoutput):
        raise NotImplementedError

    def param(self):
        return []

In [3]:
class Linear(Module):
    def __init__(self, in_features, out_features, bias=True):
        super(Linear).__init__()

        self.name = 'Linear'

        self.in_features = in_features
        self.out_features = out_features

        self.weight = FloatTensor(in_features, out_features)
        self.weight_grad = FloatTensor(in_features, out_features)

        if bias:
            self.bias = FloatTensor(out_features)
            self.bias_grad = FloatTensor(out_features)
        else:
            self.bias = None
            self.bias_grad = None

        self.reset_parameters()

    def reset_parameters(self):
        stdv = 1. / math.sqrt(self.weight.size(1))
        self.weight.uniform_(-stdv, stdv)
        self.weight_grad.fill_(0)
        if self.bias is not None:
            self.bias.uniform_(-stdv, stdv)
            self.bias_grad.fill_(0)

    def forward(self, input):
        # Y = X * W + b
        self.input = input.clone()
        if self.bias is not None:
            return self.input.matmul(self.weight).add(self.bias)
        else:
            return self.input.matmul(self.weight)

    def backward(self, gradwrtoutput):
        # dW = X^T * dL/dY
        self.weight_grad = self.input.t().matmul(gradwrtoutput)
        # db = (dL/dY)^T * 1
        if self.bias is not None:
            self.bias_grad = gradwrtoutput.t().sum(1)
        # dX = dL/dY * W^T
        return gradwrtoutput.matmul(self.weight.t())

    def param(self):
        if self.bias is not None:
            return [(self.weight, self.weight_grad),
                    (self.bias, self.bias_grad)]
        else:
            return [(self.weight, self.weight_grad)]

In [4]:
class ReLU(Module):
    def __init__(self):
        super(ReLU).__init__()

        self.name = 'ReLU'

    def forward(self, input):
        # Y = max(0,X)
        self.input = input.clone()
        return self.input.mul(self.input.gt(0).float())

    def backward(self, gradwrtoutput):
        if self.input is not None:
            return gradwrtoutput.mul(self.input.gt(0).float())
        else:
            print("Forward First")
            return None

    def param(self):
        return []

In [5]:
class Tanh(Module):
    def __init__(self):
        super(Tanh).__init__()
        self.name = 'Tanh'

    def forward(self, input):
        # Y = (exp(X) - exp(-X))/(exp(X) + exp(-X))
        self.input = input.clone()
        return self.input.tanh()
        # self.input.masked_fill_(self.input.gt(50), 50)  # prevent overflow
        # return (self.input.exp() - self.input.mul(-1).exp()) / (self.input.exp() + self.input.mul(-1).exp())

    def backward(self, gradwrtoutput):
        # dY/dX = 4/(exp(x) + exp(-x))^2
        if self.input is not None:
            grad = 4. / (self.input.exp() + self.input.mul(-1).exp()).pow(2)
            return gradwrtoutput.mul(grad)
        else:
            print("Forward First")
            return None

    def param(self):
        return []

In [6]:
class Sequential(Module):
    def __init__(self):
        super(Sequential).__init__()
        self.name = 'Sequential'
        self.module_list = []

    def add(self, *module):
        for m in module:
            self.module_list.append(m)

    def forward(self, input):
        module_input = input.clone()
        for module in self.module_list:
            module_output = module.forward(module_input)
            module_input = module_output
        return module_output

    def backward(self, gradwrtoutput):
        grad = gradwrtoutput
        for module in self.module_list[::-1]:
            grad = module.backward(grad)
        # return grad

    def param(self):
        param_list = []
        for module in self.module_list:
            param_list.append(module.param())
        return param_list

In [7]:
class LossMSE():
    def __init__(self):
        super(LossMSE).__init__()
        self.name = 'LossMSE'

    def calculate(self, predict_value, true_value):
        self.predict_value = predict_value
        self.true_value = true_value
        self.loss = (self.predict_value - self.true_value).float().pow(2).mean()
        return self.loss

    def backward(self):
        return 2 * (self.predict_value - self.true_value).float()

In [8]:
def onehot(label):
    n = label.size(0)
    y = FloatTensor(n, 2)
    y[:, 0] = 2*(0.5-label)
    y[:, 1] = -y[:, 0]
    return y.long()

In [9]:
X_train = FloatTensor(1000, 2).uniform_(0, 1)
y_train = onehot(X_train.sub(0.5).pow(2).sum(1).lt(1./2./math.pi).float())

X_test = FloatTensor(1000, 2).uniform_(0, 1)
y_test = onehot(X_test.sub(0.5).pow(2).sum(1).lt(1./2./math.pi).float())

In [10]:
# Normalization
mu, std = X_train.mean(), X_train.std()
X_train.sub_(mu).div_(std)
X_test.sub_(mu).div_(std);

In [11]:
model = Sequential()
#model.add(Linear(2, 25), ReLU(), Linear(25, 2), Tanh())
model.add(Linear(2, 25), ReLU(), Linear(25, 25), ReLU(), Linear(25, 2), Tanh())

In [12]:
train_output = model.forward(X_train)
print("Before Train Error: {:.2%}".format(train_output.max(1)[1].ne(y_train.max(1)[1]).sum()/train_output.size(0)))

Before Train Error: 61.00%


In [13]:
lr = 0.01
mini_batch_size = 50
epoch = 10

criterion = LossMSE()

for e in range(epoch):
    print("******** Begin Epoch {} ********".format(e))
    for b in range(0, X_train.size(0), mini_batch_size):
        output = model.forward(X_train.narrow(0, b, mini_batch_size))
        #print(output)
        #predict = output.max(1)[1]
        loss = criterion.calculate(output, y_train.narrow(0, b, mini_batch_size).float())
        error = output.max(1)[1].ne(y_train.narrow(0, b, mini_batch_size).max(1)[1]).sum()/output.size(0)
        print("Epoch {} Batch {:2.0f}: {:4.2f}, {:6.2%}".format(e, b/mini_batch_size, loss, error))
        l_grad = criterion.backward()
        #print(l_grad)
        model.backward(l_grad)
        for layer in model.param():
            for p in layer:
                p[0].sub_(lr*p[1])
    print("******** After Epoch {} ********".format(e))
    output = model.forward(X_train)
    loss = criterion.calculate(output, y_train.float())
    error = output.max(1)[1].ne(y_train.max(1)[1]).sum()/output.size(0)
    print("\nLoss: {:4.2f}, Error: {:6.2%}\n".format(loss, error))
    #print(e, criterion.calculate(model.forward(X_train), y_train.float()))

******** Begin Epoch 0 ********
Epoch 0 Batch  0: 1.01, 48.00%
Epoch 0 Batch  1: 1.22, 58.00%
Epoch 0 Batch  2: 1.17, 48.00%
Epoch 0 Batch  3: 1.08, 64.00%
Epoch 0 Batch  4: 1.24, 50.00%
Epoch 0 Batch  5: 0.97, 32.00%
Epoch 0 Batch  6: 1.01, 50.00%
Epoch 0 Batch  7: 0.98, 44.00%
Epoch 0 Batch  8: 0.93, 30.00%
Epoch 0 Batch  9: 0.94, 36.00%
Epoch 0 Batch 10: 0.95, 44.00%
Epoch 0 Batch 11: 0.96, 50.00%
Epoch 0 Batch 12: 0.98, 46.00%
Epoch 0 Batch 13: 0.97, 42.00%
Epoch 0 Batch 14: 0.84, 34.00%
Epoch 0 Batch 15: 0.97, 52.00%
Epoch 0 Batch 16: 0.90, 52.00%
Epoch 0 Batch 17: 1.11, 48.00%
Epoch 0 Batch 18: 1.26, 52.00%
Epoch 0 Batch 19: 0.87, 42.00%
******** After Epoch 0 ********

Loss: 0.80, Error: 29.50%

******** Begin Epoch 1 ********
Epoch 1 Batch  0: 0.71, 22.00%
Epoch 1 Batch  1: 0.84, 34.00%
Epoch 1 Batch  2: 0.94, 36.00%
Epoch 1 Batch  3: 0.91, 36.00%
Epoch 1 Batch  4: 1.07, 50.00%
Epoch 1 Batch  5: 0.92, 36.00%
Epoch 1 Batch  6: 1.12, 50.00%
Epoch 1 Batch  7: 0.98, 32.00%
Epoch 1 

In [14]:
train_output = model.forward(X_train)
print("Train Error: {:.2%}".format(train_output.max(1)[1].ne(y_train.max(1)[1]).sum()/train_output.size(0)))

test_output = model.forward(X_test)
print("Test Error: {:.2%}".format(test_output.max(1)[1].ne(y_test.max(1)[1]).sum()/test_output.size(0)))

Train Error: 4.50%
Test Error: 5.70%
