In [14]:
from datasets import load_dataset
from transformers import AutoProcessor, WhisperModel, AutoTokenizer
import torch
import wave

# Load the Audio Data
path_to_audio = "data/sub/De95Osq7p1c_trimmed_segment_1.wav"

In [2]:
import wave
import numpy as np

def read_wav_file(file_path):
    # Open the WAV file
    with wave.open(file_path, 'rb') as wav_file:
        # Get the number of frames in the file
        n_frames = wav_file.getnframes()
        
        # Read the frame data
        frame_data = wav_file.readframes(n_frames)
        
        # Get the sample width (in bytes)
        sample_width = wav_file.getsampwidth()
        
        # Get the number of channels
        n_channels = wav_file.getnchannels()
        
        # Get the frame rate (samples per second)
        frame_rate = wav_file.getframerate()
        
        # Convert the byte data to a numpy array
        if sample_width == 1:
            # 8-bit audio
            audio_data = np.frombuffer(frame_data, dtype=np.uint8)
        elif sample_width == 2:
            # 16-bit audio
            audio_data = np.frombuffer(frame_data, dtype=np.int16)
        elif sample_width == 4:
            # 32-bit audio
            audio_data = np.frombuffer(frame_data, dtype=np.int32)
        else:
            raise ValueError("Unsupported sample width: {}".format(sample_width))
        
        # Reshape the array based on the number of channels
        if n_channels > 1:
            audio_data = audio_data.reshape(-1, n_channels)
        
        return audio_data, frame_rate

In [3]:
audio_data, frame_rate = read_wav_file(path_to_audio)

print("Audio Data Shape:", audio_data.shape)
print("Frame Rate:", frame_rate)

Audio Data Shape: (54864,)
Frame Rate: 16000


In [4]:
processor = AutoProcessor.from_pretrained("openai/whisper-medium")

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [5]:
inputs = processor(
    audio_data,
    return_tensors="pt",
    return_attention_mask=True,
    sampling_rate=16000     
)

In [6]:
inputs

{'input_features': tensor([[[1.4301, 1.4301, 1.4301,  ..., 1.4301, 1.4301, 1.4301],
         [1.4301, 1.4301, 1.4301,  ..., 1.4301, 1.4301, 1.4301],
         [1.4301, 1.4301, 1.4301,  ..., 1.4301, 1.4301, 1.4301],
         ...,
         [1.4301, 1.4301, 1.4301,  ..., 1.4301, 1.4301, 1.4301],
         [1.4301, 1.4301, 1.4301,  ..., 1.4301, 1.4301, 1.4301],
         [1.4301, 1.4301, 1.4301,  ..., 1.4301, 1.4301, 1.4301]]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0]], dtype=torch.int32)}

In [7]:
# Process the Audio with Whisper Encoder
model = WhisperModel.from_pretrained("openai/whisper-medium")
model.eval()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
inputs = inputs.to(device)

# Ensuring No Gradient Updates
with torch.no_grad():
    encoder_outputs = model.encoder(**inputs, output_hidden_states=True)


In [8]:
# Step 3: Retrieve the Hidden States
hidden_states = encoder_outputs.hidden_states
audio_embeddings = encoder_outputs.last_hidden_state

print("Shape of audio embeddings:", audio_embeddings.shape)

Shape of audio embeddings: torch.Size([1, 1500, 1024])


In [70]:
audio_embeddings

tensor([[[-0.8103,  0.5628,  0.3018,  ...,  1.4972, -1.0344,  0.3765],
         [ 0.0931,  0.2974, -0.1004,  ...,  0.2545, -0.1267, -0.6107],
         [ 0.7071, -0.0789,  0.9369,  ..., -0.7768,  0.7219, -0.7023],
         ...,
         [ 0.1007,  1.0809,  1.3225,  ...,  1.2200, -0.2207,  0.1168],
         [-0.4745,  0.9073,  1.8731,  ...,  1.6187, -0.3987,  0.0856],
         [-0.4203,  1.0367,  2.1674,  ...,  1.8687, -0.3581,  0.1691]]],
       device='cuda:0')

In [9]:
from transformers import WhisperForConditionalGeneration, WhisperProcessor

