<a href="https://colab.research.google.com/github/hiteshJindal/Thesis_Audio_Uncertainity/blob/main/Main_thesis_file_noise_with_spectograms.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pydub
import pandas as pd
import os
import numpy as np
import librosa
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Embedding, LSTM, Bidirectional, Dropout, BatchNormalization, GlobalMaxPooling1D, SpatialDropout1D, Flatten, Concatenate, Input

In [None]:
from google.colab import drive
# Mount Google Drive at /gdrive; the dataset directories used in the cells
# below are all given as paths under this mount point.
drive.mount('/gdrive')

In [None]:
# Accumulator for the training set: one row is added per TextGrid file.
# The third column is spelled 'Phones' (capital P) so that it matches the
# column produced for every file in the ingestion loop; a lowercase 'phones'
# here would survive concatenation as an extra column filled with NaN.
final_df = pd.DataFrame(columns=['Transcript', 'phoneme_likelihood', 'Phones'])

# Alternative training directory, kept for reference:
#directory = '/gdrive/MyDrive/Input_large_final/Input_large/textGrid_training_large/'

# Directory holding the training TextGrid annotations.
directory = '/gdrive/MyDrive/Input_large_final/Input_large/senddrive/LargeFiles/TextGrid_noise_training'
# Directory holding the WAV recordings that belong to those annotations.
wav_files_directory = '/gdrive/MyDrive/Input_large_final/Input_large/senddrive/LargeFiles/wav_noise_training'


def parse_textgrid(file_path):
    """Extract ``(start, end, label)`` triples from a TextGrid text file.

    The file is scanned line by line. Every line starting with ``xmin`` or
    ``xmax`` updates the pending start/end time, so earlier values (for
    example bounds that appear before the first interval) are simply
    overwritten by later ones. Each line starting with ``text`` closes an
    interval using the most recent pair and then clears it.

    Args:
        file_path: Path of the TextGrid file to read.

    Returns:
        A list of ``(start_time, end_time, label)`` tuples in file order.
        Times are floats; the label has its surrounding double quotes removed
        and may be an empty string.

    Raises:
        ValueError: If an ``xmin``/``xmax`` value is not a valid float.
    """
    data = []
    start_time = end_time = None
    with open(file_path, 'r') as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line.startswith(('xmin', 'xmax', 'text')):
                continue
            # Split on the first '=' only, so a label that itself contains
            # '=' is kept whole instead of being cut at the second '='.
            _, separator, value = line.partition('=')
            if not separator:
                # Not a "key = value" line (e.g. free text that happens to
                # start with one of the keywords); nothing to parse.
                continue
            value = value.strip()
            if line.startswith('xmin'):
                start_time = float(value)
            elif line.startswith('xmax'):
                end_time = float(value)
            elif start_time is not None and end_time is not None:
                data.append((start_time, end_time, value.strip('"')))
                # Require a fresh xmin/xmax pair before the next label.
                start_time = end_time = None

    return data

def textgrid_to_dataframe(file_path):
    """Load the intervals of a TextGrid file into a DataFrame.

    Args:
        file_path: Path of the TextGrid file, passed on to ``parse_textgrid``.

    Returns:
        A DataFrame with one row per parsed interval and the columns
        'Start Time', 'End Time' and 'Label'.
    """
    column_names = ['Start Time', 'End Time', 'Label']
    return pd.DataFrame(parse_textgrid(file_path), columns=column_names)

def extract_mfcc_spectrogram(file_path):
    """Compute MFCCs stacked with their first- and second-order deltas.

    The audio is loaded with librosa's default settings, 13 MFCCs are
    computed, and the coefficients, their deltas and their delta-deltas are
    concatenated along axis 0 (in that order).

    Args:
        file_path: Path of the audio file to analyse.

    Returns:
        The concatenated feature array (3 x 13 = 39 rows).
    """
    signal, sample_rate = librosa.load(file_path)
    coefficients = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=13)
    stacked_parts = (
        coefficients,
        librosa.feature.delta(coefficients),
        librosa.feature.delta(coefficients, order=2),
    )
    return np.concatenate(stacked_parts, axis=0)


