In [None]:
import os
# Make the project folder the kernel's working directory and echo the result.
# NOTE(review): hard-coded absolute Windows path - this cell raises on any
# machine where that folder does not exist.
os.chdir(r'D:\ML\minor project')
print("Current working directory:", os.getcwd())

Current working directory: D:\ML\minor project


In [None]:
!pip install torchaudio==2.6.0

Collecting torchaudio==2.6.0
  Downloading torchaudio-2.6.0-cp312-cp312-win_amd64.whl.metadata (6.7 kB)
Downloading torchaudio-2.6.0-cp312-cp312-win_amd64.whl (2.4 MB)
   ---------------------------------------- 0.0/2.4 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.4 MB ? eta -:--:--
   -------- ------------------------------- 0.5/2.4 MB 2.1 MB/s eta 0:00:01
   -------- ------------------------------- 0.5/2.4 MB 2.1 MB/s eta 0:00:01
   ----------------- ---------------------- 1.0/2.4 MB 1.4 MB/s eta 0:00:01
   ------------------------- -------------- 1.6/2.4 MB 1.7 MB/s eta 0:00:01
   -------------------------------------- - 2.4/2.4 MB 2.1 MB/s eta 0:00:01
   ---------------------------------------- 2.4/2.4 MB 2.2 MB/s eta 0:00:00
Installing collected packages: torchaudio
Successfully installed torchaudio-2.6.0


In [None]:
import os
import librosa

def analyze_audio_files(audio_path):
    """Survey the ``.wav`` files under ``audio_path/FAKE`` and ``audio_path/REAL``.

    Each matching file is loaded with ``librosa.load(..., sr=None)``. A label
    folder that does not exist is reported and skipped; a file that raises
    while loading is reported and skipped.

    Args:
        audio_path: Directory expected to contain ``FAKE`` and ``REAL`` folders.

    Returns:
        tuple: ``(min_length, sample_rates)`` - the smallest sample count seen
        in any loaded file (``float('inf')`` if nothing was loaded) and the
        set of sample rates encountered.
    """
    shortest = float('inf')
    rates_seen = set()

    for class_name in ('FAKE', 'REAL'):
        class_dir = os.path.join(audio_path, class_name)

        if os.path.exists(class_dir):
            wav_names = [name for name in os.listdir(class_dir) if name.endswith('.wav')]
            for wav_name in wav_names:
                wav_path = os.path.join(class_dir, wav_name)
                try:
                    # sr=None: keep the file's own sample rate
                    signal, rate = librosa.load(wav_path, sr=None)
                    n_samples = len(signal)
                    rates_seen.add(rate)
                    shortest = min(shortest, n_samples)
                except Exception as err:
                    print(f"Error processing {wav_path}: {str(err)}")
        else:
            print(f"Directory not found: {class_dir}")

    return shortest, rates_seen

if __name__ == "__main__":
    # Root folder expected to contain the FAKE/ and REAL/ sub-folders.
    AUDIO_PATH = r"D:\ML\minor project\archive\SPLITTED"

    if not os.path.exists(AUDIO_PATH):
        print(f"Audio directory not found: {AUDIO_PATH}")
        # `exit` is an interactive helper; inside an IPython kernel calling it
        # does not stop the cell. Raising SystemExit aborts reliably everywhere.
        raise SystemExit(1)

    min_samples, sample_rates = analyze_audio_files(AUDIO_PATH)

    if not sample_rates:
        # Nothing could be loaded: min_samples is still float('inf') and the
        # recommendations below would be meaningless.
        print(f"No readable .wav files found under: {AUDIO_PATH}")
        raise SystemExit(1)

    print("\nAnalysis Results:")
    print(f"Minimum audio length (samples): {min_samples}")
    print(f"Sample rates found: {sample_rates}")

    # Recommended n_fft settings
    # Peek at one sample rate without removing it from the result set
    # (set.pop() would silently drop that rate from `sample_rates`).
    common_sr = next(iter(sample_rates), 16000)
    recommended_n_fft = {
        'music': 2048,       # Typical for music @ 22050Hz
        'speech': 512,       # Typical for speech processing
        'custom': min(min_samples, 1024)  # Safe value based on your shortest file
    }

    print("\nRecommended n_fft values:")
    print(f"- For music: {recommended_n_fft['music']}")
    print(f"- For speech: {recommended_n_fft['speech']}")
    print(f"- Safe custom: {recommended_n_fft['custom']} (based on your shortest file)")

    # Files shorter than a typical speech analysis window need special care.
    if min_samples < 512:
        print("\nWarning: Very short audio files detected!")
        print("Consider:")
        print("- Padding shorter files with silence")
        print("- Using smaller n_fft (e.g., 256)")
        print("- Checking file lengths with librosa.get_duration()")



