# Mohammad Amin Rami
# 98101588
# HW4
# Question 1


In [1]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from io import open
import string
import re
import random
from tqdm import tqdm

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [2]:
# Reserved vocabulary indices: SOS is fed to the decoder as its first input,
# EOS is appended to every sentence tensor to mark its end.
SOS_token = 0
EOS_token = 1

class Lang:
    """Vocabulary that maps words to integer indices and back.

    Indices 0 and 1 are reserved for the start-of-sentence and
    end-of-sentence markers; real words are numbered from 2 upwards in the
    order in which they are first seen.
    """

    def __init__(self):
        # word -> index; the two reserved markers are NOT keys of this dict.
        self.word2index = {}
        # index -> word; index 1 is the end-of-sentence marker.
        self.index2word = {0: "SOS", 1: "EOS"}
        # word -> number of times the word has been added.
        self.word2count = {}
        # Total vocabulary size, including the two reserved markers.
        self.n_words = 2

    def add_sentence(self, sentence):
        """Register every single-space-separated token of ``sentence``."""
        for word in sentence.split(' '):
            self.add_word(word)

    def add_word(self, word):
        """Add ``word`` to the vocabulary, or bump its count if already known."""
        if word in self.word2index:
            self.word2count[word] += 1
            return

        # New word: it takes the next free index.
        self.word2index[word] = self.n_words
        self.word2count[word] = 1
        self.index2word[self.n_words] = word
        self.n_words += 1


In [3]:
# Build the vocabulary and the list of (first mesra, second mesra) pairs.
lang = Lang()
data_path = "ferdousi.txt"
data = []
max_length = 0

# Decode explicitly instead of relying on the platform's default encoding
# (assumes the corpus file is stored as UTF-8 -- confirm against the data).
with open(data_path, encoding="utf-8") as file:
    # The first two lines are skipped (presumably a header -- confirm
    # against the data file).
    next(file)
    next(file)
    # Consume the file two lines at a time; a trailing unpaired line is
    # dropped because zip stops as soon as the second read is exhausted.
    for first_line, second_line in zip(file, file):
        first_mesra = first_line.strip()
        second_mesra = second_line.strip()
        data.append((first_mesra, second_mesra))

        lang.add_sentence(first_mesra)
        lang.add_sentence(second_mesra)
        # Track the longest mesra, measured in single-space-separated tokens.
        max_length = max(
            max_length,
            len(first_mesra.split(' ')),
            len(second_mesra.split(' ')),
        )

# Two positions of headroom on top of the longest mesra (presumably for the
# SOS/EOS markers).
max_length += 2


In [4]:
def indexesFromSentence(lang, sentence):
    """Translate a single-space-separated sentence into vocabulary indices.

    Every token is looked up in ``lang.word2index``; a token that is missing
    from the vocabulary raises ``KeyError``.
    """
    lookup = lang.word2index
    indices = []
    for token in sentence.split(' '):
        indices.append(lookup[token])
    return indices

def tensorFromSentence(lang, sentence):
    """Encode ``sentence`` as a column tensor of word indices ending in EOS.

    Returns a ``torch.long`` tensor of shape (number of tokens + 1, 1) placed
    on the module-level ``device``.
    """
    token_ids = indexesFromSentence(lang, sentence)
    token_ids.append(EOS_token)
    flat = torch.tensor(token_ids, dtype=torch.long, device=device)
    return flat.view(-1, 1)

def tensorsFromPair(lang, pair):
    """Return the ``(input, target)`` tensors for a ``(first, second)`` pair.

    Both sentences are encoded with ``tensorFromSentence`` and moved to the
    module-level ``device``.
    """
    source_text, target_text = pair[0], pair[1]
    return tuple(
        tensorFromSentence(lang, text).to(device)
        for text in (source_text, target_text)
    )


In [5]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np


def showPlot(points):
    """Plot ``points`` as a line chart with y-axis ticks every 0.2 and show it."""
    # plt.subplots() already creates the figure; an extra plt.figure() call
    # beforehand would only leave an empty figure behind.
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    ax.plot(points)
    plt.show()

### Part A: LSTM

