<a href="https://colab.research.google.com/github/LukaszSzarecki/music-source-separation/blob/develop/ml_algorithms_pl_small_size.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**[PL]** W ramach tego notatnika stworzone zostały architektury modeli uczenia głębokiego, służące do separacji dźwiękowej pojedynczego źródła według strategii jeden przeciw wszystkim (one-vs-all).
Każdy z modeli był trenowany, a następnie poddany ocenie.

In [None]:
!pip install nussl
!pip install scaper
!pip install git+https://github.com/source-separation/tutorial

import nussl
import matplotlib.pyplot as plt
import numpy as np

# Pobranie datasetu

In [None]:
from common import data, viz
from nussl.datasets import transforms as nussl_tfm

# Prepare the MUSDB data under ~/.nussl/tutorial/ using the tutorial helper
# (the train/valid/test sub-folders of this directory are read further below).
data.prepare_musdb('~/.nussl/tutorial/')

In [None]:
import os

# File name of the MUSDB18 test track used as a listening example.
song_name = "The Long Wait - Dark Horses.stem.mp4"

# Resolve the dataset folder from the current user's home directory instead of
# hard-coding /root, so the cell also works outside Colab's root account
# (on Colab this yields the same /root/.nussl/musdb18/test path as before).
musdb_test_dir = os.path.join(os.path.expanduser("~"), ".nussl", "musdb18", "test")
signal = nussl.AudioSignal(os.path.join(musdb_test_dir, song_name))
_ = signal.embed_audio()



In [None]:
# Report the duration and the sample count of the loaded example track.
print(f"Czas trwania utworu: {signal.signal_duration} [s]")
print(f"Liczba próbek w sygnale: {signal.signal_length} ")


# Pobranie modeli zapisanych w repozytorium

In [None]:
from common import utils
from common.models import MaskInference

In [None]:
!git clone -b models https://github.com/LukaszSzarecki/music-source-separation.git


Modele na repozytorium:

In [None]:
import os
from pprint import pprint

# Root folder of the checkpoints cloned from the `models` branch of the repository.
model_dir = '/content/music-source-separation/models/'

# File names found in the two checkpoint sub-folders.
new_models_names = os.listdir(model_dir + 'new/')
old_models_names = os.listdir(model_dir + 'old/')

pprint("New trained models")
for m in new_models_names:
  pprint(m)

pprint("Old trained models")
for m in old_models_names:
  pprint(m)

# Checkpoint paths relative to model_dir; later cells append them to model_dir.
# (A stray list comprehension whose result was discarded has been removed.)
old_models_paths = ["old/" + model_name for model_name in old_models_names]
new_models_paths = ["new/" + model_name for model_name in new_models_names]


# Separacja perkusji

In [None]:
# STFT configuration shared by the training, validation and test mixtures.
stft_params = nussl.STFTParams(
    window_length=1024,
    hop_length=512,
    window_type='sqrt_hann',
)

# Augmentation settings forwarded to data.on_the_fly as pitch_shift / time_stretch.
ps = ('uniform', -3, 3)
ts = ('uniform', 0.6, 1.4)

# Training targets: magnitude spectrograms with a single source kept.
# Index 1 is presumably the drums stem (sources ordered bass, drums, other,
# vocals) -- TODO confirm against the dataset.
tfm = nussl_tfm.Compose([
    nussl_tfm.MagnitudeSpectrumApproximation(),
    nussl_tfm.IndexSources('source_magnitudes', 1),
    nussl_tfm.ToSeparationModel(),
])

# Training data
fg_path = "~/.nussl/tutorial/train"
train_data_1 = data.on_the_fly(
    stft_params,
    transform=tfm,
    pitch_shift=ps,
    time_stretch=ts,
    fg_path=fg_path,
    num_mixtures=10000,
    coherent_prob=0.7,
)

# Validation data
fg_path = "~/.nussl/tutorial/valid"
val_data_1 = data.on_the_fly(
    stft_params,
    transform=tfm,
    pitch_shift=ps,
    time_stretch=ts,
    fg_path=fg_path,
    num_mixtures=100,
)

# Test data: no transform and no augmentation, coherent mixtures only.
fg_path = "~/.nussl/tutorial/test"
test_data_1 = data.on_the_fly(
    stft_params,
    transform=None,
    fg_path=fg_path,
    num_mixtures=100,
    coherent_prob=1.0,
)

In [None]:
# NOTE(review): test_data_1 is created with transform=None; confirm that its
# items really carry the spectrogram keys queried below.
print(test_data_1[0].keys())

# Print the tensor shape stored under each key, using the original labels.
for label, key in (
    ("mix_magnitude", "mix_magnitude"),
    ("source_magnitudes", "source_magnitudes"),
    ("ideal binary mask", "ideal_binary_mask"),
):
    print(f"Tensor shape of {label} {test_data_1[0][key].shape}")

In [None]:
# Preview the shape obtained by appending a trailing axis to the mixture
# magnitude of the first training item (the model's forward pass applies the
# same unsqueeze before multiplying with the mask).
mix_magnitude = train_data_1[0]['mix_magnitude']
estimates = mix_magnitude.unsqueeze(-1)
print(estimates.shape)

## Model

In [None]:
from nussl.ml.networks.modules import AmplitudeToDB, BatchNorm, RecurrentStack, Embedding
from torch import nn
import torch

