In [14]:
import tensorflow as tf
from collections import Counter
import numpy as np
import re

# P1) Analyze the dataset
# Read the book, dropping blank lines and then the first two non-empty lines.
# The context manager closes the file handle as soon as the lines are read.
with open('data/TheTimeMachine.txt') as corpus_file:
    corpus = [line.strip() for line in corpus_file if line.strip()][2:]
print("\n".join(corpus[:10]))

# Normalise the text: every run of non-alphanumeric characters becomes a single
# space and everything is lowercased.
corpus = [re.sub('[^A-Za-z0-9]+', ' ', line).lower() for line in corpus]
corpus = [re.sub(' +', ' ', line) for line in corpus]
# Flatten the lines into one long list of word tokens.
corpus = [word for line in corpus for word in line.split()]

# Vocabulary: the `vocab_size` most frequent words, indexed by descending
# frequency, plus one extra "/UNK" entry for every other word.
vocab_size = 5000
tkn_counter = Counter(corpus)
vocab = {word: idx for idx, (word, _) in enumerate(tkn_counter.most_common(vocab_size))}
vocab["/UNK"] = len(vocab)

class TextCorpusDataset(tf.keras.utils.Sequence):
    """Serve fixed-length, non-overlapping snippets of a tokenised corpus as token ids.

    Args:
        corpus: list of word tokens.
        vocab: word-to-index mapping; it must contain the "/UNK" key, which is
            used for every word that is not in the mapping.
        snippet_len: number of tokens in each snippet.
    """

    def __init__(self, corpus, vocab, snippet_len=50):
        # Initialise the Keras base class before adding our own attributes
        # (NOTE(review): newer Keras releases are reported to warn when this
        # call is missing; it is a no-op on bases without their own __init__).
        super().__init__()
        self.corpus = corpus
        self.snippet_len = snippet_len
        # Vocabulary (word-to-index mapping)
        self.vocab = vocab
        # Inverse vocabulary (index-to-word mapping)
        self.inv_vocab = {idx: word for word, idx in self.vocab.items()}

    def convert2idx(self, word_sequence):
        """Map words to token ids; unknown words map to the "/UNK" id."""
        unk_idx = self.vocab["/UNK"]
        return [self.vocab.get(word, unk_idx) for word in word_sequence]

    def convert2words(self, idx_sequence):
        """Map token ids back to words (raises KeyError on an unknown id)."""
        return [self.inv_vocab[idx] for idx in idx_sequence]

    def __len__(self):
        # NOTE(review): this evaluates to one less than the number of complete
        # snippets in the corpus; kept as-is so the amount of data seen per
        # epoch does not change.
        return (len(self.corpus) - self.snippet_len) // self.snippet_len

    def __getitem__(self, idx):
        """Return snippet number `idx` as a 1-D array of `snippet_len` token ids.

        Raises:
            IndexError: if `idx` is negative or the snippet would run past the
                end of the corpus. Without this, iterating the dataset through
                the plain `__getitem__` protocol never terminates and yields
                short or empty arrays once the corpus is exhausted.
        """
        start = idx * self.snippet_len
        stop = start + self.snippet_len
        if idx < 0 or stop > len(self.corpus):
            raise IndexError(f"snippet index {idx} is out of range")
        return np.array(self.convert2idx(self.corpus[start:stop]))

# Sanity check: build the dataset and decode one snippet back into words
dataset = TextCorpusDataset(corpus, vocab, snippet_len=50)
snippet = dataset[123]
print("\nRandom snippet from the corpus.")
print("  * Token IDS:\t", snippet)
print(
    "  * Words:\t\t",
    " ".join(dataset.convert2words(snippet)),
)

The Time Traveller (for so it will be convenient to speak of him)
was expounding a recondite matter to us. His grey eyes shone and
twinkled, and his usually pale face was flushed and animated. The
fire burned brightly, and the soft radiance of the incandescent
lights in the lilies of silver caught the bubbles that flashed and
passed in our glasses. Our chairs, being his patents, embraced and
caressed us rather than submitted to be sat upon, and there was that
luxurious after-dinner atmosphere when thought roams gracefully
free of the trammels of precision. And he put it to us in this
way--marking the points with a lean forefinger--as we sat and lazily

Random snippet from the corpus.
  * Token IDS:	 [  13    1  377   14    4  506  697   85   18   20  855 2616    1    6
   36    5  585 2617    6 1632   59    4 1168   85    0 2618    3 2619
 2620   17    5  149    5    4  513 2621    0 2622    3   82 1633   39
 1633   33 1634  256    7    9  113 1056]
  * Words:		 as i travelled at a hig

In [17]:
from tensorflow.keras.layers import Dense

