In [None]:
!pip install numpy pandas librosa torch matplotlib seaborn scikit-learn scipy transformers openai-whisper spacy torchaudio
!python -m spacy download en_core_web_sm
!sudo apt-get install -y ffmpeg


In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy.stats import pearsonr
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, AutoTokenizer, AutoModel
import whisper
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import spacy

In [None]:
import os
import numpy as np
import torch
import librosa
import pandas as pd
import spacy

from torch.utils.data import Dataset
from transformers import (
    Wav2Vec2Processor,
    Wav2Vec2ForCTC,
    AutoTokenizer,
    AutoModel
)
import whisper

# Load spaCy model
# Module-level pipeline; AudioTextDataset._extract_linguistic_features reads it as the global `nlp`.
nlp = spacy.load("en_core_web_sm")

# Set random seed
# Seed NumPy's and PyTorch's global RNGs with the same fixed value.
np.random.seed(42)
torch.manual_seed(42)

class AudioTextDataset(Dataset):
    """Dataset that pairs pre-computed audio features with ASR transcripts.

    Construction is eager: every row of ``csv_file`` is loaded from
    ``audio_dir``, resampled to 16 kHz, silence-trimmed, forced to exactly
    ``max_audio_length`` samples, converted to audio features and transcribed
    with Whisper. Linguistic features (spaCy statistics + a BERT embedding) are
    computed lazily in ``__getitem__`` and memoised per transcript, so repeated
    epochs do not re-run spaCy/BERT on unchanged text.

    Args:
        csv_file: Path of a CSV with a ``filename`` column and, optionally, a
            ``label`` column.
        audio_dir: Directory that contains the files named in ``filename``.
        transform: Optional callable applied to each sample dict.
        max_audio_length: Number of samples every clip is truncated/padded to.
    """

    def __init__(self, csv_file, audio_dir, transform=None, max_audio_length=48000):
        self.data_frame = pd.read_csv(csv_file)
        self.audio_dir = audio_dir
        self.transform = transform
        self.max_audio_length = max_audio_length

        self.audio_data = []
        self.transcripts = []
        self.labels = []
        # transcript text -> raw linguistic feature dict, filled on first access.
        self._linguistic_cache = {}

        self.asr_model = whisper.load_model("base")
        self.wav2vec_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
        self.wav2vec_model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")
        self.bert_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.bert_model = AutoModel.from_pretrained("bert-base-uncased")

        for idx, row in self.data_frame.iterrows():
            print(f"Processing file {idx+1}/{len(self.data_frame)}: {row['filename']}")
            audio_path = os.path.join(self.audio_dir, row['filename'])

            audio, sr = librosa.load(audio_path, sr=16000)
            audio, _ = librosa.effects.trim(audio, top_db=20)

            # Force a fixed length so every clip yields feature arrays of equal shape.
            if len(audio) > self.max_audio_length:
                audio = audio[:self.max_audio_length]
            else:
                padding = self.max_audio_length - len(audio)
                audio = np.pad(audio, (0, padding), 'constant')

            audio_features = self._extract_audio_features(audio)
            transcript = self._transcribe_audio(audio)
            # Rows without a 'label' column get the placeholder 0.
            label = row['label'] if 'label' in row else 0

            self.audio_data.append(audio_features)
            self.transcripts.append(transcript)
            self.labels.append(label)

    def _extract_audio_features(self, audio):
        """Return a dict of acoustic features for one 16 kHz waveform.

        Keys: ``mfccs``, ``mfcc_delta``, ``mfcc_delta2`` (13 MFCCs and their
        first/second deltas), ``pitch`` (per-frame pitch of the strongest
        ``piptrack`` bin), ``energy`` (mean RMS as a float) and
        ``wav2vec_embedding`` (wav2vec2 CTC logits averaged over dim 1).
        """
        mfccs = librosa.feature.mfcc(y=audio, sr=16000, n_mfcc=13)
        mfcc_delta = librosa.feature.delta(mfccs)
        mfcc_delta2 = librosa.feature.delta(mfccs, order=2)

        pitches, magnitudes = librosa.piptrack(y=audio, sr=16000)
        # For every frame keep the pitch of the bin with the largest magnitude.
        pitch = np.array([pitches[magnitudes[:, i].argmax(), i] for i in range(magnitudes.shape[1])], dtype=np.float32)

        energy = float(np.mean(librosa.feature.rms(y=audio)))

        with torch.no_grad():
            inputs = self.wav2vec_processor(audio, sampling_rate=16000, return_tensors="pt")
            outputs = self.wav2vec_model(**inputs)
            # NOTE: this pools the CTC head's logits, not hidden states, so its
            # width is the logits' last dimension.
            wav2vec_embedding = outputs.logits.mean(dim=1).squeeze().numpy().astype(np.float32)

        return {
            'mfccs': mfccs.astype(np.float32),
            'mfcc_delta': mfcc_delta.astype(np.float32),
            'mfcc_delta2': mfcc_delta2.astype(np.float32),
            'pitch': pitch,
            'energy': energy,
            'wav2vec_embedding': wav2vec_embedding
        }

    def _transcribe_audio(self, audio):
        """Return the Whisper transcript text for ``audio``."""
        return self.asr_model.transcribe(audio)["text"]

    @staticmethod
    def _mean_dependency_distance(doc):
        """Mean ``|token.i - token.head.i|`` over tokens that are not their own head.

        Returns 0.0 when there is no such token (empty input, or only root
        tokens) instead of the NaN that ``np.mean([])`` would produce.
        """
        distances = [abs(token.i - token.head.i) for token in doc if token.head.i != token.i]
        return float(np.mean(distances)) if distances else 0.0

    def _extract_linguistic_features(self, text):
        """Return spaCy-derived statistics and a mean-pooled BERT embedding for ``text``.

        Keys: ``num_tokens``, ``num_sentences``, ``avg_token_length``, ``ttr``
        (type/token ratio on lower-cased tokens), ``pos_counts`` (relative
        frequency per POS tag), ``avg_dependency_distance`` and
        ``bert_embedding``.
        """
        doc = nlp(text)

        num_tokens = len(doc)
        num_sentences = len(list(doc.sents))
        avg_token_length = np.mean([len(token.text) for token in doc]) if num_tokens > 0 else 0

        unique_tokens = len({token.text.lower() for token in doc})
        ttr = unique_tokens / num_tokens if num_tokens > 0 else 0

        pos_counts = {}
        for token in doc:
            pos = token.pos_
            pos_counts[pos] = pos_counts.get(pos, 0) + 1

        # Turn absolute counts into relative frequencies (the dict is empty when there are no tokens).
        for pos in pos_counts:
            pos_counts[pos] = pos_counts[pos] / num_tokens

        avg_dependency_distance = self._mean_dependency_distance(doc)

        inputs = self.bert_tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
        with torch.no_grad():
            outputs = self.bert_model(**inputs)
            bert_embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy().astype(np.float32)

        return {
            'num_tokens': float(num_tokens),
            'num_sentences': float(num_sentences),
            'avg_token_length': float(avg_token_length),
            'ttr': float(ttr),
            'pos_counts': pos_counts,
            'avg_dependency_distance': float(avg_dependency_distance),
            'bert_embedding': bert_embedding
        }

    def __len__(self):
        """Number of rows in the CSV (one processed clip per row)."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Return the sample dict for ``idx``.

        Keys: ``audio_features``, ``linguistic_features``, ``transcript`` and
        ``grammar_score``. ``transform`` is applied last when it is set.
        """
        audio_features = self.audio_data[idx]
        transcript = self.transcripts[idx]
        label = self.labels[idx]

        # The transcript is fixed after __init__, so its features are computed once and reused.
        raw_features = self._linguistic_cache.get(transcript)
        if raw_features is None:
            raw_features = self._extract_linguistic_features(transcript)
            self._linguistic_cache[transcript] = raw_features

        # Fresh outer dict per call; the cached values themselves are shared.
        linguistic_features = {
            'bert_embedding': raw_features['bert_embedding'],
            'num_tokens': raw_features['num_tokens'],
            'num_sentences': raw_features['num_sentences'],
            'avg_token_length': raw_features['avg_token_length'],
            'ttr': raw_features['ttr'],
            'avg_dependency_distance': raw_features['avg_dependency_distance'],
            'pos_counts': raw_features['pos_counts']
        }

        sample = {
            'audio_features': audio_features,
            'linguistic_features': linguistic_features,
            'transcript': transcript,
            'grammar_score': label
        }

        if self.transform:
            sample = self.transform(sample)

        return sample


