### preprocess_audio

In [None]:
import os
import librosa
import soundfile as sf
import scipy.signal as signal
import numpy as np

# مسیرها
input_dir = 'data/raw_recordings'
output_dir = 'data/preprocessed_recordings'
os.makedirs(output_dir, exist_ok=True)

# تنظیمات
target_sr = 16000
subtype = 'PCM_16'
low_cut = 300
high_cut = 3400
order = 5

# توابع پردازش
def normalize_audio(y):
    return y / np.max(np.abs(y))

def trim_silence(y, top_db=30):
    return librosa.effects.trim(y, top_db=top_db)[0]

def bandpass_filter(y, sr, low_cut, high_cut, order=5):
    nyquist = 0.5 * sr
    low = low_cut / nyquist
    high = high_cut / nyquist
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.lfilter(b, a, y)

# پردازش فایل‌ها
for filename in os.listdir(input_dir):
    if filename.endswith('.wav'):
        input_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, filename)

        # 1. بارگذاری و تبدیل به نرخ ۱۶kHz
        y, sr = librosa.load(input_path, sr=target_sr)

        # 2. نرمال‌سازی دامنه
        y = normalize_audio(y)

        # 3. حذف سکوت از ابتدا و انتها
        y = trim_silence(y)

        # 4. فیلتر باندپَس (حذف نویز خارج از بازه گفتار)
        y = bandpass_filter(y, sr, low_cut, high_cut, order)

        # 5. ذخیره با فرمت ۱۶ بیت
        sf.write(output_path, y, sr, subtype=subtype)

        print(f'✅ Processed and saved: {filename}')


✅ Processed and saved: Bale_000001.wav
✅ Processed and saved: Bale_000002.wav
✅ Processed and saved: Bale_000003.wav
✅ Processed and saved: Bale_000004.wav
✅ Processed and saved: Bale_000005.wav
✅ Processed and saved: Bale_001.wav
✅ Processed and saved: Bale_002.wav
✅ Processed and saved: Bale_003.wav
✅ Processed and saved: Bale_004.wav
✅ Processed and saved: Bale_005.wav
✅ Processed and saved: Bale_01.wav
✅ Processed and saved: Bale_02.wav
✅ Processed and saved: Bale_03.wav
✅ Processed and saved: Bale_04.wav
✅ Processed and saved: Bale_05.wav
✅ Processed and saved: BesiarAali_000001.wav
✅ Processed and saved: BesiarAali_000002.wav
✅ Processed and saved: BesiarAali_000003.wav
✅ Processed and saved: BesiarAali_000004.wav
✅ Processed and saved: BesiarAali_000005.wav
✅ Processed and saved: BesiarAali_001.wav
✅ Processed and saved: BesiarAali_002.wav
✅ Processed and saved: BesiarAali_003.wav
✅ Processed and saved: BesiarAali_004.wav
✅ Processed and saved: BesiarAali_005.wav
✅ Processed and

### plot_waveform

In [5]:
import os
import librosa
import soundfile as sf
import scipy.signal as signal
import numpy as np
import matplotlib.pyplot as plt
import librosa.display

# Path to preprocessed audio files and output directory
input_dir = 'data/preprocessed_recordings'
output_dir = 'plot_waveform'
target_sr = 16000

# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Feature extraction functions
def compute_rms(y):
    return np.sqrt(np.mean(y**2))

def compute_zcr(y):
    return np.mean(librosa.zero_crossings(y, pad=False))

def compute_energy(y):
    return np.sum(y**2)

