In [4]:
import math
from collections import defaultdict
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import time
from utils import Vocab, read_data

device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # to use gpu on kaggle or colab


In [None]:
class NGramModel:
	def __init__(self, n, data):
		self.n = n
		self.vocab = Vocab()
		self.vocab.add('<BOS>')
		self.vocab.add('<EOS>')
		self.vocab.add('<UNK>')
		for seq in data:
			self.vocab.update(seq)
		self.counts = defaultdict(lambda: defaultdict(int))
		self.probs = defaultdict(lambda: defaultdict(int))

	def start(self):
		return ['<BOS>'] * (self.n - 1)
		"""Because we add BOS and EOS in read_data():
			* No extra context needed for unigram model (returns []).
			* n-1 extra BOS tags needed for n>1 to provide context for the first symbol.
		"""

	def fit(self, data):
		"""TODO: 
			* Train the model on the training data by populating the counts. 
				* For n>1, you will need to keep track of the context and keep updating it. 
				* Get the starting context with self.start().
		"""
		context = tuple(self.start())
		for sentence in data:
			for sym in sentence:
				if sym not in self.vocab.sym2num:
					sym='<UNK>'
				if self.n>1:
					self.counts[tuple()][sym]+=1
					self.counts[context][sym]+=1
					context = context[1:] + (sym,)
				else:
					self.counts[context][sym]+=1
		"""TODO: Populate self.probs by converting counts to log probabilities with add-1 smoothing."""
		for context in self.counts:
			for sym in self.vocab.num2sym:
				self.probs[context][sym] = math.log(self.counts[context][sym] + 1) - math.log(sum(self.counts[context].values()) + len(self.vocab))
  		
	def step(self, context):
		"""Returns the distribution over possible next symbols. For unseen contexts, backs off to unigram distribution."""
		if self.n>1:
			context=self.start()+context
			context=tuple(context[-(self.n-1):])
		else:
			context=tuple()
		
		if context in self.probs:
			return self.probs[context]
		else:
			return self.probs[tuple()]
       

	def predict(self, context):
		return max(self.step(context).items(), key=lambda x: x[1])[0] 
	    
	def evaluate(self, data):
		correct, total = 0, 0
		for sentence in data:
			context = self.start()
   
			for sym in sentence:
				pred = self.predict(context)
				if pred == sym:
					correct += 1
				total += 1
				if self.n > 1:
					context = context[1:] + [sym]

		return correct / total if total > 0 else 0.0

if __name__ == '__main__':

	train_data = read_data('data/train.txt')
	val_data = read_data('data/val.txt')
	test_data = read_data('data/test.txt')
	response_data = read_data('data/response.txt')

	for n in [1,5]:
		print(f'\n{n}-Gram Model:')
		model = NGramModel(n, train_data)
		model.fit(train_data)
		print(f"\nValidation: {model.evaluate(val_data)}\nTesting: {model.evaluate(test_data)}\n")

		"""Generate the next 100 characters for the free response questions."""
		for x in response_data:
			x = x[:-1] # remove EOS
			for _ in range(100):
				y = model.predict(x)
				x += y
			print(''.join(x))


1-Gram Model:
0.17344519423673171 0.17463940113200657
<BOS>"I'm not ready to go," said                                                                                                    
<BOS>Lily and Max were best friends. One day                                                                                                    
<BOS>He picked up the juice and                                                                                                    
<BOS>It was raining, so                                                                                                    
<BOS>The end of the story was                                                                                                    

5-Gram Model:
0.5794273208097757 0.5728501004199379
<BOS>"I'm not ready to go," said, "i wanted to the boy who listen the boy who listen the boy who listen the boy who listen the boy w
<BOS>Lily and Max were best friends. One day, the boy who listen the boy who listen the boy who

