In [1]:
import numpy as np
import matplotlib.pyplot as plt

# Ustawienie ziarna losowości dla powtarzalności wyników
np.random.seed(seed=1)

In [2]:
#1. Konfiguracja i Generowanie Danych
nb_of_samples = 30   
sequence_len = 20      

# Generowanie sekwencji z wartościami {0.33, 0.66, 1.0}
# Urandom.choice - symulowanie zaokrąglenia wartości z rozkładu
possible_values = [0.33, 0.66, 1.0]
X = np.random.choice(possible_values, (nb_of_samples, sequence_len))

# Cel t: liczba wystąpień "0.33" w sekwencji 

t = np.sum(np.isclose(X, 0.33), axis=1)

print(f"Kształt danych wejściowych X: {X.shape}")
print(f"Przykładowa sekwencja (pierwsza): \n{X[0]}")
print(f"Cel dla pierwszej sekwencji (liczba 0.33): {t[0]}")

Kształt danych wejściowych X: (30, 20)
Przykładowa sekwencja (pierwsza): 
[0.66 0.33 0.33 0.66 0.66 0.33 0.33 0.66 0.33 0.66 0.33 1.   0.66 1.
 0.33 1.   0.66 1.   0.33 0.33]
Cel dla pierwszej sekwencji (liczba 0.33): 9


In [3]:
#2. Definicja Modelu Liniowego RNN
def update_state(xk, sk, wx, wRec):
    """Oblicza stan k na podstawie stanu k-1 i wejścia xk"""
    return xk * wx + sk * wRec

def forward_states(X, wx, wRec):
    """Rozwija sieć w czasie i oblicza stany dla wszystkich próbe."""
    S = np.zeros((X.shape[0], X.shape[1] + 1)) # Stan s0 = 0 [cite: 130]
    for k in range(0, X.shape[1]):
        S[:, k+1] = update_state(X[:, k], S[:, k], wx, wRec)
    return S

def loss(y, t):
    """Błąd średniokwadratowy (MSE)."""
    return np.mean((t - y)**2)

def output_gradient(y, t):
    """Gradient błędu względem wyjścia."""
    return 2.0 * (y - t)

def backward_gradient(X, S, grad_out, wRec):
    """Algorytm BPTT - obliczanie gradientów wag."""
    grad_over_time = np.zeros((X.shape[0], X.shape[1] + 1))
    grad_over_time[:, -1] = grad_out
    
    wx_grad = 0
    wRec_grad = 0
    
    # Cofanie się w czasie (od T do 1)
    for k in range(X.shape[1], 0, -1):
        # Akumulacja gradientów wag
        wx_grad += np.sum(np.mean(grad_over_time[:, k] * X[:, k-1], axis=0))
        wRec_grad += np.sum(np.mean(grad_over_time[:, k] * S[:, k-1], axis=0))
        
        # Propagacja gradientu do poprzedniego stanu
        grad_over_time[:, k-1] = grad_over_time[:, k] * wRec
        
    return (wx_grad, wRec_grad), grad_over_time

In [4]:
# 3. Optymalizacja RProp
def update_rprop(X, t, W, W_prev_sign, W_delta, eta_p, eta_n):
    """Aktualizacja wag algorytmem RProp[cite: 226]."""
    # Forward pass
    S = forward_states(X, W[0], W[1])
    
    # Obliczenie gradientów
    grad_out = output_gradient(S[:, -1], t)
    W_grads, _ = backward_gradient(X, S, grad_out, W[1])
    
    W_sign = np.sign(W_grads) # Znak nowego gradientu 
    
    # Aktualizacja kroku uczenia (Delta) dla każdej wagi
    for i, _ in enumerate(W):
        if W_sign[i] == W_prev_sign[i]:
            W_delta[i] *= eta_p
        else:
            W_delta[i] *= eta_n
            
    return W_delta, W_sign

In [5]:
# 4. Trening
# Hiperparametry RProp
eta_p = 1.2
eta_n = 0.5

# Inicjalizacja wag i parametrów RProp 
W = [-1.5, 2.0]     # [wx, wRec]
W_delta = [0.001, 0.001]
W_prev_sign = [0, 0]

loss_history = []

print("\nRozpoczynam trening...")
for i in range(500): # 500 iteracji
    # Krok optymalizacji
    W_delta, W_sign = update_rprop(X, t, W, W_prev_sign, W_delta, eta_p, eta_n)
    
    # Aktualizacja wag
    for k, _ in enumerate(W):
        W[k] -= W_sign[k] * W_delta[k] #minimalizujacja błędu
        
    W_prev_sign = W_sign
    
    # Monitorowanie błędu co 50 iteracji
    if i % 50 == 0:
        current_loss = loss(forward_states(X, W[0], W[1])[:, -1], t)
        loss_history.append(current_loss)
        print(f"Iteracja {i}: MSE = {current_loss:.4f}, Wagi: wx={W[0]:.4f}, wRec={W[1]:.4f}")

print(f"\nOstateczne wagi: wx = {W[0]:.4f}, wRec = {W[1]:.4f}")


Rozpoczynam trening...
Iteracja 0: MSE = 1088236768149.8201, Wagi: wx=-1.4995, wRec=1.9995
Iteracja 50: MSE = 4.6047, Wagi: wx=1.7420, wRec=0.8440
Iteracja 100: MSE = 4.5555, Wagi: wx=1.7956, wRec=0.8336
Iteracja 150: MSE = 4.5474, Wagi: wx=1.8549, wRec=0.8277
Iteracja 200: MSE = 4.5425, Wagi: wx=1.9008, wRec=0.8233
Iteracja 250: MSE = 4.5387, Wagi: wx=1.9407, wRec=0.8191
Iteracja 300: MSE = 4.5360, Wagi: wx=1.9769, wRec=0.8153
Iteracja 350: MSE = 4.5342, Wagi: wx=2.0090, wRec=0.8124
Iteracja 400: MSE = 4.5331, Wagi: wx=2.0337, wRec=0.8099
Iteracja 450: MSE = 4.5324, Wagi: wx=2.0529, wRec=0.8079

Ostateczne wagi: wx = 2.0683, wRec = 0.8065


In [6]:
# 5. Test Modelu (Weryfikacja Zadania)
# Sekwencja zawiera: cztery razy 0.33, dwa razy 0.66, reszta 1.0
test_input_list = [0.33, 0.33, 0.66, 1.0, 0.33, 1.0, 0.66, 0.33] + [1.0]*(20-8)
test_inpt = np.asmatrix([test_input_list]) # Dopasowanie wymiaru (1, 20)

# Prawdziwy cel (liczba 0.33)
target_val = np.sum(np.isclose(test_inpt, 0.33))

# Predykcja modelu
model_output = forward_states(test_inpt, W[0], W[1])[:, -1][0]

print("\n--- Wynik Testu ---")
print(f"Sekwencja testowa zawiera {target_val} razy liczbę 0.33.")
print(f"Model przewidział wynik: {model_output:.2f}")
print(f"Błąd bezwzględny: {abs(target_val - model_output):.2f}")


--- Wynik Testu ---
Sekwencja testowa zawiera 4 razy liczbę 0.33.
Model przewidział wynik: 10.27
Błąd bezwzględny: 6.27
