In [1]:
import tensorflow as tf 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Embedding, LSTM, Dense, GRU, Layer 
from tensorflow.keras.preprocessing.text import Tokenizer 
from tensorflow.keras.preprocessing.sequence import pad_sequences 
import numpy as np 
import regex as re 




In [2]:
def file_to_sentence_list(file_path): 
	with open(file_path, 'r') as file: 
		text = file.read() 

	# Splitting the text into sentences using 
	# delimiters like '.', '?', and '!' 
	sentences = [sentence.strip() for sentence in re.split( 
		r'(?<=[.!?])\s+', text) if sentence.strip()] 

	return sentences 

file_path = 'pizza.txt'
text_data = file_to_sentence_list(file_path) 

# Tokenize the text data 
tokenizer = Tokenizer() 
tokenizer.fit_on_texts(text_data) 
total_words = len(tokenizer.word_index) + 1

# Create input sequences 
input_sequences = [] 
for line in text_data: 
	token_list = tokenizer.texts_to_sequences([line])[0] 
	for i in range(1, len(token_list)): 
		n_gram_sequence = token_list[:i+1] 
		input_sequences.append(n_gram_sequence) 

# Pad sequences and split into predictors and label 
max_sequence_len = max([len(seq) for seq in input_sequences]) 
input_sequences = np.array(pad_sequences( 
	input_sequences, maxlen=max_sequence_len, padding='pre')) 
X, y = input_sequences[:, :-1], input_sequences[:, -1] 

# Convert target data to one-hot encoding 
y = tf.keras.utils.to_categorical(y, num_classes=total_words) 

In [13]:
class SelfGRU(Layer):
    def __init__(self, units, **kwargs):
        super(SelfGRU, self).__init__(**kwargs)
        self.units = units
        self.state_size = units

    def build(self, input_shape):
        self.kernel = self.add_weight(shape=(input_shape[-1], self.units * 3),
                                      initializer='glorot_uniform',
                                      name='kernel')
        self.recurrent_kernel = self.add_weight(shape=(self.units, self.units * 3),
                                                initializer='orthogonal',
                                                name='recurrent_kernel')
        self.bias = self.add_weight(shape=(self.units * 3,),
                                    initializer='zeros',
                                    name='bias')
        self.built = True

    def call(self, inputs, states=None):
        prev_state = states[0] if states is not None else tf.zeros(shape=(inputs.shape[0], self.units))
        z = tf.matmul(inputs, self.kernel)
        z += tf.matmul(prev_state, self.recurrent_kernel)
        z += self.bias

        z_update, z_reset, z_new = tf.split(z, num_or_size_splits=3, axis=1)

        update_gate = tf.nn.sigmoid(z_update)
        reset_gate = tf.nn.sigmoid(z_reset)
        new_state = tf.nn.tanh(z_new)

        output = new_state * (1 - update_gate) + prev_state * update_gate

        return output, [output]

    def get_config(self):
        config = super(SelfGRU, self).get_config()
        config.update({'units': self.units})
        return config

In [4]:
# Define the model 
model = Sequential() 
model.add(Embedding(total_words, 10, 
					input_length=max_sequence_len-1)) 
model.add(GRU(128))
model.add(Dense(total_words, activation='softmax')) 
model.compile(loss='categorical_crossentropy', 
			optimizer='adam', metrics=['accuracy']) 





In [5]:
# Train the model 
model.fit(X, y, epochs=100, verbose=1) 

Epoch 1/100


Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 

<keras.src.callbacks.History at 0x21d6ab2af70>

In [6]:
# Generate next word predictions 
seed_text = "Pizza have different "
next_words = 5

for _ in range(next_words): 
	token_list = tokenizer.texts_to_sequences([seed_text])[0] 
	token_list = pad_sequences( 
		[token_list], maxlen=max_sequence_len-1, padding='pre') 
	predicted_probs = model.predict(token_list) 
	predicted_word = tokenizer.index_word[np.argmax(predicted_probs)] 
	seed_text += " " + predicted_word 

print("Next predicted words:", seed_text) 

Next predicted words: Pizza have different  much mobile owned establishments to


In [None]:
# Define the model 
model2 = Sequential() 
model2.add(Embedding(total_words, 10, 
					input_length=max_sequence_len-1)) 
model2.add(SelfGRU(128))  # Using the custom SelfGRU layer
model2.add(Dense(total_words, activation='softmax')) 
model2.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Train the model 
model2.fit(X, y, epochs=100, verbose=1) 

In [None]:
# Generate next word predictions 
seed_text = "Pizza have different "
next_words = 5