def extract_mel_spectrogram(file_path, n_mels=32, hop_length=512):
    """Derive a dB-scaled mel spectrogram from a file's MFCC features.

    The stacked MFCC features from ``extract_mfcc_spectrogram`` are inverted
    with ``librosa.feature.inverse.mfcc_to_mel``, passed to
    ``librosa.feature.melspectrogram`` as ``S`` and converted to decibels
    relative to the maximum value.

    Args:
        file_path: Path of the audio file to analyse.
        n_mels: Number of mel bands requested from ``melspectrogram``.
        hop_length: Hop length forwarded to ``melspectrogram``.

    Returns:
        A NumPy array holding the single dB spectrogram, i.e. the spectrogram
        with one extra leading axis of length 1.
    """
    # The audio is decoded inside extract_mfcc_spectrogram. Loading it here
    # as well produced values that were never used and doubled the file I/O.
    mfcc_spectrogram = extract_mfcc_spectrogram(file_path)

    # NOTE(review): the input stacks MFCCs with their deltas, yet it is
    # inverted as if every row were a cepstral coefficient, and the result is
    # then handed to melspectrogram as a spectrogram `S`, which appears to
    # apply a second mel filter bank. Confirm this is intended rather than
    # melspectrogram(y=audio, sr=sr, ...) on the raw signal.
    approx_mel = librosa.feature.inverse.mfcc_to_mel(mfcc_spectrogram)
    mel_spectrogram = librosa.feature.melspectrogram(S=approx_mel,
                                                     n_mels=n_mels, hop_length=hop_length)
    mel_spectrogram_db = librosa.power_to_db(mel_spectrogram, ref=np.max)
    return np.array([mel_spectrogram_db])


# Build one training row per TextGrid file and its matching WAV recording.
#
# The per-file frames are collected in a list and concatenated once after the
# loop: growing final_df with pd.concat on every iteration copies all rows
# gathered so far each time. The directory listing is sorted so that the row
# order (and therefore any seeded train/validation split) is reproducible.
per_file_frames = []
for filename in sorted(os.listdir(directory)):
    if not filename.endswith('.TextGrid'):
        continue
    file_path = os.path.join(directory, filename)

    # Process the file and obtain the interval table
    df = textgrid_to_dataframe(file_path)
    if df.empty:
        # Nothing was parsed, so there is no first row to anchor the split on.
        print('Skipping', filename, '- no intervals were parsed')
        continue

    # Rows whose start/end times equal those of the first row; the last such
    # row is used as the boundary between the two parts of the file.
    # NOTE(review): this presumably separates a word tier from a phone tier
    # and relies on both starting with an interval of identical bounds -
    # confirm against the data.
    indices = df.index[(df['Start Time'] == df['Start Time'].iloc[0]) & (df['End Time'] == df['End Time'].iloc[0])]
    first_df = df.loc[:indices[-1]]
    second_df = df.loc[indices[-1] + 1:]

    # Drop rows with blank or null labels from both parts
    first_df = first_df[first_df['Label'].notnull() & (first_df['Label'] != "")]
    second_df = second_df[second_df['Label'].notnull() & (second_df['Label'] != "")]

    if second_df.empty:
        # Without any phone label there is no target for this sample, and
        # mode().iloc[0] below would raise IndexError.
        print('Skipping', filename, '- no phone labels found')
        continue

    # Transcript: labels of the first part joined into one sentence
    combined_sentence = ' '.join(first_df['Label'].tolist())

    # Target: the most frequent label of the second part (first one on ties)
    phoneme_likelihood = second_df['Label'].mode().iloc[0]

    # The WAV file shares the TextGrid's base name
    audio_file_path = os.path.join(wav_files_directory, os.path.splitext(filename)[0] + '.wav')
    mfcc_spectrogram = extract_mfcc_spectrogram(audio_file_path)
    mel_spectrograms = extract_mel_spectrogram(audio_file_path)

    # One single-row frame per file; list/array values are wrapped in a list
    # so that each is stored as one cell.
    result_df = pd.DataFrame({
        'Transcript': [combined_sentence],
        'phoneme_likelihood': [phoneme_likelihood],
        'Phones': [second_df['Label'].tolist()],
        'MFCC_Spectrogram': [mfcc_spectrogram],
        'Mel_Spectrograms': [mel_spectrograms],
    })
    per_file_frames.append(result_df)

