In [161]:
import numpy as np

In [242]:
class DataReader:
    """Word-level corpus reader that hands out consecutive training windows.

    The file is read once, split on whitespace, and every distinct token is
    assigned an integer index. ``next_batch`` then walks through the token
    list ``seq_length`` tokens at a time.

    Attributes set by ``__init__``:
        seq_length: window length requested by the caller.
        data: raw file contents.
        words: whitespace-separated tokens of ``data``.
        vocab: sorted list of distinct tokens.
        vocab_size: ``len(vocab)``.
        word_to_ix / ix_to_word: token <-> index lookup tables.
        pointer: index into ``words`` where the next window starts.
    """

    def __init__(self, file_path, seq_length):
        """Load ``file_path`` (UTF-8, undecodable bytes replaced) and build the vocabulary.

        Args:
            file_path: Path of the text file to read.
            seq_length: Number of tokens per window returned by ``next_batch``.
        """
        self.seq_length = seq_length
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            self.data = f.read()
        self.words = self.data.split()
        # Sort the vocabulary: the iteration order of a set of strings differs
        # between interpreter runs (string hash randomisation), which would
        # otherwise give every word a different index on each kernel restart.
        self.vocab = sorted(set(self.words))
        self.vocab_size = len(self.vocab)
        self.word_to_ix = {word: i for i, word in enumerate(self.vocab)}
        self.ix_to_word = dict(enumerate(self.vocab))
        self.pointer = 0

    def next_batch(self):
        """Return the next ``(inputs, targets)`` window as two lists of word indices.

        ``targets`` is ``inputs`` shifted one token to the right. After the
        window is taken the pointer advances by ``seq_length``; if a further
        full window (plus its one-token look-ahead) no longer fits, the pointer
        wraps back to 0.

        Returns:
            tuple[list[int], list[int]]: input indices and target indices,
            always of equal length.
        """
        start = self.pointer
        stop = start + self.seq_length
        inputs = [self.word_to_ix[word] for word in self.words[start:stop]]
        targets = [self.word_to_ix[word] for word in self.words[start + 1:stop + 1]]
        # On a corpus shorter than seq_length + 1 the last input has no
        # successor; drop it so every input is paired with a target.
        inputs = inputs[:len(targets)]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= len(self.words):
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        """Return True when the pointer is at the beginning of the corpus."""
        return self.pointer == 0