Analysis Results:
Minimum audio length (samples): 144
Sample rates found: {40000, 44100, 48000}

Recommended n_fft values:
- For music: 2048
- For speech: 512
- Safe custom: 144 (based on your shortest file)

Consider:
- Padding shorter files with silence
- Using smaller n_fft (e.g., 256)
- Checking file lengths with librosa.get_duration()


In [None]:
"""
Audio Deepfake Detection with Conformer
- Fixed tensor dimensions
- Enhanced augmentation handling
- Custom collate function
"""

import os
import torch
import torchaudio
import numpy as np
from torch import nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchaudio.transforms import MelSpectrogram, TimeStretch, Resample
from torchaudio.models.conformer import Conformer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score

# Configuration
# Dataset root; load_data() reads its FAKE/ and REAL/ sub-folders.
AUDIO_PATH = r"D:\ML\minor project\archive\SPLITTED"
# Sample rate (Hz) that the spectrogram transforms below are configured for.
TARGET_SAMPLE_RATE = 44100
# is_valid_file() only accepts files with exactly this many frames.
REQUIRED_LENGTH = 10 * TARGET_SAMPLE_RATE  # 441,000 samples
# STFT / Mel-spectrogram framing parameters (in samples, except N_MELS).
N_FFT = 256
WIN_LENGTH = 256
HOP_LENGTH = 64
N_MELS = 128
# Training hyper-parameters.
BATCH_SIZE = 8
EPOCHS = 50
# Run on the GPU when one is visible to torch, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Time-frame count that AudioDataset trims spectrograms to and that
# train_model() passes to the model as every sequence's length.
# NOTE(review): this is the frame count of an STFT without edge padding;
# torchaudio's MelSpectrogram presumably pads by default and yields a few more
# frames, which the dataset then trims away - confirm this is intended.
FIXED_TIME_STEPS = (REQUIRED_LENGTH - N_FFT) // HOP_LENGTH + 1  # 6887

def pad_collate(batch):
    """Collate ``(spectrogram, label)`` pairs whose last dimension may differ.

    Every spectrogram is zero-padded on the right of its last axis up to the
    longest one in the batch, then the batch is stacked.

    Args:
        batch: Sequence of ``(tensor, label)`` pairs.

    Returns:
        tuple: ``(stacked_spectrograms, label_tensor)``.
    """
    spectrograms, targets = zip(*batch)

    # Longest last-axis extent present in this batch
    longest = max(item.shape[-1] for item in spectrograms)

    # Right-pad only the items that fall short of `longest`
    equalized = [
        item if item.shape[-1] >= longest
        else torch.nn.functional.pad(item, (0, longest - item.shape[-1]))
        for item in spectrograms
    ]

    return torch.stack(equalized), torch.tensor(targets)

def is_valid_file(file_path):
    """Return True when ``torchaudio.info`` reports exactly REQUIRED_LENGTH frames.

    Any exception raised while inspecting the file is printed and treated as
    "not valid" (returns False).

    NOTE(review): only the frame count is compared; the file's sample rate is
    not checked here - confirm that is intended.
    """
    try:
        frame_count = torchaudio.info(file_path).num_frames
        has_required_length = frame_count == REQUIRED_LENGTH
    except Exception as err:
        print(f"Invalid file {file_path}: {str(err)}")
        has_required_length = False
    return has_required_length

def _list_valid_files(label_dir):
    """Return the sorted paths in AUDIO_PATH/<label_dir> that pass is_valid_file()."""
    folder = os.path.join(AUDIO_PATH, label_dir)
    # Sorting makes the file order - and therefore the seeded split - reproducible.
    candidates = (os.path.join(folder, name) for name in sorted(os.listdir(folder)))
    return [path for path in candidates if is_valid_file(path)]


def load_data(real_oversample=7):
    """Build a stratified train/test split of the valid FAKE and REAL files.

    Labels are 0 for FAKE and 1 for REAL. The split is made on the *unique*
    files first; only afterwards are the REAL entries of the training split
    repeated ``real_oversample`` times. Duplicating before splitting (as was
    done previously) puts copies of the same recording in both train and test,
    which leaks test data into training and inflates the reported metrics.

    Args:
        real_oversample: How many times each REAL training file is listed.
            Defaults to 7.

    Returns:
        list: ``[train_files, test_files, train_labels, test_labels]`` - same
        order as ``sklearn.model_selection.train_test_split``.
    """
    fake_files = _list_valid_files("FAKE")
    real_files = _list_valid_files("REAL")

    files = fake_files + real_files
    labels = [0] * len(fake_files) + [1] * len(real_files)

    train_files, test_files, train_labels, test_labels = train_test_split(
        files,
        labels,
        test_size=0.2,
        stratify=labels,
        random_state=42
    )

    # Oversample REAL in the training split only; the test split stays untouched.
    oversampled_files, oversampled_labels = [], []
    for path, label in zip(train_files, train_labels):
        copies = real_oversample if label == 1 else 1
        oversampled_files.extend([path] * copies)
        oversampled_labels.extend([label] * copies)

    return [oversampled_files, test_files, oversampled_labels, test_labels]

