In [3]:
import torch
import torch.nn as nn
import librosa
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import os
#from models.model_architecture import DemucsModel
from sklearn.model_selection import train_test_split
import soundfile as sf

In [None]:
# Define dataset directory
dataset_dir = "/path/to/dataset/"
batch_size = 3  # You can change this value as needed
segment_length =10*44100 # Fixed length for all audio samples (~1.5 sec at 44.1kHz)


In [5]:
# Get file paths for each category
def get_file_paths(folder_name):
    path = os.path.join(dataset_dir, folder_name)
    if not os.path.exists(path):
        print(f"Warning: {folder_name} folder not found!")
        return []
    return sorted([os.path.join(path, f) for f in os.listdir(path)])

In [6]:
song_paths = get_file_paths("song")
bass_paths = get_file_paths("bass")
vocal_paths = get_file_paths("vocal")
drum_paths = get_file_paths("drum")
music_paths = get_file_paths("music")

In [7]:
import librosa
import numpy as np

def load_audio(file_path, target_sr=44100, segment_length=441000):  
    audio, sr = librosa.load(file_path, sr=target_sr, mono=True)
    
    # Pad if audio is shorter than segment length
    if len(audio) < segment_length:
        audio = np.pad(audio, (0, segment_length - len(audio)), mode='constant')

    # Number of segments (each 10 seconds long)
    num_segments = len(audio) // segment_length  
    segments = []
    max_amplitudes = []

    for i in range(num_segments):
        start = i * segment_length
        end = start + segment_length
        segment = audio[start:end]

        # Get max amplitude of the segment
        original_max = np.max(np.abs(segment)) if np.max(np.abs(segment)) > 0 else 1.0  
        max_amplitudes.append(original_max)

        # Normalize the segment
        segment = segment / original_max if original_max > 0 else segment
        
        segments.append(segment)

    return np.array(segments), np.array(max_amplitudes)  # Return normalized segments + original max amplitudes


In [9]:
# Ensure all segments have the same shape
for i in range(5):  # Checking first 5 samples
    assert train_data[i][0].shape == train_data[i][1].shape == train_data[i][2].shape == train_data[i][3].shape == train_data[i][4].shape, f"Mismatch at index {i}"
print("All segments have matching shapes")


All segments have matching shapes


In [None]:
# Ensure dataset length consistency
min_len = min(len(song_paths), len(bass_paths), len(vocal_paths), len(drum_paths), len(music_paths))
print(f"Using {min_len} samples for training.")
song_paths, bass_paths, vocal_paths, drum_paths, music_paths = (
    song_paths[:min_len], bass_paths[:min_len], vocal_paths[:min_len], drum_paths[:min_len], music_paths[:min_len]
)

# Prepare dataset
train_data = []  # Stores only the audio segments for training
max_amplitudes = []  # Stores original max amplitudes for reconstruction

print("Loading dataset...")
for i in range(min_len):
    song_segments, song_max = load_audio(song_paths[i])
    bass_segments, bass_max = load_audio(bass_paths[i])
    vocal_segments, vocal_max = load_audio(vocal_paths[i])
    drum_segments, drum_max = load_audio(drum_paths[i])
    music_segments, music_max = load_audio(music_paths[i])

    for j in range(len(song_segments)):  # Loop over segments
        train_data.append((
            song_segments[j], 
            bass_segments[j], 
            vocal_segments[j], 
            drum_segments[j], 
            music_segments[j]
        ))  

        # Store max amplitudes separately
        max_amplitudes.append((
            song_max[j],  
            bass_max[j],  
            vocal_max[j],  
            drum_max[j],  
            music_max[j]
        ))

print("Dataset loaded successfully!")

In [10]:
# Split dataset into training and testing
train_set, test_set = train_test_split(train_data, test_size=0.2, random_state=42)

# Convert lists to PyTorch tensors
train_dataset = TensorDataset(*[torch.tensor(np.array(d), dtype=torch.float32) for d in zip(*train_set)])
test_dataset = TensorDataset(*[torch.tensor(np.array(d), dtype=torch.float32) for d in zip(*test_set)])

# Prepare DataLoaders
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [12]:
# Fetch one batch from DataLoader and check its shape
for batch in train_data_loader:
    print(f"Batch shape: {[b.shape for b in batch]}")
    break  # Print only first batch


Batch shape: [torch.Size([3, 441000]), torch.Size([3, 441000]), torch.Size([3, 441000]), torch.Size([3, 441000]), torch.Size([3, 441000])]


In [None]:
print(f"Total samples in dataset: {len(train_data)}")  
print(f"Each sample should have 5 elements (song, bass, vocal, drum, music): {len(train_data[0])}")  

# Print the shape of a few segments
print("Example shapes:")
print(f"Song segment shape: {train_data[0][0].shape}")
print(f"Bass segment shape: {train_data[0][1].shape}")
print(f"Vocal segment shape: {train_data[0][2].shape}")
print(f"Drum segment shape: {train_data[0][3].shape}")
print(f"Music segment shape: {train_data[0][4].shape}")


In [13]:
import torch
import torch.nn as nn

