In [None]:
# Download the dataset archive with the Kaggle CLI (presumably requires Kaggle API
# credentials to be configured in this runtime), then extract it next to the notebook.
!kaggle datasets download mohammedabdeldayem/the-fake-or-real-dataset
# -q: do not print one line per extracted file (the listing is many thousands of lines).
# -o: overwrite existing files without prompting, so re-running this cell cannot stall.
!unzip -q -o the-fake-or-real-dataset.zip -d the-fake-or-real-dataset

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12954.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12955.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12956.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12957.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12958.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12959.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12961.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/for-rerec/for-rerecorded/training/real/recording12962.wav_norm_mono.wav  
  inflating: the-fake-or-real-dataset/f

In [None]:
import random
import os

# Seed Python's RNG so the subsampling and shuffling below are repeatable.
random.seed(42)

train_real_audio_path = "/content/the-fake-or-real-dataset/for-norm/for-norm/training/real"
train_fake_audio_path = "/content/the-fake-or-real-dataset/for-norm/for-norm/training/fake"
test_real_audio_path = "/content/the-fake-or-real-dataset/for-norm/for-norm/testing/real"
test_fake_audio_path = "/content/the-fake-or-real-dataset/for-norm/for-norm/testing/fake"


def _labeled_files(directory, label):
    """Return ``(file_path, label)`` pairs for every entry of ``directory``.

    ``os.listdir`` returns names in arbitrary order, so they are sorted first;
    without that, the seeded ``random.sample`` calls below could select
    different files on another machine or after re-extracting the archive.
    """
    return [(os.path.join(directory, name), label) for name in sorted(os.listdir(directory))]


# Label convention: 0 = real, 1 = fake.
train_real_audio_file_paths = _labeled_files(train_real_audio_path, 0)
train_fake_audio_file_paths = _labeled_files(train_fake_audio_path, 1)
test_real_audio_file_paths = _labeled_files(test_real_audio_path, 0)
test_fake_audio_file_paths = _labeled_files(test_fake_audio_path, 1)

# Balanced subsets: 1000 files per class for training, 200 per class for testing.
# random.sample raises ValueError if a directory holds fewer files than requested.
train_audio_paths = random.sample(train_real_audio_file_paths, 1000) + random.sample(train_fake_audio_file_paths, 1000)
random.shuffle(train_audio_paths)

test_audio_paths = random.sample(test_real_audio_file_paths, 200) + random.sample(test_fake_audio_file_paths, 200)
random.shuffle(test_audio_paths)

# Split the (path, label) pairs into parallel lists.
train_audios = [file_path for file_path, _ in train_audio_paths]
train_labels = [label for _, label in train_audio_paths]

test_audios = [file_path for file_path, _ in test_audio_paths]
test_labels = [label for _, label in test_audio_paths]


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchaudio
from torchaudio import transforms as audio_transforms
from torchvision import transforms as image_transforms
import logging

# Module-level logger named after the current module.
logger = logging.getLogger(__name__)


