<a href="https://colab.research.google.com/github/110805/Tense_Conversion/blob/master/CVAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!git clone https://github.com/110805/Tense_Conversion.git
%cd Tense_Conversion/

Cloning into 'Tense_Conversion'...
remote: Enumerating objects: 5, done.[K
remote: Counting objects: 100% (5/5), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 5 (delta 0), reused 5 (delta 0), pack-reused 0[K
Unpacking objects: 100% (5/5), done.
/content/Tense_Conversion


In [8]:
import torch

# Scratch check: `*` on two tensors of identical shape (1, 1, 3) multiplies
# them element by element, which is the operation std * eps relies on in the
# reparameterization step.
a = torch.randn(1,1,3)
b = torch.randn(1,1,3)
print(a)
print(b)
print(a*b)

tensor([[[ 0.7623, -0.1146, -1.3251]]])
tensor([[[ 0.8138, -0.1935,  0.3873]]])
tensor([[[ 0.6204,  0.0222, -0.5132]]])


In [0]:
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import time
import math
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np
from os import system
from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu

"""========================================================================================
The sample.py includes the following template functions:

1. Encoder, decoder
2. Training function
3. BLEU-4 score function
4. Gaussian score function

You have to modify them to complete the lab.
In addition, there are still other functions that you have to 
implement by yourself.

1. The reparameterization trick
2. Your own dataloader (design it in your own way; it does not have to be a PyTorch DataLoader)
3. Output your results (BLEU-4 score, conversion words, Gaussian score, generation words)
4. Plot loss/score
5. Load/save weights

There are some useful tips listed in the lab assignment.
You should check them before starting your lab.
========================================================================================"""

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Token ids reserved for start-of-sequence and end-of-sequence.
SOS_token = 0
EOS_token = 1
#----------Hyper Parameters----------#
# Width of the character embeddings; the encoder LSTM state is
# hidden_size + tense_size wide.
hidden_size = 256
# Width of each latent vector produced by the encoder.
latent_size = 32
# Vocabulary size: ids 0 and 1 are SOS/EOS, and characters are encoded as
# ord(char) - 95, so 'a'..'z' map to 2..27.
vocab_size = 28
# Number of tense classes; added to the LSTM state widths to make room for
# the one-hot tense condition.
tense_size = 4
# Probability that a training step feeds the ground-truth character back in.
teacher_forcing_ratio = 0.7
#empty_input_ratio = 0.1
# NOTE(review): KLD_weight is not referenced anywhere in the visible code.
KLD_weight = 0.0
# Default learning rate used by trainIters.
LR = 0.05
# Default upper bound on the number of decoding steps in evaluate.
MAX_LENGTH = 20



################################
# Example inputs for compute_bleu
################################
# The target (reference) word
reference = 'accessed'
# The word generated by your model
output = 'access'

#compute BLEU-4 score
def compute_bleu(output, reference):
    """Return the smoothed sentence-level BLEU score of ``output``.

    Args:
        output: the generated word.
        reference: the target word; it is wrapped in a one-element list of
            references before being handed to ``sentence_bleu``.

    Returns:
        The value returned by ``sentence_bleu`` using ``method1`` smoothing.
    """
    # A reference of length 3 is scored with three equal n-gram weights;
    # every other length uses four equal weights.
    if len(reference) == 3:
        ngram_weights = (0.33,0.33,0.33)
    else:
        ngram_weights = (0.25,0.25,0.25,0.25)

    smoother = SmoothingFunction().method1
    return sentence_bleu(
        [reference],
        output,
        weights=ngram_weights,
        smoothing_function=smoother,
    )

"""============================================================================
example input of Gaussian_score

words = [['consult', 'consults', 'consulting', 'consulted'],
['plead', 'pleads', 'pleading', 'pleaded'],
['explain', 'explains', 'explaining', 'explained'],
['amuse', 'amuses', 'amusing', 'amused'], ....]

the order should be : simple present, third person, present progressive, past
============================================================================"""

def Gaussian_score(words, train_path='train.txt'):
    """Return the fraction of generated tense groups found in the training file.

    Args:
        words: sequence of word groups, each ordered as simple present,
            third person, present progressive, past.
        train_path: path of the training file, one whitespace-separated word
            group per line. Defaults to 'train.txt', the file name that
            trainIters opens.

    Returns:
        The number of matches divided by ``len(words)``. A group that occurs
        on several training lines counts once per occurrence. Returns 0.0
        when ``words`` is empty.
    """
    if not words:
        # Nothing was generated; avoid dividing by zero.
        return 0.0

    # Count each training row once so every lookup below is a dict access
    # instead of a scan over the whole file.
    row_counts = {}
    with open(train_path, 'r') as fp:
        for line in fp:
            # split() with no argument also drops '\r' and trailing blanks.
            row = tuple(line.split())
            if not row:
                continue  # skip blank lines
            row_counts[row] = row_counts.get(row, 0) + 1

    score = sum(row_counts.get(tuple(group), 0) for group in words)
    return score / len(words)

def asMinutes(s):
    """Format a duration in seconds as ``'<minutes>m <seconds>s'``.

    Both parts are rendered with ``%d``, so fractional seconds are truncated.
    """
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return '%dm %ds' % (whole_minutes, leftover_seconds)

def timeSince(since, percent):
    """Describe the elapsed time and a linear estimate of the time remaining.

    Args:
        since: a ``time.time()`` timestamp taken when the work started.
        percent: fraction of the work completed so far (must be non-zero).

    Returns:
        A string of the form ``'<elapsed> (- <remaining>)'``, with both
        durations formatted by ``asMinutes``.
    """
    elapsed = time.time() - since
    # Extrapolate the total run time from the fraction already done.
    estimated_total = elapsed / (percent)
    remaining = estimated_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

def get_train_pair(i, Data):
    """Build the (input, target) tensor pair for the ``i``-th word of ``Data``.

    Each character is encoded as ``ord(char) - 95``. The target holds the
    same ids as the input followed by ``EOS_token``. Both tensors are int64
    column vectors of shape (length, 1).
    """
    char_ids = [ord(ch) - 95 for ch in Data[i]]

    source = torch.tensor(char_ids, dtype=torch.long).view(-1, 1)
    # The target is the word itself plus the end-of-sequence marker.
    target = torch.tensor(char_ids + [EOS_token], dtype=torch.long).view(-1, 1)
    return (source, target)

#Encoder
class EncoderRNN(nn.Module):
    """Character-level LSTM encoder that samples latent states for the decoder.

    The LSTM state is ``hidden_size + tense_size`` wide. ``initHidden`` only
    supplies the ``hidden_size`` part, so callers concatenate the one-hot
    tense condition onto it before calling ``forward``. The final hidden and
    cell states are each projected to a mean and a log-variance of width
    ``latent_size`` and sampled with the reparameterization trick.
    """

    def __init__(self, input_size, hidden_size):
        """Create the embedding, the LSTM and the four latent projections.

        Args:
            input_size: number of distinct input ids (vocabulary size).
            hidden_size: embedding width; the LSTM state additionally
                includes ``tense_size`` units for the condition.
        """
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size + tense_size)
        self.hidden2mean = nn.Linear(hidden_size + tense_size, latent_size)
        self.hidden2var = nn.Linear(hidden_size + tense_size, latent_size)
        self.cell2mean = nn.Linear(hidden_size + tense_size, latent_size)
        self.cell2var = nn.Linear(hidden_size + tense_size, latent_size)

    def forward(self, input, hidden):
        """Encode a character sequence and sample latent hidden/cell states.

        Args:
            input: tensor of character ids; it is embedded and reshaped to
                (len(input), 1, -1), i.e. one sequence with batch size 1.
            hidden: ``(h0, c0)`` initial LSTM state, already extended with
                the tense condition.

        Returns:
            ``(output, (z_h, z_c))``: the LSTM outputs for every step and the
            latent vectors sampled from the final hidden and cell states.
        """
        # NOTE(review): the means and log-variances are not returned, so a
        # caller cannot add a KL-divergence term to its loss from here.
        embedded = self.embedding(input).view(len(input), 1, -1)
        output, hidden = self.lstm(embedded, hidden)

        h_mean = self.hidden2mean(hidden[0])
        h_logvar = self.hidden2var(hidden[0])
        c_mean = self.cell2mean(hidden[1])
        c_logvar = self.cell2var(hidden[1])
        z_h = self.reparameterization(h_mean, h_logvar)
        z_c = self.reparameterization(c_mean, c_logvar)

        return output, (z_h, z_c)

    def reparameterization(self, mean, logvar):
        """Sample ``z = mean + std * eps`` with ``eps`` drawn from N(0, I).

        Args:
            mean: mean of the latent Gaussian.
            logvar: log-variance of the latent Gaussian, same shape as
                ``mean``.

        Returns:
            A sample with the shape of ``mean``.
        """
        std = torch.exp(0.5 * logvar)
        # Draw the noise with the shape, dtype and device of std. Creating it
        # with a bare torch.randn(...) would leave it on the CPU and fail
        # with a device mismatch when the model runs on CUDA.
        eps = torch.randn_like(std)
        return mean + std * eps

    def initHidden(self):
        """Return a zero ``(h0, c0)`` pair of shape (1, 1, hidden_size).

        The tense condition is NOT included; callers append it themselves.
        """
        return (torch.zeros(1, 1, self.hidden_size, device=device), torch.zeros(1, 1, self.hidden_size, device=device))

#Decoder
class DecoderRNN(nn.Module):
    """Single-step LSTM decoder that turns one token id into vocabulary scores.

    The LSTM state is ``latent_size + tense_size`` wide, matching a latent
    vector with the tense condition concatenated onto it.
    """

    def __init__(self, hidden_size, output_size):
        """Create the embedding, the LSTM and the output projection.

        Args:
            hidden_size: width of the token embeddings fed to the LSTM.
            output_size: number of output classes (vocabulary size).
        """
        super(DecoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(output_size, hidden_size)
        state_width = latent_size + tense_size
        self.lstm = nn.LSTM(hidden_size, state_width)
        self.out = nn.Linear(state_width, output_size)

    def forward(self, input, hidden):
        """Run one decoding step.

        Args:
            input: tensor holding a single token id.
            hidden: ``(h, c)`` LSTM state from the previous step.

        Returns:
            ``(scores, hidden)``: unnormalised scores over the vocabulary for
            this step and the updated LSTM state.
        """
        # One token, batch size 1: shape (1, 1, embedding width).
        token_embedding = self.embedding(input).view(1, 1, -1)
        lstm_out, next_hidden = self.lstm(F.relu(token_embedding), hidden)
        scores = self.out(lstm_out[0])
        return scores, next_hidden

def evaluate(encoder, decoder, input_string, tense, max_length=MAX_LENGTH):
    """Convert ``input_string`` to another tense by greedy decoding.

    Args:
        encoder: encoder module; called with the input ids and an initial
            state that has the source-tense condition appended.
        decoder: decoder module; called one step at a time.
        input_string: the word to convert; characters are encoded as
            ``ord(char) - 95``.
        tense: pair ``(source_tense, target_tense)`` of indices into a
            ``tense_size``-wide one-hot vector.
        max_length: maximum number of decoding steps.

    Returns:
        The predicted word. Decoding stops at the first ``EOS_token`` or
        after ``max_length`` steps.
    """
    with torch.no_grad():
        char_ids = [ord(ch) - 95 for ch in input_string]
        input_tensor = torch.tensor(char_ids, dtype=torch.long).view(-1, 1)
        input_tensor = input_tensor.to(device)

        # Build the one-hot conditions on the same device as the states they
        # are concatenated with; a CPU tensor here fails on CUDA.
        one_hot = torch.eye(tense_size, device=device)
        source_cond = one_hot[tense[0]].view(1, 1, -1)
        target_cond = one_hot[tense[1]].view(1, 1, -1)

        h0, c0 = encoder.initHidden()
        encoder_state = (torch.cat((h0, source_cond), 2),
                         torch.cat((c0, source_cond), 2))
        _, (z_h, z_c) = encoder(input_tensor, encoder_state)

        decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS
        decoder_hidden = (torch.cat((z_h, target_cond), 2),
                          torch.cat((z_c, target_cond), 2))

        decoded_chars = []
        for _ in range(max_length):
            decoder_output, decoder_hidden = decoder(
                decoder_input, decoder_hidden)

            # Greedy choice: keep the single highest-scoring id.
            _, topi = decoder_output.topk(1)
            if topi.item() == EOS_token:
                break
            decoded_chars.append(chr(topi.item() + 95))

            decoder_input = topi.squeeze().detach()

        return ''.join(decoded_chars)

def evalTestdata(encoder, decoder):
    """Return the average BLEU score of the model over 'test.txt'.

    Each non-blank line of 'test.txt' holds an input word followed by its
    target word. The (source tense, target tense) pair for each line comes
    from the fixed ``tenses`` table below, in file order.

    Args:
        encoder: encoder module passed through to ``evaluate``.
        decoder: decoder module passed through to ``evaluate``.

    Returns:
        The mean ``compute_bleu`` score over all test pairs, or 0.0 when the
        file contains no pairs.

    Raises:
        IndexError: if the file has more pairs than ``tenses`` has entries,
            or a line holds fewer than two words.
    """
    # One (source tense, target tense) entry per line of test.txt.
    tenses = [[0,3], [0,2], [0,1], [0,1], [3,1], [0,2], [3,0], [2,0], [2,3], [2,1]]

    pairs = []
    with open('test.txt', 'r') as f:
        for line in f:
            words = line.split()
            if not words:
                continue  # tolerate blank lines
            pairs.append((words[0], words[1]))

    if len(pairs) > len(tenses):
        raise IndexError(
            'test.txt has %d pairs but only %d tense entries are defined'
            % (len(pairs), len(tenses)))
    if not pairs:
        return 0.0

    score = 0
    for (source, target), tense in zip(pairs, tenses):
        # An empty prediction is scored like any other string.
        prediction = evaluate(encoder, decoder, source, tense)
        score += compute_bleu(prediction, target)

    return score/len(pairs)

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, tense, max_length=MAX_LENGTH):
    """Run one optimisation step on a single word.

    The word is encoded and decoded under the same tense condition, so the
    model is trained to reconstruct its input.

    Args:
        input_tensor: character ids of the word, shape (length, 1).
        target_tensor: the ids the decoder should emit, one per row; its
            first dimension sets the number of decoding steps.
        encoder: encoder module.
        decoder: decoder module.
        encoder_optimizer: optimizer stepping the encoder parameters.
        decoder_optimizer: optimizer stepping the decoder parameters.
        criterion: loss applied to each step's scores and target id.
        tense: index of the word's tense in a ``tense_size`` one-hot vector.
        max_length: unused here; kept for interface compatibility.

    Returns:
        The summed step losses divided by the target length.
    """
    # Build the one-hot condition on the same device as the states it is
    # concatenated with; a CPU tensor here fails when training on CUDA.
    condition = torch.eye(tense_size, device=device)[tense].view(1, 1, -1)
    h0, c0 = encoder.initHidden()
    encoder_state = (torch.cat((h0, condition), 2),
                     torch.cat((c0, condition), 2))

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    target_length = target_tensor.size(0)

    loss = 0
    #----------sequence to sequence part for encoder----------#
    _, (z_h, z_c) = encoder(input_tensor, encoder_state)

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = (torch.cat((z_h, condition), 2),
                      torch.cat((z_c, condition), 2))
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    #----------sequence to sequence part for decoder----------#
    for di in range(target_length):
        decoder_output, decoder_hidden = decoder(
            decoder_input, decoder_hidden)
        loss += criterion(decoder_output, target_tensor[di])

        if use_teacher_forcing:
            # Teacher forcing: feed the ground-truth id as the next input.
            decoder_input = target_tensor[di]
        else:
            # Free running: feed back the model's own best guess, detached so
            # no gradient flows through the choice.
            _, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()
            if decoder_input.item() == EOS_token:
                break

    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

    # Normalised by the full target length even when free running stopped
    # early at EOS.
    return loss.item() / target_length

def trainIters(encoder, decoder, n_epochs, learning_rate=LR):
    """Train the encoder/decoder on 'train.txt' and plot loss and BLEU curves.

    Every whitespace-separated word of 'train.txt' is one training example,
    and its position within its line is used as its tense index. After each
    epoch the model is scored with ``evalTestdata`` and a progress line is
    printed. When training ends, the per-epoch loss and BLEU curves are saved
    as 'TrainingLoss' and 'BLEU_scores'.

    Args:
        encoder: encoder module to train.
        decoder: decoder module to train.
        n_epochs: number of passes over the training words.
        learning_rate: SGD learning rate for both optimizers.
    """
    start = time.time()
    plot_losses = []
    BLEU_scores = []

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)

    # Flatten the file into one word list and remember each word's column.
    # Taking the tense from the column (rather than from the flat index
    # modulo 4) keeps later lines aligned even if one line is blank or short.
    Data = []
    tenses = []
    with open('train.txt', 'r') as f:
        for line in f:
            for tense, word in enumerate(line.split()):
                Data.append(word)
                tenses.append(tense)

    # Move every pair to the target device once instead of once per epoch.
    training_pairs = []
    for i in range(len(Data)):
        source, target = get_train_pair(i, Data)
        training_pairs.append((source.to(device), target.to(device)))

    criterion = nn.CrossEntropyLoss()

    for epoch in range(1, n_epochs + 1):
        epoch_loss = 0
        for (input_tensor, target_tensor), tense in zip(training_pairs, tenses):
            loss = train(input_tensor, target_tensor, encoder,
                         decoder, encoder_optimizer, decoder_optimizer, criterion, tense)
            epoch_loss += loss

        epoch_loss_avg = epoch_loss / len(Data)
        plot_losses.append(epoch_loss_avg)
        bleu_score = evalTestdata(encoder, decoder)
        BLEU_scores.append(bleu_score)
        print('%s (%d %d%%) %.4f %.4f' % (timeSince(start, epoch / n_epochs),
                                        epoch, epoch / n_epochs * 100, epoch_loss_avg, bleu_score))

    plt.figure(1)
    plt.plot(range(n_epochs), plot_losses)
    plt.xlabel('Epochs')
    plt.ylabel('CrossEntropyLoss')
    plt.savefig('TrainingLoss')

    plt.figure(2)
    plt.plot(range(n_epochs), BLEU_scores)
    plt.xlabel('Epochs')
    plt.ylabel('BLEU_scores')
    plt.savefig('BLEU_scores')
	
# Build the encoder/decoder pair on the selected device, train for 100 epochs,
# then save the learned weights (state dicts) to 'encoder.pkl' and
# 'decoder.pkl'.
encoder1 = EncoderRNN(vocab_size, hidden_size).to(device)
decoder1 = DecoderRNN(hidden_size, vocab_size).to(device)
trainIters(encoder1, decoder1, 100)
torch.save(encoder1.state_dict(), 'encoder.pkl')
torch.save(decoder1.state_dict(), 'decoder.pkl')