In [None]:
import re
import datetime
import numpy as np
import pandas as pd
import random
from tqdm import tqdm
import tensorflow.keras
import tensorflow as tf
from sklearn.metrics import fbeta_score
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.layers import Embedding,LSTM, TimeDistributed, Dense, Bidirectional
from tensorflow.keras.initializers import HeNormal, GlorotNormal, GlorotUniform
from nltk.translate.bleu_score import sentence_bleu
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Load the preprocessed dataset; later cells read its 'error' and 'correct' columns.
data = pd.read_csv('preprocessed_15.csv')

In [None]:
# Allow up to 500 characters per column when pandas renders a frame.
pd.options.display.max_colwidth = 500
# Preview the first five rows (last expression, so the notebook displays it).
data[:5]

In [None]:
def preprocess(t: str, add_start_token: bool, add_end_token: bool) -> str:
  """Wrap a sentence in optional boundary markers and collapse repeated spaces.

  Args:
    t: Sentence text.
    add_start_token: When truthy, prefix the sentence with ``<start>``.
    add_end_token: When truthy, suffix the sentence with ``<end>``.

  Returns:
    The sentence with the requested markers, each separated from the text
    by a space, and with every run of spaces reduced to a single space.
  """
  # The two flags are independent, so each is handled on its own instead of
  # enumerating every True/False combination.
  if add_start_token:
    t = '<start>' + ' ' + t
  if add_end_token:
    t = t + ' ' + '<end>'

  return re.sub(' +', ' ', t)

In [None]:
# Encoder text: the erroneous sentence wrapped in both <start> and <end>.
encoder_input = [
    preprocess(sentence, add_start_token=True, add_end_token=True)
    for sentence in data['error']
]
# Decoder input: the corrected sentence with only a leading <start>.
decoder_input = [
    preprocess(sentence, add_start_token=True, add_end_token=False)
    for sentence in data['correct']
]
# Decoder target: the corrected sentence with only a trailing <end>.
decoder_output = [
    preprocess(sentence, add_start_token=False, add_end_token=True)
    for sentence in data['correct']
]
# Show the first example of each list, one per line.
print(encoder_input[0], decoder_input[0], decoder_output[0], sep="\n")

In [None]:
#ENCODER INPUT

# The longest sentence, counted in space-separated pieces, fixes the padded width.
max_length = max(len(sentence.split(" ")) for sentence in encoder_input)
INPUT_ENCODER_LENGTH = max_length

# Tokenizer with an empty filter string, splitting on single spaces,
# fitted on the encoder-side sentences only.
tokenizer = Tokenizer(filters='', split=" ")
tokenizer.fit_on_texts(encoder_input)
word_index = tokenizer.word_index #vocabulary

# Convert sentences to id sequences, then pad at the end ("post") to the fixed width.
enc_input_encoded = tokenizer.texts_to_sequences(encoder_input)
enc_input_padded = pad_sequences(
    enc_input_encoded,
    maxlen=INPUT_ENCODER_LENGTH,
    padding="post",
)

print(enc_input_padded.shape)

In [None]:
# First encoder sentence followed by its padded id sequence, one per line.
print(encoder_input[0], enc_input_padded[0], sep="\n")

In [None]:
#DECODER INPUT
# Fit one tokenizer on decoder inputs and decoder targets together so that
# both share a single vocabulary.
decoder_data = decoder_input + decoder_output

out_tokenizer = Tokenizer(filters='', split=" ")
out_tokenizer.fit_on_texts(decoder_data)
word_index = out_tokenizer.word_index #vocabulary

# Padded width for the decoder side: the longest decoder-input sentence,
# counted in space-separated pieces.
max_length = max(len(sentence.split(" ")) for sentence in decoder_input)
INPUT_DECODER_LENGTH = max_length

In [None]:
# Decoder inputs as id sequences, padded and truncated at the end ("post")
# to INPUT_DECODER_LENGTH.
dec_input_encoded = out_tokenizer.texts_to_sequences(decoder_input)
dec_input_padded = pad_sequences(
    dec_input_encoded,
    maxlen=INPUT_DECODER_LENGTH,
    padding="post",
    truncating="post",
)

print(dec_input_padded.shape)

In [None]:
# First decoder-input sentence followed by its padded id sequence, one per line.
print(decoder_input[0], dec_input_padded[0], sep="\n")

