<a href="https://colab.research.google.com/github/gautamHCSCV/AI-ML/blob/main/ASSIST_spoof_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import librosa
import numpy as np
import os

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import torch.nn as nn

class ASSIST(nn.Module):
    def __init__(self):
        super(ASSIST, self).__init__()
        
        # define the convolutional layers
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=5, stride=2, padding=2)
        self.bn1 = nn.BatchNorm1d(num_features=16)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5, stride=2, padding=2)
        self.bn2 = nn.BatchNorm1d(num_features=32)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        
        self.conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5, stride=2, padding=2)
        self.bn3 = nn.BatchNorm1d(num_features=64)
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)
        
        # define the fully connected layers
        self.fc1 = nn.Linear(in_features=80000, out_features=128)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(in_features=128, out_features=2)
        
    def forward(self, x):
        # apply the convolutional layers
        x = x.view(x.shape[0],1, x.shape[1])

        x = self.conv1(x)
        x = self.bn1(x)
        x = nn.functional.relu(x)
        x = self.pool1(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        x = nn.functional.relu(x)
        x = self.pool2(x)
        
        x = self.conv3(x)
        x = self.bn3(x)
        x = nn.functional.relu(x)
        x = self.pool3(x)
        
        # reshape the output of the convolutional layers
        x = x.view(x.size(0), -1)
        
        # apply the fully connected layers
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        
        return x


In [4]:
path = '/content/drive/MyDrive/DAI/Spoof Detection/'
os.listdir(path)

['Audio_english', 'TTS_english']

In [5]:
class AudioDataset(Dataset):
    def __init__(self, real_audio_dir, fake_audio_dir):
        self.real_audio_dir = real_audio_dir
        self.fake_audio_dir = fake_audio_dir
        self.real_audio_files = os.listdir(self.real_audio_dir)
        self.fake_audio_files = os.listdir(self.fake_audio_dir)
        self.sample_rate = 16000 # sample rate for all audio files
        
    def __len__(self):
        return len(self.real_audio_files) + len(self.fake_audio_files)
    
    def __getitem__(self, idx):
        if idx < len(self.real_audio_files):
            audio_file = self.real_audio_files[idx]
            label = 0 # 0 for real audio
            audio_path = os.path.join(self.real_audio_dir, audio_file)
        else:
            audio_file = self.fake_audio_files[idx - len(self.real_audio_files)]
            label = 1 # 1 for fake audio
            audio_path = os.path.join(self.fake_audio_dir, audio_file)
        
        waveform, sr = librosa.load(audio_path, sr=self.sample_rate)
        waveform = torch.from_numpy(waveform).float()
        waveform_length = waveform.shape[0]
        
        # pad or truncate the waveform to have the same length
        target_length = self.sample_rate * 5 # 5 seconds
        if waveform_length < target_length:
            num_missing_samples = target_length - waveform_length
            padding = torch.zeros(num_missing_samples)
            waveform = torch.cat((waveform, padding))
        else:
            waveform = waveform[:target_length]
        
        return waveform, label
            
# Define the data loaders
dataset = AudioDataset(path+'Audio_english', path+'TTS_english')
#test_dataset = AudioDataset('path/to/testing/data')

print(len(dataset))

train_dataset,test_dataset = torch.utils.data.dataset.random_split(dataset,[700,87])


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

787


In [6]:
a = iter(test_loader)
b = next(a)
print(b[0].shape)
print(b[1])

torch.Size([32, 80000])
tensor([1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1,
        0, 0, 1, 0, 0, 1, 1, 0])


In [None]:
def train(model, train_loader, test_loader, num_epochs):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        train_acc = 0
        
        for data, label in train_loader:
            data = data.to(device)
            label = label.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            train_acc += pred.eq(label.view_as(pred)).sum().item()
        
        train_loss /= len(train_loader.dataset)
        train_acc /= len(train_loader.dataset)
        
        model.eval()
        test_loss = 0
        test_acc = 0
        
        with torch.no_grad():
            for data, label in test_loader:
                data = data.to(device)
                label = label.to(device)
                output = model(data)
                loss = criterion(output, label)
                test_loss += loss.item()
                pred = output.argmax(dim=1, keepdim=True)
                test_acc += pred.eq(label.view_as(pred)).sum().item()
        
        test_loss /= len(test_loader.dataset)
        test_acc /= len(test_loader.dataset)
        
        print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Train Accuracy = {train_acc:.4f}, Test Loss = {test_loss:.4f}, Test Accuracy = {test_acc:.4f}")

# Train the model
model = ASSIST()
train(model, train_loader, test_loader, num_epochs=10)

Epoch 1: Train Loss = 0.0640, Train Accuracy = 0.8471, Test Loss = 0.2689, Test Accuracy = 0.5977