for _ in range(next_words): 
	token_list = tokenizer.texts_to_sequences([seed_text])[0] 
	token_list = pad_sequences( 
		[token_list], maxlen=max_sequence_len-1, padding='pre') 
	predicted_probs = model2.predict(token_list) 
	predicted_word = tokenizer.index_word[np.argmax(predicted_probs)] 
	seed_text += " " + predicted_word 

print("Next predicted words:", seed_text) 

In [6]:
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def tanh(x):
    return np.tanh(x)

def sigmoid_grad(x):
    return x * (1 - x)

def tanh_grad(x):
    return 1 - x ** 2

class GRU:
    def __init__(self, input_dim, hidden_dim, vocab_size):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size

        # Weight initialization
        self.Wz = np.random.randn(hidden_dim, hidden_dim + input_dim)
        self.Wr = np.random.randn(hidden_dim, hidden_dim + input_dim)
        self.Wh = np.random.randn(hidden_dim, hidden_dim + input_dim)
        self.Wy = np.random.randn(vocab_size, hidden_dim)

        # Bias initialization
        self.bz = np.zeros(hidden_dim)
        self.br = np.zeros(hidden_dim)
        self.bh = np.zeros(hidden_dim)
        self.by = np.zeros(vocab_size)

    def predict(self, h):
        return np.dot(self.Wy, h) + self.by

    def forward(self, x, h_prev):
        x = x.reshape(1, -1)  # Add an extra dimension to x
        concat_h_x = np.hstack((h_prev, x))
        z = sigmoid(np.dot(self.Wz, concat_h_x) + self.bz)
        r = sigmoid(np.dot(self.Wr, concat_h_x) + self.br)
        concat_r_h_x = np.hstack((r * h_prev, x))
        h_tilde = tanh(np.dot(self.Wh, concat_r_h_x) + self.bh)
        h_next = (1 - z) * h_prev + z * h_tilde
        return h_next

    def backward(self, x, h_prev, h_next, y_true):
        dWy, dWz, dWr, dWh = np.zeros_like(self.Wy), np.zeros_like(self.Wz), np.zeros_like(self.Wr), np.zeros_like(self.Wh)
        dby, dbz, dbr, dbh = np.zeros_like(self.by), np.zeros_like(self.bz), np.zeros_like(self.br), np.zeros_like(self.bh)
        concat_h_x = np.hstack((h_prev, x))

        # Backpropagation through time
        dy = np.copy(y_true)
        dh_next = np.dot(self.Wy.T, dy)
        dz = dh_next * (h_next - h_prev) * tanh_grad(h_next)
        dr = dh_next * np.hstack((h_prev, np.zeros(self.input_dim))) * tanh_grad(h_next * r)
        dh_tilde = dh_next * z
        dh_prev = dh_next * (1 - z) + dr * h_prev * tanh_grad(h_next * r)
        concat_r_h_x = np.hstack((r * h_prev, x))

        dWy += np.dot(dy, h_next.T)
        dWz += np.dot(dz, concat_h_x.T)
        dWr += np.dot(dr, concat_h_x.T)
        dWh += np.dot(dh_tilde, concat_r_h_x.T)
        dby += dy
        dbz += dz
        dbr += dr
        dbh += dh_tilde

        return dWy, dby, dWz, dbz, dWr, dbr, dWh, dbh

    def update_weights(self, dWy, dby, dWz, dbz, dWr, dbr, dWh, dbh, learning_rate):
        self.Wy -= learning_rate * dWy
        self.by -= learning_rate * dby
        self.Wz -= learning_rate * dWz
        self.bz -= learning_rate * dbz
        self.Wr -= learning_rate * dWr
        self.br -= learning_rate * dbr
        self.Wh -= learning_rate * dWh
        self.bh -= learning_rate * dbh

def one_hot(word, vocab):
    vec = np.zeros(len(vocab))
    vec[vocab.index(word)] = 1
    return vec

def train_gru(gru, sentence, vocab, learning_rate=0.01):
    h_prev = np.zeros((1, gru.hidden_dim))  # Initialize h_prev with the correct shape
    loss = 0
    for word in sentence.split():
        x = one_hot(word, vocab)
        h_prev = gru.forward(x, h_prev)
        y_pred = gru.predict(h_prev)
        y_true = one_hot(word, vocab)
        loss += np.sum((y_pred - y_true) ** 2)  # Mean squared error loss
        dWy, dby, dWz, dbz, dWr, dbr, dWh, dbh = gru.backward(x, h_prev, y_pred, y_true)
        gru.update_weights(dWy, dby, dWz, dbz, dWr, dbr, dWh, dbh, learning_rate)

    return loss

