In [1]:
import os
import librosa
import numpy as np

# Step 1: Specify the folder path containing the audio files
folder_path = "extracted_content/clips"

# Step 2: List all the files in the folder
file_list = os.listdir(folder_path)

# Step 3: Filter only the .wav files
wav_files = [file for file in file_list if file.endswith(".wav")]

# Step 4: Define the desired length for padding
max_length = 200  # Adjust this value based on your requirements

# Step 5: Loop through each .wav file
for wav_file in wav_files:
    # Step 6: Load the audio file
    audio_path = os.path.join(folder_path, wav_file)
    audio_data, sample_rate = librosa.load(audio_path, sr=None)

    # Step 7: Process the audio file as needed
    # For example, converting the audio to spectrograms
    spectrogram = librosa.feature.melspectrogram(y=audio_data, sr=sample_rate, n_fft=128)

    # Step 8: Pad or truncate the spectrogram to the desired length
    if spectrogram.shape[1] < max_length:
        # Pad the spectrogram if its length is less than the desired length
        padding = max_length - spectrogram.shape[1]
        spectrogram = np.pad(spectrogram, ((0, 0), (0, padding)), mode='constant')
    elif spectrogram.shape[1] > max_length:
        # Truncate the spectrogram if its length is greater than the desired length
        spectrogram = spectrogram[:, :max_length]

  mel_basis = filters.mel(sr=sr, n_fft=n_fft, **kwargs)


In [2]:
import os
import librosa
import numpy as np
import json
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Input, Dense, Attention, Reshape
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

In [3]:
import json

# Load train.json
with open('train.json', 'r') as f:
    train_data = json.load(f)

# Load valid.json
with open('valid.json', 'r') as f:
    valid_data = json.load(f)

# Preprocess audio data for training and validation
# This includes loading audio files, preprocessing them, and tokenizing target sequences
# Define function to preprocess audio data
def preprocess_audio(audio_path, target_shape,n_fft=1024):
    audio_data, sample_rate = librosa.load(audio_path, sr=None)
    spectrogram = librosa.feature.melspectrogram(y=audio_data, sr=sample_rate)
    if spectrogram.shape[1] < target_shape[1]:
        padding = target_shape[1] - spectrogram.shape[1]
        spectrogram = np.pad(spectrogram, ((0, 0), (0, padding)), mode='constant')
    elif spectrogram.shape[1] > target_shape[1]:
        spectrogram = spectrogram[:, :target_shape[1]]
    spectrogram = librosa.power_to_db(spectrogram, ref=np.max)
    spectrogram -= np.mean(spectrogram)
    spectrogram /= np.std(spectrogram)
    return spectrogram

# Preprocess audio data for training and validation
target_shape = (128, 200)  # Example shape, adjust as needed

# Extract audio paths and target sequences from the data
train_audio_paths = [os.path.join('extracted_content/clips', entry['src']) for entry in train_data]
valid_audio_paths = [os.path.join('extracted_content/clips', entry['src']) for entry in valid_data]

X_train = [preprocess_audio(audio_path, target_shape) for audio_path in train_audio_paths]
X_valid = [preprocess_audio(audio_path, target_shape) for audio_path in valid_audio_paths]

train_target_sequences = [entry['tgt'] for entry in train_data]
valid_target_sequences = [entry['tgt'] for entry in valid_data]




In [4]:
# Define the maximum vocabulary size
max_vocab_size = 30000

# Initialize a tokenizer
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<OOV>")

# Fit the tokenizer on the target sequences
tokenizer.fit_on_texts(train_target_sequences)

# Convert text to sequences of numerical indices
train_sequences = tokenizer.texts_to_sequences(train_target_sequences)
valid_sequences = tokenizer.texts_to_sequences(valid_target_sequences)

