In [198]:
import os
import pandas as pd
import numpy as np

In [199]:
# Paths to the directories
train_data_dir = '../Data/musicnet/train_data'
train_labels_dir = '../Data/musicnet/train_labels'
test_data_dir = '../Data/musicnet/test_data'
test_labels_dir = '../Data/musicnet/test_labels'

In [200]:
HOP_LENGTH = 512

In [201]:
def calculate_cqt_stats(data_dir, preprocess_audio, frameSize=2048, overlapFac=0.5, bins_per_octave=12, n_octaves=7):
    sum_ = None
    sum_of_squares = None
    count = 0

    # Wrap the file iteration with tqdm for a progress bar
    for file in tqdm(os.listdir(data_dir), desc="Calculating CQT stats"):
        if file.endswith('.wav'):
            cqt = preprocess_audio(os.path.join(data_dir, file), frameSize, overlapFac, bins_per_octave, n_octaves)
            if sum_ is None:
                sum_ = np.sum(cqt, axis=0)
                sum_of_squares = np.sum(cqt ** 2, axis=0)
            else:
                sum_ += np.sum(cqt, axis=0)
                sum_of_squares += np.sum(cqt ** 2, axis=0)
            count += cqt.shape[0]

    mean = sum_ / count
    variance = (sum_of_squares - (sum_ ** 2) / count) / count
    std_dev = np.sqrt(variance)

    return mean, std_dev

In [202]:
def generate_cqt(audio_file, frameSize=2048, overlapFac=0.5, bins_per_octave=12, n_octaves=7, mean=None, std=None):
    """
    Generates a Constant-Q Transform (CQT) of an audio file.

    Parameters:
    - audio_file: Path to the audio file.
    - frameSize: Size of each frame for the CQT. Default is 2048.
    - overlapFac: Overlap factor between frames. Default is 0.5 (50% overlap).
    - bins_per_octave: Number of frequency bins per octave. Default is 12.
    - n_octaves: Number of octaves to analyze. Default is 7.
    - mean: Mean to use for standardization. If None, no standardization is applied. Default is None.
    - std: Standard deviation to use for standardization. If None, no standardization is applied. Default is None.

    Returns:
    - C_dB.T: The Constant-Q Transform of the audio file, with time frames as rows.
    """

    # Load the audio file
    y, sr = librosa.load(audio_file, sr=44100)

    # Calculate the hop length from frameSize and overlapFac
    hop_length = int(frameSize - (overlapFac * frameSize))

    # Compute the Constant-Q Transform (CQT)
    C = librosa.cqt(y, sr=sr, hop_length=hop_length, bins_per_octave=bins_per_octave, n_bins=n_octaves * bins_per_octave)

    # Convert the amplitude to decibels
    C_dB = librosa.amplitude_to_db(abs(C))

    # Apply standardization if mean and std are provided
    if mean is not None and std is not None:
        mean = mean.reshape(-1, 1)  # Reshape mean to be broadcastable
        std = std.reshape(-1, 1)    # Reshape std to be broadcastable
        C_dB = (C_dB - mean) / std

    # Return the CQT with time frames as rows
    return C_dB.T

In [203]:
def find_note_range(file_paths):
    min_note, max_note = float('inf'), 0
    for file_path in file_paths:
        for file in os.listdir(file_path):
            if file.endswith('.csv'):
                labels_df = pd.read_csv(os.path.join(file_path, file))
                min_note = min(min_note, labels_df['note'].min())
                max_note = max(max_note, labels_df['note'].max())
    return min_note, max_note + 1

all_label_paths = [train_labels_dir, test_labels_dir]
min_note_value, max_note_value = find_note_range(all_label_paths)
note_range = max_note_value - min_note_value

In [204]:
print('Min note value:', min_note_value)
print('Max note value:', max_note_value)
print('Note range:', note_range)

Min note value: 21
Max note value: 105
Note range: 84


