In [None]:
import string
import re
from numpy import array, argmax, random, take
import pandas as pd
from keras.models import Sequential
from keras.layers import Dense, LSTM, Embedding, RepeatVector
from keras.preprocessing.text import Tokenizer
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
from keras import optimizers
import matplotlib.pyplot as plt
%matplotlib inline
pd.set_option('display.max_colwidth', 200)

In [None]:
# function to read raw text file
def read_text(filename):
    """Read a UTF-8 text file and return its whole content as one string.

    Args:
        filename: Path of the file to open in text mode.

    Returns:
        The complete text of the file.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the content is not valid UTF-8.
    """
    # The context manager closes the handle even when read() raises, which
    # a manual open()/close() pair does not guarantee.
    with open(filename, mode='rt', encoding='utf-8') as file:
        return file.read()

In [None]:
# split a text into sentences
def to_lines(text):
    """Split raw text into rows of tab-separated fields.

    Leading and trailing whitespace is stripped from ``text`` first. The
    remainder is cut on newline characters, and every resulting line is cut
    on tab characters.

    Args:
        text: The raw text to split.

    Returns:
        A list with one entry per line; each entry is the list of that
        line's fields.
    """
    rows = []
    for line in text.strip().split('\n'):
        rows.append(line.split('\t'))
    return rows

In [None]:
# Read the corpus file and split it into per-line lists of tab-separated fields.
data = read_text("/content/deu.txt")
eng_pes = to_lines(data)
# Convert the nested lists to a NumPy array so the cells below can slice columns.
# NOTE(review): a 2-D array only results if every line has the same number of
# fields — confirm for this file.
eng_pes = array(eng_pes)

In [None]:
# Display the loaded array as the cell output.
eng_pes

In [None]:
# Keep only the first 50,000 rows (all columns) for the rest of the notebook.
eng_pes = eng_pes[:50000,:]

In [None]:
# Remove punctuation
# Build the translation table once instead of once per sentence. It deletes
# every character listed in string.punctuation.
punct_table = str.maketrans('', '', string.punctuation)
eng_pes[:,0] = [s.translate(punct_table) for s in eng_pes[:,0]]
eng_pes[:,1] = [s.translate(punct_table) for s in eng_pes[:,1]]

eng_pes


In [None]:
# convert text to lowercase
# Columns 0 and 1 are lower-cased one whole column at a time.
eng_pes[:,0] = [sentence.lower() for sentence in eng_pes[:,0]]
eng_pes[:,1] = [sentence.lower() for sentence in eng_pes[:,1]]

In [None]:
# Number of whitespace-separated tokens in each sentence, per column.
eng_l = [len(sentence.split()) for sentence in eng_pes[:,0]]
pes_l = [len(sentence.split()) for sentence in eng_pes[:,1]]

# Put both length lists side by side and draw one histogram per column.
length_df = pd.DataFrame({'eng':eng_l, 'pes':pes_l})

length_df.hist(bins = 30)
plt.show()