class AudioDataset(Dataset):
    """Dataset yielding ``(log-mel spectrogram, label)`` pairs for audio files.

    Every item has shape ``[1, N_MELS, FIXED_TIME_STEPS]``. When ``augment`` is
    true, items labelled 1 (REAL) are randomly time-stretched in the STFT
    domain before the Mel projection.

    Args:
        files: Sequence of audio file paths.
        labels: Sequence of integer labels aligned with ``files``.
        augment: Enable random time-stretch for label-1 items.
    """

    def __init__(self, files, labels, augment=False):
        self.files = files
        self.labels = labels
        self.augment = augment

        # Resample transforms are built lazily, one per source sample rate.
        self._resamplers = {}

        # Waveform -> Mel power spectrogram (used for the non-augmented path).
        self.mel_spec = MelSpectrogram(
            sample_rate=TARGET_SAMPLE_RATE,
            n_fft=N_FFT,
            win_length=WIN_LENGTH,
            hop_length=HOP_LENGTH,
            n_mels=N_MELS,
            normalized=True
        )
        # Waveform -> complex STFT with the same framing/normalisation as
        # mel_spec; TimeStretch needs the complex values.
        self.complex_spec = torchaudio.transforms.Spectrogram(
            n_fft=N_FFT,
            win_length=WIN_LENGTH,
            hop_length=HOP_LENGTH,
            power=None,
            normalized=True
        )
        self.time_stretch = TimeStretch(
            hop_length=HOP_LENGTH,
            n_freq=N_FFT//2 + 1
        )
        # Power spectrogram -> Mel bins. MelSpectrogram cannot be used for this
        # step because it expects a waveform and would run a second STFT over
        # the time axis, adding a dimension to the output.
        self.mel_scale = torchaudio.transforms.MelScale(
            n_mels=N_MELS,
            sample_rate=TARGET_SAMPLE_RATE,
            n_stft=N_FFT//2 + 1
        )
        # Built once instead of on every __getitem__ call.
        self.to_db = torchaudio.transforms.AmplitudeToDB()

    def __len__(self):
        return len(self.files)

    def _resample(self, waveform, sample_rate):
        """Bring ``waveform`` from ``sample_rate`` to TARGET_SAMPLE_RATE."""
        if sample_rate == TARGET_SAMPLE_RATE:
            return waveform
        if sample_rate not in self._resamplers:
            self._resamplers[sample_rate] = Resample(
                orig_freq=sample_rate, new_freq=TARGET_SAMPLE_RATE
            )
        return self._resamplers[sample_rate](waveform)

    @staticmethod
    def _fit_time_axis(mel):
        """Trim or right-pad with zeros so the last axis is FIXED_TIME_STEPS long."""
        n_frames = mel.shape[-1]
        if n_frames > FIXED_TIME_STEPS:
            return mel[..., :FIXED_TIME_STEPS]
        if n_frames < FIXED_TIME_STEPS:
            # A stretch rate above 1 shortens the clip; zero power maps to the
            # dB floor after AmplitudeToDB, i.e. silence.
            return torch.nn.functional.pad(mel, (0, FIXED_TIME_STEPS - n_frames))
        return mel

    def __getitem__(self, idx):
        file = self.files[idx]
        label = self.labels[idx]

        # Load, resample from the file's real rate, mix to mono, peak-normalise
        waveform, sample_rate = torchaudio.load(file)
        waveform = self._resample(waveform, sample_rate)
        waveform = waveform.mean(dim=0, keepdim=True)
        waveform = waveform / (waveform.abs().max() + 1e-9)

        if self.augment and label == 1:
            # Augmentation (REAL only): stretch the complex STFT, then project
            # its power onto the Mel filterbank.
            spec = self.complex_spec(waveform)
            rate = float(np.random.uniform(0.9, 1.1))
            spec = self.time_stretch(spec, rate)
            mel = self.mel_scale(spec.abs().pow(2))
        else:
            mel = self.mel_spec(waveform)

        # Final shape enforcement: always [1, N_MELS, FIXED_TIME_STEPS]
        mel = self._fit_time_axis(mel)

        return self.to_db(mel), label