In [None]:
class RNNModel(nn.Module):
	def __init__(self, vocab, dims):
		super().__init__()
		self.vocab = vocab
		self.dims = dims
		self.embed = nn.Embedding(len(vocab), dims)
  
		self.Wxh = nn.Linear(dims, dims)
		self.Whh = nn.Linear(dims, dims)
		self.fc = nn.Linear(dims, len(vocab))

	def start(self):
		return torch.zeros(self.dims, device=device)

	def step(self, h, idx):
		"""	TODO: Pass idx through the layers of the model. Return the updated hidden state (h) and log probabilities."""
		emb = self.embed(torch.tensor([idx], device=device))
		h_new = torch.tanh(self.Wxh(emb) + self.Whh(h.unsqueeze(0))).squeeze(0)
		logits = self.fc(h_new)
		log_probs = F.log_softmax(logits, dim=-1)
		return h_new, log_probs

	def predict(self, h, idx):
		"""	TODO: Obtain the updated hidden state and log probabilities after calling self.step(). 
			Return the updated hidden state and the most likely next symbol."""
		h_new, log_probs = self.step(h,idx)
		next_idx = torch.argmax(log_probs).item()
		next_sym = self.vocab.denumberize(next_idx)
		return h_new, next_sym

	def fit(self, data, lr=0.001, epochs=10):
		"""	TODO: Fill in the code using PyTorch functions and other functions from part2.py and utils.py.
			Most steps will only be 1 line of code. You may write it in the space below the step."""
		
		# 1. Initialize the optimizer. Use `torch.optim.Adam` with `self.parameters()` and `lr`.
		optimizer = torch.optim.Adam(self.parameters(), lr=lr)
		# 2. Set a loss function variable to `nn.NLLLoss()` for negative log-likelihood loss.
		loss_function = nn.NLLLoss()
		# 3. Loop through the specified number of epochs.
		for epoch in range(epochs):
		#	 1. Put the model into training mode using `self.train()`.
			self.train()
		#	 2. Shuffle the training data using random.shuffle().
			random.shuffle(data)
		#	 3. Initialize variables to keep track of the total loss (`total_loss`) and the total number of characters (`total_chars`).
			total_loss=0.0
			total_chars=0
   
			start_time=time.time()
		#	 4. Loop over each sentence in the training data.
			for sentence in data:
		#	 	 1. Initialize the hidden state with the start state, move it to the proper device using `.to(device)`, and detach it from any previous computation graph with `.detach()`.
				h=self.start().to(device).detach()
		#	 	 2. Call `optimizer.zero_grad()` to clear any accumulated gradients from the previous update.
				optimizer.zero_grad()
		#	 	 3. Initialize a variable to keep track of the loss within a sentence (`loss`).
				loss=0.0
		#	 	 4. Loop through the characters of the sentence from position 1 to the end (i.e., start with the first real character, not BOS).
				for i in range(1, len(sentence)):
		#	 	 	1. You will need to keep track of the previous character (at position i-1) and current character (at position i). These should be expressed as numbers, not symbols.
					prev_idx = self.vocab.numberize(sentence[i-1])
		#			2. Call self.step() to get the next hidden state and log probabilities over the vocabulary given the previous character.
					curr_idx = self.vocab.numberize(sentence[i])
		#			3. See if this matches the actual current character (numberized). Do so by computing the loss with the nn.NLLLoss() loss initialized above. 
		#			   * The first argument is the updated log probabilities returned from self.step(). You'll need to reshape it to `(1, V)` using `.view(1, -1)`.
		#			   * The second argument is the current numberized character. It will need to be wrapped in a tensor with `device=device`. Reshape this to `(1,)` using `.view(1)`.
					h, log_probs = self.step(h, prev_idx) 
		#			4. Add this this character loss value to `loss`.
					loss += loss_function(log_probs.view(1,-1), torch.tensor([curr_idx], device=device))
		#			5. Increment `total_chars` by 1.
					total_chars+=1
		#	 	 5. After processing the full sentence, call `loss.backward()` to compute gradients.
				loss.backward()
		#		 6. Apply gradient clipping to prevent exploding gradients. Use `torch.nn.utils.clip_grad_norm_()` with `self.parameters()` and a `max_norm` of 5.0.
				torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=5.0)
		#		 7. Call `optimizer.step()` to update the model parameters using the computed gradients.
				optimizer.step()
		#		 8. Add `loss.item()` to `total_loss`.
				total_loss+=loss.item()
		#	5. Compute the average loss per character by dividing `total_loss / total_chars`.
			average_loss = total_loss/total_chars
		#	6. For debugging, it will be helpful to print the average loss per character and the runtime after each epoch. Average loss per character should always decrease epoch to epoch and drop from about 3 to 1.2 over the 10 epochs.
			print(f"Epoch {epoch+1}/{epochs}\tAverage Loss: {average_loss:.4f}\t Time: {time.time()-start_time:.2f}s")

	def evaluate(self, data):
		"""	TODO: Iterating over the sentences in the data, calculate next character prediction accuracy. 
		Use `self.eval()` and `with torch.no_grad()` so that the model is not trained during evaluation.
		Use self.predict() to get the predicted next character, and then check if it matches the real next character found in the data.
		Divide the total correct predictions by the total number of characters to get the final accuracy."""
		self.eval()
		correct=0
		total=0
		with torch.no_grad():
			for sentence in data:
				h = self.start()
				for i in range(1, len(sentence)):
					idx = self.vocab.numberize(sentence[i-1])
					h, sym = self.predict(h, idx)
					if sym == sentence[i]:
						correct+=1
					total+=1
		accuracy = correct/total
		return accuracy
