In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.sequence import pad_sequences
from gensim.models import Word2Vec
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, TimeDistributed, Dense, Dropout
from keras_self_attention import SeqSelfAttention
from sklearn.metrics import classification_report
import keras_tuner as kt
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

In [None]:
# Location of the raw corpus (one sentence per line, UTF-8).
dataset_path = "C:\\Users\\mvy48\\Downloads\\ml_combined_anoop-cc-gokul_07Dec19.txt"

# Upper bound on the number of corpus lines kept for training.
MAX_LINES = 200000

# Stream the file and stop at the cap instead of loading the whole corpus
# into memory only to discard everything past the first MAX_LINES lines.
lines = []
with open(dataset_path, 'r', encoding='utf-8') as file:
    for line in file:
        if len(lines) >= MAX_LINES:
            break
        lines.append(line)

def label_sentences(sentences):
    """Turn raw sentences into per-character (char, tag) sequences.

    Each sentence is stripped and split on whitespace. Every character of a
    word receives a positional tag: 'S' for a one-character word, otherwise
    'B' for the first character, 'I' for interior characters and 'E' for the
    last one. Sentences that are empty after stripping are skipped.

    Args:
        sentences: iterable of strings.

    Returns:
        A list with one entry per non-empty sentence; each entry is a list of
        (character, tag) tuples in reading order.
    """
    def tag_word(word):
        # A lone character is a complete word on its own.
        if len(word) == 1:
            return [(word, 'S')]
        interior = [(symbol, 'I') for symbol in word[1:-1]]
        return [(word[0], 'B')] + interior + [(word[-1], 'E')]

    tagged_sentences = []
    for raw_sentence in sentences:
        stripped = raw_sentence.strip()
        if stripped:
            tagged_sentences.append(
                [pair for word in stripped.split() for pair in tag_word(word)]
            )
    return tagged_sentences

In [None]:
# Tag every character of the loaded corpus lines with its B/I/E/S label.
labeled_lines = label_sentences(lines)

# Word2Vec is trained on the character sequences only, so the tags are dropped here.
sentences = [
    [symbol for symbol, _ in labeled_sentence]
    for labeled_sentence in labeled_lines
]

# Train Word2Vec model
word2vec_model = Word2Vec(
    sentences,
    vector_size=100,
    window=5,
    min_count=1,
    workers=4,
)
word2vec_model.save("word2vec.model")

# Export one embedding row per vocabulary entry, indexed by the character itself.
vocab = list(word2vec_model.wv.index_to_key)
embeddings = [word2vec_model.wv[word] for word in vocab]
embedding_df = pd.DataFrame(embeddings, index=vocab)
embedding_df.to_csv('word_embeddings.csv')

# Convert Characters to Indices and Prepare Labels
# Vocabulary entries are numbered from 1 so that index 0 stays free for padding.
_vocab_keys = word2vec_model.wv.index_to_key
char_to_index = dict(zip(_vocab_keys, range(1, len(_vocab_keys) + 1)))
char_to_index['PAD'] = 0

# Tag -> class id, in the order B, I, E, S.
label_to_index = {tag: class_id for class_id, tag in enumerate('BIES')}

# Value written into label sequences at padded positions.
pad_label = -1

def prepare_data(labeled_sentences, char_to_index, label_to_index):
    """Encode labeled sentences as parallel lists of integer ids.

    Args:
        labeled_sentences: iterable of sentences, each a sequence of
            (character, label) pairs.
        char_to_index: mapping from character to integer id.
        label_to_index: mapping from label to integer id.

    Returns:
        A tuple (X, y): X holds one list of character ids per sentence and
        y holds the matching list of label ids.

    Raises:
        KeyError: if a character or label is missing from its mapping.
    """
    encoded_chars, encoded_labels = [], []
    for labeled_sentence in labeled_sentences:
        encoded_chars.append([char_to_index[symbol] for symbol, _ in labeled_sentence])
        encoded_labels.append([label_to_index[tag] for _, tag in labeled_sentence])
    return encoded_chars, encoded_labels

# Encode the whole labeled corpus into character-id and label-id sequences.
X, y = prepare_data(
    labeled_sentences=labeled_lines,
    char_to_index=char_to_index,
    label_to_index=label_to_index,
)


In [None]:
# Padding sequences
# Every sequence is brought to MAX_LEN positions, padding at the end.
# NOTE(review): sequences longer than MAX_LEN are cut by pad_sequences using its
# default truncation side — confirm that side is the intended one.
MAX_LEN = 100
_padding_options = dict(maxlen=MAX_LEN, padding='post')
X_padded = pad_sequences(X, value=char_to_index['PAD'], **_padding_options)
y_padded = pad_sequences(y, value=pad_label, **_padding_options)