# Pad sequences to ensure uniform length
#max_sequence_length = max(len(seq) for seq in train_sequences + valid_sequences)
train_sequences_padded = pad_sequences(train_sequences, maxlen=200, padding='post')
valid_sequences_padded = pad_sequences(valid_sequences, maxlen=200, padding='post')


In [5]:
train_sequences_padded[0].shape

(200,)

In [6]:
# Define the input shape
input_shape = (128, 200)  # Example shape, adjust as needed

# Define the encoder input
encoder_input = Input(shape=input_shape, name="encoder_input")

# Define the decoder input
decoder_input = Input(shape=input_shape, name="decoder_input")

# Define the encoder LSTM
encoder_lstm = LSTM(256, return_sequences=True, return_state=True, name="encoder_lstm")
encoder_outputs, state_h, state_c = encoder_lstm(encoder_input)
encoder_states = [state_h, state_c]

# Define the decoder LSTM
decoder_lstm = LSTM(256, return_sequences=True, return_state=True, name="decoder_lstm")
decoder_outputs, _, _ = decoder_lstm(decoder_input, initial_state=encoder_states)

# Define the attention mechanism
attention_layer = Attention()
attention_output = attention_layer([encoder_outputs, decoder_outputs])

# Define a dense layer to map the attention output to the desired features
dense_layer = Dense(200, activation="relu", name="dense")
mapped_attention_output = dense_layer(attention_output)

# Reshape the mapped attention output to match the input shape
reshaped_attention_output = Reshape((128, 200))(mapped_attention_output)


In [7]:
# Define the model
model = tf.keras.Model([encoder_input, decoder_input], reshaped_attention_output)

# Compile the model
model.compile(optimizer="adam", loss="mean_squared_error",metrics = ['accuracy'])

# Print model summary
model.summary()


In [8]:
# Define batch size and custom number of epochs
batch_size = 128
num_epochs = 10

def data_generator(X, batch_size):
    for i in range(0, len(X), batch_size):
        encoder_input_batch = X[i:i+batch_size]
        decoder_input_batch = X[i:i+batch_size]  # Same as encoder input for autoencoder
        target_batch = X[i:i+batch_size]  # Same as input for autoencoder
        yield ((encoder_input_batch, decoder_input_batch), target_batch)

# Create data generators
train_generator = tf.data.Dataset.from_generator(
    lambda: data_generator(X_train, batch_size),
    output_signature=(
        ((tf.TensorSpec(shape=(None, 128, 200), dtype=tf.float32), 
          tf.TensorSpec(shape=(None, 128, 200), dtype=tf.float32)), 
         tf.TensorSpec(shape=(None, 128, 200), dtype=tf.float32))
    )
).repeat()

valid_generator = tf.data.Dataset.from_generator(
    lambda: data_generator(X_valid, batch_size),
    output_signature=(
        ((tf.TensorSpec(shape=(None, 128, 200), dtype=tf.float32), 
          tf.TensorSpec(shape=(None, 128, 200), dtype=tf.float32)), 
         tf.TensorSpec(shape=(None, 128, 200), dtype=tf.float32))
    )
).repeat()

In [9]:
# Training
model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // batch_size,
    epochs=num_epochs,
    validation_data=(valid_generator),
    validation_steps=len(X_valid) // batch_size
)

