<a href="https://colab.research.google.com/github/JasminPradhan/DeepFake-Audio-Detection-TL/blob/main/Deepfake_AD_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) y
Token is valid (permission: fineGrained).
The token `deepfake_audio` has been saved to /root/.cache/huggingface/stored_tokens
[1m[31mCannot authenticate through git-credential as no helper is defined on your machine.
You might have to 

In [2]:
!pip install torch torchaudio torchvision transformers scikit-learn numpy




In [3]:
!pip install torchaudio>=0.12.0 # Update torchaudio to include the 'Reverberate' transform

In [5]:
import torch
import torch.nn as nn
import torchaudio.transforms as T
import random
from transformers import HubertForSequenceClassification
from torch.cuda.amp import autocast, GradScaler
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import Dataset, DataLoader, random_split
import torchaudio
import os
import zipfile
from google.colab import drive
import gc

In [6]:
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
dataset_zip_path = '/content/drive/MyDrive/audio_dataset.zip'


In [8]:
import shutil
shutil.rmtree('/content/dataset', ignore_errors=True)

In [9]:
dataset_extract_path = "/content/dataset"

# Create the extraction directory
if not os.path.exists(dataset_extract_path):
    os.makedirs(dataset_extract_path)

# Unzip the dataset
with zipfile.ZipFile(dataset_zip_path, 'r') as zip_ref:
    zip_ref.extractall(dataset_extract_path)

print("Dataset extracted successfully!")

Dataset extracted successfully!


In [10]:
def add_gaussian_noise(waveform, noise_level=0.005):
    noise = torch.randn_like(waveform) * noise_level
    return waveform + noise

def time_masking(waveform, max_mask_pct=0.1):
    num_samples = waveform.shape[-1]
    mask_size = int(num_samples * max_mask_pct)
    start = random.randint(0, num_samples - mask_size)
    waveform[:, start:start + mask_size] = 0
    return waveform

def pitch_shift(waveform, sample_rate, pitch_factor=2):
    transform = T.PitchShift(sample_rate=sample_rate, n_steps=pitch_factor)
    return transform(waveform)



# def collate_fn(batch):
#     waveforms, labels = zip(*batch)

#     # Find the maximum length among all waveforms
#     max_len = max(waveform.shape[-1] for waveform in waveforms)

#     # Pad waveforms to the maximum length
#     padded_waveforms = [torch.nn.functional.pad(waveform, (0, max_len - waveform.shape[-1])) for waveform in waveforms]

#     # Stack padded waveforms and labels
#     return torch.stack(padded_waveforms), torch.stack(labels)

def augment_audio(waveform, sample_rate):
    if random.random() < 0.3:
        waveform = add_gaussian_noise(waveform)
    if random.random() < 0.3:
        waveform = time_masking(waveform)
    if random.random() < 0.3:
        waveform = pitch_shift(waveform, sample_rate)
    return waveform


In [22]:
MAX_LENGTH = 160000  # Adjust based on dataset (e.g., 10 sec at 16kHz = 160000 samples)


In [23]:
class AudioDataset(Dataset):
    def __init__(self, real_path, fake_path, target_length=MAX_LENGTH):
        self.real_files = [os.path.join(real_path, f) for f in os.listdir(real_path)]
        self.fake_files = [os.path.join(fake_path, f) for f in os.listdir(fake_path)]
        self.data = [(f, 1) for f in self.real_files] + [(f, 0) for f in self.fake_files]
        self.target_length = target_length
        random.shuffle(self.data)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        file_path, label = self.data[idx]
        waveform, sample_rate = torchaudio.load(file_path)

        # Apply padding or truncation
        if waveform.shape[1] < self.target_length:
            pad_size = self.target_length - waveform.shape[1]
            waveform = torch.nn.functional.pad(waveform, (0, pad_size))  # Pad at the end
        else:
            waveform = waveform[:, :self.target_length]  # Truncate

        waveform = augment_audio(waveform, sample_rate)  # Apply augmentation
        return waveform, torch.tensor(label, dtype=torch.long)

In [24]:
real_path="/content/dataset/audio_dataset/real"
fake_path="/content/dataset/audio_dataset/fake"
# dataset = AudioDataset(real_path, fake_path)
# train_loader = DataLoader(dataset, batch_size=8, shuffle=True)

In [25]:
from torch.utils.data import random_split

full_dataset = AudioDataset(real_path, fake_path)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

In [26]:
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, pin_memory=True)

In [27]:
!huggingface-cli whoami

jas2002


In [28]:
import torch
from transformers import HubertForSequenceClassification

model = HubertForSequenceClassification.from_pretrained(
    "facebook/hubert-base-ls960", num_labels=2)
for param in model.hubert.encoder.layers[-8:].parameters():  # Fine-tune last 8 layers
    param.requires_grad = True
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

Some weights of HubertForSequenceClassification were not initialized from the model checkpoint at facebook/hubert-base-ls960 and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [29]:
optimizer = optim.AdamW(model.parameters(), lr=1e-5)
scaler = GradScaler(enabled=torch.cuda.is_available())
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)


  scaler = GradScaler(enabled=torch.cuda.is_available())


In [30]:
def train(model, train_loader, val_loader, epochs=10, patience=3):
    model.train()
    best_val_loss = float('inf')
    patience_counter = 0
    for epoch in range(epochs):
        total_loss = 0
        for waveforms, labels in train_loader:
            waveforms, labels = waveforms.to("cuda" if torch.cuda.is_available() else "cpu"), labels.to("cuda" if torch.cuda.is_available() else "cpu")
            optimizer.zero_grad()
            with autocast():
                outputs = model(waveforms.squeeze(1)).logits  # Ensure correct input shape
                loss = nn.CrossEntropyLoss()(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            total_loss += loss.item()
        scheduler.step()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss:.4f}")

        val_loss = evaluate(model, val_loader, validation=True)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), "best_model.pth")  # Save best model
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

        # Free up memory
        torch.cuda.empty_cache()
        gc.collect()


In [31]:
def evaluate(model, test_loader, validation=False):
    model.eval()
    all_preds, all_labels = [], []
    total_loss = 0
    with torch.no_grad():
        for waveforms, labels in test_loader:
            waveforms, labels = waveforms.to("cuda" if torch.cuda.is_available() else "cpu"), labels.to("cuda" if torch.cuda.is_available() else "cpu")
            outputs = model(waveforms.squeeze(1)).logits  # Ensure correct input shape
            loss = nn.CrossEntropyLoss()(outputs, labels)
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    if validation:
        return total_loss

    print(f"Test Accuracy: {acc * 100:.2f}%")
    print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")
    return acc


In [32]:
train(model, train_loader, val_loader, epochs=10)
evaluate(model, val_loader)

  with autocast():


RuntimeError: you can only change requires_grad flags of leaf variables.