class CustomLSTM(tf.keras.Model):
    """Hand-written single-layer LSTM that emits one prediction per time step.

    Args:
        input_size: size of the feature vector fed in at every time step.
        hidden_size: size of the hidden state and of the cell state.
        output_size: if given, a Dense layer of this width maps every hidden
            state to output logits; if None the hidden state is returned as-is.
    """

    def __init__(self, input_size, hidden_size, output_size=None):
        super(CustomLSTM, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size

        def make_gate(activation, name):
            # Every gate gets its own small-variance normal initializer, passed
            # at construction time instead of being patched on afterwards.
            return Dense(
                hidden_size,
                activation=activation,
                name=name,
                kernel_initializer=tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.01),
            )

        # LSTM Parameters
        self.input_gate = make_gate('sigmoid', 'input_gate')
        self.forget_gate = make_gate('sigmoid', 'forget_gate')
        self.candidate = make_gate('tanh', 'candidate_gate')
        self.output_gate = make_gate('sigmoid', 'output_gate')

        self.predictor = Dense(output_size) if output_size is not None else tf.keras.layers.Activation('linear')

    def init_state_cell(self, batch_size):
        """Return all-zero (hidden state, cell state), each of shape (batch_size, hidden_size)."""
        state = tf.zeros((batch_size, self.hidden_size))
        cell = tf.zeros((batch_size, self.hidden_size))
        return state, cell

    def call(self, x, state=None, cell=None):
        """Unroll the LSTM over a time-major input.

        Args:
            x: tensor whose first two axes are (seq_len, batch_size); x[t] is
                concatenated with the hidden state along axis 1.
            state: hidden state to start from, or None for zeros.
            cell: cell state to start from, or None for zeros.

        Returns:
            (outputs, (state, cell)): the per-step predictor outputs stacked
            along axis 0, and the final hidden and cell states.
        """
        # Get sequence length and batch size
        seq_len, batch_size = x.shape[:2]

        # Initialize hidden and cell states if not provided
        if state is None or cell is None:
            state, cell = self.init_state_cell(batch_size)

        # Predictor output for each time step
        outputs = []

        # Iterate through the sequence
        for t in range(seq_len):
            # Every gate sees the current input and the previous hidden state
            xh_t = tf.concat([x[t], state], axis=1)

            inp_t = self.input_gate(xh_t)
            forget_t = self.forget_gate(xh_t)
            c_tilda_t = self.candidate(xh_t)
            ot = self.output_gate(xh_t)

            # Cell state: keep part of the old memory (forget gate) and write
            # part of the candidate (input gate). The input gate was previously
            # computed but never applied.
            cell = forget_t * cell + inp_t * c_tilda_t

            # Hidden state: the output gate decides how much of the squashed
            # cell state is exposed. It was previously computed but unused.
            state = ot * tf.tanh(cell)

            # Normally an LSTM simply outputs the hidden state.
            # However, here we want our outputs to be the logits for the predicted next word.
            outputs.append(self.predictor(state))

        # Stack outputs along the sequence dimension
        outputs = tf.stack(outputs, axis=0)
        return outputs, (state, cell)

# Example usage: push a toy sentence through the freshly created model
hidden_dim = 256
vocab_size = len(dataset.vocab)
model = CustomLSTM(vocab_size, hidden_dim, vocab_size)

sentence = "today is too darn cold".split()
# Token ids as a column: one time step per row, batch of one
inp = tf.constant(dataset.convert2idx(sentence))
inp = inp[:, tf.newaxis]
inp = tf.one_hot(inp, len(vocab), dtype=tf.float32)

# Take the highest-scoring word at every step and decode it
Yhat, new_state = model(inp)
Yhat = tf.argmax(Yhat, axis=-1)
predicted_ids = Yhat.numpy().squeeze()
print(dataset.convert2words(predicted_ids))

['inquired', 'worms', 'loathed', 'loathed', 'showering']


In [18]:
def generate(prefix, num_preds, model, vocab):
    """Greedily continue `prefix` with `num_preds` words predicted by `model`.

    Encoding and decoding use the `vocab` argument (not a notebook-level
    dataset), so the token ids always match the vocabulary passed in.

    Args:
        prefix: whitespace-separated prompt containing at least one word;
            words missing from `vocab` are mapped to its "/UNK" entry.
        num_preds: number of words to generate after the prompt.
        model: callable as model(x, state, cell) returning
            (outputs, (state, cell)), where x is a one-hot tensor of shape
            (1, 1, len(vocab)).
        vocab: word-to-index mapping that contains the "/UNK" key.

    Returns:
        The prompt words followed by the generated words, joined by spaces.
    """
    unk_idx = vocab["/UNK"]
    inv_vocab = {idx: word for word, idx in vocab.items()}
    prefix = [vocab.get(word, unk_idx) for word in prefix.split()]

    state, cell, outputs = None, None, [prefix[0]]
    for i in range(1, len(prefix) + num_preds):
        # Prepare one token at a time to feed the model
        inp = tf.one_hot(outputs[-1], len(vocab), dtype=tf.float32)[tf.newaxis, tf.newaxis]

        # Compute the prediction for the next token
        yhat, (state, cell) = model(inp, state, cell)

        if i < len(prefix):
            # During warmup (while parsing the prefix), we ignore the model prediction
            outputs.append(prefix[i])
        else:
            # Otherwise, append the highest-scoring token as a plain int
            outputs.append(int(tf.argmax(yhat[0, 0], axis=-1).numpy()))
    return ' '.join(inv_vocab[tkn] for tkn in outputs)

