In [126]:
import numpy as np
from typing import Literal

In [127]:
class GRU:
    def __init__(self, input_size, hidden_size, output_size, learning_rate: float = 0.01,
                 output_activation: Literal['linear', 'leaky_relu'] = 'linear',
                 leaky_relu_alpha: float = 0.01):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # Инициализация весов для GRU
        self.Wz = np.random.rand(hidden_size, input_size) * 0.01
        self.Uz = np.random.rand(hidden_size, hidden_size) * 0.01
        self.bz = np.zeros((hidden_size, 1))

        self.Wh = np.random.rand(hidden_size, input_size) * 0.01
        self.Uh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.bh = np.zeros((hidden_size, 1))

        self.Wr = np.random.rand(hidden_size, input_size) * 0.01
        self.Ur = np.random.rand(hidden_size, hidden_size) * 0.01
        self.br = np.zeros((hidden_size, 1))

        self.Wo = np.random.rand(output_size, hidden_size) * 0.01
        self.bo = np.zeros((output_size, 1))

        self.learning_rate = learning_rate

        # Output neurons activation func + params
        self.output_activation = output_activation
        self.leaky_relu_alpha = leaky_relu_alpha

    def activation(self, x):
        # Гиперболический арксинус
        return np.asinh(x)
    
    def leaky_relu(self, x, k: float = 0.01):
        return np.where(x > 0, x, k * x)
    
    def linear(self, x):
        return x

    def forward(self, X):
        self.h = np.zeros((self.hidden_size, 1))
        self.y = []
        self.cache = []

        for t in range(len(X)):
            x_t = X[t].reshape(-1, 1)

            # Грейдиентное обновление для GRU
            z_t = self.sigmoid(self.Wz @ x_t + self.Uz @ self.h + self.bz)
            r_t = self.sigmoid(self.Wr @ x_t + self.Ur @ self.h + self.br)
            h_tilde = self.activation(self.Wh @ x_t + self.Uh @ (r_t * self.h) + self.bh)
            h_next = (1 - z_t) * h_tilde + z_t * self.h
            self.h = h_next

            # Сохранение активаций для обратного распространения
            self.cache.append((x_t, z_t, r_t, h_tilde, h_next))

            # Выходное значение
            y_t = self.Wo @ h_next + self.bo

            if self.output_activation == 'linear':
                y_t = self.linear(y_t)
            elif self.output_activation == 'leaky_relu':
                y_t = self.leaky_relu(y_t, self.leaky_relu_alpha)

            self.y.append(y_t)

        return np.array(self.y).squeeze(axis=-1)

    def sigmoid(self, x):                
        return 1 / (1 + np.exp(-x))

    def backward(self, X, Y, grad_clip_value=5.0):
        dWz = np.zeros_like(self.Wz)
        dUz = np.zeros_like(self.Uz)
        dbz = np.zeros_like(self.bz)

        dWh = np.zeros_like(self.Wh)
        dUh = np.zeros_like(self.Uh)
        dbh = np.zeros_like(self.bh)

        dWr = np.zeros_like(self.Wr)
        dUr = np.zeros_like(self.Ur)
        dbr = np.zeros_like(self.br)

        dWo = np.zeros_like(self.Wo)
        dbo = np.zeros_like(self.bo)

        dh_next = np.zeros_like(self.h)

        # Обратное распространение ошибки по времени
        for t in reversed(range(len(X))):
            x_t, z_t, r_t, h_tilde, h_next = self.cache[t]
            y_t = self.y[t]

            # Ошибка на выходе
            dy = y_t - Y[t]

            # Градиенты для весов выхода
            dWo += dy @ h_next.T
            dbo += dy

            # Градиенты для скрытого состояния
            dh = self.Wo.T @ dy + dh_next
            dh_tilde = dh * (1 - z_t)
            dz = dh * (h_next - h_tilde)
            dWh += dh_tilde * (1 - h_tilde ** 2) @ x_t.T
            dUh += dh_tilde * (1 - h_tilde ** 2) @ (r_t * self.h).T
            dbh += dh_tilde * (1 - h_tilde ** 2)

            # Градиенты для обновлений
            dUr += dz * r_t * self.h @ self.h.T
            dWr += dz * r_t * self.h @ x_t.T
            dbr += dz * r_t * self.h

            # Градиенты для сброса и обновления
            dUz += dz * (1 - z_t) @ self.h.T
            dWz += dz * (1 - z_t) @ x_t.T
            dbz += dz * (1 - z_t)

            dh_next = (1 - z_t) * dh_tilde + z_t * dh
        
        # Ограничение градиентов, чтобы избежать взрыва
        gradients = [dWz, dUz, dbz, dWh, dUh, dbh, dWr, dUr, dbr, dWo, dbo]
        for grad in gradients:
            np.clip(grad, -grad_clip_value, grad_clip_value, out=grad)

        # Обновление весов
        self.Wz -= self.learning_rate * dWz
        self.Uz -= self.learning_rate * dUz
        self.bz -= self.learning_rate * dbz

        self.Wh -= self.learning_rate * dWh
        self.Uh -= self.learning_rate * dUh
        self.bh -= self.learning_rate * dbh

        self.Wr -= self.learning_rate * dWr
        self.Ur -= self.learning_rate * dUr
        self.br -= self.learning_rate * dbr

        self.Wo -= self.learning_rate * dWo
        self.bo -= self.learning_rate * dbo

    def train(self, X, Y, epochs=100, verbosity: int = 1, grad_clip_value: float = 5.0,
              reset_hidden: bool = False):
        for epoch in range(epochs):
            # Reset hidden state each epoch if needed
            if reset_hidden:
                self.h = np.zeros((self.hidden_size, 1))

            self.forward(X)
            self.backward(X, Y, grad_clip_value)
            if epoch % verbosity == 0:
                # MSE LOSS
                # loss = np.mean((np.array(self.y) - Y) ** 2)

                # MAE LOSS
                loss = np.mean(np.absolute(np.array(self.y) - Y))
                print(f'Epoch {epoch}/{epochs}, Loss: {loss:.6f}')

