# Set the same Python executable for both driver and worker

In [2]:
import os
os.environ["PYSPARK_PYTHON"] = "C:/Users/agrui/anaconda3/python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:/Users/agrui/anaconda3/python.exe"

# Read the data and store it in a dataframe


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Phoneme to Grapheme Conversion") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()

# Define the schema for the CMU dictionary
schema = StructType([
    StructField("word", StringType(), True),
    StructField("phonemes", StringType(), True)
])

# Load the CMU dictionary file
cmu_file_path = "./cmudict-0.7b-utf8.txt"  # Update path if needed
cmu_rdd = spark.sparkContext.textFile(cmu_file_path) \
    .filter(lambda line: not line.startswith(";;;")) \
    .map(lambda line: line.split("  "))  # Double space separates word and phonemes

# Convert the RDD to a DataFrame
cmu_df = cmu_rdd.map(lambda x: (x[0], x[1])).toDF(schema=schema)

# Limit the DataFrame to 500 rows (adjust as needed)
cmu_df = cmu_df.limit(3200)

# Cache the DataFrame to improve performance
cmu_df.cache()

DataFrame[word: string, phonemes: string]

# Data Cleaning

In [6]:
from pyspark.sql.functions import regexp_replace, lower

# Remove non-alphanumeric characters from 'word'
cmu_df = cmu_df.withColumn('word', regexp_replace(cmu_df['word'], '[^a-zA-Z0-9]', ''))
# cmu_df = cmu_df.withColumn('word', F.expr("regexp_replace(word, '(.)', '$1 ')"))
cmu_df = cmu_df.filter(~F.col('word').rlike('.*[0-9].*'))
cmu_df.show(10, truncate=False)


+----------------+----------------------------------------+
|word            |phonemes                                |
+----------------+----------------------------------------+
|EXCLAMATIONPOINT|EH2 K S K L AH0 M EY1 SH AH0 N P OY2 N T|
|CLOSEQUOTE      |K L OW1 Z K W OW1 T                     |
|DOUBLEQUOTE     |D AH1 B AH0 L K W OW1 T                 |
|ENDOFQUOTE      |EH1 N D AH0 V K W OW1 T                 |
|ENDQUOTE        |EH1 N D K W OW1 T                       |
|INQUOTES        |IH1 N K W OW1 T S                       |
|QUOTE           |K W OW1 T                               |
|UNQUOTE         |AH1 N K W OW1 T                         |
|HASHMARK        |HH AE1 M AA2 R K                        |
|POUNDSIGN       |P AW1 N D S AY2 N                       |
+----------------+----------------------------------------+
only showing top 10 rows



# Split the data for training, validating, and testing

In [8]:
# Split data into 70% train, 15% validation, and 15% test
train_df, test_df = cmu_df.randomSplit([0.8, 0.2], seed=69)

print('training set:')
train_df.show(10, truncate=False)

print('test set:')
test_df.show(10, truncate=False)



training set:
+--------+-------------------+
|word    |phonemes           |
+--------+-------------------+
|A       |AH0                |
|A       |EY1                |
|AA      |EY2 EY1            |
|AAA     |T R IH2 P AH0 L EY1|
|AABERG  |AA1 B ER0 G        |
|AACHEN  |AA1 K AH0 N        |
|AACHENER|AA1 K AH0 N ER0    |
|AAH     |AA1                |
|AAKER   |AA1 K ER0          |
|AALIYAH |AA2 L IY1 AA2      |
+--------+-------------------+
only showing top 10 rows

test set:
+----------+--------------------------+
|word      |phonemes                  |
+----------+--------------------------+
|AARONS    |EH1 R AH0 N Z             |
|AB        |AE1 B                     |
|ABACK     |AH0 B AE1 K               |
|ABACO     |AE1 B AH0 K OW2           |
|ABACUS    |AE1 B AH0 K AH0 S         |
|ABALKIN   |AH0 B AA1 L K IH0 N       |
|ABANDONING|AH0 B AE1 N D AH0 N IH0 NG|
|ABASH     |AH0 B AE1 SH              |
|ABASHED   |AH0 B AE1 SH T            |
|ABASIA    |AH0 B EY1 ZH Y AH0      

# Tokenize and Pad the training data

In [10]:
# Function to process data in chunks
def process_chunk(df_chunk):
    phonemes = [row['phonemes'] for row in df_chunk.select('phonemes').collect()]
    graphemes = [row['word'] for row in df_chunk.select('word').collect()]
    return phonemes, graphemes

# Initialize empty lists to store data
train_phonemes = []
train_graphemes = []

# Set the chunk size for batch processing
chunk_size = 100  # Adjust chunk size based on your system's memory

# Get the total number of rows
total_rows = train_df.count()

