# Install Packages

In [None]:
!pip install --upgrade torch
!pip install pytube
!pip install git+https://github.com/openai/whisper.git

# Import packages

In [None]:
import torch
import whisper
import pytube
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import IPython.display as ipd

# Convert Speech-To-Text

In [None]:
# Whisper checkpoint name. The read_sound(...) calls in this file pass the name
# explicitly, so this variable is not what selects the model there.
model_type="small"
# Source table; read_sound reads its 'english' and 'pos' columns.
df = pd.read_csv("./drive/MyDrive/Santhali-English NLP/data.csv")
# Notebook preview of the first rows.
df.head()

In [None]:
def to_text(model_m,file_path):
  """Transcribe one audio file and return the recognised text.

  Calls ``model_m.transcribe`` with ``language="en"`` and ``fp16=False``,
  prints the raw result, and returns its ``'text'`` entry.

  Any exception raised while transcribing or reading the result is printed
  and swallowed; ``False`` is returned in that case so the caller can keep
  going with the remaining files.
  """
  try:
    result = model_m.transcribe(file_path, language="en", fp16=False)
    print(result)
    return result['text']
  except Exception as err:
    print(err)
  return False

In [None]:
def read_sound(model_type):
  """Transcribe every clip with one Whisper checkpoint and save the result.

  For each row ``i`` of the module-level ``df`` the file
  ``.../sounds/SNT_<i>.mp3`` is transcribed with ``to_text``. The transcription
  (``sn``; ``False`` when ``to_text`` failed) is stored next to that row's
  ``english`` (``en``) and ``pos`` values, and the table is written to
  ``./drive/MyDrive/Santhali-English NLP/<model_type>.csv``.

  Args:
    model_type: checkpoint name passed to ``whisper.load_model``; also used
      as the output file stem.
  """
  sound_folder_path = "./drive/MyDrive/Santhali-English NLP/sounds/SNT_"
  sound_extension = ".mp3"
  model_m = whisper.load_model(model_type)

  rows = []
  for i in range(len(df['english'])):
    print("reading sound ",i)
    sn = to_text(model_m, sound_folder_path + str(i) + sound_extension)
    rows.append((sn, df['english'][i], df['pos'][i]))

  final_df = pd.DataFrame(rows, columns=['sn', 'en', 'pos'])
  final_csv_path = './drive/MyDrive/Santhali-English NLP/'+model_type+'.csv'
  # Hand pandas the path instead of a text handle: a handle opened without
  # newline='' gets its line terminators translated a second time on Windows.
  final_df.to_csv(final_csv_path, encoding='utf-8-sig')



In [None]:
# Transcribe all clips with the "small" checkpoint; read_sound writes small.csv.
read_sound("small")

In [None]:
# Transcribe all clips with the "medium" checkpoint; read_sound writes medium.csv.
read_sound("medium")

In [None]:
# Reload the per-checkpoint transcription tables written by read_sound.
small_data_df = pd.read_csv("./drive/MyDrive/Santhali-English NLP/small.csv")
medium_data_df = pd.read_csv("./drive/MyDrive/Santhali-English NLP/medium.csv")

In [None]:
def merge_data_of_df(data1,data2)->int:
  """Decide which of two candidate strings to keep.

  Returns:
    0 -- keep neither (both are longer than 40 characters, or both are empty);
    1 -- keep ``data1`` (it is strictly shorter than ``data2``);
    2 -- keep ``data2`` (it is shorter than or as long as ``data1``).
  """
  size1, size2 = len(data1), len(data2)
  both_too_long = size1 > 40 and size2 > 40
  both_empty = size1 == 0 and size2 == 0
  if both_too_long or both_empty:
    return 0
  return 2 if size2 <= size1 else 1

In [None]:
# Merge the two transcription tables row by row, keeping for each row the
# table that merge_data_of_df selects (or dropping the row when it returns 0).
small_df_size = len(small_data_df)
medium_df_size = len(medium_data_df)
final_sn, final_en, final_pos = [], [], []
for i in range(int(min(small_df_size, medium_df_size))):
  d = merge_data_of_df(str(small_data_df['sn'][i]), str(medium_data_df['sn'][i]))
  if d == 0:
    continue
  # 1 selects the "small" row; any other non-zero code selects the "medium" row.
  chosen_df = small_data_df if d == 1 else medium_data_df
  final_sn.append(str(chosen_df['sn'][i]))
  final_en.append(str(chosen_df['en'][i]))
  final_pos.append(str(chosen_df['pos'][i]))
final_merge_df = pd.DataFrame(list(zip(final_sn, final_en, final_pos)), columns=["sn", "en", "pos"])

