# Source Separation Using Classifier Transfer Learning

In [1]:
import os
import pickle
import tensorflow as tf
from keras.layers import Input, Dense, Conv2D, MaxPool2D, Flatten, add
from keras.models import Model

2023-05-09 23:48:54.684878: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-05-09 23:48:56.640861: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /hpc/mp/spack/opt/spack/linux-ubuntu20.04-zen2/gcc-10.3.0/cudnn-8.2.4.15-11.4-eluwegpwn6adr7hlku5p5wru5xzefpop/lib64:/hpc/mp/spack/opt/spack/linux-ubuntu20.04-zen2/gcc-10.3.0/cuda-11.4.4-ctldo35wmmwws3jbgwkgjjcjawddu3qz/lib64
2023-05-09 23:48:56.641025: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libn

In [2]:
import vggish_params as params


path = 'vggish_model.ckpt'

class VGGish(tf.keras.Model):
    def __init__(self, training=False):
        super(VGGish, self).__init__()
        self.training = training

        # The VGG stack of alternating convolutions and max-pools.
        self.conv1 = Conv2D(64, kernel_size=[3, 3], padding='same', activation=tf.nn.relu, trainable=self.training)
        self.pool1 = MaxPool2D(pool_size=[2, 2], padding='same', trainable=self.training)
        self.conv2 = Conv2D(128, kernel_size=[3, 3], padding='same', activation=tf.nn.relu, trainable=self.training)
        self.pool2 = MaxPool2D(pool_size=[2, 2], padding='same', trainable=self.training)
        self.conv3_1 = Conv2D(256, kernel_size=[3, 3], padding='same', activation=tf.nn.relu, trainable=self.training)
        self.conv3_2 = Conv2D(256, kernel_size=[3, 3], padding='same', activation=tf.nn.relu, trainable=self.training)
        self.pool3 = MaxPool2D(pool_size=[2, 2], padding='same', trainable=self.training)
        self.conv4_1 = Conv2D(512, kernel_size=[3, 3], padding='same', activation=tf.nn.relu, trainable=self.training)
        self.conv4_2 = Conv2D(512, kernel_size=[3, 3], padding='same', activation=tf.nn.relu, trainable=self.training)
        self.pool4 = MaxPool2D(pool_size=[2, 2], padding='same', trainable=self.training)

        # Flatten before entering fully-connected layers
        self.flatten = Flatten()
        self.fc1_1 = Dense(4096, activation=tf.nn.relu, trainable=self.training)
        self.fc1_2 = Dense(4096, activation=tf.nn.relu, trainable=self.training)
        # The embedding layer.
        self.fc2 = Dense(params.EMBEDDING_SIZE, activation=None, trainable=self.training)

    def call(self, inputs):
        net = self.conv1(inputs)
        net = self.pool1(net)
        net = self.conv2(net)
        net = self.pool2(net)
        net = self.conv3_1(net)
        net = self.conv3_2(net)
        net = self.pool3(net)
        net = self.conv4_1(net)
        net = self.conv4_2(net)
        net = self.pool4(net)

        net = self.flatten(net)
        net = self.fc1_1(net)
        net = self.fc1_2(net)
        net = self.fc2(net)
        
        return net

    def load_vggish_slim_checkpoint(self, checkpoint_path):
        """Loads a pre-trained VGGish-compatible checkpoint."""
        self.load_weights(checkpoint_path)

vggish = VGGish()
vggish.load_vggish_slim_checkpoint(path)


In [None]:
import os
import pandas as pd
import librosa
import numpy as np

# train csv path
train_csv_path = 'openmic-2018/partitions/split01_train.csv'
# test csv path
test_csv_path = 'openmic-2018/partitions/split01_test.csv'

# open csvs
train_df = pd.read_csv(train_csv_path)
test_df = pd.read_csv(test_csv_path)

# convert to numpy arrays
train_df = train_df.to_numpy()
test_df = test_df.to_numpy()

# make each a single list
train_df = train_df.flatten()
test_df = test_df.flatten()

# print the first 5 rows of the train and test dataframes
print(train_df[:5])
print(test_df[:5])

# only use the first 10% of each csv
train_df = train_df[:int(len(train_df) * 1)]
test_df = test_df[:int(len(test_df) * 1)]

['000135_483840' '000139_119040' '000141_153600' '000144_30720'
 '000145_172800']
['000308_61440' '000312_184320' '000319_145920' '000321_218880'
 '000327_88320']


In [None]:
dataset_path = 'spectrograms'
labels_path = 'labels.csv'

