In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree. The joined path below is neither printed nor
# stored, so running this cell produces no output.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        os.path.join(dirname, filename)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

**Transcribing a sample audio .wav file to text**

In [None]:
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import torch
import torchaudio

In [None]:
# Pick the compute device and the matching tensor precision in one place:
# half precision on the first CUDA device, full precision on CPU.
if torch.cuda.is_available():
    device, torch_dtype = "cuda:0", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

Loading the components required for Whisper transcription. <br>
Reference: HuggingFace, Medium

In [None]:
# Model identifier: the checkpoint name passed to `from_pretrained` for both the
# model and the processor in the next cell.
model_id = "openai/whisper-large-v3-turbo"

In [None]:
# Fetch the checkpoint with the precision chosen above, then place it on the
# selected device.
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
model.to(device)

# The processor supplies the tokenizer and feature extractor used by the pipeline below.
processor = AutoProcessor.from_pretrained(model_id)

In [None]:
# Initialize the pipeline from the already-loaded model and the processor's
# tokenizer / feature extractor.
# `asr_pipeline` is read as a global by transcribe_audio() defined further down.
asr_pipeline = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)

In [None]:
# Sample audio file path (example): one training clip used by the two demo
# transcription cells that follow.
file_path = "/kaggle/input/shl-intern-hiring-assessment/dataset/audios_train/audio_1024.wav"

In [None]:
# Load audio and preprocess.
# librosa is otherwise only imported in a later cell, so import it here to keep
# this cell runnable on a fresh kernel (Restart & Run All).
import librosa

audio, sr = librosa.load(file_path, sr=16000, mono=True)  # Ensure mono and 16kHz
# Pass the rate reported by the loader so the processor and the audio cannot drift apart.
inputs = processor(audio, sampling_rate=sr, return_tensors="pt").to(device, torch_dtype)

# Generate token ids without tracking gradients (inference only)
with torch.no_grad():
    generated_ids = model.generate(inputs["input_features"])

# Decode the predicted tokens into text; a single clip was passed, so take item 0
transcription = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

print("Transcription:", transcription)

The Whisper model can process at most about 30 seconds of audio per segment in one shot.

So we use a sliding-window technique: it transcribes the first 30 seconds and then the remaining audio, so that the entire 60-second recording is covered and we get its full text transcription.

In [None]:
def transcribe_audio(file_path, segment_length=30):
    """
    Transcribe an audio file in consecutive, non-overlapping segments.

    The file is loaded with torchaudio, averaged down to one channel, resampled
    to 16 kHz when needed, and cut into `segment_length`-second pieces that are
    sent one by one to the global `asr_pipeline`.

    Args:
        file_path: Path to the audio file
        segment_length: Length of each segment in seconds (may be fractional)

    Returns:
        The segment transcriptions joined with single spaces. A segment that
        fails to transcribe contributes an empty string and the error is printed.

    Raises:
        ValueError: If `segment_length` does not cover at least one sample
    """
    target_sr = 16000

    # Load audio
    waveform, sr = torchaudio.load(file_path)

    # Convert to mono if the file has more than one channel
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Resample to 16kHz if not already
    if sr != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)
        waveform = resampler(waveform)

    # int() keeps range() happy when segment_length is fractional
    segment_samples = int(segment_length * target_sr)
    if segment_samples <= 0:
        raise ValueError(f"segment_length must cover at least one sample (got {segment_length})")

    # Drop only the channel axis. A bare squeeze() would also collapse a
    # one-sample tail segment into a 0-d array.
    mono = waveform.squeeze(0)
    total_samples = mono.size(0)

    transcriptions = []

    for start in range(0, total_samples, segment_samples):
        end = min(start + segment_samples, total_samples)
        segment = mono[start:end].numpy()

        try:
            # State the sampling rate explicitly instead of relying on the
            # pipeline's assumption about raw arrays.
            result = asr_pipeline({"raw": segment, "sampling_rate": target_sr})
            transcriptions.append(result['text'])
        except Exception as e:
            # Best effort: keep going so one bad segment does not lose the rest
            print(f"Error transcribing segment {start}-{end}: {e}")
            transcriptions.append("")

    return " ".join(transcriptions)

