# Esfandiar Kiani (40311614) - DSP - HM07

## Imports

In [1]:
import os
import sounddevice as sd
import librosa
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from dtw import dtw
from transformers import Wav2Vec2Processor, Wav2Vec2Model, HubertModel
import torch
import torchaudio
from torchaudio.pipelines import WAV2VEC2_BASE
from torchaudio.pipelines import HUBERT_BASE


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



## Paths

In [2]:
# Root folder of the dataset. Set the HM07_DATASET_DIR environment variable to
# run the notebook on another machine; the original location stays the default.
DATASET_DIR = os.environ.get(
    "HM07_DATASET_DIR", "D:/M.A/T1/DSP/Assignments/HM07/Dataset"
).rstrip("/\\")  # avoid a doubled separator when the override ends with one

TRAIN_PATH = f"{DATASET_DIR}/Train"
TEST_PATH = f"{DATASET_DIR}/Test"

## Feature extraction function

In [4]:
# Pretrained networks are expensive to construct, so each one is built once
# and reused across calls instead of being reloaded for every audio file.
_PRETRAINED_CACHE = {}


def _get_pretrained(key, loader):
    """Return the cached object stored under `key`, calling `loader()` on first use."""
    if key not in _PRETRAINED_CACHE:
        _PRETRAINED_CACHE[key] = loader()
    return _PRETRAINED_CACHE[key]


def extract_features(filepath, method):
    """Load an audio file at 16 kHz and return a 1-D feature vector for it.

    Parameters
    ----------
    filepath : str
        Path of the audio file, passed to ``librosa.load``.
    method : str
        One of:
        - 'cepstrum': first 12 real-cepstrum coefficients of the whole signal.
        - 'mfcc': 12 MFCCs averaged over frames.
        - 'mfcc_energy': mean MFCCs followed by the mean RMS energy.
        - 'mfcc_delta': mean MFCCs, mean RMS energy, then mean delta MFCCs.
        - 'mfcc_delta2': as 'mfcc_delta' plus mean second-order deltas.
        - 'wav2vec2': time-averaged last hidden state of facebook/wav2vec2-base.
        - 'hubert': time-averaged output of torchaudio's HUBERT_BASE model.

    Returns
    -------
    numpy.ndarray or None
        The flattened feature vector, or None when `method` is not recognised.
    """
    y, sr = librosa.load(filepath, sr=16000)

    if method == 'cepstrum':
        spectrum = np.fft.fft(y)
        # The small offset keeps log() finite for zero-magnitude bins.
        log_spectrum = np.log(np.abs(spectrum) + 1e-10)
        return np.fft.ifft(log_spectrum).real[:12]

    elif method == 'mfcc':
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=12)
        return np.mean(mfccs, axis=1)

    elif method == 'mfcc_energy':
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=12)
        energy = np.mean(librosa.feature.rms(y=y))
        return np.append(np.mean(mfccs, axis=1), energy)

    elif method == 'mfcc_delta':
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=12)
        delta = librosa.feature.delta(mfccs)
        energy = np.mean(librosa.feature.rms(y=y))
        return np.append(np.append(np.mean(mfccs, axis=1), energy), np.mean(delta, axis=1))

    elif method == 'mfcc_delta2':
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=12)
        delta = librosa.feature.delta(mfccs)
        delta2 = librosa.feature.delta(mfccs, order=2)
        energy = np.mean(librosa.feature.rms(y=y))
        return np.concatenate([np.mean(mfccs, axis=1), [energy], np.mean(delta, axis=1), np.mean(delta2, axis=1)])

    elif method == 'wav2vec2':
        processor = _get_pretrained(
            "wav2vec2_processor",
            lambda: Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base"),
        )
        model = _get_pretrained(
            "wav2vec2_model",
            lambda: Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base"),
        )
        inputs = processor(y, sampling_rate=sr, return_tensors="pt", padding=True)
        # Feature extraction only: no gradients are needed.
        with torch.inference_mode():
            outputs = model(**inputs)
        return outputs.last_hidden_state.mean(dim=1).detach().numpy().flatten()

    elif method == "hubert":
        bundle = HUBERT_BASE
        model = _get_pretrained("hubert_model", bundle.get_model)

        # librosa returns a 1-D NumPy array, but torchaudio's resampler and
        # model operate on torch tensors shaped (batch, time).
        waveform = torch.as_tensor(y, dtype=torch.float32).unsqueeze(0)
        waveform = torchaudio.functional.resample(waveform, sr, int(bundle.sample_rate))

        with torch.inference_mode():
            features, _ = model(waveform)

        # Average over the frame axis to get one vector per file.
        return features.mean(dim=1).detach().numpy().flatten()

    else:
        # Unknown method: callers receive None rather than an exception.
        return None