if __name__ == '__main__':
	
	train_data = read_data('data/train.txt')
	val_data = read_data('data/val.txt')
	test_data = read_data('data/test.txt')
	response_data = read_data('data/response.txt')

	vocab = Vocab()
	"""TODO: Populate vocabulary with all possible characters/symbols in the training data, including '<BOS>', '<EOS>', and '<UNK>'."""

	vocab.add('<BOS>')
	vocab.add('<EOS>')
	vocab.add('<UNK>')
	for sentence in train_data:
		vocab.update(sentence)
     
	model = RNNModel(vocab, dims=128).to(device)
	model.fit(train_data)

	torch.save({
		'model_state_dict': model.state_dict(),
		'vocab': model.vocab,
		'dims': model.dims
	}, 'rnn_model.pth')

	"""Use this code if you saved the model and want to load it up again to evaluate. Comment out the model.fit() and torch.save() code if so.
	# checkpoint = torch.load('rnn_model.pth', map_location=device, weights_only=False)
	# vocab = checkpoint['vocab']
	# dims = checkpoint['dims']
	# model = RNNModel(vocab, dims).to(device)
	# model.load_state_dict(checkpoint['model_state_dict'])
	"""

	model.eval()
	print(f"\nValidation: {model.evaluate(val_data)}\nTesting: {model.evaluate(test_data)}\n")

	"""Generate the next 100 characters for the free response questions."""
	for x in response_data:
		x = x[:-1] # remove EOS
		state = model.start()
		for char in x:
			idx = vocab.numberize(char)
			state, _ = model.step(state, idx)
		idx = vocab.numberize(x[-1])
		for _ in range(100):
			state, sym = model.predict(state, idx)
			x += sym # My predict() returns the denumberized symbol. Yours may work differently; change the code as needed.
		print(''.join(x))


Epoch 1/10	Average Loss: 1.9258	 Time: 14.27s


