In [None]:
import numpy as np
import matplotlib.pyplot as plt
import string
from pickle import dump
from unicodedata import normalize
from pickle import load
import itertools
import re
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
def encode_sequences(tokenizer, length, lines):
	"""Turn text lines into integer sequences padded to a common length.

	Args:
		tokenizer: Object exposing ``texts_to_sequences`` (here a fitted
			Keras ``Tokenizer``).
		length: Value forwarded to ``pad_sequences`` as ``maxlen``.
		lines: Collection of text strings to encode.

	Returns:
		The output of ``pad_sequences`` called with ``padding='post'``,
		i.e. padding is appended after each sequence.
	"""
	id_sequences = tokenizer.texts_to_sequences(lines)
	padded = pad_sequences(id_sequences, maxlen=length, padding='post')
	return padded

def tokenize_it(lines):
	"""Create a ``Tokenizer`` with default settings and fit it on ``lines``.

	Args:
		lines: Collection of text strings used to build the vocabulary.

	Returns:
		The fitted ``Tokenizer`` instance.
	"""
	fitted = Tokenizer()
	fitted.fit_on_texts(lines)
	return fitted

def max_length(lines):
	"""Return the largest whitespace-separated word count found in ``lines``.

	Args:
		lines: Iterable of strings.

	Returns:
		The maximum of ``len(line.split())`` over all lines.

	Raises:
		ValueError: If ``lines`` is empty (raised by ``max``).
	"""
	word_counts = [len(text.split()) for text in lines]
	return max(word_counts)

In [None]:
filename = 'spa.txt'

# Read the whole corpus. The context manager closes the handle even when
# read() raises, which the manual open()/close() pair did not guarantee.
with open(filename, mode='rt', encoding='utf-8') as file:
  doc = file.read()

# One example per line; only the first two tab-separated fields are kept.
# Later cells treat field 0 as English and field 1 as Spanish.
lines = doc.strip().split('\n')
pairs = [line.split('\t')[:2] for line in lines]

cleaned = list()
# Matches every character that is not in string.printable.
re_print = re.compile('[^%s]' % re.escape(string.printable))
# Translation table that deletes all ASCII punctuation characters.
table = str.maketrans('', '', string.punctuation)

for pair in pairs:
  clean_pair = list()

  for line in pair:
    # NFD splits accented letters into base letter + combining mark; encoding
    # to ASCII with 'ignore' then drops the marks and any other non-ASCII char.
    line = normalize('NFD', line).encode('ascii', 'ignore')
    line = line.decode('UTF-8')

    # Tokenise on whitespace, lowercase, strip punctuation and non-printables.
    line = line.split()
    line = [word.lower() for word in line]
    line = [word.translate(table) for word in line]
    line = [re_print.sub('', w) for w in line]
    # Keep purely alphabetic tokens only (drops numbers and emptied tokens).
    line = [word for word in line if word.isalpha()]

    clean_pair.append(' '.join(line))
  cleaned.append(clean_pair)

# 2-D string array: one row per sentence pair, one column per language.
raw_points = np.array(cleaned)

In [None]:
samples = 100000
SEED = 42  # fixed seed so the train/test split is reproducible across runs

# Copy the slice: a plain slice is a view, so the in-place edits and the
# shuffle below would otherwise reorder raw_points itself and make this cell
# give different results each time it is re-run.
dataset = raw_points[:samples, :].copy()
# The corpus may hold fewer than `samples` pairs; use the real row count.
n_rows = len(dataset)

# Re-join each sentence on single spaces (whitespace normalisation).
for i in range(n_rows):
  words_eng = dataset[i, 0].split()
  words_spa = dataset[i, 1].split()
  dataset[i, 0] = " ".join(words_eng)
  dataset[i, 1] = " ".join(words_spa)

# Shuffle rows with a locally seeded generator, then hold out the last 5%.
rng = np.random.default_rng(SEED)
rng.shuffle(dataset)
split_at = int(n_rows * 0.95)
train, test = dataset[:split_at], dataset[split_at:]

# Vocabularies and maximum lengths are computed over the full dataset
# (train and test rows together).
tokenized_eng = tokenize_it(dataset[:, 0])
# +1 because index 0 is not assigned to any word by the Keras Tokenizer and
# is the value pad_sequences pads with.
size_eng = len(tokenized_eng.word_index) + 1
len_eng = max_length(dataset[:, 0])

tokenized_spa = tokenize_it(dataset[:, 1])
size_spa = len(tokenized_spa.word_index) + 1
len_spa = max_length(dataset[:, 1])

In [None]:
# Word count of every English training sentence.
lengths = np.array([len(line.split()) for line in train[:, 0]], dtype=np.intp)
max_idx = int(lengths.max()) if lengths.size else 0

# Histogram of sentence lengths. minlength=50 keeps the original 50 bars, and
# bincount grows the array when a sentence has 50 or more words, where the
# fixed-size counter used before raised an IndexError.
a = np.bincount(lengths, minlength=50).astype(float).reshape(-1, 1)

