**Loading Data**

**PreProcessing**



In [None]:

# Central settings for the notebook: CNN / feature-extraction hyperparameters
# under 'CNN', and dataset / checkpoint locations under 'Path'.
config = {
    'CNN': {
        # Input representation fed to the network.
        'signal': 'spectrogram',
        "SampleRate": 44100,

        # CQT settings (forwarded to librosa.cqt by AudioProcessing).
        "NumberOfBins": 84,
        # Higher values give finer frequency resolution at a higher compute cost.
        "BinsPerOctave": 12,
        "HopLength": 512,

        # Available chunk lengths; the chosen one is applied along the last
        # axis of the CQT output when the spectrogram is split into chunks.
        "ChunkSize": {
            "large": 8613,
            "mid": 4306,
            "small": 2153,
            "verySmall": 431,
        },
        "SpectrogramDuration": 10,
        "Mono": 1,

        # Instrument program number -> column index in the instrument label matrix.
        "InstrumentLookup": {
            1: 0,
            41: 1,
            42: 2,
            43: 3,
            72: 4,
            71: 5,
            61: 6,
            69: 7,
            74: 8,
            7: 9,
            44: 10,
        },
        # Human-readable name -> instrument program number.
        # NOTE(review): program 71 appears in InstrumentLookup but has no name
        # here - confirm which instrument it is and add it.
        "Instruments": {
            "Grand Piano": 1,
            "Viola": 41,
            "Cello": 42,
            "Contrabass": 43,
            "Piccolo": 72,
            "Brass Section": 61,
            "English Horn": 69,
            "Recorder": 74,
            "Clavinet": 7,
            "Tremolo Strings": 44,
        },

        # (out_channels, kernel_size) pairs.
        "FilterShapes": [
            (32, (3, 3)),
            (64, (5, 5)),
            (128, (16, 2)),
            (128, (2, 16)),
            (256, (7, 7)),
        ],

        # Output sizes of the two classification heads.
        "NumberInstruments": 11,
        "NumberPitches": 128,

        # Optimisation settings.
        "BatchSize": 16,
        "LearningRate": 1e-6,
        "NumberEpochs": 100,
        "Regularisation": 1e-5,
    },

    'Path': {
        "TestAudio": "test_data",
        "TrainAudio": "train_data",
        "ValidAudio": "valid_data",
        "TestLabels": "test_labels",
        "TrainLabels": "train_labels",
        "ValidLabels": "valid_labels",
        "SavedModels": "saved_models/model.pth",
    },
}


In [None]:
##Main Pre processing
import librosa
import numpy as np
import os
import torch
import math
import csv
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset, random_split
 