class MaskInference(nn.Module):
    """Mask-inference network assembled from nussl building blocks.

    The input is passed through ``AmplitudeToDB``, ``BatchNorm``, a
    ``RecurrentStack`` and an ``Embedding`` layer that produces a mask; the
    mask is multiplied with the unmodified input to form the estimates.

    Args:
        num_features: Feature count given to ``BatchNorm`` and ``Embedding``.
        num_audio_channels: Channel count; ``num_features * num_audio_channels``
            is the input size of the recurrent stack.
        hidden_size: Hidden size of the recurrent stack.
        num_layers: Number of layers of the recurrent stack.
        bidirectional: Truthy for a bidirectional stack, which doubles the
            size handed on to the embedding layer.
        dropout: Dropout value handed to the recurrent stack.
        num_sources: Number of sources handed to the embedding layer.
        activation: Activation name handed to the embedding layer.
    """

    def __init__(self, num_features, num_audio_channels, hidden_size,
                 num_layers, bidirectional, dropout, num_sources,
                 activation='sigmoid'):
        super().__init__()

        self.amplitude_to_db = AmplitudeToDB()
        self.input_normalization = BatchNorm(num_features)
        self.recurrent_stack = RecurrentStack(
            num_features * num_audio_channels, hidden_size,
            num_layers, bool(bidirectional), dropout
        )
        # A bidirectional stack emits twice as many features per step.
        hidden_size = hidden_size * (int(bidirectional) + 1)
        self.embedding = Embedding(num_features, hidden_size,
                                   num_sources, activation,
                                   num_audio_channels)

    def forward(self, data):
        """Estimate a mask for ``data`` and apply it.

        Args:
            data: Mixture magnitude tensor (layout presumably
                batch x time x frequency x channels -- TODO confirm).

        Returns:
            dict with ``'mask'`` (output of the embedding layer) and
            ``'estimates'`` (the input with a trailing axis added, multiplied
            by the mask).
        """
        mix_magnitude = data  # keep the raw input for masking

        data = self.amplitude_to_db(mix_magnitude)
        data = self.input_normalization(data)
        data = self.recurrent_stack(data)
        mask = self.embedding(data)
        # The trailing axis lines the mixture up with the mask's last axis.
        estimates = mix_magnitude.unsqueeze(-1) * mask

        return {
            'mask': mask,
            'estimates': estimates
        }

    @classmethod
    def build(cls, num_features, num_audio_channels, hidden_size,
              num_layers, bidirectional, dropout, num_sources,
              activation='sigmoid'):
        """Register ``cls`` with nussl and wrap it in a ``SeparationModel``.

        The arguments are the constructor arguments of this class and are
        stored verbatim in the model configuration.

        Returns:
            The object returned by ``nussl.ml.SeparationModel(config)``.
        """
        # Step 1. Register our model with nussl.
        nussl.ml.register_module(cls)

        # Step 2a. Define the building blocks. The class is referenced through
        # cls.__name__ (not a hard-coded string) so that a subclass calling
        # build() instantiates itself rather than the parent class.
        modules = {
            'model': {
                'class': cls.__name__,
                'args': {
                    'num_features': num_features,
                    'num_audio_channels': num_audio_channels,
                    'hidden_size': hidden_size,
                    'num_layers': num_layers,
                    'bidirectional': bidirectional,
                    'dropout': dropout,
                    'num_sources': num_sources,
                    'activation': activation
                }
            }
        }

        # Step 2b. Define the connections between input and output.
        # Here, the mix_magnitude key is the only input to the model.
        connections = [
            ['model', ['mix_magnitude']]
        ]

        # Step 2c. Alias the prefixed outputs model:mask / model:estimates to
        # the plain keys mask / estimates.
        for key in ['mask', 'estimates']:
            modules[key] = {'class': 'Alias'}
            connections.append([key, f'model:{key}'])

        # Step 2d. Declare the two outputs and put the configuration together.
        output = ['estimates', 'mask',]
        config = {
            'name': cls.__name__,
            'modules': modules,
            'connections': connections,
            'output': output
        }
        # Step 3. Instantiate the model as a SeparationModel.
        return nussl.ml.SeparationModel(config)




## Trenowanie modelu

In [None]:
from common import utils
from common.models import MaskInference
from ignite.engine import Engine
from ignite.contrib.handlers import ProgressBar
from ignite.engine import create_supervised_evaluator



utils.logger()

# Model input size: window_length // 2 + 1 features and one audio channel.
nf = stft_params.window_length // 2 + 1
nac = 1
# Remaining positional arguments are presumably hidden_size, num_layers,
# bidirectional, dropout, num_sources and activation, as in MaskInference.build.
model = MaskInference.build(nf, nac, 300, 4, True, 0.25, 1, 'sigmoid')
# Move the model to the GPU once, before the optimizer is created, instead of
# calling model.cuda() inside every training step.
model.cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nussl.ml.train.loss.L1Loss()


train_dataloader = torch.utils.data.DataLoader(
    train_data_1, num_workers=1, batch_size=10)
val_dataloader = torch.utils.data.DataLoader(
    val_data_1, num_workers=1, batch_size=10)
from pathlib import Path


def train_step(engine, batch):
    """Run one optimisation step on ``batch``.

    Args:
        engine: Engine invoking the step (not used here).
        batch: Mapping passed whole to the model; must contain
            ``'source_magnitudes'`` as the training target.

    Returns:
        dict with the scalar loss under ``'L1Loss'`` and ``'loss'``.
    """
    # Validation switches the model to eval mode, so re-enable training
    # behaviour (dropout) explicitly on every step.
    model.train()
    optimizer.zero_grad()
    output = model(batch)  # forward pass

    loss = loss_fn(
        output['estimates'],
        batch['source_magnitudes']
    )

    loss.backward()  # backwards + gradient step
    optimizer.step()

    # Read the scalar once; both keys report the same value.
    loss_value = loss.item()
    return {
        'L1Loss': loss_value,
        'loss': loss_value
    }


def val_step(engine, batch):
    """Compute the loss on a validation ``batch`` without updating the model.

    Args:
        engine: Engine invoking the step (not used here).
        batch: Mapping passed whole to the model; must contain
            ``'source_magnitudes'``.

    Returns:
        dict with the scalar loss under ``'L1Loss'`` and ``'loss'``.
    """
    # Disable dropout so the validation loss is deterministic for a given model.
    model.eval()
    with torch.no_grad():
        output = model(batch)  # forward pass
    loss = loss_fn(
        output['estimates'],
        batch['source_magnitudes']
    )
    loss_value = loss.item()
    return {
        'L1Loss': loss_value,
        'loss': loss_value
    }


# Create the engines
trainer, validator = nussl.ml.train.create_train_and_validation_engines(
    train_step, val_step, device="cuda"
)

# We'll save the output relative to this notebook.
output_folder = Path('.').absolute()

# Adding handlers from nussl that print out details about model training,
# run the validation step, and save the models.
nussl.ml.train.add_stdout_handler(trainer, validator)
nussl.ml.train.add_validate_and_checkpoint(output_folder, model,
    optimizer, train_data_1, trainer, val_dataloader, validator)

nussl.ml.train.add_progress_bar_handler(trainer, validator)

# 30 epochs of 100 iterations each.
trainer.run(
    train_dataloader,
    epoch_length=100,
    max_epochs=30
)


### Wczytanie modelu

Ustawienie modelu po trenowaniu

In [None]:
# Build the drum separator from the relative path 'checkpoints/best.model.pth'
# (presumably the best checkpoint saved by the training cell above). The empty
# AudioSignal is only a placeholder; a mixture is assigned before each run.
separator_1 = nussl.separation.deep.DeepMaskEstimation(
    nussl.AudioSignal(), model_path='checkpoints/best.model.pth',
    device="cuda",
)

Pobranie modelu z repozytorium

In [None]:
# Pick the first repository checkpoint whose path mentions "percussion" and
# fail with a readable message when none is available.
matching_models = [match for match in new_models_paths if "percussion" in match]
if not matching_models:
    raise FileNotFoundError(
        f"No checkpoint containing 'percussion' found in {model_dir}new/"
    )
selected_model = matching_models[0]

print(selected_model)

# This replaces any separator_1 created from the local checkpoint above.
separator_1 = nussl.separation.deep.DeepMaskEstimation(
    nussl.AudioSignal(), model_path=model_dir+selected_model,
    device="cuda",
)

