In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from keras.utils.np_utils import to_categorical
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import LSTM, Dense, GRU, Embedding
from keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras import layers, models, optimizers, callbacks
import keras
import os
import re

In [None]:
# Read the corpus from data.csv in the working directory; later cells use its 'text' column.
df = pd.read_csv('data.csv')
# Quick sanity check of the (rows, columns) that were loaded.
print(df.shape)

In [None]:
# Debug aid: uncomment to run the rest of the notebook on the first 10 rows only.
# df = df[:10]

In [None]:
# Hold out 10% of the rows for validation (fixed seed for a reproducible split).
# Rows with a missing 'text' value are dropped first: pandas represents them as float NaN,
# which the string concatenation and regex clean-up in later cells cannot handle.
X_tr, X_val = train_test_split(df['text'].dropna(), test_size=0.1, random_state=42)

In [None]:
class Text:
    """A text split on whitespace, together with a token <-> index vocabulary.

    Args:
        input_text: the raw text.
        token2ind: optional existing token -> index mapping to reuse.
        ind2token: optional existing index -> token mapping to reuse.
            A fresh mapping is built from this text unless BOTH mappings are given.
        predict: when true the text is tokenized as-is, without the clean-up
            performed by ``preprocess``.

    Attributes:
        content: the text; replaced by its cleaned form unless ``predict`` is true.
        tokens: tokens of ``content`` in order of appearance.
        tokens_distinct: unique tokens in sorted order. When the mapping is built
            here, the unknown placeholder is appended to this list as well, so
            ``len(self)`` counts it.
        token2ind / ind2token: the vocabulary mapping; indices start at 1, index 0
            is never assigned.
        tokens_ind: ``tokens`` translated to indices, out-of-vocabulary tokens
            mapped to the placeholder's index.
    """

    # Placeholder that every out-of-vocabulary token is mapped to.
    UNKNOWN_TOKEN = '<| unknown |>'

    def __init__(self, input_text, token2ind=None, ind2token=None, predict=False):
        self.content = input_text
        self.predict = predict
        self.tokens, self.tokens_distinct = self.tokenize()

        if token2ind is not None and ind2token is not None:
            self.token2ind, self.ind2token = token2ind, ind2token
        else:
            # create_word_mapping appends the placeholder to tokens_distinct in place.
            self.token2ind, self.ind2token = self.create_word_mapping(self.tokens_distinct)

        lookup = self.token2ind
        # The placeholder is looked up lazily, so a supplied mapping without it only
        # fails when an unknown token is actually encountered.
        self.tokens_ind = [
            lookup[token] if token in lookup else lookup[self.UNKNOWN_TOKEN]
            for token in self.tokens
        ]

    def __repr__(self):
        return self.content

    def __len__(self):
        """Number of distinct tokens (including the placeholder once a mapping was built here)."""
        return len(self.tokens_distinct)

    @staticmethod
    def create_word_mapping(values_list):
        """Build 1-based value -> index and index -> value dictionaries.

        NOTE: ``values_list`` is modified in place - the unknown placeholder is
        appended to it. Code that sizes the model from ``len(text) + 1`` relies on
        that, so the side effect is kept deliberately.

        Returns:
            (value2ind, ind2value)
        """
        values_list.append(Text.UNKNOWN_TOKEN)
        value2ind = {value: ind for ind, value in enumerate(values_list, 1)}
        ind2value = dict(enumerate(values_list, 1))
        return value2ind, ind2value

    def preprocess(self):
        """Clean ``self.content`` in place and collapse all whitespace runs to single spaces.

        The intermediate result of the regex clean-up is kept in ``self.content_preprocess``.
        """
        # NOTE(review): the ``^`` is outside the character class, so this only replaces a
        # single Arabic Presentation Forms-B character (U+FE70-U+FEFF) at the very start
        # of the text. If the intent was to replace every such character, or everything
        # except them, the pattern needs changing - confirm before touching it.
        self.content_preprocess = re.sub("^[\uFE70-\uFEFF]", " ", self.content)
        # Remove punctuation, ASCII digits, Latin letters and the listed Arabic diacritics.
        # Note that ``%-/`` inside the class is a range (% & ' ( ) * + , - . /).
        self.content_preprocess = re.sub(r"[.،\"()0-9:A-Za-z,!%-/؟'ّ»ـ»'ً«'ُ'ْ'َ'ٍ{}؛'ِ'ٌ…\\|\xad”@_?<>’“\]\[éà=‘]","",self.content_preprocess)

        # split() without arguments already discards leading, trailing and repeated whitespace.
        self.content = " ".join(self.content_preprocess.split())

    def tokenize(self):
        """Return ``(tokens, distinct_tokens)`` for the (optionally cleaned) content.

        Distinct tokens are returned sorted: iterating a ``set`` of strings gives a
        different order in every interpreter session, which would change the
        vocabulary indices between runs and invalidate saved model weights.
        """
        if not self.predict:
            self.preprocess()
        # split() (rather than split(' ')) never yields empty-string tokens for
        # empty text or repeated spaces.
        tokens = self.content.split()
        return tokens, sorted(set(tokens))

    def tokens_info(self):
        """Print the total and distinct token counts."""
        print('total tokens: %d, distinct tokens: %d' % (len(self.tokens), len(self.tokens_distinct)))