In [243]:
class RNN:
    """Single-hidden-layer recurrent language model over word indices (NumPy only).

    The recurrence used by every method is::

        h(t) = tanh(U x(t) + W h(t-1) + b)
        o(t) = V h(t) + c
        p(t) = softmax(o(t))

    where x(t) is a one-hot column vector of length ``vocab_size`` and h(t) is
    a column vector of length ``hidden_size``.
    """

    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        """Store the hyper-parameters and randomly initialise the weights.

        Args:
            hidden_size: Length of the hidden state vector.
            vocab_size: Number of distinct tokens (length of the one-hot input).
            seq_length: Nominal number of time steps per training batch; used
                to initialise the smoothed loss in ``train``.
            learning_rate: Step size applied by ``update``.
        """
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate

        # uniform distribution
        # Wij ~ U(-sqrt(1/fan_in), sqrt(1/fan_in))
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.b = np.zeros((hidden_size, 1))
        self.c = np.zeros((vocab_size, 1))

    # ------------------------------------------------------------------ helpers

    def _one_hot(self, ix):
        """Return a (vocab_size, 1) column vector with a 1 at row ``ix``."""
        x = np.zeros((self.vocab_size, 1))
        x[ix] = 1
        return x

    @staticmethod
    def _softmax(o):
        """Numerically stable softmax of a column vector (max subtracted first)."""
        e = np.exp(o - np.max(o))
        return e / np.sum(e)

    def _next_hidden(self, x, h):
        """Advance the hidden state one step: tanh(U x + W h + b)."""
        return np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)

    def _word_probs(self, h):
        """Return softmax(V h + c), the distribution over the next word."""
        return self._softmax(np.dot(self.V, h) + self.c)

    # ------------------------------------------------------------ forward / loss

    def forward(self, inputs, hprev):
        """Run the network over ``inputs`` starting from hidden state ``hprev``.

        Args:
            inputs: Sequence of word indices, one per time step.
            hprev: (hidden_size, 1) hidden state preceding the first step.

        Returns:
            (x, h, ycap): dicts keyed by time step holding the one-hot inputs,
            the hidden states (``h[-1]`` is a copy of ``hprev``) and the
            softmax outputs.
        """
        x, h, ycap = {}, {}, {}
        h[-1] = np.copy(hprev)
        for t, ix in enumerate(inputs):
            x[t] = self._one_hot(ix)
            h[t] = self._next_hidden(x[t], h[t - 1])
            ycap[t] = self._word_probs(h[t])
        return x, h, ycap

    def loss(self, p, targets):
        """Summed cross-entropy: -sum_t log p[t][targets[t]]."""
        return sum(-np.log(p[t][targets[t], 0]) for t in range(len(targets)))

    # ----------------------------------------------------------------- learning

    def backward(self, x, h, ycap, targets):
        """Back-propagate through time and return the (clipped) gradients.

        Args:
            x, h, ycap: The dicts returned by ``forward``.
            targets: Target word index for each time step.

        Returns:
            (dU, dW, dV, db, dc), each shaped like the matching parameter and
            clipped element-wise to [-5, 5].
        """
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
        dhnext = np.zeros_like(self.b)
        # Iterate over the steps actually present in this batch; a batch may be
        # shorter than self.seq_length.
        for t in reversed(range(len(targets))):
            # d(loss)/d(o) for softmax + cross-entropy is p - one_hot(target).
            dy = np.copy(ycap[t])
            dy[targets[t]] -= 1
            dV += np.dot(dy, h[t].T)
            dc += dy
            # Gradient reaching h(t): from the output layer and from step t+1.
            dh = np.dot(self.V.T, dy) + dhnext
            # Back through tanh: d/dz tanh(z) = 1 - tanh(z)^2.
            dhraw = (1 - h[t] * h[t]) * dh
            db += dhraw
            dU += np.dot(dhraw, x[t].T)
            dW += np.dot(dhraw, h[t - 1].T)
            dhnext = np.dot(self.W.T, dhraw)

        # clip to reduce exploding gradients
        for dparam in [dU, dW, dV, db, dc]:
            np.clip(dparam, -5, 5, out=dparam)
        return dU, dW, dV, db, dc

    def update(self, dU, dW, dV, db, dc):
        """Apply one plain SGD step, modifying the parameters in place."""
        for param, dparam in zip([self.U, self.W, self.V, self.b, self.c], [dU, dW, dV, db, dc]):
            param += -self.learning_rate * dparam

    # ----------------------------------------------------------------- sampling

    def sample(self, h, seed_ix, n):
        """Sample ``n`` word indices, starting from state ``h`` and input ``seed_ix``.

        Each sampled word is fed back as the next input. Draws use NumPy's
        global random state.

        Returns:
            list of the ``n`` sampled indices (the seed itself is not included).
        """
        x = self._one_hot(seed_ix)
        ixes = []
        for _ in range(n):
            h = self._next_hidden(x, h)
            ix = np.random.choice(self.vocab_size, p=self._word_probs(h).ravel())
            x = self._one_hot(ix)
            ixes.append(ix)
        return ixes

    def train(self, data_reader, max_iters=20000):
        """Train with per-batch SGD until the smoothed loss or iteration cap is hit.

        Every 500 iterations a 200-word sample and the smoothed loss are printed.

        Args:
            data_reader: Object providing ``vocab_size``, ``ix_to_word``,
                ``just_started()`` and ``next_batch()``.
            max_iters: Maximum number of batches to process.
        """
        iter_num = 0
        threshold = 0.01
        # Start from the loss of a uniform guess over the vocabulary.
        smooth_loss = -np.log(1.0/data_reader.vocab_size)*self.seq_length
        # Always begin with a defined hidden state, even when the reader is not
        # at the start of the corpus (e.g. train() is called a second time).
        hprev = np.zeros((self.hidden_size, 1))

        while smooth_loss > threshold and iter_num < max_iters:
            if data_reader.just_started():
                # New pass over the corpus: forget the carried-over state.
                hprev = np.zeros((self.hidden_size, 1))
            inputs, targets = data_reader.next_batch()
            xs, hs, ps = self.forward(inputs, hprev)
            dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
            loss = self.loss(ps, targets)
            self.update(dU, dW, dV, db, dc)
            smooth_loss = smooth_loss * 0.999 + loss * 0.001
            # Carry the last hidden state of this batch into the next one.
            hprev = hs[len(inputs) - 1]
            if not iter_num % 500:
                sample_ix = self.sample(hprev, inputs[0], 200)
                print(" ".join(data_reader.ix_to_word[ix] for ix in sample_ix))
                print(f"\n\niter :{iter_num}, loss:{smooth_loss}")
            iter_num += 1

        print(f"Training completed after {iter_num} iterations with final loss {smooth_loss}")

    def predict(self, data, start, n):
        """Continue the prompt ``start`` with ``n`` sampled words.

        The prompt words are fed through the recurrence one at a time, in
        order, so the hidden state reflects the whole prompt before sampling
        begins. An empty prompt starts from an all-zero input vector.

        Args:
            data: Object providing ``word_to_ix`` and ``ix_to_word`` mappings.
            start: Whitespace-separated prompt; every word must be in
                ``data.word_to_ix``.
            n: Number of words to sample after the prompt.

        Returns:
            The prompt followed by the sampled words, joined by single spaces.

        Raises:
            KeyError: If a prompt word is missing from ``data.word_to_ix``.
        """
        idx = [data.word_to_ix[word] for word in start.split()]
        h = np.zeros((self.hidden_size, 1))
        # Prime the hidden state on every prompt word except the last, which
        # becomes the first input of the sampling loop below.
        for ix in idx[:-1]:
            h = self._next_hidden(self._one_hot(ix), h)
        x = self._one_hot(idx[-1]) if idx else np.zeros((self.vocab_size, 1))
        for _ in range(n):
            h = self._next_hidden(x, h)
            ix = np.random.choice(self.vocab_size, p=self._word_probs(h).ravel())
            x = self._one_hot(ix)
            idx.append(ix)
        txt = ' '.join(data.ix_to_word[ix] for ix in idx)
        return txt

