In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import torch
import torch.nn as nn
import torch.nn.functional as F
import re
import os
import unicodedata
import numpy as np



# All tensors and modules in this notebook are placed on the CPU.
device = torch.device("cpu")

# Reserved vocabulary indices:
#   PAD_token - pads short sentences up to a common length
#   SOS_token - start-of-sentence marker
#   EOS_token - end-of-sentence marker
PAD_token, SOS_token, EOS_token = 0, 1, 2







In [4]:
class Voc:
    """Vocabulary that maps words to integer indices and back.

    Indices 0, 1 and 2 are reserved for the PAD, SOS and EOS tokens; ordinary
    words are numbered from 3 upwards in the order they are first added.

    Attributes:
        name: Label given to this vocabulary.
        trimmed: True once ``trim`` has been applied.
        word2index: dict mapping word -> index.
        word2count: dict mapping word -> number of times it was added.
        index2word: dict mapping index -> word (includes the reserved tokens).
        num_words: Total number of entries, reserved tokens included.
    """

    def __init__(self, name):
        self.name = name
        self.trimmed = False
        self._reset()

    def _reset(self):
        """Empty the word tables, leaving only the three reserved tokens."""
        self.word2index = {}
        self.word2count = {}
        self.index2word = {PAD_token: "PAD", SOS_token: "SOS", EOS_token: "EOS"}
        self.num_words = 3  # Count SOS, EOS, PAD

    def addSentence(self, sentence):
        """Add every space-separated word of ``sentence`` to the vocabulary."""
        for word in sentence.split(" "):
            self.addWord(word)

    def addSenetnce(self, sentence):
        """Misspelled alias of ``addSentence``, kept so existing callers keep working."""
        self.addSentence(sentence)

    def addWord(self, word):
        """Register ``word``: assign the next free index if new, else bump its count."""
        if word not in self.word2index:
            self.word2index[word] = self.num_words
            self.word2count[word] = 1
            self.index2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    def trim(self, min_count):
        """Drop every word that was added fewer than ``min_count`` times.

        Only the first call has an effect; later calls return immediately.
        Surviving words are re-indexed from 3, and because they are re-added
        through ``addWord`` their counts restart at 1.
        """
        if self.trimmed:
            return
        self.trimmed = True

        keep_words = [word for word, count in self.word2count.items()
                      if count >= min_count]

        total = len(self.word2index)
        # Guard the ratio so trimming an empty vocabulary does not divide by zero.
        ratio = len(keep_words) / total if total else 0.0
        print('keep_words {} /{} ={:.4f}'.format(len(keep_words), total, ratio))

        # Rebuild the dictionaries from the surviving words only.
        self._reset()
        for word in keep_words:
            self.addWord(word)
            
def normalizeString(s):
    """Lower-case ``s`` and reduce it to ASCII letters plus ``.``, ``!`` and ``?``.

    Steps:
      1. lower-case and strip surrounding whitespace;
      2. fold accented letters to their base letter (e.g. "é" -> "e");
      3. put a space before each ``.``, ``!`` or ``?`` so it becomes its own token;
      4. replace every run of other characters with a single space;
      5. strip again so splitting on " " never yields empty tokens at the ends.

    Returns:
        The normalized string.
    """
    s = s.lower().strip()
    # Decompose accented characters and drop the combining marks, so the
    # base letter survives the ASCII-only filter below.
    s = "".join(
        ch for ch in unicodedata.normalize("NFD", s)
        if unicodedata.category(ch) != "Mn"
    )
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s.strip()

def indexesFromSentence(voc, sentence):
    """Translate ``sentence`` into vocabulary indices, terminated by EOS_token.

    The sentence is split on single spaces and each piece is looked up in
    ``voc.word2index``; a piece that is missing from the mapping raises KeyError.
    """
    indexes = []
    for token in sentence.split(' '):
        indexes.append(voc.word2index[token])
    indexes.append(EOS_token)
    return indexes


        
        
        


In [5]:
class EncoderRNN(nn.Module):
    """Bidirectional GRU encoder over a padded batch of index sequences.

    Args:
        hidden_size: Size of the GRU input and of each direction's hidden state.
            The embedding is fed straight into the GRU, so it must produce
            vectors of this size.
        embedding: Module that maps token indices to vectors.
        n_layers: Number of stacked GRU layers.
        dropout: Dropout between GRU layers; forced to 0 when ``n_layers == 1``.
    """

    def __init__(self, hidden_size, embedding, n_layers=1, dropout=0):
        super(EncoderRNN, self).__init__()
        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.embedding = embedding

        self.gru = nn.GRU(hidden_size, hidden_size, n_layers,
                          dropout=(0 if n_layers == 1 else dropout),
                          bidirectional=True)

    def forward(self, input_seq, input_lenghts, hidden=None):
        """Encode a batch of padded sequences.

        Args:
            input_seq: Token indices laid out time-major, (max_length, batch),
                because the sequence is packed with the default
                ``batch_first=False``.
            input_lenghts: True length of each sequence in the batch, sorted in
                decreasing order (``pack_padded_sequence`` default). The
                misspelled name is kept for callers that pass it by keyword.
            hidden: Optional initial hidden state for the GRU.

        Returns:
            (outputs, hidden): ``outputs`` is the element-wise sum of the forward
            and backward GRU outputs, (max_length, batch, hidden_size);
            ``hidden`` is the final hidden state as returned by the GRU.
        """
        embedded = self.embedding(input_seq)

        # Pack the padded batch so the GRU skips the padding positions.
        packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, input_lenghts)

        # Forward pass through the bidirectional GRU.
        outputs, hidden = self.gru(packed, hidden)

        # Unpack back to a padded tensor.
        outputs, _ = torch.nn.utils.rnn.pad_packed_sequence(outputs)

        # Sum the two directions: the last dimension holds the forward features
        # in [:hidden_size] and the backward features in [hidden_size:].
        forward_out = outputs[:, :, :self.hidden_size]
        backward_out = outputs[:, :, self.hidden_size:]
        outputs = forward_out + backward_out

        return outputs, hidden