In [None]:
# 2. Feature Normalization and Preparation
class FeatureNormalizer:
    """Sample transform meant to standardise features (zero mean, unit variance).

    At present this is a placeholder: the statistics passed to the constructor
    are stored but never applied, ``fit`` computes nothing and calling the
    object hands the sample back untouched.
    """

    def __init__(self, feature_means=None, feature_stds=None):
        # Kept for a future implementation; nothing in this class reads them yet.
        self.feature_means, self.feature_stds = feature_means, feature_stds

    def fit(self, dataset):
        """Placeholder for computing per-feature means/stds from ``dataset``; does nothing."""
        return None

    def __call__(self, sample):
        """Return ``sample`` unchanged (normalisation is not implemented yet)."""
        return sample


In [None]:
# 5. Main Pipeline

# Paths and parameters
# Single root for the competition data so the location only has to change in one place;
# the derived paths are identical to the previously hard-coded strings.
DATA_ROOT = '/kaggle/input/shl-intern-hiring-assessment/dataset'
train_csv = f'{DATA_ROOT}/train.csv'
test_csv = f'{DATA_ROOT}/test.csv'
sample_submission_csv = f'{DATA_ROOT}/sample_submission.csv'
train_audio_dir = f'{DATA_ROOT}/audios_train'
test_audio_dir = f'{DATA_ROOT}/audios_test'
batch_size = 16
num_epochs = 25
learning_rate = 0.001
# Use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data preparation
# Building the dataset eagerly loads, featurises and transcribes every training clip.
print("Preparing training data...")
train_dataset = AudioTextDataset(train_csv, train_audio_dir)



