In [2]:
import tensorflow as tf

# RNN Intuition

In [4]:
class MyRNNCell(tf.keras.layers.Layer):
    """Minimal vanilla RNN cell that keeps its hidden state on the layer.

    Each call consumes one input column vector, updates the stored hidden
    state with a tanh recurrence and returns the output for that step.

    Args:
        rnn_units: size of the hidden state.
        input_dim: size of each input vector.
        output_dim: size of each output vector.
    """

    def __init__(self, rnn_units, input_dim, output_dim):
        super(MyRNNCell, self).__init__()

        # Initialize weight matrices. `shape` is passed by keyword because the
        # first positional parameter of add_weight is not the same across Keras
        # versions (it has been `name` in some releases and `shape` in others).
        self.W_xh = self.add_weight(shape=[rnn_units, input_dim])
        self.W_hh = self.add_weight(shape=[rnn_units, rnn_units])
        self.W_hy = self.add_weight(shape=[output_dim, rnn_units])

        # Initialize hidden state to zeros (column vector)
        self.h = tf.zeros([rnn_units, 1])

    def call(self, x):
        """Advance the cell by one step.

        Args:
            x: input for this step; given the weight shapes above it must be a
                column of shape [input_dim, 1].

        Returns:
            (output, hidden_state) with shapes [output_dim, 1] and [rnn_units, 1].
        """
        # Update the hidden state. These are matrix products: an elementwise `*`
        # would broadcast [rnn_units, rnn_units] against [rnn_units, 1] instead
        # of applying the linear maps.
        self.h = tf.math.tanh(tf.matmul(self.W_hh, self.h) + tf.matmul(self.W_xh, x))

        # Compute the output from the new hidden state
        output = tf.matmul(self.W_hy, self.h)

        # Return the current output and hidden state
        return output, self.h

In [None]:
# Schematic of how an RNN is unrolled over a sentence: the same cell is applied
# to every word, and the hidden state returned by one step is fed into the next.
# NOTE(review): no `RNN` class is defined in the cells shown here, and the call
# below passes raw strings plus an explicit hidden state, so this cell reads as
# illustrative pseudocode rather than runnable code — confirm before executing.
my_rnn = RNN()
hidden_state = [0,0,0,0]

sentence = ['I', 'love', 'recurrent', 'neural']

# Feed the words in order, threading the hidden state through the loop
for word in sentence:
    prediction, hidden_state = my_rnn(word, hidden_state)

# The prediction produced after the last word is taken as the next-word guess
next_word_prediction = prediction

# RNN from scratch

