In [None]:
import numpy as np
import os
import librosa
import pandas as pd
from scipy.io import wavfile

# Constants
# Hop between successive CQT frames, in samples; also used to time-stamp frames.
HOP_LENGTH = 512
# Total number of CQT bins requested (252 = 7 octaves x 36 bins per octave).
NBINS = 252
# Passed to librosa.cqt as bins_per_octave.
BINS_OCTAVES_IN = 36
# NOTE(review): not referenced by the code visible in this cell; process_csv_file
# subtracts a literal 0.01 from offsets, which may be the intended use - confirm.
WINDOW_STEP = 0.01
# Number of columns in each label row (one per note).
NUM_NOTES = 88
# Maximum number of frames written to a single .npy chunk.
LENGTH_PER_FILE = 2000000
# NOTE(review): SOURCE_FOLDER and OUTPUT_DIR point at the same directory, so the
# generated .npy chunks are written next to the input .wav/.csv files.
SOURCE_FOLDER = "/media/scar-face/Windows/Hopefully the final implementation/CQT/train"
OUTPUT_DIR = "/media/scar-face/Windows/Hopefully the final implementation/CQT/train/"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Global arrays
# Frame/label rows waiting to be written, and the index of the next chunk file.
# process_directory rebinds and mutates all three via `global`.
train_to_npyarray, labels_to_npyarray = [], []
counter_var = 0

def get_folder_name(path):
    """Return the last component of ``path``, ignoring any trailing separator."""
    # Normalising first collapses trailing slashes and ".." segments, so the
    # tail of the split is the directory's own name rather than "".
    normalized = os.path.normpath(path)
    _, tail = os.path.split(normalized)
    return tail

def process_wav_file(wav_path, sampling_freq):
    """Compute the magnitude constant-Q transform of a WAV file, one row per frame.

    Args:
        wav_path: Path of the WAV file to read.
        sampling_freq: Sampling rate in Hz handed to ``librosa.cqt`` and used to
            time-stamp the frames. When ``None``, the rate stored in the WAV
            file itself is used.

    Returns:
        A ``(cqt_features, frame_times)`` tuple. ``cqt_features`` is the
        transposed CQT magnitude (frames along the first axis, bins along the
        second) and ``frame_times[i]`` equals
        ``(i + 1) * HOP_LENGTH / sampling_freq``. On any failure the error is
        printed and ``(None, None)`` is returned so the caller can skip the file.
    """
    try:
        file_rate, samples = wavfile.read(wav_path)
        if sampling_freq is None:
            sampling_freq = file_rate
        # wavfile.read yields a 1-D array for single-channel files; averaging
        # over axis=1 would raise on it and the file would be silently skipped.
        # Treat mono input as one channel so both layouts take the same path.
        if samples.ndim == 1:
            samples = samples[:, np.newaxis]
        mono_vector = np.mean(samples, axis=1)
        cqt_features = np.abs(
            librosa.cqt(
                mono_vector,
                sr=sampling_freq,
                hop_length=HOP_LENGTH,
                n_bins=NBINS,
                bins_per_octave=BINS_OCTAVES_IN,
            )
        ).T
        # Frame i is stamped with the time at the END of its hop, hence the
        # 1-based range.
        frame_times = np.arange(1, cqt_features.shape[0] + 1) * (HOP_LENGTH / float(sampling_freq))
        return cqt_features, frame_times
    except Exception as e:
        # Deliberately best-effort: one unreadable file must not abort a long batch run.
        print(f"Error reading {wav_path}: {e}")
        return None, None

def process_csv_file(csv_path, vector_aux, num_frames):
    """Build a frame-by-note binary label matrix from a tab-separated note list.

    Args:
        csv_path: Path of a tab-separated file with ``OnsetTime``,
            ``OffsetTime`` and ``MidiPitch`` columns.
        vector_aux: Ascending per-frame timestamps, in the same unit as the
            CSV times; used to map onset/offset times to frame indices.
        num_frames: Number of rows of the returned matrix.

    Returns:
        A ``(num_frames, NUM_NOTES)`` float array holding 1 where a note is
        active and 0 elsewhere. If the file is missing or lacks the required
        columns a message is printed and the all-zero matrix is returned.
    """
    labels = np.zeros((num_frames, NUM_NOTES))
    try:
        df = pd.read_csv(csv_path, sep='\t')
    except FileNotFoundError:
        print(f"CSV file not found: {csv_path}")
        return labels

    if not {'OnsetTime', 'OffsetTime', 'MidiPitch'}.issubset(df.columns):
        print(f"CSV missing required columns: {csv_path}")
        return labels

    for onset, offset, pitch in zip(df['OnsetTime'], df['OffsetTime'], df['MidiPitch']):
        # MIDI pitch 21 maps to column 0.
        pitch_idx = int(pitch) - 21
        # A pitch below 21 would become a negative index and silently wrap
        # around to the highest notes; one above the range would raise
        # IndexError. Skip both instead of corrupting or aborting the labels.
        if not 0 <= pitch_idx < NUM_NOTES:
            print(f"Skipping out-of-range MidiPitch {pitch} in {csv_path}")
            continue
        onset_idx = np.searchsorted(vector_aux, onset)
        # The offset is pulled in by 0.01 before the lookup.
        # NOTE(review): same value as WINDOW_STEP - presumably related; confirm.
        offset_idx = np.searchsorted(vector_aux, offset - 0.01)
        labels[onset_idx:offset_idx, pitch_idx] = 1
    return labels

