<a href="https://colab.research.google.com/github/MZohaib03094/MZohaib03094/blob/main/data_clean.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q librosa pydub rarfile soundfile

In [None]:
from google.colab import files
# The result of the upload call is kept in `uploaded`.
uploaded = files.upload()  # Select and upload the .rar archive of raw audio (e.g. 1500-1.rar)

Saving 1500-1.rar to 1500-1 (1).rar


In [None]:
!apt-get install -y unrar

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unrar is already the newest version (1:6.1.5-1ubuntu0.1).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.


In [None]:
!pip install -q librosa pydub soundfile


In [None]:
import librosa
import numpy as np
import soundfile as sf
from pydub import AudioSegment
from pydub.effects import normalize
from tqdm import tqdm
import os
import glob

RAW_DIR = "raw_audios"
CLEANED_DIR = "cleaned_audios"
os.makedirs(CLEANED_DIR, exist_ok=True)

# Extensions treated as audio input. Matching "*.wav" alone leaves the list
# empty when the raw recordings are stored as FLAC.
AUDIO_EXTENSIONS = (".wav", ".flac")

# Recursively collect every audio file under RAW_DIR. The name `wav_files` is
# kept for the cells below even though it can now also hold FLAC paths; the
# list is sorted so the processing order is reproducible.
wav_files = sorted(
    candidate
    for candidate in glob.glob(f"{RAW_DIR}/**/*", recursive=True)
    if os.path.isfile(candidate) and candidate.lower().endswith(AUDIO_EXTENSIONS)
)

# Check if audio is valid
def is_audio_valid(path):
    """Report whether the file at ``path`` can be decoded.

    The file is loaded with ``librosa.load`` at its native sample rate
    (``sr=None``). Returns ``True`` when that call completes and ``False``
    when it raises any ``Exception``.
    """
    decodable = True
    try:
        librosa.load(path, sr=None)
    except Exception:
        decodable = False
    return decodable

# Full audio cleaning pipeline
def clean_audio(input_path, output_path, target_sr=16000):
    """Resample, strip silence from, and normalize one audio file.

    Steps:
      1. Load ``input_path`` at its native rate and resample to ``target_sr``
         when the rates differ.
      2. Keep only the intervals ``librosa.effects.split`` reports as
         non-silent (``top_db=30``) and join them; if no interval is found
         the signal is kept unchanged.
      3. Write the result to a temporary WAV file, normalize it with pydub,
         and export it to ``output_path``.

    Parameters
    ----------
    input_path : path of the audio file to read.
    output_path : path of the file to write. The export format is taken from
        its extension (``.wav`` -> "wav", ``.flac`` -> "flac", ...) so the file
        contents match the file name; a path without extension is written as
        WAV. Formats other than WAV are encoded by pydub through ffmpeg.
    target_sr : sample rate of the written audio, in Hz.

    Raises
    ------
    Whatever librosa, soundfile or pydub raise while reading, writing or
    encoding; the temporary file is removed in every case.
    """
    import tempfile

    # Load and resample
    y, sr = librosa.load(input_path, sr=None)
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
        sr = target_sr

    # Trim silence
    intervals = librosa.effects.split(y, top_db=30)
    if len(intervals) > 0:
        cleaned_audio = np.concatenate([y[start:end] for start, end in intervals])
    else:
        cleaned_audio = y

    export_format = os.path.splitext(output_path)[1].lstrip(".").lower() or "wav"

    # A uniquely named temp file avoids leaving "temp.wav" in the working
    # directory and avoids clashes between concurrent runs.
    fd, temp_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        sf.write(temp_path, cleaned_audio, sr)

        # Normalize using pydub
        audio = AudioSegment.from_wav(temp_path)
        normalize(audio).export(output_path, format=export_format)
    finally:
        os.remove(temp_path)

# Process all files
# Each input is written to CLEANED_DIR under its own base name. Files that
# cannot be decoded are skipped; a failure while cleaning one file is reported
# and the loop carries on with the next.
for path in tqdm(wav_files, desc="🔧 Cleaning audio files"):
    filename = os.path.basename(path)
    output_path = os.path.join(CLEANED_DIR, filename)

    if is_audio_valid(path):
        try:
            clean_audio(path, output_path)
        except Exception as e:
            print(f"⚠️ Error cleaning {filename}: {e}")
    else:
        print(f"❌ Skipping corrupted: {filename}")


🔧 Cleaning audio files: 0it [00:00, ?it/s]