## Methods and result storage

In [5]:
# Names of the feature-extraction methods to evaluate, in evaluation order.
methods = [
    'cepstrum',
    'mfcc',
    'mfcc_energy',
    'mfcc_delta',
    'mfcc_delta2',
    'wav2vec2',
    'hubert',
]

# Starts empty; holds the collected evaluation results.
results = list()

## Train and evaluate

In [8]:
# For every feature type: extract one reference vector and one test vector per
# file "<i>.wav" (i = 0..9), then classify each test vector by the reference
# with the smallest DTW distance. A prediction is correct when the nearest
# reference has the same index as the test file.
for method in methods:
    reference_features = []
    test_features = []

    for i in range(10):
        ref_feat = extract_features(f"{TRAIN_PATH}/{i}.wav", method)
        test_feat = extract_features(f"{TEST_PATH}/{i}.wav", method)
        reference_features.append(ref_feat)
        test_features.append(test_feat)

    correct = 0

    for i, test in enumerate(test_features):
        distances = [dtw(test, ref).distance for ref in reference_features]
        if np.argmin(distances) == i:
            correct += 1

    # Derive the denominator from the data instead of repeating the literal 10.
    accuracy = correct / len(test_features) * 100

    # Keep the cell idempotent: re-running it replaces this method's row
    # instead of appending a duplicate to the shared results list.
    results[:] = [row for row in results if row[0] != method]
    results.append((method, accuracy))


preprocessor_config.json:   0%|          | 0.00/159 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/163 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.84k [00:00<?, ?B/s]



vocab.json:   0%|          | 0.00/291 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/380M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/213 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.39k [00:00<?, ?B/s]



OSError: Can't load tokenizer for 'facebook/hubert-base-ls960'. If you were trying to load it from 'https://huggingface.co/models', make sure you don't have a local directory with the same name. Otherwise, make sure 'facebook/hubert-base-ls960' is the correct path to a directory containing all relevant files for a Wav2Vec2CTCTokenizer tokenizer.

In [8]:
def extract_features2(filepath, method):
    """Load an audio file at 16 kHz and return its time-averaged HuBERT features.

    Parameters
    ----------
    filepath : str
        Path of the audio file, passed to ``librosa.load``.
    method : str
        Only "hubert" is handled.

    Returns
    -------
    numpy.ndarray or None
        Flattened mean of the HUBERT_BASE model output over the frame axis,
        or None when `method` is not "hubert".
    """
    y, sr = librosa.load(filepath, sr=16000)
    if method == "hubert":
        bundle = HUBERT_BASE

        # Build the network once and keep it on the function object, so it is
        # not re-created for every audio file.
        model = getattr(extract_features2, "_hubert_model", None)
        if model is None:
            model = bundle.get_model()
            extract_features2._hubert_model = model

        # librosa returns a 1-D NumPy array, but torchaudio's resampler and
        # model operate on torch tensors shaped (batch, time).
        waveform = torch.as_tensor(y, dtype=torch.float32).unsqueeze(0)
        waveform = torchaudio.functional.resample(waveform, sr, int(bundle.sample_rate))

        with torch.inference_mode():
            features, _ = model(waveform)

        # Average over the frame axis to get one vector per file.
        return features.mean(dim=1).detach().numpy().flatten()

    # Any other method is unsupported here.
    return None

In [None]:
# HuBERT-only evaluation: extract one reference and one test vector per file
# "<i>.wav" (i = 0..9), then classify each test vector by the reference with
# the smallest DTW distance.
reference_features = []
test_features = []
method = 'hubert'

for i in range(10):
    ref_feat = extract_features2(f"{TRAIN_PATH}/{i}.wav", method)
    test_feat = extract_features2(f"{TEST_PATH}/{i}.wav", method)
    reference_features.append(ref_feat)
    test_features.append(test_feat)

correct = 0

for i, test in enumerate(test_features):
    distances = [dtw(test, ref).distance for ref in reference_features]
    if np.argmin(distances) == i:
        correct += 1

# Derive the denominator from the data instead of repeating the literal 10.
accuracy = correct / len(test_features) * 100

# Keep the cell idempotent: replace any existing row for this method rather
# than appending a duplicate when the cell is run again.
results[:] = [row for row in results if row[0] != method]
results.append((method, accuracy))

## Result

In [None]:
# Tabulate the collected results, save them as CSV without the index column,
# and print the table.
df = pd.DataFrame(data=results, columns=["Method", "Accuracy"])
df.to_csv(path_or_buf="recognition_results.csv", index=False)
print(df)