In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
import re
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
from tqdm.auto import tqdm
import torchaudio
from dataclasses import dataclass, field
import evaluate
import os
import soundfile as sf
import numpy as np
import torchaudio

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
ROOT_DIR = '../data'
FINTUNED_MODEL = '../models/wav2vec2-large-960h-cv'

os.environ['CUDA_VISIBLE_DEVICES'] = '2'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
cv_model = Wav2Vec2ForCTC.from_pretrained(FINTUNED_MODEL).to(device)
cv_processor = Wav2Vec2Processor.from_pretrained(FINTUNED_MODEL)

  WeightNorm.apply(module, name, dim)


In [3]:
DEV_INDICES = pd.read_csv(f'{ROOT_DIR}/cv-valid-dev.csv')
DEV_DATA_BASE = f"{ROOT_DIR}/cv-valid-dev"

In [25]:
def transcribe(file, eval_model, processor):
    # try:
    # Load audio file with soundfile
    audio_array, sample_rate = sf.read(file)
    
    # Convert to mono if stereo
    if len(audio_array.shape) > 1:
        audio_array = audio_array.mean(axis=1)
    
    # Resample to 16kHz if needed
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        audio_array = torch.from_numpy(audio_array).float()
        audio_array = resampler(audio_array).numpy()
        sample_rate = 16000
    
    # Get duration
    
    # Normalize audio array
    audio_array = audio_array / np.max(np.abs(audio_array))
    
        # Process audio with Wav2Vec2
    input_values = processor(
        audio_array, 
        return_tensors="pt", 
        padding="longest",
        sampling_rate=sample_rate
    ).input_values.to(device)

    # Get model predictions
    with torch.no_grad():
        logits = eval_model(input_values).logits
        predicted_ids = torch.argmax(logits, dim=-1)
        transcription = processor.batch_decode(predicted_ids)[0]


    return transcription


In [26]:
def detect_hot_words(transcription, hot_words):
    """
    Detect hot words in the transcription.
    """
    detected = []
    for word in hot_words:
        if re.search(r'\b' + re.escape(word) + r'\b', transcription, re.IGNORECASE):
            detected.append(word)
    return detected

def output_hot_word_lst(df_with_transcription, hot_words, saving_path='./detected.txt'):
    """
    Output the detected hot words to a file.
    """
    file_name_lst = []
    for i in tqdm(range(len(df_with_transcription))):
        transcription = df_with_transcription.iloc[i]['transcription']
        detected = detect_hot_words(transcription, hot_words)
        if detected:
            file_name_lst.append(df_with_transcription.iloc[i]['filename'])
    with open(saving_path, 'w') as f:
        for file_name in file_name_lst:
            f.write(file_name + '\n')

In [27]:
predictions = []
for i in tqdm(range(len(DEV_INDICES))):
    row = DEV_INDICES.iloc[i]
    file_path = f"{DEV_DATA_BASE}/{row['filename']}"
    
    if not os.path.exists(file_path):
        print(f"File {file_path} does not exist.")
        continue
    
    transcription = transcribe(file_path, cv_model, cv_processor)
    
    predictions.append(transcription)

DEV_INDICES['transcription'] = predictions

  audio_array = audio_array / np.max(np.abs(audio_array))
100%|██████████| 4076/4076 [01:40<00:00, 40.38it/s]


In [7]:
DEV_INDICES.to_csv('./inference_results.csv', index=False)

In [8]:
HOT_WORDS = ["be careful", "destroy",  "stranger"]

In [9]:
output_hot_word_lst(DEV_INDICES, HOT_WORDS, saving_path='./detected.txt')

100%|██████████| 4076/4076 [00:00<00:00, 26456.43it/s]