merged = a.ravel().tolist()
plt.bar(np.arange(len(merged)), merged)
plt.xlabel('Length of sentences')
plt.ylabel('Number of datapoints')
plt.title('Distribution of length of sentences')
plt.show()

In [None]:
# Encoder inputs (English) and decoder targets (Spanish) as padded id arrays.
trainX = encode_sequences(tokenized_eng, len_eng, train[:, 0])
trainY = encode_sequences(tokenized_spa, len_spa, train[:, 1])

testX = encode_sequences(tokenized_eng, len_eng, test[:, 0])
testY = encode_sequences(tokenized_spa, len_spa, test[:, 1])

# Decoder inputs: the target sequence shifted one step to the right with a 0
# in the first position and the last token dropped. np.pad builds a new array,
# so trainY/testY stay intact. The previous version aliased them, shifted them
# in place row by row, and then had to re-encode the targets from scratch.
train_outY = np.pad(trainY[:, :-1], ((0, 0), (1, 0)), mode='constant', constant_values=0)
test_outY = np.pad(testY[:, :-1], ((0, 0), (1, 0)), mode='constant', constant_values=0)

In [None]:
import keras.utils
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import to_categorical,plot_model
from tensorflow.keras.losses import *
from tensorflow.keras import optimizers

In [None]:
def norm_model(eng_size, spa_size, eng_len, spa_len, n_units,dropout_rate,eng_embed_dim,es_embed_dim):
  """Build the plain LSTM encoder-decoder (no attention).

  Args:
    eng_size: ``input_dim`` of the English embedding.
    spa_size: ``input_dim`` of the Spanish embedding and width of the
      softmax output layer.
    eng_len: Length of the first (English) input sequence.
    spa_len: Length of the second (Spanish) input sequence.
    n_units: Number of units in both LSTM layers.
    dropout_rate: Used as ``recurrent_dropout`` in the encoder LSTM and as
      ``dropout`` in the decoder LSTM.
    eng_embed_dim: Output dimension of the English embedding.
    es_embed_dim: Output dimension of the Spanish embedding.

  Returns:
    An uncompiled ``Model`` taking ``[english_ids, spanish_ids]`` and
    producing a softmax over ``spa_size`` classes for every decoder step.
  """
  encoder_tokens = Input(shape=(eng_len,))
  decoder_tokens = Input(shape=(spa_len,))

  encoder_embedded = Embedding(
      input_dim=eng_size,
      output_dim=eng_embed_dim,
      embeddings_initializer="uniform",
      name="embedding_en",
  )(encoder_tokens)
  decoder_embedded = Embedding(
      input_dim=spa_size,
      output_dim=es_embed_dim,
      embeddings_initializer="uniform",
      name="embedding_es",
  )(decoder_tokens)

  # Encoder: its output is discarded; only the final hidden and cell states
  # are passed on.
  encoder_lstm = LSTM(n_units, recurrent_dropout=dropout_rate, return_state=True)
  _, enc_h, enc_c = encoder_lstm(encoder_embedded)

  # Decoder: starts from the encoder states and emits one vector per step.
  decoder_lstm = LSTM(n_units, dropout=dropout_rate, return_sequences=True)
  decoder_steps = decoder_lstm(decoder_embedded, initial_state=[enc_h, enc_c])

  token_probs = Dense(spa_size, activation='softmax')(decoder_steps)
  return Model([encoder_tokens, decoder_tokens], token_probs)

In [None]:
def att_model(eng_size, spa_size, eng_len, spa_len, n_units,dropout_rate,eng_embed_dim,es_embed_dim):
  """Build the encoder-decoder variant with dot-product attention.

  Args:
    eng_size: English vocabulary size; the embedding uses ``eng_size + 1``
      as ``input_dim``.
    spa_size: Spanish vocabulary size; the embedding uses ``spa_size + 1``
      as ``input_dim`` and the output softmax has ``spa_size`` classes.
    eng_len: Length of the first (English) input sequence.
    spa_len: Length of the second (Spanish) input sequence.
    n_units: Units of the unidirectional LSTMs; each direction of the
      bidirectional layer gets ``int(n_units/2)``.
    dropout_rate: ``recurrent_dropout`` for the encoder LSTMs, ``dropout``
      for the decoder LSTMs.
    eng_embed_dim: Output dimension of the English embedding.
    es_embed_dim: Output dimension of the Spanish embedding.

  Returns:
    An uncompiled ``Model`` taking ``[english_ids, spanish_ids]`` and
    producing a softmax over ``spa_size`` classes for every decoder step.
  """
  encoder_tokens = Input(shape=(eng_len,))
  decoder_tokens = Input(shape=(spa_len,))

  encoder_seq = Embedding(
      input_dim=eng_size+1,
      output_dim=eng_embed_dim,
      embeddings_initializer="uniform",
      name="embedding_en",
  )(encoder_tokens)
  decoder_seq = Embedding(
      input_dim=spa_size+1,
      output_dim=es_embed_dim,
      embeddings_initializer="uniform",
      name="embedding_es",
  )(decoder_tokens)

  # Encoder: a bidirectional LSTM followed by an LSTM that returns both the
  # per-step outputs (attended over below) and its final states.
  bidirectional_encoder = Bidirectional(
      LSTM(int(n_units/2), recurrent_dropout=dropout_rate, return_sequences=True)
  )
  encoder_seq = bidirectional_encoder(encoder_seq)
  top_encoder = LSTM(
      n_units,
      return_state=True,
      recurrent_dropout=dropout_rate,
      return_sequences=True,
  )
  enc_outputs, enc_h, enc_c = top_encoder(encoder_seq)

  # Decoder: two stacked LSTMs; only the first is seeded with encoder states.
  decoder_seq = LSTM(n_units, dropout=dropout_rate, return_sequences=True)(
      decoder_seq, initial_state=[enc_h, enc_c]
  )
  decoder_seq = LSTM(n_units, dropout=dropout_rate, return_sequences=True)(decoder_seq)

  # Attention weights: dot product of every decoder step with every encoder
  # step over the feature axis, then the softmax activation.
  scores = dot([decoder_seq, enc_outputs], axes=[2, 2])
  attention = Activation('softmax', name='attention')(scores)

  # Context vectors: attention-weighted sum of encoder outputs, concatenated
  # with the decoder outputs.
  context = dot([attention, enc_outputs], axes=[2, 1])
  combined = concatenate([context, decoder_seq])

  hidden = TimeDistributed(Dense(2*n_units, activation="tanh"))(combined)
  token_probs = TimeDistributed(Dense(spa_size, activation="softmax"))(hidden)

  return Model([encoder_tokens, decoder_tokens], token_probs)