class AudioProcessing:
    """Convert .wav files into normalised, fixed-length CQT spectrogram chunks.

    Args:
        sampleRate: Sample rate used when loading audio and computing the CQT.
        chunkSize: Number of CQT time frames per chunk.
        config: Settings dict; the 'CNN' section supplies SpectrogramDuration,
            Mono, NumberOfBins, BinsPerOctave and HopLength.
    """

    def __init__(self, sampleRate, chunkSize, config):
        cnnConfig = config['CNN']
        self.sampleRate = sampleRate
        self.chunkSize = chunkSize
        self.duration = cnnConfig["SpectrogramDuration"]
        self.mono = cnnConfig["Mono"]
        self.numberOfBins = cnnConfig["NumberOfBins"]
        self.binsPerOctave = cnnConfig["BinsPerOctave"]
        self.hopLength = cnnConfig["HopLength"]

    def extractAudio(self, audioSignal):
        """Return the CQT magnitude of `audioSignal` in dB relative to its peak."""
        cqt = librosa.cqt(
            audioSignal,
            sr=self.sampleRate,
            n_bins=self.numberOfBins,
            bins_per_octave=self.binsPerOctave,
            hop_length=self.hopLength,
        )
        return librosa.amplitude_to_db(np.abs(cqt), ref=np.max)

    def chunkAudio(self, audioCQT):
        """Split a (numberOfBins, frames) spectrogram into consecutive time chunks.

        The time axis is zero-padded up to a multiple of `chunkSize`.

        Returns:
            Array of shape (numChunks, chunkSize, numberOfBins); chunk `i` holds
            frames `i * chunkSize` to `(i + 1) * chunkSize - 1` of the input.
        """
        frameCount = audioCQT.shape[-1]
        numChunks = (frameCount + self.chunkSize - 1) // self.chunkSize
        padWidth = numChunks * self.chunkSize - frameCount
        # NOTE(review): the pad value is 0; if the input is the dB output of
        # extractAudio (ref=np.max), 0 is the loudest level rather than
        # silence - confirm this is intended.
        paddedData = np.pad(audioCQT, ((0, 0), (0, padWidth)), mode='constant', constant_values=0)
        # Split the TIME axis first and only then move the chunk axis to the
        # front. Reshaping straight to (-1, bins, chunkSize) would reinterpret
        # the row-major buffer and mix frames from different times and bins.
        perBinChunks = paddedData.reshape(self.numberOfBins, numChunks, self.chunkSize)
        return perBinChunks.transpose(1, 2, 0)

    def normaliseAudio(self, audioCQT):
        """Standardise `audioCQT` with its global mean and standard deviation.

        A zero standard deviation (constant input) is replaced by 1 so the
        result is all zeros instead of NaN/inf.
        """
        mean = np.mean(audioCQT, keepdims=True)
        std = np.std(audioCQT, keepdims=True)
        std[std == 0] = 1.0
        return (audioCQT - mean) / std

    def processAudio(self, directoryPath):
        """Load, transform, chunk and normalise every .wav file in a directory.

        Files are visited in sorted name order so the result is reproducible
        (os.listdir returns entries in arbitrary order).

        Returns:
            List with one array per .wav file, each shaped
            (numChunks, chunkSize, numberOfBins). Empty if no .wav is found.
        """
        signals = []
        for filename in sorted(os.listdir(directoryPath)):
            if not filename.endswith('.wav'):
                continue
            filePath = os.path.join(directoryPath, filename)
            signal, _ = librosa.load(filePath, sr=self.sampleRate, mono=bool(self.mono))
            cqtFeatures = self.extractAudio(signal)
            chunkedFeatures = self.chunkAudio(cqtFeatures)
            # Normalisation statistics are computed per file, over all its chunks.
            signals.append(self.normaliseAudio(chunkedFeatures))
        return signals

class LabelProcessing:
    """Build frame-level instrument and pitch target matrices from CSV labels.

    Args:
        instrumentLookup: Maps instrument program number -> instrument column.
        numberPitches: Number of pitch columns; notes outside
            [0, numberPitches) are ignored.
        chunkSize: Divisor applied when converting times to frame indices.
        numFrames: Number of rows in the produced matrices.
        frameRate: Multiplier applied when converting times to frame indices.
    """

    def __init__(self, instrumentLookup, numberPitches, chunkSize, numFrames, frameRate):
        self.instrumentLookup = instrumentLookup
        self.numberPitches = numberPitches
        self.chunkSize = chunkSize
        self.numFrames = numFrames
        self.frameRate = frameRate

    def labelsToFrames(self, labels, num_frames, frame_rate):
        """Rasterise note labels into a (num_frames, instruments + pitches) matrix.

        Each label is a dict with 'start_time', 'end_time', 'instrument' and
        'note'. Rows from floor(start) to ceil(end) - 1 are set to 1 in the
        instrument column (when the instrument is in the lookup) and in the
        pitch column (when the note is in range). Frame indices are clamped to
        [0, num_frames] so negative times cannot wrap around to the end of the
        matrix through negative slicing.
        """
        numberInstruments = len(self.instrumentLookup)
        annotationMatrix = torch.zeros((num_frames, numberInstruments + self.numberPitches))

        for label in labels:
            # NOTE(review): index = time * frame_rate / chunkSize - confirm the
            # units of frame_rate against the caller; the resulting row
            # duration must match the duration of one feature chunk.
            startFrame = max(math.floor(label['start_time'] * frame_rate / self.chunkSize), 0)
            endFrame = min(math.ceil(label['end_time'] * frame_rate / self.chunkSize), num_frames)
            if startFrame >= endFrame:
                continue

            if label['instrument'] in self.instrumentLookup:
                instrumentIdx = self.instrumentLookup[label['instrument']]
                annotationMatrix[startFrame:endFrame, instrumentIdx] = 1

            if 0 <= label['note'] < self.numberPitches:
                pitchIdx = numberInstruments + label['note']
                annotationMatrix[startFrame:endFrame, pitchIdx] = 1

        return annotationMatrix

    def processLabels(self, directoryPath):
        """Read every .csv in `directoryPath` and return the target matrices.

        Each CSV is expected to have a header row followed by rows of
        start_time, end_time, instrument, note. Empty files and blank rows are
        skipped.

        NOTE(review): labels from all files are merged onto one shared timeline
        before rasterising - confirm this matches how the audio features are
        concatenated per file.

        Returns:
            (instrumentMatrix, pitchMatrix): the instrument columns and the
            pitch columns of the annotation matrix, each with numFrames rows.
        """
        labels = []
        for filename in sorted(os.listdir(directoryPath)):
            if not filename.endswith(".csv"):
                continue
            labelFilePath = os.path.join(directoryPath, filename)
            # newline='' is what the csv module expects for files it reads.
            with open(labelFilePath, 'r', newline='') as csvfile:
                csvreader = csv.reader(csvfile)
                # Skip the header row; the default keeps an empty file from
                # raising StopIteration.
                next(csvreader, None)
                for row in csvreader:
                    if not row:
                        continue  # blank line
                    labels.append({
                        'start_time': float(row[0]),
                        'end_time': float(row[1]),
                        'instrument': int(row[2]),
                        'note': int(row[3]),
                    })
        # Debugging print to verify labels format
        print(f"First 5 labels: {labels[:5]}")

        annotationMatrix = self.labelsToFrames(labels, self.numFrames, self.frameRate)
        numInstruments = len(self.instrumentLookup)
        instrumentMatrix = annotationMatrix[:, :numInstruments]
        pitchMatrix = annotationMatrix[:, numInstruments:]

        return instrumentMatrix, pitchMatrix


