In [None]:
!pip install transformers librosa datasets

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset paths used in
# the following cells live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import pandas as pd

SR = 16000  # target sample rate in Hz, shared by the later cells
dataset_path = '/content/drive/MyDrive/tourrets_data'  # Assuming the data is in your Google Drive
AUDIO_EXTENSIONS = ('.wav', '.aac')  # matched case-insensitively below

# Expected layout: <dataset_path>/<person_id>/<label>/<audio file>, label in {'0', '1'}.
# Directory listings are sorted because os.listdir() returns entries in arbitrary
# order, which would make the row order (and anything derived from it) vary
# between runs.
data = []
for person_id in sorted(os.listdir(dataset_path)):
    person_dir = os.path.join(dataset_path, person_id)
    if not os.path.isdir(person_dir):
        continue
    for label in ('0', '1'):
        label_dir = os.path.join(person_dir, label)
        if not os.path.isdir(label_dir):
            continue
        for filename in sorted(os.listdir(label_dir)):
            # lower() so that files such as "clip.WAV" are not silently ignored
            if filename.lower().endswith(AUDIO_EXTENSIONS):
                file_path = os.path.join(label_dir, filename)
                data.append({'file_path': file_path, 'label': int(label)})

# Explicit columns keep the frame well-formed even when no file was found.
df = pd.DataFrame(data, columns=['file_path', 'label'])
display(df.head())

In [None]:
import os
import pandas as pd
import torchaudio
import soundfile as sf

dataset_path = '/content/drive/MyDrive/tourrets_data'  # Assuming the data is in your Google Drive
output_augmented_path = '/content/drive/MyDrive/tourrets_data_augmented' # Path to save augmented data

# Sliding-window parameters, expressed in samples at the target rate SR.
window_size = SR * 2          # 2-second window
hop_size = int(SR * 0.5)      # 0.5-second hop, so consecutive windows overlap

augmented_data = []
skipped_short = 0  # recordings shorter than one window (they yield no segment)

# Create output directory if it doesn't exist
os.makedirs(output_augmented_path, exist_ok=True)

# Always segment the ORIGINAL recordings collected in `data`. Iterating `df`
# would make this cell non-idempotent, because `df` is replaced by the segment
# table at the end of the cell and a re-run would then segment the segments.
source_df = pd.DataFrame(data, columns=['file_path', 'label'])

for _, row in source_df.iterrows():
    file_path = row['file_path']
    label = row['label']
    # Source layout is .../<person_id>/<label>/<file>
    source_label_dir = os.path.dirname(file_path)
    person_id = os.path.basename(os.path.dirname(source_label_dir))
    label_dir_name = os.path.basename(source_label_dir)

    try:
        # Load the audio file
        waveform, sr = torchaudio.load(file_path)

        # Resample to the target rate if necessary
        if sr != SR:
            waveform = torchaudio.functional.resample(waveform, sr, SR)
            sr = SR

        # Down-mix multi-channel audio to mono
        if waveform.shape[0] > 1:
            waveform = waveform.mean(0, keepdim=True)

        num_samples = waveform.shape[1]
        if num_samples < window_size:
            # No full window fits; report it instead of dropping the file silently.
            skipped_short += 1
            print(f"Skipping {file_path}: {num_samples / SR:.2f}s is shorter than the {window_size / SR:.1f}s window")
            continue

        # Per-file invariants are computed once, not once per segment.
        augmented_label_dir = os.path.join(output_augmented_path, person_id, label_dir_name)
        os.makedirs(augmented_label_dir, exist_ok=True)
        stem = os.path.splitext(os.path.basename(file_path))[0]

        # Apply sliding window
        for start in range(0, num_samples - window_size + 1, hop_size):
            end = start + window_size
            segment = waveform[:, start:end]

            segment_filename = f"{stem}_segment_{start}_{end}.wav"
            segment_save_path = os.path.join(augmented_label_dir, segment_filename)

            # Save the mono segment as a 1-D array
            sf.write(segment_save_path, segment.squeeze(0).numpy(), sr)

            augmented_data.append({'file_path': segment_save_path, 'label': label})

    except Exception as e:
        # Best effort: a file that cannot be processed is reported and skipped.
        print(f"Error processing audio file {file_path}: {e}")

# Create a new DataFrame with augmented data
df_augmented = pd.DataFrame(augmented_data, columns=['file_path', 'label'])

# Replace the original DataFrame with the augmented one
df = df_augmented

display(df.head())
print(f"Original dataset size: {len(data)}")
print(f"Augmented dataset size: {len(df_augmented)}")
print(f"Files skipped (shorter than one window): {skipped_short}")

In [None]:
# Inspect the current contents of `df` (rich DataFrame display).
df

In [None]:
# # !pip install -q --upgrade transformers torchaudio soundfile

# import torch, torchaudio
# from transformers import AutoFeatureExtractor, AutoModel  # or WavLMModel

# feat_extractor = AutoFeatureExtractor.from_pretrained("microsoft/wavlm-base-plus")
# model = AutoModel.from_pretrained("microsoft/wavlm-base-plus")  # or WavLMModel
# model.eval()



In [None]:
# # Load audio (any format), convert to mono 16k
# waveform, sr = torchaudio.load("drive/MyDrive/tourrets_data/01/0/00-00_00__to__00-03_00.aac")     # [C, T]
# if waveform.shape[0] > 1:
#     waveform = waveform.mean(0, keepdim=True)
# if sr != 16000:
#     waveform = torchaudio.functional.resample(waveform, sr, 16000)

# audio = waveform.squeeze().numpy()  # 1D float array in [-1, 1]

# inputs = feat_extractor(audio, sampling_rate=16000, return_tensors="pt")
# with torch.no_grad():
#     outputs = model(**inputs)

# print(outputs.last_hidden_state.shape)  # [batch, time_steps, hidden_dim]


In [None]:
# audio

In [None]:
# import torch
# import torch.nn as nn
# from transformers import WavLMModel, Wav2Vec2FeatureExtractor

# class TicClassifier(nn.Module):
#     def __init__(self, backbone_name="microsoft/wavlm-base-plus", dropout=0.2):
#         super().__init__()
#         self.feat_extractor = Wav2Vec2FeatureExtractor.from_pretrained(backbone_name)
#         self.backbone = WavLMModel.from_pretrained(backbone_name)
#         hidden = self.backbone.config.hidden_size  # 768 for base-plus
#         self.classifier = nn.Sequential(
#             nn.Dropout(dropout),
#             nn.Linear(hidden, 256),
#             nn.ReLU(),
#             nn.Dropout(dropout),
#             nn.Linear(256, 1)  # binary logit
#         )

#     def forward(self, waveforms, sampling_rate=16000):
#         """
#         waveforms: list[1D float arrays] or a padded tensor [B, T]
#         returns: logits [B, 1]
#         """
#         # Use the feature extractor to build inputs + attention_mask
#         inputs = self.feat_extractor(
#             waveforms, sampling_rate=sampling_rate, return_tensors="pt", padding=True
#         )
#         input_values = inputs["input_values"].to(self.backbone.device)      # [B, T]
#         attention_mask = inputs["attention_mask"].to(self.backbone.device)  # [B, T]

#         outputs = self.backbone(input_values=input_values, attention_mask=attention_mask)
#         hs = outputs.last_hidden_state  # [B, T', H]

#         # Masked mean pooling over time (accounting for downsampling of T' vs attention_mask)
#         # Align masks to hidden length if shapes differ
#         if attention_mask.shape[1] != hs.shape[1]:
#             # Downsample mask to hs length (nearest)
#             am = nn.functional.interpolate(
#                 attention_mask[:, None, :].float(), size=hs.shape[1], mode="nearest"
#             ).squeeze(1)
#         else:
#             am = attention_mask

#         am = am.unsqueeze(-1)  # [B, T', 1]
#         masked = hs * am
#         denom = am.sum(dim=1).clamp_min(1e-6)  # [B, 1]
#         pooled = masked.sum(dim=1) / denom     # [B, H]

#         logits = self.classifier(pooled)       # [B, 1]
#         return logits.squeeze(1)               # [B]


# Task
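Fine-tune the WavLM model for Tourette's tic detection using the provided dataset located in the "tourrets_data" directory, which contains audio files in both .wav and .aac formats organized by person ID and labeled with 1 for positive samples and 0 for negative samples. Train the `TicClassifier` model and evaluate its performance. (Note: the WavLM-based `TicClassifier` appears above only as commented-out code; the `TicClassifier` that is actually trained below uses a YAMNet backbone with a PyTorch classifier head.)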

## Prepare the dataset for training

### Subtask:
Create a PyTorch Dataset and DataLoader to handle loading, preprocessing, and batching of your audio data and labels.


**Reasoning**:
Create a PyTorch Dataset and DataLoader to handle the audio data and labels. This involves defining a custom Dataset class to load and preprocess the audio files and their corresponding labels, and then creating a DataLoader instance for batching.



In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import torchaudio

class AudioDataset(Dataset):
    """Dataset over a DataFrame that has `file_path` and `label` columns.

    Each item is a dict ``{"audio": <numpy array>, "label": <label>}`` where the
    audio has been down-mixed to one channel and resampled to ``SR``. If a file
    cannot be loaded, the error is printed and ``None`` is returned so that a
    collate function can drop the sample.
    """

    def __init__(self, dataframe):
        self.dataframe = dataframe

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Row lookup happens outside the try block: an invalid index should raise.
        record = self.dataframe.iloc[idx]
        file_path = record['file_path']
        label = record['label']

        try:
            waveform, sr = torchaudio.load(file_path)

            # Average the channels when the file is not already mono.
            is_multichannel = waveform.shape[0] > 1
            if is_multichannel:
                waveform = waveform.mean(0, keepdim=True)

            # Bring every clip to the shared target sample rate.
            if sr != SR:
                waveform = torchaudio.functional.resample(waveform, sr, SR)

            return {"audio": waveform.squeeze().numpy(), "label": label}

        except Exception as e:
            print(f"Error loading audio file {file_path}: {e}")
            return None  # signals a problematic sample to the collate function


def _drop_failed_samples(batch):
    """Collate samples into a dict of lists, ignoring samples that failed to load.

    AudioDataset returns ``None`` for unreadable files. The keys are taken from
    the first *valid* sample, so a failed first sample no longer raises
    ``TypeError``. Returns ``None`` when every sample in the batch failed.
    """
    valid = [sample for sample in batch if sample is not None]
    if not valid:
        return None
    return {key: [sample[key] for sample in valid] for key in valid[0]}


dataset = AudioDataset(df)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=_drop_failed_samples)


## Define training parameters

### Subtask:
Set up the optimizer, loss function, learning rate, number of epochs, and other relevant training parameters.


**Reasoning**:
I need to set up the optimizer, loss function, learning rate, and other relevant training parameters.



In [None]:
# import torch.optim as optim

# # Define training parameters
# learning_rate = 1e-2
# num_epochs = 10
# weight_decay = 1e-5

# # Choose optimizer
# optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# # Choose loss function (Binary Cross-Entropy with Logits)
# loss_fn = torch.nn.BCEWithLogitsLoss()

## Implement the training loop

### Subtask:
Write the code for the training process, including forward pass, loss calculation, backpropagation, and weight updates.


In [None]:
from sklearn.model_selection import train_test_split
import os

def _person_of(path):
    """Return the grandparent directory name of `path` (.../<person_id>/<label>/<file>)."""
    return os.path.basename(os.path.dirname(os.path.dirname(path)))


# Tag every row with the person it belongs to (adds a column to `df` in place).
df['person_id'] = df['file_path'].apply(_person_of)

# The split is made over people rather than rows, so all clips of one person
# end up on the same side of the split.
unique_person_ids = df['person_id'].unique()
train_ids, test_ids = train_test_split(unique_person_ids, test_size=0.2, random_state=42)
# Validation split (currently disabled):
# train_ids, val_ids = train_test_split(train_ids, test_size=0.25, random_state=42) # 0.25 * 0.8 = 0.2 of original

# Select the rows of each split through boolean masks on person_id.
in_train = df['person_id'].isin(train_ids)
in_test = df['person_id'].isin(test_ids)
train_df = df[in_train]
# val_df = df[df['person_id'].isin(val_ids)]
test_df = df[in_test]

print(f"Training set size: {len(train_df)}")
# print(f"Validation set size: {len(val_df)}")
print(f"Testing set size: {len(test_df)}")

display(train_df.head())
# display(val_df.head())
display(test_df.head())

**Reasoning**:
The error message "TypeError: list indices must be integers or slices, not tuple" indicates that the input to the model's feature extractor is a list of tensors, but it expects a single tensor. This is because the `collate_fn` in the DataLoader is returning a list of tensors for the 'audio' key. I need to modify the `collate_fn` to pad the audio data and return a single tensor.



In [None]:
from torch.utils.data import DataLoader
import torch
import torch.nn.utils.rnn as rnn_utils
import numpy as np

# Assuming AudioDataset class is defined
# Assuming train_df, val_df, and test_df DataFrames are defined
# Assuming feat_extractor is defined

# Add truncation to collate_fn
def collate_fn_with_truncation(batch, max_len=SR * 2):
    """Collate dataset items into a zero-padded batch, truncating long clips.

    Args:
        batch: list of ``{"audio": 1-D array, "label": ...}`` dicts; ``None``
            entries (samples that failed to load) are dropped.
        max_len: maximum number of samples kept per clip. The default ``SR * 2``
            corresponds to 2 seconds at the target sample rate.

    Returns:
        ``{"audio": tensor [B, T], "label": tensor [B]}`` where T is the longest
        (truncated) clip in the batch, or ``None`` if no valid sample remains.
    """
    valid = [sample for sample in batch if sample is not None]
    if len(valid) == 0:
        return None

    clips = []
    targets = []
    for sample in valid:
        # Slice before converting so that only the kept samples become a tensor.
        clips.append(torch.tensor(sample['audio'][:max_len]))
        targets.append(sample['label'])

    # pad_sequence right-pads the shorter clips with zeros.
    stacked = rnn_utils.pad_sequence(clips, batch_first=True)
    return {"audio": stacked, "label": torch.tensor(targets)}


# Wrap each split in an AudioDataset (the validation split is currently disabled).
train_dataset = AudioDataset(train_df)
# val_dataset = AudioDataset(val_df)
test_dataset = AudioDataset(test_df)

# Batch size; lower it if the GPU runs out of memory.
batch_size = 8

# Only the training loader is shuffled; the test loader keeps a fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn_with_truncation)
# val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn_with_truncation)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn_with_truncation)

# The message lists only the loaders that are actually built above.
print("DataLoaders created for training and testing.")
# Optional sanity check of the number of batches per loader:
# print(f"Number of batches in training dataloader: {len(train_dataloader)}")
# print(f"Number of batches in validation dataloader: {len(val_dataloader)}")
# print(f"Number of batches in testing dataloader: {len(test_dataloader)}")

In [None]:
# Number of samples in the training and test datasets.
len(train_dataset),len(test_dataset)


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import numpy as np
import torch

def evaluate_binary_classifier(model, loader, threshold=0.5, verbose=True):
    """
    Evaluate a binary audio classifier on every batch of ``loader``.

    Args:
        model: trained PyTorch model; it is called as ``model(waves)`` and must
            return one logit per sample. The model is switched to eval mode and
            left in that mode.
        loader: iterable yielding batches shaped like either
                {"audio": padded_tensor[B,T], "label": tensor/list} OR
                {"waves": list(np1d/tensor1d), "labels": tensor}.
                ``None`` batches are skipped.
        threshold: a sample is predicted positive when sigmoid(logit) >= threshold.
        verbose: print metrics if True.

    Returns:
        dict with accuracy, precision, recall, f1, the 2x2 confusion matrix
        (rows = true class, columns = predicted class, class order [0, 1]),
        TN/FP/FN/TP counts, ``fp_to_total_positives_ratio`` = FP / (TP + FN),
        ``fn_to_total_negatives_ratio`` = FN / (TN + FP), and the raw
        ``y_true`` / ``y_pred`` / ``y_prob`` arrays.
    """
    model.eval()

    all_labels = []
    all_probs = []

    with torch.no_grad():
        for batch in loader:
            if batch is None:
                continue

            # Accept both batch layouts described in the docstring.
            if "waves" in batch:
                waves = [
                    w.detach().cpu().float().numpy() if isinstance(w, torch.Tensor)
                    else np.asarray(w, dtype=np.float32)
                    for w in batch["waves"]
                ]
                labels = batch.get("labels", batch.get("label"))
            else:
                # {"audio": padded_tensor, "label": ...} is passed to the model as is
                waves = batch["audio"]
                labels = batch.get("label", batch.get("labels"))

            if not isinstance(labels, torch.Tensor):
                labels = torch.tensor(labels, dtype=torch.float32)

            logits = model(waves)
            # reshape(-1) tolerates [B], [B, 1] and 0-d outputs alike
            probs = torch.sigmoid(logits).detach().cpu().numpy().reshape(-1)

            all_probs.extend(probs.tolist())
            all_labels.extend(labels.detach().cpu().numpy().reshape(-1).tolist())

    y_true = np.asarray(all_labels, dtype=int)
    y_prob = np.asarray(all_probs)
    y_pred = (y_prob >= float(threshold)).astype(int)

    # Metrics
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    # Fixing the label set keeps the matrix 2x2 even when only one class occurs
    # in y_true/y_pred; otherwise it collapses to 1x1 and the counts are lost.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    tn, fp, fn, tp = (int(v) for v in cm.ravel())
    total_pos = tp + fn
    total_neg = tn + fp
    fp_ratio = fp / total_pos if total_pos > 0 else 0.0
    fn_ratio = fn / total_neg if total_neg > 0 else 0.0

    if verbose:
        print(f"Test Accuracy:  {acc:.4f}")
        print(f"Test Precision: {prec:.4f}")
        print(f"Test Recall:    {rec:.4f}")
        print(f"Test F1 Score:  {f1:.4f}")
        print("Confusion Matrix:\n", cm)
        print(f"True Negatives (TN): {tn}")
        print(f"False Positives (FP): {fp}")
        print(f"False Negatives (FN): {fn}")
        print(f"True Positives (TP): {tp}")
        print(f"Ratio of False Positives to Total Positives: {fp_ratio:.4f}")
        print(f"Ratio of False Negatives to Total Negatives: {fn_ratio:.4f}")

    return {
        "accuracy": acc, "precision": prec, "recall": rec, "f1": f1,
        "cm": cm, "tn": tn, "fp": fp, "fn": fn, "tp": tp,
        "fp_to_total_positives_ratio": fp_ratio,
        "fn_to_total_negatives_ratio": fn_ratio,
        "y_true": y_true, "y_pred": y_pred, "y_prob": y_prob,
    }


In [None]:
import torch
import torch.nn as nn
from transformers import WavLMModel, Wav2Vec2FeatureExtractor
from torch.utils.data import Dataset, DataLoader
import torchaudio
import torch.optim as optim
import torch.nn.functional as F
import gc
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub

import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import torch
import torch.nn as nn
import torch.nn.functional as F

class TicClassifier(nn.Module):
    """
    YAMNet backbone (TF-Hub) -> mean-pooled 1024-D embeddings -> PyTorch classifier head.
    Expects mono 16 kHz float32 audio in [-1, 1].

    The backbone output reaches the head as a NumPy array, so no gradient flows
    into YAMNet; only ``self.classifier`` holds trainable PyTorch parameters.
    """

    EMBEDDING_DIM = 1024  # width of one YAMNet embedding frame

    def __init__(self, dropout=0.2, yamnet_handle="https://tfhub.dev/google/yamnet/1"):
        super().__init__()
        self.yamnet = hub.load(yamnet_handle)  # frozen TF model
        self.sample_rate = SR
        hidden = self.EMBEDDING_DIM

        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 1)  # binary logit
        )

    @torch.no_grad()
    def _yamnet_embed(self, wav_np: np.ndarray) -> np.ndarray:
        """
        wav_np: 1-D numpy float32 at 16 kHz.
        Returns: embeddings [num_frames, 1024] (0.48 s hop, ~0.96 s window).
        """
        x = np.asarray(wav_np, dtype=np.float32)
        # YAMNet returns (scores, embeddings, spectrogram)
        _, embeddings, _ = self.yamnet(x)
        return embeddings.numpy()  # [frames, 1024]

    def forward(self, waveforms, max_len_s: float = None):
        """
        waveforms: padded tensor [B, T], a single 1-D tensor [T] (treated as a
            batch of one), or a list of 1-D arrays/tensors.
        max_len_s: optional cap per-sample seconds (crop random during train, center at eval).
        returns: logits [B]; an empty batch yields an empty tensor.

        A clip whose embedding fails or has no frames contributes an all-zero
        feature vector; failures are reported through ``warnings.warn``.
        """
        import warnings  # local import: only needed on the failure path below

        # --- normalize batch input to list of 1-D float32 numpy arrays on CPU ---
        if isinstance(waveforms, torch.Tensor):
            if waveforms.dim() == 1:
                # a bare clip would otherwise be iterated sample by sample
                waveforms = waveforms.unsqueeze(0)
            batch = [w.detach().cpu().float().numpy().reshape(-1) for w in waveforms]
        else:
            batch = []
            for w in waveforms:
                if isinstance(w, torch.Tensor):
                    w = w.detach().cpu().float().numpy()
                batch.append(np.asarray(w, dtype=np.float32).reshape(-1))

        # --- optional cropping to bound compute ---
        if max_len_s is not None:
            L = int(self.sample_rate * max_len_s)
            for i, w in enumerate(batch):
                if w.shape[0] > L:
                    start = np.random.randint(0, w.shape[0] - L + 1) if self.training else (w.shape[0] - L) // 2
                    batch[i] = w[start:start + L]

        # --- trim trailing padding zeros (if your collate padded with zeros) ---
        # An all-zero clip is left untouched.
        for i, w in enumerate(batch):
            nz = np.flatnonzero(np.abs(w) > 1e-7)
            if nz.size > 0:
                batch[i] = w[: nz[-1] + 1]

        # --- run YAMNet per sample, mean-pool embeddings ---
        zero_feature = np.zeros(self.EMBEDDING_DIM, dtype=np.float32)
        pooled = []
        for w in batch:
            try:
                emb = self._yamnet_embed(w)  # [frames, 1024]
            except Exception as exc:
                # Best effort: keep the batch alive, but make the failure visible
                # instead of silently feeding zeros to the classifier.
                warnings.warn(
                    f"YAMNet embedding failed ({exc!r}); using a zero feature vector",
                    RuntimeWarning,
                )
                pooled.append(zero_feature.copy())
                continue
            if emb.shape[0] == 0:
                pooled.append(zero_feature.copy())
            else:
                pooled.append(emb.mean(axis=0).astype(np.float32))

        # np.stack() rejects an empty list, so an empty batch gets an explicit [0, D] array.
        if pooled:
            feats_np = np.stack(pooled, axis=0)
        else:
            feats_np = np.zeros((0, self.EMBEDDING_DIM), dtype=np.float32)

        head_device = next(self.classifier.parameters()).device
        feats = torch.from_numpy(feats_np).to(head_device)  # [B,1024]
        logits = self.classifier(feats)  # [B,1]
        return logits.squeeze(1)

def collate_fn(batch):
    """Collate dataset items into a zero-padded batch without truncation.

    ``None`` entries (samples that failed to load) are dropped. Returns
    ``{"audio": tensor [B, T_max], "label": tensor [B]}``, or ``None`` when no
    valid sample is left.
    """
    kept = [entry for entry in batch if entry is not None]
    if len(kept) == 0:
        return None

    waveform_tensors = []
    label_values = []
    for entry in kept:
        waveform_tensors.append(torch.tensor(entry['audio']))
        label_values.append(entry['label'])

    # Shorter clips are right-padded with zeros up to the longest clip.
    return {
        "audio": torch.nn.utils.rnn.pad_sequence(waveform_tensors, batch_first=True),
        "label": torch.tensor(label_values),
    }


dataloader = train_dataloader

model = TicClassifier() # Re-instantiate the model with the corrected forward pass
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define training parameters
learning_rate = 1e-3
num_epochs = 50
weight_decay = 1e-5
max_batch_samples = 1e5   # batches whose padded length reaches this many samples are skipped
cache_clear_every = 10    # release cached CUDA memory every N batches

# Choose optimizer
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Choose loss function (Binary Cross-Entropy with Logits)
loss_fn = torch.nn.BCEWithLogitsLoss()

# Running false-positive / false-negative counts over ALL epochs; they are
# averaged per epoch when printed after training.
fp = 0
fn = 0
model.train()
for epoch in range(num_epochs):
    total_loss = 0
    batches_used = 0
    for batch_idx, batch in enumerate(dataloader):
        if batch is None:
            continue  # every sample in this batch failed to load

        # All rows of the padded tensor share one length, so a single shape
        # check replaces the per-row scan.
        if batch["audio"].shape[1] >= max_batch_samples:
            continue

        audio_data = batch['audio'].to(device)
        labels = batch['label'].float().to(device)

        optimizer.zero_grad()

        # Forward pass
        logits = model(audio_data)

        # Calculate loss, backpropagate, update weights
        loss = loss_fn(logits, labels)
        loss.backward()
        optimizer.step()

        # FP/FN bookkeeping on probabilities: the model outputs raw logits, so
        # they are passed through sigmoid before comparing with 0.5.
        with torch.no_grad():
            predicted_positive = torch.sigmoid(logits) >= 0.5
            fp += int((predicted_positive & (labels == 0)).sum())
            fn += int((~predicted_positive & (labels == 1)).sum())

        total_loss += loss.item()
        batches_used += 1

        # Periodic cache release, keyed on the batch index. A separate name is
        # used so no inner loop variable can overwrite it. The per-batch
        # gc.collect()/empty_cache() calls were dropped as they only slow training.
        if (batch_idx + 1) % cache_clear_every == 0:
            torch.cuda.empty_cache()

    # Average over the batches that were actually trained on (skipped ones excluded).
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/max(batches_used, 1)}")

print("fp", fp/num_epochs)
print("fn", fn/num_epochs)
# Clear cache and collect garbage after training
torch.cuda.empty_cache()
gc.collect()

In [None]:
# Display the optimizer object (its repr) as a quick check of its settings.
optimizer

In [None]:
# Accumulated loss of the most recent epoch divided by the number of batches in the loader.
total_loss/len(dataloader)

In [None]:
# Evaluate the trained model on the test dataloader; prints the metrics and returns them as a dict.
evaluate_binary_classifier(model, test_dataloader )

In [None]:
# After training: persist what is needed to rebuild the model elsewhere.
import json
import os
import torch

SAVE_DIR = "drive/MyDrive/artifacts"
os.makedirs(SAVE_DIR, exist_ok=True)

# 1) Only the PyTorch classifier head is written to disk.
head_path = f"{SAVE_DIR}/classifier_head.pt"
torch.save(model.classifier.state_dict(), head_path)

# 2) A small JSON config so the server knows how to rebuild the model.
config = dict(
    yamnet_handle="https://tfhub.dev/google/yamnet/1",
    sr=16000,
    dropout=0.2,
    max_len_s=2.0,
    threshold=0.5,
)
config_path = f"{SAVE_DIR}/model_config.json"
with open(config_path, "w") as f:
    f.write(json.dumps(config, indent=2))

print("Saved:", os.listdir(SAVE_DIR))