In [None]:
# Build the vocabulary from the whole training split.
# Rows are joined with a space so that the last word of one row and the first word of
# the next are not fused into a single bogus token; non-string entries (None / NaN)
# are skipped. join() also avoids the quadratic cost of repeated string concatenation.
data_text = ' '.join(sentence for sentence in X_tr if isinstance(sentence, str))

vocab = Text(data_text)
vocab.tokens_info()

In [None]:
class Sequences():
    """Next-word training pairs built from an encoded text.

    For every end position (advancing by ``step``) the tokens up to that position
    are taken, left-padded / left-truncated to ``max_len`` entries, and split into
    a context of ``max_len - 1`` indices and the final index as the word to predict.

    Args:
        text_object: object exposing ``tokens_ind`` (list of token indices).
        max_len: length of each padded n-gram (context plus target).
        step: stride between consecutive end positions.

    Attributes:
        sequences: 2-D array of contexts, one row per n-gram.
        next_words: 1-D array with the target index for each row.
    """

    def __init__(self, text_object, max_len, step):
        self.tokens_ind = text_object.tokens_ind
        self.max_len = max_len
        self.step = step
        self.sequences, self.next_words = self.create_sequences()

    def __repr__(self):
        return 'Sequence object of max_len: %d and step: %d' % (self.max_len, self.step)

    def __len__(self):
        return len(self.sequences)

    def create_sequences(self):
        """Return ``(sequences, next_words)`` as described in the class docstring."""
        # Only the last max_len tokens of each prefix survive the 'pre' truncation that
        # pad_sequences applies by default, so slice them out directly instead of
        # copying ever-growing prefixes.
        input_sequences = [
            self.tokens_ind[max(0, end - self.max_len):end]
            for end in range(2, len(self.tokens_ind) + 1, self.step)
        ]

        # Use the instance's max_len (not a notebook-level global) so the object
        # honours the value it was constructed with.
        input_sequences = pad_sequences(input_sequences, maxlen=self.max_len, padding='pre')
        sequences, next_words = input_sequences[:, :-1], input_sequences[:, -1]

        return sequences, next_words

    def sequences_info(self):
        """Print how many sequences were generated."""
        print('number of sequences of length %d: %d' % (self.max_len, len(self.sequences)))

In [None]:
# Length of each padded n-gram (context words plus the word to predict).
max_len = 10
# Stride between consecutive n-gram end positions.
step = 1
# Share the vocabulary built from the training split with every later cell.
token2ind = vocab.token2ind
ind2token = vocab.ind2token