class AudioDataset(Dataset):
    """Dataset that loads audio files and returns three views of each clip.

    Every item is ``((spectrogram, mfcc, audio), label)``:

    * ``audio`` - mono waveform, resampled to ``sample_rate``, zero-padded or
      truncated to ``target_length`` samples, then scaled by its peak amplitude.
    * ``spectrogram`` - 128-bin mel power spectrogram of the fixed-length
      waveform (before peak scaling), standardized to zero mean / unit std.
    * ``mfcc`` - 13 MFCCs of the same waveform, standardized the same way.

    Args:
        audio_paths: Sequence of file paths readable by ``torchaudio.load``.
        labels: Sequence of labels aligned index-for-index with ``audio_paths``.
        sample_rate: Sample rate every clip is converted to.
        target_length: Number of samples every clip is padded or cut to.
    """

    # Lower bound for normalization denominators. Silent (all-zero) clips would
    # otherwise divide by zero and put NaNs into the batch.
    _EPS = 1e-8

    def __init__(
        self,
        audio_paths,
        labels,
        sample_rate=16000,
        target_length=3*16000
    ):
        self.audio_paths = audio_paths
        self.labels = labels
        self.sample_rate = sample_rate
        self.target_length = target_length

        # Build the feature transforms once instead of on every __getitem__
        # call; they depend only on the configuration, not on the clip.
        self._mel_spectrogram = audio_transforms.MelSpectrogram(
            sample_rate=self.sample_rate,
            n_fft=1024,
            hop_length=512,
            n_mels=128,
            power=2.0
        )
        self._mfcc = audio_transforms.MFCC(
            sample_rate=self.sample_rate,
            n_mfcc=13,
            melkwargs={
                'n_fft': 1024,
                'hop_length': 512,
                'n_mels': 128
            }
        )
        # One resampler per source sample rate encountered, created on demand.
        self._resamplers = {}

    def _resampler_for(self, source_rate):
        """Return a cached ``Resample`` transform from ``source_rate`` to ``self.sample_rate``."""
        if source_rate not in self._resamplers:
            self._resamplers[source_rate] = audio_transforms.Resample(
                orig_freq=source_rate, new_freq=self.sample_rate
            )
        return self._resamplers[source_rate]

    def _fit_length(self, audio):
        """Zero-pad on the right, or truncate, ``audio`` to ``self.target_length`` samples."""
        missing = self.target_length - audio.shape[1]
        if missing > 0:
            padding = torch.zeros(audio.shape[0], missing, dtype=audio.dtype)
            return torch.cat((audio, padding), dim=1)
        return audio[:, :self.target_length]

    @staticmethod
    def _standardize(features, eps=_EPS):
        """Shift ``features`` to zero mean and scale by its std (floored at ``eps``)."""
        return (features - features.mean()) / features.std().clamp_min(eps)

    @staticmethod
    def _peak_normalize(audio, eps=_EPS):
        """Divide ``audio`` by its largest absolute value (floored at ``eps``)."""
        return audio / audio.abs().max().clamp_min(eps)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``((spectrogram, mfcc, audio), label)``."""
        audio, source_rate = torchaudio.load(self.audio_paths[idx])

        # Mix multi-channel recordings down to a single channel.
        if audio.shape[0] > 1:
            audio = torch.mean(audio, dim=0, keepdim=True)

        # Compare against the configured rate rather than a literal 16000, so a
        # dataset built with another sample_rate is still resampled correctly.
        if source_rate != self.sample_rate:
            audio = self._resampler_for(source_rate)(audio)

        audio = self._fit_length(audio)

        # Features are computed from the fixed-length waveform before it is
        # peak-scaled, then standardized per clip.
        spectrogram = self._standardize(self._mel_spectrogram(audio))
        mfcc = self._standardize(self._mfcc(audio))

        audio = self._peak_normalize(audio)

        label = self.labels[idx]

        return (spectrogram, mfcc, audio), label

    def __len__(self):
        """Return the number of clips in the dataset."""
        return len(self.audio_paths)


# Wrap the sampled file lists in datasets, using the AudioDataset defaults for
# sample rate and clip length.
train_dataset = AudioDataset(audio_paths=train_audios, labels=train_labels)
test_dataset = AudioDataset(audio_paths=test_audios, labels=test_labels)

# Training batches are reshuffled on every pass; test batches keep a fixed order.
BATCH_SIZE = 32
train_dataloader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train dataset size: {len(train_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

Train dataset size: 2000
Test dataset size: 400


In [None]:
import tqdm

class WaveformProcessor(nn.Module):
    """1-D convolutional encoder mapping a raw waveform to a 64-value vector.

    Input is ``(batch, 1, samples)``. Two ``Conv1d -> ReLU -> MaxPool1d(4)``
    stages are followed by average pooling over the whole time axis, giving an
    output of shape ``(batch, 64)``.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 1 -> 32 channels, then 4x downsampling along time.
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(kernel_size=4)
        # Stage 2: 32 -> 64 channels, then another 4x downsampling.
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(kernel_size=4)
        # Collapse whatever time length remains to a single value per channel.
        self.global_pool = nn.AdaptiveAvgPool1d(output_size=1)

    def forward(self, x):
        """Encode ``x`` of shape ``(batch, 1, samples)`` into ``(batch, 64)``."""
        hidden = self.pool1(torch.relu(self.conv1(x)))
        hidden = self.pool2(torch.relu(self.conv2(hidden)))
        pooled = self.global_pool(hidden)
        # Drop the trailing length-1 time axis.
        return torch.flatten(pooled, start_dim=1)