vocab = ['I', 'am', 'a', 'bot']
gru = GRU(input_dim=len(vocab), hidden_dim=10, vocab_size=len(vocab))

# Training loop
for epoch in range(1000):
    loss = train_gru(gru, 'I am a bot', vocab)
    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Loss: {loss}")

# Prediction
h_prev = np.zeros(gru.hidden_dim)
sentence = 'I am a'
for word in sentence.split():
    x = np.array(one_hot(word, vocab))
    h_prev = gru.forward(x, h_prev)

y_pred = gru.predict(h_prev)
print(f"Predicted next word: {vocab[np.argmax(y_pred)]}")

ValueError: shapes (10,14) and (1,14) not aligned: 14 (dim 1) != 1 (dim 0)

In [19]:
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

class GRU:
    def __init__(self, input_dim, hidden_dim):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        # Initialize weights
        self.W_z = np.random.randn(input_dim + hidden_dim, hidden_dim)
        self.W_r = np.random.randn(input_dim + hidden_dim, hidden_dim)
        self.W_h = np.random.randn(input_dim + hidden_dim, hidden_dim)
        self.b_z = np.zeros((1, hidden_dim))
        self.b_r = np.zeros((1, hidden_dim))
        self.b_h = np.zeros((hidden_dim))

    def forward(self, x, h_prev):
        # Add an extra dimension to the input vector
        x = np.expand_dims(x, axis=0)
        # Concatenate input and previous hidden state
        combined = np.column_stack((x, h_prev))
        # Calculate gates
        z = sigmoid(np.dot(combined, self.W_z) + self.b_z)
        r = sigmoid(np.dot(combined, self.W_r) + self.b_r)
        h_tilde = np.tanh(np.dot(np.column_stack((x, r * h_prev)), self.W_h) + self.b_h)
        # Calculate next hidden state
        h_next = (1 - z) * h_prev + z * h_tilde
        return h_next

    def backward(self, x, h_prev, h_next, dh_next):
        # Concatenate input and previous hidden state
        combined = np.column_stack((x, h_prev))

        # Calculate gates
        z = sigmoid(np.dot(combined, self.W_z) + self.b_z)
        r = sigmoid(np.dot(combined, self.W_r) + self.b_r)
        h_tilde = np.tanh(np.dot(np.column_stack((x, r * h_prev)), self.W_h) + self.b_h)

        # Calculate gradients
        dh_tilde = dh_next * z
        dz = dh_next * (h_tilde - h_prev)
        dr = dh_next * (1 - z) * np.dot(np.column_stack((x, h_prev * r)), self.W_h.T) * (1 - r ** 2)
        dx = np.dot(dh_next * (1 - z), self.W_h[:, :self.input_dim].T) + np.dot(dr * h_prev, self.W_h[:, :self.input_dim].T)
        dh_prev = dh_next * (1 - z) * r + np.dot(dr, self.W_r[:, self.input_dim:].T) + np.dot(dh_next * (1 - z) * (1 - h_tilde ** 2), self.W_h[:, self.input_dim:].T)

        # Calculate weight gradients
        dW_z = np.dot(combined.T, dz)
        dW_r = np.dot(combined.T, dr)
        dW_h = np.dot(np.column_stack((x, r * h_prev)).T, dh_tilde)
        db_z = np.sum(dz, axis=0, keepdims=True)
        db_r = np.sum(dr, axis=0, keepdims=True)
        db_h = np.sum(dh_tilde, axis=0, keepdims=True)

        return dx, dh_prev, dW_z, dW_r, dW_h, db_z, db_r, db_h
    
input_sequence = ['I', 'am', 'a', 'bot']
# Create a dictionary to map words to indices
word_to_idx = {word: idx for idx, word in enumerate(set(input_sequence))}
idx_to_word = {idx: word for word, idx in word_to_idx.items()}
# One-hot encode the input sequence
input_sequence = [np.eye(len(word_to_idx))[word_to_idx[word]] for word in input_sequence]
# Initialize the GRU
input_dim = len(word_to_idx)
hidden_dim = 128
gru = GRU(input_dim, hidden_dim)
# Forward pass through the GRU
h_prev = np.zeros((1, hidden_dim))
for x in input_sequence[:-1]:
    h_prev = gru.forward(x, h_prev)
# Predict the last word
last_word_logits = np.dot(h_prev, gru.W_h.T) + gru.b_h
last_word_idx = np.argmax(last_word_logits, axis=1)[0]
predicted_word = idx_to_word[last_word_idx]
print(f"Predicted last word: {predicted_word}")

ValueError: operands could not be broadcast together with shapes (1,132) (128,) 