In [None]:
from common import viz
# NOTE(review): stft_params is re-created with the same values used when the
# datasets were built; it is not referenced again inside this cell.
stft_params = nussl.STFTParams(window_length=1024, hop_length=512, window_type='sqrt_hann') 

# Take one mixture from the test set and hand it to the drum separator.
item = test_data_1[4]
separator_1.audio_signal = item['mix']

# Run the separator on the assigned mixture.
estimates = separator_1()

viz.show_sources(estimates)

In [None]:
viz.show_sources(item['sources'])

## Ocena działania modelu

In [None]:
import json
from pathlib import Path

# Merge bass, vocals and other into a single source so that each test item
# holds the drums plus one combined accompaniment source.
accompaniment_group = ['bass', 'vocals', 'other']
tfm = nussl_tfm.Compose([nussl_tfm.SumSources([accompaniment_group])])
test_evaluation_dataset_1 = nussl.datasets.MUSDB18(
    subsets=['test'],
    transform=tfm,
)

In [None]:
test_evaluation_dataset_1[0].keys()

In [None]:
len(test_evaluation_dataset_1)

In [None]:
import json
from pathlib import Path

# Folder that receives one JSON score file per evaluated track; it is resolved
# and created once up front instead of on every loop iteration.
output_folder = Path('.').absolute()
output_folder.mkdir(exist_ok=True)

# Walk over the whole test subset rather than a hard-coded count of 50 tracks.
for i in range(len(test_evaluation_dataset_1)):
    item = test_evaluation_dataset_1[i]
    separator_1.audio_signal = item['mix']
    separated = separator_1()

    # The first separator output is labelled as drums; the accompaniment
    # estimate is the mixture with that output subtracted.
    source_keys = list(item['sources'].keys())
    estimates_by_name = {
        'drums': separated[0],
        'bass+vocals+other': item['mix'] - separated[0]
    }

    # Order references and estimates identically, following the dataset keys.
    sources = [item['sources'][k] for k in source_keys]
    estimates = [estimates_by_name[k] for k in source_keys]

    evaluator = nussl.evaluation.BSSEvalScale(
        sources, estimates, source_labels=source_keys
    )
    scores = evaluator.evaluate()

    # Swap only the file extension for '.json'; str.replace('wav', 'json')
    # would also rewrite "wav" occurring inside the track name itself.
    output_file = output_folder / Path(sources[0].file_name).with_suffix('.json').name
    with open(output_file, 'w') as f:
        json.dump(scores, f, indent=4)

In [None]:
import glob
import numpy as np



# Collect the per-track score files whose names end in "drums.json" and
# aggregate them with the NaN-aware median.
json_files = glob.glob("*drums.json")
df1 = nussl.evaluation.aggregate_score_files(json_files, aggregator=np.nanmedian)
nussl.evaluation.associate_metrics(
    separator_1.model,
    df1,
    test_evaluation_dataset_1,
)

# Text summary, reported per source.
report_card_1 = nussl.evaluation.report_card(df1, report_each_source=True)
print(report_card_1)

# Store the aggregated table as CSV, creating the results folder if needed.
filepath_1 = Path('/content/sample_data/results/drums.csv')
filepath_1.parent.mkdir(parents=True, exist_ok=True)
df1.to_csv(filepath_1)

# Separacja wokalu

In [None]:
# STFT configuration shared by the training, validation and test mixtures.
stft_params = nussl.STFTParams(
    window_length=1024,
    hop_length=512,
    window_type='sqrt_hann',
)

# Augmentation settings forwarded to data.on_the_fly as pitch_shift / time_stretch.
ps = ('uniform', -3, 3)
ts = ('uniform', 0.6, 1.4)

# Training targets: magnitude spectrograms with a single source kept.
# Index 3 is presumably the vocals stem (sources ordered bass, drums, other,
# vocals) -- TODO confirm against the dataset.
tfm = nussl_tfm.Compose([
    nussl_tfm.MagnitudeSpectrumApproximation(),
    nussl_tfm.IndexSources('source_magnitudes', 3),
    nussl_tfm.ToSeparationModel(),
])

# Training data
fg_path = "~/.nussl/tutorial/train"
train_data_2 = data.on_the_fly(
    stft_params,
    transform=tfm,
    pitch_shift=ps,
    time_stretch=ts,
    fg_path=fg_path,
    num_mixtures=10000,
    coherent_prob=0.6,
)

# Validation data
fg_path = "~/.nussl/tutorial/valid"
val_data_2 = data.on_the_fly(
    stft_params,
    transform=tfm,
    pitch_shift=ps,
    time_stretch=ts,
    fg_path=fg_path,
    num_mixtures=200,
)

# Test data: no transform and no augmentation, coherent mixtures only.
fg_path = "~/.nussl/tutorial/test"
test_data_2 = data.on_the_fly(
    stft_params,
    transform=None,
    fg_path=fg_path,
    num_mixtures=100,
    coherent_prob=1.0,
)

In [None]:
# NOTE(review): test_data_2 is created with transform=None; confirm that its
# items really carry the spectrogram keys queried below.
print(test_data_2[0].keys())

# Print the tensor shape stored under each key, using the original labels.
for label, key in (
    ("mix_magnitude", "mix_magnitude"),
    ("source_magnitudes", "source_magnitudes"),
    ("ideal binary mask", "ideal_binary_mask"),
):
    print(f"Tensor shape of {label} {test_data_2[0][key].shape}")

In [None]:
# Preview the shape obtained by appending a trailing axis to the mixture
# magnitude of the first training item (the model's forward pass applies the
# same unsqueeze before multiplying with the mask).
mix_magnitude = train_data_2[0]['mix_magnitude']
estimates = mix_magnitude.unsqueeze(-1)
print(estimates.shape)

## Model

In [None]:
from nussl.ml.networks.modules import AmplitudeToDB, BatchNorm, RecurrentStack, Embedding
from torch import nn
import torch

