# In [None]:
# -*- coding: utf-8 -*-




# Colab-only setup: mount Google Drive so the /content/drive/MyDrive paths
# used further down resolve.
from google.colab import drive
drive.mount('/content/drive')

# IPython shell escape (not plain Python): installs the package that is
# imported below as `parselmouth`.
!pip install praat-parselmouth

import warnings
import numpy as np
import os
import pandas as pd
import librosa
import librosa.display
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import parselmouth
from tqdm import tqdm
from functools import lru_cache

# Phoneme-label CSVs produced by the indexing step below (one row per segment)
train_csv = "/content/drive/MyDrive/train_phoneme_labels.csv"
test_csv = "/content/drive/MyDrive/test_phoneme_labels.csv"

# Root directory of the TIMIT corpus on the mounted Drive
timit_path = "/content/drive/MyDrive/archive/data"

def process_wav_phn(wav_path, phn_path):
    """Parse one TIMIT-style .PHN transcription into phoneme segment rows.

    Every non-blank line is read as ``<start_sample> <end_sample> <phoneme>``
    separated by whitespace. Blank lines (for example a trailing newline at
    the end of the file) are skipped instead of raising ``IndexError``.

    Args:
        wav_path: Path of the matching audio file; copied verbatim into
            every returned row.
        phn_path: Path of the .PHN file to read.

    Returns:
        A list of ``[wav_path, start_sample, end_sample, phoneme]`` rows in
        file order, with the two sample indices converted to ``int``.

    Raises:
        OSError: If ``phn_path`` cannot be opened.
        IndexError: If a non-blank line has fewer than three fields.
        ValueError: If a sample index is not an integer.
    """
    phoneme_data = []

    with open(phn_path, "r") as f:
        for line in f:
            parts = line.split()
            if not parts:
                # Whitespace-only line: nothing to record.
                continue

            start_sample = int(parts[0])  # Start sample index
            end_sample = int(parts[1])    # End sample index
            phoneme = parts[2]            # Phoneme label
            phoneme_data.append([wav_path, start_sample, end_sample, phoneme])

    return phoneme_data

# Segment rows collected per corpus split
train_data, test_data = [], []

# Visit every .WAV under the corpus root and pair it with its .PHN file
for root, _, files in os.walk(timit_path):
    for file in files:
        if not file.endswith(".WAV"):
            continue

        wav_path = os.path.join(root, file)
        phn_path = wav_path.replace(".WAV", ".PHN")

        # Audio without a transcription cannot be labelled
        if not os.path.exists(phn_path):
            continue

        # The split is decided by the directory path containing TRAIN or TEST
        if "TRAIN" in root:
            split_rows = train_data
        elif "TEST" in root:
            split_rows = test_data
        else:
            continue
        split_rows.extend(process_wav_phn(wav_path, phn_path))

# One DataFrame per split, sharing the same column layout
segment_columns = ["WAV_File", "Start_Sample", "End_Sample", "Phoneme"]
train_df = pd.DataFrame(train_data, columns=segment_columns)
test_df = pd.DataFrame(test_data, columns=segment_columns)

# Persist both tables without the pandas index column
for frame, csv_path in ((train_df, train_csv), (test_df, test_csv)):
    frame.to_csv(csv_path, index=False)

print(f"Saved TRAIN phoneme labels to {train_csv} ({len(train_df)} segments)")
print(f"Saved TEST phoneme labels to {test_csv} ({len(test_df)} segments)")

@lru_cache(maxsize=8)
def _load_audio(wav_path, sr):
    """Decode ``wav_path`` at sample rate ``sr``, remembering recent files."""
    audio, _ = librosa.load(wav_path, sr=sr)
    return audio


def extract_audio_segment(wav_path, start_sample, end_sample, sr=16000, min_length=0.01):
    """Return the samples ``[start_sample:end_sample]`` of an audio file.

    The decoded waveform is cached per ``(wav_path, sr)``, so the many
    segments that belong to the same file do not each decode it again.

    Args:
        wav_path: Path of the audio file to load.
        start_sample: Index of the first sample of the segment.
        end_sample: Index one past the last sample of the segment.
        sr: Sample rate passed to ``librosa.load``; also used to turn
            ``min_length`` into a sample count.
        min_length: Minimum segment duration in seconds. Shorter segments
            are zero-padded at the end up to this length.

    Returns:
        A 1-D array holding the (possibly zero-padded) segment, or ``None``
        if loading or slicing fails; the error is printed in that case.
    """
    try:
        audio = _load_audio(wav_path, sr)
        segment = audio[start_sample:end_sample]

        # Zero-pad segments shorter than the minimum length
        min_samples = int(min_length * sr)
        if len(segment) < min_samples:
            return np.pad(segment, (0, min_samples - len(segment)), mode='constant')

        # Copy so callers never hold a view into the cached waveform
        return segment.copy()
    except Exception as e:
        print(f"Error loading {wav_path}: {str(e)}")
        return None

