In [10]:
import os
import pandas as pd
import numpy as np
import librosa
import numpy as np
import librosa
import soundfile as sf
from typing import Union, Optional, Tuple
import torch
from sklearn.model_selection import train_test_split
from random import choices
import pickle
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from torch.utils.data import SubsetRandomSampler
from transformers import WhisperModel, WhisperFeatureExtractor
from transformers import WhisperProcessor, WhisperModel
import tqdm
import gc

In [11]:
class AudioPreprocessor:
    """Load, normalise, trim and length-fix a single audio clip.

    :meth:`process` always returns a float16 array that is exactly
    ``target_sr`` samples long: shorter clips are zero-padded at the end and
    longer clips are truncated.

    Args:
        target_sr: Sample rate the audio is resampled to.
        normalize: When True, apply the blended peak/RMS normalisation.
        trim_silence: When True, strip leading/trailing silence with
            ``librosa.effects.trim``.
        max_duration: Optional cap, in seconds, applied after trimming.
        mono: When True, multi-channel audio is down-mixed to one channel.
    """

    def __init__(
        self,
        target_sr: int = 16000,
        normalize: bool = True,
        trim_silence: bool = True,
        max_duration: Optional[float] = None,
        mono: bool = True
    ):
        self.target_sr = target_sr
        self.normalize = normalize
        self.trim_silence = trim_silence
        self.max_duration = max_duration
        self.mono = mono

    def process(self, audio_path: Union[str, np.ndarray]) -> np.ndarray:
        """Return the preprocessed clip for a file path or an in-memory array.

        Args:
            audio_path: Path of an audio file, or a numpy waveform. Arrays
                carry no sample rate, so they are taken to already be at
                ``target_sr``.

        Returns:
            A float16 array of length ``target_sr``.

        Raises:
            TypeError: If ``audio_path`` is neither a ``str`` nor an ndarray.
        """
        if isinstance(audio_path, str):
            audio, orig_sr = librosa.load(audio_path, sr=None, mono=False)
        elif isinstance(audio_path, np.ndarray):
            audio, orig_sr = audio_path, self.target_sr
        else:
            raise TypeError("Input must be a file path or numpy array")

        # Down-mix both input kinds (arrays used to skip this step).
        # Assumes channels-first layout, as librosa.to_mono does - TODO confirm
        # for array callers.
        if self.mono and audio.ndim > 1:
            audio = librosa.to_mono(audio)

        if orig_sr != self.target_sr:
            audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=self.target_sr)

        # Nothing to normalise or trim: hand back pure padding instead of
        # failing inside np.max on an empty array.
        if audio.size == 0:
            return np.zeros(self.target_sr, dtype=np.float16)

        if self.normalize:
            peak = np.max(np.abs(audio))
            rms = np.sqrt(np.mean(audio**2))
            # A silent clip has peak == rms == 0; dividing would fill the
            # clip with NaN, so silent audio is left untouched.
            if peak > 0 and rms > 0:
                peak_normalization = audio / peak
                rms_normalization = audio / rms
                audio = peak_normalization * 0.5 + rms_normalization * 0.5

        if self.trim_silence:
            audio, _ = librosa.effects.trim(audio)
        if self.max_duration is not None:
            max_length = int(self.max_duration * self.target_sr)
            audio = audio[:max_length]
        audio = np.array(audio, dtype=np.float16)

        # Force a fixed length of target_sr samples.
        # NOTE(review): with mono=False and multi-channel input this acts on
        # the first axis - confirm that combination is ever used.
        if len(audio) < self.target_sr:
            pad_length = self.target_sr - len(audio)
            audio = np.pad(audio, (0, pad_length), mode='constant')
        else:
            audio = audio[:self.target_sr]

        return audio

    def save_processed_audio(
        self,
        audio: np.ndarray,
        output_path: str,
        format: str = 'wav'
    ):
        """Write ``audio`` to ``output_path`` at ``target_sr``.

        Args:
            audio: Waveform to write, e.g. the output of :meth:`process`.
            output_path: Destination file path.
            format: Container format name passed to ``soundfile.write``.
        """
        audio = np.asarray(audio)
        # process() returns float16, which soundfile does not accept; widen
        # it (exactly) to float32. Other dtypes are passed through unchanged.
        if audio.dtype == np.float16:
            audio = audio.astype(np.float32)
        sf.write(output_path, audio, self.target_sr, format=format)