if per_file_frames:
    final_df = pd.concat([final_df, *per_file_frames], ignore_index=True)


In [None]:
# Notebook sanity check: displays (rows, columns) of the assembled training frame.
final_df.shape

In [None]:
import random
# Locations of the held-out test data: TextGrid annotations and their WAV files.
# Note that `directory` is rebound here and from now on refers to the test set.
wav_test_files_directory = '/gdrive/MyDrive/Input_large_final/Input_large/senddrive/LargeFiles/wav_noise_test'
directory = '/gdrive/MyDrive/Input_large_final/Input_large/senddrive/LargeFiles/Textgrid_noise_test'

# Empty accumulator that the test-set loop fills with one row per file.
final_test_df = pd.DataFrame(columns=['Transcript', 'phoneme_likelihood', 'Phones'])

def parse_textgrid(file_path):
    """Extract ``(start, end, label)`` triples from a TextGrid text file.

    NOTE: this redefines the function of the same name from the training-data
    cell, presumably so that this cell can be run on its own.

    The file is scanned line by line. Every line starting with ``xmin`` or
    ``xmax`` updates the pending start/end time, so earlier values (for
    example bounds that appear before the first interval) are simply
    overwritten by later ones. Each line starting with ``text`` closes an
    interval using the most recent pair and then clears it.

    Args:
        file_path: Path of the TextGrid file to read.

    Returns:
        A list of ``(start_time, end_time, label)`` tuples in file order.
        Times are floats; the label has its surrounding double quotes removed
        and may be an empty string.

    Raises:
        ValueError: If an ``xmin``/``xmax`` value is not a valid float.
    """
    data = []
    start_time = end_time = None
    with open(file_path, 'r') as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line.startswith(('xmin', 'xmax', 'text')):
                continue
            # Split on the first '=' only, so a label that itself contains
            # '=' is kept whole instead of being cut at the second '='.
            _, separator, value = line.partition('=')
            if not separator:
                # Not a "key = value" line (e.g. free text that happens to
                # start with one of the keywords); nothing to parse.
                continue
            value = value.strip()
            if line.startswith('xmin'):
                start_time = float(value)
            elif line.startswith('xmax'):
                end_time = float(value)
            elif start_time is not None and end_time is not None:
                data.append((start_time, end_time, value.strip('"')))
                # Require a fresh xmin/xmax pair before the next label.
                start_time = end_time = None

    return data

def textgrid_to_dataframe(file_path):
    """Return the parsed intervals of a TextGrid file as a DataFrame.

    Args:
        file_path: Path of the TextGrid file, handed to ``parse_textgrid``.

    Returns:
        A DataFrame with the columns 'Start Time', 'End Time' and 'Label',
        one row per parsed interval.
    """
    intervals = parse_textgrid(file_path)
    frame = pd.DataFrame(intervals, columns=['Start Time', 'End Time', 'Label'])
    return frame

def extract_mfcc_spectrogram(file_path):
    """Build the stacked MFCC feature array for one audio file.

    Thirteen MFCCs are computed from the audio (loaded with librosa's default
    settings) and concatenated along axis 0 with their first-order and
    second-order deltas, in that order.

    Args:
        file_path: Path of the audio file to analyse.

    Returns:
        The concatenated feature array (3 x 13 = 39 rows).
    """
    waveform, rate = librosa.load(file_path)
    mfcc_base = librosa.feature.mfcc(y=waveform, sr=rate, n_mfcc=13)
    first_order = librosa.feature.delta(mfcc_base)
    second_order = librosa.feature.delta(mfcc_base, order=2)
    return np.concatenate((mfcc_base, first_order, second_order), axis=0)

