<a href="https://colab.research.google.com/github/bealowman/music-reconstruction-working/blob/main/music_reconstruction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Pipeline

In [None]:
import numpy as np
from google.colab import files

# Ask for the input files through the Colab upload widget; the arrays are
# then read back by relative path, so the uploads must be named exactly
# 'HFA.npy' and 'spectrogram.npy'.
uploaded = files.upload()

# Read both arrays in a fixed order (HFA first, then the spectrogram) and
# report their shapes as a quick sanity check.
hfa, spectrogram = (np.load(path) for path in ('HFA.npy', 'spectrogram.npy'))
print(hfa.shape, spectrogram.shape)

KeyboardInterrupt: 

In [None]:

def create_lagged_spectrogram(spectrogram, n_lags, response=None):
    """Build a lagged feature matrix from the spectrogram with aligned targets.

    For every time index ``t`` from ``n_lags`` to ``len(spectrogram) - 1`` the
    feature row is ``spectrogram[t - n_lags:t]`` (the ``n_lags`` frames that
    precede ``t``; frame ``t`` itself is not included) flattened into a single
    vector, and the matching target is ``response[t]``.

    Parameters
    ----------
    spectrogram : np.ndarray
        Array indexed by time along its first axis.
    n_lags : int
        Number of preceding frames stacked into each feature row. Must be >= 1.
    response : array-like, optional
        Signal to predict, indexed by time along its first axis. When omitted,
        the notebook-level ``hfa`` array is used, which keeps existing
        two-argument calls working exactly as before.

    Returns
    -------
    tuple of np.ndarray
        ``(lagged_features, targets)``, each with one entry per usable time
        index. Both are empty when ``n_lags >= len(spectrogram)``.

    Raises
    ------
    ValueError
        If ``n_lags`` is smaller than 1 (a negative value would otherwise
        produce wrapped, meaningless slices without any error).
    IndexError
        If ``response`` has fewer entries than ``spectrogram`` has frames.
    """
    if n_lags < 1:
        raise ValueError(f"n_lags must be >= 1, got {n_lags}")

    # Fall back to the notebook global only when no response is supplied, so
    # the dependency on `hfa` is explicit and can be overridden.
    if response is None:
        response = hfa

    time_indices = range(n_lags, len(spectrogram))
    lagged_features = [spectrogram[t - n_lags:t].flatten() for t in time_indices]
    targets = [response[t] for t in time_indices]
    return np.array(lagged_features), np.array(targets)

# Number of past spectrogram frames stacked into each feature row.
n_lags = 75

# Build the lagged design matrix and its aligned targets, then report their
# shapes as a sanity check.
X_lagged, y = create_lagged_spectrogram(spectrogram, n_lags=n_lags)
print(X_lagged.shape, y.shape)

# Train here

# Fit linear regression encoding model

# Visualize STRF


In [None]:
import tensorflow as tf
from sklearn.model_selection import GroupShuffleSplit
# RobustScaler lives in sklearn.preprocessing (there is no sklearn.processing).
from sklearn.preprocessing import RobustScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.losses import Huber
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import r2_score
# pearsonr and zscore are SciPy functions (there is no sklearn.stats).
from scipy.stats import pearsonr, zscore

# Per-split results, appended to once per resampling iteration:
# Pearson r, R^2, and the fitted weight vector of each model.
all_r = []
all_r2 = []
all_coeffs = []

# Number of random train/validation/test resamplings (one model per split).
iterations = 250

n_samples, n_features = X_lagged.shape

# "We defined relatively long, 2-second groups of
#  consecutive samples as indivisible blocks of data"
# sampling rate of 100Hz, 2 seconds = 200 samples
group_size = 200
# Consecutive runs of `group_size` samples share one group id, so a block is
# never divided between the train, validation and test sets.
groups = np.arange(n_samples) // group_size

# 60% train, 20% validation, 20% test.
# First stage: 60% train, 40% temp (validation + test). `test_size` is the
# held-out fraction, so it must be 0.4 here; 0.6 would train on only 40%.
training = GroupShuffleSplit(n_splits=iterations, test_size=0.4, random_state=42)

# Second stage: split the 40% temp set in half -> 20% validation, 20% test.
val_test_temp = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=42)

for i, (train_index, temp_index) in enumerate(training.split(X_lagged, y, groups)):

    # Release the Keras state left behind by the previous iteration's model;
    # without this, building one model per split makes memory grow steadily.
    tf.keras.backend.clear_session()

    # Split temp into validation and test, still respecting the group blocks.
    # The inner split returns positions relative to `temp_index`, so they are
    # mapped back to positions in the full arrays.
    temp_groups = groups[temp_index]
    relative_val_index, relative_test_index = next(
        val_test_temp.split(X_lagged[temp_index], y[temp_index], temp_groups))
    val_index = temp_index[relative_val_index]
    test_index = temp_index[relative_test_index]

    X_train, y_train = X_lagged[train_index], y[train_index]
    X_val, y_val = X_lagged[val_index], y[val_index]
    X_test, y_test = X_lagged[test_index], y[test_index]

    # Standardization
    # "standardized the features by fitting a robust scaler to the
    #  training set only (estimates the median and the 2 to 98 quantile range)"
    # Fitting on the training rows only keeps validation/test statistics out
    # of the scaling.
    scaler = RobustScaler(quantile_range=(2, 98))
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # Model (TensorFlow): a single linear unit, i.e. a linear regression
    # whose weights are read out after fitting.
    # NOTE(review): the single output unit and the pearsonr call below both
    # assume `y` holds one value per sample (1-D) - confirm for the HFA data.
    model = Sequential([Dense(1, input_shape=(n_features,), activation='linear')])

    # RMSProp optimizer
    optimizer = RMSprop(learning_rate=0.001)

    # Huber loss function
    loss = Huber()

    # Compile the model
    model.compile(optimizer=optimizer, loss=loss)

    # "early stopping to further prevent overfitting"
    # "estimated on the validation set at each training step, and model
    #  fitting ends after this error stops diminishing for 10 consecutive steps"
    early_stopping = EarlyStopping(monitor='val_loss', patience=10,
                                   restore_best_weights=True)

    # Model fitting
    model.fit(X_train_scaled, y_train, validation_data=(X_val_scaled, y_val),
              epochs=150, batch_size=64, callbacks=[early_stopping], verbose=0)

    # Evaluation on the held-out test set. verbose=0 keeps predict() from
    # printing a progress bar on every one of the resampling iterations.
    y_pred = model.predict(X_test_scaled, verbose=0).flatten()

    # Correlation Coefficient
    r = pearsonr(y_test, y_pred)[0]

    # r squared
    r2 = r2_score(y_test, y_pred)

    # Model's coefficients: kernel of the only layer (bias excluded),
    # flattened to one weight per lagged feature.
    coeffs = model.layers[0].get_weights()[0].flatten()

    all_r.append(r)
    all_r2.append(r2)
    all_coeffs.append(coeffs)

# Turn the per-split result lists into arrays: one entry per fitted model,
# and for the coefficients one row of weights per model.
all_r, all_r2, all_coeffs = (
    np.array(values) for values in (all_r, all_r2, all_coeffs)
)

# "z-scored each coefficient across the 250 models"
# axis=0 standardizes each coefficient over the models (rows).
z_scores = zscore(all_coeffs, axis=0)

# NOTE(review): the STRF below is the mean of the raw coefficients across
# models; `z_scores` is not used in the lines visible here - confirm which of
# the two is meant to be reported.
final_strf = all_coeffs.mean(axis=0)