In [None]:
# final_merge_df.head()

In [None]:
final_csv_path = './drive/MyDrive/Santhali-English NLP/final_merged_data.csv'
# Pass the path so pandas opens the file itself; a text handle opened without
# newline='' gets its line terminators translated a second time on Windows.
final_merge_df.to_csv(final_csv_path, encoding='utf-8-sig')

# Model


## Data Cleaning

In [None]:
import string
import re
from pickle import dump
from unicodedata import normalize
from numpy import array

In [None]:
# Reload the merged sn/en/pos table from final_merged_data.csv.
dataSet = pd.read_csv("./drive/MyDrive/Santhali-English NLP/final_merged_data.csv")

In [None]:
def to_pairs(df):
  """Return the rows of *df* as a list of ``[en, sn]`` pairs.

  Args:
    df: table with ``'sn'`` and ``'en'`` columns.

  Returns:
    One two-element list per row, ``'en'`` value first, ``'sn'`` value second,
    in row order.
  """
  # Walk both columns positionally. Looking rows up with a 0..n-1 counter is a
  # label lookup on a Series and fails on a frame whose index is not the
  # default range (e.g. after filtering rows).
  return [[en, sn] for en, sn in zip(df['en'], df['sn'])]

In [None]:
# Load the data as [en, sn] pairs (to_pairs puts the 'en' field first).
pairs_data = to_pairs(dataSet)
# Show the last five pairs and the total number of pairs.
print(pairs_data[-5:],len(pairs_data))

[['their-2', ' Thank you'], ['they-2', ' onaudience.'], ['they', ' love, Sonam.'], ['weak', ' So'], ['hair', ' Oh!']] 468


In [None]:
# clean a list of text pairs
def to_clean_pairs(lines):
	"""Normalise every string in every pair and return the result as an array.

	Each entry is converted with ``str``, NFD-normalised and reduced to ASCII
	(characters that cannot be encoded are dropped), split on whitespace,
	lower-cased, stripped of punctuation and non-printable characters, and
	finally filtered so that only purely alphabetic tokens remain. The
	surviving tokens are re-joined with single spaces.

	Args:
		lines: iterable of pairs (iterables of values).

	Returns:
		``numpy`` array built from the list of cleaned pairs.
	"""
	non_printable = re.compile('[^%s]' % re.escape(string.printable))
	drop_punctuation = str.maketrans('', '', string.punctuation)

	def scrub(text):
		# Decompose accented characters, then discard whatever is not ASCII.
		ascii_text = normalize('NFD', str(text)).encode('ascii', 'ignore').decode('UTF-8')
		kept = []
		for token in ascii_text.split():
			token = non_printable.sub('', token.lower().translate(drop_punctuation))
			# isalpha() rejects tokens containing digits and tokens left empty.
			if token.isalpha():
				kept.append(token)
		return ' '.join(kept)

	return array([[scrub(text) for text in pair] for pair in lines])

In [None]:
# Clean every pair with to_clean_pairs, then preview the last five rows and the row count.
clean_pairs = to_clean_pairs(pairs_data)
print(clean_pairs[-5:],len(clean_pairs))

## Split Dataset

In [None]:
from pickle import load
from pickle import dump
from numpy.random import rand
from numpy.random import shuffle

# Work on a copy so clean_pairs itself is not reordered by the shuffle below.
dataset = clean_pairs.copy()
total_data_size = len(dataset)
# 75% of the rows go to training; the remainder is held out.
training_data_size = int(total_data_size * 0.75)

# Shuffles in place (the return value is unused). No seed is set in the visible
# code, so the split presumably differs from run to run.
shuffle(dataset)

# split into train/test
train_data_set, test_data_set = dataset[:training_data_size], dataset[training_data_size:]

## Train NLP

In [None]:
from pickle import load
from numpy import array
from keras.preprocessing.text import Tokenizer
from keras.utils import pad_sequences
from keras.utils import to_categorical
from keras.utils.vis_utils import plot_model
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Embedding
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
from keras.callbacks import ModelCheckpoint

In [None]:
# build and fit a tokenizer
def create_tokenizer(lines):
	"""Return a new ``Tokenizer`` that has been fitted on *lines*."""
	fitted = Tokenizer()
	fitted.fit_on_texts(lines)
	return fitted

In [None]:
# longest sentence, measured in words
def max_length(lines):
	"""Return the largest whitespace-separated word count found in *lines*."""
	word_counts = [len(sentence.split()) for sentence in lines]
	return max(word_counts)