def save_data(counter, data, labels, folder_name):
    """Write one feature chunk and its label chunk to OUTPUT_DIR.

    The files are named ``<counter><folder_name>_X`` and
    ``<counter><folder_name>_y``; ``np.save`` adds the ``.npy`` extension.
    """
    features_path = os.path.join(OUTPUT_DIR, f"{counter}{folder_name}_X")
    labels_path = os.path.join(OUTPUT_DIR, f"{counter}{folder_name}_y")

    np.save(features_path, data)
    np.save(labels_path, labels)

    print(f"Saved data and labels with shape {data.shape} to {counter}{folder_name}_X.npy and {counter}{folder_name}_y.npy")

def process_directory(source_folder):
    """Convert every WAV/CSV pair under ``source_folder`` into saved chunks.

    Walks the folder recursively. For each ``.wav`` file that has a sibling
    ``.csv`` with the same stem, CQT features and note labels are computed and
    appended to the module-level buffers. Whenever the buffer reaches
    LENGTH_PER_FILE frames a chunk is written through ``save_data`` and the
    chunk counter advances. Any remainder is written at the end.

    Args:
        source_folder: Root directory to scan. Its last path component is used
            in the names of the saved chunk files.

    Side effects:
        Rebinds the globals ``train_to_npyarray``, ``labels_to_npyarray`` and
        ``counter_var``. On return the buffers are empty and ``counter_var``
        points past the last chunk written, so a later call neither re-saves
        leftover frames nor overwrites an existing chunk.
    """
    global train_to_npyarray, labels_to_npyarray, counter_var
    folder_name = get_folder_name(source_folder)
    for root, dirs, files in os.walk(source_folder):
        # os.walk yields entries in arbitrary order. Sorting the sub-directories
        # in place (which os.walk honours) and the file names makes the
        # contents of each chunk reproducible between runs.
        dirs.sort()
        for filename in sorted(f for f in files if f.endswith('.wav')):
            wav_path = os.path.join(root, filename)
            csv_path = os.path.splitext(wav_path)[0] + '.csv'

            if not os.path.exists(csv_path):
                print(f"CSV not found for {wav_path}. Skipping.")
                continue

            print(f"Processing {wav_path}")
            # Reading the sample rate can fail on a corrupt file; skip it here
            # instead of letting one bad WAV abort the whole batch.
            try:
                sampling_freq, _ = wavfile.read(wav_path)
            except Exception as e:
                print(f"Error reading {wav_path}: {e}")
                continue

            cqt_features, vector_aux = process_wav_file(wav_path, sampling_freq)
            if cqt_features is None:
                continue

            labels = process_csv_file(csv_path, vector_aux, cqt_features.shape[0])

            # Fill the current chunk to exactly LENGTH_PER_FILE frames, flush
            # it, and carry the rest of this file over to the next chunk.
            while (len(train_to_npyarray) + len(cqt_features)) >= LENGTH_PER_FILE:
                size_to_add = LENGTH_PER_FILE - len(train_to_npyarray)
                train_to_npyarray.extend(cqt_features[:size_to_add])
                labels_to_npyarray.extend(labels[:size_to_add])
                save_data(counter_var, np.array(train_to_npyarray), np.array(labels_to_npyarray), folder_name)
                counter_var += 1
                train_to_npyarray, labels_to_npyarray = [], []
                cqt_features, labels = cqt_features[size_to_add:], labels[size_to_add:]

            train_to_npyarray.extend(cqt_features)
            labels_to_npyarray.extend(labels)

    if train_to_npyarray:
        save_data(counter_var, np.array(train_to_npyarray), np.array(labels_to_npyarray), folder_name)
        # Leave clean state behind: without this, a second call would save the
        # same leftover frames again and reuse this chunk index.
        counter_var += 1
        train_to_npyarray, labels_to_npyarray = [], []
# Convert everything under SOURCE_FOLDER; the chunk files are written to OUTPUT_DIR.
process_directory(SOURCE_FOLDER)


In [None]:
import numpy as np
import os

# Constants
# NOTE(review): every folder below is given as an absolute path, and
# os.path.join discards all earlier components when a later one is absolute,
# so SOURCE_DIR currently has no effect on the resulting paths.
SOURCE_DIR = ''
TRAIN_FOLDER = os.path.join(SOURCE_DIR, "/media/scar-face/Windows/Hopefully the final implementation/Mel/train/")
VAL_FOLDER = os.path.join(SOURCE_DIR, "/media/scar-face/Windows/Hopefully the final implementation/Mel/dev/")
TEST_FOLDER = os.path.join(SOURCE_DIR, "/media/scar-face/Windows/Hopefully the final implementation/Mel/test/")