In [6]:
class EncoderRNN(nn.Module):
    """Single-layer LSTM encoder that consumes one token index per call."""

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size, self.hidden_size = input_size, hidden_size
        # The embedding width equals the LSTM hidden width.
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)

    def forward(self, x, h):
        """Embed ``x`` and advance the LSTM by one step from state ``h``.

        The embedding is reshaped to (1, 1, -1) before it is fed to the LSTM.
        Returns the ``(output, hidden)`` pair produced by ``nn.LSTM``.
        """
        step_input = self.embedding(x).view(1, 1, -1)
        return self.lstm(step_input, h)

    def initHidden(self):
        """Return an all-zero (1, 1, hidden_size) tensor on ``device``."""
        shape = (1, 1, self.hidden_size)
        return torch.zeros(shape, device=device)


In [7]:
class DecoderRNN(nn.Module):
    """Single-layer LSTM decoder that emits log-probabilities over the vocabulary."""

    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.hidden_size, self.output_size = hidden_size, output_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)
        # Projects the LSTM output onto one score per vocabulary entry.
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x, h):
        """Decode one step from token ``x`` and LSTM state ``h``.

        Returns ``(log_probs, hidden)`` where ``log_probs`` is the
        log-softmax of the projected LSTM output and ``hidden`` is the
        updated LSTM state.
        """
        embedded = self.embedding(x).view(1, 1, -1)
        activated = F.relu(embedded)
        lstm_out, hidden = self.lstm(activated, h)
        scores = self.out(lstm_out[0])
        return self.softmax(scores), hidden

    def initHidden(self):
        """Return an all-zero (1, 1, hidden_size) tensor on ``device``."""
        shape = (1, 1, self.hidden_size)
        return torch.zeros(shape, device=device)


In [8]:
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length):
    """Run one teacher-forced optimisation step on a single sentence pair.

    Args:
        input_tensor: token indices of the source sentence, iterated along
            dim 0 and fed to the encoder one entry at a time.
        target_tensor: token indices of the target sentence, iterated along
            dim 0.
        encoder: module called as ``encoder(token, hidden)``; must provide
            ``initHidden()``.
        decoder: module called as ``decoder(token, hidden)``.
        encoder_optimizer: optimizer stepping the encoder parameters.
        decoder_optimizer: optimizer stepping the decoder parameters.
        criterion: loss applied to each decoder output / target token.
        max_length: unused; kept so that existing callers keep working.

    Returns:
        The summed loss divided by the number of target tokens.
    """
    # The LSTM state is an (h, c) pair; both start as zeros.
    initial_hidden = encoder.initHidden()
    encoder_hidden = (initial_hidden, torch.zeros_like(initial_hidden))

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    target_length = target_tensor.size(0)

    # Only the final encoder state is passed on to the decoder, so the
    # per-step encoder outputs are not stored.
    for ei in range(input_tensor.size(0)):
        _, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    loss = 0

    # Teacher forcing: Feed the target as the next input
    for di in range(target_length):
        decoder_output, decoder_hidden = decoder(
            decoder_input, decoder_hidden)
        loss += criterion(decoder_output, target_tensor[di])
        decoder_input = target_tensor[di]  # Teacher forcing

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length


In [9]:
def trainIters(pairs, encoder, decoder, lang, max_length, print_every=1000, plot_every=100, learning_rate=0.01):
    """Train on every pair in ``pairs`` once and return the averaged losses.

    Args:
        pairs: sequence of ``(input sentence, target sentence)`` pairs.
        encoder: encoder module to train.
        decoder: decoder module to train.
        lang: vocabulary used to turn sentences into index tensors.
        max_length: forwarded unchanged to ``train``.
        print_every: print the mean loss every this many pairs.
        plot_every: record the mean loss every this many pairs.
        learning_rate: learning rate of the two SGD optimizers.

    Returns:
        List with one mean loss per completed ``plot_every`` window.
    """
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    print_loss_count = 0  # Number of losses accumulated in print_loss_total
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)

    criterion = nn.CrossEntropyLoss()

    for i, training_pair in enumerate(pairs):
        input_tensor, target_tensor = tensorsFromPair(lang, training_pair)

        loss = train(input_tensor, target_tensor, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion, max_length)
        print_loss_total += loss
        print_loss_count += 1
        plot_loss_total += loss

        at_print_boundary = (i + 1) % print_every == 0

        # Early progress report after the first 100 pairs. Average over the
        # losses actually accumulated (the total may already have been reset
        # when print_every < 100) and skip it when the regular report below
        # fires on the same iteration.
        if (i + 1) == 100 and not at_print_boundary:
            print_loss_avg = print_loss_total / print_loss_count
            print(f'train loss:      {print_loss_avg:0.2f}       iteration {i+1}/{len(pairs)}')
        if at_print_boundary:
            print_loss_avg = print_loss_total / print_loss_count
            print_loss_total = 0
            print_loss_count = 0
            print(f'train loss:      {print_loss_avg:0.2f}       iteration {i+1}/{len(pairs)}')

        if (i + 1) % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0

    return plot_losses