In [6]:
class SimpleRNN:
    """Vanilla RNN with a tanh hidden layer and softmax output, written in NumPy.

    Inputs are integer indices that are one-hot encoded internally. All vectors
    are column-shaped, i.e. (size, 1).

    Args:
        input_size: length of the one-hot input vectors.
        hidden_size: number of hidden units.
        output_size: number of output classes.
    """

    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # Weights are drawn from a standard normal and scaled by 0.01
        self.Wxh = np.random.randn(hidden_size, input_size) * 0.01  # Input to hidden
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01  # Hidden to hidden
        self.Why = np.random.randn(output_size, hidden_size) * 0.01  # Hidden to output
        self.bh = np.zeros((hidden_size, 1))  # Hidden bias
        self.by = np.zeros((output_size, 1))  # Output bias

    @staticmethod
    def _softmax(y):
        """Softmax over all entries of `y`, shifted by the max logit so np.exp cannot overflow."""
        exps = np.exp(y - np.max(y))
        return exps / np.sum(exps)

    @staticmethod
    def _targets_per_step(targets, num_steps):
        """Return one target class per time step.

        A scalar target is repeated for every step; a sequence must already
        contain exactly `num_steps` entries.
        """
        if np.ndim(targets) == 0:
            return [targets] * num_steps
        if len(targets) != num_steps:
            raise ValueError(
                f"expected {num_steps} targets (one per time step), got {len(targets)}"
            )
        return targets

    def forward(self, inputs, h_prev):
        """Run the network over a sequence of input indices.

        Args:
            inputs: sequence of integer indices, each one-hot encoded into a
                vector of length `input_size`.
            h_prev: initial hidden state of shape (hidden_size, 1).

        Returns:
            (xs, hs, ps): dicts keyed by time step holding the one-hot inputs,
            the hidden states (hs[-1] is a copy of `h_prev`) and the softmax
            output probabilities.
        """
        xs, hs, ys, ps = {}, {}, {}, {}
        hs[-1] = np.copy(h_prev)
        for t in range(len(inputs)):
            xs[t] = np.zeros((self.input_size, 1))
            xs[t][inputs[t]] = 1  # One-hot encoding of input
            hs[t] = np.tanh(np.dot(self.Wxh, xs[t]) + np.dot(self.Whh, hs[t - 1]) + self.bh)  # Hidden state
            ys[t] = np.dot(self.Why, hs[t]) + self.by  # Unnormalized log probabilities for output
            ps[t] = self._softmax(ys[t])  # Numerically stable softmax
        return xs, hs, ps

    def backward(self, inputs, targets, xs, hs, ps):
        """Backpropagate the cross-entropy loss through time.

        Args:
            inputs: the sequence that was passed to `forward`.
            targets: either a single class index, applied as the target at
                every time step, or a sequence with one class index per step.
            xs, hs, ps: the dicts returned by `forward`.

        Returns:
            (dWxh, dWhh, dWhy, dbh, dby), each clipped elementwise to [-5, 5].

        Raises:
            ValueError: if `targets` is a sequence whose length differs from
                `len(inputs)`.
        """
        step_targets = self._targets_per_step(targets, len(inputs))
        dWxh, dWhh, dWhy = np.zeros_like(self.Wxh), np.zeros_like(self.Whh), np.zeros_like(self.Why)
        dbh, dby = np.zeros_like(self.bh), np.zeros_like(self.by)
        # Same shape as a hidden state; taken from bh so an empty sequence still works
        dhnext = np.zeros_like(self.bh)
        for t in reversed(range(len(inputs))):
            dy = np.copy(ps[t])
            dy[step_targets[t]] -= 1  # Gradient of cross-entropy w.r.t. the logits
            dWhy += np.dot(dy, hs[t].T)
            dby += dy
            dh = np.dot(self.Why.T, dy) + dhnext  # Backprop into h
            dhraw = (1 - hs[t] * hs[t]) * dh  # Backprop through tanh nonlinearity
            dbh += dhraw
            dWxh += np.dot(dhraw, xs[t].T)
            dWhh += np.dot(dhraw, hs[t - 1].T)  # hs[-1] is the initial state when t == 0
            dhnext = np.dot(self.Whh.T, dhraw)
        for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
            np.clip(dparam, -5, 5, out=dparam)  # Clip gradients to prevent exploding gradients
        return dWxh, dWhh, dWhy, dbh, dby

    def train(self, inputs, targets, learning_rate=0.1, num_epochs=1000):
        """Fit the network to a single sequence with plain gradient descent.

        Args:
            inputs: sequence of input indices.
            targets: a single class index (used at every step) or one class
                index per time step.
            learning_rate: step size of each update.
            num_epochs: number of forward/backward/update passes.

        The loss is printed every 100 epochs; it is computed from the
        probabilities obtained before that epoch's weight update.
        """
        step_targets = self._targets_per_step(targets, len(inputs))
        for epoch in range(num_epochs):
            h_prev = np.zeros((self.hidden_size, 1))  # Initialize hidden state at the beginning of each epoch
            xs, hs, ps = self.forward(inputs, h_prev)
            dWxh, dWhh, dWhy, dbh, dby = self.backward(inputs, step_targets, xs, hs, ps)
            self.Wxh -= learning_rate * dWxh
            self.Whh -= learning_rate * dWhh
            self.Why -= learning_rate * dWhy
            self.bh -= learning_rate * dbh
            self.by -= learning_rate * dby
            loss = -np.sum([np.log(ps[t][step_targets[t], 0]) for t in range(len(inputs))])
            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Loss: {loss}")

    def sample(self, seed_index, n):
        """Generate `n` indices by repeatedly sampling from the output distribution.

        Args:
            seed_index: index of the first input.
            n: number of indices to draw.

        Returns:
            List of the `n` sampled indices (the seed itself is not included).
        """
        x = np.zeros((self.input_size, 1))
        x[seed_index] = 1
        h = np.zeros((self.hidden_size, 1))
        indices = []
        for _ in range(n):
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            y = np.dot(self.Why, h) + self.by
            p = self._softmax(y)
            idx = np.random.choice(range(self.output_size), p=p.ravel())
            # The sampled output index is fed back as the next one-hot input,
            # so it has to be a valid position in a vector of length input_size
            x = np.zeros((self.input_size, 1))
            x[idx] = 1
            indices.append(idx)
        return indices

    def predict(self, inputs):
        """Return the most probable class at every time step, starting from a zero hidden state."""
        h_prev = np.zeros((self.hidden_size, 1))
        xs, hs, ps = self.forward(inputs, h_prev)
        predictions = [np.argmax(ps[t]) for t in range(len(inputs))]
        return predictions

