<a href="https://colab.research.google.com/github/Guohua-Li/vanillarRNN/blob/main/rnn_many_to_one_oi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
# Clean training signals, sampled at the integers 0..399.
points = np.arange(0, 400, 1)
sin_wave, cos_wave = np.sin(points), np.cos(points)
series_3 = np.sin(points*np.pi / 8.)

# Test signals reuse the first `test_points` samples of each clean series and
# add Gaussian noise (mean 0, standard deviation 0.06).
test_points = 200


def _with_noise(series):
    """Return the first `test_points` samples of `series` plus Gaussian noise."""
    return series[:test_points] + np.random.normal(0, 0.06, test_points)


# Keep this order (sin, cos, series_3): it fixes the order of the random draws.
test_sin_wave = _with_noise(sin_wave)
test_cos_wave = _with_noise(cos_wave)
test_series_3 = _with_noise(series_3)

In [None]:
# One row per series: clean signal in the left column, noisy test signal in
# the right column.
fig, axs = plt.subplots(3, 2, figsize=(12,3))
clean_and_noisy = [
    (sin_wave, test_sin_wave),
    (cos_wave, test_cos_wave),
    (series_3, test_series_3),
]
for (clean_ax, noisy_ax), (clean, noisy) in zip(axs, clean_and_noisy):
    clean_ax.plot(clean)
    noisy_ax.plot(noisy)
plt.show()

In [None]:
def dataset(sequences, timesteps, targets): # many to one
    """Cut parallel sequences into sliding windows with next-step labels.

    For every start index ``s`` in ``range(len(sequences[0]) - timesteps)``
    one sample is produced:

    * input: the slice ``[s, s + timesteps)`` of every sequence, stacked and
      transposed so that rows are time steps and columns are sequences;
    * label: the value at index ``s + timesteps`` of each sequence whose
      index is listed in ``targets``.

    Parameters
    ----------
    sequences : list of indexable sequences; the first one sets the length.
    timesteps : number of consecutive samples in each input window.
    targets   : indices into ``sequences`` selecting the series to predict.

    Returns
    -------
    (x, y) : two NumPy arrays built from the collected inputs and labels.
    """
    n_windows = len(sequences[0]) - timesteps
    inputs, labels = [], []
    for start in range(n_windows):
        stop = start + timesteps
        window = np.array([series[start:stop] for series in sequences])
        inputs.append(window.T)
        labels.append([sequences[idx][stop] for idx in targets])
    return np.array(inputs), np.array(labels)

In [None]:
# Number of consecutive samples in each input window.
timesteps = 30

# All three series are fed in as inputs; only series 0 (sine) and 1 (cosine)
# are used as prediction targets.
train_sequences = [sin_wave, cos_wave, series_3]
test_sequences = [test_sin_wave, test_cos_wave, test_series_3]

# Training set from the clean series, test set from the noisy ones.
x_train, y_train = dataset(train_sequences, timesteps, targets=[0, 1])
x_test, y_test = dataset(test_sequences, timesteps, targets=[0, 1])