In [None]:
import shutil
from google.colab import files
# Create a zip file of cleaned audio
# (writes cleaned_audio_dataset.zip from the contents of CLEANED_DIR)
shutil.make_archive("cleaned_audio_dataset", "zip", CLEANED_DIR)
# Download the zip file
files.download("cleaned_audio_dataset.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
import os

# Print the path of every file under raw_audios. The loop variables are
# deliberately not called `files`/`file`: notebook loop variables stay bound
# after the loop, and `files` would replace the google.colab `files` module
# that the download cells call.
for root, _dirs, filenames in os.walk("raw_audios"):
    for filename in filenames:
        print(os.path.join(root, filename))


raw_audios/1500-1/DF_E_2000759.flac
raw_audios/1500-1/DF_E_2000782.flac
raw_audios/1500-1/DF_E_2001557.flac
raw_audios/1500-1/DF_E_2000746.flac
raw_audios/1500-1/DF_E_2001955.flac
raw_audios/1500-1/DF_E_2000979.flac
raw_audios/1500-1/DF_E_2001453.flac
raw_audios/1500-1/DF_E_2000346.flac
raw_audios/1500-1/DF_E_2000096.flac
raw_audios/1500-1/DF_E_2003295.flac
raw_audios/1500-1/DF_E_2000910.flac
raw_audios/1500-1/DF_E_2002328.flac
raw_audios/1500-1/DF_E_2001409.flac
raw_audios/1500-1/DF_E_2002339.flac
raw_audios/1500-1/DF_E_2000639.flac
raw_audios/1500-1/DF_E_2000715.flac
raw_audios/1500-1/DF_E_2002712.flac
raw_audios/1500-1/DF_E_2004319.flac
raw_audios/1500-1/DF_E_2003713.flac
raw_audios/1500-1/DF_E_2000223.flac
raw_audios/1500-1/DF_E_2001984.flac
raw_audios/1500-1/DF_E_2003277.flac
raw_audios/1500-1/DF_E_2002964.flac
raw_audios/1500-1/DF_E_2002578.flac
raw_audios/1500-1/DF_E_2002333.flac
raw_audios/1500-1/DF_E_2002606.flac
raw_audios/1500-1/DF_E_2004207.flac
raw_audios/1500-1/DF_E_20003

In [None]:
import glob

# Count the FLAC files sitting directly in CLEANED_DIR and show one sample
# path, or a placeholder when nothing matched.
cleaned_files = glob.glob(os.path.join(CLEANED_DIR, '*.flac'))
print(f"Total cleaned files: {len(cleaned_files)}")
if cleaned_files:
    print("Example cleaned file:", cleaned_files[0])
else:
    print("Example cleaned file:", "No files found")


Total cleaned files: 871
Example cleaned file: cleaned_audios/DF_E_2000759.flac


In [None]:
import shutil
from google.colab import files

# Zip the contents of CLEANED_DIR into cleaned_audio_dataset.zip, then download it.
shutil.make_archive("cleaned_audio_dataset", "zip", CLEANED_DIR)
files.download("cleaned_audio_dataset.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
#Feature Extraction

In [None]:
import os
import librosa
import numpy as np
import pandas as pd
from tqdm import tqdm

CLEANED_DIR = "cleaned_audios"  # directory scanned for cleaned audio files
FEATURE_CSV = "audio_features.csv"  # output CSV: one row per successfully processed file
MAX_LEN = 400  # max number of frames for MFCC time dimension

def extract_features(file_path, max_len=MAX_LEN, sr=16000, n_mfcc=13):
    """Compute a fixed-length, flattened MFCC vector for one audio file.

    Parameters
    ----------
    file_path : path of the audio file to load.
    max_len : number of MFCC frames kept. Shorter clips are zero-padded on
        the right; longer clips are truncated.
    sr : sample rate the audio is loaded at, in Hz (default 16000).
    n_mfcc : number of MFCC coefficients per frame (default 13).

    Returns
    -------
    1-D array of length ``n_mfcc * max_len``. It is the row-major flatten of
    the ``(n_mfcc, max_len)`` matrix, i.e. all frames of coefficient 0 come
    first, then all frames of coefficient 1, and so on. To get a
    ``(time, coefficient)`` matrix back use
    ``vec.reshape(n_mfcc, max_len).T`` -- ``vec.reshape(max_len, n_mfcc)``
    does NOT restore it.
    """
    y, sr = librosa.load(file_path, sr=sr)
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)

    # Pad or truncate to max_len along the time axis (axis 1)
    missing_frames = max_len - mfcc.shape[1]
    if missing_frames > 0:
        mfcc = np.pad(mfcc, pad_width=((0, 0), (0, missing_frames)), mode='constant')
    else:
        mfcc = mfcc[:, :max_len]

    return mfcc.flatten()  # flatten n_mfcc x max_len into 1D vector

# Gather all cleaned flac files
# The loop variables avoid the names `files`/`file`: they stay bound after the
# loop, and `files` would replace the google.colab `files` module used by the
# download cells. The list is sorted because os.walk yields entries in
# arbitrary order, which would otherwise make the CSV row order vary per run.
file_paths = []
for root, _dirs, filenames in os.walk(CLEANED_DIR):
    for filename in filenames:
        if filename.lower().endswith(".flac"):
            file_paths.append(os.path.join(root, filename))
file_paths.sort()

print(f"Extracting features from {len(file_paths)} files...")

features_list, file_names = [], []

# Files whose extraction raises are reported and left out, so both lists
# always stay the same length.
for fp in tqdm(file_paths):
    try:
        feat = extract_features(fp)
    except Exception as e:
        print(f"Failed to process {fp}: {e}")
    else:
        features_list.append(feat)
        file_names.append(os.path.basename(fp))

# Convert to DataFrame: one row per file, with the file name as first column
df_features = pd.DataFrame(features_list)
df_features.insert(0, "filename", file_names)

# Save to CSV
df_features.to_csv(FEATURE_CSV, index=False)
print(f"Features saved to {FEATURE_CSV}")


Extracting features from 871 files...


100%|██████████| 871/871 [00:11<00:00, 77.41it/s]


Features saved to audio_features.csv


In [None]:
from google.colab import files
# Download the feature CSV written by the extraction cell.
files.download(FEATURE_CSV)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

# Load features
# (the code below expects a "filename" column; every other column is treated as a feature value)
df = pd.read_csv("audio_features.csv")

# Extract labels from filenames
def get_label_from_filename(filename):
    """Map a file name to its class label by marker substring.

    Returns 0 (Real) when "DF_" occurs anywhere in ``filename``; otherwise
    1 (Fake) when "FA_" occurs anywhere in it; otherwise -1 (Unknown).
    "DF_" is tested first, so a name containing both markers maps to 0.
    """
    marker_labels = (
        ("DF_", 0),  # Real
        ("FA_", 1),  # Fake
    )
    for marker, label in marker_labels:
        if marker in filename:
            return label
    return -1  # Unknown

df['label'] = df['filename'].apply(get_label_from_filename)
df = df[df['label'] != -1]  # Remove unknowns

# A classifier cannot learn anything from a single class, and precision /
# recall / F1 are undefined on such data, so report the class balance up front
# instead of letting training "succeed" silently.
label_counts = df['label'].value_counts()
if len(label_counts) < 2:
    print(f"⚠️ Only {len(label_counts)} class(es) present in the labels: {label_counts.to_dict()}")

# Everything except the file name and the label is a feature column.
features = df.drop(columns=["filename", "label"]).values
labels = df["label"].values


In [None]:
#Dataset & DataLoader

In [None]:
class AudioDataset(Dataset):
    """Dataset of flattened MFCC vectors served as (time, coefficient) matrices.

    Parameters
    ----------
    features : 2-D array-like, one flattened MFCC vector per row. Each vector
        is expected to be the row-major flatten of an ``(n_mfcc, frames)``
        matrix (all frames of coefficient 0 first, then coefficient 1, ...).
    labels : 1-D array-like of integer class labels, aligned with ``features``.
    n_mfcc : number of MFCC coefficients per frame (default 13).

    Each item is ``(x, y)`` where ``x`` is a float32 tensor of shape
    ``(frames, n_mfcc)`` and ``y`` is an int64 scalar tensor.
    """

    def __init__(self, features, labels, n_mfcc=13):
        self.features = features
        self.labels = labels
        self.n_mfcc = n_mfcc

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        flat = torch.tensor(self.features[idx], dtype=torch.float32)
        # Restore the (n_mfcc, frames) matrix first, then transpose so each
        # row is one time step. Reshaping straight to (frames, n_mfcc) would
        # mix values from different coefficients and frames into one row.
        feature = flat.reshape(self.n_mfcc, -1).T.contiguous()
        return feature, torch.tensor(self.labels[idx], dtype=torch.long)

# Wrap the complete feature/label arrays and serve them in shuffled batches of 32.
dataset = AudioDataset(features, labels)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [None]:
#LSTM Model

In [None]:
class LSTMClassifier(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    ``forward`` takes a batch-first tensor ``(batch, time, input_size)`` and
    returns raw class scores of shape ``(batch, num_classes)``.
    """

    def __init__(self, input_size=13, hidden_size=64, num_layers=2, num_classes=2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Hidden and cell state both start at zero, created on the input's device.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        initial_state = (
            torch.zeros(state_shape, device=x.device),
            torch.zeros(state_shape, device=x.device),
        )
        sequence_out, _ = self.lstm(x, initial_state)
        # Classify from the top layer's output at the final time step only.
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)


In [None]:
#Training Loop

In [None]:
# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMClassifier().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
# NOTE(review): `dataloader` iterates over the complete dataset, so this run
# also sees the samples that the later train/test split holds out. The printed
# value is the mean batch loss of the epoch.
for epoch in range(10):
    total_loss = 0
    model.train()
    for batch_features, batch_labels in dataloader:
        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)

        outputs = model(batch_features)
        loss = criterion(outputs, batch_labels)

        # Clear the previous step's gradients before computing new ones.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"📈 Epoch {epoch+1}, Loss: {total_loss / len(dataloader):.4f}")


📈 Epoch 1, Loss: 0.3591
📈 Epoch 2, Loss: 0.0014
📈 Epoch 3, Loss: 0.0005
📈 Epoch 4, Loss: 0.0004
📈 Epoch 5, Loss: 0.0003
📈 Epoch 6, Loss: 0.0003
📈 Epoch 7, Loss: 0.0002
📈 Epoch 8, Loss: 0.0002
📈 Epoch 9, Loss: 0.0002
📈 Epoch 10, Loss: 0.0002


In [None]:
# Only the weights (state_dict) are written; to load them, build an
# LSTMClassifier first and call load_state_dict on it.
torch.save(model.state_dict(), "lstm_audio_model.pth")
print("✅ Model saved.")


✅ Model saved.


In [None]:
# Split the dataset into train and test sets, then evaluate the model

In [None]:
from sklearn.model_selection import train_test_split

# Split into train/test (80% train, 20% test)
# stratify=labels keeps the class proportions the same in both parts;
# random_state=42 makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.2, stratify=labels, random_state=42
)


In [None]:
#Create DataLoaders

In [None]:
# Dataset class reused
train_dataset = AudioDataset(X_train, y_train)
test_dataset = AudioDataset(X_test, y_test)

# Only the training batches are shuffled; the test loader keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [None]:
#Retrain Model on Training Set

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

# Seed PyTorch before the model is built so weight initialisation and the
# DataLoader shuffle order are the same on every run (same value as the
# random_state used for the train/test split).
torch.manual_seed(42)

# Fresh, untrained model: it replaces any `model` trained earlier in the kernel.
model = LSTMClassifier().to(device)

# Loss & Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop using train_loader only, so the test split stays unseen.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    epoch_loss = 0.0

    for batch_features, batch_labels in train_loader:
        batch_features = batch_features.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass
        outputs = model(batch_features)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Mean batch loss over the epoch
    avg_loss = epoch_loss / len(train_loader)
    print(f"📘 Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")


📘 Epoch [1/10], Loss: 0.4091
📘 Epoch [2/10], Loss: 0.0059
📘 Epoch [3/10], Loss: 0.0010
📘 Epoch [4/10], Loss: 0.0007
📘 Epoch [5/10], Loss: 0.0006
📘 Epoch [6/10], Loss: 0.0005
📘 Epoch [7/10], Loss: 0.0004
📘 Epoch [8/10], Loss: 0.0004
📘 Epoch [9/10], Loss: 0.0003
📘 Epoch [10/10], Loss: 0.0003


In [None]:
#Evaluate on Test Set

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

model.eval()
y_true = []
y_pred = []

# Collect the true and predicted class of every test sample.
with torch.no_grad():
    for features_batch, labels_batch in test_loader:
        features_batch = features_batch.to(device)

        outputs = model(features_batch)
        predicted = outputs.argmax(dim=1)  # index of the highest class score

        y_true.extend(labels_batch.tolist())
        y_pred.extend(predicted.cpu().tolist())

# With a single class in the test set, accuracy is trivially high while
# precision/recall/F1 for the positive class (label 1) are undefined; say so
# explicitly instead of leaving 0.0 scores plus library warnings to interpret.
if len(set(y_true)) < 2:
    print(f"⚠️ Test set contains only class(es) {sorted(set(y_true))}; "
          "precision/recall/F1 are not meaningful.")

# Compute metrics (zero_division=0: report 0.0 without a warning when a score is undefined)
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)

print("✅ Evaluation Results:")
print(f"🔹 Accuracy:  {accuracy:.4f}")
print(f"🔹 Precision: {precision:.4f}")
print(f"🔹 Recall:    {recall:.4f}")
print(f"🔹 F1 Score:  {f1:.4f}")


✅ Evaluation Results:
🔹 Accuracy:  1.0000
🔹 Precision: 0.0000
🔹 Recall:    0.0000
🔹 F1 Score:  0.0000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