In [248]:
import requests

url = "https://www.gutenberg.org/files/11/11-0.txt"
# Bound the wait so a stalled connection cannot hang the notebook, and fail
# loudly on an HTTP error instead of saving an error page as the corpus.
response = requests.get(url, timeout=30)
response.raise_for_status()

with open("alice_in_wonderland.txt", "w", encoding="utf-8", errors='replace') as file:
    file.write(response.text)


In [249]:
# preprocessing
import re

def read_text_file(file_path):
    """Return the whole content of ``file_path`` decoded as UTF-8.

    Byte sequences that are not valid UTF-8 are replaced instead of raising.
    """
    with open(file_path, mode='r', encoding='utf-8', errors='replace') as handle:
        return handle.read()

def normalize_text(text):
    """Lower-case ``text`` and delete every character that is not a-z or whitespace.

    Removed characters are not replaced by a space, so words separated only by
    punctuation are fused together.
    """
    lowered = text.lower()
    return re.sub(r'[^a-z\s]', '', lowered)

def tokenize_text(text):
    """Split ``text`` on runs of whitespace and return the resulting list of tokens."""
    return text.split()

def clean_words(words):
    """Return a new list holding only the entries of ``words`` longer than one character."""
    kept = []
    for token in words:
        if len(token) > 1:
            kept.append(token)
    return kept

def save_preprocessed_text(words, output_path):
    """Write ``words`` to ``output_path`` as one UTF-8 line, separated by single spaces.

    An existing file at ``output_path`` is overwritten.
    """
    with open(output_path, mode='w', encoding='utf-8') as sink:
        payload = ' '.join(words)
        sink.write(payload)


# Raw corpus on disk, and where the cleaned, space-joined word list is written.
input_file, output_file = "alice_in_wonderland.txt", "sample.txt"

# Pipeline: read -> lower-case letters only -> tokens -> drop 1-char tokens -> save.
text = read_text_file(file_path=input_file)
normalized_text = normalize_text(text=text)
words = tokenize_text(text=normalized_text)
cleaned_words = clean_words(words=words)
save_preprocessed_text(words=cleaned_words, output_path=output_file)


In [299]:
# training
filename = 'sample.txt'
seq_length = 10
hidden_size = 100
learning_rate = 1e-3

# Seed NumPy's global RNG so the weight initialisation and the sampling draws
# made during training are the same every time this cell is run.
np.random.seed(42)

data_reader = DataReader(filename, seq_length)
rnn = RNN(hidden_size, data_reader.vocab_size, seq_length, learning_rate)


rnn.train(data_reader)




iter :0, loss:79.204490346299
armdeniedwaitrudeengravedsurprisedslateslltreaclewellehlinemuchnesssurprisedkillingpurringreducedtriedfoolishpennyworthtwelvelearntboldlysolidprotectioncomfortablecryingtaughtdesperatepairsyoungpausedlizardsteathingsgeneralsonopinionhateccatchingstandingwholeupsetblastsfriendswingskeepingclingingsafetoyssinkmelancholyitlljusticeprisonkneltimitatedpointingtruemistakeeyelidsmindingstartiiopenpointedpeepingproducedwatersshuttingsheddinglapclosedthrowingcatcattlesheepbellsargumentfryingpangoodexplainedverysnappishlysharebrightneedstrygravygrandstringsdeniedwaistfanciedbillfrontripplingmaamtearsfillsliveddarknesssnortingdreamtailshearhoarserabbitholeeatsoppositeeyeshouldteapartylapliveversepuzzleduneasyquiteclaspeddistanceforgettingunwillinglyhasflavourlaughgardenersnotiononeselfchargescreepoccasionalwintermysteryrememberedfightingcurvingheelswitsrockwifedishbuttonsdesperatenosedownwardswalkshortnoisespiecesnotthoughtfullywhatsreachfittedhighswallowedcakescre

In [304]:
# predictions
# Prompt to continue and how many words to sample after it; every prompt word
# is looked up in data_reader.word_to_ix by predict.
start_text, n = "she said", 5

predicted_text = rnn.predict(data=data_reader, start=start_text, n=n)
print(predicted_text)

she said they who imitated heard all