class MaskInference(nn.Module):
    """Mask-inference network assembled from nussl building blocks.

    The input is passed through ``AmplitudeToDB``, ``BatchNorm``, a
    ``RecurrentStack`` and an ``Embedding`` layer that produces a mask; the
    mask is multiplied with the unmodified input to form the estimates.

    Args:
        num_features: Feature count given to ``BatchNorm`` and ``Embedding``.
        num_audio_channels: Channel count; ``num_features * num_audio_channels``
            is the input size of the recurrent stack.
        hidden_size: Hidden size of the recurrent stack.
        num_layers: Number of layers of the recurrent stack.
        bidirectional: Truthy for a bidirectional stack, which doubles the
            size handed on to the embedding layer.
        dropout: Dropout value handed to the recurrent stack.
        num_sources: Number of sources handed to the embedding layer.
        activation: Activation name handed to the embedding layer.
    """

    def __init__(self, num_features, num_audio_channels, hidden_size,
                 num_layers, bidirectional, dropout, num_sources,
                 activation='sigmoid'):
        super().__init__()

        self.amplitude_to_db = AmplitudeToDB()
        self.input_normalization = BatchNorm(num_features)
        self.recurrent_stack = RecurrentStack(
            num_features * num_audio_channels, hidden_size,
            num_layers, bool(bidirectional), dropout
        )
        # A bidirectional stack emits twice as many features per step.
        hidden_size = hidden_size * (int(bidirectional) + 1)
        self.embedding = Embedding(num_features, hidden_size,
                                   num_sources, activation,
                                   num_audio_channels)

    def forward(self, data):
        """Estimate a mask for ``data`` and apply it.

        Args:
            data: Mixture magnitude tensor (layout presumably
                batch x time x frequency x channels -- TODO confirm).

        Returns:
            dict with ``'mask'`` (output of the embedding layer) and
            ``'estimates'`` (the input with a trailing axis added, multiplied
            by the mask).
        """
        mix_magnitude = data  # keep the raw input for masking

        data = self.amplitude_to_db(mix_magnitude)
        data = self.input_normalization(data)
        data = self.recurrent_stack(data)
        mask = self.embedding(data)
        # The trailing axis lines the mixture up with the mask's last axis.
        estimates = mix_magnitude.unsqueeze(-1) * mask

        return {
            'mask': mask,
            'estimates': estimates
        }

    @classmethod
    def build(cls, num_features, num_audio_channels, hidden_size,
              num_layers, bidirectional, dropout, num_sources,
              activation='sigmoid'):
        """Register ``cls`` with nussl and wrap it in a ``SeparationModel``.

        The arguments are the constructor arguments of this class and are
        stored verbatim in the model configuration.

        Returns:
            The object returned by ``nussl.ml.SeparationModel(config)``.
        """
        # Step 1. Register our model with nussl.
        nussl.ml.register_module(cls)

        # Step 2a. Define the building blocks. The class is referenced through
        # cls.__name__ (not a hard-coded string) so that a subclass calling
        # build() instantiates itself rather than the parent class.
        modules = {
            'model': {
                'class': cls.__name__,
                'args': {
                    'num_features': num_features,
                    'num_audio_channels': num_audio_channels,
                    'hidden_size': hidden_size,
                    'num_layers': num_layers,
                    'bidirectional': bidirectional,
                    'dropout': dropout,
                    'num_sources': num_sources,
                    'activation': activation
                }
            }
        }

        # Step 2b. Define the connections between input and output.
        # Here, the mix_magnitude key is the only input to the model.
        connections = [
            ['model', ['mix_magnitude']]
        ]

        # Step 2c. Alias the prefixed outputs model:mask / model:estimates to
        # the plain keys mask / estimates.
        for key in ['mask', 'estimates']:
            modules[key] = {'class': 'Alias'}
            connections.append([key, f'model:{key}'])

        # Step 2d. Declare the two outputs and put the configuration together.
        output = ['estimates', 'mask',]
        config = {
            'name': cls.__name__,
            'modules': modules,
            'connections': connections,
            'output': output
        }
        # Step 3. Instantiate the model as a SeparationModel.
        return nussl.ml.SeparationModel(config)




## Trenowanie modelu

In [None]:
from common import utils
from common.models import MaskInference
from ignite.engine import Engine
from ignite.contrib.handlers import ProgressBar
from ignite.engine import create_supervised_evaluator



utils.logger()

# Model input size: window_length // 2 + 1 features and one audio channel.
nf = stft_params.window_length // 2 + 1
nac = 1
# Remaining positional arguments are presumably hidden_size, num_layers,
# bidirectional, dropout, num_sources and activation, as in MaskInference.build.
# A larger test configuration tried earlier: 400, 5, True, 0.35, 1, 'sigmoid'.
model = MaskInference.build(nf, nac, 300, 4, True, 0.25, 1, 'sigmoid')
# Move the model to the GPU once, before the optimizer is created, instead of
# calling model.cuda() inside every training step.
model.cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nussl.ml.train.loss.L1Loss()


train_dataloader = torch.utils.data.DataLoader(
    train_data_2, num_workers=1, batch_size=10)
val_dataloader = torch.utils.data.DataLoader(
    val_data_2, num_workers=1, batch_size=10)
from pathlib import Path


def train_step(engine, batch):
    """Run one optimisation step on ``batch``.

    Args:
        engine: Engine invoking the step (not used here).
        batch: Mapping passed whole to the model; must contain
            ``'source_magnitudes'`` as the training target.

    Returns:
        dict with the scalar loss under ``'L1Loss'`` and ``'loss'``.
    """
    # Validation switches the model to eval mode, so re-enable training
    # behaviour (dropout) explicitly on every step.
    model.train()
    optimizer.zero_grad()
    output = model(batch)  # forward pass
    loss = loss_fn(
        output['estimates'],
        batch['source_magnitudes']
    )

    loss.backward()  # backwards + gradient step
    optimizer.step()

    # Read the scalar once; both keys report the same value.
    loss_value = loss.item()
    return {
        'L1Loss': loss_value,
        'loss': loss_value
    }


def val_step(engine, batch):
    """Compute the loss on a validation ``batch`` without updating the model.

    Args:
        engine: Engine invoking the step (not used here).
        batch: Mapping passed whole to the model; must contain
            ``'source_magnitudes'``.

    Returns:
        dict with the scalar loss under ``'L1Loss'`` and ``'loss'``.
    """
    # Disable dropout so the validation loss is deterministic for a given model.
    model.eval()
    with torch.no_grad():
        output = model(batch)  # forward pass
    loss = loss_fn(
        output['estimates'],
        batch['source_magnitudes']
    )
    loss_value = loss.item()
    return {
        'L1Loss': loss_value,
        'loss': loss_value
    }


# Create the engines
trainer, validator = nussl.ml.train.create_train_and_validation_engines(
    train_step, val_step, device="cuda"
)

# We'll save the output relative to this notebook.
output_folder = Path('.').absolute()

# Adding handlers from nussl that print out details about model training,
# run the validation step, and save the models.
nussl.ml.train.add_stdout_handler(trainer, validator)
nussl.ml.train.add_validate_and_checkpoint(output_folder, model,
    optimizer, train_data_2, trainer, val_dataloader, validator)

nussl.ml.train.add_progress_bar_handler(trainer, validator)

# 30 epochs of 100 iterations each.
trainer.run(
    train_dataloader,
    epoch_length=100,
    max_epochs=30
)


### Wczytanie modelu

Ustawienie modelu po trenowaniu

In [None]:
# Build the vocal separator from the relative path 'checkpoints/best.model.pth'
# (presumably the best checkpoint saved by the training cell above). The empty
# AudioSignal is only a placeholder; a mixture is assigned before each run.
separator_2 = nussl.separation.deep.DeepMaskEstimation(
    nussl.AudioSignal(), model_path='checkpoints/best.model.pth',
    device="cuda",
)