# Read the labels CSV file
# ['filename' 'clarinet' 'flute' 'trumpet' 'saxophone' 'voice' 'accordion' 'ukulele' 'mallet_percussion' 'piano' 'guitar' 'mandolin' 'banjo' 'synthesizer' 'trombone' 'organ' 'drums' 'bass' 'cymbals' 'cello' 'violin']
labels_df = pd.read_csv(labels_path)

# Get the list of all the filenames
filenames = labels_df['filename'].values.tolist()

# load the spectrograms and labels
spectrograms_train = []
labels_train = []

spectrograms_test = []
labels_test = []

#check if pickle file exists
if not os.path.isfile('pickle/spectrograms_train.pkl'):
    
    for filename in filenames:
        # if the filename is not in the train or test dataframe, skip it
        if filename not in train_df and filename not in test_df:
            continue

        # load the spectrogram
        spectrogram = np.load(os.path.join(dataset_path, filename + '.npy'))

        # the fist index is the filename, the next 20 are the labels and the last 20 are the masks
        label = labels_df[labels_df['filename'] == filename].values.tolist()[0][1:21]
        mask = labels_df[labels_df['filename'] == filename].values.tolist()[0][21:]

        # threshold the labels
        label = np.array(label) > 0.5

        # make a pair of the spectrogram and the label
        combined = list(zip(label, mask))

        # append each second seperatly
        if filename in train_df:
            for i in range(10):
                spectrograms_train.append(spectrogram[i])
                labels_train.append(combined)
        elif filename in test_df:
            for i in range(10):
                spectrograms_test.append(spectrogram[i])
                labels_test.append(combined)
        else:
            continue
            # print(f"Filename {filename} not found in train or test dataframes")

    # convert the lists to numpy arrays
    spectrograms_train = np.array(spectrograms_train)
    labels_train = np.array(labels_train)

    spectrograms_test = np.array(spectrograms_test)
    labels_test = np.array(labels_test)

    #spectrograms = spectrograms.reshape(spectrograms.shape[0], num_frames, num_bands, 1)
    spectrograms_test = np.expand_dims(spectrograms_test, axis=-1)
    spectrograms_train = np.expand_dims(spectrograms_train, axis=-1)

    #pickle the spectrogram test and train data
    pickle.dump(spectrograms_test, open('pickle/spectrograms_test.pkl', 'wb'))
    pickle.dump(spectrograms_train, open('pickle/spectrograms_train.pkl', 'wb'))
    #pickle the labels
    pickle.dump(labels_test, open('pickle/labels_test.pkl', 'wb'))
    pickle.dump(labels_train, open('pickle/labels_train.pkl', 'wb'))
else:
    #load the spectrogram test and train data
    spectrograms_test = pickle.load(open('pickle/spectrograms_test.pkl', 'rb'))
    spectrograms_train = pickle.load(open('pickle/spectrograms_train.pkl', 'rb'))
    #load the labels
    labels_test = pickle.load(open('pickle/labels_test.pkl', 'rb'))
    labels_train = pickle.load(open('pickle/labels_train.pkl', 'rb'))



print(f"Spectrograms shape: {spectrograms_train.shape}")
print(f"Labels shape: {labels_train.shape}")

Spectrograms shape: (149140, 96, 64, 1)
Labels shape: (149140, 20, 2)


In [1]:
x = vggish.predict(spectrograms_train[:10])

print(x.shape)

NameError: name 'vggish' is not defined