def extract_praat_features(segment, sr=16000):
    """Summarise one audio segment as nine Praat-derived statistics.

    The returned vector holds, in order: mean and standard deviation of the
    pitch over frames with frequency > 0, mean and standard deviation of
    the intensity, mean of the finite harmonicity values, and the mean of
    formants 1 to 4 sampled every 10 ms (only values below 5000 are kept).
    A statistic with no usable values is reported as 0.

    Args:
        segment: Audio samples handed to ``parselmouth.Sound``.
        sr: Sampling frequency of ``segment``.

    Returns:
        A NumPy array of 9 values. If anything raises, the error is printed
        and an all-zero vector of length 9 is returned instead.
    """
    try:
        # The Sound object is built directly from the in-memory samples
        snd = parselmouth.Sound(segment, sampling_frequency=sr)

        # Run all analyses up front; a failure in any of them falls through
        # to the all-zero fallback below
        pitch = snd.to_pitch()
        intensity = snd.to_intensity()
        harmonicity = snd.to_harmonicity()
        formants = snd.to_formant_burg()

        # Pitch: keep only frames with a positive frequency (drops unvoiced)
        f0 = pitch.selected_array['frequency']
        voiced_f0 = f0[f0 > 0]
        has_voiced = len(voiced_f0) > 0

        # Harmonicity: discard non-finite entries before averaging
        hnr = harmonicity.values[0]
        finite_hnr = hnr[np.isfinite(hnr)]

        features = [
            np.mean(voiced_f0) if has_voiced else 0,
            np.std(voiced_f0) if has_voiced else 0,
            np.mean(intensity.values),
            np.std(intensity.values),
            np.mean(finite_hnr) if len(finite_hnr) > 0 else 0,
        ]

        # Formants F1..F4, sampled on a 10 ms grid across the segment
        sample_times = np.arange(0, snd.duration, 0.01)
        for formant_number in range(1, 5):
            readings = [formants.get_value_at_time(formant_number, t) for t in sample_times]
            usable = [f for f in readings if f and not np.isnan(f) and f < 5000]
            features.append(np.mean(usable) if usable else 0)

        return np.array(features)

    except Exception as e:
        print(f"Parselmouth error: {e}")
        return np.zeros(9)


def extract_time_domain_features(segment, min_frames, n_features):
    """Build a fallback feature matrix from simple waveform statistics.

    One feature row is computed for the whole segment and repeated
    ``min_frames`` times. The row holds peak and mean absolute amplitude,
    standard deviation, zero-crossing rate, energy and the 10/25/50/75/90th
    percentiles of the absolute amplitude, followed (for segments of at
    least 8 samples) by the max, mean and median of a short-window STFT
    magnitude. The row is truncated or zero-padded to ``n_features``.

    Args:
        segment: 1-D audio samples.
        min_frames: Number of identical rows in the result.
        n_features: Number of columns in the result.

    Returns:
        An array of shape ``(min_frames, n_features)``; all zeros when
        ``segment`` is empty.
    """
    n_samples = len(segment)
    if n_samples == 0:
        return np.zeros((min_frames, n_features))

    magnitude = np.abs(segment)

    # A single frame spanning the whole segment gives one crossing rate
    crossing_rate = librosa.feature.zero_crossing_rate(
        segment,
        frame_length=n_samples,
        center=False
    )[0, 0]

    # Amplitude statistics
    features = [
        np.max(magnitude),
        np.mean(magnitude),
        np.std(segment),
        crossing_rate,
        np.sum(segment**2),
    ]
    features.extend(np.percentile(magnitude, [10, 25, 50, 75, 90]))

    # Coarse spectral statistics from a very short FFT window
    if n_samples >= 8:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            spec = np.abs(librosa.stft(
                segment,
                n_fft=min(32, n_samples),
                center=False
            ))
            features.extend([np.max(spec), np.mean(spec), np.median(spec)])

    # Force the row to exactly n_features entries
    row = features[:n_features]
    row.extend([0.0] * (n_features - len(row)))

    return np.tile(row, (min_frames, 1))