# Constants
num_sources = 4  # bass, vocal, drums, music

class DemucsModel(nn.Module):
    def __init__(self):
        super(DemucsModel, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 32, kernel_size=16, stride=4, padding=8),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=16, stride=4, padding=8),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Conv1d(64, 128, kernel_size=16, stride=4, padding=8),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Conv1d(128, 256, kernel_size=16, stride=8, padding=8),
            nn.BatchNorm1d(256),
            nn.ReLU(),
        )

        # Bidirectional LSTM for temporal modeling
        self.rnn = nn.LSTM(256, 256, batch_first=True, bidirectional=True)
        self.lstm_fc = nn.Linear(512, 256)  # Merge bidirectional outputs

        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(256, 128, kernel_size=16, stride=8, padding=8, output_padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.ConvTranspose1d(128, 64, kernel_size=16, stride=4, padding=8, output_padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.ConvTranspose1d(64, 32, kernel_size=16, stride=4, padding=8, output_padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.ConvTranspose1d(32, num_sources, kernel_size=16, stride=4, padding=8, output_padding=1),
            nn.Tanh(),  # Keeps output in range (-1, 1)
        )

    def forward(self, x):
        x = self.encoder(x)  
        x = x.permute(0, 2, 1)  # Change to (batch, time, channels) for LSTM
        x, _ = self.rnn(x)  
        x = self.lstm_fc(x)  
        x = self.decoder(x.permute(0, 2, 1))  # Change back to (batch, channels, time)
        return x

# Initialize Model
model = DemucsModel()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [14]:
import torch.nn.functional as F

def si_snr_loss(pred, target, eps=1e-8):
    target_energy = torch.sum(target**2, dim=-1, keepdim=True) + eps
    scale = torch.sum(target * pred, dim=-1, keepdim=True) / target_energy
    target_proj = scale * target
    noise = pred - target_proj

    si_snr = torch.sum(target_proj**2, dim=-1) / (torch.sum(noise**2, dim=-1) + eps)
    si_snr = 10 * torch.log10(si_snr + eps)
    
    return -si_snr.mean()

criterion = si_snr_loss


In [15]:
num_epochs=50

In [16]:
import torch.nn.functional as F

target_length = 441000  # Ensure all outputs are 441000 samples

print("Starting training...")

# Training loop
for epoch in range(num_epochs):
    epoch_loss = 0
    for batch_idx, batch in enumerate(train_data_loader):
        inputs, target_bass, target_vocal, target_drum, target_music = batch
        inputs = inputs.unsqueeze(1)  # Add channel dimension for Conv1d

        # Pad inputs to match target length
        inputs = F.pad(inputs, (0, target_length - inputs.shape[-1]))  
        target_bass = F.pad(target_bass, (0, target_length - target_bass.shape[-1]))
        target_vocal = F.pad(target_vocal, (0, target_length - target_vocal.shape[-1]))
        target_drum = F.pad(target_drum, (0, target_length - target_drum.shape[-1]))
        target_music = F.pad(target_music, (0, target_length - target_music.shape[-1]))

        # Forward pass
        outputs = model(inputs)  # Model output: [batch_size, 4, ?] (unknown length)

        # If model output is smaller, pad it to match target length
        if outputs.shape[-1] < target_length:
            pad_size = target_length - outputs.shape[-1]
            outputs = F.pad(outputs, (0, pad_size))

        # Stack targets for simpler loss computation
        targets = torch.stack([target_bass, target_vocal, target_drum, target_music], dim=1)

        # Compute loss
        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        # Print batch info
        #print(f"Batch {batch_idx} - Predicted Shape: {outputs.shape}, Target Shape: {targets.shape}")

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss/len(train_data_loader):.4f}")  


Starting training...
Epoch 1/50, Loss: 41.4152
Epoch 2/50, Loss: 22.2007
Epoch 3/50, Loss: 17.3469
Epoch 4/50, Loss: 14.9297
Epoch 5/50, Loss: 15.1850
Epoch 6/50, Loss: 14.1816
Epoch 7/50, Loss: 12.8669
Epoch 8/50, Loss: 11.8511
Epoch 9/50, Loss: 10.8245
Epoch 10/50, Loss: 10.4349
Epoch 11/50, Loss: 9.6924
Epoch 12/50, Loss: 10.0983
Epoch 13/50, Loss: 9.5579
Epoch 14/50, Loss: 8.9316
Epoch 15/50, Loss: 8.6701
Epoch 16/50, Loss: 8.1620
Epoch 17/50, Loss: 8.2290
Epoch 18/50, Loss: 8.0776
Epoch 19/50, Loss: 7.5438
Epoch 20/50, Loss: 7.3371
Epoch 21/50, Loss: 7.0777
Epoch 22/50, Loss: 7.0217
Epoch 23/50, Loss: 6.6650
Epoch 24/50, Loss: 6.6382
Epoch 25/50, Loss: 6.3798
Epoch 26/50, Loss: 6.0281
Epoch 27/50, Loss: 6.2811
Epoch 28/50, Loss: 6.0460
Epoch 29/50, Loss: 5.7686
Epoch 30/50, Loss: 5.6906
Epoch 31/50, Loss: 5.8909
Epoch 32/50, Loss: 5.8145
Epoch 33/50, Loss: 5.4752
Epoch 34/50, Loss: 5.4045
Epoch 35/50, Loss: 4.9797
Epoch 36/50, Loss: 4.7261
Epoch 37/50, Loss: 4.4821
Epoch 38/50, Lo

In [17]:
print(f"Predicted shape: {outputs.shape}")
print(f"Target shape: {target_bass.shape}")  # Target shape should be same as predictions


Predicted shape: torch.Size([2, 4, 441000])
Target shape: torch.Size([2, 441000])


In [18]:
# Save the trained model
torch.save(model.state_dict(), "model_save.pth")
print("Model saved successfully!")


Model saved successfully!


In [19]:
import numpy as np

def remove_low_amplitude_noise(audio, threshold_ratio=0.07):
    max_amplitude = np.max(np.abs(audio))
    threshold = max_amplitude * threshold_ratio
    audio_denoised = np.where(np.abs(audio) > threshold, audio, 0)
    return audio_denoised


In [20]:
import os
import torch
import soundfile as sf

print("Starting validation...")
model.eval()  # Set model to evaluation mode
val_loss = 0
output_dir = "C:/Users/Mora siri/Desktop/output_audio_files_seconds/"  # Output directory
os.makedirs(output_dir, exist_ok=True)

with torch.no_grad():  # No need to compute gradients during validation
    for batch_idx, batch in enumerate(test_data_loader):
        inputs, target_bass, target_vocal, target_drum, target_music = batch
        inputs = inputs.unsqueeze(1)  # Add channel dimension
        
        output = model(inputs)  # Forward pass

        # Ensure output is exactly 441000 samples
        target_length = 441000
        if output.shape[-1] < target_length:
            pad_size = target_length - output.shape[-1]
            output = F.pad(output, (0, pad_size))

        # Save outputs as audio files
        current_batch_size = inputs.shape[0]  # Get actual batch size
        for i in range(current_batch_size):
            output_bass = remove_low_amplitude_noise(output[i, 0, :].cpu().numpy())
            output_vocal = remove_low_amplitude_noise(output[i, 1, :].cpu().numpy())
            output_drum = remove_low_amplitude_noise(output[i, 2, :].cpu().numpy())
            output_music = remove_low_amplitude_noise(output[i, 3, :].cpu().numpy())
            
            sf.write(os.path.join(output_dir, f"test_output_bass_{batch_idx*current_batch_size + i}.wav"), output_bass, samplerate=44100)
            sf.write(os.path.join(output_dir, f"test_output_vocal_{batch_idx*current_batch_size + i}.wav"), output_vocal, samplerate=44100)
            sf.write(os.path.join(output_dir, f"test_output_drum_{batch_idx*current_batch_size + i}.wav"), output_drum, samplerate=44100)
            sf.write(os.path.join(output_dir, f"test_output_music_{batch_idx*current_batch_size + i}.wav"), output_music, samplerate=44100)

        # Compute batch loss and accumulate
        batch_loss = (criterion(output[:, 0, :], target_bass) +
                      criterion(output[:, 1, :], target_vocal) +
                      criterion(output[:, 2, :], target_drum) +
                      criterion(output[:, 3, :], target_music)).item()

        val_loss += batch_loss / len(test_data_loader)  # Average over dataset

print(f"Validation Loss: {val_loss:.4f}")


Starting validation...
Validation Loss: 36.3526


In [21]:
import librosa
import soundfile as sf
import numpy as np
from scipy.signal import butter, lfilter

# Function for Butterworth Bandpass Filter
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return b, a

# Function to apply bandpass filter
def bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order)
    y = lfilter(b, a, data)
    return y

# Function for simple noise reduction
def noise_reduction(audio, sr):
    # Simple noise reduction: split based on silence and remove the noise segments
    noise_estimation = librosa.effects.split(audio, top_db=30)  # Adjust top_db as needed
    clean_audio = np.concatenate([audio[start:end] for start, end in noise_estimation], axis=0)
    return clean_audio

# Load audio using librosa
audio_path = r"C:\Users\Mora siri\Desktop\output_audio_files_seconds\test_output_vocal_1.wav"
audio, sr = librosa.load(audio_path, sr=44100)  # Use 44100 Hz sample rate

# Step 1: Apply Noise Reduction
clean_audio = noise_reduction(audio, sr)

# Step 2: Apply Bandpass Filter (Isolate vocal frequencies between 500Hz and 3000Hz)
filtered_audio = bandpass_filter(clean_audio, 500, 3000, sr)

# Step 3: Save the processed audio (with both noise reduction and bandpass filter applied)
output_path = r"C:\Users\Mora siri\Desktop\output_audio_files_seconds\processed_vocal_1.wav"
sf.write(output_path, filtered_audio, sr)

print("Noise reduction and equalization (bandpass filter) applied, and audio saved.")


Noise reduction and equalization (bandpass filter) applied, and audio saved.