In [None]:
import torch
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self, input_shape, num_filters):
        super().__init__()
        self.input_shape = input_shape
        self.num_filters = num_filters
        self.build()

    def conv_block(self, input, num_filters):
        x = nn.Conv2d(input.shape[1], num_filters, 3, padding=1)(input)
        x = nn.BatchNorm2d(num_filters)(x)
        x = nn.ReLU()(x)
        x = nn.Conv2d(num_filters, num_filters, 3, padding=1)(x)
        x = nn.BatchNorm2d(num_filters)(x)
        x = nn.ReLU()(x)
        return x

    def encoder_block(self, input, num_filters):
        x = self.conv_block(input, num_filters)
        p = nn.MaxPool2d(2)(x)
        return x, p

    def decoder_block(self, input, skip_features, num_filters):
        x = nn.ConvTranspose2d(input.shape[1], num_filters, 2, stride=2)(input)
        x = torch.cat([x, skip_features], dim=1)
        x = self.conv_block(x, num_filters)
        return x

    def build(self):
        # Input
        self.inputs = nn.Parameter(torch.randn(1, self.input_shape[2], self.input_shape[0], self.input_shape[1]), requires_grad=True)

        # Encoder
        e1, p1 = self.encoder_block(self.inputs, self.num_filters)
        e2, p2 = self.encoder_block(p1, self.num_filters*2)
        e3, p3 = self.encoder_block(p2, self.num_filters*4)
        e4, p4 = self.encoder_block(p3, self.num_filters*8)

        # Bridge
        b1 = self.conv_block(p4, self.num_filters*16)

        # Decoder
        d1 = self.decoder_block(b1, e4, self.num_filters*8)
        d2 = self.decoder_block(d1, e3, self.num_filters*4)
        d3 = self.decoder_block(d2, e2, self.num_filters*2)
        d4 = self.decoder_block(d3, e1, self.num_filters)

        # Output
        self.outputs = nn.Sequential(
            nn.Conv2d(self.num_filters, 1, 1),
            nn.Sigmoid()
        )

    def forward(self):

        # Encoder
        e1, p1 = self.encoder_block(self.inputs, self.num_filters)
        e2, p2 = self.encoder_block(p1, self.num_filters*2)
        e3, p3 = self.encoder_block(p2, self.num_filters*4)
        e4, p4 = self.encoder_block(p3, self.num_filters*8)

        # Bridge
        b1 = self.conv_block(p4, self.num_filters*16)

        # Decoder
        d1 = self.decoder_block(b1, e4, self.num_filters*8)
        d2 = self.decoder_block(d1, e3, self.num_filters*4)
        d3 = self.decoder_block(d2, e2, self.num_filters*2)
        d4 = self.decoder_block(d3, e1, self.num_filters)

        # Output
        return self.outputs(d4)

    def summary(self):
        print(self)

    def compile(self, optimizer, loss):
        self.optimizer = optimizer(self.parameters())
        self.loss = loss

    def fit(self, X_train, y_train, epochs):
        self.train()
        for epoch in range(epochs):
            epoch_loss = 0.0
            for X, y in zip(X_train, y_train):
                # Clear gradients
                self.optimizer.zero_grad()

                # Forward pass
                y_pred = self.forward()

                # Calculate loss
                loss = self.loss(y_pred, y)

                # Backward pass
                loss.backward()

                # Update weights
                self.optimizer.step()

                # Update epoch loss
                epoch_loss += loss.item()

            # Print epoch information
            print("Epoch {} - loss: {}".format(epoch, epoch_loss))



: 

## UNet Spectrogram Masker

In [26]:
class UNet:
    def __init__(self, input_shape, num_filters):
        self.input_shape = input_shape
        self.num_filters = num_filters
        self.build()

    def conv_block(self, input, num_filters):
        x = Conv2D(num_filters, 3, padding="same", activation="relu")(input)
        x = BatchNormalization()(x)
        x = Conv2D(num_filters, 3, padding="same", activation="relu")(x)
        x = BatchNormalization()(x)
        return x

    def encoder_block(self, input, num_filters):
        x = self.conv_block(input, num_filters)
        p = MaxPooling2D((2, 2))(x)
        return x, p

    def decoder_block(self, input, skip_features, num_filters):
        x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
        x = concatenate([x, skip_features], axis=3)
        x = self.conv_block(x, num_filters)
        return x

    def build(self):
        # Input
        inputs = Input(shape=self.input_shape)

        # Encoder
        e1, p1 = self.encoder_block(inputs, self.num_filters)
        e2, p2 = self.encoder_block(p1, self.num_filters*2)
        e3, p3 = self.encoder_block(p2, self.num_filters*4)
        e4, p4 = self.encoder_block(p3, self.num_filters*8)

        # Bridge
        b1 = self.conv_block(p4, self.num_filters*16)

        # Decoder
        d1 = self.decoder_block(b1, e4, self.num_filters*8)
        d2 = self.decoder_block(d1, e3, self.num_filters*4)
        d3 = self.decoder_block(d2, e2, self.num_filters*2)
        d4 = self.decoder_block(d3, e1, self.num_filters)

        # Output
        outputs = Conv2D(1, (1, 1), activation='sigmoid')(d4)

        # Model
        self.model = Model(inputs, outputs)

    def summary(self):
        self.model.summary()

    def compile(self, optimizer, loss, metrics=None):
        self.model.compile(optimizer=optimizer, loss=loss, metrics=metrics)


In [27]:
model = UNet((512, 128, 1), 32)
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 512, 128, 1  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_20 (Conv2D)             (None, 512, 128, 32  320         ['input_4[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization_18 (BatchN  (None, 512, 128, 32  128        ['conv2d_20[0][0]']              
 ormalization)                  )                                                           