In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DeepRhythmModel(nn.Module):
    def __init__(self, num_classes):
        super(DeepRhythmModel, self).__init__()

        # Assuming the input shape is (240, 8, 6) which is (φ, b, h)
        self.num_classes = num_classes

        # Updated to reflect the simplified flattening size of 5760
        self.conv1 = nn.Conv2d(in_channels=6, out_channels=128, kernel_size=(4, 6), padding='same')
        self.bn1 = nn.BatchNorm2d(128)

        self.conv2 = nn.Conv2d(in_channels=128, out_channels=64, kernel_size=(4, 6), padding='same')
        self.bn2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(4, 6), padding='same')
        self.bn3 = nn.BatchNorm2d(64)

        self.conv4 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=(4, 6), padding='same')
        self.bn4 = nn.BatchNorm2d(32)

        # The kernel size for the last convolutional layer covers the entire φ dimension
        self.conv5 = nn.Conv2d(in_channels=32, out_channels=8, kernel_size=(120, 6))
        self.bn5 = nn.BatchNorm2d(8)

        # The output of the last convolutional layer is (1, 1, 8), so after flattening it becomes 8*1*1 = 8
        self.fc1 = nn.Linear(2904, 256)  # The flattening size is now 8*1*6 = 48
        self.elu = nn.ELU()

        self.dropout = nn.Dropout(0.5)

        self.fc2 = nn.Linear(256, num_classes)

        # self.softmax = nn.Softmax(dim=1)

        # Initialize weights
        self._initialize_weights()

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))

        x = x.reshape(x.size(0), -1)  # Flatten the tensor

        x = self.dropout(self.elu(self.fc1(x)))

        x = self.fc2(x)
        # x = self.softmax(x)

        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

# Number of classes based on the problem specification
num_classes = 256

# Instantiate the simplified model
simplified_model = DeepRhythmModel(num_classes)

# Check simplified model architecture
print(simplified_model)