def process_dataset(df, sr=16000):
    """Extract one Praat feature vector and one label per phoneme segment.

    Args:
        df: DataFrame with the columns ``WAV_File``, ``Start_Sample``,
            ``End_Sample`` and ``Phoneme`` (one row per segment).
        sr: Sample rate forwarded to ``extract_audio_segment`` and
            ``extract_praat_features``.

    Returns:
        A ``(features_array, labels_array)`` tuple. ``features_array`` has
        shape ``(n_segments, 9)`` and ``labels_array`` has shape
        ``(n_segments,)``; row ``i`` of both refers to the same segment.
        Segments whose audio could not be loaded are left out of both
        arrays and summarised in a printed message.
    """
    n_praat_features = 9  # length of the vector extract_praat_features returns
    features = []
    labels = []
    problematic_files = []

    # Using tqdm to show a progress bar during processing
    for _, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing dataset", unit="file"):
        wav_path = row['WAV_File']

        # Start/end are sample indices and are used for slicing directly
        segment = extract_audio_segment(wav_path, row['Start_Sample'], row['End_Sample'], sr=sr)

        # extract_audio_segment signals a failed load with None; skip the
        # row so features and labels stay aligned
        if segment is None:
            problematic_files.append(wav_path)
            continue

        features.append(extract_praat_features(segment, sr=sr))
        labels.append(row['Phoneme'])

    if problematic_files:
        print(f"Could not process {len(problematic_files)} files (e.g., {problematic_files[:3]}...)")

    # np.stack rejects an empty list, so return an empty matrix explicitly
    if features:
        features_array = np.stack(features)
    else:
        features_array = np.zeros((0, n_praat_features))
    labels_array = np.array(labels)

    return features_array, labels_array

# Load data
# Read the label tables back from the same locations they were saved to
# (train_csv / test_csv), so this step works after a fresh run of the export.
train_df = pd.read_csv(train_csv)
test_df = pd.read_csv(test_csv)

# Process with checks
print("Processing training data...")
train_features, train_labels = process_dataset(train_df)

print("\nProcessing test data...")
test_features, test_labels = process_dataset(test_df)

# Verify shapes
print(f"\nTrain features shape: {train_features.shape}")
print(f"Test features shape: {test_features.shape}")

# 1. Prepare the data
# Encode labels: the encoder is fitted on the training labels only, so
# transform() raises if the test set contains a phoneme unseen in training.
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(train_labels)
y_test_encoded = label_encoder.transform(test_labels)

# Convert to one-hot
num_classes = len(label_encoder.classes_)
y_train_onehot = to_categorical(y_train_encoded, num_classes=num_classes)
y_test_onehot = to_categorical(y_test_encoded, num_classes=num_classes)

# 2. Define the BLSTM model
def create_blstm_model(input_shape, num_classes):
    """Assemble an uncompiled two-layer bidirectional-LSTM classifier.

    Args:
        input_shape: Per-sample input shape passed to ``Input``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A Keras ``Model`` mapping the input to class probabilities.
    """
    inputs = Input(shape=input_shape)

    # Two BLSTM stages, each followed by dropout and batch normalisation.
    # The first returns the full sequence for the second to consume; the
    # second returns only its final output.
    hidden = inputs
    for keep_sequence in (True, False):
        hidden = Bidirectional(LSTM(128, return_sequences=keep_sequence))(hidden)
        hidden = Dropout(0.3)(hidden)
        hidden = BatchNormalization()(hidden)

    # Dense head
    hidden = Dense(64, activation='relu')(hidden)
    hidden = Dropout(0.2)(hidden)
    outputs = Dense(num_classes, activation='softmax')(hidden)

    return Model(inputs=inputs, outputs=outputs)

# 3. Create and compile the model
# The recurrent layers need every sample shaped (timesteps, features). A 2-D
# feature matrix (one flat vector per segment) is therefore given a length-1
# time axis, so the same arrays work for fit() and evaluate() below.
if train_features.ndim == 2:
    train_features = train_features[:, np.newaxis, :]
if test_features.ndim == 2:
    test_features = test_features[:, np.newaxis, :]

input_shape = train_features.shape[1:]  # (timesteps, n_features)
model = create_blstm_model(input_shape, num_classes)

model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Print model summary
model.summary()

# 4. Train the model
batch_size = 64
epochs = 50

# Hold out 10% of the training segments for validation (fixed seed)
X_train, X_val, y_train, y_val = train_test_split(
    train_features, y_train_onehot,
    test_size=0.1, random_state=42
)

history = model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(X_val, y_val),
    verbose=1
)

# 5. Evaluate on test set
test_loss, test_acc = model.evaluate(
    test_features, y_test_onehot,
    batch_size=batch_size,
    verbose=1
)

print(f"\nTest Accuracy: {test_acc:.4f}")
print(f"Test Loss: {test_loss:.4f}")

# 6. Save the model
model.save('phoneme_classification_blstm.h5')
print("Model saved to phoneme_classification_blstm.h5")