In [None]:
class AudioFolderProcessor():
    """Walk audio folders and turn every clip into one DataFrame row.

    Two layouts are supported: a training tree
    ``base_dir/<language>/<keyword>/<clip>`` and a flat query folder
    ``base_dir/<clip>``. Each clip is run through ``self.preprocessor``.
    """

    # Lower-case file extensions that count as audio clips.
    AUDIO_EXTENSIONS = ('.wav', '.opus')

    def __init__(self):
        self.preprocessor = AudioPreprocessor()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def process_audio_dataset_train(self, base_dir='/Users/anshsingh200516/Desktop/GUI/TRAIN'):
        """Load every clip under ``base_dir/<language>/<keyword>/``.

        Args:
            base_dir: Root of the training tree. The default is a
                machine-specific absolute path; pass an explicit value.

        Returns:
            ``(df, 0)`` where ``df`` has the columns ``language``, ``keyword``
            and ``audio_array``. The second element is a placeholder so the
            return shape matches the query variant.

        Raises:
            FileNotFoundError: If ``base_dir`` does not exist.
        """
        audio_data = []

        if not os.path.exists(base_dir):
            raise FileNotFoundError(f"Base directory {base_dir} not found")

        # os.listdir order is unspecified; sort so row order is reproducible.
        for language in sorted(os.listdir(base_dir)):
            language_path = os.path.join(base_dir, language)
            if not os.path.isdir(language_path):
                continue
            # Report only real language folders, not stray files.
            print(f"Current Language: {language}")

            for keyword in sorted(os.listdir(language_path)):
                keyword_path = os.path.join(language_path, keyword)
                if not os.path.isdir(keyword_path):
                    continue

                for audio_file in sorted(os.listdir(keyword_path)):
                    if not audio_file.lower().endswith(self.AUDIO_EXTENSIONS):
                        continue
                    audio_path = os.path.join(keyword_path, audio_file)
                    audio_data.append({
                        'language': language,
                        'keyword': keyword,
                        'audio_array': self.preprocessor.process(audio_path)
                    })

        df = pd.DataFrame(audio_data)
        return df, 0

    def process_audio_dataset_query(self, base_dir='TEST_DUMMY'):
        """Load every clip that sits directly inside ``base_dir``.

        Args:
            base_dir: Flat folder of query clips.

        Returns:
            ``(df, stats)``: one row per clip (``filename``, ``filepath``,
            ``file_extension``, ``file_size``, ``audio_array``,
            ``audio_duration``) and a dict of duration statistics.

        Raises:
            FileNotFoundError: If ``base_dir`` does not exist.
            KeyError: If the folder holds no audio clip. Previously this
                surfaced implicitly from indexing an empty frame; the
                exception type is kept because callers catch it.
        """
        audio_data = []

        if not os.path.exists(base_dir):
            raise FileNotFoundError(f"Base directory {base_dir} not found")

        sample_rate = self.preprocessor.target_sr
        for audio_file in sorted(os.listdir(base_dir)):
            if not audio_file.lower().endswith(self.AUDIO_EXTENSIONS):
                continue
            audio_path = os.path.join(base_dir, audio_file)
            audio_array = self.preprocessor.process(audio_path)
            audio_data.append({
                'filename': audio_file,
                'filepath': audio_path,
                'file_extension': os.path.splitext(audio_file)[1],
                'file_size': os.path.getsize(audio_path),
                'audio_array': audio_array,
                # Samples / rate, i.e. the duration of the processed clip at
                # the preprocessor's own rate rather than a hard-coded 16000.
                'audio_duration': audio_array.shape[-1] / float(sample_rate),
            })

        if not audio_data:
            raise KeyError(f"No audio files found in {base_dir}")

        df = pd.DataFrame(audio_data)
        stats = {
            'total_audio_files': len(df),
            'total_duration_hours': df['audio_duration'].sum() / 3600,
            'average_duration_seconds': df['audio_duration'].mean(),
            'min_duration_seconds': df['audio_duration'].min(),
            'max_duration_seconds': df['audio_duration'].max(),
        }

        return df, stats

    def process_audio_dataset(self, base_dir='/Users/anshsingh200516/Desktop/GUI/TRAIN', query=False):
        """Dispatch to the query or training loader depending on ``query``.

        Args:
            base_dir: Folder handed to the selected loader.
            query: True for the flat query layout, False for the training tree.

        Returns:
            Whatever the selected loader returns: ``(df, stats)`` or ``(df, 0)``.
        """
        if query:
            return self.process_audio_dataset_query(base_dir)
        return self.process_audio_dataset_train(base_dir)
    