In [None]:
# turn text into fixed-length integer sequences
def encode_sequences(tokenizer, length, lines):
	"""Integer-encode *lines* with *tokenizer* and pad each one to *length*.

	Padding is requested at the end of each sequence (``padding='post'``).
	"""
	as_integers = tokenizer.texts_to_sequences(lines)
	return pad_sequences(as_integers, maxlen=length, padding='post')

In [None]:
# one-hot encode the target sequences
def encode_output(sequences, vocab_size):
	"""One-hot encode every integer sequence in *sequences*.

	Args:
		sequences: 2-D array of integer ids (its first two ``shape`` entries are read).
		vocab_size: number of classes for the one-hot axis.

	Returns:
		Array reshaped to ``(sequences.shape[0], sequences.shape[1], vocab_size)``.
	"""
	one_hot = array([to_categorical(row, num_classes=vocab_size) for row in sequences])
	n_samples, n_steps = sequences.shape[0], sequences.shape[1]
	return one_hot.reshape(n_samples, n_steps, vocab_size)

In [None]:
# build the (uncompiled) translation network
def define_model(src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
	"""Assemble the sequence-to-sequence network and return it uncompiled.

	Layers, in order: a zero-masking embedding over the source vocabulary, an
	LSTM that emits a single vector, a ``RepeatVector`` that copies that vector
	``tar_timesteps`` times, an LSTM that returns the full sequence, and a
	per-timestep softmax over the target vocabulary.

	Args:
		src_vocab: source vocabulary size (embedding input dimension).
		tar_vocab: target vocabulary size (softmax width).
		src_timesteps: padded source sequence length.
		tar_timesteps: padded target sequence length.
		n_units: embedding width and LSTM unit count.
	"""
	return Sequential([
		Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True),
		LSTM(n_units),
		RepeatVector(tar_timesteps),
		LSTM(n_units, return_sequences=True),
		TimeDistributed(Dense(tar_vocab, activation='softmax')),
	])

In [None]:
# prepare sn tokenizer
# NOTE: to_pairs builds each pair as [en, sn], so column 0 holds the CSV's 'en'
# field even though the variables here are named sn_*.
sn_tokenizer = create_tokenizer(dataset[:, 0])
# +1 presumably accounts for index 0, which encode_sequences uses for padding -- confirm against the Tokenizer docs.
sn_vocab_size = len(sn_tokenizer.word_index) + 1
sn_length = max_length(dataset[:, 0])
print('SN Vocabulary Size: %d' % sn_vocab_size)
print('SN Max Length: %d' % (sn_length))
# prepare en tokenizer
# Column 1 holds the CSV's 'sn' field (the transcriptions), per the [en, sn] pair order.
en_tokenizer = create_tokenizer(dataset[:, 1])
en_vocab_size = len(en_tokenizer.word_index) + 1
en_length = max_length(dataset[:, 1])
print('EN Vocabulary Size: %d' % en_vocab_size)
print('EN Max Length: %d' % (en_length))

SN Vocabulary Size: 398
SN Max Length: 4
EN Vocabulary Size: 457
EN Max Length: 6


In [None]:
# prepare training data
# Model inputs come from column 1 (en_* tokenizer); targets come from column 0 (sn_* tokenizer).
trainX = encode_sequences(en_tokenizer, en_length, train_data_set[:, 1])
trainY = encode_sequences(sn_tokenizer, sn_length, train_data_set[:, 0])
# One-hot encode the targets; the model is compiled with categorical_crossentropy.
trainY = encode_output(trainY, sn_vocab_size)

In [None]:
# prepare validation data
# Same encoding as the training split, applied to the held-out rows.
testX = encode_sequences(en_tokenizer, en_length, test_data_set[:, 1])
testY = encode_sequences(sn_tokenizer, sn_length, test_data_set[:, 0])
testY = encode_output(testY, sn_vocab_size)

In [None]:
# define model: column-1 vocabulary/length on the input side, column-0 on the
# output side, 256 units for the embedding and both LSTMs
model = define_model(en_vocab_size, sn_vocab_size, en_length, sn_length, 256)
model.compile(optimizer='adam', loss='categorical_crossentropy')
# summarize defined model
# summary() prints the table itself and returns None, so wrapping it in
# print() would emit a stray "None" line.
model.summary()
plot_model(model, to_file='model.png', show_shapes=True)

In [None]:
# fit model
filename = 'model.h5'
# Overwrite model.h5 only when val_loss improves (save_best_only=True, mode='min').
checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
# NOTE(review): the held-out test split is used as validation data, so the
# checkpoint is selected on the same rows that are later scored as "test".
model.fit(trainX, trainY, epochs=100, batch_size=64, validation_data=(testX, testY), callbacks=[checkpoint], verbose=2)