In [None]:
# Run the segment-wise transcription on the sample clip and show the joined text.
text = transcribe_audio(file_path)
print(text)

**Now that we have transcribed a sample file,
let's transcribe every file in audios_train and map each transcription to its audio file in the train CSV**

Loading all libraries

In [None]:
import os
import pandas as pd
import torch
import numpy as np
import librosa
import time
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

In [None]:
def load_whisper_model():
    """
    Build the Whisper speech-to-text pipeline.

    Chooses "cuda:0" with float16 when CUDA is available and "cpu" with float32
    otherwise, prints the chosen device, loads the model and its processor, and
    wraps them in an automatic-speech-recognition pipeline.

    Returns:
        pipeline: Hugging Face pipeline for automatic speech recognition

    Reference:
        HuggingFace
    """
    model_id = "openai/whisper-large-v3-turbo"

    # Device and precision are decided together from a single availability check
    if torch.cuda.is_available():
        device, torch_dtype = "cuda:0", torch.float16
    else:
        device, torch_dtype = "cpu", torch.float32
    print(f"Using device: {device}")

    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id,
        torch_dtype=torch_dtype,
        low_cpu_mem_usage=True,
        use_safetensors=True,
    )
    model.to(device)

    processor = AutoProcessor.from_pretrained(model_id)

    return pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        torch_dtype=torch_dtype,
        device=device,
    )

In [None]:
def load_audio(file_path, target_sr=16000):
    """
    Load an audio file at a fixed sample rate and reduce it to a single channel.

    Args:
        file_path: Path to the audio file
        target_sr: Sample rate (samples per second) requested from librosa;
            defaults to 16000

    Returns:
        audio: Audio data as a 1-D numpy array
        sample_rate: Sample rate of the returned audio, as reported by librosa
    """
    audio, sample_rate = librosa.load(file_path, sr=target_sr, mono=False)

    # With mono=False the array is 2-D whenever the file has several channels.
    # Average over the channel axis for any channel count, not only for 2.
    if audio.ndim > 1:
        print(f"Converting stereo to mono for {os.path.basename(file_path)}")
        audio = np.mean(audio, axis=0)

    return audio, sample_rate

In [None]:
def transcribe_long_audio(pipe, audio, sample_rate, window_size=30, overlap=1):
    """
    Transcribe audio of any length with a sliding window.

    Audio that fits in one window is transcribed in a single call. Longer audio
    is cut into windows of `window_size` seconds whose starts are
    `window_size - overlap` seconds apart; each window is transcribed separately
    and the texts are joined with single spaces. The overlapping part is
    transcribed in both neighbouring windows, so words near a boundary may
    appear twice in the result.

    Args:
        pipe: Callable taking {"sampling_rate": ..., "raw": ...} and returning a
            dict with a "text" entry (e.g. the result of load_whisper_model)
        audio: 1-D audio samples (e.g. from load_audio)
        sample_rate: Sample rate of `audio` in samples per second
        window_size: Window length in seconds (may be fractional)
        overlap: Overlap between consecutive windows in seconds

    Returns:
        full_transcription: Stripped transcription of the whole audio; an empty
        string when `audio` has no samples

    Raises:
        ValueError: If `overlap` is negative or not smaller than `window_size`,
            or if the window/stride is shorter than one sample
    """
    if overlap < 0 or overlap >= window_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < window_size "
            f"(got overlap={overlap}, window_size={window_size})"
        )

    # Work in whole samples: float bounds cannot be used to slice the audio
    window_samples = int(round(window_size * sample_rate))
    stride_samples = int(round((window_size - overlap) * sample_rate))
    if window_samples <= 0 or stride_samples <= 0:
        raise ValueError("window and stride must each span at least one sample")

    total_samples = len(audio)

    # Nothing to transcribe; avoid handing an empty array to the model
    if total_samples == 0:
        return ""

    # If audio is not longer than one window, transcribe it directly
    if total_samples <= window_samples:
        result = pipe({"sampling_rate": sample_rate, "raw": audio})
        return result["text"].strip()

    transcriptions = []

    # Window starts are 0, stride, 2*stride, ... up to the first window that
    # reaches the end of the audio (same windows as ceil((T - W) / S) + 1).
    stop = total_samples - window_samples + stride_samples
    for start_sample in range(0, stop, stride_samples):
        end_sample = min(total_samples, start_sample + window_samples)

        result = pipe({"sampling_rate": sample_rate, "raw": audio[start_sample:end_sample]})
        text = result["text"].strip()

        # Skip silent/empty windows so the joined text has no double spaces
        if text:
            transcriptions.append(text)

    return " ".join(transcriptions)

