In [None]:
pip install git+https://github.com/openai/whisper.git

In [None]:
import torch
import whisper
import os
import numpy as np
import pandas as pd
from transformers import BertTokenizer, BertModel, BertForSequenceClassification, AdamW
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from tqdm import tqdm_notebook as tqdm

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Prefer the GPU, but fall back to the CPU so the notebook still runs on
# runtimes without CUDA instead of failing while loading the models.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Whisper Setup (Simple Whisper Library)
whisper_model_name = "base"
whisper_model = whisper.load_model(whisper_model_name).to(device)

# BERT Setup
bert_model_name = "bert-base-uncased"
bert_tokenizer = BertTokenizer.from_pretrained(bert_model_name)
bert_model = BertModel.from_pretrained(bert_model_name).to(device)


In [None]:
# Regression head applied to a 768-dimensional embedding: one hidden layer
# followed by a sigmoid, so the raw output lies in [0, 1].
# The device falls back to the CPU when CUDA is unavailable instead of raising.
mlp_head = torch.nn.Sequential(
    torch.nn.Linear(768, 256),
    torch.nn.ReLU(),
    torch.nn.Linear(256, 1),
    torch.nn.Sigmoid()
).to("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def transcribe_audio(audio_file_path):
    """Return the Whisper transcript of one audio file.

    The module-level ``whisper_model`` does the work. This is best-effort:
    any exception raised while transcribing is printed and ``None`` is
    returned instead of propagating the error.
    """
    try:
        transcript = whisper_model.transcribe(audio_file_path)["text"]
    except Exception as err:
        print(f"Error transcribing {audio_file_path}: {err}")
        return None
    return transcript

In [None]:
def get_bert_embeddings(texts, max_length=128, batch_size=32):
    """Generates BERT embeddings in batches.

    Uses the module-level ``bert_tokenizer`` and ``bert_model``; they are not
    parameters of this function.

    Args:
        texts (str or list of str): Text(s) to generate embeddings for. A
            single string is treated as a one-element list.
        max_length (int): Length every sequence is padded/truncated to.
        batch_size (int): Number of texts per forward pass.

    Returns:
        torch.Tensor or None: One row per input text, holding the last hidden
        state at position 0 (the [CLS] token), on the same device as
        ``bert_model``. ``None`` is returned (and the error printed) if
        anything fails, including when no texts are given.
    """
    try:
        # Normalise the input so a bare string is one text, not a sequence.
        if isinstance(texts, str):
            texts = [texts]
        else:
            texts = list(texts)
        if not texts:
            raise ValueError("no texts provided")

        # Run on whatever device the model lives on rather than assuming CUDA.
        device = next(bert_model.parameters()).device

        inputs = bert_tokenizer(texts, return_tensors="pt", truncation=True, padding="max_length", max_length=max_length)
        dataset = TensorDataset(inputs.input_ids, inputs.attention_mask)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

        batch_embeddings = []
        bert_model.eval()
        with torch.no_grad():
            for input_ids_batch, attention_mask_batch in dataloader:
                outputs = bert_model(
                    input_ids_batch.to(device),
                    attention_mask=attention_mask_batch.to(device),
                )
                # Take the [CLS] token embedding
                batch_embeddings.append(outputs.last_hidden_state[:, 0, :])
        # Concatenate on-device; avoids the slow tensor -> numpy list -> tensor
        # round trip through host memory.
        return torch.cat(batch_embeddings, dim=0)
    except Exception as e:
        print(f"Error processing text: {e}")
        return None

In [None]:
# Single place to repoint the notebook at a different copy of the dataset;
# the three derived paths below keep their original values.
DATASET_ROOT = "/kaggle/input/shl-intern-hiring-assessment/dataset"

train_audio_dir = f"{DATASET_ROOT}/audios_train"
train_csv_path = f"{DATASET_ROOT}/train.csv"
test_audio_dir = f"{DATASET_ROOT}/audios_test"

In [None]:
train_df = pd.read_csv(train_csv_path)
train_transcriptions = {}

# Transcribe every clip listed in the training CSV. Clips for which
# transcribe_audio returned None are left out of the mapping.
for row_index, record in tqdm(train_df.iterrows(), total=len(train_df)):
    clip_name = record['filename']
    transcript = transcribe_audio(os.path.join(train_audio_dir, clip_name))
    if transcript is None:
        continue
    train_transcriptions[clip_name] = transcript

In [None]:
# Embed the transcripts in the insertion order of train_transcriptions, so
# row i of all_embeddings belongs to the i-th key of that dict.
all_texts = [transcript for transcript in train_transcriptions.values()]
all_embeddings = get_bert_embeddings(all_texts)

In [None]:
# Pair each embedding with its label. The labels are looked up through a
# dict built once, instead of scanning train_df with a boolean mask for every
# file. keep='first' matches the previous `.values[0]` behaviour when a
# filename occurs more than once in the CSV.
unique_label_rows = train_df.drop_duplicates(subset='filename', keep='first')
label_by_filename = dict(zip(unique_label_rows['filename'], unique_label_rows['label']))

train_data = []
# all_embeddings is None when embedding generation failed; train_data then
# stays empty, exactly as before.
if all_embeddings is not None:
    train_data = [
        (all_embeddings[i], label_by_filename[filename])
        for i, filename in enumerate(train_transcriptions.keys())
    ]

In [None]:
# torch.stack keeps the device of the stacked embeddings, so no hard-coded
# "cuda" move is needed; the labels are placed on that same device.
embeddings = torch.stack([embedding for embedding, _label in train_data])
# Labels become a float32 column vector of shape (N, 1) to match the MLP output.
labels = (
    torch.tensor([label for _embedding, label in train_data], dtype=torch.float32)
    .unsqueeze(1)
    .to(embeddings.device)
)

In [None]:
# Wrap the embeddings and labels so indexing yields (embedding, label) pairs.
dataset = TensorDataset(embeddings, labels)

In [None]:
# 80/20 train/validation split. The generator is seeded so the same samples
# land in the validation set on every run, keeping losses comparable.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

In [None]:
# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=8)
val_dataloader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=8)