Pobranie modelu z repozytorium

In [None]:
# Pick the first repository checkpoint whose path mentions "vocal" and fail
# with a readable message when none is available.
matching_models = [match for match in new_models_paths if "vocal" in match]
if not matching_models:
    raise FileNotFoundError(
        f"No checkpoint containing 'vocal' found in {model_dir}new/"
    )
selected_model = matching_models[0]

print(selected_model)

# This replaces any separator_2 created from the local checkpoint above.
separator_2 = nussl.separation.deep.DeepMaskEstimation(
    nussl.AudioSignal(), model_path=model_dir+selected_model,
    device="cuda",
)

new/bestbest.model_vocal_11_11_e100_m40.pth


In [None]:
from common import viz
# NOTE(review): stft_params is re-created with the same values used when the
# datasets were built; it is not referenced again inside this cell.
stft_params = nussl.STFTParams(window_length=1024, hop_length=512, window_type='sqrt_hann') 

# Take one mixture from the test set.
item = test_data_2[30]

# Hand the mixture to the vocal separator and run it.
separator_2.audio_signal = item['mix']
estimates = separator_2()

viz.show_sources(estimates)

In [None]:
viz.show_sources(item['sources'])

## Ocena działania modelu

In [None]:
import json
from pathlib import Path

# Merge drums, bass and other into a single source so that each test item
# holds the vocals plus one combined accompaniment source.
accompaniment_group = ['drums', 'bass', 'other']
tfm = nussl_tfm.Compose([nussl_tfm.SumSources([accompaniment_group])])
test_evaluation_dataset_2 = nussl.datasets.MUSDB18(
    subsets=['test'],
    transform=tfm,
)

In [None]:
import json
from pathlib import Path

# Folder that receives one JSON score file per evaluated track; it is resolved
# and created once up front instead of on every loop iteration.
output_folder = Path('.').absolute()
output_folder.mkdir(exist_ok=True)

# Walk over the whole test subset rather than a hard-coded count of 50 tracks.
for i in range(len(test_evaluation_dataset_2)):
    item = test_evaluation_dataset_2[i]
    separator_2.audio_signal = item['mix']
    separated = separator_2()

    # The first separator output is labelled as vocals; the accompaniment
    # estimate is the mixture with that output subtracted.
    source_keys = list(item['sources'].keys())
    estimates_by_name = {
        'vocals': separated[0],
        'drums+bass+other': item['mix'] - separated[0]
    }

    # Order references and estimates identically, following the dataset keys.
    sources = [item['sources'][k] for k in source_keys]
    estimates = [estimates_by_name[k] for k in source_keys]

    evaluator = nussl.evaluation.BSSEvalScale(
        sources, estimates, source_labels=source_keys
    )
    scores = evaluator.evaluate()

    # Swap only the file extension for '.json'; str.replace('wav', 'json')
    # would also rewrite "wav" occurring inside the track name itself.
    output_file = output_folder / Path(sources[0].file_name).with_suffix('.json').name
    with open(output_file, 'w') as f:
        json.dump(scores, f, indent=4)

In [None]:
import glob
import numpy as np



# Collect the per-track score files whose names end in "vocals.json" and
# aggregate them with the NaN-aware median.
json_files = glob.glob("*vocals.json")
df2 = nussl.evaluation.aggregate_score_files(json_files, aggregator=np.nanmedian)
nussl.evaluation.associate_metrics(
    separator_2.model,
    df2,
    test_evaluation_dataset_2,
)

# Text summary, reported per source.
report_card_2 = nussl.evaluation.report_card(df2, report_each_source=True)
print(report_card_2)

# Store the aggregated table as CSV, creating the results folder if needed.
filepath_2 = Path('/content/sample_data/results/vocals.csv')
filepath_2.parent.mkdir(parents=True, exist_ok=True)
df2.to_csv(filepath_2)

# Separacja basu

In [None]:
# STFT configuration shared by the training, validation and test mixtures.
stft_params = nussl.STFTParams(
    window_length=1024,
    hop_length=512,
    window_type='sqrt_hann',
)

# Augmentation settings forwarded to data.on_the_fly as pitch_shift / time_stretch.
ps = ('uniform', -3, 3)
ts = ('uniform', 0.6, 1.4)

# Training targets: magnitude spectrograms with a single source kept.
# Index 0 is presumably the bass stem (sources ordered bass, drums, other,
# vocals) -- TODO confirm against the dataset.
tfm = nussl_tfm.Compose([
    nussl_tfm.MagnitudeSpectrumApproximation(),
    nussl_tfm.IndexSources('source_magnitudes', 0),
    nussl_tfm.ToSeparationModel(),
])

# Training data
fg_path = "~/.nussl/tutorial/train"
train_data_3 = data.on_the_fly(
    stft_params,
    transform=tfm,
    pitch_shift=ps,
    time_stretch=ts,
    fg_path=fg_path,
    num_mixtures=10000,
    coherent_prob=0.7,
)

# Validation data
fg_path = "~/.nussl/tutorial/valid"
val_data_3 = data.on_the_fly(
    stft_params,
    transform=tfm,
    pitch_shift=ps,
    time_stretch=ts,
    fg_path=fg_path,
    num_mixtures=200,
)

# Test data: no transform and no augmentation, coherent mixtures only.
fg_path = "~/.nussl/tutorial/test"
test_data_3 = data.on_the_fly(
    stft_params,
    transform=None,
    fg_path=fg_path,
    num_mixtures=100,
    coherent_prob=1.0,
)

In [None]:
# NOTE(review): test_data_3 is created with transform=None; confirm that its
# items really carry the spectrogram keys queried below.
print(test_data_3[0].keys())

# Print the tensor shape stored under each key, using the original labels.
for label, key in (
    ("mix_magnitude", "mix_magnitude"),
    ("source_magnitudes", "source_magnitudes"),
    ("ideal binary mask", "ideal_binary_mask"),
):
    print(f"Tensor shape of {label} {test_data_3[0][key].shape}")

In [None]:
# Preview the shape obtained by appending a trailing axis to the mixture
# magnitude of the first training item (the model's forward pass applies the
# same unsqueeze before multiplying with the mask).
mix_magnitude = train_data_3[0]['mix_magnitude']
estimates = mix_magnitude.unsqueeze(-1)
print(estimates.shape)

## Model

In [None]:
from nussl.ml.networks.modules import AmplitudeToDB, BatchNorm, RecurrentStack, Embedding
from torch import nn
import torch

