In [15]:
from pathlib import Path
import sys
from pathlib import Path
sys.path.insert(1, str(Path.cwd().parent))
from tensor_hero.preprocessing.data import get_list_of_ogg_files
import traceback
from tqdm import tqdm
import numpy as np
import torch
import math
import os

def __process_spectrogram(spec):
    '''
    Rescales a spectrogram by adding 80 to every value and dividing the result by 80.
    Values in [-80, 0] therefore land in [0, 1]; nothing is clipped, so values outside
    that range fall outside [0, 1].
    
    ~~~~ ARGUMENTS ~~~~
    - spec (2D numpy array): padded spectrogram, loaded from spectrogram.npy in processed folder
        - presumably expressed in dB with a floor of -80 -- TODO confirm against the code that writes it
    
    ~~~~ RETURNS ~~~~
    - 2D numpy array : rescaled spectrogram, same shape as the input (the input is not modified)
    '''
    offset = 80
    shifted = spec + offset   # move the assumed -80 floor up to 0
    return shifted / offset   # bring the assumed 0 ceiling down to 1

def __notes_to_output_space(notes, n_note_classes=32, n_time_steps=400):
    '''
    Takes a notes array as input, and outputs a matrix of one hot rows in the output format
    described by the original author as that of sequence to sequence piano transcription.
    Every nonzero entry of notes becomes two consecutive rows: a time row followed by a note row.
    
    ~~~~ ARGUMENTS ~~~~
    - notes (1D numpy array) : notes array, one entry per time step
        - 0 means "no note at this time step"
        - 218 (open note) is remapped to the last note class, n_note_classes
        - any other nonzero value x is treated as note class x and one hot encoded at column x-1
    - n_note_classes (int) : number of columns reserved for note classes (default 32)
    - n_time_steps (int) : number of columns reserved for time positions (default 400)
        - must be at least len(notes), otherwise a late note indexes out of bounds
    
    ~~~~ RETURNS ~~~~
    - 2D numpy array, shape (2 * number of nonzero notes, n_note_classes + n_time_steps):
        - Y axis is sequential note events, with each new row being a time event, then note event
        - X axis is one hot encoded arrays, where the index will be fed into the transformer as output
        - columns [0, n_note_classes) encode the note, columns from n_note_classes upward encode the time
    '''
    notes = np.asarray(notes)

    # Convert "218" i.e. open notes to the last note class so they fit in the note columns
    notes = np.where(notes == 218, n_note_classes, notes)

    # Time steps that carry a note, and the note value found at each of them
    time_positions = np.flatnonzero(notes)
    note_values = notes[time_positions].astype(int)

    # Two rows per note event: even rows hold the time, odd rows hold the note
    formatted = np.zeros(shape=(2 * time_positions.size, n_note_classes + n_time_steps))
    time_rows = 2 * np.arange(time_positions.size)
    formatted[time_rows, time_positions + n_note_classes] = 1  # One hot encode the time step
    formatted[time_rows + 1, note_values - 1] = 1               # One hot encode the note

    return formatted

def __formatted_notes_to_indices(notes):
    '''
    Reverses the one hot encoding: returns the column index of every entry equal to 1.
    Helper function for __prepare_notes_tensor()
    
    ~~~~ ARGUMENTS ~~~~
    notes (2D numpy array): one hot rows, as built by __notes_to_output_space() and
        extended by __prepare_notes_tensor()
    
    ~~~~ RETURNS ~~~~
    indices (1D numpy array): 
        - column index of each 1, listed row by row (one index per row when every row holds a single 1)
        - for the rows built by __notes_to_output_space() the order is [time, note, time, note, etc...]
    '''
    hot_mask = (notes == 1)
    # np.nonzero gives one index array per axis, in row-major order; the last one is the column axis
    hot_positions = np.nonzero(hot_mask)
    return hot_positions[-1]

def __prepare_notes_tensor(notes):
    '''
    Wraps a one hot note matrix in <sos>/<eos> markers and collapses it to a 1D index sequence.
    Helper function for populate_model_1_training_data()
    
    ~~~~ ARGUMENTS ~~~~
    - notes (2D numpy array): formatted notes, as output by __notes_to_output_space()
    
    ~~~~ RETURNS ~~~~
    - 1D numpy array : indices with format [<sos>, time, note, time, note, etc..., <eos>]
        - <sos> is index notes.shape[1] and <eos> is index notes.shape[1] + 1,
          i.e. the two columns appended to the right of the input
    '''
    # Widen every row by two zero columns; these become the <sos> and <eos> slots
    sos_eos_columns = np.zeros(shape=(notes.shape[0], 2))
    widened = np.column_stack([notes, sos_eos_columns])

    # Frame the sequence with one blank row in front and one blank row behind
    blank_row = np.zeros(shape=(1, widened.shape[1]))
    framed = np.concatenate([blank_row, widened, blank_row], axis=0)

    framed[0, -2] = 1    # <sos> marks the first row
    framed[-1, -1] = 1   # <eos> marks the last row

    # The result stays a numpy array (not a torch tensor): the original author noted that
    # pytorch tensors don't compress as well as numpy arrays
    return __formatted_notes_to_indices(framed)

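'''
Takes a directory of processed song-level training data and slices each song into
segments of `segment_length` 10 ms time slices (400 slices = 4 seconds).
Train, validation, and test are split at the song level, so every slice of a song
lands in the same split.

The file structure looks like this (slice files are numbered from 0):

Training_Data
|
|----training_ready
    |----train
    |   |----<song name 1>
    |   |   |----notes
    |   |   |   |----0.npy
    |   |   |   |----1.npy
    |   |   |----spectrograms
    |   |   |   |----0.npy
    |   |   |   |----1.npy
    |   |
    |   |----<song name 2>
    |----test
    |----val

When COLAB is True the slice files sit one level deeper, in numbered subfolders that
hold 40 slices each (e.g. notes/0/0.npy ... notes/0/39.npy, then notes/1/40.npy).
'''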

# ---- Configuration ---------------------------------------------------------
segment_length = 400 # segment length in number of 10ms time slices
# NOTE: __notes_to_output_space() is called below with its 400 time positions,
# so segment_length must not exceed 400.
training_data_path = Path(r'X:\Training Data')  # machine-specific absolute path; adjust before running elsewhere
train_val_test_probs = [0.95, 0.025, 0.025]  # Probabilities of song being placed in [train, val, test]
COLAB = True  # True: slices go into numbered subfolders; False: one flat folder per song
files_per_subfolder = 40  # slices per numbered subfolder when COLAB is True
split_seed = 0  # seeds the split draw so the same song list always gets the same split
max_songs = 7  # debug cap on how many songs are visited per run; set to None to process every song
unprocessed_path = training_data_path / 'Unprocessed'
train_path = training_data_path / 'training_ready' / 'train'
val_path = training_data_path / 'training_ready' / 'val'
test_path = training_data_path / 'training_ready' / 'test'
split_paths = (train_path, val_path, test_path)  # order matches train_val_test_probs

# Make directories if they don't exist
for directory in (training_data_path / 'training_ready',) + split_paths:
    if not directory.is_dir():
        directory.mkdir(parents=True, exist_ok=True)
        print(f'made directory {directory}')

_, processed_list = get_list_of_ogg_files(unprocessed_path)

# Get paths of notes and corresponding paths of spectrograms
spec_paths = [song / 'spectrogram.npy' for song in processed_list]
notes_paths = [song / 'notes_simplified.npy' for song in processed_list]

# These counters are initialised but never updated in this cell; they are kept so that
# anything else in the notebook that reads them still finds them -- TODO confirm they are still needed
train_count = 0
val_count = 0
test_count = 0

# Draw the split of every song up front from a seeded generator. A song's split then depends
# only on its position in processed_list, not on which other songs were skipped or on max_songs,
# so re-running the cell cannot move a song from one split to another.
rng = np.random.default_rng(split_seed)
split_choices = rng.choice(len(split_paths), size=len(processed_list), p=train_val_test_probs)

num_songs = len(processed_list) if max_songs is None else min(max_songs, len(processed_list))

for i in tqdm(range(num_songs)):
    # Load spectrogram
    try:
        spec = np.load(spec_paths[i])
    except FileNotFoundError:
        print('There is no spectrogram at {}'.format(spec_paths[i]))
        continue
    except ValueError as err:
        print(err)
        print(traceback.format_exc())
        continue

    # Load notes (unreadable files are handled the same way as for the spectrogram)
    try:
        notes = np.load(notes_paths[i])
    except FileNotFoundError:
        print('There is no notes_simplified at {}'.format(notes_paths[i]))
        continue
    except ValueError as err:
        print(err)
        print(traceback.format_exc())
        continue

    spec = __process_spectrogram(spec)

    # A mismatched song is reported and skipped like any other bad input,
    # instead of aborting the whole run part-way through
    if notes.shape[0] != spec.shape[1]:
        print('ERROR: Spectrogram and notes shape do not match at {}'.format(processed_list[i]))
        continue

    # Get number of full segment_length slices; a trailing partial slice is dropped
    num_slices = spec.shape[1] // segment_length
    if num_slices == 0:
        # Nothing to save, and an empty song folder would only clutter the split
        print('Song at {} is shorter than one segment, skipping'.format(processed_list[i]))
        continue

    # Split notes and spectrogram into bins
    # spec_bins has shape (num_slices, spec.shape[0], segment_length)
    spec_bins = np.array([spec[:,j*segment_length:(j+1)*segment_length] for j in range(num_slices)])
    notes_bins = np.array([notes[j*segment_length:(j+1)*segment_length] for j in range(num_slices)])

    # This list will hold the final note representations ready for the transformer
    final_notes = []
    for j in range(num_slices):
        t_notes = __notes_to_output_space(notes_bins[j,:])
        t_notes = __prepare_notes_tensor(t_notes)
        final_notes.append(t_notes)

    # Look up the split drawn for this song
    train_val_test_selection = split_choices[i]
    prepend_path = split_paths[train_val_test_selection]

    # Output folder named after the song folder. Path.name is used rather than Path.stem:
    # stem cuts the name at its last dot, so a folder such as "Mr. Brightside" would be
    # written as "Mr" and could collide with another song.
    song_out_path = prepend_path / processed_list[i].name

    for j in range(len(final_notes)):
        file_name = str(j) + '.npy'
        if COLAB:
            # Numbered subfolders of files_per_subfolder slices each
            # (presumably to keep directories small for Colab -- TODO confirm)
            subfolder = str(j // files_per_subfolder)
            spec_outfile = song_out_path / 'spectrograms' / subfolder / file_name
            notes_outfile = song_out_path / 'notes' / subfolder / file_name
        else:
            spec_outfile = song_out_path / 'spectrograms' / file_name
            notes_outfile = song_out_path / 'notes' / file_name

        spec_outfile.parent.mkdir(parents=True, exist_ok=True)
        notes_outfile.parent.mkdir(parents=True, exist_ok=True)

        np.save(spec_outfile, spec_bins[j,...].astype('float16')) # change to float16 to reduce hard disk memory
        np.save(notes_outfile, final_notes[j])

  0%|          | 0/6 [00:00<?, ?it/s]

made directory X:\Training Data\training_ready
made directory X:\Training Data\training_ready\train
made directory X:\Training Data\training_ready\val
made directory X:\Training Data\training_ready\test


100%|██████████| 6/6 [00:07<00:00,  1.32s/it]


In [52]:
# Scratch check: draw a single split index using the same probabilities as the slicing cell.
train_val_test_probs = [0.95, 0.025, 0.025]  # Probabilities of song being placed in [train, val, test]
split_draw = np.random.choice(3, size=1, p=train_val_test_probs)  # length-1 array holding 0, 1 or 2
train_val_test_selection = split_draw[0]
print(train_val_test_selection)

0


In [14]:
# Scratch check on one element of spec_bins, which must already exist in the kernel
# (it is created by the slicing cell). np.min_scalar_type reports the smallest dtype able
# to hold this single value, not the dtype that spec_bins itself is stored in.
sample_value = spec_bins[100, 300, 200]
np.min_scalar_type(sample_value)

dtype('float16')