In [None]:
# 3. Multimodal Model Definition
class MultimodalGrammarScorer(nn.Module):
    """Two-branch regressor that fuses an audio and a text embedding into one score.

    Each modality goes through its own MLP (Linear -> ReLU -> Dropout, twice)
    ending at ``hidden_dim // 2`` units; the two encodings are concatenated and
    a third MLP maps them to a single output value.

    Args:
        audio_dim: Width of the audio feature vector.
        text_dim: Width of the text feature vector.
        hidden_dim: Width of the first hidden layer of each branch.
    """

    def __init__(self, audio_dim=32, text_dim=768, hidden_dim=256):
        super().__init__()

        branch_dim = hidden_dim // 2
        # Width of the concatenated encodings. Equal to hidden_dim when it is
        # even; for odd values using hidden_dim here would not match forward().
        fused_dim = 2 * branch_dim

        def make_branch(in_features):
            # One modality encoder; layers are created in the same order as before,
            # so seeded initialisation and state-dict keys are unchanged.
            return nn.Sequential(
                nn.Linear(in_features, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.3),
                nn.Linear(hidden_dim, branch_dim),
                nn.ReLU(),
                nn.Dropout(0.3)
            )

        # Audio processing branch
        self.audio_encoder = make_branch(audio_dim)

        # Text processing branch
        self.text_encoder = make_branch(text_dim)

        # Fusion and output layers
        self.fusion = nn.Sequential(
            nn.Linear(fused_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 2, hidden_dim // 4),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 4, 1)
        )

    def forward(self, audio_features, text_features):
        """Return the prediction for a batch.

        Args:
            audio_features: Tensor whose last dimension is ``audio_dim``.
            text_features: Tensor whose last dimension is ``text_dim``.

        Returns:
            Tensor with a trailing dimension of size 1 (the inputs are
            concatenated along dim 1, so 2-D ``(batch, features)`` inputs are expected).
        """
        audio_encoding = self.audio_encoder(audio_features)
        text_encoding = self.text_encoder(text_features)

        # Concatenate the per-modality encodings along the feature axis.
        combined = torch.cat((audio_encoding, text_encoding), dim=1)

        return self.fusion(combined)


In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, device='cuda'):
    """Train ``model`` and report validation correlation/RMSE after every epoch.

    Each batch must be a mapping with ``batch['audio_features']['wav2vec_embedding']``,
    ``batch['linguistic_features']['bert_embedding']`` and ``batch['grammar_score']``.

    Whenever the validation Pearson correlation improves, the weights are
    written to ``best_grammar_model.pt``. The returned model carries the
    weights of the LAST epoch; the saved checkpoint is not reloaded.

    Args:
        model: Module called as ``model(audio_embeddings, text_embeddings)``.
        train_loader: Iterable of training batches (must support ``len``).
        val_loader: Iterable of validation batches.
        criterion: Loss called as ``criterion(predictions, targets)``.
        optimizer: Optimizer over ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the model and the batches are moved to.

    Returns:
        The same ``model`` object after training.
    """

    def as_float(value):
        # Collated batches already hold tensors; sequences of arrays are stacked first.
        # torch.tensor(<tensor>) is avoided because it copy-constructs and warns.
        tensor = value if torch.is_tensor(value) else torch.as_tensor(np.stack(value))
        return tensor.float().to(device)

    def unpack(batch):
        # (audio embeddings, text embeddings, target scores) as float tensors on `device`.
        return (
            as_float(batch['audio_features']['wav2vec_embedding']),
            as_float(batch['linguistic_features']['bert_embedding']),
            torch.as_tensor(batch['grammar_score']).float().to(device),
        )

    model.to(device)
    best_val_corr = -1.0

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for batch in train_loader:
            audio_embeddings, text_embeddings, scores = unpack(batch)

            # Forward and backward pass
            optimizer.zero_grad()
            outputs = model(audio_embeddings, text_embeddings)
            # reshape(-1) keeps a 1-D tensor even for a batch of one; a bare
            # squeeze() would collapse it to 0-d and mismatch `scores`.
            loss = criterion(outputs.reshape(-1), scores.reshape(-1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # Guard against an empty loader so the average never divides by zero.
        epoch_loss = running_loss / max(len(train_loader), 1)

        # Validation
        model.eval()
        val_predictions = []
        val_targets = []

        with torch.no_grad():
            for batch in val_loader:
                audio_embeddings, text_embeddings, scores = unpack(batch)

                outputs = model(audio_embeddings, text_embeddings)
                # 1-D arrays are always iterable, so extend() also works for single-sample batches.
                val_predictions.extend(outputs.reshape(-1).cpu().numpy())
                val_targets.extend(scores.reshape(-1).cpu().numpy())

        val_corr, _ = pearsonr(val_targets, val_predictions)
        val_rmse = np.sqrt(mean_squared_error(val_targets, val_predictions))

        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f} - Val Corr: {val_corr:.4f} - Val RMSE: {val_rmse:.4f}")

        # A NaN correlation compares False here, so it never overwrites the checkpoint.
        if val_corr > best_val_corr:
            best_val_corr = val_corr
            torch.save(model.state_dict(), 'best_grammar_model.pt')

    return model


def evaluate_model(model, test_loader, device='cuda'):
    """Run ``model`` over ``test_loader`` and, when targets are present, score it.

    Each batch must be a mapping with ``batch['audio_features']['wav2vec_embedding']``,
    ``batch['linguistic_features']['bert_embedding']`` and ``batch['grammar_score']``.

    Targets are treated as absent when every one of them is zero; in that case
    no metrics are computed. Otherwise Pearson correlation, RMSE and MAE are
    printed and a scatter plot is saved to ``grammar_score_predictions.png``.

    Args:
        model: Module called as ``model(audio_embeddings, text_embeddings)``.
        test_loader: Iterable of batches.
        device: Device the model and the batches are moved to.

    Returns:
        ``(predictions, metrics)`` where ``predictions`` is a flat list with one
        value per sample and ``metrics`` is a dict with keys ``correlation``,
        ``rmse`` and ``mae``, or ``None`` when all targets are zero.
    """

    def as_float(value):
        # Collated batches already hold tensors; sequences of arrays are stacked first.
        tensor = value if torch.is_tensor(value) else torch.as_tensor(np.stack(value))
        return tensor.float().to(device)

    model.to(device)
    model.eval()

    predictions = []
    targets = []

    with torch.no_grad():
        for batch in test_loader:
            audio_embeddings = as_float(batch['audio_features']['wav2vec_embedding'])
            text_embeddings = as_float(batch['linguistic_features']['bert_embedding'])
            scores = torch.as_tensor(batch['grammar_score']).float().to(device)

            outputs = model(audio_embeddings, text_embeddings)
            # reshape(-1) keeps the result 1-D even for a batch of one; a bare
            # squeeze() gives a 0-d array that list.extend() cannot iterate.
            batch_predictions = outputs.reshape(-1).cpu().numpy()

            predictions.extend(batch_predictions)
            targets.extend(scores.reshape(-1).cpu().numpy())

    # All-zero targets are the marker for "no labels available".
    if np.any(targets):
        corr, p_value = pearsonr(targets, predictions)
        rmse = np.sqrt(mean_squared_error(targets, predictions))
        mae = mean_absolute_error(targets, predictions)

        print(f"Test Results - Pearson Correlation: {corr:.4f} (p={p_value:.4f}), RMSE: {rmse:.4f}, MAE: {mae:.4f}")

        plt.figure(figsize=(10, 6))
        plt.scatter(targets, predictions, alpha=0.6)
        # Dashed identity line: points on it are perfect predictions.
        plt.plot([min(targets), max(targets)], [min(targets), max(targets)], 'r--')
        plt.xlabel('Actual Grammar Scores')
        plt.ylabel('Predicted Grammar Scores')
        plt.title(f'Actual vs Predicted Grammar Scores (Pearson r={corr:.4f})')
        plt.grid(True)
        plt.savefig('grammar_score_predictions.png')

        return predictions, {'correlation': corr, 'rmse': rmse, 'mae': mae}
    else:
        return predictions, None


In [None]:
from torch.utils.data import DataLoader
from torch.utils.data._utils.collate import default_collate

# Cross-validation setup
# Five shuffled folds; the fixed random_state makes the split repeatable.
kf = KFold(n_splits=5, shuffle=True, random_state=42)
# Collects the metrics returned by evaluate_model, one entry per fold.
fold_results = []

def custom_collate(batch):
    """Collate samples after dropping fields the default collate cannot or need not batch.

    From every sample only ``audio_features``, ``linguistic_features`` and
    ``grammar_score`` are kept (whichever of them are present), and inside
    ``linguistic_features`` only ``bert_embedding`` survives. The pruned
    samples are then handed to ``default_collate``.
    """
    top_level_keys = ('audio_features', 'linguistic_features', 'grammar_score')
    kept_linguistic = ('bert_embedding',)  # Keep only the BERT embedding

    def prune(item):
        # Copy the wanted top-level entries, thinning out the linguistic sub-dict.
        pruned = {}
        for key in top_level_keys:
            if key in item:
                value = item[key]
                if key == 'linguistic_features':
                    value = {name: value[name] for name in value if name in kept_linguistic}
                pruned[key] = value
        return pruned

    return default_collate([prune(item) for item in batch])



for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
    # The fold total comes from the splitter itself so the message cannot drift from n_splits.
    print(f"\nTraining fold {fold+1}/{kf.get_n_splits()}...")

    # Split dataset: both loaders read the same dataset, restricted to this fold's indices.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
    val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)

    # DataLoaders with custom collate_fn
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_subsampler, collate_fn=custom_collate)
    val_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=val_subsampler, collate_fn=custom_collate)

    # Initialize a fresh model, loss and optimizer for every fold
    model = MultimodalGrammarScorer()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)

    # Train model
    model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device)

    # Evaluate on validation set (uses the weights train_model returned, i.e. the last epoch)
    val_predictions, val_metrics = evaluate_model(model, val_loader, device)
    fold_results.append(val_metrics)