In [None]:
# Encode every training row with the shared vocabulary and collect its
# (context, next word) pairs as plain Python lists.
train_sequences = []
train_next_words = []

for line in X_tr:
    line_encoded = Text(line, token2ind, ind2token)
    line_sequences = Sequences(line_encoded, max_len, step)
    # Per-row diagnostics, if needed: line_sequences.sequences_info()
    train_sequences.extend(line_sequences.sequences.tolist())
    train_next_words.extend(line_sequences.next_words.tolist())

In [None]:
# Encode every validation row with the shared vocabulary and collect its
# (context, next word) pairs as plain Python lists.
val_sequences = []
val_next_words = []

for line in X_val:
    line_encoded = Text(line, token2ind, ind2token)
    line_sequences = Sequences(line_encoded, max_len, step)
    # Per-row diagnostics, if needed: line_sequences.sequences_info()
    val_sequences.extend(line_sequences.sequences.tolist())
    val_next_words.extend(line_sequences.next_words.tolist())

In [None]:
class TextDataGenerator(keras.utils.all_utils.Sequence):
    """Batch generator yielding (contexts, one-hot next words) for ``model.fit``.

    Targets are one-hot encoded per batch, so the full
    ``(n_samples, vocab_size)`` target matrix is never materialised.

    Args:
        sequences: indexable collection of context index lists.
        next_words: indexable collection of target indices, aligned with ``sequences``.
        sequence_length: stored on the instance; not used by the generator itself.
        vocab_size: number of classes for the one-hot targets.
        batch_size: samples per batch (the final batch may be smaller).
        shuffle: reshuffle the sample order whenever ``on_epoch_end`` runs.
    """

    def __init__(self, sequences, next_words, sequence_length, vocab_size, batch_size=32, shuffle=True):
        self.batch_size = batch_size
        self.sequences = sequences
        self.next_words = next_words
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.shuffle = shuffle
        # Initialise self.indexes before the first batch is requested.
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch.

        Rounded up so the trailing partial batch is served too. Rounding down
        silently dropped those samples and produced a zero-length generator
        whenever there were fewer samples than ``batch_size``.
        """
        return int(np.ceil(len(self.sequences) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(X, y)``."""
        first = index * self.batch_size
        indexes = self.indexes[first: first + self.batch_size]

        X = np.array([self.sequences[k] for k in indexes])
        y = keras.utils.np_utils.to_categorical(
            [self.next_words[k] for k in indexes],
            num_classes=self.vocab_size,
        )

        return X, y

    def on_epoch_end(self):
        """Reset the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.sequences))
        if self.shuffle:
            np.random.shuffle(self.indexes)

In [None]:
batch_size = 512

# Settings shared by the training and validation generators.
# vocab_size is len(vocab) + 1 because token indices start at 1 and 0 is the padding value.
params = dict(
    sequence_length=max_len,
    vocab_size=len(vocab) + 1,
    batch_size=batch_size,
    shuffle=True,
)

train_generator = TextDataGenerator(train_sequences, train_next_words, **params)
val_generator = TextDataGenerator(val_sequences, val_next_words, **params)

In [None]:
def LSTM_model(sequence_length, vocab_size, layer_size):
    """Assemble the (uncompiled) next-word model.

    Layers, in order: a trainable 256-dimensional embedding over contexts of
    ``sequence_length - 1`` indices, one LSTM with ``layer_size`` units, and a
    softmax over ``vocab_size`` classes.
    """
    stack = [
        Embedding(vocab_size, 256, input_length=sequence_length-1, trainable=True),
        # Dropout on the LSTM (recurrent_dropout=0.1, dropout=0.1) was tried and left disabled.
        LSTM(layer_size),
        Dense(vocab_size, activation='softmax'),
    ]
    return Sequential(stack)

In [None]:
# Same vocabulary size as the generators: len(vocab) + 1 classes, 256 LSTM units.
model = LSTM_model(max_len, len(vocab)+1, 256)
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['acc'],
)

In [None]:
# Weights are written under this directory, one file set per epoch;
# "{epoch}" in the name is filled in by the callback.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Save weights only, not the full model.
checkpoint_callback = callbacks.ModelCheckpoint(
    save_weights_only=True,
    filepath=checkpoint_prefix,
)

In [None]:
# Train for 10 epochs, checkpointing weights and validating on the held-out split.
model.fit(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=10,
    callbacks=[checkpoint_callback],
    validation_data=val_generator,
)

In [None]:
def generate_seq(model, seq_length, seed_text, n_words):
    """Greedily extend ``seed_text`` by ``n_words`` words predicted by ``model``.

    Args:
        model: object whose ``predict`` returns one probability row per input row.
        seq_length: n-gram length used in training; the model sees ``seq_length - 1`` words.
        seed_text: starting text, words separated by single spaces.
        n_words: number of words to append.

    Returns:
        The seed text followed by the generated words, space separated.
    """
    in_text = seed_text
    # generate a fixed number of words
    for _ in range(n_words):
        # encode the words as integers (predict mode: no text clean-up)
        encoded = Text(in_text, token2ind, ind2token, True).tokens_ind
        # keep only the most recent seq_length-1 words, left-padding shorter inputs
        encoded = pad_sequences([encoded], maxlen=seq_length-1, truncating='pre')
        # predict the distribution over the next word
        predict_x = np.asarray(model.predict(encoded, verbose=0))
        # Class 0 is the padding index and has no entry in ind2token, so it is excluded
        # from the argmax; the +1 restores the original class numbering.
        yhat = int(np.argmax(predict_x[0][1:])) + 1
        # reverse map integer to word and append to the running text
        in_text += ' ' + ind2token[yhat]
    return in_text

In [None]:
# Seed phrase for a 20-word sample from the trained model.
txt = 'قررت المحكمة'
print(generate_seq(model=model, seq_length=max_len, seed_text=txt, n_words=20))

In [None]:
import math
def perplexity(sequences, next_words, model, batch_size=512):
    """Perplexity of ``model`` on (context, next word) pairs.

    Computed as ``2 ** (-mean(log2 p_i))`` where ``p_i`` is the probability the
    model assigns to the true next word of sample ``i``.

    Args:
        sequences: contexts, one per sample (sliceable, equal-length rows).
        next_words: true next-word class index for each context.
        model: object whose ``predict`` returns one probability row per input row.
        batch_size: samples scored per ``predict`` call. Scoring in chunks avoids
            one ``predict`` call per sample while keeping only one chunk of
            probabilities in memory at a time.

    Returns:
        The perplexity as a float.

    Raises:
        ValueError: if ``sequences`` is empty (the mean is undefined).
    """
    n_samples = len(sequences)
    if n_samples == 0:
        raise ValueError('perplexity is undefined for an empty set of sequences')

    # Floor for probabilities that underflowed to 0, so log2 stays finite.
    min_prob = float(np.finfo(np.float32).tiny)

    log2_total = 0.0
    for start in range(0, n_samples, batch_size):
        stop = start + batch_size
        contexts = np.asarray(sequences[start:stop])
        targets = np.asarray(next_words[start:stop], dtype=np.int64)
        probs = np.asarray(model.predict(contexts, batch_size=batch_size, verbose=0))
        # Pick, for each row, the probability of its true next word; accumulate in float64.
        target_probs = probs[np.arange(len(targets)), targets].astype(np.float64)
        log2_total += float(np.log2(np.maximum(target_probs, min_prob)).sum())

    return math.pow(2, -log2_total / n_samples)

# Perplexity of the trained model on the held-out validation pairs.
perplexity(sequences=val_sequences, next_words=val_next_words, model=model)