<a href="https://colab.research.google.com/github/Joovvhan/korean-stt/blob/master/kang/seq2seqtuto.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
SOS_token = 0
EOS_token = 1

#word에서 index, index에서 word를 찾는 사전, 희귀 단어를 대체하는데 사용할
# 단어의 빈도 수를 가진 Lang이라는 class
class Lang:
  def __init__(self, name):
    self.name = name
    self.wordToIndex = {}
    self.wordToCount = {}
    self.indexToWord = {0:"SOS", 1:"EOS"}
    self.n_words = 2
    
  def addSentence(self, sentence):
    for word in sentence.split(' '):
      self.addWord(word)
      
  def addWord(self, word):
    if word not in self.wordToIndex:
      self.wordToIndex[word] = self.n_words
      self.wordToCount[word] = 1
      self.indexToWord[self.n_words] = word
      self.n_words += 1
    else:
      self.wordToCount[word] += 1
  

def unicodeToAscii(s):
  return ''.join(
    c for c in unicodedata.normalize('NFD', s)
    if unicodedata.category(c) != 'Mn'
  )

#문자 아닌 문자 제거, 소문자 다듬기
def normalizeString(s):
  s = unicodeToAscii(s.lower().strip())
  s = re.sub(r"([.!?])", r"\1", s)
  s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
  return s
  


In [0]:
def readLangs(lang1, lang2, reverse=False):
  print("Reading lines...")
  
  lines = open('gdrive/My Drive/EnglishFrench/%s-%s.txt' % (lang1, lang2), encoding='utf-8').read().strip().split('\n')
  
  pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]
  
  if reverse:
    pairs = [list(reversed(p)) for p in pairs]
    inputLang = Lang(lang1)
    outputLang = Lang(lang2)
  else:
    inputLang = Lang(lang1)
    outputLang = Lang(lang2)
    
  return inputLang, outputLang, pairs

In [0]:
MAX_LENGTH = 10

eng_prefixes = (
  "i am", "i m ",
  "he is", "he s ",
  "she is", "she s",
  "you are", "you re ",
  "we are", "we re ", "they are", "they re "
)

def filterPair(p):
  return len(p[0].split(' ')) < MAX_LENGTH and len(p[1].split(' ')) < MAX_LENGTH and \
    p[1].startswith(eng_prefixes)

def filterPairs(pairs):
  return [pair for pair in pairs if filterPair(pair)]

In [7]:
#파일을 줄->쌍으로 분리
#텍스트 정규화, 길이와 내용으로 필터링
#쌍의 문장에서 단어 리스트 생성

def prepareData(lang1, lang2, reverse=False):
  input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
  pairs = filterPairs(pairs)
  for pair in pairs:
    input_lang.addSentence(pair[0])
    output_lang.addSentence(pair[1])
  return input_lang, output_lang, pairs

input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
print(random.choice(pairs))

Reading lines...
['vous n etes pas les seuls avec ce probleme.', 'you re not the only one with this problem.']


In [0]:
#seq2seq 인코더, 디코더 // 둘을 연결시켜서 쓰면 된다.

class EncodeRNN(nn.Module):
  def __init__(self,input_size, hidden_size):
    super(EncodeRNN, self).__init__()
    self.hiddenSize = hidden_size
    
    self.embedding = nn.Embedding(input_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size)
    
  def forward(self, input, hidden):
    embedded = self.embedding(input).view(1,1,-1)
    output = embedded
    output, hidden = self.gru(output, hidden)
    return output, hidden
  
  def initHidden(self):
    return torch.zeros(1,1, self.hiddenSize, device=device)
  
  
class DecodeRNN(nn.Module):
  def __init__(self, hidden_size, output_size):
    super(DecodeRNN, self).__init__()
    self.hiddenSize = hidden_size
    
    self.embedding = nn.Embedding(output_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size)
    self.out = nn.Linear(hidden_size, output_size)
    self.softmax = nn.LogSoftmax(dim=1)
    
  def forward(self, input, hidden):
    output = self.embedding(input).view(1,1,-1)
    output = F.relu(output)
    output, hidden = self.gru(output, hidden)
    output = self.softmax(self.out(output[0]))
    return output, hidden
  
  def initHidden(self):
    return torch.zeros(1,1,self.hiddenSize, device=device)
  