def extract_mel_spectrogram(file_path, n_mels=32, hop_length=512):
    """Derive a dB-scaled mel spectrogram from a file's MFCC features.

    The stacked MFCC features from ``extract_mfcc_spectrogram`` are inverted
    with ``librosa.feature.inverse.mfcc_to_mel``, passed to
    ``librosa.feature.melspectrogram`` as ``S`` and converted to decibels
    relative to the maximum value.

    Args:
        file_path: Path of the audio file to analyse.
        n_mels: Number of mel bands requested from ``melspectrogram``.
        hop_length: Hop length forwarded to ``melspectrogram``.

    Returns:
        A NumPy array holding the single dB spectrogram, i.e. the spectrogram
        with one extra leading axis of length 1.
    """
    # The audio is decoded inside extract_mfcc_spectrogram. Loading it here
    # as well produced values that were never used and doubled the file I/O.
    mfcc_spectrogram = extract_mfcc_spectrogram(file_path)

    # NOTE(review): the input stacks MFCCs with their deltas, yet it is
    # inverted as if every row were a cepstral coefficient, and the result is
    # then handed to melspectrogram as a spectrogram `S`, which appears to
    # apply a second mel filter bank. Confirm this is intended rather than
    # melspectrogram(y=audio, sr=sr, ...) on the raw signal.
    approx_mel = librosa.feature.inverse.mfcc_to_mel(mfcc_spectrogram)
    mel_spectrogram = librosa.feature.melspectrogram(S=approx_mel,
                                                     n_mels=n_mels, hop_length=hop_length)
    mel_spectrogram_db = librosa.power_to_db(mel_spectrogram, ref=np.max)
    return np.array([mel_spectrogram_db])

# Build one test row per TextGrid file and its matching WAV recording.
#
# The per-file frames are collected in a list and concatenated once after the
# loop: growing final_test_df with pd.concat on every iteration copies all
# rows gathered so far each time. The directory listing is sorted so that the
# row order does not depend on the file system.
per_file_test_frames = []
for filename in sorted(os.listdir(directory)):
    if not filename.endswith('.TextGrid'):
        continue
    file_path = os.path.join(directory, filename)

    # Process the file and obtain the interval table
    df = textgrid_to_dataframe(file_path)
    if df.empty:
        # Nothing was parsed, so there is no first row to anchor the split on.
        print('Skipping', filename, '- no intervals were parsed')
        continue

    # Rows whose start/end times equal those of the first row; the last such
    # row is used as the boundary between the two parts of the file.
    # NOTE(review): this presumably separates a word tier from a phone tier
    # and relies on both starting with an interval of identical bounds -
    # confirm against the data.
    indices = df.index[(df['Start Time'] == df['Start Time'].iloc[0]) & (df['End Time'] == df['End Time'].iloc[0])]
    first_test_df = df.loc[:indices[-1]]
    second_test_df = df.loc[indices[-1] + 1:]

    # Drop rows with blank or null labels from both parts
    first_test_df = first_test_df[first_test_df['Label'].notnull() & (first_test_df['Label'] != "")]
    second_test_df = second_test_df[second_test_df['Label'].notnull() & (second_test_df['Label'] != "")]

    # Transcript: labels of the first part joined into one sentence
    combined_test_sentence = ' '.join(first_test_df['Label'].tolist())

    # Target: one of the most frequent labels of the second part.
    # NOTE(review): ties are broken with an unseeded random.choice, so the
    # reference labels (and the reported accuracy) can differ between runs,
    # whereas the training loop always takes the first mode.
    highest_occurrences = second_test_df['Label'].mode()
    if not highest_occurrences.empty:
        phoneme_test_likelihood = random.choice(highest_occurrences.tolist())
    else:
        phoneme_test_likelihood = None  # no labels at all in the second part

    # The WAV file shares the TextGrid's base name
    audio_file_path = os.path.join(wav_test_files_directory, os.path.splitext(filename)[0] + '.wav')
    mfcc_test_spectogram = extract_mfcc_spectrogram(audio_file_path)
    mel_test_spectrograms = extract_mel_spectrogram(audio_file_path)

    # One single-row frame per file; list/array values are wrapped in a list
    # so that each is stored as one cell.
    result_test_df = pd.DataFrame({
        'Transcript': [combined_test_sentence],
        'phoneme_likelihood': [phoneme_test_likelihood],
        'Phones': [second_test_df['Label'].tolist()],
        'MFCC_Spectrogram': [mfcc_test_spectogram],
        'Mel_Spectrograms': [mel_test_spectrograms],
    })
    per_file_test_frames.append(result_test_df)

if per_file_test_frames:
    final_test_df = pd.concat([final_test_df, *per_file_test_frames], ignore_index=True)