# Global variables to store calculated values
# min_X / max_X receive one entry per loaded file from get_min_max_from_files;
# mean_X is not referenced by the code visible in this cell.
mean_X, min_X, max_X = [], [], []

def get_min_max_from_files(folder, file_identifier):
    """Return the global ``(max, min)`` over all matching arrays in ``folder``.

    Args:
        folder: Directory to scan (not recursive).
        file_identifier: Substring a file name must contain to be loaded.

    Returns:
        ``(maximum, minimum)`` over every value of every matching file found
        by THIS call.

    Raises:
        ValueError: If no file name in ``folder`` contains ``file_identifier``.

    Side effects:
        Prints each file's shape and appends the per-file extrema to the
        module-level ``max_X`` / ``min_X`` lists.
    """
    file_maxima, file_minima = [], []
    for filename in filter(lambda f: file_identifier in f, os.listdir(folder)):
        data = np.load(os.path.join(folder, filename))
        print(f"{filename} shape: {data.shape}")
        file_maxima.append(data.max())
        file_minima.append(data.min())

    if not file_maxima:
        raise ValueError(f"No files containing {file_identifier!r} found in {folder}")

    # The module-level lists are still fed for anyone inspecting them, but the
    # result is computed from this call's files only. Reading it from the
    # shared lists would mix in extrema left over from earlier calls.
    max_X.extend(file_maxima)
    min_X.extend(file_minima)
    return max(file_maxima), min(file_minima)

def calculate_mean(folder, file_identifier, min_val, max_val):
    """Average the min-max scaled rows of every matching file in ``folder``.

    Each array whose file name contains ``file_identifier`` is scaled with
    ``(x - min_val) / (max_val - min_val)``. The scaled arrays are summed along
    their first axis and the grand total is divided by the combined number of
    rows.
    """
    value_range = max_val - min_val
    running_sum = 0
    row_count = 0
    matching_names = [name for name in os.listdir(folder) if file_identifier in name]
    for name in matching_names:
        array = np.load(os.path.join(folder, name))
        scaled = (array - min_val) / value_range
        row_count += len(scaled)
        running_sum += scaled.sum(axis=0)
    return running_sum / row_count

def normalize_and_save(folder, file_identifier, min_val, max_val, mean_val):
    """Min-max scale, mean-centre and re-save every matching array in ``folder``.

    Args:
        folder: Directory holding the arrays (not scanned recursively).
        file_identifier: Substring a file name must contain to be processed.
        min_val: Minimum used for min-max scaling.
        max_val: Maximum used for min-max scaling.
        mean_val: Value subtracted after scaling (broadcast against the data).

    Side effects:
        Each result is written to ``<folder>/<stem>.npy``, where ``<stem>`` is
        the input file name without its final extension. For ``.npy`` inputs
        this OVERWRITES the source file, so running the function twice on the
        same folder normalises already-normalised data.
    """
    for filename in filter(lambda f: file_identifier in f, os.listdir(folder)):
        data = np.load(os.path.join(folder, filename))
        normalized_data = ((data - min_val) / (max_val - min_val)) - mean_val
        # Strip only the final extension: cutting at the first dot would send
        # a name such as "set.v2_X.npy" to "set.npy" instead of back to itself.
        stem = os.path.splitext(filename)[0]
        save_path = os.path.join(folder, stem)
        np.save(save_path, normalized_data)
        print(f"Processed and saved {filename} -> {save_path}.npy")


# Statistics (min, max, mean) are computed from the training files only and
# then reused unchanged for the validation and test files below.
print("Calculating max and min values from training data")
max_train, min_train = get_min_max_from_files(TRAIN_FOLDER, "train_X")

print("Calculating mean from normalized training data")
train_mean = calculate_mean(TRAIN_FOLDER, "train_X", min_train, max_train)


# NOTE(review): normalize_and_save writes each result back into the same folder
# under the input file's own stem, so ordinary "<stem>.npy" inputs are
# overwritten in place. Re-running this cell would therefore normalise data
# that is already normalised - keep a copy of the raw files or confirm that
# this is intended.
print("Normalizing and saving training data")
normalize_and_save(TRAIN_FOLDER, "train_X", min_train, max_train, train_mean)

print("Normalizing and saving validation data...")
normalize_and_save(VAL_FOLDER, "dev_X", min_train, max_train, train_mean)

# The test folder is matched with the looser "_X" identifier, i.e. every file
# whose name contains "_X" is processed.
print("Normalizing and saving test data...")
normalize_and_save(TEST_FOLDER, "_X", min_train, max_train, train_mean)

# Output calculated values for verification
print(f"Train Mean:\n{train_mean}")
print(f"Train Min:\n{min_train}")
print(f"Train Max:\n{max_train}")