# Sample ten words from the current model
generate(
    prefix='i do not mean to ask you to accept anything',
    num_preds=10,
    model=model,
    vocab=vocab,
)

'i do not mean to ask you to accept anything appetite thinking fed groping prophecy glared wild malachite escape confident'

In [None]:
from tensorflow.keras.optimizers import RMSprop

def train_on_sequence(seq, model, optimizer, unroll=5):
    """Train the model on one batch of long text sequences with truncated BPTT.

    The batch is cut into consecutive windows of `unroll` tokens. Each window
    is one optimisation step; the recurrent state is carried from window to
    window but gradients are stopped at the window boundary.

    Args:
        seq: rank-2 integer tensor of token ids, (batch_size, num_tokens).
        model: callable as model(x, state, cell) returning
            (logits, (state, cell)) for a time-major one-hot input.
        optimizer: Keras optimizer used to apply the gradients.
        unroll: number of time steps per window.

    Returns:
        The loss averaged over the windows that were actually processed.

    Raises:
        ValueError: if the sequences are too short for a single window.
    """
    _, num_tokens = seq.shape
    window_starts = range(0, num_tokens-unroll-1, unroll)
    if len(window_starts) == 0:
        raise ValueError(f"need more than {unroll + 1} tokens per sequence, got {num_tokens}")
    # One-hot depth and logit width come from the notebook-level vocabulary
    num_classes = len(vocab)

    total_loss, state, cell = 0., None, None
    for i in window_starts:
        if state is not None:
            # Carry the values forward but do not backpropagate into earlier windows
            state = tf.stop_gradient(state)
            cell = tf.stop_gradient(cell)

        # Inputs are tokens i .. i+unroll-1, targets are the same window shifted by one
        x = tf.one_hot(tf.transpose(seq[:, i:i+unroll]), num_classes)
        # Flatten the targets so they line up row-for-row with the flattened logits
        y = tf.reshape(tf.transpose(seq[:, i+1:i+unroll+1]), (-1,))

        # Forward the model and compute the loss
        with tf.GradientTape() as tape:
            y_hat, (state, cell) = model(x, state, cell)
            logits = tf.reshape(y_hat, (-1, num_classes))
            loss = tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True))
        total_loss += loss.numpy()

        # Backward step
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Divide by the number of steps taken; the former
    # (num_tokens-unroll-1) // unroll could be one short of the loop count.
    return total_loss / len(window_starts)

def fit(model, loader, vocab, lr, num_epochs=100, unroll=5):
    """Train `model` with RMSprop, printing perplexity and a text sample per epoch.

    Args:
        model: model passed on to train_on_sequence and generate.
        loader: re-iterable collection of token-id batches (one pass per epoch).
        vocab: word-to-index mapping, used for the per-epoch text sample.
        lr: RMSprop learning rate.
        num_epochs: number of passes over `loader`.
        unroll: window length forwarded to train_on_sequence.

    Raises:
        ValueError: if `loader` yields no batches during an epoch.
    """
    optimizer = RMSprop(learning_rate=lr)
    test_prompt = 'i do not mean to ask you to accept anything'
    for epoch in range(num_epochs):
        # Count batches while training instead of materialising the whole
        # loader up front just to learn its length.
        total_loss, num_batches = 0., 0
        for sequence in loader:
            total_loss += train_on_sequence(sequence, model, optimizer, unroll=unroll)
            num_batches += 1
        if num_batches == 0:
            raise ValueError("loader yielded no batches")
        total_loss /= num_batches

        print(f'Epoch {epoch} | Perplexity {np.exp(total_loss):.1f}. Loss: {total_loss:.3f}')
        print(generate(test_prompt, 50, model, vocab))

# Training hyper-parameters
num_epochs = 100
lr = 0.005

# 100-token snippets of the corpus, served in batches of 32
dataset = TextCorpusDataset(corpus, vocab, 100)
loader = tf.data.Dataset.from_generator(
    lambda: iter(dataset),
    output_signature=tf.TensorSpec(shape=(100,), dtype=tf.int32),
).batch(32)