In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, LSTM, Bidirectional, Dropout, BatchNormalization, GlobalMaxPooling1D, SpatialDropout1D, Flatten, Input, Concatenate
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LSTM, MaxPooling1D
import tensorflow as tf


# ---------------------------------------------------------------------------
# Feature preparation
# ---------------------------------------------------------------------------
X_transcript = final_df['Transcript']
X_MFCC_spectrograms = final_df['MFCC_Spectrogram']
y = final_df['phoneme_likelihood']

# Split the data into training and held-out (validation) sets
X_train_transcript, X_test_transcript, X_train_MFCC_spectrograms, X_test_MFCC_spectrograms, y_train, y_test = train_test_split(
    X_transcript, X_MFCC_spectrograms, y, test_size=0.2, random_state=42)

# Fit the tokenizer on the training transcripts only, then encode both splits
tokenizer = Tokenizer()
tokenizer.fit_on_texts(X_train_transcript)
X_train_transcript_seq = tokenizer.texts_to_sequences(X_train_transcript)
X_test_transcript_seq = tokenizer.texts_to_sequences(X_test_transcript)

# Pad the transcript sequences to the longest sequence of either split
max_length_transcript = max(len(seq) for seq in X_train_transcript_seq + X_test_transcript_seq)
X_train_transcript_padded = pad_sequences(X_train_transcript_seq, maxlen=max_length_transcript, padding='post')
X_test_transcript_padded = pad_sequences(X_test_transcript_seq, maxlen=max_length_transcript, padding='post')

# Spectrogram geometry. The row count follows from the feature extraction
# (13 MFCCs + 13 deltas + 13 delta-deltas). The frame count is the single
# width used for BOTH the padded data and the model input; previously the
# data was padded to a literal 777 while the model input was sized from the
# longest clip, which only worked when the two happened to coincide.
N_SPECTROGRAM_ROWS = 39
SPECTROGRAM_FRAMES = 777


def _fit_to_frames(spectrogram, n_frames):
    """Truncate or zero-pad a 2-D spectrogram along axis 1 to n_frames columns."""
    clipped = spectrogram[:, :n_frames]
    missing = n_frames - clipped.shape[1]
    if missing > 0:
        clipped = np.pad(clipped, ((0, 0), (0, missing)), mode='constant')
    return clipped


# Longest clip across both splits. The two Series are scanned separately:
# adding them with `+` would perform index-aligned arithmetic, not a
# concatenation.
max_features_spectrogram = max(max(arr.shape[1] for arr in X_train_MFCC_spectrograms),
                               max(arr.shape[1] for arr in X_test_MFCC_spectrograms))
if max_features_spectrogram > SPECTROGRAM_FRAMES:
    print('Warning: the longest spectrogram has', max_features_spectrogram,
          'frames; clips are truncated to', SPECTROGRAM_FRAMES)

# Pad or truncate every spectrogram to the same number of frames
X_train_MFCC_spectrograms_padded = np.array([_fit_to_frames(arr, SPECTROGRAM_FRAMES) for arr in X_train_MFCC_spectrograms])
X_test_MFCC_spectrograms_padded = np.array([_fit_to_frames(arr, SPECTROGRAM_FRAMES) for arr in X_test_MFCC_spectrograms])

# Encode the response variable. The encoder is fitted on all labels in y so
# that labels occurring only in the held-out split can still be transformed.
label_encoder = LabelEncoder()
label_encoder.fit(y)
y_train_encoded = label_encoder.transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

# ---------------------------------------------------------------------------
# Model definition
# ---------------------------------------------------------------------------
# Transcript branch: shared embedding feeding a Conv1D path and an LSTM path
input_transcript = Input(shape=(max_length_transcript,))
embedding = Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=128)(input_transcript)
conv1d_transcript = Conv1D(filters=64, kernel_size=3, activation='relu')(embedding)
conv1d_transcript = BatchNormalization()(conv1d_transcript)
conv1d_transcript = GlobalMaxPooling1D()(conv1d_transcript)
conv1d_transcript = Dropout(0.5)(conv1d_transcript)
dense = Dense(units=128, activation='relu', kernel_regularizer=l2(0.0001))(conv1d_transcript)
dense = Dropout(0.5)(dense)