class FeatureExtractor(nn.Module):
    """2-D convolutional encoder mapping a feature map to a 64-value vector.

    Input is ``(batch, in_channels, height, width)``. Two
    ``Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2)`` stages are followed by
    average pooling over both spatial axes, giving ``(batch, 64)``.

    Args:
        in_channels: Number of channels of the input feature map.
    """

    def __init__(self, in_channels=1):
        super().__init__()
        # Stage 1: 3x3 convolution with padding 1 (spatial size preserved).
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2)

        # Stage 2: 2x2 convolution with padding 1 (each spatial dim grows by one).
        self.conv2 = nn.Conv2d(32, 64, kernel_size=2, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2)

        # Collapse the remaining spatial grid to one value per channel.
        self.global_pool = nn.AdaptiveAvgPool2d((1,1))

    def forward(self, x):
        """Encode ``x`` into a ``(batch, 64)`` feature matrix."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))
        pooled = self.global_pool(x)
        # Drop the two trailing length-1 spatial axes.
        return torch.flatten(pooled, start_dim=1)

class AudioClassifier(nn.Module):
    """Binary audio classifier that fuses spectrogram, MFCC and waveform features.

    Each of the three inputs is encoded by its own branch (two
    ``FeatureExtractor`` instances and one ``WaveformProcessor``). The branch
    outputs are concatenated (``64*3`` values) and passed through
    ``Linear(192, 16) -> ReLU -> Dropout(0.5) -> Linear(16, 1)``. The result is
    one raw logit per example; apply ``sigmoid`` to obtain a probability.

    Args:
        sample_rate: Stored on the instance; not used by the layers built here.
        target_length: Stored on the instance; not used by the layers built here.
    """

    def __init__(self, sample_rate=16000, target_length=16000*3):
        super().__init__()

        # Device that fit()/evaluate() move the model and every batch to.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.sample_rate = sample_rate
        self.target_length = target_length

        # Create feature extractors
        self.spec_extractor = FeatureExtractor(in_channels=1)
        self.mfcc_extractor = FeatureExtractor(in_channels=1)
        self.waveform_processor = WaveformProcessor()

        # Combined layers
        self.fc1 = nn.Linear(64*3, 16)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(16, 1)

        # BCEWithLogitsLoss applies the sigmoid itself, so forward() returns logits.
        self.loss = nn.BCEWithLogitsLoss()
        # Created in fit(), once the parameters are on their final device.
        self.optimizer = None

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return ``numerator / denominator``, or ``0.0`` when the denominator is zero."""
        return numerator / denominator if denominator else 0.0

    def forward(self, spec, mfcc, audio):
        """Return one logit per example, shape ``(batch, 1)``.

        Args:
            spec: Input for the spectrogram branch.
            mfcc: Input for the MFCC branch.
            audio: Input for the waveform branch.
        """
        # Extract features from all three inputs
        spec_features = self.spec_extractor(spec)
        mfcc_features = self.mfcc_extractor(mfcc)
        waveform_features = self.waveform_processor(audio)

        # Concatenate and pass through FC layers
        x = torch.cat((spec_features, mfcc_features, waveform_features), dim=1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

    def fit(self, dataloader, epochs=10, lr=0.001, weight_decay=0.01):
        """Train the model with Adam and binary cross-entropy on logits.

        A new optimizer is created on every call. The model is left in
        training mode and on ``self.device`` afterwards.

        Args:
            dataloader: Iterable of ``((spectrograms, mfccs, waveforms), labels)`` batches.
            epochs: Number of passes over ``dataloader``.
            lr: Adam learning rate.
            weight_decay: Adam weight decay (L2 penalty).
        """
        # Training mode turns on dropout and batch-norm statistic updates.
        self.train()
        self.to(self.device)

        self.optimizer = optim.Adam(self.parameters(), lr=lr, weight_decay=weight_decay)

        for epoch in tqdm.tqdm(range(epochs)):
            running_loss = 0.0
            num_batches = 0
            for batch_idx, (features, labels) in enumerate(dataloader):

                # Move the inputs and labels to the model's device.
                spectrograms, mfccs, waveforms = features
                spectrograms = spectrograms.to(self.device)
                mfccs = mfccs.to(self.device)
                waveforms = waveforms.to(self.device)
                # Labels become float (batch, 1) to match the logits' shape.
                labels = labels.to(self.device).float().unsqueeze(1)

                # Clear gradients so they do not accumulate across batches.
                self.optimizer.zero_grad()
                # Call the module (not .forward) so registered hooks still run.
                outputs = self(spectrograms, mfccs, waveforms)
                loss = self.loss(outputs, labels)
                loss.backward()

                # every 10 batches log the batch loss
                if batch_idx % 10 == 0:
                    print(f"Batch {batch_idx}, Loss: {loss.item()}")

                # update the weights
                self.optimizer.step()
                running_loss += loss.item()
                num_batches += 1

            # Count batches ourselves so an empty loader reports nan instead of
            # raising ZeroDivisionError.
            epoch_loss = running_loss / num_batches if num_batches else float("nan")
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss}")

    def evaluate(self, dataloader):
        """Print and return accuracy, precision, recall and F1 on ``dataloader``.

        Predictions are ``round(sigmoid(logit))`` and label ``1`` is the
        positive class. Any metric whose denominator is zero (for example
        precision when nothing is predicted positive) is reported as ``0.0``.
        The model is left in evaluation mode.

        Args:
            dataloader: Iterable of ``((spectrograms, mfccs, waveforms), labels)`` batches.

        Returns:
            Tuple ``(accuracy, precision, recall, f1_score)`` of floats.
        """
        self.eval()
        # Batches are moved to self.device below, so the weights must be there
        # too, even if the model was never fitted or was moved since.
        self.to(self.device)

        true_positives = 0
        true_negatives = 0
        false_positives = 0
        false_negatives = 0
        total = 0
        correct = 0

        with torch.no_grad():
            for (spectrograms, mfccs, waveforms), labels in dataloader:
                spectrograms = spectrograms.to(self.device)
                mfccs = mfccs.to(self.device)
                waveforms = waveforms.to(self.device)
                labels = labels.to(self.device).float().unsqueeze(1)

                outputs = self(spectrograms, mfccs, waveforms)
                predicted = torch.round(torch.sigmoid(outputs))

                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                true_positives += ((predicted == 1) & (labels == 1)).sum().item()
                true_negatives += ((predicted == 0) & (labels == 0)).sum().item()
                false_positives += ((predicted == 1) & (labels == 0)).sum().item()
                false_negatives += ((predicted == 0) & (labels == 1)).sum().item()

        accuracy = self._safe_ratio(correct, total)
        precision = self._safe_ratio(true_positives, true_positives + false_positives)
        recall = self._safe_ratio(true_positives, true_positives + false_negatives)
        f1_score = self._safe_ratio(2 * (precision * recall), precision + recall)

        print(f"Accuracy: {accuracy:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1_score:.4f}")

        return accuracy, precision, recall, f1_score