In [None]:
# Decoder targets as id sequences, padded and truncated at the end ("post")
# to the same width as the decoder inputs.
dec_output_encoded = out_tokenizer.texts_to_sequences(decoder_output)
dec_output_padded = pad_sequences(
    dec_output_encoded,
    maxlen=INPUT_DECODER_LENGTH,
    padding="post",
    truncating="post",
)

print(dec_output_padded.shape)

In [None]:
# Second decoder-target sentence followed by its padded id sequence, one per line.
print(decoder_output[1], dec_output_padded[1], sep="\n")

In [None]:
#Reference: https://fasttext.cc/docs/en/english-vectors.html
import io

def load_vectors(fname):
    """Read a fastText-style ``.vec`` text file into a word -> vector mapping.

    The first line is parsed as two integers (the header) and is otherwise
    unused. Every following line is split on single spaces: the first field
    becomes the key and the remaining fields become the stored vector.

    Args:
        fname: Path of the vector file. It is opened as UTF-8 and bytes that
            cannot be decoded are ignored.

    Returns:
        dict mapping each word to a NumPy array built from the remaining
        fields of its line. The fields are stored as read (string tokens);
        no float conversion happens here.

    Raises:
        ValueError: If the header line is not exactly two integers.
    """
    # `with` closes the handle even when parsing fails part-way through.
    with io.open(fname, 'r', encoding='utf-8', newline='\n', errors='ignore') as fin:
        # Consume and validate the header so it is not stored as a vector.
        _n_words, _dim = map(int, fin.readline().split())
        vectors = {}
        for line in fin:
            tokens = line.rstrip().split(' ')
            vectors[tokens[0]] = np.asarray(tokens[1:])
    return vectors

In [None]:
# Parse the vector file into a word -> vector dict via load_vectors; the cells
# below look words up in it to fill the embedding matrices.
embedding_index = load_vectors('wiki-news-300d-1M.vec')

In [None]:
#https://keras.io/examples/nlp/pretrained_word_embeddings/
# Build the encoder-side embedding matrix: row i holds the pretrained vector of
# the word whose tokenizer index is i; words without a usable vector keep an
# all-zero row.
word_index = tokenizer.word_index
num_tokens = len(word_index) + 2
embedding_dim = 300
hits = 0
misses = 0

embedding_matrix = np.zeros((num_tokens, embedding_dim))
for word, i in word_index.items():
    embedding_vector = embedding_index.get(word)

    # Accept only vectors with exactly `embedding_dim` entries; a missing word
    # (None) or a vector of another length counts as a miss.
    if isinstance(embedding_vector, np.ndarray) and embedding_vector.shape[0] == embedding_dim:
        embedding_matrix[i] = embedding_vector
        hits += 1
    else:
        misses += 1
print("Converted %d words (%d misses)" % (hits, misses))
# NOTE: the 'GEC' directory must already exist; np.save does not create it.
np.save('GEC/in_embedding.npy', embedding_matrix)

In [None]:
# Build the decoder-side embedding matrix: row i holds the pretrained vector of
# the word whose out_tokenizer index is i; words without a usable vector keep
# an all-zero row.
word_index = out_tokenizer.word_index
num_tokens = len(word_index) + 2
embedding_dim = 300
hits = 0
misses = 0

embedding_matrix = np.zeros((num_tokens, embedding_dim))
for word, i in word_index.items():
    embedding_vector = embedding_index.get(word)

    # Accept only vectors with exactly `embedding_dim` entries; a missing word
    # (None) or a vector of another length counts as a miss.
    if isinstance(embedding_vector, np.ndarray) and embedding_vector.shape[0] == embedding_dim:
        embedding_matrix[i] = embedding_vector
        hits += 1
    else:
        misses += 1
print("Converted %d words (%d misses)" % (hits, misses))
# NOTE: the 'GEC' directory must already exist; np.save does not create it.
np.save('GEC/out_embedding.npy', embedding_matrix)

In [None]:
# Read both saved matrices back from disk (encoder side first) and report their shapes.
in_embedding_matrix, out_embedding_matrix = (
    np.load(npy_path)
    for npy_path in ('GEC/in_embedding.npy', 'GEC/out_embedding.npy')
)
print(in_embedding_matrix.shape, out_embedding_matrix.shape)