DeepRhythmModel(
  (conv1): Conv2d(6, 128, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(128, 64, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 64, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(64, 32, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(32, 8, kernel_size=(120, 6), stride=(1, 1))
  (bn5): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=2904, out_features=256, bias=True)
  (elu): ELU(alpha=1.0)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=256, out_features

In [20]:
dummy_input = torch.randn(1, 6, 240, 8)  # batch size of 1 for testing

# Move the model to an appropriate device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = simplified_model.to(device)
dummy_input = dummy_input.to(device)

# Perform a forward pass with the dummy input
try:
    dummy_output = model(dummy_input)
    print("Dummy output shape:", dummy_output.shape)
    print("Model setup is valid and forward pass works.")
except Exception as e:
    print("Model setup has issues:", e)

Dummy output shape: torch.Size([1, 256])
Model setup is valid and forward pass works.


In [3]:
import h5py
from torch.utils.data import Dataset, DataLoader
import torch
from torch.utils.data.dataset import random_split

def bpm_to_class(bpm, min_bpm=30, max_bpm=286, num_classes=256):
    """Map a BPM value to a class index."""
    # Linearly map BPM values to class indices
    class_width = (max_bpm - min_bpm) / num_classes
    class_index = int((bpm - min_bpm) // class_width)
    return max(0, min(num_classes - 1, class_index))

def class_to_bpm(class_index, min_bpm=30, max_bpm=286, num_classes=256):
    """Map a class index back to a BPM value (to the center of the class interval)."""
    class_width = (max_bpm - min_bpm) / num_classes
    bpm = min_bpm + class_width * (class_index + 0.5)
    return bpm

class HDF5Dataset(Dataset):
    def __init__(self, hdf5_file, transform=None):
        self.file_path = hdf5_file
        self.transform = transform
        with h5py.File(self.file_path, 'r') as file:
            self.items = []
            for group_name in file.keys():
                group = file[group_name]
                for item_name in group.keys():
                    self.items.append((group_name, item_name))

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        group_name, item_name = self.items[idx]
        with h5py.File(self.file_path, 'r') as file:
            item = file[group_name][item_name]
            data = torch.tensor(item['hcqm'][:], dtype=torch.float)
            bpm = torch.tensor([item.attrs['bpm']], dtype=torch.int)
        label_class_index = bpm_to_class(bpm)  # Convert BPM to class index
        data = data.permute(2, 0, 1)
        return data, label_class_index

def split_dataset(dataset, train_ratio, test_ratio, validate_ratio):
    total_ratio = train_ratio + test_ratio + validate_ratio
    assert abs(total_ratio - 1) < 1e-6, "Ratios must sum to 1"

    dataset_size = len(dataset)
    train_size = int(train_ratio * dataset_size)
    test_size = int(test_ratio * dataset_size)
    validate_size = dataset_size - train_size - test_size

    train_dataset, test_dataset, validate_dataset = random_split(dataset, [train_size, test_size, validate_size])
    return train_dataset, test_dataset, validate_dataset

# Example usage
dataset = HDF5Dataset('output_data2.hdf5')
# train_dataset, test_dataset, validate_dataset = split_dataset(dataset, 0.7, 0.2, 0.1)

# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)
# validate_loader = DataLoader(validate_dataset, batch_size=32, shuffle=True)

# # Now you have DataLoaders for training, testing, and validation.
# len(train_dataset.indices), len(test_dataset.indices), len(validate_dataset.indices)

Data batch shape: torch.Size([32, 240, 8, 6])
Labels batch shape: torch.Size([32, 1])
First few labels: tensor([[170],
        [128],
        [189],
        [ 74],
        [157]], dtype=torch.int32)


In [29]:
len(dataset.items)

46979

In [30]:
import json

def save_split_indices(train_dataset, test_dataset, validate_dataset, filename="dataset_splits.json"):
    # Extract indices from the subsets
    splits = {
        'train_indices': train_dataset.indices,
        'test_indices': test_dataset.indices,
        'validate_indices': validate_dataset.indices
    }
    # Save to JSON file
    with open(filename, 'w') as f:
        json.dump(splits, f)

# Assuming you've already created train_dataset, test_dataset, and validate_dataset
save_split_indices(train_dataset, test_dataset, validate_dataset)


In [15]:
from torch.utils.data import Subset
import json
def load_split_datasets(dataset, filename="dataset_splits.json"):
    # Load the saved indices
    with open(filename, 'r') as f:
        splits = json.load(f)

    # Recreate the subsets using the loaded indices
    train_dataset = Subset(dataset, splits['train_indices'])
    test_dataset = Subset(dataset, splits['test_indices'])
    validate_dataset = Subset(dataset, splits['validate_indices'])

    return train_dataset, test_dataset, validate_dataset

# Assuming dataset is your full HDF5Dataset instance
train_dataset, test_dataset, validate_dataset = load_split_datasets(dataset)

# Now you can create DataLoader instances as before
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)
validate_loader = DataLoader(validate_dataset, batch_size=32, shuffle=True)

In [14]:
len(train_dataset.indices), len(test_dataset.indices), len(validate_dataset.indices)

(32885, 9395, 4699)

In [15]:
# Load a single batch from the dataloader
data_iter = iter(train_loader)
data, labels = next(data_iter)

# Print the shapes of the data and labels to verify
print(f"Data batch shape: {data.shape}")  # Should be [batch_size, 240, 8, 6]
print(f"Labels batch shape: {labels.shape}")  # Shape depends on how you've set up labels

# Print the first few labels to check they're loaded correctly
print(f"First few labels: {labels[:5]}")

Data batch shape: torch.Size([32, 240, 8, 6])
Labels batch shape: torch.Size([32])
First few labels: tensor([ 60, 102,  92, 174,  82])


In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Assuming 'HDF5Dataset' is your dataset class and 'model' is your CNN model instance
hdf5_file_path='output_data2.hdf5'
dataset = HDF5Dataset(hdf5_file_path)
train_dataset, test_dataset, validate_dataset = load_split_datasets(dataset)

train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
validate_loader = DataLoader(validate_dataset, batch_size=256, shuffle=False)

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-8)

# Early stopping setup
early_stopping_patience = 5
early_stopping_counter = 0
best_validate_loss = float('inf')

# Training loop
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Validation loop
    model.eval()
    validate_loss = 0.0
    with torch.no_grad():
        for inputs, labels in validate_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            validate_loss += loss.item()

    average_train_loss = running_loss / len(train_loader)
    average_validate_loss = validate_loss / len(validate_loader)

    print(f"Epoch {epoch+1}, Train Loss: {average_train_loss:.4f}, Validate Loss: {average_validate_loss:.4f}")

    # Check for early stopping
    if average_validate_loss < best_validate_loss:
        best_validate_loss = average_validate_loss
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break


Epoch 1, Train Loss: 3.6976, Validate Loss: 2.9462
Epoch 2, Train Loss: 3.0090, Validate Loss: 2.8088
Epoch 3, Train Loss: 2.7967, Validate Loss: 2.7384
Epoch 4, Train Loss: 2.6433, Validate Loss: 2.6563
Epoch 5, Train Loss: 2.5019, Validate Loss: 2.6279
Epoch 6, Train Loss: 2.3472, Validate Loss: 2.5916
Epoch 7, Train Loss: 2.1786, Validate Loss: 2.6046
Epoch 8, Train Loss: 2.0020, Validate Loss: 2.6053
Epoch 9, Train Loss: 1.8201, Validate Loss: 2.6179
Epoch 10, Train Loss: 1.6141, Validate Loss: 2.6340
Epoch 11, Train Loss: 1.4168, Validate Loss: 2.7615
Early stopping triggered.


In [29]:
model_path = 'deeprhythm0.1.pth'
torch.save(model.state_dict(), model_path)

In [10]:
model = DeepRhythmModel(256)

# Load the weights
model.load_state_dict(torch.load('deeprhythm0.1.pth'))
model = model.cpu() #.to(device='cuda')
# Ensure to call model.eval() to set dropout and batch normalization layers to evaluation mode
model.eval()

DeepRhythmModel(
  (conv1): Conv2d(6, 128, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(128, 64, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 64, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(64, 32, kernel_size=(4, 6), stride=(1, 1), padding=same)
  (bn4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(32, 8, kernel_size=(120, 6), stride=(1, 1))
  (bn5): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=2904, out_features=256, bias=True)
  (elu): ELU(alpha=1.0)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=256, out_features

In [11]:
import librosa

def load_and_split_audio(filename, sr=22050, clip_length=8):
    """
    Load an audio file, split it into 8-second clips, and return a single tensor of all clips.

    Parameters:
    - filename: Path to the audio file.
    - sr: Sampling rate to use for loading the audio.
    - clip_length: Length of each clip in seconds.

    Returns:
    A tensor of shape [clips, audio] where each row is an 8-second clip.
    """

    clips = []
    clip_samples = sr * clip_length
    try:
        audio, _ = librosa.load(filename, sr=sr)
        for i in range(0, len(audio), clip_samples):
            if i + clip_samples <= len(audio):
                clip_tensor = torch.tensor(audio[i:i + clip_samples], dtype=torch.float32)
                clips.append(clip_tensor)
    except Exception as e:
        print(e, filename)

    # Stack all clips along a new dimension to form a single tensor
    if clips:
        stacked_clips = torch.stack(clips, dim=0)
    else:
        # Return an empty tensor if no clips were created (file is shorter than clip_length)
        return None

    # Share memory of the stacked clips tensor
    stacked_clips.share_memory_()

    return stacked_clips


In [13]:
from hcqm import make_specs, compute_hcqm

def predict_global_bpm(model, input_path):
    clips = load_and_split_audio(input_path, sr=22050)
    sr=22050
    model_device = next(model.parameters()).device
    len_audio = sr*8
    stft, band, cqt = make_specs(len_audio, sr, device=model_device)
    input_batch = compute_hcqm(clips.to(device=model_device), stft, band, cqt).permute(0,3,1,2)
    print(input_batch.shape)
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        # Ensure the batch is on the same device as the model
        input_batch = input_batch.to(device=model_device)
        outputs = model(input_batch)

        # Apply softmax to convert logits to probabilities
        probabilities = torch.softmax(outputs, dim=1)

        # Compute the average probability across the batch for each class
        mean_probabilities = probabilities.mean(dim=0)

        # Find the class with the maximum average probability
        _, predicted_class = torch.max(mean_probabilities, 0)
        predicted_global_bpm = class_to_bpm(predicted_class.item())

    return predicted_global_bpm

predict_global_bpm(model,'data/Anime World.flac')

STFT kernels created, time used = 0.0440 seconds
CQT kernels created, time used = 0.0263 seconds
CQT kernels created, time used = 0.0190 seconds
CQT kernels created, time used = 0.0178 seconds
CQT kernels created, time used = 0.0143 seconds
CQT kernels created, time used = 0.0138 seconds
CQT kernels created, time used = 0.0144 seconds
torch.Size([24, 6, 240, 8])


  return F.conv2d(input, weight, bias, self.stride,


162.5

In [14]:
device = next(model.parameters()).device
print(device)

cuda:0


In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Assuming 'HDF5Dataset' is your dataset class and 'model' is your CNN model instance
hdf5_file_path='output_data2.hdf5'
dataset = HDF5Dataset(hdf5_file_path)
train_dataset, test_dataset, validate_dataset = load_split_datasets(dataset)

test_loader = DataLoader(test_dataset, batch_size=256, shuffle=True)


In [23]:

def calc_acc1():
    model = DeepRhythmModel(256)
    # Load the weights
    model.load_state_dict(torch.load('deeprhythm0.1.pth'))
    model = model.to(device='cuda')
    # Ensure to call model.eval() to set dropout and batch normalization layers to evaluation mode
    model.eval()
    device = 'cuda'
    correct = 0
    total = 0
    with torch.no_grad():  # Disable gradient computation
        for inputs, labels in test_loader:  # Assuming 'test_loader' is your DataLoader
            inputs, labels = inputs.to(device), labels.to(device)  # Move data to the same device as model

            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)  # Get the index of the max log-probability
            # Convert predicted and true class indices to BPM
            predicted_bpms = torch.tensor([class_to_bpm(pred.item()) for pred in predicted], device=device)
            true_bpms = torch.tensor([class_to_bpm(label.item()) for label in labels], device=device)

            # Calculate the acceptable range based on 4% of the true BPMs
            lower_bounds = true_bpms * 0.96  # 4% lower
            upper_bounds = true_bpms * 1.04  # 4% higher

            # Determine which predictions are correct within the acceptable range
            correct_preds = (predicted_bpms >= lower_bounds) & (predicted_bpms <= upper_bounds)
            correct += correct_preds.sum().item()
            total += labels.size(0)

    accuracy = 100 * correct / total
    print(f'Acc1 on the test set: {accuracy:.2f}%')

In [21]:
def calc_acc2():
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            probabilities = F.softmax(outputs, dim=1)
            _, predicted_classes = torch.max(probabilities, dim=1)

            # Convert predicted and true class indices to BPM
            predicted_bpms = torch.tensor([class_to_bpm(pred.item()) for pred in predicted_classes], device=device)
            true_bpms = torch.tensor([class_to_bpm(label.item()) for label in labels], device=device)

            # Initialize a tensor to track correctness
            correct_preds = torch.zeros_like(predicted_bpms, dtype=torch.bool)

            # Check against multiples of the true BPM
            for multiple in [0.5, 1, 2]:  # Consider 1/2x, 1x, and 2x the true BPM
                adjusted_true_bpms = true_bpms * multiple
                lower_bounds = adjusted_true_bpms * 0.96
                upper_bounds = adjusted_true_bpms * 1.04

                # Update correct predictions
                correct_mask = (predicted_bpms >= lower_bounds) & (predicted_bpms <= upper_bounds)
                correct_preds = correct_preds | correct_mask

            correct += correct_preds.sum().item()
            total += labels.size(0)

    accuracy = 100 * correct / total
    print(f'Acc2 (considering multiples): {accuracy:.2f}%')


In [24]:
calc_acc1()
calc_acc2()

Acc1 on the test set: 64.54%
Acc2 (considering multiples): 73.34%