In [205]:
def generate_labels(label_file, n_frames, frame_duration, min_note_value=21):
    labels_df = pd.read_csv(label_file)
    labels = np.zeros((n_frames, 88), dtype=int)  # 88 piano keys

    for _, row in labels_df.iterrows():
        start_frame = int(row['start_time'] // frame_duration)
        end_frame = int(row['end_time'] // frame_duration)
        note = int(row['note']) - min_note_value
        if 0 <= note < 88:
            labels[start_frame:end_frame, note] = 1

    return labels

In [206]:
import matplotlib.pyplot as plt
from tqdm import tqdm

In [207]:
def process_files(data_dir, label_dir, preprocess_audio, preprocess_labels):
    X, y = [], []
    frame_duration = HOP_LENGTH / 44100  # Hop length divided by sample rate

    for file in tqdm(os.listdir(data_dir), desc=f"Processing label files in {data_dir}"):
        if file.endswith('.wav'):
            cqt = preprocess_audio(os.path.join(data_dir, file))
            n_frames = cqt.shape[0]

            label_file = os.path.join(label_dir, file.replace('.wav', '.csv'))
            labels = preprocess_labels(label_file, n_frames, frame_duration)

            X.append(cqt)
            y.append(labels)

    return X, y


In [208]:
mean, std_dev = calculate_cqt_stats(train_data_dir, generate_cqt)
X_train, y_train = process_files(train_data_dir, train_labels_dir, lambda f: generate_cqt(f, mean=mean, std=std_dev), generate_labels)

mean, std_dev = calculate_cqt_stats(test_data_dir, generate_cqt)
X_test, y_test = process_files(test_data_dir, test_labels_dir, lambda f: generate_cqt(f, mean=mean, std=std_dev), generate_labels)

Calculating CQT stats: 100%|██████████| 320/320 [02:23<00:00,  2.24it/s]
Processing label files in ../Data/musicnet/train_data: 100%|██████████| 320/320 [03:10<00:00,  1.68it/s]
Calculating CQT stats: 100%|██████████| 10/10 [00:02<00:00,  4.06it/s]
Processing label files in ../Data/musicnet/test_data: 100%|██████████| 10/10 [00:02<00:00,  3.50it/s]


In [209]:
print('Number of training examples:', len(X_train))
print('Number of training labels:', len(y_train))
print(f"X_train[0] shape: {X_train[0].shape}")
print(f"y_train[0] shape: {y_train[0].shape}")

Number of training examples: 320
Number of training labels: 320
X_train[0] shape: (19254, 84)
y_train[0] shape: (19254, 88)


In [210]:
for i, sample in enumerate(X_train):
    print(f"Sample {i + 1} shape: {sample.shape}")


Sample 1 shape: (19254, 84)
Sample 2 shape: (10816, 84)
Sample 3 shape: (19144, 84)
Sample 4 shape: (15872, 84)
Sample 5 shape: (23515, 84)
Sample 6 shape: (14024, 84)
Sample 7 shape: (30790, 84)
Sample 8 shape: (21122, 84)
Sample 9 shape: (39817, 84)
Sample 10 shape: (29991, 84)
Sample 11 shape: (33772, 84)
Sample 12 shape: (20486, 84)
Sample 13 shape: (15621, 84)
Sample 14 shape: (9897, 84)
Sample 15 shape: (15999, 84)
Sample 16 shape: (30604, 84)
Sample 17 shape: (20173, 84)
Sample 18 shape: (28234, 84)
Sample 19 shape: (25854, 84)
Sample 20 shape: (18875, 84)
Sample 21 shape: (29139, 84)
Sample 22 shape: (18406, 84)
Sample 23 shape: (12527, 84)
Sample 24 shape: (27880, 84)
Sample 25 shape: (17979, 84)
Sample 26 shape: (13238, 84)
Sample 27 shape: (24902, 84)
Sample 28 shape: (31271, 84)
Sample 29 shape: (27111, 84)
Sample 30 shape: (22102, 84)
Sample 31 shape: (19868, 84)
Sample 32 shape: (13948, 84)
Sample 33 shape: (15140, 84)
Sample 34 shape: (20362, 84)
Sample 35 shape: (21745,

In [211]:
def find_max_length(data_list):
    max_length = max(data.shape[0] for data in data_list)
    return max_length

max_length = max(find_max_length(X_train), find_max_length(X_test))


In [212]:
print('Max length:', max_length)

Max length: 46040


In [213]:
def pad_data(data, max_length):
    padded_data = [np.pad(x, ((0, max_length - x.shape[0]), (0, 0)), 'constant') for x in data]
    return np.array(padded_data)

def pad_labels(labels, max_length):
    padded_labels = [np.pad(y, ((0, max_length - y.shape[0]), (0, 0)), 'constant') for y in labels]
    return np.array(padded_labels)

X_train_padded = pad_data(X_train, max_length)
y_train_padded = pad_labels(y_train, max_length)
X_test_padded = pad_data(X_test, max_length)
y_test_padded = pad_labels(y_test, max_length)


In [214]:
print('X_train_padded shape:', X_train_padded.shape)
print('y_train_padded shape:', y_train_padded.shape)

X_train_padded shape: (320, 46040, 84)
y_train_padded shape: (320, 46040, 88)


In [215]:
# Slice all spectrograms and make 5x84 images from them
X_train = np.zeros((X_train_padded.shape[0] * X_train_padded.shape[1], 5, 84))
X_train_padded = X_train_padded.reshape(-1, 84)
y_train = y_train_padded.reshape(-1, 88)

for i in range(len(X_train_padded)):
    X_train[i, 2, :] = X_train_padded[i]
    
    if i > 1:
        X_train[i, 0] = X_train_padded[i - 2]
        X_train[i, 1] = X_train_padded[i - 1]
    elif i == 1:
        X_train[i, 1] = X_train_padded[i - 1]
        
    if i < len(X_train_padded) - 2:
        X_train[i, 3] = X_train_padded[i + 1]
        X_train[i, 4] = X_train_padded[i + 2]
    elif i == len(X_train_padded) - 2:
        X_train[i, 3] = X_train_padded[i + 1]


In [216]:
print('X_train shape:', X_train.shape)
print('y_train shape:', y_train.shape)

X_train shape: (14732800, 5, 84)
y_train shape: (14732800, 88)


In [217]:
from torch.utils.data import Dataset, DataLoader


class AudioDataset(Dataset):
    def __init__(self):
        self.n_samples = X_train.shape[0]
    
    def __getitem__(self, index):
        return torch.tensor(X_train[index], dtype=torch.float32), torch.tensor(y_train[index], dtype=torch.float32)
    
    def __len__(self):
        return self.n_samples

In [230]:
train_dataset = AudioDataset()
train_loader = DataLoader(dataset=train_dataset, batch_size=512, shuffle=True, num_workers=0)

In [234]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Define the PyTorch model
class MusicModel(nn.Module):
    def __init__(self):
        super(MusicModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3))
        self.pool = nn.MaxPool2d((1, 2))
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3))
        self.flatten = nn.Flatten()
        self.lstm = nn.LSTM(input_size=64, hidden_size=512, batch_first=True)
        self.fc = nn.Linear(512, 88)

    def forward(self, x):
        # Convolutional layers
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Reshape for LSTM
        # Assuming x is the output of your conv layers with shape [batch_size, channels, height, width]
        x = x.permute(0, 3, 1, 2)  # Change to [batch_size, width, channels, height]
        x = x.reshape(x.size(0), x.size(1), -1)  # Flatten channels and height into a single dimension

        # LSTM layer
        x, _ = self.lstm(x)

        # Final output layer
        x = self.fc(x)
        return torch.sigmoid(x)