class AudioConformer(nn.Module):
    """Conformer encoder followed by mean-pooling over time and a linear head.

    Args:
        num_classes: Number of output logits per example.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        self.conformer = Conformer(
            input_dim=N_MELS,
            num_heads=4,
            ffn_dim=256,
            num_layers=4,
            depthwise_conv_kernel_size=63
        )
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            # The encoder emits `input_dim` (= N_MELS) features per frame;
            # `ffn_dim` is only the width of its internal feed-forward layers.
            nn.Linear(N_MELS, num_classes)
        )

    def forward(self, x, lengths):
        """Classify a batch of spectrograms.

        Args:
            x: Tensor of shape ``[B, 1, F, T]`` with ``F == N_MELS``.
            lengths: Long tensor ``[B]`` with the valid frame count per item.

        Returns:
            Tensor ``[B, num_classes]`` of logits.
        """
        x = x.squeeze(1)  # [B,1,F,T] -> [B,F,T]
        x = x.permute(0,2,1)  # [B,T,F] for Conformer
        x, _ = self.conformer(x, lengths)
        x = x.permute(0,2,1)  # [B,F,T] for pooling
        # AdaptiveAvgPool1d wants [B,F,T]; no extra dimension must be added.
        # Note the average runs over all T frames, padded ones included.
        return self.classifier(x)

def create_weighted_sampler(labels):
    """Build a WeightedRandomSampler that draws each class with equal total weight.

    Each sample's weight is the reciprocal of its class's occurrence count in
    ``labels``; the sampler draws ``len(labels)`` samples per pass.

    Args:
        labels: Sequence of non-negative integer class labels.

    Returns:
        WeightedRandomSampler over the given samples.
    """
    per_class_count = torch.Tensor(np.bincount(labels))
    per_class_weight = 1. / per_class_count
    # Look up every sample's weight by its class index
    per_sample_weight = per_class_weight[labels]
    return WeightedRandomSampler(
        weights=per_sample_weight,
        num_samples=len(per_sample_weight),
    )

def train_model():
    """Train AudioConformer on the split from load_data() and save its weights.

    After every epoch the model is evaluated on the test split and a
    classification report plus ROC AUC are printed. The final state dict is
    written to ``conformer_model_final.pth`` in the working directory.

    NOTE(review): class imbalance is compensated both by the weighted sampler
    and by the loss weights below (on top of whatever load_data() does) -
    confirm that stacking these is intended.
    """
    train_files, test_files, train_labels, test_labels = load_data()

    train_dataset = AudioDataset(train_files, train_labels, augment=True)
    test_dataset = AudioDataset(test_files, test_labels)

    sampler = create_weighted_sampler(train_dataset.labels)

    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        sampler=sampler,
        collate_fn=pad_collate  # Custom collate
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=BATCH_SIZE,
        collate_fn=pad_collate  # Custom collate
    )

    model = AudioConformer().to(DEVICE)
    criterion = nn.CrossEntropyLoss(weight=torch.tensor([1.0, 3.0]).to(DEVICE))
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-5)

    def frame_lengths(specs):
        # Derive the sequence length from the batch itself so it always matches
        # the time axis actually handed to the model.
        return torch.full((specs.size(0),), specs.size(-1),
                          dtype=torch.long, device=specs.device)

    for epoch in range(EPOCHS):
        model.train()
        for specs, labels in train_loader:
            specs = specs.to(DEVICE)
            labels = labels.to(DEVICE)

            outputs = model(specs, frame_lengths(specs))
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation
        model.eval()
        all_preds, all_scores, all_labels = [], [], []
        with torch.no_grad():
            for specs, labels in test_loader:
                specs = specs.to(DEVICE)
                outputs = model(specs, frame_lengths(specs))
                # Probability of class 1 (REAL): ROC AUC needs a continuous
                # score, not the thresholded argmax prediction.
                scores = torch.softmax(outputs, dim=1)[:, 1]
                preds = torch.argmax(outputs, dim=1)
                all_scores.extend(scores.cpu().numpy())
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.numpy())

        print(f"Epoch {epoch+1}/{EPOCHS}")
        print(classification_report(
            all_labels,
            all_preds,
            labels=[0, 1],  # keep both rows even if one class is never predicted
            target_names=["FAKE", "REAL"],
            zero_division=0
        ))
        print(f"ROC AUC: {roc_auc_score(all_labels, all_scores):.4f}")

    torch.save(model.state_dict(), "conformer_model_final.pth")

# Entry point: start training when this cell/script is executed directly.
if __name__ == "__main__":
    train_model()




RuntimeError: stack expects each tensor to be equal size, but got [1, 129, 128, 6887] at entry 0 and [1, 128, 6887] at entry 1