In [None]:
# Seed PyTorch's global RNG so weight initialization, batch shuffling and dropout
# masks repeat when this cell is re-run. (Exact repeatability on GPU may also
# require PyTorch's deterministic-algorithm settings.)
torch.manual_seed(42)

# Create and train the model
model = AudioClassifier()
model.fit(train_dataloader, epochs=10)

  0%|          | 0/10 [00:00<?, ?it/s]

Batch 0, Loss: 0.6994096040725708
Batch 10, Loss: 0.5629104971885681
Batch 20, Loss: 0.6172284483909607
Batch 30, Loss: 0.49110275506973267
Batch 40, Loss: 0.5419744849205017
Batch 50, Loss: 0.4352317452430725
Batch 60, Loss: 0.5352634787559509


 10%|█         | 1/10 [00:23<03:29, 23.31s/it]

Epoch 1/10, Loss: 0.5529675630349962
Batch 0, Loss: 0.40537214279174805
Batch 10, Loss: 0.40020737051963806
Batch 20, Loss: 0.45686715841293335
Batch 30, Loss: 0.4085441827774048
Batch 40, Loss: 0.473294734954834
Batch 50, Loss: 0.41914770007133484
Batch 60, Loss: 0.4136437475681305


 20%|██        | 2/10 [00:46<03:05, 23.20s/it]