In [235]:
# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Device:', device)

# Move the model to the device (GPU if available, otherwise CPU)
model = MusicModel().to(device)

Device: cuda


In [None]:
from tqdm import tqdm

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    running_loss = 0.0
    # Wrap train_loader with tqdm for a progress bar
    train_loader_progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for i, (inputs, labels) in enumerate(train_loader_progress):
        # Move input and label data to the same device as the model
        inputs = inputs.reshape(inputs.shape[0], 1, inputs.shape[1], inputs.shape[2]).to(device)
        labels = labels.to(device).long()

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Update progress bar description with the running loss
        train_loader_progress.set_description(f"Epoch {epoch+1}/{num_epochs} Loss: {running_loss/(i+1):.3f}")

    # Print loss every epoch
    print(f'Epoch [{epoch + 1}/{num_epochs}] Loss: {running_loss / len(train_loader):.3f}')

print('Finished Training')

Epoch 1/5 Loss: 2.031: 100%|██████████| 28775/28775 [46:54<00:00, 10.22it/s]  


Epoch [1/5] Loss: 2.031


Epoch 2/5 Loss: 2.031: 100%|██████████| 28775/28775 [50:43<00:00,  9.46it/s]  


Epoch [2/5] Loss: 2.031


Epoch 3/5 Loss: 2.031:  38%|███▊      | 11026/28775 [19:53<31:54,  9.27it/s] 

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import librosa