In [6]:
class Attn(torch.nn.Module):
    """Attention layer producing normalized weights over encoder outputs.

    Args:
        method: Scoring function, one of "dot", "general" or "concat".
        hidden_size: Feature size of the decoder hidden state and encoder outputs.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """

    def __init__(self, method, hidden_size):
        super(Attn, self).__init__()
        self.method = method
        if self.method not in ["dot", "general", "concat"]:
            raise ValueError(
                "{} is not an appropriate attention method.".format(self.method))

        self.hidden_size = hidden_size
        if self.method == "general":
            self.attn = torch.nn.Linear(self.hidden_size, hidden_size)
        elif self.method == "concat":
            self.attn = torch.nn.Linear(self.hidden_size * 2, hidden_size)
            # torch.FloatTensor(n) is uninitialized memory and may hold
            # inf/NaN; allocate and initialize the parameter explicitly.
            self.v = torch.nn.Parameter(torch.empty(hidden_size))
            bound = hidden_size ** -0.5
            torch.nn.init.uniform_(self.v, -bound, bound)

    def dot_score(self, hidden, encoder_output):
        """Score = sum over features of hidden * encoder_output."""
        return torch.sum(hidden * encoder_output, dim=2)

    def general_score(self, hidden, encoder_output):
        """Score = sum over features of hidden * Linear(encoder_output)."""
        energy = self.attn(encoder_output)
        return torch.sum(hidden * energy, dim=2)

    def concat_score(self, hidden, encoder_output):
        """Score = sum over features of v * tanh(Linear([hidden; encoder_output]))."""
        # Repeat the hidden state along dim 0 so it lines up with every encoder step.
        expanded = hidden.expand(encoder_output.size(0), -1, -1)
        energy = self.attn(torch.cat((expanded, encoder_output), 2)).tanh()
        return torch.sum(self.v * energy, dim=2)

    def forward(self, hidden, encoder_outputs):
        """Compute attention weights with the configured scoring method.

        Args:
            hidden: Current decoder state; it is multiplied against / expanded
                to ``encoder_outputs``, so it must broadcast along dim 0.
            encoder_outputs: 3-D tensor whose dim 2 is the feature dimension
                (scores are summed over ``dim=2``).

        Returns:
            Softmax-normalized weights. The 2-D score matrix is transposed,
            normalized over dim 1 and given an extra dim at position 1, i.e.
            (batch, 1, max_length) for time-major encoder outputs.
        """
        if self.method == "general":
            attn_energies = self.general_score(hidden, encoder_outputs)
        elif self.method == "concat":
            attn_energies = self.concat_score(hidden, encoder_outputs)
        elif self.method == "dot":
            attn_energies = self.dot_score(hidden, encoder_outputs)

        # Swap the max_length and batch_size dimensions.
        attn_energies = attn_energies.t()

        # Normalize to probabilities and add a singleton dimension.
        return F.softmax(attn_energies, dim=1).unsqueeze(1)






In [None]:
class DecoderRNN(nn.Module):
    """Single-step GRU decoder that attends over encoder outputs.

    Holds an embedding with dropout, a unidirectional GRU, an ``Attn`` layer,
    and two linear layers (``concat`` and ``out``).

    NOTE(review): in the visible code ``concat`` and ``out`` are created but
    not yet used by ``forward`` -- confirm against the rest of the notebook.
    """
    def __init__(self, attn_model, embedding, hidden_size, output_size, n_layers=1, dropout=0.1):
        """Store the configuration and build the layers.

        Args:
            attn_model: Attention method name passed through to ``Attn``.
            embedding: Module that maps token indices to vectors; its output is
                fed to a GRU whose input size is ``hidden_size``.
            hidden_size: Size of the GRU input and hidden state.
            output_size: Output size of the final linear layer ``out``.
            n_layers: Number of stacked GRU layers.
            dropout: Dropout probability for the embedding and, when
                ``n_layers > 1``, between GRU layers.
        """
        super(DecoderRNN, self).__init__()
        
        self.attn_model = attn_model
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.dropout = dropout
        
        
        # Define layers
        self.embedding = embedding
        self.embedding_dropout = nn.Dropout(dropout)
        # Use a unidirectional GRU; inter-layer dropout is disabled for a single layer
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers, dropout=(0 if n_layers == 1 else dropout))
        self.concat = nn.Linear(hidden_size*2, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        
        self.attn = Attn(attn_model, hidden_size)
        
    
    def forward(self, input_step, last_hidden, encoder_outputs):
        """Run one decoding step (one word) and compute attention weights.

        Args:
            input_step: Indices of the current input word(s), passed to the
                embedding. Presumably shaped (1, batch) -- TODO confirm.
            last_hidden: Previous hidden state handed to the GRU.
            encoder_outputs: Encoder outputs that the attention layer scores
                against the current GRU output.

        NOTE(review): the visible portion of this method stops after computing
        ``attn_weights`` and contains no return statement -- confirm whether
        the remainder is defined further on or still has to be written.
        """
        # We run this one step (word) at a time
        # Get the embedding of the current input word
        
        embedded = self.embedding(input_step)
        embedded = self.embedding_dropout(embedded)
        
        # Forward through the unidirectional GRU
        rnn_output, hidden = self.gru(embedded, last_hidden)
        
        # Calculate attention weights from the current GRU output
        attn_weights = self.attn(rnn_output, encoder_outputs)
        
        
        
        
        