**Now we will map each audio transcription to its audio file name from the train CSV file**

In [None]:
def transcribe_audio_files(audio_folder, csv_path, pipe=None, filename_col="filename"):
    """
    Transcribe all audio files listed in the CSV and add transcriptions as a new column.

    Args:
        audio_folder: Path to folder containing audio files
        csv_path: Path to CSV file with list of audio files
        pipe: Optional, already-loaded speech-recognition pipeline. When omitted
            a new one is created with load_whisper_model(); pass one in to avoid
            loading the model again for a second CSV.
        filename_col: Name of the CSV column holding the audio file names

    Returns:
        df: DataFrame with an added 'transcription' column. Rows that could not
        be processed hold the marker text "ERROR: Could not transcribe".

    Raises:
        KeyError: If `filename_col` is not a column of the CSV
    """

    # Load CSV file
    df = pd.read_csv(csv_path)
    print(f"Loaded CSV with {len(df)} entries")

    # Fail before the expensive model load if the CSV has an unexpected layout
    if filename_col not in df.columns:
        raise KeyError(f"Column '{filename_col}' not found in {csv_path}")

    # Load Whisper model only when the caller did not supply one
    if pipe is None:
        pipe = load_whisper_model()

    # Create a new column for transcriptions
    df['transcription'] = ""

    # Loop through each audio file and transcribe
    print("Starting transcription process...")
    total_files = len(df)
    for position, (i, audio_file) in enumerate(df[filename_col].items(), start=1):
        try:
            # Print progress using the running position, not the index label
            print(f"Processing file {position}/{total_files}: {audio_file}")

            full_path = os.path.join(audio_folder, audio_file)

            # Load and convert audio file
            audio, sample_rate = load_audio(full_path)

            # Transcribe audio and store it on the matching row
            df.at[i, 'transcription'] = transcribe_long_audio(pipe, audio, sample_rate)

        except Exception as e:
            # Best effort: record the failure on this row and keep going
            print(f"Error processing {audio_file}: {str(e)}")
            df.at[i, 'transcription'] = "ERROR: Could not transcribe"

    return df

In [None]:
def main(
    audio_folder="/kaggle/input/shl-intern-hiring-assessment/dataset/audios_train",
    csv_path="/kaggle/input/shl-intern-hiring-assessment/dataset/train.csv",
    output_path="/kaggle/working/train_with_transcriptions.csv",
):
    """
    Transcribe the audio files listed in a CSV and save the result.

    Called without arguments it processes the training set, exactly as before.

    Args:
        audio_folder: Folder containing the audio files
        csv_path: CSV listing the audio files
        output_path: Where the CSV with the added 'transcription' column is written

    Returns:
        The DataFrame that was written to `output_path`
    """
    # Transcribe audio files
    df = transcribe_audio_files(audio_folder, csv_path)

    # Save the updated dataframe
    df.to_csv(output_path, index=False)
    print(f"Transcription complete! Saved to {output_path}")

    return df

In [None]:
# Run the training-set transcription; this writes train_with_transcriptions.csv.
if __name__ == "__main__":
    main()

In [None]:
# # Print some examples
# print("\nExample transcriptions:")
# for i in range(min(5, len(df))):
#     print(f"File: {df.iloc[i]['filename']}")
#     print(f"Transcription: {df.iloc[i]['transcription']}")
#     print("-" * 50)