Epoch 1/10
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1412s[0m 7s/step - accuracy: 0.0242 - loss: 0.5341 - val_accuracy: 0.0541 - val_loss: 0.3837
Epoch 2/10
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1419s[0m 8s/step - accuracy: 0.0561 - loss: 0.3769 - val_accuracy: 0.0810 - val_loss: 0.3628
Epoch 3/10
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1444s[0m 8s/step - accuracy: 0.0814 - loss: 0.3643 - val_accuracy: 0.0966 - val_loss: 0.3591
Epoch 4/10
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1440s[0m 8s/step - accuracy: 0.1014 - loss: 0.3568 - val_accuracy: 0.1646 - val_loss: 0.3355
Epoch 5/10
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1435s[0m 8s/step - accuracy: 0.1902 - loss: 0.3348 - val_accuracy: 0.2845 - val_loss: 0.3173
Epoch 6/10
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1430s[0m 8s/step - accuracy: 0.2957 - loss: 0.3204 - val_accuracy: 0.3638 - val_loss: 0.3164
Epoch 7/10
[1m1

<keras.src.callbacks.history.History at 0x25e52ba3a50>

In [11]:
# Load test.json
with open('test.json', 'r') as f:
    test_data = json.load(f)

# Preprocess audio data for test samples
X_test_encoder = []  # For encoder input
X_test_decoder = []  # For decoder input
id = []  # To store test sample IDs

for sample in test_data:
    audio_path = os.path.join('extracted_content/clips', sample['src'])
    spectrogram = preprocess_audio(audio_path, (128, 200))  # Assuming spectrogram shape is (128, 200)
    if spectrogram is not None:
        X_test_encoder.append(spectrogram)
        X_test_decoder.append(spectrogram)  # Decoder input same as encoder input
        id.append(sample['id'])

# Convert test data to NumPy arrays
X_test_encoder_numpy = np.array(X_test_encoder)
X_test_decoder_numpy = np.array(X_test_decoder)


In [12]:

# Preprocess audio data for test samples
X_test = []
test_ids = []  # To store test sample IDs
for sample in test_data:
    audio_path = os.path.join('extracted_content/clips', sample['src'])
    spectrogram = preprocess_audio(audio_path, (128, 200))  # Assuming spectrogram shape is (128, 200)
    if spectrogram is not None:
        X_test.append(spectrogram)
        test_ids.append(sample['id'])

# Convert test data to TensorFlow dataset
test_dataset = tf.data.Dataset.from_tensor_slices(X_test)


In [13]:
# Make predictions on test data
predictions = model.predict([X_test_encoder_numpy, X_test_decoder_numpy])

[1m43/43[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m63s[0m 1s/step


In [14]:
# Define a reverse mapping from indices to words or pictogram terms
reverse_word_index = {index: word for word, index in tokenizer.word_index.items()}

# Decode token indices into words or pictogram terms
hyp = []
for prediction in predictions:
    decoded_prediction = []
    prev_word = None
    for index in np.argmax(prediction, axis=-1):
        word = reverse_word_index.get(index, '')
        # Check if the current word is different from the previous one
        if word != prev_word:
            decoded_prediction.append(word)
            prev_word = word
    # Filter out empty strings and join the words/pictogram terms into a single string
    decoded_prediction_str = ' '.join(filter(None, decoded_prediction))
    # Capitalize the first letter of each word
    decoded_prediction_str = ' '.join(word.capitalize() for word in decoded_prediction_str.split())
    hyp.append(decoded_prediction_str)


In [15]:
# Print the first ID and its corresponding decoded prediction
print("ID:", id[0])
print("Decoded Prediction:", hyp[0])


ID: cefc-tcof-Acc_del_07-17
Decoded Prediction: Haut Rã©flã©chir Avoir Un Corps Truc Passã© Et Nous Rã©flã©chir Passã© Mettre Il Nous Rã©flã©chir Mettre Et Aprã¨s Savoir Rã©flã©chir Aprã¨s Savoir Au Faire Savoir Et Au Aprã¨s Savoir Et Trop Faire Savoir Avoir Et Faire Savoir Au Faire Au Tu Haut Rã©flã©chir Aprã¨s Rã©flã©chir Tu Rã©flã©chir Haut Rã©flã©chir Haut Savoir Haut Rã©flã©chir Vois Rã©flã©chir Truc Rã©flã©chir Vois Rã©flã©chir Heureusement Vois Le Vois Le Rã©flã©chir Savoir


In [16]:
print(predictions[0])

[[0.2195904  0.46313396 0.8887986  ... 0.         0.         0.        ]
 [0.21325555 0.5549171  0.95220447 ... 0.         0.         0.        ]
 [0.47446433 0.8124995  1.0803368  ... 0.         0.         0.        ]
 ...
 [0.13632785 0.37387532 0.5607461  ... 0.         0.         0.        ]
 [0.0520239  0.33662152 0.51994246 ... 0.         0.         0.        ]
 [0.         0.2733747  0.4927201  ... 0.         0.         0.        ]]


In [17]:
import json
import zipfile
import os

# Load test.json to get the IDs
with open('test.json', 'r') as f:
    test_data = json.load(f)

# Write the predictions and corresponding IDs to run.json
with open('run.json', 'w') as run_file:
    for idx, prediction in enumerate(hyp):
        # Get the corresponding ID from test.json
        sample_id = test_data[idx]['id']
        # Write to run.json
        json_line = {'id': sample_id, 'hyp': prediction}
        run_file.write(json.dumps(json_line) + '\n')

# Zip the run.json file
with zipfile.ZipFile('run.zip', 'w') as zipf:
    zipf.write('run.json', os.path.basename('run.json'))

In [25]:
import json
import zipfile
import numpy as np
import nltk
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score



# Prepare data for run.json
run_data = []

for idx, (id, hyp) in enumerate(zip(id, hyp_valid)):
    print("id:", id)
    print("hyp:", hyp)
    
    # Append ID and Decoded Prediction to run_data
    run_data.append({"id": id, "hyp": hyp})

# Write run_data to run.json
with open("run.json", "w") as json_file:
    json.dump(run_data, json_file)

# Create run.zip containing run.json
with zipfile.ZipFile("run.zip", "w") as zip_file:
    zip_file.write("run.json")

print("run.json and run.zip created successfully.")


id: cefc-tcof-Acc_del_07-19
hyp: <oov> <oov> Ã©poque Parler Ils <oov> Le Maison Groupe Groupe But But Maison Lã Groupe Ils Maison Maison Me Oui Passã© 00 00 Un Un Passã© Maison 00 00 00 Oui Maison Maison Une Ãªtre Maison Maison Maison 00 00 00 Maison 00 00 00 00 Maison Maison Maison Maison Maison Me 00 Maison Me Me Maison Maison Maison Oui Maison Oui Oui Oui Oui Oui Oui Maison Alors Alors Oui Alors Alors Oui Oui Oui Que <oov> <oov> <oov> Que Ã©poque Ã©poque <oov> Alors Alors Alors Ã©poque Ã©poque Est <oov> <oov> <oov> Ã©poque <oov> <oov> <oov> <oov> <oov> <oov> <oov> Est Est <oov> <oov> <oov> <oov> Ã©poque <oov> <oov> <oov> <oov> <oov> <oov> <oov> <oov> <oov> Il Il <oov> <oov> <oov> <oov> <oov> <oov> <oov> <oov>
id: cefc-tcof-Acc_del_07-28
hyp: Les Les Celui Celui Alors Que Lã Le Passã© Ils Avoir Que Que Toi Le Le Il Nous Non Oui Oui Oui Un Un Ã Ã Alors Lã Lã Le Le Que Que Lã Lã Le Le Le Le Le Lã Lã Lã Lã Lã Lã Lã Lã Lã Lã Oui Oui Un Un Que Oui Oui Un Oui Oui Alors Alors Oui Il Alors T

In [26]:
import json
import nltk
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score

# Smooth function for BLEU score
smooth = SmoothingFunction().method4

# Assuming decoded_predictions is a list containing the decoded predictions generated by your model
# And hypotheses is a list containing the ground truth hypotheses

# Preprocess hypotheses to match the format expected by corpus_bleu
references = valid_target_sequences

# Preprocess decoded_predictions to match the format expected by corpus_bleu
decoded_predictions_tokenized = [np.unique(hyp.split()) for hyp in hyp_valid]

# Calculate BLEU score with smoothing
bleu_score = corpus_bleu(references, decoded_predictions_tokenized, smoothing_function=smooth)

print("BLEU Score with smoothing:", bleu_score)

# Download NLTK resources if not already downloaded
nltk.download('wordnet')

# Tokenize references
tokenized_references = [word_tokenize(ref) for ref in valid_target_sequences]

# Tokenize hypotheses
tokenized_hypotheses = [word_tokenize(hyp) for hyp in hyp_valid]

# Calculate METEOR score
meteor_scores = [meteor_score([ref], hyp) for ref, hyp in zip(tokenized_references, tokenized_hypotheses)]
average_meteor_score = sum(meteor_scores) / len(meteor_scores)

print("Average METEOR Score:", average_meteor_score)

# Prepare data for JSON
output_data = {
    "BLEU Score with smoothing": bleu_score,
    "Average METEOR Score": average_meteor_score
}

# Write output_data to a JSON file
with open("prediction.json", "w") as json_file:
    json.dump(output_data, json_file)

print("Evaluation results written to evaluation_results.json.")


BLEU Score with smoothing: 8.643793438386789e-05


[nltk_data] Downloading package wordnet to C:\Users\Gabriel
[nltk_data]     prince\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Average METEOR Score: 0.0484401267339582
Evaluation results written to evaluation_results.json.


In [None]:
import json
import zipfile

# Assume you have prediction_data and run_data dictionaries containing your prediction and run information

# Write prediction_data to prediction.json
with open("prediction.json", "w") as prediction_file:
    json.dump(output_data, prediction_file)

# Write run_data to run.json
with open("run.json", "w") as run_file:
    json.dump(run_data, run_file)

# Create a ZIP file
with zipfile.ZipFile("run.zip", "w") as zip_file:
    # Add prediction.json to the ZIP file
    zip_file.write("prediction.json", arcname="prediction.json")
    # Add run.json to the ZIP file
    zip_file.write("run.json", arcname="run.json")

print("run.zip created successfully.")


In [20]:
# Load test.json
with open('valid.json', 'r') as f:
    x_valid_data = json.load(f)

# Preprocess audio data for test samples
X_valid_encoder = []  # For encoder input
X_valid_decoder = []  # For decoder input
id = []  # To store test sample IDs

for sample in x_valid_data:
    audio_path = os.path.join('extracted_content/clips', sample['src'])
    spectrogram = preprocess_audio(audio_path, (128, 200))  # Assuming spectrogram shape is (128, 200)
    if spectrogram is not None:
        X_valid_encoder.append(spectrogram)
        X_valid_decoder.append(spectrogram)  # Decoder input same as encoder input
        id.append(sample['id'])

# Convert test data to NumPy arrays
X_valid_encoder_numpy = np.array(X_valid_encoder)
X_valid_decoder_numpy = np.array(X_valid_decoder)



In [21]:
perdict_valid = model.predict([X_valid_encoder_numpy,X_valid_decoder_numpy])

[1m43/43[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m68s[0m 2s/step


In [22]:
# Decode token indices into words or pictogram terms
hyp_valid = []
for prediction in perdict_valid:
    decoded_prediction = [reverse_word_index.get(index, '') for index in np.argmax(prediction, axis=-1)]
    # Find the index of the first padding token (0)
    padding_index = next((i for i, x in enumerate(decoded_prediction) if x == ''), len(decoded_prediction))
    # Truncate the decoded prediction to remove padding tokens
    decoded_prediction = decoded_prediction[:padding_index]
    # Filter out empty strings and join the words/pictogram terms into a single string
    decoded_prediction_str = ' '.join(filter(None, decoded_prediction))
    # Capitalize the first letter of each word
    decoded_prediction_str = ' '.join(word.capitalize() for word in decoded_prediction_str.split())
    hyp_valid.append(decoded_prediction_str)

In [23]:
# Define a reverse mapping from indices to words or pictogram terms
reverse_word_index = {index: word for word, index in tokenizer.word_index.items()}

# Decode token indices into words or pictogram terms
hyp_valid = []
for prediction in perdict_valid:
    decoded_prediction = [reverse_word_index.get(index, '') for index in np.argmax(prediction, axis=-1)]
    # Filter out empty strings and join the words/pictogram terms into a single string
    decoded_prediction_str = ' '.join(filter(None, decoded_prediction))
    # Capitalize the first letter of each word
    decoded_prediction_str = ' '.join(word.capitalize() for word in decoded_prediction_str.split())
    hyp_valid.append(decoded_prediction_str)

In [27]:
hyp_valid[0]

'<oov> <oov> Ã©poque Parler Ils <oov> Le Maison Groupe Groupe But But Maison Lã Groupe Ils Maison Maison Me Oui Passã© 00 00 Un Un Passã© Maison 00 00 00 Oui Maison Maison Une Ãªtre Maison Maison Maison 00 00 00 Maison 00 00 00 00 Maison Maison Maison Maison Maison Me 00 Maison Me Me Maison Maison Maison Oui Maison Oui Oui Oui Oui Oui Oui Maison Alors Alors Oui Alors Alors Oui Oui Oui Que <oov> <oov> <oov> Que Ã©poque Ã©poque <oov> Alors Alors Alors Ã©poque Ã©poque Est <oov> <oov> <oov> Ã©poque <oov> <oov> <oov> <oov> <oov> <oov> <oov> Est Est <oov> <oov> <oov> <oov> Ã©poque <oov> <oov> <oov> <oov> <oov> <oov> <oov> <oov> <oov> Il Il <oov> <oov> <oov> <oov> <oov> <oov> <oov> <oov>'

In [28]:
import nltk
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.bleu_score import SmoothingFunction, corpus_bleu

# Smooth function
smooth = SmoothingFunction().method4



# Assuming decoded_predictions is a list containing the decoded predictions generated by your model
# And hypotheses is a list containing the ground truth hypotheses

# Preprocess hypotheses to match the format expected by corpus_bleu
references = valid_target_sequences

# Preprocess decoded_predictions to match the format expected by corpus_bleu
decoded_predictions_tokenized =  [np.unique(hyp.split()) for hyp in hyp_valid]


# Calculate BLEU score with smoothing
bleu_score = corpus_bleu(references,decoded_predictions_tokenized, smoothing_function=smooth)

print("BLEU Score with smoothing:", bleu_score)


BLEU Score with smoothing: 8.643793438386789e-05


In [29]:
import nltk
nltk.download('wordnet')

from nltk.tokenize import word_tokenize
from nltk.translate.meteor_score import meteor_score

# Tokenize references
tokenized_references = [word_tokenize(ref) for ref in valid_target_sequences]

# Tokenize hypotheses
tokenized_hypotheses = [word_tokenize(hyp) for hyp in hyp_valid]

# Calculate METEOR score
meteor_scores = [meteor_score([ref], hyp) for ref, hyp in zip(tokenized_references, tokenized_hypotheses)]
average_meteor_score = sum(meteor_scores) / len(meteor_scores)

print("Average METEOR Score:", average_meteor_score)


[nltk_data] Downloading package wordnet to C:\Users\Gabriel
[nltk_data]     prince\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Average METEOR Score: 0.0484401267339582


In [30]:
len(references) , len(decoded_predictions_tokenized)

(1348, 1348)

In [33]:
model.save(r"D:\OneDrive\Desktop\IMAGECLEF\Model\speech_to_picto.keras")

In [36]:
from tensorflow.keras.models import load_model

# Load the model
summa = load_model(r"D:\OneDrive\Desktop\IMAGECLEF\Model\speech_to_picto.keras")