## Evaluation

In [None]:
from pickle import load
from numpy import array
from numpy import argmax
from keras.preprocessing.text import Tokenizer
from keras.utils import pad_sequences
from keras.models import load_model
from nltk.translate.bleu_score import corpus_bleu

In [None]:
# create a tokenizer fitted on the given texts
def create_tokenizer(lines):
	"""Instantiate a ``Tokenizer``, fit it on *lines*, and return it."""
	text_tokenizer = Tokenizer()
	text_tokenizer.fit_on_texts(lines)
	return text_tokenizer

In [None]:
# word count of the longest sentence
def max_length(lines):
	"""Return the highest number of whitespace-separated words in any line."""
	return max(map(lambda text: len(text.split()), lines))

In [None]:
# integer-encode and pad text
def encode_sequences(tokenizer, length, lines):
	"""Map *lines* to integer sequences and pad them to *length*.

	Encoding is done by ``tokenizer.texts_to_sequences``; padding is requested
	at the end of each sequence (``padding='post'``).
	"""
	encoded = tokenizer.texts_to_sequences(lines)
	padded = pad_sequences(encoded, maxlen=length, padding='post')
	return padded

In [None]:
# reverse lookup: integer id -> word
def word_for_id(integer, tokenizer):
	"""Return the first word in ``tokenizer.word_index`` whose id equals *integer*.

	Returns ``None`` when no word carries that id.
	"""
	matches = (word for word, index in tokenizer.word_index.items() if index == integer)
	return next(matches, None)

In [None]:
# decode the model's prediction for one source sequence
def predict_sequence(model, tokenizer, source):
	"""Predict a target sentence for *source* and return it as a string.

	The first item of ``model.predict(source)`` is read step by step; at each
	step the highest-scoring id is mapped to a word with ``word_for_id``.
	Decoding stops at the first id that has no word, and the words collected
	so far are joined with spaces.
	"""
	step_scores = model.predict(source, verbose=0)[0]
	words = []
	for scores in step_scores:
		token = word_for_id(argmax(scores), tokenizer)
		if token is None:
			break
		words.append(token)
	return ' '.join(words)

In [None]:
# evaluate the model
def evaluate_model(model, tokenizer, sources, raw_dataset):
	"""Translate every encoded source and print corpus BLEU-1 to BLEU-4.

	Args:
		model: model handed to ``predict_sequence``.
		tokenizer: target-side tokenizer used to turn predicted ids into words.
		sources: iterable of 1-D encoded source sequences; each is reshaped to
			a batch of one before prediction.
		raw_dataset: indexable of ``(target_text, source_text)`` pairs aligned
			with *sources*; only the target text is used, as the single
			reference for its row.
	"""
	actual, predicted = list(), list()
	for i, source in enumerate(sources):
		# translate encoded source text
		source = source.reshape((1, source.shape[0]))
		# Decode with the tokenizer passed in by the caller, not a module-level global.
		translation = predict_sequence(model, tokenizer, source)
		raw_target, _ = raw_dataset[i]
		actual.append([raw_target.split()])
		predicted.append(translation.split())
	# calculate BLEU score
	print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
	print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
	# NOTE(review): these weights sum to 0.9, not 1 -- confirm whether
	# (1/3, 1/3, 1/3, 0) was intended.
	print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
	print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

In [None]:
# prepare sn tokenizer
# This cell rebuilds the same tokenizers as the training section from the
# in-memory dataset / train_data_set / test_data_set, so it relies on those
# still holding the split the saved model was trained on.
# Column 0 is the CSV's 'en' field (to_pairs emits [en, sn]) despite the sn_* names.
sn_tokenizer = create_tokenizer(dataset[:, 0])
sn_vocab_size = len(sn_tokenizer.word_index) + 1
sn_length = max_length(dataset[:, 0])
# prepare en tokenizer
en_tokenizer = create_tokenizer(dataset[:, 1])
en_vocab_size = len(en_tokenizer.word_index) + 1
en_length = max_length(dataset[:, 1])
# prepare data
trainX = encode_sequences(en_tokenizer, en_length, train_data_set[:, 1])
testX = encode_sequences(en_tokenizer, en_length, test_data_set[:, 1])

In [None]:
# load model
# 'model.h5' is the checkpoint file name used by the training cell.
model = load_model('model.h5')
# score the whole training split
print('train')
evaluate_model(model, sn_tokenizer, trainX, train_data_set)
# score the whole held-out split
print('test')
evaluate_model(model, sn_tokenizer, testX, test_data_set)