In [10]:
# The embedding and output layers must cover every index the vocabulary can
# produce. Word indices start at 2 because SOS/EOS occupy 0 and 1, so the
# size is lang.n_words (which counts the two reserved entries), not the
# number of keys in lang.word2index.
input_size = lang.n_words
hidden_size = 120

encoder = EncoderRNN(input_size, hidden_size).to(device)
decoder = DecoderRNN(hidden_size, input_size).to(device)


In [11]:
# Train for a fixed number of epochs on the first `max_data` pairs and collect
# the per-window losses; Ctrl-C / kernel interrupt stops training early while
# keeping whatever has been collected so far.
loss = []
epochs = 5
max_data = 40000
try:
    for epoch in range(epochs):
        print(f"============== epoch {epoch + 1} ==============")
        epoch_loss = trainIters(
            data[:max_data],
            encoder,
            decoder,
            lang,
            max_length,
            print_every=5000,
            plot_every=300,
            learning_rate=0.025,
        )
        loss.extend(epoch_loss)
except KeyboardInterrupt:
    print('Training finished')


train loss:      8.55       iteration 100/40000
Training finished


In [12]:
def evaluate(encoder, decoder, sentence, input_lang, max_length=13):
    """Greedily decode an output sentence for ``sentence``.

    Args:
        encoder: module called as ``encoder(token, hidden)``; must provide
            ``initHidden()``.
        decoder: module called as ``decoder(token, hidden)``.
        sentence: single-space-separated input sentence; every token must be
            present in ``input_lang``.
        input_lang: vocabulary used both to encode the input and to map the
            predicted indices back to words.
        max_length: maximum number of decoding steps.

    Returns:
        The decoded words joined by single spaces. Decoding stops at the
        first predicted EOS token (which is not included) or after
        ``max_length`` steps, whichever comes first.
    """
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentence)

        # The LSTM state is an (h, c) pair; both start as zeros.
        initial_hidden = encoder.initHidden()
        encoder_hidden = (initial_hidden, torch.zeros_like(initial_hidden))

        # Only the final encoder state is needed by the decoder, so inputs of
        # any length are accepted regardless of max_length.
        for ei in range(input_tensor.size(0)):
            _, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)

        decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS
        decoder_hidden = encoder_hidden

        decoded_words = []
        for _ in range(max_length):
            decoder_output, decoder_hidden = decoder(
                decoder_input, decoder_hidden)
            # Greedy choice: take the most likely next token.
            topv, topi = decoder_output.data.topk(1)
            predicted_index = topi.item()
            if predicted_index == EOS_token:
                break
            decoded_words.append(input_lang.index2word[predicted_index])

            # Feed the prediction back in as the next decoder input.
            decoder_input = topi.squeeze().detach()

        # Every collected entry is a real word, so nothing is trimmed here
        # even when the step limit is reached without an EOS.
        return ' '.join(decoded_words)


In [13]:
import random
# Show one couplet: the first mesra, the model's generated continuation and
# the reference second mesra, separated by rules.
k = random.choice(range(len(data)))
k = 10334  # the random pick above is overridden with a fixed example
source_mesra, reference_mesra = data[k]
print(source_mesra)
print('=' * 40)
print(evaluate(encoder, decoder, source_mesra, lang))
print('=' * 40)
print(reference_mesra)
k

مر او را سوی تخت ایران برد
به را به و و و و
بر نامداران و شیران برد


10334