Epoch 2/10, Loss: 0.46577840475809007
Batch 0, Loss: 0.507046639919281
Batch 10, Loss: 0.4121561646461487
Batch 20, Loss: 0.39860761165618896
Batch 30, Loss: 0.44077014923095703
Batch 40, Loss: 0.442164808511734
Batch 50, Loss: 0.5541786551475525
Batch 60, Loss: 0.4337530732154846


 30%|███       | 3/10 [01:09<02:42, 23.24s/it]

Epoch 3/10, Loss: 0.44168259842055185
Batch 0, Loss: 0.4920114278793335
Batch 10, Loss: 0.3376464247703552
Batch 20, Loss: 0.4253127872943878
Batch 30, Loss: 0.2884700894355774
Batch 40, Loss: 0.5327842235565186
Batch 50, Loss: 0.32558706402778625
Batch 60, Loss: 0.46871355175971985


 40%|████      | 4/10 [01:33<02:19, 23.27s/it]

Epoch 4/10, Loss: 0.4230226476987203
Batch 0, Loss: 0.46152880787849426
Batch 10, Loss: 0.34848326444625854
Batch 20, Loss: 0.39365696907043457
Batch 30, Loss: 0.4045913517475128
Batch 40, Loss: 0.23492799699306488
Batch 50, Loss: 0.3884674608707428
Batch 60, Loss: 0.29077625274658203


 50%|█████     | 5/10 [01:56<01:56, 23.22s/it]

Epoch 5/10, Loss: 0.3762859974115614
Batch 0, Loss: 0.23359663784503937
Batch 10, Loss: 0.634121298789978
Batch 20, Loss: 0.31693384051322937
Batch 30, Loss: 0.30202627182006836
Batch 40, Loss: 0.488911896944046
Batch 50, Loss: 0.3513648808002472
Batch 60, Loss: 0.2774396240711212


 60%|██████    | 6/10 [02:19<01:32, 23.18s/it]

Epoch 6/10, Loss: 0.3738900905563718
Batch 0, Loss: 0.45845940709114075
Batch 10, Loss: 0.26740509271621704
Batch 20, Loss: 0.30672189593315125
Batch 30, Loss: 0.41068601608276367
Batch 40, Loss: 0.3117598593235016
Batch 50, Loss: 0.5132406949996948
Batch 60, Loss: 0.4205220341682434


 70%|███████   | 7/10 [02:42<01:09, 23.20s/it]

Epoch 7/10, Loss: 0.3438929243693276
Batch 0, Loss: 0.3145645260810852
Batch 10, Loss: 0.26580774784088135
Batch 20, Loss: 0.4553024470806122
Batch 30, Loss: 0.36370590329170227
Batch 40, Loss: 0.2535206079483032
Batch 50, Loss: 0.4159083962440491
Batch 60, Loss: 0.251323938369751


 80%|████████  | 8/10 [03:05<00:46, 23.15s/it]

Epoch 8/10, Loss: 0.34052415215779863
Batch 0, Loss: 0.21707284450531006
Batch 10, Loss: 0.1881699413061142
Batch 20, Loss: 0.21084165573120117
Batch 30, Loss: 0.23640136420726776
Batch 40, Loss: 0.3650696873664856
Batch 50, Loss: 0.26373058557510376
Batch 60, Loss: 0.17864032089710236


 90%|█████████ | 9/10 [03:28<00:23, 23.13s/it]