class AudioDataset():
    """Preprocess the keyword and query folders and expose them as datasets.

    Construction runs the folder processor on both directories and loads the
    keyword -> class-id mapping from a pickle file.

    Args:
        TrainDir: Root of the ``<language>/<keyword>/<clip>`` training tree.
        QueryDir: Flat folder of query clips.
        KW_TO_ID: Path of the pickled ``{keyword: id}`` mapping.
    """

    def __init__(self, TrainDir = 'TRAIN/TRAIN', QueryDir = 'TEST_DUMMY_CORRECTED/TEST_DUMMY_CORRECTED', KW_TO_ID = r'kw_to_id.pkl'):
        print('Initializing AudioDataset ...')
        self.audio_processor = AudioFolderProcessor()
        self.KeywordDir = TrainDir
        self.QueryDir = QueryDir
        self.KeyWordData, self.KeywordStats = self.audio_processor.process_audio_dataset(base_dir=self.KeywordDir, query=False)
        print('KeyWord Data Processed ...')
        try:
            self.QueryData, self.QueryStats = self.audio_processor.process_audio_dataset(base_dir=self.QueryDir, query=True)
            print('Query Data Processed ...')
        except KeyError:
            print('Query Data Not Found ...')
            # Keep the attributes defined so the query accessors fail soft
            # (empty dataset) instead of raising AttributeError later.
            self.QueryData, self.QueryStats = pd.DataFrame(), {}
        # NOTE: pickle can execute arbitrary code - only load trusted files.
        with open(KW_TO_ID, 'rb') as mapping_file:
            self.KW_TO_ID = pickle.load(mapping_file)
        self.ID_TO_KW = {v: k for k, v in self.KW_TO_ID.items()}

    class KeywordDataset(Dataset):
        """Map-style dataset yielding ``(audio_array, one_hot_label)``."""

        def __init__(self, data, KW_TO_ID, num_classes=441):
            """Store the frame, the keyword -> id mapping and the label width."""
            super().__init__()
            self.data = data
            self.KW_TO_ID = KW_TO_ID
            self.num_classes = num_classes

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            """Return the clip at position ``idx`` and its one-hot float label."""
            # .iloc is positional, so this also works when the frame's index
            # is not the default 0..n-1 range.
            array = self.data['audio_array'].iloc[idx]
            class_id = int(self.KW_TO_ID[self.data['keyword'].iloc[idx]])
            one_hot_id = torch.nn.functional.one_hot(
                torch.tensor(class_id), num_classes=self.num_classes
            ).float()
            return (array, one_hot_id)

    class QueryDataset(Dataset):
        """Map-style dataset yielding ``(audio_array, filename)``."""

        def __init__(self, data):
            super().__init__()
            self.data = data

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            """Return the clip at position ``idx`` together with its file name.

            This used to return None, which a DataLoader cannot collate. The
            file name is included so shuffled batches can be traced back.
            """
            return (self.data['audio_array'].iloc[idx], self.data['filename'].iloc[idx])

    def get_keyword_dataset(self):
        """Return a ``KeywordDataset`` over the preprocessed training clips."""
        return (self.KeywordDataset(self.KeyWordData, self.KW_TO_ID))

    def get_query_dataset(self):
        """Return a ``QueryDataset`` over the preprocessed query clips."""
        # QueryDataset takes only the frame; passing KW_TO_ID as well raised
        # TypeError.
        return (self.QueryDataset(self.QueryData))

    def get_keyword_dataloader(self, batch_size=32, random_state=None):
        """Split the keyword data 70/10/20 and wrap each part in a DataLoader.

        Args:
            batch_size: Batch size used by all three loaders.
            random_state: Optional seed forwarded to ``train_test_split`` so
                the split can be reproduced.

        Returns:
            ``(train_loader, val_loader, test_loader)``.
        """
        def train_test_val_split(dataset, test_size=0.2, val_size=0.1):
            train_idx, test_idx = train_test_split(
                range(len(dataset)), test_size=test_size, random_state=random_state
            )
            # val_size is a fraction of the whole set, so rescale it to the
            # part that is left after removing the test split.
            train_idx, val_idx = train_test_split(
                train_idx, test_size=val_size / (1 - test_size), random_state=random_state
            )
            return train_idx, val_idx, test_idx

        # One dataset object is enough; the samplers decide what each loader sees.
        dataset = self.KeywordDataset(self.KeyWordData, self.KW_TO_ID)
        train_idx, val_idx, test_idx = train_test_val_split(dataset)

        train_loader = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(train_idx))
        val_loader = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(val_idx))
        test_loader = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(test_idx))

        return train_loader, val_loader, test_loader

    def get_query_dataloader(self):
        """Return a shuffled DataLoader (batch size 32) over the query clips."""
        DS = self.QueryDataset(self.QueryData)
        return DataLoader(DS, batch_size=32, shuffle=True)
    