In [None]:
class BasicRNN:
    """Vanilla many-to-one RNN trained with per-sample gradient descent.

    Recurrence and read-out::

        h_t = tanh(U @ x_t + W @ h_{t-1}),   h_{-1} = 0
        y   = V @ h_last

    The loss accumulated during training is ``0.5 * (y - target)**2`` and
    gradients are obtained by backpropagation through all time steps.
    """

    def __init__(self, hidden_dim, input_dim, output_dim):
        """Store the layer sizes and draw all weights from a standard normal.

        Parameters
        ----------
        hidden_dim : size of the hidden state.
        input_dim  : number of features per time step.
        output_dim : number of predicted values.
        """
        self.hidden_dim = hidden_dim
        self.input_dim  = input_dim
        self.output_dim = output_dim
        self.V = np.random.randn(output_dim, hidden_dim)   # hidden -> output
        self.U = np.random.randn(hidden_dim,  input_dim)   # input  -> hidden
        self.W = np.random.randn(hidden_dim, hidden_dim)   # hidden -> hidden

    def forward_pass(self, xs):
        """Run the recurrence over one sample and return the output.

        Parameters
        ----------
        xs : sequence of column vectors, one per time step, as produced by
            the reshape in ``train``/``predict``: ``(n_timesteps, input_dim, 1)``.

        Returns
        -------
        ``V @ h_last`` with shape ``(output_dim, 1)``.

        Side effects
        ------------
        Caches every hidden state in ``self.states`` for ``backward_pass``.
        """
        # The number of steps comes from the sample itself, so the method
        # works before `train` has run and for any window length.
        n_timesteps = len(xs)
        self.states = np.zeros((n_timesteps+1, self.hidden_dim, 1))
        h = np.zeros((self.hidden_dim, 1))
        # The extra last slot holds the all-zero initial state, so that
        # states[t-1] with t == 0 (index -1) yields h_{-1}.
        self.states[-1] = h
        for t in range(n_timesteps):
            h = np.tanh(np.matmul(self.U, xs[t]) + np.matmul(self.W, h))
            self.states[t] = h
        return np.matmul(self.V, h)

    def backward_pass(self, xs):
        """Backpropagate ``self.error`` through time.

        Must be called after ``forward_pass(xs)`` on the same sample, with
        ``self.error`` set to ``y - target``. Writes the gradients to
        ``self.dLdV``, ``self.dLdU`` and ``self.dLdW``.
        """
        # Recover the step count from the cached states of the last forward
        # pass (one slot is reserved for the initial state).
        n_timesteps = self.states.shape[0] - 1
        dLdy = self.error
        h = self.states[n_timesteps - 1]          # last hidden state
        self.dLdV = np.outer(dLdy, h)
        self.dLdU = np.zeros(self.U.shape)
        self.dLdW = np.zeros(self.W.shape)
        # dt is dL/d(pre-activation) at the current step; 1 - h**2 is the
        # derivative of tanh expressed through its output.
        dt = np.matmul(self.V.T, dLdy) * (1-h**2)
        for t in reversed(range(n_timesteps)):
            self.dLdU += np.outer(dt, xs[t])
            h = self.states[t-1]                  # h_{t-1}; zero state at t == 0
            self.dLdW += np.outer(dt, h)
            dt = np.matmul(self.W.T, dt) * (1 - h**2)

    def gradient_decent(self):
        """Apply one gradient-descent step with learning rate ``self.lr``."""
        self.V -= self.lr * self.dLdV
        self.U -= self.lr * self.dLdU
        self.W -= self.lr * self.dLdW

    # Correctly spelled alias; the original name is kept for existing callers.
    gradient_descent = gradient_decent

    def train(self, x_train, y_train, epochs, learning_rate):
        """Fit the weights, updating them after every single sample.

        Parameters
        ----------
        x_train : array reshapeable to ``(n_samples, n_timesteps, input_dim, 1)``.
        y_train : array reshapeable to ``(n_samples, output_dim, 1)``.
        epochs : number of passes over the training set.
        learning_rate : step size used by ``gradient_decent``.

        Returns
        -------
        List with one entry per epoch: the mean per-output loss, squeezed.
        """
        n_samples = x_train.shape[0]
        self.n_timesteps = x_train.shape[1]
        self.lr = learning_rate
        new_x_shape = (n_samples, self.n_timesteps, self.input_dim, 1)
        new_y_shape = (n_samples, self.output_dim, 1)
        x_train = x_train.reshape(new_x_shape)
        y_train = y_train.reshape(new_y_shape)
        Ovr_loss = []
        for epoch in tqdm(range(epochs)):
            loss = np.zeros((self.output_dim, 1))
            for xs, ys in zip(x_train,y_train):
                y = self.forward_pass(xs)
                self.error = y - ys
                loss += 0.5 * self.error**2
                self.backward_pass(xs)
                self.gradient_decent()
            Ovr_loss.append(np.squeeze(loss / n_samples))
        return Ovr_loss

    def predict(self, inputs):
        """Return the network output for every sample in ``inputs``.

        Parameters
        ----------
        inputs : array reshapeable to ``(n_samples, n_timesteps, input_dim, 1)``.
            The window length does not have to match the one used in training.

        Returns
        -------
        Array of shape ``(n_samples, output_dim, 1)``.
        """
        n_samples = inputs.shape[0]
        n_timesteps = inputs.shape[1]
        new_x_shape = (n_samples, n_timesteps, self.input_dim, 1)
        inputs = inputs.reshape(new_x_shape)
        outputs = [self.forward_pass(xs) for xs in inputs]
        return np.array(outputs)

In [None]:
# Three input series, two predicted series, 50 hidden units.
rnn = BasicRNN(input_dim=3, output_dim=2, hidden_dim=50)

# `train` returns one per-output mean loss entry per epoch.
loss = rnn.train(x_train, y_train, learning_rate=1e-2, epochs=20)

plt.plot(loss)
plt.show()

In [None]:
# Compare the predictions on the noisy test windows with their targets,
# one subplot per predicted series.
outputs = rnn.predict(x_test)
fig, axs = plt.subplots(2, 1, figsize=(12,3))
T = list(range(len(x_test)))
for column, axis in enumerate(axs):
    axis.plot(T, y_test[:, column])
    axis.plot(T, outputs[:, column])
    axis.set_xlim([0, 200])
plt.show()