class AudioDataset(Dataset):
    def __init__(self, data_dir, label_dir, chunk_size, bins_per_octave, n_octaves, min_note_value, max_length):
        self.data_dir = data_dir
        self.label_dir = label_dir
        self.chunk_size = chunk_size
        self.bins_per_octave = bins_per_octave
        self.n_octaves = n_octaves
        self.min_note_value = min_note_value
        self.max_length = max_length
        self.files = [f for f in os.listdir(data_dir) if f.endswith('.wav')]
        self.chunk_indices = self._create_chunk_indices()

    def _create_chunk_indices(self):
        chunk_indices = []
        for file in self.files:
            audio_file_path = os.path.join(self.data_dir, file)
            y, _ = librosa.load(audio_file_path, sr=44100)
            total_frames = len(y)
            num_chunks = (total_frames - 1) // (self.chunk_size * HOP_LENGTH) + 1
            for chunk_idx in range(num_chunks):
                chunk_indices.append((file, chunk_idx))
        return chunk_indices

    def __len__(self):
        return len(self.chunk_indices)

    def __getitem__(self, idx):
        file, chunk_idx = self.chunk_indices[idx]
        audio_file_path = os.path.join(self.data_dir, file)
        label_file_path = os.path.join(self.label_dir, file.replace('.wav', '.csv'))

        # Load labels just once for the file
        labels = generate_labels(label_file_path, self.max_length, HOP_LENGTH / 44100, self.min_note_value)

        # Process audio file in chunks
        y, _ = librosa.load(audio_file_path, sr=44100)
        start_sample = chunk_idx * self.chunk_size * HOP_LENGTH
        end_sample = min(start_sample + self.chunk_size * HOP_LENGTH, len(y))
        y_chunk = y[start_sample:end_sample]

        # Generate CQT for the chunk
        C = librosa.cqt(y_chunk, sr=44100, hop_length=HOP_LENGTH, bins_per_octave=self.bins_per_octave, n_bins=self.n_octaves * self.bins_per_octave)
        C_dB = librosa.amplitude_to_db(abs(C)).T

        # Correct padding calculation
        labels_chunk = labels[chunk_idx * self.chunk_size : (chunk_idx + 1) * self.chunk_size]

        # Calculate padding based on max_length
        padding = max(self.max_length - C_dB.shape[0], 0)

        # Apply consistent padding
        C_dB_padded = np.pad(C_dB, ((0, padding), (0, 0)), 'constant')
        labels_padded = np.pad(labels_chunk, ((0, padding), (0, 0)), 'constant')

        # Add channel dimension
        C_dB_padded = np.expand_dims(C_dB_padded, axis=0)

        return torch.tensor(C_dB_padded, dtype=torch.float32), torch.tensor(labels_padded, dtype=torch.float32)


In [None]:
chunk_size = 16
batch_size = 8
num_time_steps = 5755 // 4

In [None]:
max_length = 0
for file in os.listdir(train_data_dir):
    if file.endswith('.wav'):
        audio_file_path = os.path.join(train_data_dir, file)
        y, _ = librosa.load(audio_file_path, sr=44100)
        total_frames = len(y)
        num_chunks = (total_frames - 1) // (chunk_size * HOP_LENGTH) + 1
        max_length = max(max_length, num_chunks * chunk_size)
for file in os.listdir(test_data_dir):
    if file.endswith('.wav'):
        audio_file_path = os.path.join(test_data_dir, file)
        y, _ = librosa.load(audio_file_path, sr=44100)
        total_frames = len(y)
        num_chunks = (total_frames - 1) // (chunk_size * HOP_LENGTH) + 1
        max_length = max(max_length, num_chunks * chunk_size)

In [None]:
print('Max length:', max_length)

In [None]:

# Create Dataset and DataLoader instances
train_dataset = AudioDataset(train_data_dir, train_labels_dir, chunk_size, 12, 7, min_note_value, max_length)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import torch.nn.functional as F
# 
# # Define the PyTorch model
# class MusicModel(nn.Module):
#     def __init__(self):
#         super(MusicModel, self).__init__()
#         self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), padding='same')
#         self.pool = nn.MaxPool2d((2, 2))
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), padding='same')
#         self.flatten = nn.Flatten()
#         self.lstm = nn.LSTM(input_size=1935360, hidden_size=256, batch_first=True)
#         self.fc = nn.Linear(256, 88)
# 
#     def forward(self, x):
#         # Convolutional layers
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
# 
#         # Get the dimensions after the final pooling layer
#         batch_size, channels, height, width = x.size()
#         print(f"X shape: {x.shape}")
# 
#         # Flatten
#         x = self.flatten(x)
# 
#         # Calculate number of features for LSTM input
#         num_features = channels * height * width
#         print(f"Number of features: {num_features}")
#         print(f"X shape: {x.shape}")
# 
#         # Reshape for LSTM
#         x = x.view(batch_size, 1, num_features)
#         print(f"X shape: {x.shape}")
# 
#         # LSTM layer
#         x, _ = self.lstm(x)
# 
#         # Final output layer
#         x = self.fc(x)
#         return torch.sigmoid(x)