Epoch 9/10, Loss: 0.29313847540862975
Batch 0, Loss: 0.25289392471313477
Batch 10, Loss: 0.2549511194229126
Batch 20, Loss: 0.32182350754737854
Batch 30, Loss: 0.472601979970932
Batch 40, Loss: 0.17795655131340027
Batch 50, Loss: 0.35409635305404663
Batch 60, Loss: 0.22995921969413757


100%|██████████| 10/10 [03:51<00:00, 23.19s/it]

Epoch 10/10, Loss: 0.3117709346706905





In [None]:
# Evaluate on the test split; evaluate() prints the metrics and returns them
# as (accuracy, precision, recall, f1_score).
accuracy, precision, recall, f1_score = model.evaluate(test_dataloader)

Accuracy: 0.7525
Precision: 0.7463
Recall: 0.7650
F1-Score: 0.7556


In [None]:
# Evaluate on the training split, for comparison with the test-split metrics.
# NOTE(review): this rebinds accuracy/precision/recall/f1_score, so the values
# returned for the test split are no longer available under these names.
accuracy, precision, recall, f1_score = model.evaluate(train_dataloader)

Accuracy: 0.8940
Precision: 0.9191
Recall: 0.8640
F1-Score: 0.8907


In [None]:
# Add up the element counts of every parameter tensor registered on the model.
total_params = sum(map(torch.Tensor.numel, model.parameters()))
print(f"Total parameters: {total_params:,}")

Total parameters: 26,977


In [None]:
def save_model(model, model_path="audio_classifier.pt", example_inputs=None):
    """Trace ``model`` with TorchScript on CPU, save it and return the traced module.

    The export is done on a deep copy of ``model``. ``nn.Module.to`` and
    ``nn.Module.eval`` modify a module in place, so exporting the original
    would leave the caller's model on the CPU and in evaluation mode.

    Args:
        model: Module whose ``forward`` takes ``(spectrogram, mfcc, audio)``.
        model_path: Destination path of the saved TorchScript file.
        example_inputs: Optional ``(spectrogram, mfcc, audio)`` tuple of CPU
            tensors used for tracing. Defaults to random tensors of shapes
            ``(1, 1, 128, 94)``, ``(1, 1, 13, 94)`` and ``(1, 1, 3*16000)``.

    Returns:
        The traced module produced by ``torch.jit.trace``.
    """
    import copy

    # Work on an independent CPU copy in evaluation mode (dropout disabled,
    # batch norm using its running statistics).
    export_model = copy.deepcopy(model).to("cpu")
    export_model.eval()

    # Create ModelWrapper to handle multiple inputs
    class ModelWrapper(nn.Module):
        def __init__(self, model):
            super().__init__()
            self.model = model

        def forward(self, spectrogram, mfcc, audio):
            return self.model(spectrogram, mfcc, audio)

    # Wrap the model
    wrapped_model = ModelWrapper(export_model)

    # Create example inputs
    if example_inputs is None:
        example_inputs = (
            torch.randn(1, 1, 128, 94),
            torch.randn(1, 1, 13, 94),
            torch.randn(1, 1, 3*16000),
        )
    example_spec, example_mfcc, example_audio = example_inputs

    # Trace the model
    traced_model = torch.jit.trace(
        wrapped_model,
        (example_spec, example_mfcc, example_audio)
    )

    # Test the traced model
    test_output = traced_model(example_spec, example_mfcc, example_audio)
    print("Test output shape:", test_output.shape)

    # Save the model
    traced_model.save(model_path)
    print(f"Model saved to {model_path}")

    return traced_model

# Export the trained model to a TorchScript file.
traced_model = save_model(model)

# Round-trip check: reload the saved file and run one gradient-free forward
# pass on random inputs shaped like the tracing examples.
loaded_model = torch.jit.load("audio_classifier.pt")
with torch.no_grad():
    test_spec, test_mfcc, test_audio = (
        torch.randn(*shape)
        for shape in ((1, 1, 128, 94), (1, 1, 13, 94), (1, 1, 3*16000))
    )
    output = loaded_model(test_spec, test_mfcc, test_audio)
    print("Test inference output:", output)

Test output shape: torch.Size([1, 1])
Model saved to audio_classifier.pt
Test inference output: tensor([[-16.4534]])