# Build the dataset wrapper with its default folders; construction runs the
# preprocessing for both the keyword and the query data.
ds = AudioDataset()
# Map-style dataset over the preprocessed keyword clips.
KWDS = ds.get_keyword_dataset()

Initializing AudioDataset ...
Current Language: ar
Current Language: as
Current Language: cs
Current Language: dv
Current Language: fa
Current Language: fr
Current Language: ru
Current Language: ta
Current Language: tr
Current Language: zh-CN
KeyWord Data Processed ...
Query Data Processed ...


In [14]:
# Randomly split the keyword clips and get one DataLoader per split.
train_loader, val_loader, test_loader = ds.get_keyword_dataloader()

In [16]:
class AudioPreprocessor:
    """Load, normalise, trim and length-fix a single audio clip.

    :meth:`process` always returns a float16 array that is exactly
    ``target_sr`` samples long: shorter clips are zero-padded at the end and
    longer clips are truncated.

    Args:
        target_sr: Sample rate the audio is resampled to.
        normalize: When True, apply the blended peak/RMS normalisation.
        trim_silence: When True, strip leading/trailing silence with
            ``librosa.effects.trim``.
        max_duration: Optional cap, in seconds, applied after trimming.
        mono: When True, multi-channel audio is down-mixed to one channel.
    """

    def __init__(
        self,
        target_sr: int = 16000,
        normalize: bool = True,
        trim_silence: bool = True,
        max_duration: Optional[float] = None,
        mono: bool = True
    ):
        self.target_sr = target_sr
        self.normalize = normalize
        self.trim_silence = trim_silence
        self.max_duration = max_duration
        self.mono = mono

    def process(self, audio_path: Union[str, np.ndarray]) -> np.ndarray:
        """Return the preprocessed clip for a file path or an in-memory array.

        Args:
            audio_path: Path of an audio file, or a numpy waveform. Arrays
                carry no sample rate, so they are taken to already be at
                ``target_sr``.

        Returns:
            A float16 array of length ``target_sr``.

        Raises:
            TypeError: If ``audio_path`` is neither a ``str`` nor an ndarray.
        """
        if isinstance(audio_path, str):
            audio, orig_sr = librosa.load(audio_path, sr=None, mono=False)
        elif isinstance(audio_path, np.ndarray):
            audio, orig_sr = audio_path, self.target_sr
        else:
            raise TypeError("Input must be a file path or numpy array")

        # Down-mix both input kinds (arrays used to skip this step).
        # Assumes channels-first layout, as librosa.to_mono does - TODO confirm
        # for array callers.
        if self.mono and audio.ndim > 1:
            audio = librosa.to_mono(audio)

        if orig_sr != self.target_sr:
            audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=self.target_sr)

        # Nothing to normalise or trim: hand back pure padding instead of
        # failing inside np.max on an empty array.
        if audio.size == 0:
            return np.zeros(self.target_sr, dtype=np.float16)

        if self.normalize:
            peak = np.max(np.abs(audio))
            rms = np.sqrt(np.mean(audio**2))
            # A silent clip has peak == rms == 0; dividing would fill the
            # clip with NaN, so silent audio is left untouched.
            if peak > 0 and rms > 0:
                peak_normalization = audio / peak
                rms_normalization = audio / rms
                audio = peak_normalization * 0.5 + rms_normalization * 0.5

        if self.trim_silence:
            audio, _ = librosa.effects.trim(audio)
        if self.max_duration is not None:
            max_length = int(self.max_duration * self.target_sr)
            audio = audio[:max_length]
        audio = np.array(audio, dtype=np.float16)

        # Force a fixed length of target_sr samples.
        # NOTE(review): with mono=False and multi-channel input this acts on
        # the first axis - confirm that combination is ever used.
        if len(audio) < self.target_sr:
            pad_length = self.target_sr - len(audio)
            audio = np.pad(audio, (0, pad_length), mode='constant')
        else:
            audio = audio[:self.target_sr]

        return audio

    def save_processed_audio(
        self,
        audio: np.ndarray,
        output_path: str,
        format: str = 'wav'
    ):
        """Write ``audio`` to ``output_path`` at ``target_sr``.

        Args:
            audio: Waveform to write, e.g. the output of :meth:`process`.
            output_path: Destination file path.
            format: Container format name passed to ``soundfile.write``.
        """
        audio = np.asarray(audio)
        # process() returns float16, which soundfile does not accept; widen
        # it (exactly) to float32. Other dtypes are passed through unchanged.
        if audio.dtype == np.float16:
            audio = audio.astype(np.float32)
        sf.write(output_path, audio, self.target_sr, format=format)