In [None]:
# function to build a tokenizer
def tokenization(lines):
    """Create a ``Tokenizer`` and fit it on ``lines``.

    Args:
        lines: The texts handed to ``Tokenizer.fit_on_texts``.

    Returns:
        The fitted ``Tokenizer`` instance.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(lines)
    return fitted

In [None]:
# prepare english tokenizer
# Fit a tokenizer on column 0 of the corpus.
eng_tokenizer = tokenization(eng_pes[:, 0])
# Vocabulary size is the number of entries in word_index plus one.
# NOTE(review): the extra slot presumably accounts for index 0, which the
# padding step in this notebook uses as the fill value — confirm.
eng_vocab_size = len(eng_tokenizer.word_index) + 1

# Fixed length that English sequences are padded/truncated to when encoded.
eng_length = 8
print('English Vocabulary Size: %d' % eng_vocab_size)

In [None]:
# prepare Persian tokenizer
# NOTE(review): names and messages say Persian, but the corpus loaded earlier
# is "/content/deu.txt" — confirm which language column 1 actually holds.
# Fit a tokenizer on column 1 of the corpus.
pes_tokenizer = tokenization(eng_pes[:, 1])
# Vocabulary size is the number of entries in word_index plus one.
pes_vocab_size = len(pes_tokenizer.word_index) + 1

# Fixed length that column-1 sequences are padded/truncated to when encoded.
pes_length = 8
print('Persian Vocabulary Size: %d' % pes_vocab_size)

In [None]:
# encode and pad sequences
def encode_sequences(tokenizer, length, lines):
    """Turn texts into integer sequences of a fixed length.

    Args:
        tokenizer: Object whose ``texts_to_sequences`` maps ``lines`` to
            integer sequences.
        length: Value passed to ``pad_sequences`` as ``maxlen``.
        lines: The texts to encode.

    Returns:
        The result of ``pad_sequences`` called with ``padding='post'``.
    """
    # padding='post' puts the 0 fill values after the tokens
    return pad_sequences(
        tokenizer.texts_to_sequences(lines),
        maxlen=length,
        padding='post',
    )

# Model

In [None]:
from sklearn.model_selection import train_test_split

# split data into train and test set
# 20% of the rows go to `test`; random_state pins the shuffle so the split
# is the same on every run.
train, test = train_test_split(eng_pes, test_size=0.2, random_state = 12)

In [None]:
# prepare training data
# Model inputs (X) come from column 1, targets (Y) from column 0 (English).
trainX = encode_sequences(pes_tokenizer, pes_length, train[:, 1])
trainY = encode_sequences(eng_tokenizer, eng_length, train[:, 0])

# prepare held-out test data, encoded the same way
# (validation during training comes from validation_split in model.fit)
testX = encode_sequences(pes_tokenizer, pes_length, test[:, 1])
testY = encode_sequences(eng_tokenizer, eng_length, test[:, 0])


In [None]:
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import MultiHeadAttention

In [None]:
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import MultiHeadAttention

def define_model(in_vocab, out_vocab, in_timesteps, out_timesteps, units):
    """Assemble an attention + LSTM encoder/decoder as a functional model.

    NOTE(review): a later cell defines ``define_model`` again; when the
    notebook is run top to bottom that later definition replaces this one.

    Args:
        in_vocab: ``input_dim`` of the embedding layer.
        out_vocab: Width of the final softmax layer.
        in_timesteps: Length of each input sequence.
        out_timesteps: Number of times the encoder output is repeated for
            the decoder.
        units: Size used for the embedding, the Dense projection, the
            attention ``key_dim`` and both LSTM layers.

    Returns:
        The uncompiled ``Model``.
    """
    source_tokens = layers.Input(shape=(in_timesteps,))
    embed_layer = layers.Embedding(
        input_dim=in_vocab,
        output_dim=units,
        input_length=in_timesteps,
        mask_zero=True,
    )
    embedded = embed_layer(source_tokens)

    # A single Dense projection feeds the attention layer: the same tensor
    # is passed as both of its positional arguments.
    projected = layers.Dense(units)(embedded)

    # Self-attention over the projected embeddings
    self_attention = MultiHeadAttention(num_heads=2, key_dim=units)
    attended = self_attention(projected, projected)

    # Encoder LSTM returns only its last output, which is then repeated
    # once per output timestep for the decoder LSTM.
    encoded = layers.LSTM(units, return_sequences=False)(attended)
    decoder_in = layers.RepeatVector(out_timesteps)(encoded)

    decoded = layers.LSTM(units, return_sequences=True)(decoder_in)
    token_probs = layers.Dense(out_vocab, activation='softmax')(decoded)

    return Model(inputs=source_tokens, outputs=token_probs)

In [None]:
from tensorflow.keras import layers, Model

def define_model(in_vocab, out_vocab, in_timesteps, out_timesteps, units):
    """Build a self-attention + bidirectional-LSTM encoder/decoder model.

    Args:
        in_vocab: ``input_dim`` of the embedding layer.
        out_vocab: Width of the final softmax layer.
        in_timesteps: Length of each input sequence.
        out_timesteps: Number of times the encoder output is repeated for
            the decoder.
        units: Size used for the embedding, the attention ``key_dim`` and
            the LSTM layers.

    Returns:
        The uncompiled ``Model``.
    """
    inputs = layers.Input(shape=(in_timesteps,))
    embedding = layers.Embedding(input_dim=in_vocab, output_dim=units, mask_zero=True)(inputs)

    # Adding the self-attention mechanism.
    # MultiHeadAttention takes query and value as two separate call
    # arguments; wrapping them in one list leaves `value` unset.
    attention = layers.MultiHeadAttention(num_heads=2, key_dim=units)
    attended = attention(embedding, embedding)

    # Use layers.LSTM so the wrapped layer comes from the same
    # tensorflow.keras package as Bidirectional. The bare `LSTM` name in
    # this notebook is imported from the standalone `keras` package.
    lstm1 = layers.Bidirectional(layers.LSTM(units, return_sequences=False))(attended)
    repeated = layers.RepeatVector(out_timesteps)(lstm1)

    lstm2 = layers.LSTM(units, return_sequences=True)(repeated)
    outputs = layers.Dense(out_vocab, activation='softmax')(lstm2)

    model = Model(inputs=inputs, outputs=outputs)
    return model

In [None]:
# build the model (it is compiled in a separate step)
# Arguments: input vocabulary size, output (English) vocabulary size,
# input length, output length, and 512 units.
model = define_model(pes_vocab_size, eng_vocab_size, pes_length, eng_length, 512)

In [None]:

# `learning_rate` is the supported keyword; the `lr` alias is deprecated and
# is rejected by newer Keras optimizers.
rms = optimizers.RMSprop(learning_rate=0.001)
# Sparse cross-entropy: targets are integer class ids, not one-hot vectors.
model.compile(optimizer=rms, loss='sparse_categorical_crossentropy')

In [None]:
# Path the checkpoint callback saves the model to.
filename = 'model.multi_head.13Nov'
# Save only when val_loss improves; mode='min' means lower is better.
checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

# train model
# Targets are reshaped to add a trailing axis of size 1. validation_split
# holds out 20% of the training arrays for the val_loss the checkpoint monitors.
history = model.fit(trainX, trainY.reshape(trainY.shape[0], trainY.shape[1], 1),
                    epochs=30, batch_size=512, validation_split = 0.2,callbacks=[checkpoint],
                    verbose=1)

In [None]:
# Training vs. validation loss, one point per epoch.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
# Label the figure so it can be read on its own when the notebook is skimmed.
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Training and validation loss')
plt.legend(['train','validation'])
plt.show()

In [None]:
import numpy as np
# Load the checkpoint written by the training step ('model.multi_head.13Nov').
# The previous path, 'model.h1.24_Oct_29_2', is never produced by this
# notebook, so a fresh run would fail or evaluate an unrelated model.
# NOTE(review): if evaluating that older model was intentional, restore the path.
model = load_model('model.multi_head.13Nov')
# Pred
# testX is already 2-D (samples, timesteps), so it is passed as-is.
preds = model.predict(testX)
# Convert predictions to classes
# argmax over the last axis picks the most probable vocabulary index per step.
preds_classes = np.argmax(preds, axis=-1)

In [None]:
# Display the predicted class indices as the cell output.
preds_classes

In [None]:
def get_word(n, tokenizer):
    """Look up the word whose index in ``tokenizer.word_index`` equals ``n``.

    Args:
        n: The index to search for.
        tokenizer: Object exposing a ``word_index`` mapping of word -> index.

    Returns:
        The first matching word in the mapping's iteration order, or
        ``None`` when no entry has that index.
    """
    matches = (word for word, index in tokenizer.word_index.items() if index == n)
    return next(matches, None)

In [None]:
# Convert predictions into text (English)
# Build the index -> word lookup once. Scanning word_index for every
# predicted token (twice per token) is linear in the vocabulary each time.
index_to_word = {index: word for word, index in eng_tokenizer.word_index.items()}

preds_text = []
for seq in preds_classes:
    # None marks indices with no vocabulary entry (e.g. the padding value 0).
    words = [index_to_word.get(int(idx)) for idx in seq]
    temp = []
    for pos, word in enumerate(words):
        # Emit an empty string for unknown indices and for a word that
        # merely repeats the one immediately before it.
        if word is None or (pos > 0 and word == words[pos - 1]):
            temp.append('')
        else:
            temp.append(word)
    preds_text.append(' '.join(temp))

In [None]:
# Sanity check: the number of reference sentences and the number of decoded
# predictions should match before they are paired in a DataFrame.
print(len(test[:,0]))
print(len(preds_text))

In [None]:
# Pair each reference sentence (column 0 of test) with its decoded prediction.
pred_df = pd.DataFrame({'actual' : test[:,0], 'predicted' : preds_text})

In [None]:
# show the first 15 rows (head() is not a random sample)
pred_df.head(15)

In [None]:
import torch

# Path of the first .pth file to inspect
pth_file_path = '/content/speakers (2).pth'

# Deserialize both .pth files.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code — only load files that come from a trusted source.
loaded_data = torch.load(pth_file_path)
loaded_data_1 = torch.load("/content/speakers (4).pth")
# How to use the result depends on what was saved. If it is a model checkpoint:
# model = loaded_data['model']

# If it is a dictionary holding other data, index it by key:
# some_data = loaded_data['your_key']

# If it is a single tensor, use the object directly:
# tensor = loaded_data

# A target device (e.g. 'cpu' or 'cuda') can be chosen at load time.
# For example, to place the loaded data on the first GPU:
# model = torch.load(pth_file_path, map_location='cuda:0')


In [None]:
# Display the object loaded from the first .pth file.
loaded_data

In [None]:
# Re-key the loaded values: each value is stored under the string
# 'speaker-<value>'. The original keys of loaded_data are not used.
new_dict = {'speaker-{}'.format(value): value for value in loaded_data.values()}

# Print new dictionary
print(new_dict)

In [None]:
import torch

# Write the re-keyed dictionary to 'speaker.pth' in the current working directory.
torch.save(new_dict, 'speaker.pth')

In [None]:
# Display the object loaded from the second .pth file.
loaded_data_1