In [1]:
import torch

In [2]:
# Check for GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cpu


In [3]:
pip install torch transformers librosa pandas scikit-learn 

Note: you may need to restart the kernel to use updated packages.


In [4]:
import librosa

def load_audio(file_path, sample_rate=16000):
    # Load audio file with librosa
    audio, _ = librosa.load(file_path, sr=sample_rate)
    return audio


In [5]:
import os

def load_audio_from_folders(data_dir):
    audio_data = []
    audio_labels = []

    for label in os.listdir(data_dir):
        label_path = os.path.join(data_dir, label)
        
        # Skip non-directory files
        if not os.path.isdir(label_path):
            continue
            
        for file_name in os.listdir(label_path):
            file_path = os.path.join(label_path, file_name)
            
            # Check if the file is a valid audio file (e.g., .wav, .mp3)
            if not (file_name.endswith('.wav') or file_name.endswith('.mp3')):
                continue
            
            # Load audio
            audio = load_audio(file_path)  # Load with librosa
            audio_data.append(audio)
            audio_labels.append(label)

    return audio_data, audio_labels

# Example usage
#audio_files, labels = load_audio_from_folders("Merged_Dataset")


In [6]:
from transformers import Wav2Vec2Processor, Wav2Vec2Model
import torch

processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")




