# settings

In [None]:
import os
import librosa
import numpy as np
import soundfile as sf
from tqdm import tqdm
import pandas as pd
from datetime import datetime
import torch

In [None]:
# Mount Google Drive at /content/drive; every data and model path used below lives under this mount point.
# This import only exists inside a Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
### Check whether a GPU is available and select the compute device accordingly
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using {device} for training.")

Using cuda for training.


# load and map data

In [None]:
# Locations of the pre-computed .npy files on the mounted Drive.
# `org` is the original set; the other five are variants that are later paired with `org`
# as the "noised" side of each (original, noised) pair.
# NOTE(review): the meaning of the fh / fl / mh / ml / n abbreviations is not visible here — confirm and document.
org_path = '/content/drive/MyDrive/Hmm2Song/data/song/org.npy'
fh_path = '/content/drive/MyDrive/Hmm2Song/data/song/fh.npy'
fl_path = '/content/drive/MyDrive/Hmm2Song/data/song/fl.npy'
mh_path = '/content/drive/MyDrive/Hmm2Song/data/song/mh.npy'
ml_path = '/content/drive/MyDrive/Hmm2Song/data/song/ml.npy'
n_path = '/content/drive/MyDrive/Hmm2Song/data/song/n.npy'

In [None]:
# Load the six collections in a fixed order: originals first, then the five variants.
# NOTE(review): allow_pickle=True unpickles the file contents, so only load files you trust.
# Timing note: loading five of these took about 5 min 40 s, so loading all six
# (including n) should take roughly 7 minutes.
org, fh, fl, mh, ml, n = (
    np.load(npy_path, allow_pickle=True)
    for npy_path in (org_path, fh_path, fl_path, mh_path, ml_path, n_path)
)

In [None]:
# Pair every original with the entry at the same position in each variant collection.
org_fh, org_fl, org_mh, org_ml, org_n = (
    list(zip(org, variant)) for variant in (fh, fl, mh, ml, n)
)

# One flat list of (original, variant) pairs, concatenated variant by variant.
file_map = [*org_fh, *org_fl, *org_mh, *org_ml, *org_n]

In [None]:
# Sanity check: shape of the original-side array of the first pair.
file_map[0][0].shape

(128, 431)

In [None]:
# Total number of (original, variant) pairs across all five variants.
len(file_map)

90920

# dataset

In [None]:
from torch.utils.data import Dataset, DataLoader
import numpy as np
import torch