In [128]:
# Prepare data using sliding window
def create_sliding_window_data(sequence, window_size):
    X, y = [], []
    for i in range(len(sequence) - window_size):
        X.append(sequence[i:i + window_size])
        y.append(sequence[i + window_size])
    return np.array(X), np.array(y).reshape(-1, 1)

In [129]:
# Fibonacci sequence generator
def fibonacci_generator(n):
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

In [130]:
# Squared num sequence generator
def squared_generator(n, fst: float):
    num = fst
    for _ in range(n):
        yield num
        num = num**2

In [131]:
# Squared num sequence generator
def half_generator(n, fst: float):
    num = fst
    for _ in range(n):
        yield num
        num /= 2

In [132]:
# 1/n sequence generator
def one_by_n_generator(n):    
    for i in range(n):
        yield 1 / (i + 1)

In [133]:
# 1, -1, 1, -1, 1,... sequence generator
def plus_one_minus_one_generator(n):    
    for i in range(n):        
        yield 1 if i % 2 == 0 else -1

In [134]:
# Generate squared sequence
n = 100
sequence = list(one_by_n_generator(n))
window_size = 3
X, y = create_sliding_window_data(sequence, window_size)
X_train, y_train = X[0:int(len(X) * 0.8)], y[0:int(len(X) * 0.8)]
X_test, y_test = X[int(len(X) * 0.8):], y[int(len(X) * 0.8):]

# Model params
h = 5
o = 1
learning_rate = 0.0001
output_activation = 'linear'
output_activation_alpha = 0.01

# Predict the next number in the sequence
model = GRU(input_size=window_size, hidden_size=h, output_size=o, learning_rate=learning_rate,
            output_activation=output_activation, leaky_relu_alpha=output_activation_alpha)
model.train(X_train, y_train, epochs=10000, verbosity=1000, grad_clip_value=1)