In [7]:
def extract_wav2vec_embedding(audio):
    inputs = processor(audio, sampling_rate=16000, return_tensors="pt", padding=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state
        # Pool across time dimension to get a single embedding vector
        embedding = torch.mean(embeddings, dim=1)
    return embedding

In [8]:
import numpy as np
def prepare_dataset(audio_files, labels):
    embeddings = []
    for i, file in enumerate(audio_files):
        audio = load_audio(file)  # Load audio file
        embedding = extract_wav2vec_embedding(audio)  # Extract embedding
        embeddings.append(embedding.cpu().numpy())  # Append embedding to the list
    return np.array(embeddings), np.array(labels)


In [9]:
import os
import torch
import numpy as np
import librosa
from transformers import Wav2Vec2Processor, Wav2Vec2Model

# Load Wav2Vec model and processor
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
model.eval()

def load_audio(file_path, sample_rate=16000):
    # Load audio file with librosa
    audio, _ = librosa.load(file_path, sr=sample_rate)
    return audio

def extract_wav2vec_embedding(audio):
    inputs = processor(audio, sampling_rate=16000, return_tensors="pt", padding=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state
        # Pool across time dimension to get a single embedding vector
        embedding = torch.mean(embeddings, dim=1)
    return embedding

def load_audio_from_folders(data_dir):
    audio_data = []
    audio_labels = []
    
    # Loop through each folder ('real' and 'fake')
    for label_folder in os.listdir(data_dir):
        label_path = os.path.join(data_dir, label_folder)
        
        if os.path.isdir(label_path):
            label = 1 if label_folder == 'real' else 0
            
            for file_name in os.listdir(label_path):
                file_path = os.path.join(label_path, file_name)
                
                # Append the file path and label
                audio_data.append(file_path)
                audio_labels.append(label)
                
    return audio_data, audio_labels

def prepare_dataset(audio_files, labels):
    embeddings = []
    for i, file in enumerate(audio_files):
        audio = load_audio(file)  # Load audio file
        embedding = extract_wav2vec_embedding(audio)  # Extract embedding
        embeddings.append(embedding.cpu().numpy())  # Append embedding to the list
    return np.array(embeddings), np.array(labels)

# Load your audio files and labels
data_directory = r'C:\Users\ACER\Desktop\AudioDF\audio'  # Update this to your dataset path
audio_files, labels = load_audio_from_folders(data_directory)  # Ensure this function is defined

# Prepare dataset to get embeddings and labels
embeddings, labels = prepare_dataset(audio_files, labels)

# Print the shape of embeddings
print("Embeddings shape:", embeddings.shape)  # Should be (num_samples, 768)
print("Labels shape:", labels.shape)          # Should be (num_samples,)


Keyword argument `sampling_rate` is not a valid argument for this processor and will be ignored.
Keyword argument `padding` is not a valid argument for this processor and will be ignored.


Embeddings shape: (1088, 1, 768)
Labels shape: (1088,)


In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [25]:
train_dataset = TensorDataset(torch.tensor(embeddings).float(), torch.tensor(labels).float())
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

In [27]:
class AudioClassifier(nn.Module):
    def __init__(self):
        super(AudioClassifier, self).__init__()
        self.fc1 = nn.Linear(768, 128)  # 768 is the output size of Wav2Vec embeddings
        self.fc2 = nn.Linear(128, 1)     # Binary classification output
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x


In [47]:
# Initialize model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AudioClassifier().to(device)
criterion = nn.BCELoss()  # Use appropriate loss function for binary classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)  # Squeeze if needed

        # Backward pass
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')


Epoch [1/100], Loss: 0.6384
Epoch [2/100], Loss: 0.4358
Epoch [3/100], Loss: 0.2677
Epoch [4/100], Loss: 0.1663
Epoch [5/100], Loss: 0.1098
Epoch [6/100], Loss: 0.0832
Epoch [7/100], Loss: 0.0610
Epoch [8/100], Loss: 0.0446
Epoch [9/100], Loss: 0.0385
Epoch [10/100], Loss: 0.0326
Epoch [11/100], Loss: 0.0250
Epoch [12/100], Loss: 0.0204
Epoch [13/100], Loss: 0.0164
Epoch [14/100], Loss: 0.0143
Epoch [15/100], Loss: 0.0135
Epoch [16/100], Loss: 0.0113
Epoch [17/100], Loss: 0.0112
Epoch [18/100], Loss: 0.0085
Epoch [19/100], Loss: 0.0080
Epoch [20/100], Loss: 0.0068
Epoch [21/100], Loss: 0.0067
Epoch [22/100], Loss: 0.0064
Epoch [23/100], Loss: 0.0087
Epoch [24/100], Loss: 0.0050
Epoch [25/100], Loss: 0.0042
Epoch [26/100], Loss: 0.0041
Epoch [27/100], Loss: 0.0036
Epoch [28/100], Loss: 0.0035
Epoch [29/100], Loss: 0.0032
Epoch [30/100], Loss: 0.0035
Epoch [31/100], Loss: 0.0027
Epoch [32/100], Loss: 0.0025
Epoch [33/100], Loss: 0.0023
Epoch [34/100], Loss: 0.0023
Epoch [35/100], Loss: 0

In [48]:
def augment_audio(audio):
    # Example: pitch shifting and adding noise
    audio_augmented = librosa.effects.pitch_shift(audio, sr=16000, n_steps=4)
    noise = np.random.randn(len(audio))
    audio_augmented = audio + 0.005 * noise
    return audio_augmented


In [49]:
from sklearn.cluster import KMeans

def cluster_embeddings(embeddings, n_clusters=2):
    kmeans = KMeans(n_clusters=n_clusters, random_state=0)
    clusters = kmeans.fit_predict(embeddings)
    return clusters


In [50]:
import torch

# Save the model
torch.save(model.state_dict(), "deepfake_audio_detection_model.pth")


In [51]:
import torch
import librosa
from transformers import Wav2Vec2Processor, Wav2Vec2Model

# Load your models
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
wav2vec2_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
model = AudioClassifier()  # Your custom classifier
model.load_state_dict(torch.load("deepfake_audio_detection_model.pth", weights_only=True))
model.eval()

def load_and_preprocess_audio(audio_path):
    audio, sr = librosa.load(audio_path, sr=16000)
    inputs = processor(audio, return_tensors="pt", padding=True, sampling_rate=16000)
    return inputs['input_values']

def predict(audio_path):
    inputs = load_and_preprocess_audio(audio_path)

    with torch.no_grad():
        # Extract features using Wav2Vec2
        features = wav2vec2_model(inputs).last_hidden_state
        pooled_features = torch.mean(features, dim=1)  # Mean pooling
        
        outputs = model(pooled_features)  # Pass pooled features to the classifier

    predictions = torch.sigmoid(outputs)  # Adjust based on your model's output
    return predictions.numpy()

# Test on an audio sample
audio_file = r'audio/fake/file1018.wav_16k.wav_norm.wav_mono.wav_silence.wav_2sec.wav'
predictions = predict(audio_file)

# Assuming binary classification
label = "Real" if predictions[0] > 0.5 else "Fake"
print(f"Prediction for the audio sample: {label}")


Prediction for the audio sample: Real


In [None]:
'''
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
import tensorflow as tf
from sklearn.model_selection import GridSearchCV
from tensorflow.keras.callbacks import EarlyStopping

def create_model(learning_rate=0.001, dropout_rate=0.5):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(128, 63, 1)))
    model.add(tf.keras.layers.Dropout(dropout_rate))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(1, activation='sigmoid'))  # For binary classification
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

class KerasClassifierWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, learning_rate=0.001, dropout_rate=0.5, epochs=10, batch_size=32):
        self.learning_rate = learning_rate
        self.dropout_rate = dropout_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.model = None

    def create_model(self):
        return create_model(self.learning_rate, self.dropout_rate)

    def fit(self, X, y):
        self.model = self.create_model()
        # Adding EarlyStopping
        early_stopping = EarlyStopping(monitor='loss', patience=3, restore_best_weights=True)
        self.model.fit(X, y, epochs=self.epochs, batch_size=self.batch_size, verbose=1, callbacks=[early_stopping])

    def predict(self, X):
        return (self.model.predict(X) > 0.5).astype("int32")

# Define hyperparameter grid
param_grid = {
    'learning_rate': [0.001, 0.01],
    'dropout_rate': [0.3, 0.5],
    'batch_size': [8, 16, 32],
    'epochs': [5, 10]
}

# Now you can use the custom wrapper in GridSearchCV
model = KerasClassifierWrapper()

# Create the GridSearchCV object
grid = GridSearchCV(estimator=model, param_grid=param_grid, scoring='accuracy', n_jobs=-1, cv=3)

# Fit the grid search to the training data
grid_result = grid.fit(X_mel_train, y_train)

# Print the best parameters and best score
print("Best parameters found: ", grid_result.best_params_)
print("Best accuracy: ", grid_result.best_score_)
'''


In [None]:
'''
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
import tensorflow as tf
from sklearn.model_selection import GridSearchCV

class KerasClassifierWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, learning_rate=0.001, dropout_rate=0.5, epochs=10, batch_size=8):  # Reduced batch size
        self.learning_rate = learning_rate
        self.dropout_rate = dropout_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.model = None

    def create_model(self):
        model = tf.keras.models.Sequential()
        model.add(tf.keras.layers.Conv2D(16, (3, 3), activation='relu', input_shape=(128, 63, 1)))  # Reduced number of filters
        model.add(tf.keras.layers.Dropout(self.dropout_rate))
        model.add(tf.keras.layers.Flatten())
        model.add(tf.keras.layers.Dense(32, activation='relu'))  # Reduced number of neurons
        model.add(tf.keras.layers.Dense(1, activation='sigmoid'))  # For binary classification
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
        return model

    def fit(self, X, y):
        self.model = self.create_model()
        self.model.fit(X, y, epochs=self.epochs, batch_size=self.batch_size, verbose=1)

    def predict(self, X):
        return (self.model.predict(X) > 0.5).astype("int32")

# Now you can use the custom wrapper in GridSearchCV
model = KerasClassifierWrapper()

# Define hyperparameter grid
param_grid = {
    'learning_rate': [0.001, 0.01],
    'dropout_rate': [0.3, 0.5],
    'batch_size': [8, 16],  # Reduced batch size
    'epochs': [5, 10]  # Reduced epochs
}

# Create the GridSearchCV object with n_jobs=1
grid = GridSearchCV(estimator=model, param_grid=param_grid, scoring='accuracy', n_jobs=1, cv=3)

# Fit the grid search to the training data
grid_result = grid.fit(X_mel_train, y_train)

# Print the best parameters and best score
print("Best parameters found: ", grid_result.best_params_)
print("Best accuracy: ", grid_result.best_score_)
'''
