In [21]:
from torch import nn
from torch.nn import functional as F
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, sampler
import torchvision.transforms as transforms
import torchvision.datasets as dset
from matplotlib import pyplot as plt
import numpy as np
import unicodedata
import re
import random

# Not referenced by any cell visible in this notebook.
# NOTE(review): possibly leftovers from another experiment — confirm before removing.
IMG_SIZE = 28
BATCH_SIZE = 64
EPOCHS = 40
# Reserved vocabulary ids; they match the <SOS>/<EOS> entries that Lang pre-registers.
SOS_TOKEN = 0
EOS_TOKEN = 1
# Upper bound (exclusive) on the number of space-separated tokens per sentence in
# filterPair, and the default number of decoding steps in evaluate.
MAX_LENGTH = 10

# Sentence prefixes accepted by filterPair (handed to str.startswith as a tuple).
# NOTE(review): only some entries end with a space, so e.g. "he is" also matches
# "he isn t ..." — confirm that this asymmetry is intended.
eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
import os  # also used further down for the os.path.exists checkpoint checks

# Show the working directory: the relative 'data/...' and '*.pth' paths used in
# this notebook are resolved against it.
print(os.getcwd())

e:\VSCODE\Python\AIExperiments\Experiment3


# import data

In [22]:
class Lang:
    """Vocabulary of one language: a two-way mapping between words and integer ids.

    Ids 0 and 1 are pre-assigned to the ``<SOS>`` and ``<EOS>`` markers, so the
    first word that gets added receives id 2.
    """

    def __init__(self, name):
        self.word2index = {"<SOS>": 0, "<EOS>": 1}
        self.index2word = {0: "<SOS>", 1: "<EOS>"}
        self.n_words = 2  # vocabulary size, which is also the next free id
        self.name = name

    def addSentence(self, sentence):
        """Register every token obtained by splitting ``sentence`` on single spaces."""
        for token in sentence.split(' '):
            self.addWord(token)

    def addWord(self, word):
        """Give ``word`` the next free id; words already known are left untouched."""
        if word in self.word2index:
            return
        new_id = self.n_words
        self.word2index[word] = new_id
        self.index2word[new_id] = word
        self.n_words = new_id + 1

def unicode2ascii(s):
    """Remove combining marks from ``s``.

    The string is decomposed with NFD and every character whose Unicode
    category is ``Mn`` is dropped, so an accented letter is reduced to its
    base letter.
    """
    decomposed = unicodedata.normalize('NFD', s)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept)

# lower case, trim, and remove non-letter characters

def normalizeString(s):
    """Return a lower-cased, accent-free copy of ``s`` with punctuation split off.

    ``.``, ``!`` and ``?`` each get a space inserted in front of them; every run
    of characters other than ASCII letters and those three marks is then
    collapsed into a single space.
    """
    cleaned = unicode2ascii(s.lower().strip())
    cleaned = re.sub(r"([.!?])", r" \1", cleaned)
    return re.sub(r"[^a-zA-Z.!?]+", r" ", cleaned)

def readLangs(lang1, lang2, reverse=False):
    """Read ``data/<lang1>-<lang2>.txt`` and return empty vocabularies plus the sentence pairs.

    Every line of the file is split on tabs and each field is passed through
    ``normalizeString``.

    Args:
        lang1: language name used for the first part of the file name.
        lang2: language name used for the second part of the file name.
        reverse: when true, the fields of every pair are reversed and the two
            language names swap their input/output roles.

    Returns:
        ``(input_lang, output_lang, pairs)``: two fresh ``Lang`` instances (no
        sentences have been added to them yet) and the list of normalized pairs.

    Raises:
        OSError: if the data file cannot be opened.
    """
    print("Reading lines...")

    # Read the whole file and split it into lines. The context manager closes
    # the handle deterministically instead of leaving it to garbage collection.
    with open('data/%s-%s.txt' % (lang1, lang2), encoding='utf-8') as data_file:
        lines = data_file.read().strip().split('\n')

    # Split every line into its tab-separated fields and normalize each of them.
    pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]

    # Optionally reverse the pairs, then create the matching Lang instances.
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)
        output_lang = Lang(lang2)

    return input_lang, output_lang, pairs


def filterPair(p):
    """Tell whether the pair ``p`` is kept for training.

    A pair is kept when both sentences split into fewer than ``MAX_LENGTH``
    space-separated tokens and the second sentence starts with one of
    ``eng_prefixes``.
    """
    if len(p[0].split(' ')) >= MAX_LENGTH:
        return False
    if len(p[1].split(' ')) >= MAX_LENGTH:
        return False
    return p[1].startswith(eng_prefixes)