Epoch 0/10000, Loss: 0.040652
Epoch 1000/10000, Loss: 0.028247
Epoch 2000/10000, Loss: 0.028269
Epoch 3000/10000, Loss: 0.028269
Epoch 4000/10000, Loss: 0.028269
Epoch 5000/10000, Loss: 0.028269
Epoch 6000/10000, Loss: 0.028268
Epoch 7000/10000, Loss: 0.028268
Epoch 8000/10000, Loss: 0.028268
Epoch 9000/10000, Loss: 0.028267


In [135]:
# Assess model on test
print('Test dataset validation')
mae = np.mean(np.absolute((y_test.squeeze() - model.forward(X_test).squeeze())))
print(f'MAE on test: {mae:.8f}')
mape = np.mean(np.absolute((y_test.squeeze() - model.forward(X_test).squeeze())) / y_test.squeeze()) * 100
print(f'MAPE on test: {mape:.8f}%')

Test dataset validation
MAE on test: 0.02952825
MAPE on test: 267.62557862%


In [136]:
def predict_next_n_elems(sequence, model, sequence_len, window_size, n):
    predictions: list[float] = []
    test_arr: list = sequence[sequence_len - window_size:sequence_len]
    print(test_arr)
    prediction = model.forward(np.array([test_arr]))
    true_number = 1 / (sequence_len + 1)
    pred_number = prediction.squeeze().item()
    print(f'{np.abs(pred_number - true_number) / true_number * 100:.4f}%')
    print(f'TRUE: {true_number}\nPREDICTED: {pred_number}')
    predictions.append(pred_number)
    for i in range(1, n):
        true_number = 1 / (sequence_len + i + 1)
        
        test_arr = sequence[sequence_len - window_size + i:sequence_len + i]        
        test_arr.extend(predictions[-window_size:] if len(predictions) >= window_size else predictions)
        print(test_arr)

        prediction = model.forward(np.array([test_arr]))
        pred_number = prediction.squeeze().item()
        print(f'{np.abs(pred_number - true_number) / true_number * 100:.4f}%')
        print(f'TRUE: {true_number}\nPREDICTED: {pred_number}')
        predictions.append(pred_number)

In [137]:
predict_next_n_elems(sequence, model, n, window_size, 5)

[0.01020408163265306, 0.010101010101010102, 0.01]
311.7303%
TRUE: 0.009900990099009901
PREDICTED: 0.04076538018430873
[0.010101010101010102, 0.01, 0.04076538018430873]
315.8706%
TRUE: 0.00980392156862745
PREDICTED: 0.04077162658434427
[0.01, 0.04076538018430873, 0.04077162658434427]
320.0176%
TRUE: 0.009708737864077669
PREDICTED: 0.04077840439143932
[0.04076538018430873, 0.04077162658434427, 0.04077840439143932]
324.1779%
TRUE: 0.009615384615384616
PREDICTED: 0.04078633201394373
[0.04077162658434427, 0.04077840439143932, 0.04078633201394373]
328.2565%
TRUE: 0.009523809523809525
PREDICTED: 0.04078633674255481


In [138]:
prediction = model.forward(np.array([[1 / x for x in range(n - window_size + 1, n + 1)]]))
true_number = 1 / (n + 1)
pred_number = prediction.squeeze().item()
print(f'{np.abs(pred_number - true_number) / true_number * 100:.4f}%')

311.7303%


In [139]:
test_arr_2 = [1 / x for x in range(n - window_size + 2, n + 1)]
test_arr_2.append(pred_number)
true_number_2 = 1 / (n + 2)
prediction_2 = model.forward(np.array([test_arr_2]))
pred_number_2 = prediction_2.squeeze().item()
print(f'{np.abs(pred_number_2 - true_number_2) / true_number_2 * 100:.4f}%')

315.8706%


In [140]:
test_arr_2 = [1 / x for x in range(n - window_size + 2, n + 1)]
test_arr_2.append(pred_number)
true_number_2 = 1 / (n + 2)
prediction_2 = model.forward(np.array([test_arr_2]))
pred_number_2 = prediction_2.squeeze().item()
print(f'{np.abs(pred_number_2 - true_number_2) / true_number_2 * 100:.4f}%')

315.8706%