# Process the DataFrame in chunks
for start in range(0, total_rows, chunk_size):
    df_chunk = train_df.limit(start + chunk_size).subtract(train_df.limit(start))
    phoneme_chunk, grapheme_chunk = process_chunk(df_chunk)
    train_phonemes.extend(phoneme_chunk)
    train_graphemes.extend(grapheme_chunk)

# Add <start> and <end> tokens to grapheme sequences
train_graphemes = ['<start> ' + grapheme + ' <end>' for grapheme in train_graphemes]

# Tokenize the phonemes and graphemes using Keras' Tokenizer
phoneme_tokenizer = Tokenizer(char_level=True)
phoneme_tokenizer.fit_on_texts(train_phonemes)

# Tokenize the graphemes again with <start> and <end> tokens included
grapheme_tokenizer = Tokenizer(char_level=True, filters='')
grapheme_tokenizer.fit_on_texts(train_graphemes)

# Ensure that <start> and <end> tokens are in the vocabulary
# Manually add if not present
if '<start>' not in grapheme_tokenizer.word_index:
    grapheme_tokenizer.word_index['<start>'] = len(grapheme_tokenizer.word_index) + 1
if '<end>' not in grapheme_tokenizer.word_index:
    grapheme_tokenizer.word_index['<end>'] = len(grapheme_tokenizer.word_index) + 1

# Update the index_word dictionary for reverse lookup
grapheme_tokenizer.index_word = {v: k for k, v in grapheme_tokenizer.word_index.items()}

# Convert sequences to numerical tokens again
train_phoneme_seq = phoneme_tokenizer.texts_to_sequences(train_phonemes)
train_grapheme_seq = grapheme_tokenizer.texts_to_sequences(train_graphemes)

# Pad sequences to the same length
max_len_input = max([len(seq) for seq in train_phoneme_seq])
max_len_output = max([len(seq) for seq in train_grapheme_seq])

X_train_padded = pad_sequences(train_phoneme_seq, maxlen=max_len_input, padding='post')
Y_train_padded = pad_sequences(train_grapheme_seq, maxlen=max_len_output, padding='post')

# Print the shapes of the padded sequences to verify
print(f"Input (X) shape: {X_train_padded.shape}")
print(f"Output (Y) shape: {Y_train_padded.shape}")


Input (X) shape: (2304, 45)
Output (Y) shape: (2304, 31)


# Create an encoder that processes inputs (phonemes) and a decoder that generates outputs (graphemes)

In [12]:
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding, Concatenate, Attention, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
import numpy as np

# Convert Y_train_padded to categorical (one-hot encoding)
vocab_size_graphemes = len(grapheme_tokenizer.word_index) + 1
decoder_target_data = to_categorical(Y_train_padded, num_classes=vocab_size_graphemes)
# Define encoder input data and decoder input data
encoder_input_data = X_train_padded
decoder_input_data = Y_train_padded

# Encoder with Dropout
encoder_input = Input(shape=(None,))
encoder_embedding = Embedding(input_dim=len(phoneme_tokenizer.word_index) + 1, output_dim=128)(encoder_input)
encoder_lstm = LSTM(512, return_sequences=True, return_state=True)
encoder_output, state_h, state_c = encoder_lstm(encoder_embedding)
encoder_output = Dropout(0.5)(encoder_output)

# Decoder with Dropout
decoder_input = Input(shape=(None,))
decoder_embedding = Embedding(input_dim=vocab_size_graphemes, output_dim=128)(decoder_input)
decoder_lstm = LSTM(512, return_sequences=True, return_state=True)
decoder_output, _, _ = decoder_lstm(decoder_embedding, initial_state=[state_h, state_c])
decoder_output = Dropout(0.5)(decoder_output)

# Attention mechanism
attention = Attention(name="attention_layer")
attention_result = attention([decoder_output, encoder_output])

# Concatenate attention output and decoder output
decoder_concat_input = Concatenate(axis=-1)([decoder_output, attention_result])

# Dense layer to generate predictions
decoder_dense = Dense(vocab_size_graphemes, activation='softmax')
decoder_output_final = decoder_dense(decoder_concat_input)

# Define and compile the model
model = Model([encoder_input, decoder_input], decoder_output_final)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Summary of the model
model.summary()

# Training & Validation

In [14]:
history = model.fit([encoder_input_data, decoder_input_data], 
                    decoder_target_data, 
                    batch_size=64, 
                    epochs=20, 
                    validation_split=0.2)

Epoch 1/20