lstm_transcript = LSTM(64)(embedding)
lstm_transcript = Dropout(0.5)(lstm_transcript)

# Merge the LSTM and Conv1D transcript features
merged_features = Concatenate()([dense, lstm_transcript])

# Spectrogram branch: two Conv1D/MaxPooling blocks, then flatten.
# NOTE(review): Conv1D convolves along axis 1 of its input, which here is the
# 39 coefficient rows, with the frames acting as channels - confirm this is
# intended rather than convolving over time.
input_spectrogram = Input(shape=(N_SPECTROGRAM_ROWS, SPECTROGRAM_FRAMES))
conv1d = Conv1D(filters=128, kernel_size=3, activation='relu')(input_spectrogram)
conv1d = MaxPooling1D(pool_size=2)(conv1d)
conv1d = BatchNormalization()(conv1d)
conv1d = Dropout(0.5)(conv1d)

conv1d = Conv1D(filters=128, kernel_size=3, activation='relu')(conv1d)
conv1d = MaxPooling1D(pool_size=2)(conv1d)
conv1d = BatchNormalization()(conv1d)
conv1d = Dropout(0.5)(conv1d)

conv1d = Flatten()(conv1d)

# Merge the transcript and spectrogram features
merged_features = Concatenate()([conv1d, merged_features])

dense = Dense(units=256, activation='relu')(merged_features)
dense = BatchNormalization()(dense)
dense = Dropout(0.5)(dense)

# One softmax unit per label known to the encoder
output = Dense(units=len(label_encoder.classes_), activation='softmax')(dense)

model = Model(inputs=[input_transcript, input_spectrogram], outputs=output)

# Integer-encoded targets, hence the sparse categorical loss
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Early stopping callback
#early_stopping = EarlyStopping(patience=3, restore_best_weights=True)

# ---------------------------------------------------------------------------
# Training and evaluation
# ---------------------------------------------------------------------------
model.fit([X_train_transcript_padded, X_train_MFCC_spectrograms_padded], y_train_encoded,
          epochs=10, batch_size=16, validation_data=([X_test_transcript_padded, X_test_MFCC_spectrograms_padded], y_test_encoded))

# Accuracy on the training split and on the held-out split
_, train_accuracy = model.evaluate([X_train_transcript_padded, X_train_MFCC_spectrograms_padded], y_train_encoded)
_, test_accuracy = model.evaluate([X_test_transcript_padded, X_test_MFCC_spectrograms_padded], y_test_encoded)

print('Training Accuracy:', train_accuracy)
print('Validation Accuracy:', test_accuracy)


In [None]:
# Extract the input features from the final test data
X_new_mfcc_spectrograms = final_test_df['MFCC_Spectrogram']
X_new_transcripts = final_test_df['Transcript']

# Convert text to sequences using the tokenizer fitted on the training data,
# padded to the transcript length the model was built with
X_new_transcript_seq = tokenizer.texts_to_sequences(X_new_transcripts)
X_new_transcript_padded = pad_sequences(X_new_transcript_seq, maxlen=max_length_transcript, padding='post')

# Take the expected number of frames from the model's spectrogram input (the
# second of its two inputs) instead of repeating a literal here, so this cell
# cannot drift out of sync with the training cell.
n_spectrogram_frames = model.input_shape[1][-1]

# Truncate longer spectrograms and zero-pad shorter ones to that width
X_new_mfcc_spectrograms_padded = np.array([
    np.pad(arr[:, :n_spectrogram_frames],
           ((0, 0), (0, max(0, n_spectrogram_frames - arr.shape[1]))),
           mode='constant')
    for arr in X_new_mfcc_spectrograms
])

# Make predictions on the new data
predictions = model.predict([X_new_transcript_padded, X_new_mfcc_spectrograms_padded])

# Decode the predicted labels
predicted_labels = label_encoder.inverse_transform(np.argmax(predictions, axis=1))
print(predicted_labels)

# Calculate the accuracy on the new data. Rows whose reference label is None
# (files without any phone label) never match and count as errors.
accuracy = np.mean(predicted_labels == final_test_df['phoneme_likelihood'])
print("Accuracy on New Data:", accuracy)
