In [1]:
import yaml
import numpy as np
import os, sys, gc
import soundfile as sf
from scipy.signal import stft
from tqdm import tqdm
import torch
from multiprocessing import Pool
# If you have a GPU, put the data on the GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Dataset location ---------------------------------------------------------
directory = "../Data/slakh2100_flac_redux/train"
# Sorted so that the seeded np.random.choice over tracks does not depend on the
# arbitrary order returned by the file system.
listdir = sorted(os.listdir(directory))
# Audio file extension appended to stem names and to 'mix'.
# NOTE(review): this name shadows the built-in format(); it is kept because
# data_dictS reads it under this name.
format = '.flac'
savedir = 'data/train'  # not referenced by the code visible in this notebook

# --- STFT / mixing parameters -------------------------------------------------
sample_freq = 44100   # passed as `fs` to scipy.signal.stft
freq_amount = 129     # frequency bins; nperseg is derived as 2*freq_amount - 2
mixing = True         # also build random sub-mixes of the instruments
mix_amount = 2        # number of random sub-mixes per track

# --- Window sampling parameters -----------------------------------------------
amount = 100              # windows sampled per dictionary entry
L = freq_amount           # frequency dimension of a sampled window
C = 2                     # context frames on each side (window is 2*C+1 wide)
NUMBER_OF_ITERATIONS = 2  # number of tracks drawn from data_dictS

# --- Reproducibility ----------------------------------------------------------
# Track and instrument choices use NumPy's global generator, while the window
# positions are drawn with torch.randint, so both generators are seeded.
np.random.seed(0)
torch.manual_seed(0)


In [2]:
def data_dictS(N):
    """Yield STFT dictionaries for ``N`` randomly chosen tracks.

    For each track, all stems that share an ``inst_class`` in ``metadata.yaml``
    are summed into one signal. If ``mixing`` is enabled, ``mix_amount`` random
    combinations of those instruments are added, and finally the track's full
    mix is added under the key ``'mix'``.

    Reads the module-level settings ``listdir``, ``directory``, ``format``,
    ``mixing``, ``mix_amount``, ``sample_freq``, ``freq_amount`` and ``device``.

    Args:
        N: number of tracks to draw (with replacement) from ``listdir``.

    Yields:
        dict mapping an instrument name, a concatenation of instrument names
        (random sub-mix) or ``'mix'`` to a real-valued tensor on ``device``
        holding the STFT with real and imaginary parts in the last axis.
        Every signal carries a leading axis of length 1, so for 1-D (mono)
        audio each tensor is ``(1, freq_amount, frames, 2)``.
        NOTE(review): assumes ``sf.read`` returns 1-D audio -- confirm.
    """
    for _ in range(N):
        tr = np.random.choice(listdir)
        tr_dict = {}
        tr_path = os.path.join(directory, tr)
        with open(os.path.join(tr_path, "metadata.yaml")) as meta:
            metadata = yaml.safe_load(meta)

        # List the stems folder once (not once per stem) and keep only the
        # stems from the metadata that actually exist on disk.
        stems_dir = os.path.join(tr_path, 'stems')
        stems_on_disk = set(os.listdir(stems_dir))
        files_by_inst = {}
        for stem, info in metadata['stems'].items():
            file_name = stem + format
            if file_name in stems_on_disk:
                files_by_inst.setdefault(str(info['inst_class']), []).append(
                    os.path.join(stems_dir, file_name))

        # Combine all stems from the same instrument and track using soundfile.
        # Sorted iteration keeps the key order independent of metadata order.
        for inst in sorted(files_by_inst):
            inst_data = np.array([sf.read(inst_file)[0] for inst_file in files_by_inst[inst]])
            # Leading axis of length 1 so stems share the layout of 'mix' below.
            tr_dict[inst] = np.sum(inst_data, axis=0)[np.newaxis]

        # Choose a random amount of instruments to combine (between 2 and length-1).
        # np.random.randint(2, n) raises for n <= 2, so tracks with fewer than
        # three instruments get no random sub-mixes.
        inst_in_dict = list(tr_dict.keys())
        if mixing and len(inst_in_dict) > 2:
            inst_amount = np.random.randint(2, len(inst_in_dict), size=mix_amount)
            for n_inst in inst_amount:
                # Choose the instruments to combine
                inst_to_mix = np.random.choice(inst_in_dict, n_inst, replace=False)
                # Add the combined data to the dictionary
                tr_dict[''.join(inst_to_mix)] = np.sum([tr_dict[inst] for inst in inst_to_mix], axis=0)

        # Add the full mix to the dictionary. This must not rebind tr_dict,
        # otherwise every instrument and sub-mix computed above is discarded.
        mix_data = sf.read(os.path.join(tr_path, 'mix' + format))[0]
        tr_dict['mix'] = mix_data[np.newaxis]

        # nperseg = 2*freq_amount - 2 gives freq_amount one-sided frequency bins.
        tr_dicts_2 = {
            inst: torch.view_as_real_copy(
                torch.from_numpy(
                    stft(signal, fs=sample_freq, nperseg=freq_amount*2-2)[2]
                ).to(device)
            )
            for inst, signal in tr_dict.items()
        }
        yield tr_dicts_2