# Split the data
# 30% of the sentences are held out for testing; the seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X_padded,
    y_padded,
    test_size=0.3,
    random_state=42,
)


In [None]:
# Define the model with hyperparameters
def build_model(hp):
    """Build and compile the tunable character tagger.

    The network is Embedding -> Dropout -> Bidirectional LSTM ->
    SeqSelfAttention -> per-timestep softmax over the entries of
    `label_to_index`.

    Args:
        hp: KerasTuner hyperparameter container used to sample
            'embedding_dim', 'embedding_dropout', 'lstm_units' and
            'recurrent_dropout'.

    Returns:
        A compiled Sequential model (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    # Sample the hyperparameters up front, in the order the layers consume them.
    embedding_dim = hp.Int('embedding_dim', min_value=50, max_value=200, step=50)
    embedding_dropout = hp.Float('embedding_dropout', min_value=0.1, max_value=0.5, step=0.1)
    lstm_units = hp.Int('lstm_units', min_value=32, max_value=128, step=32)
    recurrent_dropout = hp.Float('recurrent_dropout', min_value=0.1, max_value=0.5, step=0.1)

    layers = [
        # mask_zero=True asks Keras to mask index 0, which is the 'PAD' entry of char_to_index.
        Embedding(
            input_dim=len(char_to_index),
            output_dim=embedding_dim,
            input_length=MAX_LEN,
            mask_zero=True,
        ),
        Dropout(rate=embedding_dropout),
        # return_sequences=True keeps one output per character position.
        Bidirectional(LSTM(
            units=lstm_units,
            return_sequences=True,
            recurrent_dropout=recurrent_dropout,
        )),
        SeqSelfAttention(attention_activation='sigmoid'),
        TimeDistributed(Dense(len(label_to_index), activation='softmax')),
    ]

    model = Sequential(layers)
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Define callbacks
# Keep only the weights with the best validation accuracy seen so far.
checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    mode='max',
    verbose=1,
)
# Stop when validation loss has not improved for 5 epochs.
stop_early = EarlyStopping(monitor='val_loss', patience=5, verbose=1)
# Halve the learning rate after 2 stagnant epochs, never going below 1e-4.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=2,
    min_lr=0.0001,
    verbose=1,
)

# Initialize the tuner
# Hyperband search over build_model, ranked by validation accuracy.
_hyperband_options = dict(
    objective='val_accuracy',
    max_epochs=10,
    factor=3,
    directory='hyperband_dir',
    project_name='word_segmentation',
)
tuner = kt.Hyperband(build_model, **_hyperband_options)


In [None]:
# The targets get a trailing axis of size 1; build that array once and reuse it
# for both the search and the final training run.
y_train_expanded = np.expand_dims(y_train, -1)

# Perform hyperparameter search
# NOTE(review): Hyperband is configured with max_epochs=10 and may override the
# `epochs=50` passed here with its own per-trial budget — confirm against KerasTuner.
tuner.search(
    X_train,
    y_train_expanded,
    epochs=50,
    validation_split=0.1,
    callbacks=[stop_early, checkpoint, reduce_lr],
)

# Get the optimal hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Build the model with the optimal hyperparameters
model = tuner.hypermodel.build(best_hps)
model.summary()

# Train the model with the optimal hyperparameters
history = model.fit(
    X_train,
    y_train_expanded,
    batch_size=64,
    epochs=10,
    validation_split=0.1,
    callbacks=[stop_early, checkpoint, reduce_lr],
)


In [None]:
# Load the best model
# SeqSelfAttention is a custom layer, so it has to be registered for deserialization.
model = tf.keras.models.load_model(
    'best_model.h5',
    custom_objects={'SeqSelfAttention': SeqSelfAttention},
)

# Evaluate the model
# The targets get the same trailing axis of size 1 that was used for training.
y_test_expanded = np.expand_dims(y_test, -1)
evaluation = model.evaluate(X_test, y_test_expanded)
loss, accuracy = evaluation
print(f'Loss: {loss}, Accuracy: {accuracy}')


In [None]:
# Make predictions
y_pred = model.predict(X_test)
y_pred = np.argmax(y_pred, axis=-1)

# y_test comes straight from the train/test split and is 2-D (samples, MAX_LEN),
# so squeezing its last axis would raise ValueError. Only squeeze when a trailing
# singleton axis is actually present.
y_true = np.asarray(y_test)
if y_true.ndim == y_pred.ndim + 1 and y_true.shape[-1] == 1:
    y_true = np.squeeze(y_true, axis=-1)

# Keep only real (non-padding) positions; boolean indexing already yields 1-D
# arrays and leaves y_test / y_pred unmodified.
valid_positions = y_true != pad_label
y_true_flat = y_true[valid_positions]
y_pred_flat = y_pred[valid_positions]

# Classification Report
# Passing `labels` explicitly keeps the names aligned with class ids 0..3 even
# when one of the classes does not occur in this test split.
label_names = ['B', 'I', 'E', 'S']
print(classification_report(
    y_true_flat,
    y_pred_flat,
    labels=list(range(len(label_names))),
    target_names=label_names,
))


In [None]:
# Decode predictions
def decode_predictions(preds, index_to_label):
    """Translate rows of label ids into rows of label names.

    Args:
        preds: iterable of sequences of label ids (one sequence per sentence).
        index_to_label: mapping from label id to label name.

    Returns:
        A list of lists with the label name for every id, in the same order.
    """
    decoded = []
    for row in preds:
        decoded.append([index_to_label[label_id] for label_id in row])
    return decoded

# Inverse of label_to_index: class id -> tag.
index_to_label = dict((class_id, tag) for tag, class_id in label_to_index.items())

def reconstruct_sentence(chars, labels):
    """Group characters into words according to their B/I/E/S tags.

    'B' starts a new word (closing any word still open), 'I' extends the
    open word, 'E' extends and closes it, and 'S' emits the character as a
    word of its own (closing any word still open first). Characters with any
    other tag are ignored. Pairs are consumed with zip, so the longer of the
    two inputs is cut to the length of the shorter.

    Args:
        chars: sequence of characters.
        labels: sequence of tags aligned with `chars`.

    Returns:
        List of reconstructed words in order.
    """
    segmented = []
    current = ''
    for symbol, tag in zip(chars, labels):
        if tag in ('B', 'S'):
            # Both tags begin a fresh unit, so a word still open is closed first.
            if current:
                segmented.append(current)
            if tag == 'B':
                current = symbol
            else:
                segmented.append(symbol)
                current = ''
        elif tag in ('I', 'E'):
            current = current + symbol
            if tag == 'E':
                segmented.append(current)
                current = ''
    # A trailing word that never received its 'E' is still emitted.
    if current:
        segmented.append(current)
    return segmented

def predict_sentence(sentence, model, char_to_index, index_to_label):
    """Segment an unspaced sentence into words with the trained tagger.

    The sentence is processed in consecutive windows of at most MAX_LEN
    characters. Feeding a longer sentence in one go would make pad_sequences
    cut it to MAX_LEN while the labels were still zipped against the start of
    the full sentence, misaligning labels and dropping characters.
    Sentences that fit in one window are handled exactly as a single
    padded prediction.

    Note: a word that straddles a window boundary is tagged without seeing its
    other half, so segmentation near boundaries may be less reliable.

    Args:
        sentence: string of characters without word separators.
        model: trained model whose `predict` returns per-position class scores.
        char_to_index: mapping from character to integer id; must contain 'PAD'.
            Characters missing from the mapping are encoded as 0.
        index_to_label: mapping from class id to tag ('B', 'I', 'E', 'S').

    Returns:
        List of predicted words; an empty list for an empty sentence.
    """
    if not sentence:
        return []

    predicted_labels = []
    for start in range(0, len(sentence), MAX_LEN):
        window = sentence[start:start + MAX_LEN]
        window_indices = [char_to_index.get(char, 0) for char in window]
        window_padded = pad_sequences(
            [window_indices],
            maxlen=MAX_LEN,
            padding='post',
            value=char_to_index['PAD'],
        )
        preds = model.predict(window_padded)
        window_labels = decode_predictions(np.argmax(preds, axis=-1), index_to_label)[0]
        # Discard the labels predicted for padded positions of a short window.
        predicted_labels.extend(window_labels[:len(window)])

    return reconstruct_sentence(sentence, predicted_labels)

# Define the sentence to predict
sentence_to_predict = "ആഗ്രഹങ്ങൾസാക്ഷാത്കരിക്കാന്പഠിക്കണം"

# Make Prediction
predicted_words = predict_sentence(
    sentence_to_predict,
    model,
    char_to_index,
    index_to_label,
)

# Print the Segmented Sentence
segmented_text = ' '.join(predicted_words)
print("Segmented Sentence:", segmented_text)