In [None]:
# Train the MLP head with MSE on the 0-5 scaled output and report per-epoch
# losses. Losses are averaged per sample (each batch mean is weighted by its
# batch size), so a shorter final batch no longer skews the reported value,
# and an empty loader reports 0.0 instead of raising ZeroDivisionError.
optimizer = torch.optim.Adam(mlp_head.parameters(), lr=1e-3)
criterion = torch.nn.MSELoss()

epochs = 10
for epoch in range(epochs):
    train_loss = 0.0
    val_loss = 0.0
    train_samples = 0
    val_samples = 0

    # Training
    mlp_head.train()
    for embeddings_batch, labels_batch in train_dataloader:
        optimizer.zero_grad()
        outputs = mlp_head(embeddings_batch) * 5 # scale output to 0-5
        loss = criterion(outputs, labels_batch)
        loss.backward()
        optimizer.step()
        batch_len = labels_batch.size(0)
        # criterion returns the batch mean; multiply back to a batch sum.
        train_loss += loss.item() * batch_len
        train_samples += batch_len
    train_loss /= max(train_samples, 1)

    # Validation
    mlp_head.eval()
    with torch.no_grad():
        for embeddings_batch, labels_batch in val_dataloader:
            outputs = mlp_head(embeddings_batch) * 5 # scale output to 0-5
            loss = criterion(outputs, labels_batch)
            batch_len = labels_batch.size(0)
            val_loss += loss.item() * batch_len
            val_samples += batch_len
    val_loss /= max(val_samples, 1)

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

In [None]:
# Transcribe and embed every .wav file in the test directory. Files whose
# transcription or embedding failed (None) are skipped.
test_data = []
test_filenames = []

# os.listdir returns entries in arbitrary order; sorting makes the processing
# order, and therefore the row order of the submission, reproducible.
for filename in sorted(os.listdir(test_audio_dir)):
    if not filename.endswith(".wav"):
        continue
    audio_file_path = os.path.join(test_audio_dir, filename)
    transcription = transcribe_audio(audio_file_path)
    if transcription is None:
        continue
    # Pass a one-element list, matching get_bert_embeddings' documented input.
    # A dedicated name is used so the training tensor `embeddings` is not
    # overwritten.
    test_embedding = get_bert_embeddings([transcription])
    if test_embedding is None:
        continue
    test_data.append((filename, test_embedding))
    test_filenames.append(filename)

In [None]:
# Score each test clip with the trained head. Each entry of test_data holds a
# single clip's embedding, so the scaled output is one scalar per clip.
test_predictions = []
# Set inference mode explicitly rather than relying on the state left behind
# by the training cell.
mlp_head.eval()
with torch.no_grad():
    # Dedicated loop names keep the training tensor `embeddings` intact.
    for test_filename, test_embedding in test_data:
        outputs = mlp_head(test_embedding) * 5
        test_predictions.append(outputs.item())

# Safeguard: keep every prediction inside the 0-5 label range.
test_predictions_clipped = np.clip(test_predictions, 0, 5)

In [None]:
# Assemble the submission table (one row per scored test file) and write it
# to submission.csv without the index column.
output_df = pd.DataFrame(
    data=dict(filename=test_filenames, label=test_predictions_clipped)
)
output_df.to_csv(path_or_buf="submission.csv", index=False)