class MaskInference(nn.Module):
    """Mask-inference network assembled from nussl building blocks.

    The input is passed through ``AmplitudeToDB``, ``BatchNorm``, a
    ``RecurrentStack`` and an ``Embedding`` layer that produces a mask; the
    mask is multiplied with the unmodified input to form the estimates.

    Args:
        num_features: Feature count given to ``BatchNorm`` and ``Embedding``.
        num_audio_channels: Channel count; ``num_features * num_audio_channels``
            is the input size of the recurrent stack.
        hidden_size: Hidden size of the recurrent stack.
        num_layers: Number of layers of the recurrent stack.
        bidirectional: Truthy for a bidirectional stack, which doubles the
            size handed on to the embedding layer.
        dropout: Dropout value handed to the recurrent stack.
        num_sources: Number of sources handed to the embedding layer.
        activation: Activation name handed to the embedding layer.
    """

    def __init__(self, num_features, num_audio_channels, hidden_size,
                 num_layers, bidirectional, dropout, num_sources,
                 activation='sigmoid'):
        super().__init__()

        self.amplitude_to_db = AmplitudeToDB()
        self.input_normalization = BatchNorm(num_features)
        self.recurrent_stack = RecurrentStack(
            num_features * num_audio_channels, hidden_size,
            num_layers, bool(bidirectional), dropout
        )
        # A bidirectional stack emits twice as many features per step.
        hidden_size = hidden_size * (int(bidirectional) + 1)
        self.embedding = Embedding(num_features, hidden_size,
                                   num_sources, activation,
                                   num_audio_channels)

    def forward(self, data):
        """Estimate a mask for ``data`` and apply it.

        Args:
            data: Mixture magnitude tensor (layout presumably
                batch x time x frequency x channels -- TODO confirm).

        Returns:
            dict with ``'mask'`` (output of the embedding layer) and
            ``'estimates'`` (the input with a trailing axis added, multiplied
            by the mask).
        """
        mix_magnitude = data  # keep the raw input for masking

        data = self.amplitude_to_db(mix_magnitude)
        data = self.input_normalization(data)
        data = self.recurrent_stack(data)
        mask = self.embedding(data)
        # The trailing axis lines the mixture up with the mask's last axis.
        estimates = mix_magnitude.unsqueeze(-1) * mask

        return {
            'mask': mask,
            'estimates': estimates
        }

    @classmethod
    def build(cls, num_features, num_audio_channels, hidden_size,
              num_layers, bidirectional, dropout, num_sources,
              activation='sigmoid'):
        """Register ``cls`` with nussl and wrap it in a ``SeparationModel``.

        The arguments are the constructor arguments of this class and are
        stored verbatim in the model configuration.

        Returns:
            The object returned by ``nussl.ml.SeparationModel(config)``.
        """
        # Step 1. Register our model with nussl.
        nussl.ml.register_module(cls)

        # Step 2a. Define the building blocks. The class is referenced through
        # cls.__name__ (not a hard-coded string) so that a subclass calling
        # build() instantiates itself rather than the parent class.
        modules = {
            'model': {
                'class': cls.__name__,
                'args': {
                    'num_features': num_features,
                    'num_audio_channels': num_audio_channels,
                    'hidden_size': hidden_size,
                    'num_layers': num_layers,
                    'bidirectional': bidirectional,
                    'dropout': dropout,
                    'num_sources': num_sources,
                    'activation': activation
                }
            }
        }

        # Step 2b. Define the connections between input and output.
        # Here, the mix_magnitude key is the only input to the model.
        connections = [
            ['model', ['mix_magnitude']]
        ]

        # Step 2c. Alias the prefixed outputs model:mask / model:estimates to
        # the plain keys mask / estimates.
        for key in ['mask', 'estimates']:
            modules[key] = {'class': 'Alias'}
            connections.append([key, f'model:{key}'])

        # Step 2d. Declare the two outputs and put the configuration together.
        output = ['estimates', 'mask',]
        config = {
            'name': cls.__name__,
            'modules': modules,
            'connections': connections,
            'output': output
        }
        # Step 3. Instantiate the model as a SeparationModel.
        return nussl.ml.SeparationModel(config)




## Trenowanie modelu

In [None]:
from common import utils
from common.models import MaskInference
from ignite.engine import Engine
from ignite.contrib.handlers import ProgressBar
from ignite.engine import create_supervised_evaluator



utils.logger()

# Model input size: window_length // 2 + 1 features and one audio channel.
nf = stft_params.window_length // 2 + 1
nac = 1
# Remaining positional arguments are presumably hidden_size, num_layers,
# bidirectional, dropout, num_sources and activation, as in MaskInference.build.
# An alternative configuration tried earlier: 512, 3, True, 0.25, 1, 'sigmoid'.
model = MaskInference.build(nf, nac, 300, 4, True, 0.25, 1, 'sigmoid')
# Move the model to the GPU once, before the optimizer is created, instead of
# calling model.cuda() inside every training step.
model.cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nussl.ml.train.loss.L1Loss()


train_dataloader = torch.utils.data.DataLoader(
    train_data_3, num_workers=1, batch_size=10)
val_dataloader = torch.utils.data.DataLoader(
    val_data_3, num_workers=1, batch_size=10)
from pathlib import Path


def train_step(engine, batch):
    """Run one optimisation step on ``batch``.

    Args:
        engine: Engine invoking the step (not used here).
        batch: Mapping passed whole to the model; must contain
            ``'source_magnitudes'`` as the training target.

    Returns:
        dict with the scalar loss under ``'L1Loss'`` and ``'loss'``.
    """
    # Validation switches the model to eval mode, so re-enable training
    # behaviour (dropout) explicitly on every step.
    model.train()
    optimizer.zero_grad()
    output = model(batch)  # forward pass
    loss = loss_fn(
        output['estimates'],
        batch['source_magnitudes']
    )

    loss.backward()  # backwards + gradient step
    optimizer.step()

    # Read the scalar once; both keys report the same value.
    loss_value = loss.item()
    return {
        'L1Loss': loss_value,
        'loss': loss_value
    }


def val_step(engine, batch):
    """Compute the loss on a validation ``batch`` without updating the model.

    Args:
        engine: Engine invoking the step (not used here).
        batch: Mapping passed whole to the model; must contain
            ``'source_magnitudes'``.

    Returns:
        dict with the scalar loss under ``'L1Loss'`` and ``'loss'``.
    """
    # Disable dropout so the validation loss is deterministic for a given model.
    model.eval()
    with torch.no_grad():
        output = model(batch)  # forward pass
    loss = loss_fn(
        output['estimates'],
        batch['source_magnitudes']
    )
    loss_value = loss.item()
    return {
        'L1Loss': loss_value,
        'loss': loss_value
    }


# Create the engines
trainer, validator = nussl.ml.train.create_train_and_validation_engines(
    train_step, val_step, device="cuda"
)

# We'll save the output relative to this notebook.
output_folder = Path('.').absolute()

# Adding handlers from nussl that print out details about model training,
# run the validation step, and save the models.
nussl.ml.train.add_stdout_handler(trainer, validator)
nussl.ml.train.add_validate_and_checkpoint(output_folder, model,
    optimizer, train_data_3, trainer, val_dataloader, validator)

nussl.ml.train.add_progress_bar_handler(trainer, validator)