In [None]:
import pandas as pd
# Reload the CSV written by main() and display the first 5 rows of the
# dataframe with transcriptions
df = pd.read_csv("/kaggle/working/train_with_transcriptions.csv")
df.head()

In [None]:
# Save the DataFrame to the working directory under a second file name.
# The training cell further down reads this copy (transcribed_audio_data.csv).
output_file_path = "/kaggle/working/transcribed_audio_data.csv"
df.to_csv(output_file_path, index=False)
print(f"Saved transcribed data to {output_file_path}")

In [None]:
# Original training CSV (without transcriptions), used below to inspect the labels.
df_train = pd.read_csv("/kaggle/input/shl-intern-hiring-assessment/dataset/train.csv")

In [None]:
# Smallest label value in the training set
df_train['label'].min()

In [None]:
# Largest label value in the training set
df_train['label'].max()

In [None]:
# True if any transcription in the reloaded CSV is missing (NaN)
df['transcription'].isnull().any()

In [None]:
# How many training rows carry each distinct label value
df_train['label'].value_counts()

In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import Dataset
from peft import get_peft_model, LoraConfig, TaskType
from sklearn.metrics import mean_squared_error

# Load the transcribed CSV file
df = pd.read_csv("/kaggle/working/transcribed_audio_data.csv")

# Drop the filename column
df_new = df.drop('filename', axis=1)

# Rows where transcription failed carry a fixed marker text instead of speech.
# Training on that marker would only add noise, so leave those rows out.
failed_mask = df_new['transcription'] == "ERROR: Could not transcribe"
if failed_mask.any():
    print(f"Dropping {int(failed_mask.sum())} rows whose transcription failed")
df_new = df_new.loc[~failed_mask].reset_index(drop=True)

# An empty transcription is written to CSV as an empty field and read back as
# NaN; the tokenizer only accepts strings, so turn missing values into "".
df_new['transcription'] = df_new['transcription'].fillna("").astype(str)

# Display the dataframe structure
print("DataFrame structure after dropping filename column:")
print(df_new.head())

# Convert to dataset format for Hugging Face
df_new['label'] = df_new['label'].astype(float)  # Ensure labels are floats

# Use entire dataset for training (no validation split is held out)
train_dataset = Dataset.from_pandas(df_new, preserve_index=False)

# Tokenizer for the DistilBERT checkpoint that is fine-tuned below
model_name = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)


def tokenize_function(examples):
    """Tokenize a batch of transcriptions, truncated/padded to exactly 512 tokens."""
    texts = examples["transcription"]
    encoded = tokenizer(texts, max_length=512, truncation=True, padding="max_length")
    return encoded


# Apply the tokenizer to the whole training set, one batch at a time
tokenized_train = train_dataset.map(tokenize_function, batched=True)

# Initialize DistilBERT model for regression
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=1,  # Single output for regression
    problem_type="regression"
)

# Configure LoRA. This is plain LoRA, not QLoRA: no quantization config is
# passed when the base model is loaded above.
peft_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    r=16,  # Rank
    lora_alpha=32,
    lora_dropout=0.1,
    bias="none",
    target_modules=["q_lin", "v_lin", "k_lin", "out_lin"],  # DistilBERT attention layers
    # The regression head is not part of the pretrained checkpoint. List both
    # head layers explicitly so they are trained and stored with the adapter,
    # and a later reload restores them instead of re-initialising them.
    # NOTE(review): PEFT already adds "classifier" for SEQ_CLS; whether it also
    # covers "pre_classifier" implicitly depends on the PEFT version - confirm.
    modules_to_save=["pre_classifier", "classifier"],
)

# Apply LoRA to model
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()  # Show trainable vs total parameters

# Custom trainer that always optimises mean squared error
class RegressionTrainer(Trainer):
    """Trainer whose loss is the MSE between the flattened logits and labels."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Return the MSE loss, optionally together with the model outputs."""
        targets = inputs.get("labels")
        outputs = model(**inputs)

        # Flatten both sides so a (batch, 1) logit tensor lines up with (batch,) labels
        predictions = outputs.logits.view(-1)
        loss = nn.functional.mse_loss(predictions, targets.view(-1))

        if return_outputs:
            return loss, outputs
        return loss