In [None]:
# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Device:', device)

# Move the model to the device (GPU if available, otherwise CPU)
model = MusicModel().to(device)

In [None]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # Move input and label data to the same device as the model
        inputs, labels = data[0].to(device), data[1].to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 100 == 99:    # print every 100 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0

print('Finished Training')


In [None]:
# print('X_train_padded shape:', X_train_padded.shape)
# print('y_train_padded shape:', y_train_padded.shape)

In [None]:
# # Reduce the size of the training set to speed up training
# X_train_padded = X_train_padded[:100]
# y_train_padded = y_train_padded[:100]

In [None]:
# X_train_padded = np.expand_dims(X_train_padded, axis=-1)  # Add a channel dimension
# X_test_padded = np.expand_dims(X_test_padded, axis=-1)    # Add a channel dimension


In [None]:
# print('X_train_padded shape:', X_train_padded.shape)

In [None]:
def data_generator(data_dir, label_dir, batch_size, chunk_size, bins_per_octave, n_octaves, min_note_value):
    while True:
        for file in os.listdir(data_dir):
            if file.endswith('.wav'):
                audio_file_path = os.path.join(data_dir, file)
                label_file_path = os.path.join(label_dir, file.replace('.wav', '.csv'))

                # Load labels just once for the file
                labels = generate_labels(label_file_path, chunk_size, HOP_LENGTH / 44100, min_note_value)

                # Process audio file in chunks
                y, sr = librosa.load(audio_file_path, sr=44100)
                total_frames = len(y)
                num_chunks = (total_frames - 1) // (chunk_size * HOP_LENGTH) + 1

                for chunk_idx in range(num_chunks):
                    start_sample = chunk_idx * chunk_size * HOP_LENGTH
                    end_sample = min(start_sample + chunk_size * HOP_LENGTH, total_frames)
                    y_chunk = y[start_sample:end_sample]

                    # Generate CQT for the chunk
                    C = librosa.cqt(y_chunk, sr=sr, hop_length=HOP_LENGTH, bins_per_octave=bins_per_octave, n_bins=n_octaves * bins_per_octave)
                    C_dB = librosa.amplitude_to_db(abs(C)).T

                    # Correct padding calculation
                    padding = max(chunk_size - C_dB.shape[0], 0)
                    C_dB_padded = np.pad(C_dB, ((0, padding), (0, 0)), 'constant')

                    # Get corresponding labels
                    labels_chunk = labels[chunk_idx * chunk_size : (chunk_idx + 1) * chunk_size]
                    labels_padded = np.pad(labels_chunk, ((0, padding), (0, 0)), 'constant')

                    # Yield data in batches
                    for i in range(0, len(C_dB_padded), batch_size):
                        X_batch = C_dB_padded[i:i + batch_size]
                        y_batch = labels_padded[i:i + batch_size]
                        yield np.expand_dims(X_batch, axis=-1), y_batch


In [None]:
batch_size = 8
chunk_size = 16

In [None]:
# Create generators
train_generator = data_generator(train_data_dir[:50], train_labels_dir[:50], batch_size, chunk_size, 12, 7, min_note_value)
val_generator = data_generator(train_data_dir[50:55], train_labels_dir[50:55], batch_size, chunk_size, 12, 7, min_note_value)

# Calculate steps per epoch for training and validation
steps_per_epoch = 50 // batch_size
validation_steps = 5 // batch_size

# Model training
model = create_music_model(input_shape=(5755, 84, 1))
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.fit(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=5,
    validation_data=val_generator,
    validation_steps=validation_steps,
    verbose=1
)

In [None]:
# Assuming X_train and y_train are your preprocessed CQT data and labels
model = create_music_model(input_shape=X_train_padded.shape[1:])  # Adjust input shape accordingly
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.fit(X_train_padded, y_train_padded, batch_size=8, epochs=1, validation_split=0.2, verbose=1)

In [None]:
test_loss = model.evaluate(X_test_padded, y_test_padded, verbose=1)
print("Test Loss:", test_loss)