In [8]:
import numpy as np
from keras.datasets import imdb
from keras.preprocessing import sequence

# Seed NumPy's global generator so repeated runs draw the same random numbers
np.random.seed(7)

# Vocabulary cap and the common length every review is brought to
top_words = 5000
max_review_length = 500

# Load the IMDB train/test splits, keeping only the `top_words` most frequent words
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=top_words)

# Pad both splits so that all sequences share the same length
x_train, x_test = (
    sequence.pad_sequences(reviews, maxlen=max_review_length)
    for reviews in (x_train, x_test)
)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz
[1m17464789/17464789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 0us/step


In [None]:
def train_rnn_on_imdb(rnn, x_train, y_train, num_epochs=10, learning_rate=0.1):
    """Train `rnn` one example at a time and print the summed loss after each epoch.

    Args:
        rnn: model exposing `hidden_size`, `forward`, `backward` and the
            parameters `Wxh`, `Whh`, `Why`, `bh`, `by`.
        x_train: indexable collection of input sequences.
        y_train: indexable collection with one target class per sequence.
        num_epochs: number of passes over the training data.
        learning_rate: step size of each parameter update.

    The reported loss is the negative log-probability that the last time step
    assigns to the target class, accumulated over the epoch.
    """
    param_names = ("Wxh", "Whh", "Why", "bh", "by")

    for epoch in range(num_epochs):
        loss = 0
        for i, inputs in enumerate(x_train):
            targets = y_train[i]  # Single target value for the whole sequence

            # Forward pass from a zero hidden state
            initial_state = np.zeros((rnn.hidden_size, 1))
            xs, hs, ps = rnn.forward(inputs, initial_state)

            # Backward pass
            dWxh, dWhh, dWhy, dbh, dby = rnn.backward(inputs, targets, xs, hs, ps)

            # Gradient-descent step on every parameter
            for attr, grad in zip(param_names, (dWxh, dWhh, dWhy, dbh, dby)):
                param = getattr(rnn, attr)
                param -= learning_rate * grad
                setattr(rnn, attr, param)

            # Loss contribution of the final time step
            final_probs = ps[len(inputs) - 1]
            loss += -np.sum(np.log(final_probs[targets, 0]))

        print(f"Epoch {epoch}, Loss: {loss}")

# Network dimensions: one input unit per vocabulary word, 100 hidden units,
# and 2 output classes for binary classification
input_size, hidden_size, output_size = top_words, 100, 2

# Build the network
rnn = SimpleRNN(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
)

# Fit it to the IMDB training split
train_rnn_on_imdb(rnn, x_train, y_train, num_epochs=10, learning_rate=0.1)

## Evaluation

In [None]:
def evaluate_model(rnn, x_test, y_test):
    """Return the fraction of test sequences that `rnn` classifies correctly.

    A sequence counts as correct when the last element of `rnn.predict(inputs)`
    equals its target.

    Args:
        rnn: model exposing `predict(inputs)`, which returns one prediction per step.
        x_test: indexable collection of input sequences.
        y_test: indexable collection with one target per sequence.

    Returns:
        Accuracy as a float between 0 and 1.
    """
    num_examples = len(x_test)
    examples = ((x_test[i], y_test[i]) for i in range(num_examples))
    num_correct = sum(
        1 for inputs, target in examples if rnn.predict(inputs)[-1] == target
    )
    return num_correct / num_examples

# Score the trained network on the test split and report the result as a percentage
accuracy = evaluate_model(rnn=rnn, x_test=x_test, y_test=y_test)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

## Saving the Model

In [None]:
import pickle

def save_model(rnn, filename):
    """Pickle `rnn` to `filename`, overwriting any existing file."""
    with open(filename, mode='wb') as sink:
        pickle.dump(rnn, sink)

def load_model(filename):
    """Read `filename` and return the object unpickled from it.

    Unpickling can execute arbitrary code, so only load files from a trusted source.
    """
    with open(filename, mode='rb') as source:
        restored = pickle.load(source)
    return restored

# Persist the trained network next to the notebook
save_model(rnn=rnn, filename='rnn_model.pkl')