class PreProcessingPipeline:
    """Tie audio and label processing together into a single TensorDataset.

    The 'verySmall' chunk size from the CNN settings is used for both the
    audio chunking and the label rasterisation.
    """

    def __init__(self, config, numFrames, frameRate):
        cnnSettings = config["CNN"]
        self.config = config
        self.numFrames = numFrames
        self.frameRate = frameRate
        self.sampleRate = cnnSettings["SampleRate"]
        self.chunkSize = cnnSettings["ChunkSize"]["verySmall"]
        self.numberPitches = cnnSettings["NumberPitches"]
        self.instrumentLookup = cnnSettings["InstrumentLookup"]
        self.audioProcessing = AudioProcessing(self.sampleRate, self.chunkSize, self.config)
        self.labelProcessing = LabelProcessing(
            self.instrumentLookup,
            self.numberPitches,
            self.chunkSize,
            self.numFrames,
            self.frameRate,
        )

    @staticmethod
    def _showActivations(activations, title, yLabel):
        """Display a (frames, classes) matrix as a heat map, frames on the x axis."""
        _, axis = plt.subplots(figsize=(10, 5))
        axis.imshow(activations.T, aspect='auto', origin='lower', cmap='hot')
        axis.set_title(title)
        axis.set_xlabel('Frame Index')
        axis.set_ylabel(yLabel)
        plt.show()

    def processAll(self, labelsDir, audioDir):
        """Build the dataset for one split.

        Audio chunks from every file are concatenated along the first axis and
        given a singleton channel axis; features and both label matrices are
        then cut to the shortest common length. Shapes are printed and both
        label matrices are plotted along the way.

        Returns:
            TensorDataset of (features, instrument labels, pitch labels).
        """
        perFileChunks = self.audioProcessing.processAudio(audioDir)
        stackedChunks = np.expand_dims(np.concatenate(perFileChunks, axis=0), axis=1)
        featuresTensor = torch.from_numpy(stackedChunks).float()

        instrumentMatrix, pitchMatrix = self.labelProcessing.processLabels(labelsDir)
        print(instrumentMatrix.shape)

        # Truncate everything to the shortest of the three first dimensions.
        sharedLength = min(featuresTensor.size(0), instrumentMatrix.size(0), pitchMatrix.size(0))
        featuresTensor, instrumentMatrix, pitchMatrix = (
            tensor[:sharedLength]
            for tensor in (featuresTensor, instrumentMatrix, pitchMatrix)
        )

        for tensor in (pitchMatrix, instrumentMatrix, featuresTensor):
            print(tensor.shape)

        self._showActivations(instrumentMatrix, 'Instrument Activations', 'Instrument Index')
        self._showActivations(pitchMatrix, 'Pitch Activations', 'Pitch Index')

        dataset = TensorDataset(featuresTensor, instrumentMatrix, pitchMatrix)
        print(dataset)
        return dataset
    
