In [6]:
import itertools
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style('darkgrid')
np.random.seed(seed=1)

# Przygotowanie danych (Suma 18-bitowa) 
# sequence_len = bits + 1 (18 bitów + ewentualny bit przeniesienia)
bits = 18
sequence_len = bits + 1 
nb_train = 1000 

def create_dataset(nb_samples, seq_len):
    max_int = 2**(seq_len-1) - 1
    format_str = '{:0' + str(seq_len) + 'b}'
    X = np.zeros((nb_samples, seq_len, 2))
    T = np.zeros((nb_samples, seq_len, 1))
    for i in range(nb_samples):
        nb1 = np.random.randint(0, max_int)
        nb2 = np.random.randint(0, max_int)
        X[i,:,0] = list(reversed([int(b) for b in format_str.format(nb1)]))
        X[i,:,1] = list(reversed([int(b) for b in format_str.format(nb2)]))
        T[i,:,0] = list(reversed([int(b) for b in format_str.format(nb1 + nb2)]))
    return X, T

X_train, T_train = create_dataset(nb_train, sequence_len)

# 2. Komponenty Sieci

class TensorLinear(object):
    def __init__(self, n_in, n_out, tensor_order, W=None, b=None):
        a = np.sqrt(6.0 / (n_in + n_out))
        self.W = np.random.uniform(-a, a, (n_in, n_out)) if W is None else W
        self.b = np.zeros((n_out)) if b is None else b
        self.bpAxes = tuple(range(tensor_order-1))

    def forward(self, X):
        return np.tensordot(X, self.W, axes=((-1),(0))) + self.b

    def backward(self, X, gY):
        gW = np.tensordot(X, gY, axes=(self.bpAxes, self.bpAxes))
        gB = np.sum(gY, axis=self.bpAxes)
        gX = np.tensordot(gY, self.W.T, axes=((-1),(0)))  
        return gX, gW, gB

class LogisticClassifier(object):
    def forward(self, X):
        return 1. / (1. + np.exp(-np.clip(X, -15, 15))) 
    
    def backward(self, Y, T):
        return (Y - T) / (Y.shape[0] * Y.shape[1])
    
    def loss(self, Y, T):
        return -np.mean((T * np.log(Y + 1e-10)) + ((1-T) * np.log(1-Y + 1e-10)))

class TanH(object):
    def forward(self, X): return np.tanh(X) 
    def backward(self, Y, output_grad): return (1.0 - (Y**2)) * output_grad

class RecurrentStateUpdate(object):
    def __init__(self, nbStates, W, b):
        self.linear = TensorLinear(nbStates, nbStates, 2, W, b)
        self.tanh = TanH()

    def forward(self, Xk, Sk):
        return self.tanh.forward(Xk + self.linear.forward(Sk))
    
    def backward(self, Sk0, Sk1, output_grad):
        gZ = self.tanh.backward(Sk1, output_grad)
        gSk0, gW, gB = self.linear.backward(Sk0, gZ)
        return gZ, gSk0, gW, gB

class RecurrentStateUnfold(object):
    def __init__(self, nbStates, nbTimesteps):
        a = np.sqrt(6. / (nbStates * 2))
        self.W = np.random.uniform(-a, a, (nbStates, nbStates))
        self.b = np.zeros((self.W.shape[0]))
        self.S0 = np.zeros(nbStates)
        self.nbTimesteps = nbTimesteps
        self.stateUpdate = RecurrentStateUpdate(nbStates, self.W, self.b)
        
    def forward(self, X):
        S = np.zeros((X.shape[0], X.shape[1]+1, self.W.shape[0]))
        S[:,0,:] = self.S0
        for k in range(self.nbTimesteps):
            S[:,k+1,:] = self.stateUpdate.forward(X[:,k,:], S[:,k,:])
        return S
    
    def backward(self, X, S, gY):
        gSk = np.zeros_like(gY[:,self.nbTimesteps-1,:])
        gZ = np.zeros_like(X)
        gWSum, gBSum = np.zeros_like(self.W), np.zeros_like(self.b)
        for k in range(self.nbTimesteps-1, -1, -1):
            gSk += gY[:,k,:]
            gZ[:,k,:], gSk, gW, gB = self.stateUpdate.backward(S[:,k,:], S[:,k+1,:], gSk)
            gWSum += gW
            gBSum += gB
        return gZ, gWSum, gBSum, np.sum(gSk, axis=0)