# Fresh model whose input and output widths both equal the vocabulary size
model = CustomLSTM(
    len(dataset.vocab),
    hidden_dim,
    output_size=len(dataset.vocab),
)
fit(model, loader, dataset.vocab, lr, num_epochs)





















































































































































































































































































































In [None]:
# Read the book, dropping blank lines and then the first two non-empty lines.
# The context manager closes the file handle as soon as the lines are read.
with open('/content/drive/My Drive/Colab Notebooks/539_ANN/data/TheTimeMachine.txt') as corpus_file:
    corpus = [line.strip() for line in corpus_file if line.strip()][2:]
print("\n".join(corpus[:10]))

# Normalise the text: every run of non-alphanumeric characters becomes a single
# space and everything is lowercased.
corpus = [re.sub('[^A-Za-z0-9]+', ' ', line).lower() for line in corpus]
corpus = [re.sub(' +', ' ', line) for line in corpus]

# Split by line: the first 2500 lines are for training, the rest for testing.
train_corpus = corpus[:2500]
test_corpus = corpus[2500:]
# Flatten each split into its own list of word tokens. The second statement
# used to assign to `train_corpus`, which replaced the training words with the
# test words and left `test_corpus` as a list of lines.
train_corpus = [word for line in train_corpus for word in line.split()]
test_corpus = [word for line in test_corpus for word in line.split()]

# The vocabulary is built from the training words only; any other word maps to "/UNK".
vocab_size = 5000
tkn_counter = Counter(train_corpus)
vocab = {word: idx for idx, (word, _) in enumerate(tkn_counter.most_common(vocab_size))}
vocab["/UNK"] = len(vocab)


num_epochs, lr = 100, 0.005
train_dataset = TextCorpusDataset(train_corpus, vocab, 100)
test_dataset = TextCorpusDataset(test_corpus, vocab, 100)
loader = tf.data.Dataset.from_generator(lambda: iter(train_dataset), output_signature=tf.TensorSpec(shape=(100,), dtype=tf.int32)).batch(32)

model = CustomLSTM(len(train_dataset.vocab), hidden_dim, output_size=len(train_dataset.vocab))
fit(model, loader, train_dataset.vocab, lr, num_epochs)

In [None]:
def eval(loader, model, unroll=5):
    """Evaluate the model on a loader of token-id batches without updating it.

    Each batch is processed in consecutive windows of `unroll` tokens, with the
    recurrent state carried from window to window. No gradients are computed
    and no weights change.

    NOTE: the name shadows Python's built-in `eval`; it is kept because the
    notebook calls this function by that name.

    Args:
        loader: iterable of rank-2 integer tensors, (batch_size, num_tokens).
        model: callable as model(x, state, cell) returning
            (logits, (state, cell)) for a time-major one-hot input.
        unroll: number of time steps per window.

    Returns:
        The per-batch mean window loss, averaged over all batches.

    Raises:
        ValueError: if the loader is empty or a batch is too short for a
            single window.
    """
    # One-hot depth and logit width come from the notebook-level vocabulary
    num_classes = len(vocab)

    total_loss_set, num_sequences = 0., 0
    for sequence in loader:
        _, num_tokens = sequence.shape
        window_starts = range(0, num_tokens-unroll-1, unroll)
        if len(window_starts) == 0:
            raise ValueError(f"need more than {unroll + 1} tokens per sequence, got {num_tokens}")

        total_loss, state, cell = 0., None, None
        for start in window_starts:
            # Define the input sequence along which we will unroll the RNN:
            # tokens start .. start+unroll-1, with targets shifted by one
            x = tf.one_hot(tf.transpose(sequence[:, start:start+unroll]), num_classes)
            y = tf.reshape(tf.transpose(sequence[:, start+1:start+unroll+1]), (-1,))
            # Forward the model and compute the loss
            y_hat, (state, cell) = model(x, state, cell)
            logits = tf.reshape(y_hat, (-1, num_classes))
            loss = tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True))
            total_loss += loss.numpy()
        # Average over the windows actually processed for this batch
        total_loss_set += total_loss / len(window_starts)
        num_sequences += 1

    if num_sequences == 0:
        raise ValueError("loader yielded no batches")
    return total_loss_set / num_sequences

In [None]:
# Batch the test dataset the same way as the training data
loader = tf.data.Dataset.from_generator(
    lambda: iter(test_dataset),
    output_signature=tf.TensorSpec(shape=(100,), dtype=tf.int32),
).batch(32)

# Report the test loss, then sample a continuation of the usual prompt
print(f'test loss:{eval(loader, model)}')
test_prompt = 'i do not mean to ask you to accept anything'
print(generate(test_prompt, 50, model, vocab))