In [None]:
#ENCODER
class Encoder(tf.keras.Model):
    """Encoder: an embedding layer followed by a single LSTM.

    Args:
        inp_vocab_size: ``input_dim`` of the embedding layer.
        embedding_size: ``output_dim`` of the embedding layer.
        lstm_size: Number of LSTM units.
        input_length: ``input_length`` passed to the embedding layer.
    """

    def __init__(self,inp_vocab_size,embedding_size,lstm_size,input_length):
        super().__init__()
        self.vocab_size = inp_vocab_size
        self.embedding_size = embedding_size
        self.lstm_units = lstm_size
        self.input_length = input_length

    def build(self, input_sequence):
        """Create the embedding and LSTM sub-layers used by ``call``."""
        # A disabled alternative seeded this layer from `in_embedding_matrix`
        # (weights=[in_embedding_matrix], trainable=False). The active layer
        # below is created without pretrained weights.
        self.embedding = Embedding(input_dim=self.vocab_size, output_dim=self.embedding_size, input_length=self.input_length,
                           mask_zero=True, name="embedding_layer_encoder")
        self.lstm = LSTM(self.lstm_units, return_state=True, return_sequences=True, name="Encoder_LSTM")

    def call(self,input_sequence,states, training = True):
        """Embed the token ids and run them through the LSTM.

        Args:
            input_sequence: Batch of token-id sequences.
            states: Initial LSTM state, forwarded as ``initial_state``.
            training: Accepted in the signature but not used in the body.

        Returns:
            ``(lstm_output, state_h, state_c)`` as returned by the LSTM, which
            is configured with ``return_sequences=True`` and ``return_state=True``.
            The same three values are also stored on ``self``.
        """
        input_embedding = self.embedding(input_sequence)   #(batch_size, length of input array, embedding_size)
        self.lstm_output, self.state_h, self.state_c = self.lstm(input_embedding, initial_state = states)
        return self.lstm_output,self.state_h, self.state_c

    def initialize_states(self,batch_size):
        """Return a ``(hidden, cell)`` pair of initial LSTM states.

        Each tensor has shape ``(batch_size, lstm_units)`` and is drawn from a
        ``GlorotNormal`` initializer. An all-zeros alternative was left
        disabled in the original code.
        """
        initializer = GlorotNormal()
        lstm_state_h = initializer(shape=(batch_size, self.lstm_units))
        lstm_state_c = initializer(shape=(batch_size, self.lstm_units))
        return lstm_state_h, lstm_state_c


#DECODER
class Decoder(tf.keras.Model):
    """Decoder: an embedding layer followed by a single LSTM.

    Args:
        out_vocab_size: ``input_dim`` of the embedding layer.
        embedding_size: ``output_dim`` of the embedding layer.
        lstm_size: Number of LSTM units.
        input_length: ``input_length`` passed to the embedding layer.
    """

    # The constructor must be spelled __init__ (and chain to super().__init__())
    # for Python to run it when Decoder(...) is instantiated.
    def __init__(self,out_vocab_size,embedding_size,lstm_size,input_length):
        super().__init__()
        self.vocab_size = out_vocab_size
        self.embedding_size = embedding_size
        self.lstm_units = lstm_size
        self.input_length = input_length

    def build(self,input_sequence):
        """Create the embedding and LSTM sub-layers used by ``call``."""
        # A disabled alternative seeded this layer from `out_embedding_matrix`
        # (weights=[out_embedding_matrix], trainable=False). The active layer
        # below is created without pretrained weights.
        self.embedding = Embedding(input_dim=self.vocab_size, output_dim=self.embedding_size, input_length=self.input_length,
                           mask_zero=True, name="embedding_layer_decoder")
        self.lstm = LSTM(self.lstm_units, return_state=True, return_sequences=True, name="Decoder_LSTM")

    def call(self,input_sequence,initial_states, training = True):
        """Embed the token ids and run them through the LSTM.

        Args:
            input_sequence: Batch of token-id sequences.
            initial_states: Initial LSTM state, forwarded as ``initial_state``.
            training: Accepted in the signature but not used in the body.

        Returns:
            ``(lstm_output, state_h, state_c)`` as returned by the LSTM, which
            is configured with ``return_sequences=True`` and ``return_state=True``.
            The same three values are also stored on ``self``.
        """
        input_embedding = self.embedding(input_sequence)
        self.lstm_output, self.state_h, self.state_c = self.lstm(input_embedding, initial_state=initial_states)
        return self.lstm_output,self.state_h, self.state_c