# Derive the label-grid dimensions from the CNN settings and build the pipeline.
# NOTE(review): frameRate is SampleRate / chunk size, and LabelProcessing divides
# by the chunk size again when converting times to rows - confirm the units.
cnnConfig = config['CNN']
pathConfig = config["Path"]
verySmallChunk = cnnConfig["ChunkSize"]["verySmall"]

audioLength = cnnConfig["SampleRate"] * cnnConfig["SpectrogramDuration"]
numFrames = math.ceil(audioLength / verySmallChunk)
frameRate = cnnConfig["SampleRate"] / verySmallChunk
pipeline = PreProcessingPipeline(config, numFrames, frameRate)

# One dataset per split (processed in the order train, test, validation).
trainDataset = pipeline.processAll(pathConfig["TrainLabels"], pathConfig["TrainAudio"])
testDataset = pipeline.processAll(pathConfig["TestLabels"], pathConfig["TestAudio"])
validDataset = pipeline.processAll(pathConfig["ValidLabels"], pathConfig["ValidAudio"])

# Batched loaders; all three shuffle, and the test/validation loaders pin memory.
batchSize = cnnConfig["BatchSize"]
trainDataloader = DataLoader(trainDataset, batch_size=batchSize, shuffle=True)
testDataloader = DataLoader(testDataset, batch_size=batchSize, shuffle=True, pin_memory=True)
validDataloader = DataLoader(validDataset, batch_size=batchSize, shuffle=True, pin_memory=True)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiTaskCnnModel(nn.Module):
    """Multi-task CNN: a shared convolutional trunk feeding a pitch branch and an instrument branch.

    Args:
        NumberPitches: Output size of the pitch head.
        NumberInstruments: Output size of the instrument head.
        NumberOfBins: Used to size the wide kernels of the first branch
            convolutions (20% of it for pitch, 25% for instrument).
    """

    def __init__(self, NumberPitches, NumberInstruments, NumberOfBins):
        super().__init__()

        def convStage(inChannels, outChannels, kernel):
            # Conv -> BatchNorm -> LeakyReLU -> 2x2 max-pool -> dropout.
            return [
                nn.Conv2d(in_channels=inChannels, out_channels=outChannels, kernel_size=kernel, padding=1),
                nn.BatchNorm2d(outChannels),
                nn.LeakyReLU(0.001),
                nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
                nn.Dropout(0.8),
            ]

        def denseHead(hiddenUnits, outputUnits):
            # Two-layer classifier on the 256 pooled channel features.
            return nn.Sequential(
                nn.Linear(in_features=256, out_features=hiddenUnits),
                nn.Dropout(0.8),
                nn.LeakyReLU(0.001),
                nn.Linear(in_features=hiddenUnits, out_features=outputUnits),
            )

        pitchKernelWidth = int(0.2 * NumberOfBins)
        instrumentKernelWidth = int(0.25 * NumberOfBins)

        # Trunk shared by both tasks (expects single-channel input).
        self.sharedConvBlocks = nn.Sequential(
            *convStage(1, 32, (3, 3)),
            *convStage(32, 64, (5, 5)),
        )

        # Pitch branch.
        self.pitchConvBlocks = nn.Sequential(
            *convStage(64, 128, (1, pitchKernelWidth)),
            *convStage(128, 256, (3, 2)),
        )

        # Instrument branch.
        self.instrumentConvBlocks = nn.Sequential(
            *convStage(64, 128, (5, instrumentKernelWidth)),
            *convStage(128, 256, (7, 1)),
        )

        # Collapses each feature map to 1x1 so the heads see a fixed-size vector.
        self.adaptivePool = nn.AdaptiveAvgPool2d((1, 1))

        self.pitch = denseHead(1024, NumberPitches)
        self.instrument = denseHead(512, NumberInstruments)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Kaiming-uniform weights / zero bias for conv and linear layers; identity BatchNorm."""
        if isinstance(module, nn.BatchNorm2d):
            torch.nn.init.ones_(module.weight)
            torch.nn.init.zeros_(module.bias)
            return
        if not isinstance(module, (nn.Linear, nn.Conv2d)):
            return
        torch.nn.init.kaiming_uniform_(module.weight, nonlinearity='relu')
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def forward(self, x):
        """Return `(pitchOut, instrumentOut)` raw scores (no activation applied) for a batch `x`."""
        trunkFeatures = self.sharedConvBlocks(x)

        # Run both convolutional branches before either head.
        pitchMaps = self.pitchConvBlocks(trunkFeatures)
        instrumentMaps = self.instrumentConvBlocks(trunkFeatures)

        # Pool to 1x1 and flatten to (batch, channels).
        pitchVector = torch.flatten(self.adaptivePool(pitchMaps), 1)
        instrumentVector = torch.flatten(self.adaptivePool(instrumentMaps), 1)

        pitchOut = self.pitch(pitchVector)
        instrumentOut = self.instrument(instrumentVector)
        return pitchOut, instrumentOut
# Instantiate the network with head sizes and bin count taken from the CNN settings.
model = MultiTaskCnnModel(
    NumberPitches=config["CNN"]["NumberPitches"],
    NumberInstruments=config["CNN"]["NumberInstruments"],
    NumberOfBins=config["CNN"]["NumberOfBins"],
)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast
import librosa
import librosa.display


def visualiseLossesTotal(trainLosses, validLosses, validAccuracies):
    """Plot combined training/validation loss beside the validation accuracy."""
    _, axes = plt.subplots(1, 2)
    lossAxis, accuracyAxis = axes

    # Left panel: both loss curves on a fixed 0-1.5 scale.
    for series, name in ((trainLosses, 'Training'), (validLosses, 'Validation')):
        lossAxis.plot(series, label=name)
    lossAxis.legend()
    lossAxis.set_title('Loss')
    lossAxis.set_ylim([0, 1.5])

    # Right panel: validation accuracy.
    accuracyAxis.plot(validAccuracies)
    accuracyAxis.set_title('Validation Accuracy')

    plt.tight_layout()
    plt.show()

def visualiseLossesPitch(trainLosses, validLosses, validAccuracies):
    """Plot pitch-task training/validation loss beside the pitch validation accuracy."""
    _, axes = plt.subplots(1, 2)
    lossAxis, accuracyAxis = axes

    # Left panel: both loss curves on a fixed 0-1.5 scale.
    lossCurves = (
        (trainLosses, 'Training', 'purple'),
        (validLosses, 'Validation', 'silver'),
    )
    for series, name, colour in lossCurves:
        lossAxis.plot(series, label=name, color=colour)
    lossAxis.legend()
    lossAxis.set_title('Loss')
    lossAxis.set_ylim([0, 1.5])

    # Right panel: validation accuracy.
    accuracyAxis.plot(validAccuracies, label='Accuracy', color='purple')
    accuracyAxis.legend()
    accuracyAxis.set_title('Validation Accuracy')

    plt.tight_layout()
    plt.show()

def visualiseLossesInstrument(trainLosses, validLosses, validAccuracies):
    """Plot instrument-task training/validation loss beside the instrument validation accuracy."""
    _, axes = plt.subplots(1, 2)
    lossAxis, accuracyAxis = axes

    # Left panel: both loss curves on a fixed 0-1.5 scale.
    lossCurves = (
        (trainLosses, 'Training', 'red'),
        (validLosses, 'Validation', 'pink'),
    )
    for series, name, colour in lossCurves:
        lossAxis.plot(series, label=name, color=colour)
    lossAxis.legend()
    lossAxis.set_title('Loss')
    lossAxis.set_ylim([0, 1.5])

    # Right panel: validation accuracy (legend shows the curve label).
    accuracyAxis.plot(validAccuracies, label='Accuracy', color='red')
    accuracyAxis.legend()
    accuracyAxis.set_title('Validation Accuracy')

    plt.tight_layout()
    plt.show()
def evaluate(model, dataLoader, pitchLoss, instrumentLoss, device):
    """Compute validation losses and accuracies for both tasks.

    Args:
        model: Callable returning `(pitchLogits, instrumentLogits)` for a batch.
        dataLoader: Yields `(inputs, instrumentLabels, pitchLabels)` batches.
        pitchLoss: Loss applied to `(pitchLogits, pitchLabels)`.
        instrumentLoss: Loss applied to `(instrumentLogits, instrumentLabels)`.
        device: Device the batches are moved to.

    Returns:
        Tuple `(totalLoss, totalAccuracy, pitchLoss, pitchAccuracy,
        instrumentLoss, instrumentAccuracy)`. Losses are means over batches and
        the total loss is the SUM of the two task losses, matching the quantity
        `train` optimises and reports. Accuracies are the fraction of
        individual label entries predicted correctly (in [0, 1]); the total
        accuracy is the mean of the two.

    Raises:
        ValueError: If `dataLoader` yields no batches.
    """
    numBatches = len(dataLoader)
    if numBatches == 0:
        raise ValueError("evaluate() needs a dataLoader with at least one batch")

    model.eval()
    pitchEpochLoss = instrumentEpochLoss = 0.0
    pitchCorrect = instrumentCorrect = 0
    pitchTotal = instrumentTotal = 0
    with torch.inference_mode():
        for batchInputs, batchLabelsInstrument, batchLabelsPitch in dataLoader:
            batchInputs = batchInputs.to(device)
            batchLabelsPitch = batchLabelsPitch.to(device).float()
            batchLabelsInstrument = batchLabelsInstrument.to(device).float()

            # No squeeze(): it would drop the batch axis of a size-1 batch and
            # make the logits' shape disagree with the labels'.
            pitchOutputs, instrumentOutputs = model(batchInputs)

            # Threshold the sigmoid probabilities at 0.5 to get binary predictions.
            pitchPredictions = (torch.sigmoid(pitchOutputs) >= 0.5).float()
            instrumentPredictions = (torch.sigmoid(instrumentOutputs) >= 0.5).float()

            # Count matches per label entry and remember how many entries
            # there were, so the final ratio is a true fraction.
            pitchCorrect += (pitchPredictions == batchLabelsPitch).sum().item()
            pitchTotal += batchLabelsPitch.numel()
            instrumentCorrect += (instrumentPredictions == batchLabelsInstrument).sum().item()
            instrumentTotal += batchLabelsInstrument.numel()

            pitchEpochLoss += pitchLoss(pitchOutputs, batchLabelsPitch).item()
            instrumentEpochLoss += instrumentLoss(instrumentOutputs, batchLabelsInstrument).item()

    pitchEpochLoss /= numBatches
    instrumentEpochLoss /= numBatches
    pitchAccuracy = pitchCorrect / pitchTotal if pitchTotal else 0.0
    instrumentAccuracy = instrumentCorrect / instrumentTotal if instrumentTotal else 0.0

    totalEpochLoss = pitchEpochLoss + instrumentEpochLoss
    totalAccuracy = (pitchAccuracy + instrumentAccuracy) / 2
    return totalEpochLoss, totalAccuracy, pitchEpochLoss, pitchAccuracy, instrumentEpochLoss, instrumentAccuracy

   
def train(model, trainLoader, validLoader, numEpochs, saved_model, optimizer):
    """Train the multi-task model, validating after every epoch.

    Each epoch minimises the sum of the pitch and instrument
    BCE-with-logits losses, then runs `evaluate` once on `validLoader`, so the
    training and validation histories always have one entry per epoch. The
    learning curves are plotted every 10 epochs. Whenever the total validation
    accuracy matches or beats the best seen so far, the model's state dict is
    written to `saved_model`; training stops early after 100 consecutive
    epochs without that happening.

    Args:
        model: Returns `(pitchLogits, instrumentLogits)` for an input batch.
        trainLoader: Yields `(inputs, instrumentTargets, pitchTargets)`.
        validLoader: Loader passed to `evaluate`.
        numEpochs: Maximum number of epochs.
        saved_model: Destination passed to `torch.save`.
        optimizer: Optimizer already bound to the model's parameters.

    Raises:
        ValueError: If `trainLoader` yields no batches.
    """
    # Local import keeps this function self-contained when run in its own cell.
    import os

    numTrainBatches = len(trainLoader)
    if numTrainBatches == 0:
        raise ValueError("train() needs a trainLoader with at least one batch")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # Mixed precision only applies on CUDA; disable it explicitly on CPU.
    useAmp = device.type == "cuda"
    pitchLoss = nn.BCEWithLogitsLoss()
    instrumentLoss = nn.BCEWithLogitsLoss()
    scaler = GradScaler(enabled=useAmp)
    bestValidAcc = 0
    epochsNoGood = 0
    earlyStopping = 100
    evaluateEveryNEpochs = 10

    # Make sure the checkpoint directory exists before the first save.
    if isinstance(saved_model, (str, os.PathLike)):
        saveDir = os.path.dirname(os.fspath(saved_model))
        if saveDir:
            os.makedirs(saveDir, exist_ok=True)

    trainLosses, validLosses, validAccuracies = [], [], []
    pitchTrainLosses, pitchValidLosses, pitchValidAccuracies = [], [], []
    instrumentTrainLosses, instrumentValidLosses, instrumentValidAccuracies = [], [], []

    for epoch in range(numEpochs):
        model.train()
        totalTrainLoss = pitchTrainLoss = instrumentTrainLoss = 0.0
        for inputs, instrumentTargets, pitchTargets in trainLoader:
            inputs = inputs.to(device)
            instrumentTargets = instrumentTargets.to(device)
            pitchTargets = pitchTargets.to(device)

            optimizer.zero_grad()
            with autocast(enabled=useAmp):
                pitchOutputs, instrumentOutputs = model(inputs)
                lossPitch = pitchLoss(pitchOutputs, pitchTargets.float())
                lossInstrument = instrumentLoss(instrumentOutputs, instrumentTargets.float())
                totalLoss = lossPitch + lossInstrument
            scaler.scale(totalLoss).backward()
            scaler.step(optimizer)
            scaler.update()

            totalTrainLoss += totalLoss.item()
            pitchTrainLoss += lossPitch.item()
            instrumentTrainLoss += lossInstrument.item()

        trainLosses.append(totalTrainLoss / numTrainBatches)
        pitchTrainLosses.append(pitchTrainLoss / numTrainBatches)
        instrumentTrainLosses.append(instrumentTrainLoss / numTrainBatches)

        # Validate exactly once per epoch so the histories stay aligned with
        # the training history.
        totalValidLoss, totalValidAcc, pitchValidLoss, pitchValidAcc, InstrumentValidLoss, InstrumentValidAcc = evaluate(model, validLoader, pitchLoss, instrumentLoss, device)

        validLosses.append(totalValidLoss)
        validAccuracies.append(totalValidAcc)
        pitchValidLosses.append(pitchValidLoss)
        pitchValidAccuracies.append(pitchValidAcc)
        instrumentValidLosses.append(InstrumentValidLoss)
        instrumentValidAccuracies.append(InstrumentValidAcc)
        print(f'Epoch {epoch + 1}/{numEpochs}, Total Train Loss: {trainLosses[-1]:.6f}, Total Validation Loss: {validLosses[-1]:.6f}, Total Validation Accuracy: {validAccuracies[-1]:.4f}')
        print(f'Epoch {epoch + 1}/{numEpochs}, Pitch Train Loss: {pitchTrainLosses[-1]:.6f}, Pitch Validation Loss: {pitchValidLosses[-1]:.6f}, Pitch Validation Accuracy: {pitchValidAccuracies[-1]:.4f}')
        print(f'Epoch {epoch + 1}/{numEpochs}, Instrument Train Loss: {instrumentTrainLosses[-1]:.6f}, Instrument Validation Loss: {instrumentValidLosses[-1]:.6f}, Instrument Validation Accuracy: {instrumentValidAccuracies[-1]:.4f}')

        # Plot the learning curves periodically, re-using this epoch's metrics.
        if (epoch + 1) % evaluateEveryNEpochs == 0:
            visualiseLossesTotal(trainLosses, validLosses, validAccuracies)
            visualiseLossesPitch(pitchTrainLosses, pitchValidLosses, pitchValidAccuracies)
            visualiseLossesInstrument(instrumentTrainLosses, instrumentValidLosses, instrumentValidAccuracies)

        # Checkpoint on best-or-equal validation accuracy; otherwise count
        # towards early stopping.
        if totalValidAcc >= bestValidAcc:
            bestValidAcc = totalValidAcc
            torch.save(model.state_dict(), saved_model)
            print('Saving model with best validation accuracy')
            epochsNoGood = 0
        else:
            epochsNoGood += 1
            if epochsNoGood >= earlyStopping:
                print('Early stopping!')
                break

# Adam optimiser; learning rate and weight decay both come from the CNN settings.
optimizer = optim.Adam(
    model.parameters(),
    lr=config['CNN']['LearningRate'],
    weight_decay=config['CNN']['Regularisation'],
)

# Train on the training loader, validate on the validation loader, and
# checkpoint to the configured path.
train(
    model,
    trainDataloader,
    validDataloader,
    config['CNN']["NumberEpochs"],
    config["Path"]["SavedModels"],
    optimizer,
)