<a href="https://colab.research.google.com/github/InowaR/colab/blob/main/rnn_numpy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [33]:
import numpy as np

def generate_synthetic_data(num_samples, sequence_length, noise_std=0.1):
  """
  Generates synthetic time series data with a sinusoidal pattern and noise.

  Args:
    num_samples: Number of samples.
    sequence_length: Length of each sequence.
    noise_std: Standard deviation of noise.

  Returns:
    Tuple of NumPy arrays (train_x, train_y, valid_x, valid_y, test_x, test_y).
  """

  # Generate a sinusoidal pattern
  time_steps = np.linspace(0, 2 * np.pi, num_samples)
  signal = np.sin(time_steps)

  # Add noise
  noise = np.random.normal(0, noise_std, num_samples)
  data = signal + noise

  # Create sequences
  X = []
  y = []
  for i in range(num_samples - sequence_length):
    X.append(data[i:i+sequence_length])
    y.append(data[i+sequence_length])
  X = np.array(X)
  y = np.array(y)

  # Split into training, validation, and test sets
  train_size = int(0.7 * len(X))
  valid_size = int(0.15 * len(X))
  test_size = len(X) - train_size - valid_size

  train_x, valid_x, test_x = X[:train_size], X[train_size:train_size+valid_size], X[train_size+valid_size:]
  train_y, valid_y, test_y = y[:train_size], y[train_size:train_size+valid_size], y[train_size+valid_size:]

  return train_x, train_y, valid_x, valid_y, test_x, test_y

# Example usage:
num_samples = 1000
sequence_length = 3
train_x, train_y, valid_x, valid_y, test_x, test_y = generate_synthetic_data(num_samples, sequence_length)
print(train_x, train_y, valid_x, valid_y, test_x, test_y)