class RnnBinaryAdder(object):
    def __init__(self, nb_of_inputs, nb_of_outputs, nb_of_states, sequence_len):
        self.tensorInput = TensorLinear(nb_of_inputs, nb_of_states, 3)
        self.rnnUnfold = RecurrentStateUnfold(nb_of_states, sequence_len)
        self.tensorOutput = TensorLinear(nb_of_states, nb_of_outputs, 3)
        self.classifier = LogisticClassifier()
        
    def forward(self, X):
        recIn = self.tensorInput.forward(X)
        S = self.rnnUnfold.forward(recIn)
        Z = self.tensorOutput.forward(S[:,1:,:])
        Y = self.classifier.forward(Z)
        return recIn, S, Z, Y
    
    def backward(self, X, Y, recIn, S, T):
        gZ = self.classifier.backward(Y, T)
        gRecOut, gWout, gBout = self.tensorOutput.backward(S[:,1:,:], gZ)
        gRnnIn, gWrec, gBrec, gS0 = self.rnnUnfold.backward(recIn, S, gRecOut)
        _, gWin, gBin = self.tensorInput.backward(X, gRnnIn)
        return gWout, gBout, gWrec, gBrec, gWin, gBin, gS0
    
    def getOutput(self, X): return self.forward(X)[3]
    def getBinaryOutput(self, X): return np.around(self.getOutput(X))
    def loss(self, Y, T): return self.classifier.loss(Y, T)
    
    def getParamGrads(self, X, T):
        recIn, S, Z, Y = self.forward(X)
        gWout, gBout, gWrec, gBrec, gWin, gBin, gS0 = self.backward(X, Y, recIn, S, T)
        return [g for g in itertools.chain(
                np.nditer(gS0),
                np.nditer(gWin),
                np.nditer(gBin),
                np.nditer(gWrec),
                np.nditer(gBrec),
                np.nditer(gWout),
                np.nditer(gBout))]

    def get_params_iter(self):
        return itertools.chain(
            np.nditer(self.rnnUnfold.S0, op_flags=['readwrite']),
            np.nditer(self.tensorInput.W, op_flags=['readwrite']),
            np.nditer(self.tensorInput.b, op_flags=['readwrite']),
            np.nditer(self.rnnUnfold.W, op_flags=['readwrite']),
            np.nditer(self.rnnUnfold.b, op_flags=['readwrite']),
            np.nditer(self.tensorOutput.W, op_flags=['readwrite']), 
            np.nditer(self.tensorOutput.b, op_flags=['readwrite']))

# 4. Trening z RMSProp i Momentum 

nb_of_states = 12 
RNN = RnnBinaryAdder(2, 1, nb_of_states, sequence_len)

lmbd, learning_rate, momentum_term = 0.5, 0.01, 0.9
mb_size, epochs = 100, 20

nbParameters = sum(1 for _ in RNN.get_params_iter())
maSquare = [0.0] * nbParameters
Vs = [0.0] * nbParameters
losses = []

print("Rozpoczynanie treningu...")
for epoch in range(epochs):
    for mb in range(nb_train // mb_size):
        X_mb = X_train[mb*mb_size:(mb+1)*mb_size]
        T_mb = T_train[mb*mb_size:(mb+1)*mb_size]
        
        V_tmp = [v * momentum_term for v in Vs]
        for pIdx, P in enumerate(RNN.get_params_iter()): P += V_tmp[pIdx]
        
        backprop_grads = RNN.getParamGrads(X_mb, T_mb)    
        
        for pIdx, P in enumerate(RNN.get_params_iter()):
            maSquare[pIdx] = lmbd * maSquare[pIdx] + (1-lmbd) * backprop_grads[pIdx]**2
            pGradNorm = (learning_rate * backprop_grads[pIdx]) / (np.sqrt(maSquare[pIdx]) + 1e-7)
            Vs[pIdx] = V_tmp[pIdx] - pGradNorm     
            P -= pGradNorm
            
    current_loss = RNN.loss(RNN.getOutput(X_train[:200]), T_train[:200])
    losses.append(current_loss)
    if epoch % 1 == 0:
        print(f"Epoch {epoch}, Loss: {current_loss:.6f}")


print("\nWyniki testowe:")
X_t, T_t = create_dataset(5, sequence_len)
Y_t = RNN.getBinaryOutput(X_t)
for i in range(5):
    x1 = int("".join(reversed([str(int(d)) for d in X_t[i,:,0]])), 2)
    x2 = int("".join(reversed([str(int(d)) for d in X_t[i,:,1]])), 2)
    target = int("".join(reversed([str(int(d)) for d in T_t[i,:,0]])), 2)
    pred = int("".join(reversed([str(int(d)) for d in Y_t[i,:,0]])), 2)
    status = "OK" if target == pred else "BŁĄD"
    print(f"Test {i+1}: {x1:7} + {x2:7} = {target:8} | Sieć: {pred:8} ({status})")

Rozpoczynanie treningu...
Epoch 0, Loss: 0.693978
Epoch 1, Loss: 0.678093
Epoch 2, Loss: 0.614516
Epoch 3, Loss: 0.445526
Epoch 4, Loss: 0.283458
Epoch 5, Loss: 0.167323
Epoch 6, Loss: 0.073782
Epoch 7, Loss: 0.024623
Epoch 8, Loss: 0.005985
Epoch 9, Loss: 0.001163
Epoch 10, Loss: 0.000202
Epoch 11, Loss: 0.000036
Epoch 12, Loss: 0.000007
Epoch 13, Loss: 0.000001
Epoch 14, Loss: 0.000000
Epoch 15, Loss: 0.000000
Epoch 16, Loss: 0.000000
Epoch 17, Loss: 0.000000
Epoch 18, Loss: 0.000000
Epoch 19, Loss: 0.000000

Wyniki testowe:
Test 1:   94367 +  231698 =   326065 | Sieć:   326065 (OK)
Test 2:  180392 +   85253 =   265645 | Sieć:   265645 (OK)
Test 3:  188528 +  183772 =   372300 | Sieć:   372300 (OK)
Test 4:   24015 +  190568 =   214583 | Sieć:   214583 (OK)
Test 5:   99818 +  222546 =   322364 | Sieć:   322364 (OK)