# 30 epochs of 100 iterations each.
trainer.run(
    train_dataloader,
    epoch_length=100,
    max_epochs=30
)


### Wczytanie modelu

Ustawienie modelu po trenowaniu

In [None]:
# Build the bass separator from the relative path 'checkpoints/best.model.pth'
# (presumably the best checkpoint saved by the training cell above). The empty
# AudioSignal is only a placeholder; a mixture is assigned before each run.
separator_3 = nussl.separation.deep.DeepMaskEstimation(
    nussl.AudioSignal(), model_path='checkpoints/best.model.pth',
    device="cuda",
)

Pobranie modelu z repozytorium

In [None]:
# Pick the first repository checkpoint whose path mentions "bass" and fail
# with a readable message when none is available.
matching_models = [match for match in new_models_paths if "bass" in match]
if not matching_models:
    raise FileNotFoundError(
        f"No checkpoint containing 'bass' found in {model_dir}new/"
    )
selected_model = matching_models[0]

print(selected_model)

# This replaces any separator_3 created from the local checkpoint above.
separator_3 = nussl.separation.deep.DeepMaskEstimation(
    nussl.AudioSignal(), model_path=model_dir+selected_model,
    device="cuda",
)

new/bestbest.model_bass_12_11_e100_m40.pth


In [None]:
from common import viz
# NOTE(review): stft_params is re-created with the same values used when the
# datasets were built; it is not referenced again inside this cell.
stft_params = nussl.STFTParams(window_length=1024, hop_length=512, window_type='sqrt_hann') 

# Take one mixture from the test set and hand it to the bass separator.
item = test_data_3[32]
separator_3.audio_signal = item['mix']

# Run the separator on the assigned mixture.
estimates = separator_3()

viz.show_sources(estimates)

In [None]:
viz.show_sources(item['sources'])

## Ocena działania modelu

In [None]:
import json
from pathlib import Path

# Merge drums, vocals and other into a single source so that each test item
# holds the bass plus one combined accompaniment source.
accompaniment_group = ['drums', 'vocals', 'other']
tfm = nussl_tfm.Compose([nussl_tfm.SumSources([accompaniment_group])])
test_evaluation_dataset_3 = nussl.datasets.MUSDB18(
    subsets=['test'],
    transform=tfm,
)

In [None]:
import json
from pathlib import Path

# Folder that receives one JSON score file per evaluated track; it is resolved
# and created once up front instead of on every loop iteration.
output_folder = Path('.').absolute()
output_folder.mkdir(exist_ok=True)

# Walk over the whole test subset rather than a hard-coded count of 50 tracks.
for i in range(len(test_evaluation_dataset_3)):
    item = test_evaluation_dataset_3[i]
    separator_3.audio_signal = item['mix']
    separated = separator_3()

    # The first separator output is labelled as bass; the accompaniment
    # estimate is the mixture with that output subtracted.
    source_keys = list(item['sources'].keys())
    estimates_by_name = {
        'bass': separated[0],
        'drums+vocals+other': item['mix'] - separated[0]
    }

    # Order references and estimates identically, following the dataset keys.
    sources = [item['sources'][k] for k in source_keys]
    estimates = [estimates_by_name[k] for k in source_keys]

    evaluator = nussl.evaluation.BSSEvalScale(
        sources, estimates, source_labels=source_keys
    )
    scores = evaluator.evaluate()

    # Swap only the file extension for '.json'; str.replace('wav', 'json')
    # would also rewrite "wav" occurring inside the track name itself.
    output_file = output_folder / Path(sources[0].file_name).with_suffix('.json').name
    with open(output_file, 'w') as f:
        json.dump(scores, f, indent=4)

In [None]:
import glob
import numpy as np



# Collect the per-track score files whose names end in "bass.json" and
# aggregate them with the NaN-aware median.
json_files = glob.glob("*bass.json")
df3 = nussl.evaluation.aggregate_score_files(json_files, aggregator=np.nanmedian)
nussl.evaluation.associate_metrics(
    separator_3.model,
    df3,
    test_evaluation_dataset_3,
)

# Text summary, reported per source.
report_card_3 = nussl.evaluation.report_card(df3, report_each_source=True)
print(report_card_3)

# Store the aggregated table as CSV, creating the results folder if needed.
filepath_3 = Path('/content/sample_data/results/bass.csv')
filepath_3.parent.mkdir(parents=True, exist_ok=True)
df3.to_csv(filepath_3)

# Wczytanie dowolnego utworu z YT

In [None]:
!pip install youtube-dl #for downloading video/audio from youtube

import youtube_dl

## I utwór

In [None]:
# Source video of the first example song.
song_yt_link = 'https://youtu.be/aJ5IzGBnWAc?list=RDaJ5IzGBnWAc'

# Download options: best available audio stream, saved as audio_sample.mp3.
# NOTE(review): no post-processor is configured, so the saved file may not be
# MP3-encoded despite its name -- confirm.
ydl_args = dict(format='bestaudio/best', outtmpl='audio_sample.mp3')
ydl = youtube_dl.YoutubeDL(ydl_args)

ydl.download([song_yt_link])

In [None]:
# STFT settings, identical to the ones used for the training data above.
stft_params = nussl.STFTParams(window_length=1024, hop_length=512, window_type='sqrt_hann') 


# Load the downloaded audio file.
signal_sample = nussl.AudioSignal('audio_sample.mp3')
# NOTE(review): the return value of to_mono() is discarded; confirm that the
# call converts signal_sample in place.
signal_sample.to_mono()
# Unpack the STFT parameters as positional arguments; the result is not kept here.
signal_sample.stft(*stft_params)

signal_sample.embed_audio()


Lista modeli do separacji:


*   separator_1 - model separujący perkusję
*   separator_2 - model separujący wokal
*   separator_3 - model separujący bas

Dla utworu o długości około 4 min separacja wykonuje się 2-3 min

In [None]:
from common import viz

# Run the three one-vs-all models on the same mix, one after another:
# separator_1 (drums), separator_2 (vocals), separator_3 (bass).
_stem_estimates = []
for _separator in (separator_1, separator_2, separator_3):
    _separator.audio_signal = signal_sample
    _stem_estimates.append(_separator())
estimates1, estimates2, estimates3 = _stem_estimates

# Only the first estimate returned by each model is kept as that stem.
signal_sample_with_sources = {
    'mix': signal_sample,
    'sources': {
        'drums': estimates1[0],
        'vocals': estimates2[0],
        'bass': estimates3[0],
    },
}

viz.show_sources(signal_sample_with_sources['sources'])

In [None]:
# Pass the separated stems of song I (drums, vocals, bass) to nussl's
# multitrack helper; presumably this renders a multitrack audio player --
# its behaviour is not visible in this notebook.
nussl.play_utils.multitrack(signal_sample_with_sources['sources'])

## II utwór

In [None]:
song_yt_link = 'https://www.youtube.com/watch?v=d8ekz_CSBVg'