def save_waveform_plot(y, sr, title, rms, zcr, energy):
    plt.figure(figsize=(10, 2.5))
    librosa.display.waveshow(y, sr=sr)
    plt.title(f'Waveform: {title}')
    plt.xlabel("Time (seconds)")
    plt.ylabel("Amplitude")
    
    # Add feature text to the plot
    text_str = f'RMS: {rms:.4f}\nZCR: {zcr:.4f}\nEnergy: {energy:.2f}'
    plt.text(0.02, 0.98, text_str, transform=plt.gca().transAxes, 
             verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    plt.tight_layout()
    output_path = os.path.join(output_dir, f'waveform_{title}.png')
    plt.savefig(output_path)
    plt.close()

def save_fft_plot(y, sr, title, rms, zcr, energy):
    Y = np.fft.rfft(y)
    freqs = np.fft.rfftfreq(len(y), 1/sr)
    plt.figure(figsize=(10, 2.5))
    plt.plot(freqs, np.abs(Y))
    plt.title(f'Frequency Spectrum (FFT): {title}')
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude")
    plt.xlim(0, 8000)
    
    # Add feature text to the plot
    text_str = f'RMS: {rms:.4f}\nZCR: {zcr:.4f}\nEnergy: {energy:.2f}'
    plt.text(0.02, 0.98, text_str, transform=plt.gca().transAxes, 
             verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    plt.tight_layout()
    output_path = os.path.join(output_dir, f'fft_{title}.png')
    plt.savefig(output_path)
    plt.close()

def save_features_to_text(filename, rms, zcr, energy):
    output_path = os.path.join(output_dir, f'features_{filename}.txt')
    with open(output_path, 'w') as f:
        f.write(f'File: {filename}\n')
        f.write(f'RMS: {rms:.4f}\n')
        f.write(f'Zero-Crossing Rate: {zcr:.4f}\n')
        f.write(f'Energy: {energy:.2f}\n')

# Analyze each audio file
for filename in os.listdir(input_dir):
    if filename.endswith('.wav'):
        path = os.path.join(input_dir, filename)
        y, sr = librosa.load(path, sr=target_sr)

        # Compute features
        rms = compute_rms(y)
        zcr = compute_zcr(y)
        energy = compute_energy(y)

        # Save features to text file
        save_features_to_text(filename, rms, zcr, energy)

        # Save visualizations with features annotated
        save_waveform_plot(y, sr, filename, rms, zcr, energy)
        save_fft_plot(y, sr, filename, rms, zcr, energy)

### MFCC

In [6]:
import os
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d

class FeatureExtractor:
    def __init__(self, sample_rate=16000, n_mfcc=13, n_fft=2048,
                 hop_length=512, n_mels=128):
        self.sample_rate = sample_rate
        self.n_mfcc = n_mfcc
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels

    def pre_emphasis(self, signal, pre_emph_coef=0.97):
        return np.append(signal[0], signal[1:] - pre_emph_coef * signal[:-1])

    def remove_silence(self, y, threshold=0.01):
        energy = librosa.feature.rms(y=y)[0]
        frames = np.nonzero(energy > threshold)
        if frames[0].size:
            y = y[librosa.frames_to_samples(frames[0][0]):librosa.frames_to_samples(frames[0][-1])]
        return y

    def extract_mfcc(self, y):
        y = self.pre_emphasis(y)
        mfcc = librosa.feature.mfcc(y=y, sr=self.sample_rate, n_mfcc=self.n_mfcc,
                                    n_fft=self.n_fft, hop_length=self.hop_length,
                                    n_mels=self.n_mels)
        mfcc = (mfcc - np.mean(mfcc)) / np.std(mfcc)
        return mfcc

    def extract_delta_features(self, mfcc):
        delta = librosa.feature.delta(mfcc)
        delta_delta = librosa.feature.delta(mfcc, order=2)
        combined = np.vstack([mfcc, delta, delta_delta])
        return combined

    def normalize_feature_length(self, features, target_length=50):
        n_features, original_length = features.shape
        x_old = np.linspace(0, 1, original_length)
        x_new = np.linspace(0, 1, target_length)
        interpolated = np.zeros((n_features, target_length))
        for i in range(n_features):
            f = interp1d(x_old, features[i], kind='linear')
            interpolated[i] = f(x_new)
        return interpolated

    def extract_energy(self, y):
        return np.sum(y ** 2)

    def save_mfcc_spectrogram(self, mfcc, title, energy):
        plt.figure(figsize=(8, 3))
        librosa.display.specshow(mfcc, x_axis='time')
        plt.colorbar()
        plt.title(f'MFCC: {title}')
        # Add energy text to the plot
        text_str = f'Energy: {energy:.2f}'
        plt.text(0.02, 0.98, text_str, transform=plt.gca().transAxes, 
                 verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        plt.tight_layout()
        output_path = os.path.join('mfcc_plot', f'mfcc_{title}.png')
        plt.savefig(output_path)
        plt.close()

# Paths for input and output
input_dir = 'data/preprocessed_recordings'
output_dir = 'data/mfcc_features'
plot_dir = 'mfcc_plot'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(plot_dir, exist_ok=True)

# Extract unique words from filenames
words = set()
for filename in os.listdir(input_dir):
    if filename.endswith('.wav'):
        word = filename.split('_')[0]
        words.add(word)
words = sorted(list(words))

extractor = FeatureExtractor()
word_features = {}

for word in words:
    mfccs = []
    energies = []
    lengths = []
    print(f'\n🔍 Word: {word}')
    
    # Process all files for the current word
    for filename in os.listdir(input_dir):
        if filename.startswith(word + '_') and filename.endswith('.wav'):
            wav_path = os.path.join(input_dir, filename)
            y, sr = librosa.load(wav_path, sr=16000)

            y = extractor.remove_silence(y)
            energy = extractor.extract_energy(y)
            mfcc = extractor.extract_mfcc(y)
            mfcc = extractor.extract_delta_features(mfcc)
            mfcc = extractor.normalize_feature_length(mfcc)

            # Save features to npy file
            npy_filename = filename.replace('.wav', '.npy')
            np.save(os.path.join(output_dir, npy_filename), mfcc)

            # Save MFCC spectrogram
            extractor.save_mfcc_spectrogram(mfcc, filename, energy)

            # Collect statistics
            mfccs.append(mfcc)
            energies.append(energy)
            lengths.append(len(y) / sr)

    # Store for final analysis
    word_features[word] = {
        'mfccs': mfccs,
        'energies': energies,
        'lengths': lengths
    }

    print(f"📏 Avg length: {np.mean(lengths):.2f}s")
    print(f"⚡ Avg energy: {np.mean(energies):.2f}")


🔍 Word: Bale
📏 Avg length: 0.51s
⚡ Avg energy: 214.98

🔍 Word: BesiarAali
📏 Avg length: 1.03s
⚡ Avg energy: 315.66

🔍 Word: Kheir
📏 Avg length: 0.59s
⚡ Avg energy: 202.99

🔍 Word: Khodahafez
📏 Avg length: 0.88s
⚡ Avg energy: 262.23

🔍 Word: Motshakeram
📏 Avg length: 0.90s
⚡ Avg energy: 245.67

🔍 Word: Salam
📏 Avg length: 0.58s
⚡ Avg energy: 167.00

🔍 Word: kheir
📏 Avg length: 0.55s
⚡ Avg energy: 175.90


### DTW

In [11]:
import numpy as np
from scipy.spatial.distance import euclidean, cityblock, cosine, correlation
import os
import pandas as pd
import matplotlib.pyplot as plt

class DTWAnalyzer:
    def __init__(self, feature_dir='data/mfcc_features', target_length=50, window_size=0.2, plot_dir='dtw_plots'):
        self.feature_dir = feature_dir
        self.target_length = target_length
        self.window_size = window_size
        self.plot_dir = plot_dir
        # Create plot directory if it doesn't exist
        os.makedirs(self.plot_dir, exist_ok=True)

    def normalize_zscore(self, features):
        """Apply z-score normalization to features."""
        return (features - np.mean(features)) / np.std(features)

    def compute_dtw_distance(self, seq1, seq2, distance_metric='euclidean'):
        """Compute DTW distance and warping path between two sequences with fixed window size."""
        n, m = seq1.shape[1], seq2.shape[1]
        w = int(self.window_size * max(n, m))

        # Initialize cost matrix
        cost_matrix = np.full((n + 1, m + 1), np.inf)
        cost_matrix[0, 0] = 0

        # Initialize path tracking
        path = []

        # Compute cost matrix
        for i in range(1, n + 1):
            for j in range(max(1, i - w), min(m + 1, i + w)):
                if distance_metric == 'euclidean':
                    cost = euclidean(seq1[:, i-1], seq2[:, j-1])
                elif distance_metric == 'manhattan':
                    cost = cityblock(seq1[:, i-1], seq2[:, j-1])
                elif distance_metric == 'cosine':
                    cost = cosine(seq1[:, i-1], seq2[:, j-1])
                elif distance_metric == 'correlation':
                    cost = correlation(seq1[:, i-1], seq2[:, j-1])
                else:
                    raise ValueError("Unsupported distance metric")

                cost_matrix[i, j] = cost + min(
                    cost_matrix[i-1, j],    # Insertion
                    cost_matrix[i, j-1],    # Deletion
                    cost_matrix[i-1, j-1]   # Match
                )

        # Backtrack to find the warping path
        i, j = n, m
        while i > 0 and j > 0:
            path.append((i-1, j-1))
            min_cost = min(cost_matrix[i-1, j], cost_matrix[i, j-1], cost_matrix[i-1, j-1])
            if min_cost == cost_matrix[i-1, j-1]:
                i, j = i-1, j-1
            elif min_cost == cost_matrix[i-1, j]:
                i -= 1
            else:
                j -= 1

        path.reverse()
        return cost_matrix[n, m], path, cost_matrix[1:, 1:]

    def analyze_warping_path(self, path, n, m):
        """Analyze the warping path for statistics and critical points."""
        path = np.array(path)
        lengths = np.sqrt(np.sum(np.diff(path, axis=0)**2, axis=1))
        total_length = np.sum(lengths) if len(lengths) > 0 else 0
        std_dev = np.std(lengths) if len(lengths) > 0 else 0
        diagonal_dev = np.mean(np.abs(path[:, 0] - path[:, 1])) if len(path) > 0 else 0
        turning_points = np.sum(np.abs(np.diff(np.sign(np.diff(path[:, 0] - path[:, 1]))))) / 2 if len(path) > 1 else 0

        # Identify compression and expansion points
        compression_points = []
        expansion_points = []
        for i in range(1, len(path)):
            di, dj = path[i][0] - path[i-1][0], path[i][1] - path[i-1][1]
            if di == 0 and dj > 0:  # Horizontal move (compression)
                compression_points.append(path[i])
            elif dj == 0 and di > 0:  # Vertical move (expansion)
                expansion_points.append(path[i])

        return {
            'total_length': total_length,
            'std_dev': std_dev,
            'diagonal_deviation': diagonal_dev,
            'turning_points': turning_points,
            'compression_points': compression_points,
            'expansion_points': expansion_points,
            'num_compression': len(compression_points),
            'num_expansion': len(expansion_points)
        }

    def plot_cost_matrix(self, cost_matrix, path, word1, word2, idx1, idx2):
        """Plot the cost matrix with DTW path, diagonal line, and critical points."""
        plt.figure(figsize=(8, 6))
        plt.imshow(cost_matrix, origin='lower', cmap='viridis', interpolation='nearest')
        plt.colorbar(label='Cost')
        plt.title(f'DTW Cost Matrix: {word1}_{idx1:02d} vs {word2}_{idx2:02d}')
        plt.xlabel(f'{word2}_{idx2:02d} Frames')
        plt.ylabel(f'{word1}_{idx1:02d} Frames')

        # Plot DTW path
        path = np.array(path)
        plt.plot(path[:, 1], path[:, 0], 'r-', label='DTW Path')

        # Plot diagonal line
        min_len = min(cost_matrix.shape)
        plt.plot(range(min_len), range(min_len), 'k--', label='Diagonal', alpha=0.5)

        # Plot compression and expansion points
        path_stats = self.analyze_warping_path(path, cost_matrix.shape[0], cost_matrix.shape[1])
        if path_stats['compression_points']:
            comp_points = np.array(path_stats['compression_points'])
            plt.scatter(comp_points[:, 1], comp_points[:, 0], c='red', marker='o', s=50, label='Compression Points')
        if path_stats['expansion_points']:
            exp_points = np.array(path_stats['expansion_points'])
            plt.scatter(exp_points[:, 1], exp_points[:, 0], c='blue', marker='^', s=50, label='Expansion Points')

        plt.legend()
        plt.grid(True)
        plt.savefig(os.path.join(self.plot_dir, f'dtw_path_{word1}_{idx1:02d}_vs_{word2}_{idx2:02d}.png'))
        plt.close()

    def compare_features(self, word1, word2, idx1=1, idx2=1):
        """Compare two audio features using multiple metrics and fixed window size."""
        file1 = f"{word1}_{idx1:02d}.npy"
        file2 = f"{word2}_{idx2:02d}.npy"

        try:
            mfcc1 = np.load(os.path.join(self.feature_dir, file1))
            mfcc2 = np.load(os.path.join(self.feature_dir, file2))
        except FileNotFoundError as e:
            print(f"Error: File not found - {e}")
            return None, None

        # Verify shape for 13 MFCC features
        expected_shape = (39, self.target_length)
        if mfcc1.shape != expected_shape or mfcc2.shape != expected_shape:
            print(f"Error: Unexpected shape for {file1}: {mfcc1.shape} or {file2}: {mfcc2.shape}. Expected {expected_shape}.")
            return None, None

        # Apply z-score normalization
        mfcc1 = self.normalize_zscore(mfcc1)
        mfcc2 = self.normalize_zscore(mfcc2)

        # Compute different distance metrics on normalized features
        results = {
            'Euclidean': euclidean(mfcc1.flatten(), mfcc2.flatten()),
            'Manhattan': cityblock(mfcc1.flatten(), mfcc2.flatten()),
            'Cosine': cosine(mfcc1.flatten(), mfcc2.flatten()),
            'Correlation': correlation(mfcc1.flatten(), mfcc2.flatten())
        }

        # Compute DTW with fixed window size
        distance, path, cost_matrix = self.compute_dtw_distance(mfcc1, mfcc2)
        path_stats = self.analyze_warping_path(path, mfcc1.shape[1], mfcc2.shape[1])
        dtw_results = {
            'distance': distance,
            'path': path,
            'cost_matrix': cost_matrix,
            'path_stats': path_stats
        }

        # Plot cost matrix with DTW path
        self.plot_cost_matrix(cost_matrix, path, word1, word2, idx1, idx2)

        return results, dtw_results


# Initialize analyzer with fixed window size
analyzer = DTWAnalyzer(window_size=0.2)

# Define comparisons
comparisons = [
    ('Salam', 'Salam', 1, 2),  # Same word comparison
    ('Salam', 'Bale', 1, 1),   # Different word comparison
    ('Bale', 'Kheir', 1, 1),   # Different word comparison
    ('Bale', 'Motshakeram', 1, 1),      # Additional comparison
    ('BesiarAali', 'BesiarAali', 3, 2),  # Same word comparison
    ('Motshakeram', 'Motshakeram', 1, 2),  # Same word comparison
    ('Salam', 'Motshakeram', 1, 2),  # Same word comparison
    ('BesiarAali', 'BesiarAali', 5, 3),  # Same word comparison
    ('BesiarAali', 'BesiarAali', 3, 2),  # Same word comparison
    ('BesiarAali', 'BesiarAali', 4, 5),  # Same word comparison
    ('BesiarAali', 'BesiarAali', 2, 5),  # Same word comparison
    ('BesiarAali', 'Salam', 3, 2),  # Same word comparison
    ('BesiarAali', 'Motshakeram', 2, 4),  # Same word comparison
    ('Salam', 'Motshakeram', 2, 3),  # Same word comparison
    ('Salam', 'Motshakeram', 1, 3),  # Same word comparison
    ('Salam', 'Motshakeram', 1, 4),  # Same word comparison
    ('Salam', 'Motshakeram', 3, 5),  # Same word comparison
]

# Perform comparisons and collect results
table_data = []
dtw_table_data = []
combined_table_data = []
for word1, word2, idx1, idx2 in comparisons:
    print(f"\n🔍 Comparing {word1}_{idx1:02d} vs {word2}_{idx2:02d}")
    metrics, dtw_results = analyzer.compare_features(word1, word2, idx1, idx2)
    
    if metrics is None or dtw_results is None:
        print(f"Skipping comparison due to missing files or incorrect shape.")
        continue

    # Print standard metrics
    print(f"📏 Standard Metrics: {metrics}")
    
    # Print DTW results
    print(f"📐 DTW (window_size=0.2):")
    print(f"  Distance: {dtw_results['distance']:.2f}")
    print(f"  Path Stats: {dtw_results['path_stats']}")
    print(f"  Diagonal Deviation Analysis: Mean deviation from diagonal is {dtw_results['path_stats']['diagonal_deviation']:.2f}")
    print(f"  Critical Points: {dtw_results['path_stats']['num_compression']} compression points, {dtw_results['path_stats']['num_expansion']} expansion points")

    # Add to standard metrics table
    table_data.append({
        'Word1': word1,
        'Word2': word2,
        'Euclidean': metrics['Euclidean'],
        'Manhattan': metrics['Manhattan'],
        'Cosine': metrics['Cosine'],
        'Correlation': metrics['Correlation']
    })

    # Add to DTW table
    dtw_table_data.append({
        'Word1': word1,
        'Word2': word2,
        'DTW Distance': dtw_results['distance'],
        'Total Length': dtw_results['path_stats']['total_length'],
        'Std Dev': dtw_results['path_stats']['std_dev'],
        'Diagonal Deviation': dtw_results['path_stats']['diagonal_deviation'],
        'Turning Points': dtw_results['path_stats']['turning_points'],
        'Compression Points': dtw_results['path_stats']['num_compression'],
        'Expansion Points': dtw_results['path_stats']['num_expansion']
    })

    # Add to combined table
    combined_table_data.append({
        'Word1': word1,
        'Word2': word2,
        'Euclidean': metrics['Euclidean'],
        'Manhattan': metrics['Manhattan'],
        'Cosine': metrics['Cosine'],
        'Correlation': metrics['Correlation'],
        'DTW Distance': dtw_results['distance'],
        'Total Length': dtw_results['path_stats']['total_length'],
        'Std Dev': dtw_results['path_stats']['std_dev'],
        'Diagonal Deviation': dtw_results['path_stats']['diagonal_deviation'],
        'Turning Points': dtw_results['path_stats']['turning_points'],
        'Compression Points': dtw_results['path_stats']['num_compression'],
        'Expansion Points': dtw_results['path_stats']['num_expansion']
    })

# Print standard metrics table
print("\n📊 Standard Metrics Comparison Table:")
print("| Word1 | Word2 | Euclidean | Manhattan | Cosine | Correlation |")
print("|-------|-------|-----------|-----------|--------|-------------|")
for row in table_data:
    print(f"| {row['Word1']} | {row['Word2']} | {row['Euclidean']:8.2f} | {row['Manhattan']:8.2f} | {row['Cosine']:6.2f} | {row['Correlation']:6.2f} |")

# Print DTW results table
print("\n📊 DTW Results Comparison Table:")
print("| Word1 | Word2 | DTW Distance | Total Length | Std Dev | Diagonal Deviation | Turning Points | Compression Points | Expansion Points |")
print("|-------|-------|--------------|--------------|---------|--------------------|----------------|--------------------|------------------|")
for row in dtw_table_data:
    print(f"| {row['Word1']} | {row['Word2']} | {row['DTW Distance']:11.2f} | {row['Total Length']:11.2f} | {row['Std Dev']:7.2f} | {row['Diagonal Deviation']:17.2f} | {row['Turning Points']:13.2f} | {row['Compression Points']:17.0f} | {row['Expansion Points']:15.0f} |")

# Save tables to CSV files
if table_data:
    df_standard = pd.DataFrame(table_data)
    df_standard.to_csv('dtw/standard_metrics.csv', index=False)
    print("\nStandard metrics saved to standard_metrics.csv")
else:
    print("\nNo standard metrics data to save to CSV.")

if dtw_table_data:
    df_dtw = pd.DataFrame(dtw_table_data)
    df_dtw.to_csv('dtw/dtw_results.csv', index=False)
    print("DTW results saved to dtw_results.csv")
else:
    print("No DTW results data to save to CSV.")

if combined_table_data:
    df_combined = pd.DataFrame(combined_table_data)
    df_combined.to_csv('dtw/combined_metrics.csv', index=False)
    print("Combined metrics saved to combined_metrics.csv")
else:
    print("No combined metrics data to save to CSV.")


🔍 Comparing Salam_01 vs Salam_02
📏 Standard Metrics: {'Euclidean': 8.59812952224131, 'Manhattan': np.float64(190.54705987301293), 'Cosine': np.float64(0.018955854174677045), 'Correlation': np.float64(0.018955854174677045)}
📐 DTW (window_size=0.2):
  Distance: 55.72
  Path Stats: {'total_length': np.float64(69.88225099390857), 'std_dev': np.float64(0.08116894978836614), 'diagonal_deviation': np.float64(0.5882352941176471), 'turning_points': np.float64(1.5), 'compression_points': [array([30, 30])], 'expansion_points': [array([1, 0])], 'num_compression': 1, 'num_expansion': 1}
  Diagonal Deviation Analysis: Mean deviation from diagonal is 0.59
  Critical Points: 1 compression points, 1 expansion points

🔍 Comparing Salam_01 vs Bale_01
📏 Standard Metrics: {'Euclidean': 22.969408787441722, 'Manhattan': np.float64(455.02153657559154), 'Cosine': np.float64(0.13528044616528379), 'Correlation': np.float64(0.13528044616528379)}
📐 DTW (window_size=0.2):
  Distance: 136.82
  Path Stats: {'total_l

In [12]:
import numpy as np
import pickle
import os
import pandas as pd
from scipy.spatial.distance import euclidean, cityblock, cosine, correlation
import random

class DTWBasedASR:
    def __init__(self, feature_dir='data/mfcc_features', target_length=50, window_size=0.2):
        self.feature_dir = feature_dir
        self.target_length = target_length
        self.window_size = window_size
        self.templates = {}  # Dictionary to store training templates
        self.word_list = []

    def normalize_zscore(self, features):
        """Apply z-score normalization to features."""
        return (features - np.mean(features)) / np.std(features)

    def compute_dtw_distance(self, seq1, seq2, distance_metric='euclidean'):
        """Compute DTW distance between two sequences with fixed window size."""
        n, m = seq1.shape[1], seq2.shape[1]
        w = int(self.window_size * max(n, m))

        # Initialize cost matrix
        cost_matrix = np.full((n + 1, m + 1), np.inf)
        cost_matrix[0, 0] = 0

        for i in range(1, n + 1):
            for j in range(max(1, i - w), min(m + 1, i + w)):
                if distance_metric == 'euclidean':
                    cost = euclidean(seq1[:, i-1], seq2[:, j-1])
                elif distance_metric == 'manhattan':
                    cost = cityblock(seq1[:, i-1], seq2[:, j-1])
                elif distance_metric == 'cosine':
                    cost = cosine(seq1[:, i-1], seq2[:, j-1])
                elif distance_metric == 'correlation':
                    cost = correlation(seq1[:, i-1], seq2[:, j-1])
                else:
                    raise ValueError("Unsupported distance metric")

                cost_matrix[i, j] = cost + min(
                    cost_matrix[i-1, j],    # Insertion
                    cost_matrix[i, j-1],    # Deletion
                    cost_matrix[i-1, j-1]   # Match
                )

        return cost_matrix[n, m]

    def compute_metrics(self, seq1, seq2):
        """Compute Euclidean, Manhattan, Cosine, and Correlation distances."""
        seq1_flat = seq1.flatten()
        seq2_flat = seq2.flatten()
        return {
            'Euclidean': euclidean(seq1_flat, seq2_flat),
            'Manhattan': cityblock(seq1_flat, seq2_flat),
            'Cosine': cosine(seq1_flat, seq2_flat),
            'Correlation': correlation(seq1_flat, seq2_flat)
        }

    def train(self, training_data):
        """Train the system by loading MFCC features from .npy files."""
        self.templates = {}
        for word in training_data:
            self.templates[word] = []
            for sample in training_data[word]:
                npy_path = os.path.join(self.feature_dir, sample)
                try:
                    mfcc = np.load(npy_path)
                    if mfcc.shape != (39, self.target_length):
                        print(f"Warning: Skipping {npy_path} due to incorrect shape {mfcc.shape}")
                        continue
                    self.templates[word].append(self.normalize_zscore(mfcc))
                except FileNotFoundError:
                    print(f"Error: File {npy_path} not found")
                    continue
        print("Training completed.")

    def predict(self, test_sample):
        """Predict the word for a test sample using DTW distance."""
        npy_path = os.path.join(self.feature_dir, test_sample)
        try:
            mfcc = np.load(npy_path)
            if mfcc.shape != (39, self.target_length):
                print(f"Error: Incorrect shape for {npy_path}: {mfcc.shape}")
                return None, None, None
            mfcc = self.normalize_zscore(mfcc)
        except FileNotFoundError:
            print(f"Error: File {npy_path} not found")
            return None, None, None

        distances = {}
        for word in self.templates:
            distances[word] = []
            for template in self.templates[word]:
                distance = self.compute_dtw_distance(mfcc, template)
                distances[word].append(distance)

        # Compute average distance per word
        avg_distances = {word: np.mean(dists) for word, dists in distances.items()}
        
        # Find the word with minimum average distance
        predicted_word = min(avg_distances, key=avg_distances.get)
        
        # Compute confidence as normalized inverse distance
        total_dist = np.sum([1/d for d in avg_distances.values() if d != 0])
        confidence = (1 / avg_distances[predicted_word]) / total_dist if total_dist != 0 else 1.0
        
        return predicted_word, confidence, avg_distances

    def compare_all_pairs(self, training_data):
        """Compare all pairs of training samples using multiple metrics."""
        table_data = []
        for word1 in training_data:
            for word2 in training_data:
                for sample1 in training_data[word1]:
                    for sample2 in training_data[word2]:
                        npy_path1 = os.path.join(self.feature_dir, sample1)
                        npy_path2 = os.path.join(self.feature_dir, sample2)
                        try:
                            mfcc1 = np.load(npy_path1)
                            mfcc2 = np.load(npy_path2)
                            if mfcc1.shape != (39, self.target_length) or mfcc2.shape != (39, self.target_length):
                                print(f"Warning: Skipping pair {sample1} vs {sample2} due to incorrect shape")
                                continue
                            mfcc1 = self.normalize_zscore(mfcc1)
                            mfcc2 = self.normalize_zscore(mfcc2)
                            metrics = self.compute_metrics(mfcc1, mfcc2)
                            table_data.append({
                                'Word1': word1,
                                'Word2': word2,
                                'Sample1': sample1,
                                'Sample2': sample2,
                                'Euclidean': metrics['Euclidean'],
                                'Manhattan': metrics['Manhattan'],
                                'Cosine': metrics['Cosine'],
                                'Correlation': metrics['Correlation']
                            })
                        except FileNotFoundError:
                            print(f"Error: File not found for {sample1} or {sample2}")
                            continue
        return table_data

    def save_model(self, path):
        """Save the trained templates to a file."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(self.templates, f)
        print(f"Model saved to {path}")

    def load_model(self, path):
        """Load trained templates from a file."""
        with open(path, 'rb') as f:
            self.templates = pickle.load(f)
        print(f"Model loaded from {path}")

# Dynamically create training and test data with 60/40 split
feature_dir = 'data/mfcc_features'
all_files = [f for f in os.listdir(feature_dir) if f.endswith('.npy')]

# Extract unique words
words = set()
for f in all_files:
    word = f.split('_')[0]
    words.add(word)
words = sorted(list(words))

# Organize files by word
word_files = {word: [] for word in words}
for f in all_files:
    word = f.split('_')[0]
    word_files[word].append(f)

# Split into 60% training and 40% testing
training_data = {}
test_data = {}
for word, files in word_files.items():
    random.shuffle(files)  # Randomize the order
    total_samples = len(files)
    train_count = int(total_samples * 0.6)  # 60% for training
    train_files = files[:train_count]
    test_files = files[train_count:]  # 40% for testing
    training_data[word] = train_files
    test_data[word] = test_files
    print(f"Word: {word}, Total samples: {total_samples}, Training: {len(train_files)}, Testing: {len(test_files)}")

# Initialize and train the system
asr_system = DTWBasedASR(feature_dir='data/mfcc_features')
asr_system.word_list = words
asr_system.train(training_data)
asr_system.save_model('model_res/dtw_asr_model.pkl')

# Test and evaluate
results = []
confidences = []
confusion_matrix = {word: {w: 0 for w in words} for word in words}
word_lengths = {
    'Salam': 5,
    'Bale': 4,
    'BesiarAali': 10,
    'Kheir': 5,
    'Khodahafez': 10,
    'Motshakeram': 10
}

for word in test_data:
    for sample in test_data[word]:
        prediction, confidence, distances = asr_system.predict(sample)
        if prediction is None:
            print(f"Skipping {sample} due to error in prediction")
            continue
        results.append({
            'True': word,
            'Predicted': prediction,
            'Confidence': confidence,
            'Sample': sample
        })
        confidences.append(confidence)
        confusion_matrix[word][prediction] += 1
        print(f"True: {word}, Predicted: {prediction}, Confidence: {confidence:.3f}")

# Calculate overall accuracy
correct = sum(1 for r in results if r['True'] == r['Predicted'])
total = len(results)
overall_accuracy = correct / total if total > 0 else 0
print(f"\nOverall Model Accuracy: {overall_accuracy:.3f} ({correct}/{total})")

# Calculate per-word accuracy
per_word_accuracy = {}
for word in test_data:
    correct_word = sum(1 for r in results if r['True'] == word and r['True'] == r['Predicted'])
    total_word = len(test_data[word])
    per_word_accuracy[word] = correct_word / total_word if total_word > 0 else 0
    print(f"Accuracy for {word}: {per_word_accuracy[word]:.3f} ({correct_word}/{total_word})")

# Confidence distribution
print("\nConfidence Distribution:")
print(f"Average Confidence: {np.mean(confidences):.3f}")
print(f"Max Confidence: {np.max(confidences):.3f}")
print(f"Min Confidence: {np.min(confidences):.3f}")

# Analyze highest confidence word
max_confidence_result = max(results, key=lambda x: x['Confidence'])
print(f"\nWord with highest confidence: {max_confidence_result['True']} (Predicted: {max_confidence_result['Predicted']}, Confidence: {max_confidence_result['Confidence']:.3f})")

# Analyze word length vs accuracy
print("\nWord Length vs Accuracy:")
for word, acc in per_word_accuracy.items():
    print(f"{word} (length: {word_lengths.get(word, len(word))}): Accuracy = {acc:.3f}")

# Save confusion matrix to CSV
os.makedirs('model_res', exist_ok=True)
confusion_df = pd.DataFrame(confusion_matrix)
confusion_df.to_csv('model_res/confusion_matrix.csv')
print("\nConfusion matrix saved to model_res/confusion_matrix.csv")

# Save results to CSV
results_df = pd.DataFrame(results)
results_df.to_csv('model_res/test_results.csv')
print("Test results saved to model_res/test_results.csv")

# Save overall accuracy to CSV
accuracy_data = [{'Overall Accuracy': overall_accuracy, 'Correct Predictions': correct, 'Total Predictions': total}]
accuracy_df = pd.DataFrame(accuracy_data)
accuracy_df.to_csv('model_res/overall_accuracy.csv', index=False)
print("Overall accuracy saved to model_res/overall_accuracy.csv")

# Compare all pairs of training samples
metrics_table = asr_system.compare_all_pairs(training_data)
metrics_df = pd.DataFrame(metrics_table)
metrics_df.to_csv('model_res/metrics_comparison.csv', index=False)
print("\nMetrics comparison table saved to model_res/metrics_comparison.csv")

# Print metrics table
print("\nMetrics Comparison Table:")
print("| Word1 | Word2 | Sample1 | Sample2 | Euclidean | Manhattan | Cosine | Correlation |")
print("|-------|-------|---------|---------|-----------|-----------|--------|-------------|")
for row in metrics_table:
    print(f"| {row['Word1']} | {row['Word2']} | {row['Sample1']} | {row['Sample2']} | {row['Euclidean']:8.2f} | {row['Manhattan']:8.2f} | {row['Cosine']:6.2f} | {row['Correlation']:6.2f} |")

Word: Bale, Total samples: 15, Training: 9, Testing: 6
Word: BesiarAali, Total samples: 14, Training: 8, Testing: 6
Word: Kheir, Total samples: 15, Training: 9, Testing: 6
Word: Khodahafez, Total samples: 10, Training: 6, Testing: 4
Word: Motshakeram, Total samples: 15, Training: 9, Testing: 6
Word: Salam, Total samples: 15, Training: 9, Testing: 6
Training completed.
Model saved to model_res/dtw_asr_model.pkl
True: Bale, Predicted: Bale, Confidence: 0.210
True: Bale, Predicted: Bale, Confidence: 0.210
True: Bale, Predicted: Bale, Confidence: 0.213
True: Bale, Predicted: Bale, Confidence: 0.230
True: Bale, Predicted: Bale, Confidence: 0.217
True: Bale, Predicted: Bale, Confidence: 0.202
True: BesiarAali, Predicted: BesiarAali, Confidence: 0.257
True: BesiarAali, Predicted: BesiarAali, Confidence: 0.267
True: BesiarAali, Predicted: BesiarAali, Confidence: 0.272
True: BesiarAali, Predicted: BesiarAali, Confidence: 0.248
True: BesiarAali, Predicted: BesiarAali, Confidence: 0.241
True: Bes

### preprocess_audio_continuous_speech

In [27]:
import os
import librosa
import soundfile as sf
import scipy.signal as signal
import numpy as np

# مسیرها
input_dir = 'data/Sentence/'
output_dir = 'data/continuous_speech'
os.makedirs(output_dir, exist_ok=True)

# تنظیمات
target_sr = 16000
subtype = 'PCM_16'
low_cut = 300
high_cut = 3400
order = 5

# توابع پردازش
def normalize_audio(y):
    return y / np.max(np.abs(y))

def trim_silence(y, top_db=30):
    return librosa.effects.trim(y, top_db=top_db)[0]

def bandpass_filter(y, sr, low_cut, high_cut, order=5):
    nyquist = 0.5 * sr
    low = low_cut / nyquist
    high = high_cut / nyquist
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.lfilter(b, a, y)

# پردازش فایل‌ها
for filename in os.listdir(input_dir):
    if filename.endswith('.wav'):
        input_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, filename)

        # 1. بارگذاری و تبدیل به نرخ ۱۶kHz
        y, sr = librosa.load(input_path, sr=target_sr)

        # 2. نرمال‌سازی دامنه
        y = normalize_audio(y)

        # 3. حذف سکوت از ابتدا و انتها
        y = trim_silence(y)

        # 4. فیلتر باندپَس (حذف نویز خارج از بازه گفتار)
        y = bandpass_filter(y, sr, low_cut, high_cut, order)

        # 5. ذخیره با فرمت ۱۶ بیت
        sf.write(output_path, y, sr, subtype=subtype)

        print(f'✅ Processed and saved: {filename}')


✅ Processed and saved: asre_be_kheir_salam.wav
✅ Processed and saved: bale_hatman_anjam_midam.wav
✅ Processed and saved: bale_salam.wav
✅ Processed and saved: fekhrkonam_bale.wav
✅ Processed and saved: felan_khodahafez.wav
✅ Processed and saved: kheili_moteshaker.wav
✅ Processed and saved: khir_intor_nist.wav
✅ Processed and saved: khodahafez_ta_faeda.wav
✅ Processed and saved: moteshakeram_lotf.wav
✅ Processed and saved: na_khir_ashtebah_shode.wav
✅ Processed and saved: no_key_01.wav
✅ Processed and saved: no_key_02.wav
✅ Processed and saved: no_key_03.wav
✅ Processed and saved: no_key_04.wav
✅ Processed and saved: no_key_05.wav
✅ Processed and saved: no_key_06.wav
✅ Processed and saved: no_key_07.wav
✅ Processed and saved: no_key_08.wav
✅ Processed and saved: no_key_09.wav
✅ Processed and saved: no_key_10.wav
✅ Processed and saved: salam_mo_kh.wav
✅ Processed and saved: salam_sob_be_kheir.wav
✅ Processed and saved: ye_salam_garm_be_hame.wav


### plot_waveform_continuous_speech

In [24]:
import os
import librosa
import soundfile as sf
import scipy.signal as signal
import numpy as np
import matplotlib.pyplot as plt
import librosa.display

# Path to preprocessed audio files and output directory
input_dir = 'data/continuous_speech/'
output_dir = 'plot_waveform_continuous_speech'
target_sr = 16000

# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Feature extraction functions
def compute_rms(y):
    return np.sqrt(np.mean(y**2))

def compute_zcr(y):
    return np.mean(librosa.zero_crossings(y, pad=False))

def compute_energy(y):
    return np.sum(y**2)

def save_waveform_plot(y, sr, title, rms, zcr, energy):
    plt.figure(figsize=(10, 2.5))
    librosa.display.waveshow(y, sr=sr)
    plt.title(f'Waveform: {title}')
    plt.xlabel("Time (seconds)")
    plt.ylabel("Amplitude")
    
    # Add feature text to the plot
    text_str = f'RMS: {rms:.4f}\nZCR: {zcr:.4f}\nEnergy: {energy:.2f}'
    plt.text(0.02, 0.98, text_str, transform=plt.gca().transAxes, 
             verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    plt.tight_layout()
    output_path = os.path.join(output_dir, f'waveform_{title}.png')
    plt.savefig(output_path)
    plt.close()

def save_fft_plot(y, sr, title, rms, zcr, energy):
    Y = np.fft.rfft(y)
    freqs = np.fft.rfftfreq(len(y), 1/sr)
    plt.figure(figsize=(10, 2.5))
    plt.plot(freqs, np.abs(Y))
    plt.title(f'Frequency Spectrum (FFT): {title}')
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude")
    plt.xlim(0, 8000)
    
    # Add feature text to the plot
    text_str = f'RMS: {rms:.4f}\nZCR: {zcr:.4f}\nEnergy: {energy:.2f}'
    plt.text(0.02, 0.98, text_str, transform=plt.gca().transAxes, 
             verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    plt.tight_layout()
    output_path = os.path.join(output_dir, f'fft_{title}.png')
    plt.savefig(output_path)
    plt.close()

def save_features_to_text(filename, rms, zcr, energy):
    output_path = os.path.join(output_dir, f'features_{filename}.txt')
    with open(output_path, 'w') as f:
        f.write(f'File: {filename}\n')
        f.write(f'RMS: {rms:.4f}\n')
        f.write(f'Zero-Crossing Rate: {zcr:.4f}\n')
        f.write(f'Energy: {energy:.2f}\n')

# Analyze each audio file
for filename in os.listdir(input_dir):
    if filename.endswith('.wav'):
        path = os.path.join(input_dir, filename)
        y, sr = librosa.load(path, sr=target_sr)

        # Compute features
        rms = compute_rms(y)
        zcr = compute_zcr(y)
        energy = compute_energy(y)

        # Save features to text file
        save_features_to_text(filename, rms, zcr, energy)

        # Save visualizations with features annotated
        save_waveform_plot(y, sr, filename, rms, zcr, energy)
        save_fft_plot(y, sr, filename, rms, zcr, energy)

KeyboardInterrupt: 

<Figure size 1000x250 with 0 Axes>

### MFCC_continuous_speech

In [28]:
import os
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d

class FeatureExtractor:
    def __init__(self, sample_rate=16000, n_mfcc=13, n_fft=2048,
                 hop_length=512, n_mels=128):
        self.sample_rate = sample_rate
        self.n_mfcc = n_mfcc
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels

    def pre_emphasis(self, signal, pre_emph_coef=0.97):
        return np.append(signal[0], signal[1:] - pre_emph_coef * signal[:-1])

    def remove_silence(self, y, threshold=0.01):
        try:
            energy = librosa.feature.rms(y=y)[0]
            frames = np.nonzero(energy > threshold)
            if frames[0].size:
                y = y[librosa.frames_to_samples(frames[0][0]):librosa.frames_to_samples(frames[0][-1])]
            return y
        except Exception as e:
            print(f"Error in remove_silence: {e}")
            return y

    def extract_mfcc(self, y):
        try:
            y = self.pre_emphasis(y)
            mfcc = librosa.feature.mfcc(y=y, sr=self.sample_rate, n_mfcc=self.n_mfcc,
                                        n_fft=self.n_fft, hop_length=self.hop_length,
                                        n_mels=self.n_mels)
            mfcc = (mfcc - np.mean(mfcc)) / np.std(mfcc)
            return mfcc
        except Exception as e:
            print(f"Error in extract_mfcc: {e}")
            return None

    def extract_delta_features(self, mfcc):
        try:
            delta = librosa.feature.delta(mfcc)
            delta_delta = librosa.feature.delta(mfcc, order=2)
            combined = np.vstack([mfcc, delta, delta_delta])
            return combined
        except Exception as e:
            print(f"Error in extract_delta_features: {e}")
            return None

    def normalize_feature_length(self, features, target_length=50):
        try:
            n_features, original_length = features.shape
            x_old = np.linspace(0, 1, original_length)
            x_new = np.linspace(0, 1, target_length)
            interpolated = np.zeros((n_features, target_length))
            for i in range(n_features):
                f = interp1d(x_old, features[i], kind='linear')
                interpolated[i] = f(x_new)
            return interpolated
        except Exception as e:
            print(f"Error in normalize_feature_length: {e}")
            return None

    def extract_energy(self, y):
        try:
            return np.sum(y ** 2)
        except Exception as e:
            print(f"Error in extract_energy: {e}")
            return 0.0

    def save_mfcc_spectrogram(self, mfcc, title, energy):
        try:
            plt.figure(figsize=(8, 3))
            librosa.display.specshow(mfcc, x_axis='time', sr=self.sample_rate)
            plt.colorbar()
            plt.title(f'MFCC: {title}')
            text_str = f'Energy: {energy:.2f}'
            plt.text(0.02, 0.98, text_str, transform=plt.gca().transAxes, 
                     verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
            plt.tight_layout()
            output_path = os.path.join(plot_dir, f'mfcc_{title}.png')
            plt.savefig(output_path)
            plt.close()
            print(f"✅ Saved MFCC spectrogram: {output_path}")
        except Exception as e:
            print(f"Error in save_mfcc_spectrogram for {title}: {e}")

# Paths for input and output
input_dir = 'data/continuous_speech/'
output_dir = 'data/mfcc_features_continuous_speech'
plot_dir = 'mfcc_plot_continuous_speech'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(plot_dir, exist_ok=True)

# Check if input directory exists and has files
if not os.path.exists(input_dir):
    print(f"Error: Input directory {input_dir} does not exist.")
    exit()
if not os.listdir(input_dir):
    print(f"Error: No files found in {input_dir}.")
    exit()

# Extract unique filenames (treating each file as a whole unit)
sentences = []
for filename in os.listdir(input_dir):
    if filename.endswith('.wav'):
        sentence = os.path.splitext(filename)[0]
        sentences.append(sentence)
sentences = sorted(sentences)

if not sentences:
    print(f"Error: No .wav files found in {input_dir}.")
    exit()

extractor = FeatureExtractor()
sentence_features = {}

for sentence in sentences:
    mfccs = []
    energies = []
    lengths = []
    print(f'\n🔍 Processing Sentence: {sentence}')
    
    # Process the sentence file
    filename = sentence + '.wav'
    wav_path = os.path.join(input_dir, filename)
    
    try:
        y, sr = librosa.load(wav_path, sr=16000)
    except Exception as e:
        print(f"Error loading {wav_path}: {e}")
        continue

    y = extractor.remove_silence(y)
    if y is None or len(y) == 0:
        print(f"Warning: Empty audio after silence removal for {filename}")
        continue

    energy = extractor.extract_energy(y)
    mfcc = extractor.extract_mfcc(y)
    if mfcc is None:
        print(f"Warning: Failed to extract MFCC for {filename}")
        continue

    mfcc = extractor.extract_delta_features(mfcc)
    if mfcc is None:
        print(f"Warning: Failed to extract delta features for {filename}")
        continue

    mfcc = extractor.normalize_feature_length(mfcc)
    if mfcc is None:
        print(f"Warning: Failed to normalize feature length for {filename}")
        continue

    # Save features to npy file
    npy_filename = filename.replace('.wav', '.npy')
    npy_path = os.path.join(output_dir, npy_filename)
    try:
        np.save(npy_path, mfcc)
        print(f"✅ Saved MFCC features: {npy_path}")
    except Exception as e:
        print(f"Error saving MFCC features for {npy_filename}: {e}")

    # Save MFCC spectrogram
    extractor.save_mfcc_spectrogram(mfcc, filename, energy)

    # Collect statistics
    mfccs.append(mfcc)
    energies.append(energy)
    lengths.append(len(y) / sr)

    # Store for final analysis
    sentence_features[sentence] = {
        'mfccs': mfccs,
        'energies': energies,
        'lengths': lengths
    }

    print(f"📏 Avg length: {np.mean(lengths):.2f}s")
    print(f"⚡ Avg energy: {np.mean(energies):.2f}")

print("\n🎉 Processing completed!")


🔍 Processing Sentence: asre_be_kheir_salam
✅ Saved MFCC features: data/mfcc_features_continuous_speech\asre_be_kheir_salam.npy
✅ Saved MFCC spectrogram: mfcc_plot_continuous_speech\mfcc_asre_be_kheir_salam.wav.png
📏 Avg length: 1.25s
⚡ Avg energy: 449.56

🔍 Processing Sentence: bale_hatman_anjam_midam
✅ Saved MFCC features: data/mfcc_features_continuous_speech\bale_hatman_anjam_midam.npy
✅ Saved MFCC spectrogram: mfcc_plot_continuous_speech\mfcc_bale_hatman_anjam_midam.wav.png
📏 Avg length: 1.92s
⚡ Avg energy: 424.27

🔍 Processing Sentence: bale_salam
✅ Saved MFCC features: data/mfcc_features_continuous_speech\bale_salam.npy
✅ Saved MFCC spectrogram: mfcc_plot_continuous_speech\mfcc_bale_salam.wav.png
📏 Avg length: 1.38s
⚡ Avg energy: 405.26

🔍 Processing Sentence: fekhrkonam_bale
✅ Saved MFCC features: data/mfcc_features_continuous_speech\fekhrkonam_bale.npy
✅ Saved MFCC spectrogram: mfcc_plot_continuous_speech\mfcc_fekhrkonam_bale.wav.png
📏 Avg length: 1.63s
⚡ Avg energy: 569.94

🔍 

### annotation

In [16]:
import os
import pygame
import json
import librosa
import soundfile as sf
import numpy as np

class AudioAnnotator:
    def __init__(self, audio_file, text):
        """
        ابزار برچسب‌گذاری دستی فایل‌های صوتی
        
        Args:
            audio_file: مسیر فایل صوتی
            text: متن جمله
        """
        self.audio_file = audio_file
        self.text = text
        self.annotations = []
    
    def play_segment(self, start_time, end_time):
        """
        پخش بخشی از فایل صوتی
        
        Args:
            start_time: زمان شروع (ثانیه)
            end_time: زمان پایان (ثانیه)
        """
        # بارگذاری فایل صوتی
        y, sr = librosa.load(self.audio_file, sr=None)
        start_sample = int(start_time * sr)
        end_sample = int(end_time * sr)
        segment = y[start_sample:end_sample]
        
        # ذخیره فایل موقت برای پخش
        temp_file = "temp_segment.wav"
        sf.write(temp_file, segment, sr)
        
        # پخش فایل موقت با استفاده از pygame
        pygame.mixer.init()
        pygame.mixer.music.load(temp_file)
        pygame.mixer.music.play()
        
        # صبر برای اتمام پخش
        while pygame.mixer.music.get_busy():
            pygame.time.Clock().tick(10)

    def add_annotation(self, word, start_time, end_time):
        """
        اضافه کردن برچسب برای یک کلمه کلیدی
        
        Args:
            word: کلمه کلیدی
            start_time: زمان شروع
            end_time: زمان پایان
        """
        # اعتبارسنجی ورودی‌ها
        if start_time < 0 or end_time < start_time:
            raise ValueError("زمان شروع یا پایان نامعتبر است.")
        
        # اضافه کردن به لیست annotations
        self.annotations.append({
            "word": word,
            "start_time": start_time,
            "end_time": end_time
        })
    
    def save_annotations(self, output_path):
        """
        ذخیره برچسب‌ها در فرمت JSON
        
        Args:
            output_path: مسیر ذخیره فایل JSON
        """
        data = {
            "filename": os.path.basename(self.audio_file),
            "text": self.text,
            "duration": librosa.get_duration(filename=self.audio_file),
            "keywords": self.annotations
        }
        
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

# استفاده از annotator
def annotate_all_files(input_dir, output_dir):
    """
    برچسب‌گذاری تمام فایل‌های ضبط شده
    
    Args:
        input_dir: دایرکتوری حاوی فایل‌های صوتی
        output_dir: دایرکتوری ذخیره‌سازی برچسب‌ها
    """
    # بررسی وجود دایرکتوری خروجی
    os.makedirs(output_dir, exist_ok=True)

    # لیست کردن فایل‌های صوتی در دایرکتوری ورودی
    for filename in os.listdir(input_dir):
        if filename.endswith(".wav"):
            audio_file = os.path.join(input_dir, filename)
            text = "متن فرضی جمله"  # اینجا باید متن واقعی فایل را وارد کنید
            
            annotator = AudioAnnotator(audio_file, text)
            
            # شبیه‌سازی برچسب‌گذاری دستی برای هر فایل (شما باید این مراحل را به صورت دستی انجام دهید)
            # برای مثال:
            annotator.add_annotation("سلام", 0.2, 0.8)
            annotator.add_annotation("صبح", 1.0, 1.5)

            # ذخیره برچسب‌ها
            output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.json")
            annotator.save_annotations(output_path)

# فراخوانی برچسب‌گذاری برای تمام فایل‌ها
input_dir = 'data/Sentence/'  # مسیر دایرکتوری فایل‌های صوتی
output_dir = 'data/annotations'  # مسیر دایرکتوری ذخیره‌سازی برچسب‌ها
annotate_all_files(input_dir, output_dir)


	This alias will be removed in version 1.0.
  "duration": librosa.get_duration(filename=self.audio_file),


### SubsequenceDTW

In [31]:
import os
import json
import numpy as np
from scipy.spatial.distance import euclidean

class SubsequenceDTWKeywordDetector:
    def __init__(self, keyword_feature_dir='data/mfcc_features', 
                 speech_feature_dir='data/mfcc_features_continuous_speech', 
                 annotation_dir='data/annotations', 
                 output_dir='data/keyword_detections',
                 target_length=50, sample_rate=16000, hop_length=512):
        """
        Initialize the Subsequence DTW Keyword Detector.
        """
        self.keyword_feature_dir = keyword_feature_dir
        self.speech_feature_dir = speech_feature_dir
        self.annotation_dir = annotation_dir
        self.output_dir = output_dir
        self.target_length = target_length
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        os.makedirs(self.output_dir, exist_ok=True)
        
        # Updated keyword_map to include "Khodahafez"
        self.keyword_map = {
            'Salam': 'سلام',
            'Bale': 'بله',
            'Kheir': 'خیر',
            'Motshakeram': 'متشکرم',
            'BesiarAali': 'بسیار عالی',
            'Khodahafez': 'خداحافظ'  # Added Khodahafez
        }

    def compute_subsequence_dtw(self, query, sequence, distance_metric='euclidean'):
        """
        Compute Subsequence DTW to find the best alignment of query in sequence.
        """
        n, m = query.shape[1], sequence.shape[1]
        cost_matrix = np.full((n + 1, m + 1), np.inf)
        cost_matrix[0, :] = 0
        predecessors = np.zeros((n + 1, m + 1, 2), dtype=int)

        for i in range(1, n + 1):
            for j in range(1, m + 1):
                if distance_metric == 'euclidean':
                    cost = euclidean(query[:, i-1], sequence[:, j-1])
                else:
                    raise ValueError("Unsupported distance metric")

                costs = [
                    cost_matrix[i-1, j-1],  # Match
                    cost_matrix[i-1, j],    # Insertion
                    cost_matrix[i, j-1]     # Deletion
                ]
                min_cost_idx = np.argmin(costs)
                cost_matrix[i, j] = cost + costs[min_cost_idx]
                predecessors[i, j] = [(i-1, j-1), (i-1, j), (i, j-1)][min_cost_idx]

        end_idx = np.argmin(cost_matrix[n, 1:]) + 1
        min_cost = cost_matrix[n, end_idx]
        path = []
        i, j = n, end_idx
        while i > 0:
            path.append((i-1, j-1))
            i, j = predecessors[i, j]
        path.reverse()
        start_idx = path[0][1] if path else 0
        return min_cost, start_idx, end_idx

    def frame_to_time(self, frame_idx):
        """
        Convert frame index to time in seconds.
        """
        return frame_idx * self.hop_length / self.sample_rate

    def detect_keywords(self, speech_file, keywords, threshold=40.0):
        """
        Detect keywords in a continuous speech file using Subsequence DTW.
        """
        speech_path = os.path.join(self.speech_feature_dir, speech_file)
        try:
            speech_mfcc = np.load(speech_path)
            print(f"Loaded speech MFCC: {speech_file}, Shape: {speech_mfcc.shape}")
        except FileNotFoundError:
            print(f"Error: Speech file {speech_path} not found.")
            return None

        if speech_mfcc.shape[0] != 39:
            print(f"Error: Unexpected shape for {speech_file}: {speech_mfcc.shape}. Expected (39, N).")
            return None

        annotation_path = os.path.join(self.annotation_dir, speech_file.replace('.npy', '.json'))
        try:
            with open(annotation_path, 'r', encoding='utf-8') as f:
                annotation = json.load(f)
            duration = annotation.get('duration', 0.0)
            text = annotation.get('text', '')
            print(f"Annotation loaded: {text}, Duration: {duration}")
        except FileNotFoundError:
            print(f"Warning: Annotation file {annotation_path} not found. Using default values.")
            duration = speech_mfcc.shape[1] * self.hop_length / self.sample_rate
            text = 'Unknown'

        # Filter keywords based on sentence text
        allowed_keywords = []
        for keyword in keywords:
            persian_keyword = self.keyword_map.get(keyword, keyword)
            if persian_keyword in text:
                allowed_keywords.append(keyword)
        print(f"Allowed keywords for {speech_file}: {allowed_keywords}")

        detections = {}
        for keyword in allowed_keywords:
            keyword_files = [f for f in os.listdir(self.keyword_feature_dir) 
                           if f.startswith(keyword + '_') and f.endswith('.npy')]
            if not keyword_files:
                print(f"Warning: No feature files found for keyword {keyword}.")
                continue

            best_distance = float('inf')
            best_detection = None
            for keyword_file in keyword_files:
                keyword_path = os.path.join(self.keyword_feature_dir, keyword_file)
                try:
                    keyword_mfcc = np.load(keyword_path)
                    print(f"Loaded keyword MFCC: {keyword_file}, Shape: {keyword_mfcc.shape}")
                except FileNotFoundError:
                    print(f"Error: Keyword file {keyword_path} not found.")
                    continue

                if keyword_mfcc.shape != (39, self.target_length):
                    print(f"Error: Unexpected shape for {keyword_file}: {keyword_mfcc.shape}. Expected (39, {self.target_length}).")
                    continue

                distance, start_idx, end_idx = self.compute_subsequence_dtw(keyword_mfcc, speech_mfcc)
                start_time = self.frame_to_time(start_idx)
                end_time = self.frame_to_time(end_idx)
                print(f"Keyword: {keyword_file}, DTW Distance: {distance:.2f}, Start Time: {start_time:.2f}s, End Time: {end_time:.2f}s")

                if distance < threshold and distance < best_distance:
                    best_distance = distance
                    best_detection = {
                        'word': self.keyword_map.get(keyword, keyword),
                        'start_time': round(start_time, 2),
                        'end_time': round(end_time, 2)
                    }

            if best_detection:
                detections[keyword] = best_detection

        # Convert detections to list and sort by start time
        output_detections = [det for det in detections.values() if det]
        output_detections.sort(key=lambda x: x['start_time'])

        output = {
            'filename': speech_file.replace('.npy', '.wav'),
            'text': text,
            'duration': round(duration, 6),
            'keywords': output_detections
        }

        output_path = os.path.join(self.output_dir, speech_file.replace('.npy', '.json'))
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output, f, ensure_ascii=False, indent=4)
        print(f"✅ Saved detections to {output_path}")
        return output

def process_all_speech_files():
    """
    Process all speech files in the speech feature directory.
    """
    detector = SubsequenceDTWKeywordDetector()
    keywords = ['Salam', 'Bale', 'Kheir', 'Motshakeram', 'BesiarAali', 'Khodahafez']  # Added 'Khodahafez'
    print(f"Keywords to search: {keywords}")
    
    for speech_file in os.listdir(detector.speech_feature_dir):
        if speech_file.endswith('.npy'):
            print(f"\n🔍 Processing speech file: {speech_file}")
            result = detector.detect_keywords(speech_file, keywords, threshold=40.0)
            if result:
                print(f"Detections: {result['keywords']}")

if __name__ == '__main__':
    process_all_speech_files()


Keywords to search: ['Salam', 'Bale', 'Kheir', 'Motshakeram', 'BesiarAali', 'Khodahafez']

🔍 Processing speech file: asre_be_kheir_salam.npy
Loaded speech MFCC: asre_be_kheir_salam.npy, Shape: (39, 50)
Annotation loaded:  عصر به خیر سلام , Duration: 2.5997708333333334
Allowed keywords for asre_be_kheir_salam.npy: ['Salam', 'Kheir']
Loaded keyword MFCC: Salam_000001.npy, Shape: (39, 50)
Keyword: Salam_000001.npy, DTW Distance: 43.90, Start Time: 1.12s, End Time: 1.60s
Loaded keyword MFCC: Salam_000002.npy, Shape: (39, 50)
Keyword: Salam_000002.npy, DTW Distance: 50.79, Start Time: 1.12s, End Time: 1.60s
Loaded keyword MFCC: Salam_000003.npy, Shape: (39, 50)
Keyword: Salam_000003.npy, DTW Distance: 46.65, Start Time: 1.12s, End Time: 1.60s
Loaded keyword MFCC: Salam_000004.npy, Shape: (39, 50)
Keyword: Salam_000004.npy, DTW Distance: 49.71, Start Time: 1.12s, End Time: 1.60s
Loaded keyword MFCC: Salam_000005.npy, Shape: (39, 50)
Keyword: Salam_000005.npy, DTW Distance: 45.31, Start Time:

### GUI