In [None]:
# Build the baseline encoder-decoder and save a diagram of its architecture.
model = norm_model(size_eng, size_spa, len_eng, len_spa, 256, 0.2, 256, 256)
# The model is built from tensorflow.keras layers, so draw it with the
# plot_model imported from tensorflow.keras.utils instead of the standalone
# `keras` package; mixing the two packages can fail depending on the versions.
plot_model(model, to_file='Normal_model.png', rankdir='LR', show_layer_names=False)

In [None]:
# Build the attention encoder-decoder and save a diagram of its architecture.
model = att_model(size_eng, size_spa, len_eng, len_spa, 256, 0.2, 256, 256)
# The model is built from tensorflow.keras layers, so draw it with the
# plot_model imported from tensorflow.keras.utils instead of the standalone
# `keras` package; mixing the two packages can fail depending on the versions.
plot_model(model, to_file='att_model.png', rankdir='LR', show_layer_names=False)

In [None]:
# LSTM sizes to compare; one model is trained per entry.
n_units = [128, 256]
history_list = []

eng_embed_dim = 256
es_embed_dim = eng_embed_dim
dropout_rate = 0.2

for el in n_units:
  # To train the attention variant instead, swap in:
  # model = att_model(size_eng, size_spa, len_eng, len_spa, el, dropout_rate, eng_embed_dim, es_embed_dim)
  model = norm_model(size_eng, size_spa, len_eng, len_spa, el, dropout_rate, eng_embed_dim, es_embed_dim)

  # Keep only the weights with the lowest validation loss for this size.
  checkpoint = ModelCheckpoint('model_norm_{}.h5'.format(el), monitor='val_loss', verbose=1, save_best_only=True, mode='min')
  # metrics=['accuracy'] is required: the 'accuracy' / 'val_accuracy' history
  # entries read below are only recorded when the metric is compiled in.
  # `learning_rate` replaces the deprecated `lr` argument.
  model.compile(
      loss=SparseCategoricalCrossentropy(),
      optimizer=optimizers.Adam(learning_rate=0.001),
      metrics=['accuracy'],
  )

  # Inputs are [encoder ids, shifted decoder ids]; targets are the unshifted
  # decoder ids. The held-out split doubles as validation data.
  history = model.fit([trainX,train_outY], trainY, epochs=30, batch_size=256, validation_data=([testX,test_outY], testY), callbacks=[checkpoint])
  history_list.append(history)

# Collect the per-epoch curves of every run, in the same order as n_units.
train_loss = [run.history['loss'] for run in history_list]
train_acc = [run.history['accuracy'] for run in history_list]
val_loss = [run.history['val_loss'] for run in history_list]
val_acc = [run.history['val_accuracy'] for run in history_list]

np.savez('history_mod.npz', train_loss=train_loss, train_acc=train_acc, val_loss=val_loss, val_acc=val_acc, n_units=n_units)

In [None]:
# Loss curves of the run currently held in `history` (the last model fitted).
loss_curves = (('loss', 'training loss'), ('val_loss', 'validation loss'))
for curve_key, curve_label in loss_curves:
  plt.plot(history.history[curve_key], label=curve_label)
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation loss')
plt.show()

In [None]:
# Accuracy curves of the run currently held in `history` (the last model fitted).
accuracy_curves = (('accuracy', 'training accuracy'), ('val_accuracy', 'validation accuracy'))
for curve_key, curve_label in accuracy_curves:
  plt.plot(history.history[curve_key], label=curve_label)
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Training and Validation accuracy')
plt.show()