In [0]:
# 어텐션 디코더. 공간활용, 인코딩 부담 줄이기
# 전체 문장을 한 번에 인코딩 하지 않고, 가중치를 계산해서 알맞은 출력 단어를 선택하도록

class AttentionDecodeRNN(nn.Module):
  def __init__(self, hidden_size, input_size, dropoutP=0.1, max_length=MAX_LENGTH):
    super(AttentionDecodeRNN, self).__init__()
    self.hidden_size = hidden_size
    self.input_size = input_size
    self.output_size = input_size
    self.dropoutP = dropoutP
    self.max_length = max_length
    
    self.embedding = nn.Embedding(self.input_size, self.hidden_size)
    self.attention = nn.Linear(self.hidden_size * 2, self.max_length)
    self.attentionCombine = nn.Linear(self.hidden_size * 2, self.hidden_size)
    self.dropout = nn.Dropout(self.dropoutP)
    self.gru = nn.GRU(self.hidden_size, self.hidden_size)
    self.out = nn.Linear(self.hidden_size, self.output_size)
    
  def forward(self, input, hidden, encodeOutputs):
    embedded = self.embedding(input).view(1,1,-1)
    embedded = self.dropout(embedded)
    
    attentionWeights = F.softmax(self.attention(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
    print(attentionWeights)
    attentionApplied = torch.bmm(attentionWeights.unsqueeze(0), encodeOutputs)
    print(attentionApplied)
    output = torch.cat((embedded[0], attentionApplied[0]), 1)
    output = self.attentionCombine(output).unsqueeze(0)
    
    output = F.relu(output)
    otuput, hidden = self.gru(output, hidden)
    
    output = F.log_softmax(self.out(output[0]), dim=1)
    return output, hidden, attentionWeights
  
  def initHidden(self):
    return torch.zeros(1,1,self.hidden_size, device=device)
  

In [0]:
#학습 데이터 준비

def indexesFromSentence(lang, sentence):
  return [lang.wordToIndex[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence):
  indexes = indexesFromSentence(lang, sentence)
  indexes.append(EOS_token)
  return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorFromPair(pair):
  inputTensor = tensorFromSentence(input_lang, pair[0])
  targetTensor = tensorFromSentence(output_lang, pair[1])
  return (inputTensor, targetTensor)

In [0]:
#teacher forcing : 디코더의 예측대신 실제 목표 출력을 다음 입력으로 사용,
#  수렴이 빠르지만 학습된 네트워크가 잘못 사용될 때 불안함

teacher_forcing_ratio = 0.5

def train(inputTensor, targetTensor, encoder, decoder, encodeOptimizer,
         decodeOptimizer, criterion, max_length=MAX_LENGTH):
  encodeHidden = encoder.initHidden()
  encodeOptimizer.zero_grad()
  decodeOptimizer.zero_grad()
  
  inputLength = inputTensor.size(0)
  targetLength = targetTensor.size(0)
  
  encodeOutputs = torch.zeros(max_length, encoder.hidden_size, device=device)
  
  loss = 0
  
  for t in range(inputLength):
    encodeOutput, encodeHidden = encoder(inputTensor[t], encodeHidden)
    encodeOutputs[t] = encodeOutput[0,0]
    
  decodeInput = torch.tensor([[SOS_token]], device=device)
  decodeHidden = encodeHidden
  
  use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False
  
  if use_teacher_forcing:
    for t in range(targetLength):
      decodeOutput, decodeHidden, decodeAttention = decoder(
        decodeInput, decodeHidden, encodeOutputs
      )
      loss += criterion(decodeOutput, targetTensor[t])
      decodeInput = targetTensor[t]
      
  else:
    for t in range(targetLength):
      decodeOutput, decodeHidden, decodeAttention = decoder(decodeInput, decodeHidden, encodeOutputs)
      topv, topi = decodeOutput.topk(1)
      decodeInput = topi.squeeze().detach()
      loss += criterion(decodeOutput, targetTensor[t])
      if decodeInput.item() == EOS_token: break;
  
  loss.backward()
  encodeOptimizer.step()
  decodeOptimizer.step()
  
  return loss.item()/ targetLength

In [0]:
# 시간 출력

import time
import math

def printMinutes(s):
  m = math.floor(s/60)
  s -= m*60
  return '%dm %ds' % (m, s)

def timeSince(since, percent):
  now = time.time()
  s = now - since
  es = s / percent
  rs = es - s
  return '%s ( - %s)' % (printMinutes(s), primtMinutes(rs))

In [0]:
#학습과정
#1. 타이머
#2. optimizer, criterion 초기화
#3. 학습 쌍 세트 생성
#4. loss

#여러 번 train을 호출하여 진행률과 평균손실 출력

def trainIters(encoder, decoder, nIters, printEvery=1000, plotEvery=100, learningRate=0.01):
  start = time.time()
  plotLosses = []
  printLossTotal = 0
  plotLossTotal = 0
  
  encodeOptimizer = optim.SGD(encoder.parameters(), lr=learningRate)
  decodeOptimizer = optim.SGD(decoder.parameters(), lr=learningRate)
  trainingPairs = [tensorFromPair(random.choice(pairs)) for i in range(nIters)]
  criterion = nn.NLLLoss()
  
  for iter in range(1, nIters+1):
    trainingPair = trainingPairs[iter-1]
    inputTensor = trainingPair[0]
    targetTensor = trainingPair[1]
    
    loss = train(inputTensor, targetTensor, encoder, decoder,
                encodeOptimizer, decodeOptimizer, criterion)
    printLossTotal += loss
    plotLossTotal += loss
    
    if iter % printEvery == 0:
      printLossAvg = printLossTotal / printEvery
      printLossTotal = 0
      print('%s (%d %d%%) %.4f' % (timeSince(start, iter / nIters), iter, iter/nIters * 100,
           printLossAvg))

    if iter % plotEvery == 0:
      plotLossAvg = plotLossTotal / plotEvery
      plotLosses.append(plotLossAvg)
      plotLossTotal = 0
      
  showPlot(plotLosses)


In [0]:
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np

def showPlot(points):
  plt.figure()
  fig, ax = plt.subplots()
  loc = ticker.MultipleLocator(base=0.2)
  ax.yaxis.set_major_locator(loc)
  plt.plot(points)

In [0]:
#각 단계마다 디코더의 예측을 되돌려 전달, 예측할 때마다 단어를 출력
# EOS토큰 예측 시, 멈춤

def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
  with torch.no_grad():
    inputTensor = tensorFromSentence(input_lang, sentence)
    inputLength = inputTensor.size()[0]
    encodeHidden = encoder.initHidden()
    
    encodeOutputs = torch.zeros(max_length, encoder.hiddenSize, device=device)
    
    for t in range(inputLength):
      encodeOutput, encodeHidden = encoder(inputTensor[t], encodeHidden)
      encodeOutputs[t] += encodeOutput[0,0]
      
    decodeInput = torch.tensor([[SOS_token]], device=device)
    decodeHidden = encodeHidden
    
    decodedWords = []
    decodeAttentions = torch.zeros(max_length, max_length)
    
    for t in range(max_length):
      decodeOutput, decodeHidden, decodeAttention = decoder(
        decodeInput, decodeHidden, encodeOutput)
      decodeAttentions[t] = decodeAttention.data
      topv, topi = decodeOutput.data.topk(1)
      if topi.item() == EOS_token:
        decodedWords.append('<EOS>')
        break
      else: decodedWord.append(output_lang.indexToWord[topi.item()])
        
      decodeInput = topi.squeeze().detach()
      
    return decodedWords, decodeAttentions[:t + 1]
  
def evaluateRandom(encoder, decoder, n=10):
  for i in range(n):
    pair = random.choice(pairs)
    print('>', pair[0])
    print('=', pair[1])
    outputWords, attentions = evaluate(encoder, decoder, pair[0])
    outputSentence = ''.join(outputWords)
    print('<', outputSentence)
    print('')
    
    


In [0]:
hidden_size = 256
encoder1 = EncodeRNN(input_lang.n_words, hidden_size).to(device)
attndecoder1 = AttentionDecodeRNN(hidden_size, output_lang.n_words, dropoutP=0.1).to(device)