class MusicTripletDataset(Dataset):
    """Triplet dataset built from (original, noised) spectrogram pairs.

    Item ``idx`` is an ``(anchor, positive, negative)`` triple where

    * anchor   -- the noised spectrogram of pair ``idx``,
    * positive -- the original spectrogram of the same pair,
    * negative -- the original spectrogram of a different, randomly drawn pair.
    """

    # Every spectrogram is brought to this shape by padding its last axis.
    TARGET_SHAPE = (128, 431)
    # Upper bound on re-draws when the sampled negative is the anchor's own original.
    MAX_NEGATIVE_RETRIES = 10

    def __init__(self, music_files, npy=False):
        """
        Args:
            music_files (list): List of ``(original, noised)`` tuples. Each element is a
                2-D spectrogram given as an array-like or a ``torch.Tensor``.
            npy (bool): Stored on the instance for callers; not used by the dataset itself.
        """
        self.music_files = music_files
        self.npy = npy

    def __len__(self):
        """Return the number of (original, noised) pairs."""
        return len(self.music_files)

    def __getitem__(self, idx):
        """Return the ``(anchor, positive, negative)`` tensors for pair ``idx``.

        Raises:
            IndexError: if ``idx`` is out of range.
            ValueError: if the dataset has fewer than two pairs, so no negative exists.
        """
        original, noised = self.music_files[idx]

        pair_count = len(self.music_files)
        if pair_count < 2:
            raise ValueError(
                "MusicTripletDataset needs at least two pairs to draw a negative example."
            )
        if idx < 0:
            # Normalise so the "skip idx" arithmetic in _sample_negative stays valid.
            idx += pair_count

        anchor = self.load_spectrogram(noised)
        positive = self.load_spectrogram(original)
        negative = self.load_spectrogram(self._sample_negative(idx, original))

        return anchor, positive, negative

    def _sample_negative(self, idx, positive_original):
        """Draw the original of a random pair other than ``idx``.

        The index is drawn uniformly from the ``len - 1`` other positions in O(1),
        instead of materialising a list of every other index for each sample.
        The same original can appear in several pairs (once per noised variant), so a
        candidate that is the very same object as the positive is re-drawn a bounded
        number of times. The identity check only detects shared objects; if it never
        succeeds, the last draw is returned.
        """
        pair_count = len(self.music_files)
        candidate = None
        for _ in range(self.MAX_NEGATIVE_RETRIES):
            candidate_idx = int(np.random.randint(pair_count - 1))
            if candidate_idx >= idx:
                candidate_idx += 1  # skip over idx itself
            candidate = self.music_files[candidate_idx][0]
            if candidate is not positive_original:
                break
        return candidate

    def load_spectrogram(self, spectrogram):
        """Convert ``spectrogram`` to a tensor and zero-pad its last axis to the target width."""
        if not isinstance(spectrogram, torch.Tensor):
            spectrogram = torch.tensor(spectrogram)
        if spectrogram.shape != self.TARGET_SHAPE:
            # Clips that were cut short are zero-padded on the right to a common shape.
            pad = (0, self.TARGET_SHAPE[1] - spectrogram.shape[1])
            # Referenced through `torch` so the class does not depend on an `F` alias
            # that is only imported in a later cell.
            spectrogram = torch.nn.functional.pad(spectrogram, pad, "constant", 0)
        return spectrogram

In [None]:
# The (original, noised) pairs assembled earlier serve as the training examples.
music_files = file_map
batch_size = 64  # number of triplets per mini-batch; adjust as needed

# Wrap the pairs in the triplet dataset (npy=True means the data comes from the saved .npy files).
dataset = MusicTripletDataset(music_files=music_files, npy=True)

# Iterate over the dataset in shuffled mini-batches.
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)