In [None]:
class LSTMModel(nn.Module):
    def __init__(self, vocab, dims):
        super().__init__()
        self.vocab = vocab
        self.dims = dims
        self.embed = nn.Embedding(len(vocab), dims)
        self.W_x = nn.Linear(dims, 4 * dims) 
        self.W_h = nn.Linear(dims, 4 * dims, bias=False) 
        self.fc = nn.Linear(dims, len(vocab))

    def start(self):
        h = torch.zeros(1, self.dims, device=device)
        c = torch.zeros(1, self.dims, device=device)
        return (h, c)

    def step(self, state, idx):
        h, c = state  
        x = self.embed(torch.tensor([idx], device=device)).unsqueeze(0)  
        x = x.squeeze(1) 

        gates = self.W_x(x) + self.W_h(h) 
        i, f, g, o = gates.chunk(4, dim=1)

        i = torch.sigmoid(i)
        f = torch.sigmoid(f)
        o = torch.sigmoid(o)
        g = torch.tanh(g)

        c_next = f * c + i * g
        h_next = o * torch.tanh(c_next)

        logits = self.fc(h_next)
        log_probs = F.log_softmax(logits, dim=1)

        return (h_next, c_next), log_probs

    def predict(self, state, idx):
        new_state, log_probs = self.step(state, idx)
        next_idx = torch.argmax(log_probs, dim=1).item()
        next_sym = self.vocab.denumberize(next_idx)
        return new_state, next_sym

    def fit(self, data, lr=0.001, epochs=10):
        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        loss_function = nn.NLLLoss()
        for epoch in range(epochs):
            self.train()
            random.shuffle(data)
            total_loss = 0.0
            total_chars = 0
            start_time = time.time()
            for sentence in data:
                state = self.start()
                state = (state[0].detach(), state[1].detach())
                optimizer.zero_grad()
                loss = 0.0
                for i in range(1, len(sentence)):
                    prev_idx = self.vocab.numberize(sentence[i - 1])
                    curr_idx = self.vocab.numberize(sentence[i])
                    state, log_probs = self.step(state, prev_idx)
                    loss += loss_function(log_probs.view(1, -1), torch.tensor([curr_idx], device=device))
                    total_chars += 1
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=5.0)
                optimizer.step()
                total_loss += loss.item()
            avg_loss = total_loss / total_chars
            print(f"Epoch {epoch + 1}/{epochs}\tAverage Loss: {avg_loss:.4f}\tTime: {time.time() - start_time:.2f}s")

    def evaluate(self, data):
        self.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for sentence in data:
                state = self.start()
                for i in range(1, len(sentence)):
                    idx = self.vocab.numberize(sentence[i - 1])
                    state, sym = self.predict(state, idx)
                    if sym == sentence[i]:
                        correct += 1
                    total += 1
        return correct / total if total > 0 else 0


if __name__ == '__main__':
    vocab = Vocab()
    vocab.add('<BOS>')
    vocab.add('<EOS>')
    vocab.add('<UNK>')

    train_data = read_data('data/train.txt')
    val_data = read_data('data/val.txt')
    test_data = read_data('data/test.txt')
    response_data = read_data('data/response.txt')

    for sent in train_data:
        vocab.update(sent)

    model = LSTMModel(vocab, dims=128).to(device)
    model.fit(train_data)

    torch.save({
        'model_state_dict': model.state_dict(),
        'vocab': model.vocab,
        'dims': model.dims
    }, 'lstm_model.pth')

    model.eval()
    print(f"\nValidation: {model.evaluate(val_data)}\nTesting: {model.evaluate(test_data)}\n")

    for x in response_data:
        x = x[:-1]  # remove EOS
        state = model.start()
        for char in x:
            idx = vocab.numberize(char)
            state, _ = model.predict(state, idx)
        idx = vocab.numberize(x[-1])
        for _ in range(100):
            state, sym = model.predict(state, idx)
            idx = vocab.numberize(sym)
            x += sym
        print(''.join(x))


Epoch 1/10	Average Loss: 1.9433	Time: 24.44s
Epoch 2/10	Average Loss: 1.5725	Time: 25.20s
Epoch 3/10	Average Loss: 1.4469	Time: 26.62s
Epoch 4/10	Average Loss: 1.3706	Time: 25.49s
Epoch 5/10	Average Loss: 1.3168	Time: 24.93s
Epoch 6/10	Average Loss: 1.2707	Time: 27.12s
Epoch 7/10	Average Loss: 1.2341	Time: 24.77s
Epoch 8/10	Average Loss: 1.2022	Time: 25.63s
Epoch 9/10	Average Loss: 1.1740	Time: 27.21s
Epoch 10/10	Average Loss: 1.1492	Time: 24.36s
0.6221437859929407 0.6146550120885252
<BOS>"I'm not ready to go," saidy, the boy named timmy.<EOS> the sky and said.<EOS> the boy named timmy.<EOS>.<EOS> the boy named timmy.<EOS> the sky an
<BOS>Lily and Max were best friends. One day.<EOS> the sky and said.<EOS> the boy named timmy.<EOS>.<EOS> the boy named timmy.<EOS> the sky and said.<EOS> the boy named
<BOS>He picked up the juice andy the sky and said, "i was so happy.<EOS> the sky and said.<EOS> the boy named timmy.<EOS>.<EOS> the boy named timmy
<BOS>It was raining, son the sky and said