[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 828ms/step - accuracy: 0.3611 - loss: 2.3345 - val_accuracy: 0.6846 - val_loss: 1.1170
Epoch 2/20
[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 866ms/step - accuracy: 0.7195 - loss: 0.9631 - val_accuracy: 0.8877 - val_loss: 0.4966
Epoch 3/20
[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 1s/step - accuracy: 0.9123 - loss: 0.3864 - val_accuracy: 0.9600 - val_loss: 0.2162
Epoch 4/20
[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m60s[0m 2s/step - accuracy: 0.9669 - loss: 0.1503 - val_accuracy: 0.9854 - val_loss: 0.0821
Epoch 5/20
[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 2s/step - accuracy: 0.9912 - loss: 0.0529 - val_accuracy: 0.9943 - val_loss: 0.0384
Epoch 6/20
[1m16/29[0m [32m━━━━━━━━━━━[0m[37m━━━━━━━━━[0m [1m16s[0m 1s/step - accuracy: 0.9955 - loss: 0.0283

KeyboardInterrupt: 

# Testing

In [None]:
# Prepare test data (assuming test_df is already preprocessed similarly to train_df)
test_phonemes = [row['phonemes'] for row in test_df.select('phonemes').collect()]
test_graphemes = [row['word'] for row in test_df.select('word').collect()]

# Tokenize the phonemes and graphemes using the trained tokenizers
test_phoneme_seq = phoneme_tokenizer.texts_to_sequences(test_phonemes)
test_grapheme_seq = grapheme_tokenizer.texts_to_sequences(test_graphemes)

# Pad the sequences to the same lengths as during training
X_test_padded = pad_sequences(test_phoneme_seq, maxlen=max_len_input, padding='post')
Y_test_padded = pad_sequences(test_grapheme_seq, maxlen=max_len_output, padding='post')

# Convert the target (Y_test_padded) to categorical (one-hot encoding)
vocab_size_graphemes = len(grapheme_tokenizer.word_index) + 1  # Get the number of graphemes from the trained tokenizer
Y_test_categorical = to_categorical(Y_test_padded, num_classes=vocab_size_graphemes)

# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate([X_test_padded, Y_test_padded], Y_test_categorical)

# Print test accuracy and loss
print(f'Test Loss: {test_loss}, Test Accuracy: {test_accuracy}')


# Create a Inference-model

In [None]:
# --- Inference process for making predictions ---
# Prepare the inference model
# Encoder Model for inference
encoder_model = Model(encoder_input, [encoder_output, state_h, state_c])

# Decoder Model for inference
decoder_state_input_h = Input(shape=(512,))
decoder_state_input_c = Input(shape=(512,))
decoder_hidden_state_input = Input(shape=(None, 512))

decoder_embedding2 = Embedding(input_dim=vocab_size_graphemes, output_dim=128)(decoder_input)
decoder_lstm2 = LSTM(512, return_sequences=True, return_state=True)
decoder_output2, state_h2, state_c2 = decoder_lstm2(decoder_embedding2, initial_state=[decoder_state_input_h, decoder_state_input_c])
decoder_attention_result = attention([decoder_output2, decoder_hidden_state_input])
decoder_concat_input2 = Concatenate(axis=-1)([decoder_output2, decoder_attention_result])
decoder_output_final2 = decoder_dense(decoder_concat_input2)

decoder_model = Model(
    [decoder_input, decoder_hidden_state_input, decoder_state_input_h, decoder_state_input_c],
    [decoder_output_final2, state_h2, state_c2]
)


# Decoding Sequences

In [None]:
# Function to decode a sequence
def decode_sequence(input_seq):
    # Encode the input as state vectors
    encoder_outputs, state_h, state_c = encoder_model.predict(input_seq)

    # Generate empty target sequence of length 1
    target_seq = np.zeros((1, 1))
    # Start token
    target_seq[0, 0] = grapheme_tokenizer.word_index['<start>']

    # Sampling loop to generate sequence
    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq, encoder_outputs, state_h, state_c])

        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = grapheme_tokenizer.index_word.get(sampled_token_index, '')

        # Exit condition: either hit max length or find stop token.
        if sampled_char == '<end>' or len(decoded_sentence) > max_len_output:
            stop_condition = True
        else:
            decoded_sentence += sampled_char

        # Update the target sequence (of length 1)
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index

        # Update internal states
        state_h, state_c = h, c

    return decoded_sentence

# Demo:

In [None]:
# Sample input for testing (new phoneme sequence)
sample_phoneme_sequence = 'H EH L OW'  # Word: "HELLO"

# Tokenize and pad the sample input using the trained phoneme tokenizer
sample_phoneme_sequence_tokenized = phoneme_tokenizer.texts_to_sequences([sample_phoneme_sequence])
sample_phoneme_sequence_padded = pad_sequences(sample_phoneme_sequence_tokenized, maxlen=max_len_input, padding='post')

# Make the prediction with the model
predicted_grapheme_word = decode_sequence(sample_phoneme_sequence_padded)

# Print predicted word
print("Predicted grapheme word:", predicted_grapheme_word)