# modeling

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ConvSubnet(nn.Module):
    """Convolutional encoder shared by the three branches of the triplet network.

    Three conv + ReLU + 2x2 max-pool stages are followed by one linear layer that is
    applied to the flattened spatial map of each of the 256 output channels. The
    embedding of one spectrogram therefore has shape ``(256, embedding_dims)``.

    Args:
        embedding_dims (int): Output size of the final linear layer.
        fc_in_features (int): Flattened spatial size after the three pooling stages.
            The default 848 = 16 * 53 corresponds to a 128 x 431 input
            (128 -> 64 -> 32 -> 16 and 431 -> 215 -> 107 -> 53).
    """

    def __init__(self, embedding_dims=128, fc_in_features=848):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.fc = nn.Linear(fc_in_features, embedding_dims)

    def forward(self, x):
        """Embed one spectrogram or a batch of spectrograms.

        Args:
            x: ``(1, H, W)`` for a single unbatched spectrogram, or ``(B, 1, H, W)``
                for a batch.

        Returns:
            ``(256, embedding_dims)`` for unbatched input, ``(B, 256, embedding_dims)``
            for batched input.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten only the two spatial axes. For unbatched input this equals the former
        # `x.view(x.size(0), -1)`; for batched input it keeps the batch axis intact
        # instead of failing on a shape mismatch in the linear layer.
        x = x.flatten(start_dim=-2)
        return self.fc(x)


class TripletNetwork(nn.Module):
    """Applies one shared ``ConvSubnet`` to the anchor, positive and negative inputs.

    Args:
        embedding_dims (int): Forwarded to ``ConvSubnet``.
        fc_in_features (int): Forwarded to ``ConvSubnet``.
    """

    def __init__(self, embedding_dims=128, fc_in_features=848):
        super().__init__()
        self.subnetwork = ConvSubnet(embedding_dims, fc_in_features)

    @staticmethod
    def _add_channel_dim(spectrograms):
        """Insert the single-channel axis that ``Conv2d`` expects."""
        if spectrograms.dim() == 2:
            # (H, W) -> (1, H, W): one unbatched spectrogram.
            return spectrograms.unsqueeze(0)
        if spectrograms.dim() == 3:
            # (B, H, W) -> (B, 1, H, W): keep every sample of the batch.
            return spectrograms.unsqueeze(1)
        return spectrograms

    def forward(self, anchor, positive, negative):
        """Embed all three inputs with the shared sub-network.

        The previous `unsqueeze(0)[:, 0, :, :]` indexing kept only the first sample
        of each batch; every sample is now embedded.
        NOTE(review): activation memory now scales with the batch size, so the
        DataLoader batch size may need lowering on small GPUs.

        Returns:
            Tuple ``(embedded_anchor, embedded_positive, embedded_negative)``.
        """
        embedded_anchor = self.subnetwork(self._add_channel_dim(anchor))
        embedded_positive = self.subnetwork(self._add_channel_dim(positive))
        embedded_negative = self.subnetwork(self._add_channel_dim(negative))
        return embedded_anchor, embedded_positive, embedded_negative

In [None]:
# Triplet margin loss on Euclidean (p=2) distances.
margin = 1.0  # You can adjust this margin
triplet_loss = nn.TripletMarginLoss(margin=margin, p=2)

# Model, optimizer and run configuration.
model = TripletNetwork().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
num_epochs = 5
checkpoint = 1  # save weights on every epoch whose 0-based index is a multiple of this
now = datetime.now().strftime('%Y%m%d_%H%M%S')
model_dir = f'/content/drive/MyDrive/Hmm2Song/model/{now}/'
log_steps = 50
# makedirs also creates missing parent folders and does not fail when the cell is re-run.
os.makedirs(model_dir, exist_ok=True)

step = 0
for epoch in range(num_epochs):
    loss = None  # stays None if the dataloader yields no batches
    for anchor, positive, negative in dataloader:
        step += 1
        # The model lives on `device`; the batch must be moved there too.
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)

        optimizer.zero_grad()
        embedded_anchor, embedded_positive, embedded_negative = model(anchor, positive, negative)
        # Collapse all leading axes so the loss always receives (N, D) inputs;
        # this leaves embeddings that are already 2-D unchanged.
        embedding_size = embedded_anchor.shape[-1]
        loss = triplet_loss(
            embedded_anchor.reshape(-1, embedding_size),
            embedded_positive.reshape(-1, embedding_size),
            embedded_negative.reshape(-1, embedding_size),
        )
        loss.backward()
        optimizer.step()
        if step % log_steps == 0:
            print(f'Epoch {epoch+1}, Step {step}, Loss: {loss.item()}')

    # Checkpoint once per epoch, not once per batch as before.
    if epoch % checkpoint == 0:
        torch.save(model.state_dict(), model_dir + f'model_weights_{epoch+1}.pth')

    if loss is not None:
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')


# inference

In [None]:
# Rebuild the network and restore trained weights from a saved checkpoint.
model = TripletNetwork().to(device)
# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved on a GPU also loads on a CPU-only runtime.
model.load_state_dict(
    torch.load(
        '/content/drive/MyDrive/Hmm2Song/model/20240213_134845/model_weights_5.pth',
        map_location=device,
    )
)

<All keys matched successfully>

In [None]:
model.eval()

# Embed every original spectrogram with the shared sub-network.
tensor = []
with torch.no_grad():  # inference only, so gradient tracking is switched off
    for spectrogram in tqdm(org):
        spectrogram = spectrogram if isinstance(spectrogram, torch.Tensor) else torch.tensor(spectrogram)
        # Clips that were cut short are zero-padded on the right so all inputs share one shape.
        if spectrogram.shape != (128, 431):
            spectrogram = F.pad(spectrogram, (0, 431 - spectrogram.shape[1]), "constant", 0)
        embedding = model.subnetwork(spectrogram.unsqueeze(0).to(device))
        tensor.append(embedding)

100%|██████████| 22730/22730 [00:34<00:00, 653.46it/s]


In [None]:
model.eval()  # Set the model to evaluation mode

def create_melspec(filename):
    """Load the audio file at ``filename`` and return its mel spectrogram in decibels.

    The audio is read with librosa's default loading settings, and the mel power
    spectrogram is computed at the sample rate librosa reports.
    """
    samples, sample_rate = librosa.load(filename)
    mel_power = librosa.feature.melspectrogram(y=samples, sr=sample_rate)
    return librosa.power_to_db(mel_power)

# NOTE(review): create_melspec() loads audio from a path, so `file_map` must hold
# (original_path, noised_path) pairs when this cell runs — not the pre-computed
# spectrogram pairs assembled in the "load and map data" section. Confirm which
# `file_map` is intended and define it explicitly before this cell.
noise_mel = []
origin_mel = []
for file in file_map:
    noise_mel.append(create_melspec(file[1]))
    origin_mel.append(create_melspec(file[0]))

mel_zip = list(zip(origin_mel, noise_mel))

# Sanity check: shape of the first original mel spectrogram.
print(np.array(mel_zip[0][0]).shape)

# Add the leading single-channel axis the sub-network expects.
noised_tensor = []
origin_tensor = []
for file in mel_zip:
    noised_tensor.append(torch.tensor(file[1].reshape(1, 128, 431), dtype=torch.float))
    origin_tensor.append(torch.tensor(file[0].reshape(1, 128, 431), dtype=torch.float))

# Inputs must be on the same device as the model weights; embeddings are moved back
# to the CPU so the cells below behave the same on CPU and GPU runtimes.
model_device = next(model.parameters()).device
embedded_noised = []
original_embeddings = []
with torch.no_grad():  # No need to track gradients during inference
    for i in range(len(noised_tensor)):
        embedded_noised.append(model.subnetwork(noised_tensor[i].to(model_device)).cpu())
        original_embeddings.append(model.subnetwork(origin_tensor[i].to(model_device)).cpu())

(128, 431)


In [None]:
# Identifier reported for each position in `original_embeddings` (position i -> original_ids[i]).
# NOTE(review): the source of this hand-written ordering is not visible here — confirm it matches the order of `file_map`.
original_ids = np.array([14, 13, 8, 10, 6, 12, 11, 7, 2, 5, 9, 1, 4, 0, 3])

In [None]:
# Shape of the embedding produced for the first original piece.
original_embeddings[0].shape

torch.Size([256, 128])

In [None]:
# Shape of the embedding produced for the first noised piece.
embedded_noised[0].shape

torch.Size([256, 128])

In [None]:
# Distance between the first original embedding and the first noised embedding
# (torch.norm with its default arguments over the element-wise difference).
torch.norm(original_embeddings[0] - embedded_noised[0])

tensor(161.0320)

In [None]:
# `original_embeddings` holds one embedding per original piece and `original_ids`
# holds the identifier for each position in that list.
# Distance from the noised query (index 7) to every original embedding.
distances = [torch.norm(candidate - embedded_noised[7]) for candidate in original_embeddings]
# Position of the smallest distance, then the identifier stored at that position.
closest_match_idx = torch.argmin(torch.tensor(distances))
closest_match_id = original_ids[closest_match_idx]

print(f'The closest original music piece to the noised input is: {closest_match_id}')

The closest original music piece to the noised input is: 7


In [None]:
# Show the distance from the noised query to each original embedding, in list order.
distances

[tensor(236.1041),
 tensor(232.8732),
 tensor(299.4578),
 tensor(263.0068),
 tensor(239.7916),
 tensor(245.4198),
 tensor(237.6523),
 tensor(154.4806),
 tensor(256.7777),
 tensor(252.1364),
 tensor(268.6747),
 tensor(234.1603),
 tensor(243.7114),
 tensor(239.4623),
 tensor(416.5758)]