[[ 0.056644    0.02121594 -0.09524918]
 [ 0.02121594 -0.09524918  0.15841453]
 [-0.09524918  0.15841453  0.20390365]
 ...
 [-0.97268589 -0.91402364 -1.02423087]
 [-0.91402364 -1.02423087 -0.7898141 ]
 [-1.02423087 -0.7898141  -0.83655214]] [ 1.58414532e-01  2.03903650e-01 -2.55095355e-02  5.52665459e-02
 -2.23845192e-03 -5.82854892e-02  1.20548649e-01  2.42669564e-02
 -8.44719074e-03  1.74973199e-01 -1.11648376e-01  1.12791971e-01
  9.10777028e-02  8.65144416e-02  8.76518675e-02  1.57832624e-01
  1.99728316e-02  1.02864065e-01 -3.37617878e-02  7.39550742e-02
  9.62119509e-02  1.81511172e-01  7.89877174e-02  1.32062031e-01
  1.32348426e-01  2.87167639e-01  1.35592894e-01  2.30105993e-01
  1.90943633e-01  3.47506168e-01  2.70743721e-01  1.75782655e-01
  1.50480327e-01  1.89128623e-01  1.56541124e-01  1.69229027e-01
  2.29558760e-01  3.10913661e-01  4.34136923e-01  2.78196554e-01
  9.45954810e-02  2.89284008e-01  1.93364351e-01  2.64654472e-01
  3.40161610e-01  2.13496740e-01  3.41445323e

In [35]:
import math
import numpy as np


def mse(actual, predicted):
    return np.mean((actual-predicted)**2)

def mse_grad(actual, predicted):
    return (predicted - actual)

def init_params(layer_conf):
    layers = []
    for i in range(1, len(layer_conf)):
        np.random.seed(0)
        k = 1/math.sqrt(layer_conf[i]["hidden"])
        i_weight = np.random.rand(layer_conf[i-1]["units"], layer_conf[i]["hidden"]) * 2 * k - k

        h_weight = np.random.rand(layer_conf[i]["hidden"], layer_conf[i]["hidden"]) * 2 * k - k
        h_bias = np.random.rand(1, layer_conf[i]["hidden"]) * 2 * k - k

        o_weight = np.random.rand(layer_conf[i]["hidden"], layer_conf[i]["output"]) * 2 * k - k
        o_bias = np.random.rand(1, layer_conf[i]["output"]) * 2 * k - k

        layers.append(
            [i_weight, h_weight, h_bias, o_weight, o_bias]
        )
    return layers


def forward(x, layers):
    hiddens = []
    outputs = []
    for i in range(len(layers)):
        i_weight, h_weight, h_bias, o_weight, o_bias = layers[i]
        hidden = np.zeros((x.shape[0], i_weight.shape[1]))
        output = np.zeros((x.shape[0], o_weight.shape[1]))
        for j in range(x.shape[0]):
            input_x = x[j,:][np.newaxis,:] @ i_weight
            hidden_x = input_x + hidden[max(j-1,0),:][np.newaxis,:] @ h_weight + h_bias
            # Activation.  tanh avoids outputs getting larger and larger.
            hidden_x = np.tanh(hidden_x)
            # Store hidden for use in backprop
            hidden[j,:] = hidden_x

            # Output layer
            output_x = hidden_x @ o_weight + o_bias
            output[j,:] = output_x
        hiddens.append(hidden)
        outputs.append(output)
    return hiddens, outputs[-1]



def backward(layers, x, lr, grad, hiddens):
    for i in range(len(layers)):
        i_weight, h_weight, h_bias, o_weight, o_bias = layers[i]
        hidden = hiddens[i]
        next_h_grad = None
        i_weight_grad, h_weight_grad, h_bias_grad, o_weight_grad, o_bias_grad = [0] * 5

        for j in range(x.shape[0] - 1, -1, -1):
            # Add newaxis in the first dimension
            out_grad = grad[j,:][np.newaxis, :]

            # Output updates
            # np.newaxis creates a size 1 axis, in this case transposing matrix
            o_weight_grad += hidden[j,:][:, np.newaxis] @ out_grad
            o_bias_grad += out_grad

            # Propagate gradient to hidden unit
            h_grad = out_grad @ o_weight.T

            if j < x.shape[0] - 1:
                # Then we multiply the gradient by the hidden weights to pull gradient from next hidden state to current hidden state
                hh_grad = next_h_grad @ h_weight.T
                # Add the gradients together to combine output contribution and hidden contribution
                h_grad += hh_grad

            # Pull the gradient across the current hidden nonlinearity
            # derivative of tanh is 1 - tanh(x) ** 2
            # So we take the output of tanh (next hidden state), and plug in
            tanh_deriv = 1 - hidden[j][np.newaxis,:] ** 2

            # next_h_grad @ np.diag(tanh_deriv_next) multiplies each element of next_h_grad by the deriv
            # Effect is to pull value across nonlinearity
            h_grad = np.multiply(h_grad, tanh_deriv)

            # Store to compute h grad for previous sequence position
            next_h_grad = h_grad.copy()

            # If we're not at the very beginning
            if j > 0:
                # Multiply input from previous layer by post-nonlinearity grad at current layer
                h_weight_grad += hidden[j-1][:, np.newaxis] @ h_grad
                h_bias_grad += h_grad

            i_weight_grad += x[j,:][:,np.newaxis] @ h_grad

        # Normalize lr by number of sequence elements
        lr = lr / x.shape[0]
        i_weight -= i_weight_grad * lr
        h_weight -= h_weight_grad * lr
        h_bias -= h_bias_grad * lr
        o_weight -= o_weight_grad * lr
        o_bias -= o_bias_grad * lr
        layers[i] = [i_weight, h_weight, h_bias, o_weight, o_bias]
    return layers


epochs = 500
lr = 1e-5

layer_conf = [
    {"type":"input", "units": 3},
    {"type": "rnn", "hidden": 4, "output": 1}
]
layers = init_params(layer_conf)

for epoch in range(epochs):
    sequence_len = 1
    epoch_loss = 0
    for j in range(train_x.shape[0] - sequence_len):
        seq_x = train_x[j:(j+sequence_len),]
        seq_y = train_y[j:(j+sequence_len),]
        hiddens, outputs = forward(seq_x, layers)
        grad = mse_grad(seq_y, outputs)
        params = backward(layers, seq_x, lr, grad, hiddens)
        epoch_loss += mse(seq_y, outputs)

    if epoch % 50 == 0:
        sequence_len = 1
        valid_loss = 0
        for j in range(valid_x.shape[0] - sequence_len):
            seq_x = valid_x[j:(j+sequence_len),]
            seq_y = valid_y[j:(j+sequence_len),]
            _, outputs = forward(seq_x, layers)
            valid_loss += mse(seq_y, outputs)

        print(f"Epoch: {epoch} train loss {epoch_loss / len(train_x)} valid loss {valid_loss / len(valid_x)}")

Epoch: 0 train loss 0.47258703619975023 valid loss 1.672924020935338
Epoch: 50 train loss 0.34959429443016116 valid loss 1.3850539401896906
Epoch: 100 train loss 0.26523919019748726 valid loss 1.1169455299457045
Epoch: 150 train loss 0.20123143756789721 valid loss 0.8718426694707455
Epoch: 200 train loss 0.15133739990787748 valid loss 0.6583145692843783
Epoch: 250 train loss 0.11292149120275215 valid loss 0.48237721582946913
Epoch: 300 train loss 0.08409856016532039 valid loss 0.34479803557708755
Epoch: 350 train loss 0.06303391124603938 valid loss 0.24187004387270802
Epoch: 400 train loss 0.04797123680924112 valid loss 0.16753435830208466
Epoch: 450 train loss 0.03737770946553229 valid loss 0.11529152531886883