model_generate = WhisperForConditionalGeneration.from_pretrained("openai/whisper-medium")
model_generate.to(device)

generated = model_generate.generate(inputs = inputs.input_features)
processor.batch_decode(generated)

Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.


["<|startoftranscript|><|en|><|transcribe|><|notimestamps|> I'm gonna make a new friend!<|endoftext|>"]

## Mistral

# Experimenting Adaptors

In [None]:
class Adaptor(torch.nn.Module):
    # A simple learnable adaptor
    def __init__(self, output_embedding_size = 1024): # Bloom's Embedding size for each token
        super().__init__()
        self.linear = torch.nn.Linear(1024, output_embedding_size, bias=False)

    def forward(self, x):
        x = self.linear(x)
        return x

In [None]:
class AudioToTextPipeline(nn.Module):
    def __init__(self, whisper_model_name="openai/whisper-medium", bloom_model_name="bigscience/bloom-560m"):
        super(AudioToTextPipeline, self).__init__()
        
        # Load the Whisper model and processor
        self.processor = AutoProcessor.from_pretrained(whisper_model_name)
        self.whisper_model = WhisperModel.from_pretrained(whisper_model_name)
        
        # Freeze Whisper weights
        for param in self.whisper_model.parameters():
            param.requires_grad = False
        
        # Load the Bloom model and tokenizer
        self.bloom_model = BloomForCausalLM.from_pretrained(bloom_model_name)
        self.tokenizer = BloomTokenizerFast.from_pretrained(bloom_model_name)
        
        # Freeze Bloom weights
        for param in self.bloom_model.parameters():
            param.requires_grad = False
        
        # Define the Adaptor
        self.adaptor = torch.nn.Linear(1024, 1024, bias=False)

    def forward(self, audio_file_path):
        # Step 1: Generate audio embeddings
        inputs = self.processor(audio_file_path, return_tensors="pt", sampling_rate=16000)
        with torch.no_grad():
            outputs = self.whisper_model(**inputs)
        audio_embeddings = outputs.last_hidden_state.squeeze(0)  # Assuming batch size of 1

        # Step 2: Transform embeddings using the adaptor
        transformed_embeddings = self.adaptor(audio_embeddings)

        with torch.no_grad():
            outputs = self.bloom_model(inputs_embeds=transformed_embeddings.unsqueeze(0))
        
        # generated_tokens = outputs.logits.argmax(dim=-1)
        generated_text = self.tokenizer.decode(generated_tokens[0], skip_special_tokens=True)
        
        return generated_text

    def training_step(self, audio_file_path, target_text):
        # Forward pass
        inputs = self.processor(audio_file_path, return_tensors="pt", sampling_rate=16000)
        with torch.no_grad():
            outputs = self.whisper_model(**inputs)
        audio_embeddings = outputs.last_hidden_state.squeeze(0)  # Assuming batch size of 1
        
        # Transform embeddings using the adaptor
        transformed_embeddings = self.adaptor(audio_embeddings)
        
        # Prepare input for the Bloom model
        input_ids = torch.zeros((1, transformed_embeddings.size(0)), dtype=torch.long)  # Dummy input IDs
        attention_mask = torch.ones_like(input_ids)
        
        # Generate predictions
        outputs = self.bloom_model(input_ids=input_ids, attention_mask=attention_mask, inputs_embeds=transformed_embeddings.unsqueeze(0))
        logits = outputs.logits

        # Tokenize target text
        target_ids = self.tokenizer(target_text, return_tensors="pt").input_ids.squeeze(0)

        # Calculate loss
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = target_ids[..., 1:].contiguous()
        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

        return loss

    def train_model(self, train_data, epochs=1, learning_rate=1e-3):
        # Only optimize the adaptor's parameters
        optimizer = optim.Adam(self.adaptor.parameters(), lr=learning_rate)
        
        for epoch in range(epochs):
            total_loss = 0
            for audio_file_path, target_text in train_data:
                optimizer.zero_grad()
                loss = self.training_step(audio_file_path, target_text)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            print(f'Epoch {epoch+1}, Loss: {total_loss/len(train_data)}')