# Training arguments. The original note says this targets a dual-T4 session;
# nothing below selects GPUs explicitly, so device placement is left to Trainer.
training_args = TrainingArguments(
    output_dir="/kaggle/working/distilbert-audio-regression",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    num_train_epochs=100,
    weight_decay=0.01,
    save_strategy="epoch",
    save_total_limit=1,
    fp16=True,
    report_to="none",
    dataloader_num_workers=2,
    # Add explicit logging settings
    logging_dir="/kaggle/working/logs",
    logging_strategy="steps",
    logging_steps=10,  # Log every 10 steps
    logging_first_step=True,  # Log the first step
)

# Initialize custom trainer with MSE loss.
# No eval_dataset is passed: the model is trained on the full training set only.
trainer = RegressionTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    tokenizer=tokenizer,
)

# Train model
trainer.train()

# Save the model to the directory that the evaluation and submission cells load from
trainer.save_model("/kaggle/working/distilbert-audio-regression-final")
print("Model training complete and model saved!")

# Helper for scoring a single transcription with the trained model
def predict_rating(text, model, tokenizer):
    """
    Predict the rating for one text.

    Args:
        text: Transcription to score
        model: Model exposing `.device` and returning an object with `.logits`
        tokenizer: Tokenizer producing a mapping of "pt" tensors

    Returns:
        The logits as a numpy array with all size-1 axes squeezed out
    """
    encoded = tokenizer(text, padding="max_length", truncation=True, max_length=512, return_tensors="pt")

    # Move every input tensor to wherever the model lives
    on_device = {}
    for key, tensor in encoded.items():
        on_device[key] = tensor.to(model.device)

    with torch.no_grad():
        logits = model(**on_device).logits

    return logits.cpu().numpy().squeeze()

# Announce that the helper is available (can be used after training)
print("Prediction function created. You can use predict_rating(text, model, tokenizer) to make predictions.")

In [None]:
import pandas as pd
import torch
import random
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from peft import PeftModel, PeftConfig

# Load the dataset
df = pd.read_csv("/kaggle/working/train_with_transcriptions.csv")

# Load the saved model and tokenizer
model_path = "/kaggle/working/distilbert-audio-regression-final"
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Load the base model and then apply the PEFT adapter
base_model_name = "distilbert-base-uncased"
base_model = AutoModelForSequenceClassification.from_pretrained(
    base_model_name,
    num_labels=1,
    problem_type="regression"
)
model = PeftModel.from_pretrained(base_model, model_path)
model.eval()

# Define prediction function
def predict_rating(text, model, tokenizer):
    """Return the model's logits for one text as a squeezed numpy array."""
    inputs = tokenizer(text, padding="max_length", truncation=True, max_length=512, return_tensors="pt")
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    return outputs.logits.cpu().numpy().squeeze()

# Select random samples and make predictions
random.seed(42)  # For reproducibility
num_samples = min(5, len(df))  # never ask for more samples than there are rows
sample_indices = random.sample(range(len(df)), num_samples)

print(f"\nTesting model on {num_samples} random samples from training data:\n")
print("-" * 80)

# Run inference once per sample and keep the squared error, instead of
# predicting every sample a second time for the MSE.
mse_samples = []
for idx in sample_indices:
    sample = df.iloc[idx]

    # Empty transcriptions come back from the CSV as NaN; treat them as ""
    transcription = sample['transcription']
    transcription = "" if pd.isna(transcription) else str(transcription)
    true_label = float(sample['label'])

    # Get prediction as a plain float
    predicted_label = float(predict_rating(transcription, model, tokenizer))

    # Calculate error
    error = abs(predicted_label - true_label)
    mse_samples.append(error ** 2)

    # Print results
    print(f"Sample {idx+1}:")
    print(f"Transcription (truncated): {transcription[:100]}...")
    print(f"True label: {true_label:.2f}")
    print(f"Predicted label: {predicted_label:.2f}")
    print(f"Absolute error: {error:.2f}")
    print("-" * 80)