# Summarize cross-validation results
print("\nCross-validation results:")
for i, result in enumerate(fold_results):
    print(f"Fold {i+1}: Correlation = {result['correlation']:.4f}, RMSE = {result['rmse']:.4f}")

avg_corr = np.mean([r['correlation'] for r in fold_results])
avg_rmse = np.mean([r['rmse'] for r in fold_results])
print(f"Average: Correlation = {avg_corr:.4f}, RMSE = {avg_rmse:.4f}")


In [None]:
# Final model training on full dataset
print("\nTraining final model on full dataset...")

# Important: Use the same collate function here as well
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate)

final_model = MultimodalGrammarScorer()
criterion = nn.MSELoss()
optimizer = optim.Adam(final_model.parameters(), lr=learning_rate, weight_decay=1e-5)

# You can use val_loader = train_loader just for consistency, or use a small split if needed
# NOTE: because the training loader doubles as the validation loader, the "Val" numbers
# printed by train_model here are computed on training data, and the checkpoint it saves
# is selected on that same data.
final_model = train_model(final_model, train_loader, train_loader, criterion, optimizer, num_epochs, device)

# Prepare test data
# Building the dataset loads, featurises and transcribes every test clip.
print("\nPreparing test data...")
test_dataset = AudioTextDataset(test_csv, test_audio_dir)

# Again: use the collate function here too
# shuffle=False keeps the predictions in the row order of test_csv.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)

# Generate predictions for test set
# The metrics slot is discarded: presumably test_csv has no 'label' column, in which case the
# dataset fills in 0 and evaluate_model returns None for metrics -- TODO confirm.
test_predictions, _ = evaluate_model(final_model, test_loader, device)




In [None]:
# Create submission file
# Start from the sample submission and overwrite its label column with the model output.
# NOTE(review): assumes the sample submission rows are in the same order as test_csv -- confirm.
submission = pd.read_csv(sample_submission_csv)
submission['label'] = test_predictions

# Clip predictions to valid range (0 to 5), then round to nearest 0.5
submission['label'] = submission['label'].clip(lower=0, upper=5).mul(2).round().div(2)

submission.to_csv('/kaggle/working/submission.csv', index=False)

print("✅ Submission file created successfully!")