In [8]:
# Sample `amount` random context windows of 2*C+1 consecutive STFT frames from
# every entry of each dictionary yielded by data_dictS. A window is centred on a
# random frame `rand`; frames that would fall before the first or after the last
# frame of the track stay zero.
# NOTE(review): `dat` is rebuilt for every track and never stored, so only the
# windows of the last track survive this loop -- confirm that is intended.
# NOTE(review): window positions are drawn separately for each entry, so the
# windows of different entries are not time-aligned -- confirm that is intended.
for data in data_dictS(NUMBER_OF_ITERATIONS):
    dat = {}
    # Number of STFT frames, read from the 'mix' entry (axis 2 is time).
    lenght = data['mix'].shape[2]
    for inst in data.keys():
        # The buffer starts as zeros, so padding needs no explicit writes.
        dat[inst] = torch.zeros(amount, L, 2*C+1, 2, dtype=torch.float64)
        for j, rand in enumerate(torch.randint(0, lenght, (amount,)).tolist()):
            # Clamp the requested frame range [rand-C, rand+C] to the track.
            first = max(rand - C, 0)
            last = min(rand + C + 1, lenght)
            # Column of the window that receives frame `first`; non-zero only
            # when the window starts before frame 0.
            offset = first - (rand - C)
            # Copy exactly the frames that exist. The previous right-edge
            # branch sliced both sides one frame short, dropping the last real
            # frame of the track (the centre frame when rand == lenght-1).
            dat[inst][j][:, offset:offset + (last - first)] = data[inst][0][:, first:last]

# Second windowing pass over the 'mix' entry only. It runs at top level, so it
# relies on `dat`, `data` and `lenght` left over from the last iteration of the
# loop above.
# NOTE(review): this looks like an earlier draft of the loop above and does not
# match the data it receives -- confirm whether it can be removed:
#   * `dat` is the dict keyed by entry name built above, so `dat[j][...]` in the
#     two edge branches raises KeyError unless key `j` was assigned earlier;
#   * the final `else` branch binds a new integer key `j` in that dict;
#   * `data['mix'][:, a:b]` slices the second axis of the tensor, which is
#     presumably the frequency axis rather than time (data_dictS wraps the mix
#     in a leading axis of length 1) -- verify against data_dictS.
for j, rand in enumerate(torch.randint(0, lenght, (amount,))):
    if rand < C:
        # Window starts before the first frame: zero the leading slots.
        dat[j][0] = torch.zeros(L, 2)
        i = 1
        while rand-C+i < 0:
            dat[j][i] = torch.zeros(L, 2)
            i+=1
        dat[j][i:] = data['mix'][:, rand-C+i:rand+C+1]
    elif rand > lenght-1-C:
        # Window runs past the last frame: zero the trailing slots.
        dat[j][-1] = torch.zeros(L, 2)
        i = 1
        while rand+C-i > lenght-1:
            dat[j][-1-i] = torch.zeros(L, 2)
            i+=1
        dat[j][:-1-i] = data['mix'][:, rand-C:rand+C+1-i]
    else:
        # Window lies fully inside the track.
        dat[j] = data['mix'][:, rand-C:rand+C+1]

# Inline, single-track version of the loading done in data_dictS: pick one
# random track, sum its stems per instrument class, optionally add random
# sub-mixes, add the full mix, and store the STFT of every entry in tr_dicts_2.
tr = np.random.choice(listdir)
tr_dicts_2 = {}
tr_dict = {}
tr_path = os.path.join(directory, tr)
with open(os.path.join(tr_path, "metadata.yaml")) as meta:
    # safe_load, as in data_dictS: the metadata is read as plain data, so the
    # full yaml.Loader (which can construct arbitrary Python objects) is not needed.
    metadata = yaml.safe_load(meta)

# List the stems folder once (not once per stem) and keep only the stems from
# the metadata that actually exist on disk. Rows are [file name, inst_class].
stems_dir = os.path.join(tr_path, 'stems')
stems_on_disk = set(os.listdir(stems_dir))
file_inst = []
for stem in metadata['stems'].keys():
    if stem + format in stems_on_disk:
        file_inst.append([stem + format, metadata['stems'][stem]['inst_class']])
# reshape keeps the array two-dimensional even when no stem was found.
file_inst = np.array(file_inst, dtype=str).reshape(-1, 2)

# combine all stems from the same instrument and track using soundfile
for inst in np.unique(file_inst[:,1]):
    inst_files = file_inst[file_inst[:,1] == inst][:,0]
    inst_files = [os.path.join(stems_dir, inst_file) for inst_file in inst_files]
    inst_data = np.array([sf.read(inst_file)[0] for inst_file in inst_files])
    # Leading axis of length 1 so stems share the layout of 'mix' below.
    inst_data = np.sum(inst_data, axis=0)[np.newaxis]
    # add the combined data to the dictionary
    tr_dict.update({inst: inst_data})


# Choose a random amount of instruments to combine (between 2 and length-1).
# np.random.randint(2, n) raises for n <= 2, so tracks with fewer than three
# instruments get no random sub-mixes.
inst_in_dict = list(tr_dict.keys())
if mixing and len(inst_in_dict) > 2:
    for _ in range(mix_amount):
        # Choose the instruments to combine
        inst_amount = np.random.randint(2, len(inst_in_dict))
        inst_to_mix = np.random.choice(inst_in_dict, inst_amount, replace=False)
        # Add the combined data to the dictionary
        tr_dict.update({''.join(inst_to_mix):  np.sum([tr_dict[inst] for inst in inst_to_mix], axis=0)})

# Add the full mix to the dictionary. This must not rebind tr_dict, otherwise
# every instrument and sub-mix computed above is discarded.
inst_data = np.array([sf.read(os.path.join(tr_path, 'mix' + format))[0]])
tr_dict['mix'] = inst_data

for inst in tr_dict.keys():
    # compute the STFT of the combined data
    # (nperseg = 2*freq_amount - 2 gives freq_amount one-sided frequency bins)
    f, t, Zxx = stft(tr_dict[inst], fs=sample_freq, nperseg=freq_amount*2-2)
    # Store real and imaginary parts in a trailing axis of length 2.
    tr_dicts_2.update({inst: torch.view_as_real_copy(torch.tensor(Zxx, dtype=torch.complex128, device=device))})