# Calculate MSE for the random samples (skipped when the frame is empty)
if mse_samples:
    mse = sum(mse_samples) / len(mse_samples)
    print(f"Mean Squared Error (MSE) for these samples: {mse:.4f}")
    print(f"Root Mean Squared Error (RMSE): {mse ** 0.5:.4f}")

**Now creating code for submission**

In [None]:
def main(
    audio_folder="/kaggle/input/shl-intern-hiring-assessment/dataset/audios_test",
    csv_path="/kaggle/input/shl-intern-hiring-assessment/dataset/test.csv",
    output_path="/kaggle/working/test_with_transcriptions.csv",
):
    """
    Transcribe the audio files listed in a CSV and save the result.

    Called without arguments it processes the test set. This definition
    replaces any earlier `main` defined in the notebook.

    Args:
        audio_folder: Folder containing the audio files
        csv_path: CSV listing the audio files
        output_path: Where the CSV with the added 'transcription' column is written

    Returns:
        The DataFrame that was written to `output_path`
    """
    # Transcribe audio files
    df = transcribe_audio_files(audio_folder, csv_path)

    # Save the updated dataframe
    df.to_csv(output_path, index=False)
    print(f"Transcription complete! Saved to {output_path}")

    return df

In [None]:
# Run the test-set transcription; this writes test_with_transcriptions.csv.
if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
# Reload the test CSV written by main() and display the first 5 rows of the
# dataframe with transcriptions
df_test = pd.read_csv("/kaggle/working/test_with_transcriptions.csv")
df_test.head()

In [None]:
# Copy of the test frame without the filename column. The prediction cell
# below reads `df_test`, not this frame.
Test_df = df_test.drop('filename', axis=1)

In [None]:
# Preview the test frame after dropping the filename column
Test_df.head()

In [None]:
# Load our fine-tuned model and tokenizer
print("Loading fine-tuned model...")
model_path = "/kaggle/working/distilbert-audio-regression-final"
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# The directory written by trainer.save_model() holds the PEFT adapter, not a
# full model. Load it the same way the evaluation cell does: base model first,
# then the adapter on top.
# NOTE(review): pointing AutoModelForSequenceClassification straight at the
# adapter directory relies on transformers' implicit PEFT handling, which may
# not restore the trained regression head - confirm before reverting.
base_model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=1,
    problem_type="regression"
)
model = PeftModel.from_pretrained(base_model, model_path)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
model.eval()

# Function to predict scores
def predict_rating(text, model, tokenizer, device):
    """Return the model's logits for one text as a squeezed numpy array."""
    inputs = tokenizer(text, padding="max_length", truncation=True, max_length=512, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    # Get the prediction
    prediction = outputs.logits.cpu().numpy().squeeze()
    return prediction

# Make predictions on test data
print("Making predictions...")
predictions = []

# Empty transcriptions are read back from CSV as NaN; the tokenizer needs strings
for text in df_test['transcription'].fillna("").astype(str):
    score = predict_rating(text, model, tokenizer, device)
    # Store plain floats so the label column is numeric, not a column of 0-d arrays
    predictions.append(float(score))

# Add predictions to the test dataframe
df_test['label'] = predictions

# Create final submission DataFrame with only required columns
submission_df = pd.DataFrame({
    'filename': df_test['filename'],
    'label': df_test['label']
})

# Save the submission file
submission_df.to_csv("/kaggle/working/submission.csv", index=False)
print("Predictions completed")
print(f"Submission file created with {len(submission_df)} entries")
print("First few predictions:")
print(submission_df.head())

In [56]:
# Read the submission file back from disk to check what was actually written
df_submission = pd.read_csv("/kaggle/working/submission.csv")
df_submission.head()

Unnamed: 0,filename,label
0,audio_706.wav,3.725005
1,audio_800.wav,2.367599
2,audio_68.wav,3.089059
3,audio_1267.wav,2.968229
4,audio_683.wav,2.736108