def filterPairs(pairs):
    """Return a new list with the pairs accepted by ``filterPair``, in their original order."""
    kept = []
    for candidate in pairs:
        if filterPair(candidate):
            kept.append(candidate)
    return kept


def prepareData(lang1, lang2, reverse=False):
    """Load the corpus, filter it, and build both vocabularies.

    The arguments are forwarded unchanged to ``readLangs``. The pairs it returns
    are reduced with ``filterPairs``, and the sentences of every surviving pair
    are added to the input and output ``Lang`` objects. Progress and the final
    vocabulary sizes are printed.

    Returns:
        ``(input_lang, output_lang, pairs)`` with the populated vocabularies and
        the filtered pairs.
    """
    input_lang, output_lang, all_pairs = readLangs(lang1, lang2, reverse)
    print("Read %d sentence pairs" % len(all_pairs))

    kept_pairs = filterPairs(all_pairs)
    print("Trimmed to %d sentence pairs" % len(kept_pairs))

    print("Counting words...")
    for sentence_pair in kept_pairs:
        input_lang.addSentence(sentence_pair[0])
        output_lang.addSentence(sentence_pair[1])

    print("Counted words:")
    for lang in (input_lang, output_lang):
        print(lang.name, lang.n_words)
    return input_lang, output_lang, kept_pairs

In [23]:
# Load data/eng-fra.txt with the columns swapped (reverse=True), so the second
# column of the file becomes the input side and the first column the output side.
input_lang, output_lang, pairs = prepareData('eng', 'fra', reverse=True)
# Spot-check one randomly chosen pair.
print(random.choice(pairs))

Reading lines...
Read 39365 sentence pairs
Trimmed to 1271 sentence pairs
Counting words...
Counted words:
fra 1418
eng 1093
['c est une infirmiere attitree .', 'she is qualified as a nurse .']


# define GRU model

In [24]:
class GRU(nn.Module):
    """Single-step GRU cell assembled from ``nn.Linear`` layers.

    For an input ``x`` and a previous hidden state ``h`` the cell computes::

        r     = sigmoid(Wr x + Ur h)
        z     = sigmoid(Wz x + Uz h)
        h_hat = tanh(W x + r * (U h))
        h'    = (1 - z) * h_hat + z * h
        y     = Wy h'

    Args:
        input_size: size of the last dimension of the input (and of the output ``y``).
        hidden_size: size of the last dimension of the hidden state.
    """

    def __init__(self, input_size=512, hidden_size=512):
        super(GRU, self).__init__()
        # initHidden() reads self.hidden_size, so the sizes have to be stored on
        # the module; without this it raises AttributeError.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        self.Wr = nn.Linear(input_size, hidden_size)
        self.Ur = nn.Linear(hidden_size, hidden_size)
        self.Wz = nn.Linear(input_size, hidden_size)
        self.Uz = nn.Linear(hidden_size, hidden_size)
        self.W = nn.Linear(input_size, hidden_size)
        self.U = nn.Linear(hidden_size, hidden_size)
        self.Wy = nn.Linear(hidden_size, input_size)

    def forward(self, input, hidden):
        """Advance the cell by one step and return ``(output, new_hidden)``."""
        r = self.sigmoid(self.Wr(input) + self.Ur(hidden))  # reset gate
        z = self.sigmoid(self.Wz(input) + self.Uz(hidden))  # update gate
        h_hat = self.tanh(self.W(input) + r * self.U(hidden))  # candidate state
        h = (1 - z) * h_hat + z * hidden
        output = self.Wy(h)
        return output, h

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)`` on ``device``."""
        return torch.zeros(1, 1, self.hidden_size, device=device)


class EncoderGRU(nn.Module):
    """Encoder: embeds one token id and feeds it through an ``nn.GRU``.

    Args:
        input_size: number of rows of the embedding table (vocabulary size).
        hidden_size: embedding width and GRU hidden width.
        n_layers: number of stacked GRU layers inside ``nn.GRU``.
    """

    def __init__(self, input_size, hidden_size, n_layers=1):
        super(EncoderGRU, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)

    def forward(self, input, hidden):
        """Encode a single token.

        Args:
            input: tensor holding one token id.
            hidden: hidden state in the layout produced by ``initHidden``.

        Returns:
            ``(output, hidden)`` as returned by the GRU for this step.
        """
        embedded = self.embedding(input).view(1, 1, -1)  # S=1 x B=1 x N
        # nn.GRU already runs all of its n_layers stacked layers in one call.
        # Calling it n_layers times would push the input through the whole
        # stack repeatedly, so it is invoked exactly once. With n_layers == 1
        # the result is the same as a single-iteration loop.
        output, hidden = self.gru(embedded, hidden)
        return output, hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(n_layers, 1, hidden_size)`` on ``device``."""
        return torch.zeros(self.n_layers, 1, self.hidden_size, device=device)
    