class KeywordDataset(Dataset):
    """Map-style dataset yielding ``(audio_array, one_hot_label)`` pairs.

    Args:
        data: DataFrame with an ``audio_array`` and a ``keyword`` column.
        KW_TO_ID: Mapping from keyword string to integer class id.
        num_classes: Width of the one-hot label vector.
    """

    def __init__(self, data, KW_TO_ID, num_classes=441):
        super().__init__()
        self.data = data
        self.KW_TO_ID = KW_TO_ID
        self.num_classes = num_classes

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the clip at position ``idx`` and its one-hot float label."""
        # .iloc is positional: label-based ``series[idx]`` fails (or picks the
        # wrong row) as soon as the frame's index is not the default 0..n-1.
        array = self.data['audio_array'].iloc[idx]
        class_id = int(self.KW_TO_ID[self.data['keyword'].iloc[idx]])
        one_hot_id = torch.nn.functional.one_hot(
            torch.tensor(class_id), num_classes=self.num_classes
        ).float()
        return (array, one_hot_id)
        
# Load the keyword -> class-id mapping; the context manager closes the file
# handle that a bare pickle.load(open(...)) would leave open.
# NOTE: pickle can execute arbitrary code - only load trusted files.
with open(r'kw_to_id.pkl', 'rb') as kw_file:
    KW_TO_ID = pickle.load(kw_file)
# Reuse the frame that was already preprocessed when `ds` was constructed.
KeyWordData = ds.KeyWordData
def train_test_val_split(dataset, test_size=0.2, val_size=0.1, random_state=None):
    """Split the positions ``0..len(dataset)-1`` into train/val/test index lists.

    Args:
        dataset: Any sized object; only ``len(dataset)`` is used.
        test_size: Fraction of the whole set that goes to the test split.
        val_size: Fraction of the whole set that goes to the validation split.
        random_state: Optional seed forwarded to ``train_test_split`` so the
            split can be reproduced. ``None`` keeps the unseeded behaviour.

    Returns:
        ``(train_idx, val_idx, test_idx)``.

    Raises:
        ValueError: If the fractions are not positive or leave nothing for
            the training split.
    """
    if test_size <= 0 or val_size <= 0 or test_size + val_size >= 1:
        raise ValueError(
            "test_size and val_size must be positive fractions that sum to less than 1"
        )
    train_idx, test_idx = train_test_split(
        range(len(dataset)), test_size=test_size, random_state=random_state
    )
    # val_size refers to the whole set, so rescale it to the remaining part.
    train_idx, val_idx = train_test_split(
        train_idx, test_size=val_size / (1 - test_size), random_state=random_state
    )
    return train_idx, val_idx, test_idx

# A single dataset object backs all three loaders; the samplers restrict each
# loader to its own subset of row positions.
keyword_dataset = KeywordDataset(KeyWordData, KW_TO_ID)
train_idx, val_idx, test_idx = train_test_val_split(keyword_dataset)

train_sampler, val_sampler, test_sampler = (
    SubsetRandomSampler(split) for split in (train_idx, val_idx, test_idx)
)

train_loader, val_loader, test_loader = (
    DataLoader(keyword_dataset, batch_size=32, sampler=sampler)
    for sampler in (train_sampler, val_sampler, test_sampler)
)

In [17]:
# Pull one batch from the training loader as a sanity check.
sample = next(iter(train_loader))

In [19]:
# Shapes of the audio batch and of the one-hot label batch.
sample[0].shape, sample[1].shape

(torch.Size([32, 16000]), torch.Size([32, 441]))

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

class EmbeddingClassifier(nn.Module):
    """Fully connected head that maps a flat embedding to class logits.

    Layout: ``input_size -> 2048 -> 1024 -> num_classes`` with ReLU and
    dropout (p=0.5) after each of the two hidden layers.
    """

    def __init__(self, input_size=25600, num_classes=441):
        super().__init__()
        widths = (input_size, 2048, 1024)
        stack = []
        # One Linear/ReLU/Dropout triple per hidden layer, built in order so
        # the Sequential indices (0, 3, 6 for the Linear layers) stay the same.
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.5)]
        stack.append(nn.Linear(widths[-1], num_classes))
        self.classifier = nn.Sequential(*stack)

    def forward(self, _, embeddings, __):
        """Return logits for ``embeddings``; the other two arguments are ignored."""
        logits = self.classifier(embeddings)
        return logits

def _whisper_embeddings(processor, encoder, array, device, num_frames=50, sampling_rate=16000):
    """Turn a batch of waveforms into flattened Whisper encoder embeddings."""
    with torch.no_grad():
        inputs = processor(array.cpu().numpy(), sampling_rate=sampling_rate, return_tensors="pt")
        features = inputs['input_features'].to(device)
        # Keep only the first `num_frames` encoder frames. The slice stays on
        # `device`; the former .cpu().numpy() -> torch.tensor round trip
        # copied every batch off the device and back.
        hidden = encoder(features).last_hidden_state[:, :num_frames, :]
    # reshape (not view): the slice is not contiguous for batch sizes > 1.
    return hidden.reshape(hidden.size(0), -1)


def train_model(model, train_loader, val_loader, criterion, optimizer, device, epochs=50):
    """Train ``model`` on embeddings from a frozen Whisper-base encoder.

    After every epoch the model is evaluated on ``val_loader``; whenever the
    summed validation loss improves, the state dict is written to
    ``best_model.pth``.

    Args:
        model: Classifier called as ``model(None, embeddings, None)``.
        train_loader: Yields ``(waveform_batch, label_batch)`` for training.
        val_loader: Yields the same kind of pairs for validation.
        criterion: Loss applied to ``(outputs, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device used for the classifier and the encoder.
        epochs: Number of passes over ``train_loader``.

    Labels are expected to be one-hot (or probability) rows: accuracy compares
    the argmax of the outputs with the argmax of the labels.
    """
    model.to(device)
    best_val_loss = float('inf')

    processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    modelWhisper = WhisperModel.from_pretrained("openai/whisper-base").encoder.to(device)
    # The encoder is only a fixed feature extractor here.
    modelWhisper.eval()

    for epoch in range(epochs):
        model.train()
        train_loss = 0

        for array, labels in train_loader:
            embeddings = _whisper_embeddings(processor, modelWhisper, array, device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(None, embeddings, None)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for array, labels in val_loader:
                embeddings = _whisper_embeddings(processor, modelWhisper, array, device)
                labels = labels.to(device)
                outputs = model(None, embeddings, None)
                val_loss += criterion(outputs, labels).item()

                predicted = outputs.argmax(dim=1)
                actual = labels.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == actual).sum().item()

        # max(..., 1) keeps the report from dividing by zero on an empty loader.
        print(f'Epoch {epoch+1}: Train Loss {train_loss/max(len(train_loader), 1):.4f}, '
              f'Val Loss {val_loss/max(len(val_loader), 1):.4f}, '
              f'Val Accuracy {100 * correct/max(total, 1):.2f}%')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Classifier head with its default sizes (25600 inputs, 441 classes).
model = EmbeddingClassifier()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=5e-4)

# Runs with train_model's default of 50 epochs; the best checkpoint is written
# to 'best_model.pth' by train_model itself.
train_model(model, train_loader, val_loader, criterion, optimizer, device)



Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Epoch 1: Train Loss 2.6795, Val Loss 1.9553, Val Accuracy 68.37%
Epoch 2: Train Loss 2.0712, Val Loss 1.6713, Val Accuracy 73.81%
Epoch 3: Train Loss 1.7700, Val Loss 1.3301, Val Accuracy 78.04%
Epoch 4: Train Loss 1.6080, Val Loss 1.2417, Val Accuracy 79.00%
Epoch 5: Train Loss 1.5050, Val Loss 1.0812, Val Accuracy 81.57%
Epoch 6: Train Loss 1.4270, Val Loss 1.0712, Val Accuracy 81.11%
Epoch 7: Train Loss 1.3372, Val Loss 0.9345, Val Accuracy 82.98%
Epoch 8: Train Loss 1.2765, Val Loss 0.9137, Val Accuracy 85.06%
Epoch 9: Train Loss 1.2366, Val Loss 0.9569, Val Accuracy 83.15%


In [None]:
def test_model(model, test_loader, device):
    """Evaluate the saved best checkpoint on ``test_loader``.

    Loads ``best_model.pth`` into ``model``, embeds every batch with the
    Whisper-base encoder and prints the accuracy over all batches.

    Args:
        model: Classifier called as ``model(None, embeddings, None)``.
        test_loader: Yields ``(waveform_batch, one_hot_label_batch)`` pairs.
        device: Device used for the classifier and the encoder.

    Returns:
        The accuracy in percent (0.0 when the loader yields nothing).
    """
    # map_location lets a checkpoint written on a GPU load on a CPU-only
    # machine; weights_only restricts unpickling to plain tensor data.
    state_dict = torch.load('best_model.pth', map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device).eval()
    processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    modelWhisper = WhisperModel.from_pretrained("openai/whisper-base").encoder.to(device)
    modelWhisper.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        # Iterate the loader that was passed in (the old code read the global
        # val_loader) and score every batch inside the loop (the old code only
        # scored the last one).
        for array, labels in test_loader:
            inputs = processor(array.cpu().numpy(), sampling_rate=16000, return_tensors="pt")
            features = inputs['input_features'].to(device)
            # First 50 encoder frames, flattened per clip. reshape (not view)
            # because the slice is not contiguous for batch sizes > 1.
            hidden = modelWhisper(features).last_hidden_state[:, :50, :]
            embeddings = hidden.reshape(hidden.size(0), -1)
            labels = labels.to(device)
            outputs = model(None, embeddings, None)

            predicted = outputs.argmax(dim=1)
            actual = labels.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == actual).sum().item()

    accuracy = 100 * correct / total if total else 0.0
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy
    
# Score the checkpoint saved during training; test_model prints the accuracy.
test_model(model, test_loader, device)

In [1]:
import torch.nn as nn
import librosa
import numpy as np
import soundfile as sf
from typing import Union, Optional, Tuple
import torch
from transformers import WhisperModel, WhisperFeatureExtractor
from transformers import WhisperProcessor, WhisperModel

class AudioPreprocessor:
    """Load, normalise, trim and length-fix a single audio clip.

    :meth:`process` always returns a float16 array that is exactly
    ``target_sr`` samples long: shorter clips are zero-padded at the end and
    longer clips are truncated.

    Args:
        target_sr: Sample rate the audio is resampled to.
        normalize: When True, apply the blended peak/RMS normalisation.
        trim_silence: When True, strip leading/trailing silence with
            ``librosa.effects.trim``.
        max_duration: Optional cap, in seconds, applied after trimming.
        mono: When True, multi-channel audio is down-mixed to one channel.
    """

    def __init__(
        self,
        target_sr: int = 16000,
        normalize: bool = True,
        trim_silence: bool = True,
        max_duration: Optional[float] = None,
        mono: bool = True
    ):
        self.target_sr = target_sr
        self.normalize = normalize
        self.trim_silence = trim_silence
        self.max_duration = max_duration
        self.mono = mono

    def process(self, audio_path: Union[str, np.ndarray]) -> np.ndarray:
        """Return the preprocessed clip for a file path or an in-memory array.

        Args:
            audio_path: Path of an audio file, or a numpy waveform. Arrays
                carry no sample rate, so they are taken to already be at
                ``target_sr``.

        Returns:
            A float16 array of length ``target_sr``.

        Raises:
            TypeError: If ``audio_path`` is neither a ``str`` nor an ndarray.
        """
        if isinstance(audio_path, str):
            audio, orig_sr = librosa.load(audio_path, sr=None, mono=False)
        elif isinstance(audio_path, np.ndarray):
            audio, orig_sr = audio_path, self.target_sr
        else:
            raise TypeError("Input must be a file path or numpy array")

        # Down-mix both input kinds (arrays used to skip this step).
        # Assumes channels-first layout, as librosa.to_mono does - TODO confirm
        # for array callers.
        if self.mono and audio.ndim > 1:
            audio = librosa.to_mono(audio)

        if orig_sr != self.target_sr:
            audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=self.target_sr)

        # Nothing to normalise or trim: hand back pure padding instead of
        # failing inside np.max on an empty array.
        if audio.size == 0:
            return np.zeros(self.target_sr, dtype=np.float16)

        if self.normalize:
            peak = np.max(np.abs(audio))
            rms = np.sqrt(np.mean(audio**2))
            # A silent clip has peak == rms == 0; dividing would fill the
            # clip with NaN, so silent audio is left untouched.
            if peak > 0 and rms > 0:
                peak_normalization = audio / peak
                rms_normalization = audio / rms
                audio = peak_normalization * 0.5 + rms_normalization * 0.5

        if self.trim_silence:
            audio, _ = librosa.effects.trim(audio)
        if self.max_duration is not None:
            max_length = int(self.max_duration * self.target_sr)
            audio = audio[:max_length]
        audio = np.array(audio, dtype=np.float16)

        # Force a fixed length of target_sr samples.
        # NOTE(review): with mono=False and multi-channel input this acts on
        # the first axis - confirm that combination is ever used.
        if len(audio) < self.target_sr:
            pad_length = self.target_sr - len(audio)
            audio = np.pad(audio, (0, pad_length), mode='constant')
        else:
            audio = audio[:self.target_sr]

        return audio

    def save_processed_audio(
        self,
        audio: np.ndarray,
        output_path: str,
        format: str = 'wav'
    ):
        """Write ``audio`` to ``output_path`` at ``target_sr``.

        Args:
            audio: Waveform to write, e.g. the output of :meth:`process`.
            output_path: Destination file path.
            format: Container format name passed to ``soundfile.write``.
        """
        audio = np.asarray(audio)
        # process() returns float16, which soundfile does not accept; widen
        # it (exactly) to float32. Other dtypes are passed through unchanged.
        if audio.dtype == np.float16:
            audio = audio.astype(np.float32)
        sf.write(output_path, audio, self.target_sr, format=format)

class EmbeddingClassifier(nn.Module):
    """Three-layer perceptron turning a flat embedding into class logits."""

    def __init__(self, input_size=25600, num_classes=441):
        super().__init__()
        first_hidden, second_hidden = 2048, 1024
        # Layers are listed in forward order; their positions inside the
        # Sequential determine the parameter names of saved checkpoints.
        layers = [
            nn.Linear(input_size, first_hidden),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(first_hidden, second_hidden),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(second_hidden, num_classes),
        ]
        self.classifier = nn.Sequential(*layers)

    def forward(self, _, embeddings, __):
        """Score ``embeddings``; the first and last arguments are unused."""
        scores = self.classifier(embeddings)
        return scores

def inference(model, audio_path, device):
    """Predict the class id of a single audio clip.

    Loads ``best_model_cpu.pth`` into ``model``, preprocesses the clip with
    ``AudioPreprocessor``, embeds it with the Whisper-base encoder and returns
    the index of the highest-scoring class.

    Args:
        model: Classifier called as ``model(None, embeddings, None)``.
        audio_path: File path (or waveform array) accepted by
            ``AudioPreprocessor.process``.
        device: Device used for the classifier and the encoder.

    Returns:
        The predicted class id as a Python int.
    """
    # map_location makes the load independent of the device the checkpoint
    # was saved from; weights_only restricts unpickling to plain tensor data
    # (and silences the torch.load FutureWarning).
    state_dict = torch.load('best_model_cpu.pth', map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device).eval()
    processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    modelWhisper = WhisperModel.from_pretrained("openai/whisper-base").encoder.to(device)
    modelWhisper.eval()

    with torch.no_grad():
        audio_processor = AudioPreprocessor()
        audio_array = audio_processor.process(audio_path)
        inputs = processor(audio_array, sampling_rate=16000, return_tensors="pt")
        features = inputs['input_features'].to(device)
        # First 50 encoder frames, flattened. The tensor stays on `device`
        # instead of taking a detour through numpy.
        hidden = modelWhisper(features).last_hidden_state[:, :50, :]
        embeddings = hidden.reshape(hidden.size(0), -1)
        outputs = model(None, embeddings, None)

    predicted = outputs.argmax(dim=1)
    return predicted.item()

# Single-file prediction on the CPU. inference() loads the weights from
# 'best_model_cpu.pth' itself, so the freshly built model only provides the
# architecture.
device = 'cpu'
model = EmbeddingClassifier()
audio_path = 'TEST_DUMMY_FINAL/1.wav'
# The returned class id is shown as the cell output.
inference(model, audio_path, device)

  from .autonotebook import tqdm as notebook_tqdm
  model.load_state_dict(torch.load('best_model_cpu.pth'))


357