# Download options: best available audio stream, saved under a fixed name.
ydl_args = dict(format='bestaudio/best', outtmpl='audio_sample2.mp3')
ydl = youtube_dl.YoutubeDL(ydl_args)

ydl.download([song_yt_link])

In [None]:
# STFT settings: window of 1024 samples, hop of 512 samples, 'sqrt_hann' window.
stft_params = nussl.STFTParams(
    window_length=1024,
    hop_length=512,
    window_type='sqrt_hann',
)

# Load the downloaded track, request a mono downmix and compute its STFT
# with the settings above.
signal_sample2 = nussl.AudioSignal('audio_sample2.mp3')
signal_sample2.to_mono()
signal_sample2.stft(
    window_length=stft_params.window_length,
    hop_length=stft_params.hop_length,
    window_type=stft_params.window_type,
)

signal_sample2.embed_audio()


Lista modeli do separacji:


*   separator_1 - model separujący perkusję
*   separator_2 - model separujący wokal
*   separator_3 - model separujący bas

Dla utworu o długości około 4 min separacja wykonuje się 2-3 min

In [None]:
from common import viz

# Run the three one-vs-all models on the same mix, one after another:
# separator_1 (drums), separator_2 (vocals), separator_3 (bass).
_stem_estimates = []
for _separator in (separator_1, separator_2, separator_3):
    _separator.audio_signal = signal_sample2
    _stem_estimates.append(_separator())
estimates1, estimates2, estimates3 = _stem_estimates

# Only the first estimate returned by each model is kept as that stem.
signal_sample_with_sources2 = {
    'mix': signal_sample2,
    'sources': {
        'drums': estimates1[0],
        'vocals': estimates2[0],
        'bass': estimates3[0],
    },
}

viz.show_sources(signal_sample_with_sources2['sources'])

In [None]:
# Pass the separated stems of song II (drums, vocals, bass) to nussl's
# multitrack helper; presumably this renders a multitrack audio player --
# its behaviour is not visible in this notebook.
nussl.play_utils.multitrack(signal_sample_with_sources2['sources'])

## III utwór

In [None]:
# Download options: best available audio stream, saved under a fixed name.
ydl_args = {
  'format' : 'bestaudio/best',
  'outtmpl' : 'audio_sample.mp3'
}

# Song I was already saved as 'audio_sample.mp3'. youtube-dl does not
# download again when the target file exists, so without this cleanup the
# download below is skipped and the following cells separate song I again.
if os.path.exists(ydl_args['outtmpl']):
    os.remove(ydl_args['outtmpl'])

ydl = youtube_dl.YoutubeDL(ydl_args)

song_yt_link = 'https://www.youtube.com/watch?v=w1RttxsaIBY'

ydl.download([song_yt_link])

In [None]:
# STFT settings: window of 1024 samples, hop of 512 samples, 'sqrt_hann' window.
stft_params = nussl.STFTParams(
    window_length=1024,
    hop_length=512,
    window_type='sqrt_hann',
)

# Load the downloaded track, request a mono downmix and compute its STFT
# with the settings above.
signal_sample = nussl.AudioSignal('audio_sample.mp3')
signal_sample.to_mono()
signal_sample.stft(
    window_length=stft_params.window_length,
    hop_length=stft_params.hop_length,
    window_type=stft_params.window_type,
)

signal_sample.embed_audio()


Lista modeli do separacji:


*   separator_1 - model separujący perkusję
*   separator_2 - model separujący wokal
*   separator_3 - model separujący bas

Dla utworu o długości około 4 min separacja wykonuje się 2-3 min

In [None]:
from common import viz

# Run the three one-vs-all models on the same mix, one after another:
# separator_1 (drums), separator_2 (vocals), separator_3 (bass).
_stem_estimates = []
for _separator in (separator_1, separator_2, separator_3):
    _separator.audio_signal = signal_sample
    _stem_estimates.append(_separator())
estimates1, estimates2, estimates3 = _stem_estimates

# Only the first estimate returned by each model is kept as that stem.
signal_sample_with_sources = {
    'mix': signal_sample,
    'sources': {
        'drums': estimates1[0],
        'vocals': estimates2[0],
        'bass': estimates3[0],
    },
}

viz.show_sources(signal_sample_with_sources['sources'])

In [None]:
# Pass the separated stems of song III (drums, vocals, bass) to nussl's
# multitrack helper; presumably this renders a multitrack audio player --
# its behaviour is not visible in this notebook.
nussl.play_utils.multitrack(signal_sample_with_sources['sources'])

## IV utwór

In [None]:
song_yt_link = 'https://www.youtube.com/watch?v=haW_ruZ_Be8'

# Download options: best available audio stream, saved under a fixed name.
ydl_args = dict(format='bestaudio/best', outtmpl='audio_sample4.mp3')
ydl = youtube_dl.YoutubeDL(ydl_args)

ydl.download([song_yt_link])

[youtube] haW_ruZ_Be8: Downloading webpage
[download] Destination: audio_sample4.mp3
[download] 100% of 3.46MiB in 01:14
[ffmpeg] Correcting container in "audio_sample4.mp3"


0

In [None]:
# STFT settings: window of 1024 samples, hop of 512 samples, 'sqrt_hann' window.
stft_params = nussl.STFTParams(
    window_length=1024,
    hop_length=512,
    window_type='sqrt_hann',
)

# Load the downloaded track, request a mono downmix and compute its STFT
# with the settings above.
signal_sample = nussl.AudioSignal('audio_sample4.mp3')
signal_sample.to_mono()
signal_sample.stft(
    window_length=stft_params.window_length,
    hop_length=stft_params.hop_length,
    window_type=stft_params.window_type,
)

signal_sample.embed_audio()


Lista modeli do separacji:


*   separator_1 - model separujący perkusję
*   separator_2 - model separujący wokal
*   separator_3 - model separujący bas

Dla utworu o długości około 4 min separacja wykonuje się 2-3 min

In [None]:
from common import viz

# Run the three one-vs-all models on the same mix, one after another:
# separator_1 (drums), separator_2 (vocals), separator_3 (bass).
_stem_estimates = []
for _separator in (separator_1, separator_2, separator_3):
    _separator.audio_signal = signal_sample
    _stem_estimates.append(_separator())
estimates1, estimates2, estimates3 = _stem_estimates

# Only the first estimate returned by each model is kept as that stem.
signal_sample_with_sources = {
    'mix': signal_sample,
    'sources': {
        'drums': estimates1[0],
        'vocals': estimates2[0],
        'bass': estimates3[0],
    },
}

viz.show_sources(signal_sample_with_sources['sources'])

In [None]:
# Pass the separated stems of song IV (drums, vocals, bass) to nussl's
# multitrack helper; presumably this renders a multitrack audio player --
# its behaviour is not visible in this notebook.
nussl.play_utils.multitrack(signal_sample_with_sources['sources'])