class DecoderGRU(nn.Module):
    """Decoder: embeds one token id, applies ReLU and an ``nn.GRU``, then projects to the vocabulary.

    Args:
        hidden_size: embedding width and GRU hidden width.
        output_size: number of rows of the embedding table and width of the
            output projection (vocabulary size).
        n_layers: number of stacked GRU layers inside ``nn.GRU``.
    """

    def __init__(self, hidden_size, output_size, n_layers=1):
        super(DecoderGRU, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        """Decode a single step.

        Args:
            input: tensor holding one token id.
            hidden: hidden state in the layout produced by ``initHidden``.

        Returns:
            ``(log_probs, hidden)`` where ``log_probs`` has shape
            ``(1, output_size)`` and holds log-softmax scores over the vocabulary.
        """
        output = self.embedding(input).view(1, 1, -1)  # S=1 x B=1 x N
        output = F.relu(output)
        # nn.GRU already runs all of its n_layers stacked layers in one call.
        # Calling it n_layers times would push the input through the whole
        # stack repeatedly, so it is invoked exactly once. With n_layers == 1
        # the result is the same as a single-iteration loop.
        output, hidden = self.gru(output, hidden)
        output = self.softmax(self.out(output[0]))
        return output, hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(n_layers, 1, hidden_size)`` on ``device``."""
        return torch.zeros(self.n_layers, 1, self.hidden_size, device=device)
    

# class originalGRU(nn.Module):
#     def __init__(self, input_size=512, hidden_size=512):
#         super(originalGRU, self).__init__()
#         self.sigmoid = nn.Sigmoid()
#         self.tanh = nn.Tanh()
#         self.Wr = nn.Linear(input_size, hidden_size)
#         self.Wz = nn.Linear(input_size, hidden_size)
#         self.W = nn.Linear(input_size, hidden_size)
#         self.Wy = nn.Linear(hidden_size, input_size)

#     def forward(self, input, hidden):
#         r = self.sigmoid(self.Wr(input) + hidden)
#         z = self.sigmoid(self.Wz(input) + hidden)
#         h_hat = self.tanh(self.W(input) + r * hidden)
#         h = (1 - z) * h_hat + z * hidden
#         output = self.Wy(h)
#         return output, h
    
#     def initHidden(self):
#         return torch.zeros(1, 1, self.hidden_size, device=device)


# training

In [25]:
from torch.utils.tensorboard import SummaryWriter
import time
import math
from matplotlib import ticker

# NOTE(review): "agg" is a non-interactive backend, so the figure built by
# showPlot will presumably not be rendered inline — confirm that this is intended.
plt.switch_backend("agg")


# TensorBoard writer; trainIters logs the "Loss/train" scalar through it.
writer = SummaryWriter("runs/GRU")


def asMinutes(s):
    """Format a duration of ``s`` seconds as ``"<minutes>m <seconds>s"`` (both truncated to integers)."""
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return "%dm %ds" % (whole_minutes, leftover_seconds)


def timeSince(since, percent):
    """Describe the elapsed time and the projected remaining time.

    Args:
        since: start timestamp, as returned by ``time.time()``.
        percent: fraction of the work already done; it is used as a divisor,
            so it must be non-zero.

    Returns:
        ``"<elapsed> (- <remaining>)"`` with both parts formatted by ``asMinutes``.
    """
    elapsed = time.time() - since
    projected_total = elapsed / percent
    return "%s (- %s)" % (asMinutes(elapsed), asMinutes(projected_total - elapsed))


def showPlot(points):
    """Draw ``points`` as a line plot on a new figure with y-axis major ticks every 0.2.

    Args:
        points: sequence of values to plot against their index.
    """
    # plt.subplots() creates the figure itself; a separate plt.figure() call
    # beforehand would only leave an extra, empty figure open on every call.
    fig, ax = plt.subplots()
    ax.yaxis.set_major_locator(ticker.MultipleLocator(base=0.2))
    ax.plot(points)


def indexesFromSentence(lang, sentence):
    """Map each space-separated word of ``sentence`` to its id in ``lang.word2index``.

    Raises:
        KeyError: if a word is not present in the vocabulary.
    """
    vocab = lang.word2index
    return [vocab[token] for token in sentence.split(" ")]


def tensorFromSentence(lang, sentence):
    """Encode ``sentence`` as a ``(length + 1, 1)`` long tensor of word ids ending with ``EOS_TOKEN``.

    The tensor is created on ``device``.
    """
    token_ids = indexesFromSentence(lang, sentence) + [EOS_TOKEN]
    column = torch.tensor(token_ids, dtype=torch.long, device=device)
    return column.view(-1, 1)


def tensorsFromPair(pair):
    """Turn a sentence pair into ``(input_tensor, output_tensor)``.

    ``pair[0]`` is encoded with the global ``input_lang`` and ``pair[1]`` with
    the global ``output_lang``.
    """
    source_sentence, target_sentence = pair[0], pair[1]
    return (
        tensorFromSentence(input_lang, source_sentence),
        tensorFromSentence(output_lang, target_sentence),
    )


def train(
    input_tensor,
    target_tensor,
    encoder,
    decoder,
    encoder_optimizer,
    decoder_optimizer,
    criterion,
    max_length=MAX_LENGTH,
):
    """Run one optimisation step on a single sentence pair.

    The encoder consumes the input one token at a time. Its final hidden state
    seeds the decoder, which is always fed the ground-truth previous token
    (teacher forcing). The per-token losses are summed, back-propagated, and
    both optimizers take one step.

    Args:
        input_tensor: source token ids, iterated along dimension 0.
        target_tensor: target token ids, iterated along dimension 0.
        encoder, decoder: the two models.
        encoder_optimizer, decoder_optimizer: their optimizers.
        criterion: loss applied to ``(decoder_output, target_token)``.
        max_length: accepted for interface compatibility; not used by the body.

    Returns:
        The summed loss divided by the target length, as a Python float.
    """
    hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    target_length = target_tensor.size(0)

    # Encoding pass: only the final hidden state is carried over to the decoder.
    for source_token in input_tensor:
        _, hidden = encoder(source_token, hidden)

    decoder_input = torch.tensor([[SOS_TOKEN] * input_tensor.size(1)], device=device)

    # Decoding pass with teacher forcing: the next input is the true target token.
    loss = 0
    for target_token in target_tensor:
        decoder_output, hidden = decoder(decoder_input, hidden)
        loss += criterion(decoder_output, target_token)
        decoder_input = target_token

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length


def trainIters(
    encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01
):
    """Train the encoder/decoder for ``n_iters`` single-pair SGD steps.

    All training pairs are drawn up front, with replacement, from the global
    ``pairs``. Every ``print_every`` steps the windowed average loss is printed
    and logged to TensorBoard as ``Loss/train``. Every ``plot_every`` steps the
    windowed average is stored, and the stored values are passed to ``showPlot``
    once training ends.

    Args:
        encoder, decoder: models to optimise.
        n_iters: number of training steps.
        print_every: size of the reporting window.
        plot_every: size of the plotting window.
        learning_rate: learning rate of both SGD optimizers.
    """
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # running sum, cleared after every report
    plot_loss_total = 0  # running sum, cleared after every plotted point

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs)) for _ in range(n_iters)]
    criterion = nn.NLLLoss()

    for step, (input_tensor, target_tensor) in enumerate(training_pairs, start=1):
        loss = train(
            input_tensor,
            target_tensor,
            encoder,
            decoder,
            encoder_optimizer,
            decoder_optimizer,
            criterion,
        )
        print_loss_total += loss
        plot_loss_total += loss

        if step % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            writer.add_scalar("Loss/train", print_loss_avg, step)
            progress = step / n_iters
            report = "%s (%d %d%%) %.4f" % (
                timeSince(start, progress),
                step,
                progress * 100,
                print_loss_avg,
            )
            print(report)

        if step % plot_every == 0:
            plot_losses.append(plot_loss_total / plot_every)
            plot_loss_total = 0

    showPlot(plot_losses)


def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    """Greedily translate ``sentence`` with the given encoder/decoder.

    Args:
        encoder, decoder: the two models.
        sentence: input sentence; it is encoded with the global ``input_lang``.
        max_length: maximum number of decoding steps.

    Returns:
        The list of decoded words. ``"<EOS>"`` is appended as the last element
        when the decoder emits the end-of-sentence id; if ``max_length`` steps
        pass without it, the list simply ends after ``max_length`` words.

    Raises:
        KeyError: if ``sentence`` contains a word missing from ``input_lang``.
    """
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentence)
        encoder_hidden = encoder.initHidden()

        # Only the final encoder hidden state is needed to start decoding.
        for ei in range(input_tensor.size(0)):
            _, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)

        decoder_input = torch.tensor([[SOS_TOKEN]], device=device)
        decoder_hidden = encoder_hidden

        decoded_words = []
        for _ in range(max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            # Greedy choice: take the highest-scoring word id at this step.
            _, topi = decoder_output.topk(1)
            token_id = topi.item()
            if token_id == EOS_TOKEN:
                decoded_words.append("<EOS>")
                break
            decoded_words.append(output_lang.index2word[token_id])

            # The prediction becomes the next decoder input.
            decoder_input = topi.squeeze().detach()

        return decoded_words


def evaluateRandomly(encoder, decoder, n=10):
    """Print ``n`` randomly chosen pairs together with the model's translation.

    For every sample three lines are printed: ``>`` with the input sentence,
    ``=`` with the reference sentence and ``<`` with the words returned by
    ``evaluate``, followed by an empty line.
    """
    for _ in range(n):
        sample = random.choice(pairs)
        print(">", sample[0])
        print("=", sample[1])
        translation = " ".join(evaluate(encoder, decoder, sample[0]))
        print("<", translation)
        print("")



In [26]:
hidden_size = 256
encoder1 = EncoderGRU(input_lang.n_words, hidden_size).to(device)
decoder1 = DecoderGRU(hidden_size, output_lang.n_words).to(device)


# Uncomment to train the models instead of relying on the saved checkpoints.
# trainIters(encoder1, decoder1, 55000, print_every=5000)

# Restore previously saved weights when the checkpoint files exist.
# map_location=device remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU machine also loads on a CPU-only one.
# NOTE(review): torch.load unpickles the file — only load checkpoints you trust.
if os.path.exists('decoder.pth'):
    checkpoint = torch.load('decoder.pth', map_location=device)
    decoder1.load_state_dict(checkpoint)
    print('Decoder loaded from checkpoint.')

if os.path.exists('encoder.pth'):
    checkpoint = torch.load('encoder.pth', map_location=device)
    encoder1.load_state_dict(checkpoint)
    print('Encoder loaded from checkpoint.')


Decoder loaded from checkpoint.
Encoder loaded from checkpoint.


In [27]:
# Print 10 (the default n) random pairs: ">" input, "=" reference, "<" model output.
evaluateRandomly(encoder1, decoder1)

> il est handicape mental .
= he is mentally handicapped .
< he is mentally handicapped . <EOS>

> elle est sortie pour des chaussures .
= she s out shopping for shoes .
< she s out s out s out s out s

> je traverse les memes problemes .
= i m having the same problems .
< i m having some problems . <EOS>

> son metier est medecin .
= he is a doctor by profession .
< he is not at harvard . <EOS>

> tu es incroyablement stupide .
= you re unbelievably stupid .
< he is busy not concerned with his desk . <EOS>

> elle est plus agee et plus sage maintenant .
= she is older and wiser now .
< she is older and wiser now . <EOS>

> vous avez parfaitement raison .
= you are absolutely correct .
< you are wanted my succeed . <EOS>

> il travaille de nuit ce soir .
= he is on night duty tonight .
< he is going to lose night duty tonight . <EOS>

> je me rejouis que tu le voies ainsi .
= i m glad you see it that way .
< i m glad of your see it . <EOS>

> je suis ravi que tu aies souleve ca .
= i m

In [14]:
# Persist the weights of both models (state_dict only).
# Plain string literals: the names contain no placeholders, so no f-string is needed.
# NOTE(review): the checkpoint-loading cell looks for 'encoder.pth' / 'decoder.pth',
# not the '...64.pth' names written here — confirm which names are intended.
encoder1_pth_name = 'encoder64.pth'
decoder1_pth_name = 'decoder64.pth'
torch.save(encoder1.state_dict(), encoder1_pth_name)
torch.save(decoder1.state_dict(), decoder1_pth_name)