In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
PNUM = "14"

Vllai.py


In [None]:
import tensorflow as tf

def extractor(
    filters=(256, 256, 256, 128, 128),
    kernels=(8,) * 5,
    input_channels=64,
    normalization_fn=lambda x: tf.keras.layers.LayerNormalization()(x),
    activation_fn=lambda x: tf.keras.layers.LeakyReLU()(x),
    name="extractor",
):
    """Construct the extractor model.

    Parameters
    ----------
    filters: Sequence[int]
        Number of filters for each layer.
    kernels: Sequence[int]
        Kernel size for each layer.
    input_channels: int
        Number of EEG channels in the input
    normalization_fn: Callable[[tf.Tensor], tf.Tensor]
        Function to normalize the contents of a tensor.
    activation_fn: Callable[[tf.Tensor], tf.Tensor]
        Function to apply an activation function to the contents of a tensor.
    name: str
        Name of the model.

    Returns
    -------
    tf.keras.models.Model
        The extractor model.
    """
    eeg = tf.keras.layers.Input((None, input_channels))

    x = eeg

    if len(filters) != len(kernels):
        raise ValueError("'filters' and 'kernels' must have the same length")

    # Add the convolutional layers
    for filter_, kernel in zip(filters, kernels):
        x = tf.keras.layers.Conv1D(filter_, kernel)(x)
        x = normalization_fn(x)
        x = activation_fn(x)
        x = tf.keras.layers.ZeroPadding1D((0, kernel - 1))(x)

    return tf.keras.models.Model(inputs=[eeg], outputs=[x], name=name)


def output_context(
    filter_=64,
    kernel=32,
    input_channels=64,
    normalization_fn=lambda x: tf.keras.layers.LayerNormalization()(x),
    activation_fn=lambda x: tf.keras.layers.LeakyReLU()(x),
    name="output_context_model",
):
    """Construct the output context model.

    Parameters
    ----------
    filter_: int
        Number of filters for the convolutional layer.
    kernel: int
        Kernel size for the convolutional layer.
    input_channels: int
        Number of EEG channels in the input.
    normalization_fn: Callable[[tf.Tensor], tf.Tensor]
        Function to normalize the contents of a tensor.
    activation_fn: Callable[[tf.Tensor], tf.Tensor]
        Function to apply an activation function to the contents of a tensor.
    name: str
        Name of the model.

    Returns
    -------
    tf.keras.models.Model
        The output context model.
    """
    inp = tf.keras.layers.Input((None, input_channels))
    x = tf.keras.layers.ZeroPadding1D((kernel - 1, 0))(inp)
    x = tf.keras.layers.Conv1D(filter_, kernel)(x)
    x = normalization_fn(x)
    x = activation_fn(x)
    return tf.keras.models.Model(inputs=[inp], outputs=[x], name=name)


def vlaai(
    nb_blocks=4,
    extractor_model=None,
    output_context_model=None,
    use_skip=True,
    input_channels=64,
    output_dim=1,
    name="vlaai",
):
    """Construct the VLAAI model.

    Parameters
    ----------
    nb_blocks: int
        Number of repeated blocks to use.
    extractor_model: Callable[[tf.Tensor], tf.Tensor]
        The extractor model to use.
    output_context_model: Callable[[tf.Tensor], tf.Tensor]
        The output context model to use.
    use_skip: bool
        Whether to use skip connections.
    input_channels: int
        Number of EEG channels in the input.
    output_dim: int
        Number of output dimensions.
    name: str
        Name of the model.

    Returns
    -------
    tf.keras.models.Model
        The VLAAI model.
    """
    if extractor_model is None:
        extractor_model = extractor()
    if output_context_model is None:
        output_context_model = output_context()

    eeg = tf.keras.layers.Input((None, input_channels))

    # If using skip connections: start with x set to zero
    if use_skip:
        x = tf.zeros_like(eeg)
    else:
        x = eeg

    # Iterate over the blocks
    for i in range(nb_blocks):
        if use_skip:
            x = extractor_model(eeg + x)
        else:
            x = extractor_model(x)
        x = tf.keras.layers.Dense(input_channels)(x)
        x = output_context_model(x)

    x = tf.keras.layers.Dense(output_dim)(x)

    return tf.keras.models.Model(inputs=[eeg], outputs=[x], name=name)

def pearson_tf(y_true, y_pred, axis=1):
    """Pearson correlation function implemented in tensorflow.

    Parameters
    ----------
    y_true: tf.Tensor
        Ground truth labels. Shape is (batch_size, time_steps, n_features)
    y_pred: tf.Tensor
        Predicted labels. Shape is (batch_size, time_steps, n_features)
    axis: int
        Axis along which to compute the pearson correlation. Default is 1.

    Returns
    -------
    tf.Tensor
        Pearson correlation.
        Shape is (batch_size, 1, n_features) if axis is 1.
    """
    # Compute the mean of the true and predicted values
    y_true_mean = tf.reduce_mean(y_true, axis=axis, keepdims=True)
    y_pred_mean = tf.reduce_mean(y_pred, axis=axis, keepdims=True)

    # Compute the numerator and denominator of the pearson correlation
    numerator = tf.reduce_sum(
        (y_true - y_true_mean) * (y_pred - y_pred_mean),
        axis=axis,
        keepdims=True,
    )
    std_true = tf.reduce_sum(tf.square(y_true - y_true_mean), axis=axis, keepdims=True)
    std_pred = tf.reduce_sum(tf.square(y_pred - y_pred_mean), axis=axis, keepdims=True)
    denominator = tf.sqrt(std_true * std_pred)

    # Compute the pearson correlation
    return tf.math.divide_no_nan(numerator, denominator)


Linear.py


In [None]:
""" This module contains linear backward model"""
import tensorflow as tf

# from task2_regression.models.vlaai import pearson_tf


@tf.function
def pearson_loss_cut(y_true, y_pred, axis=1):
    """Pearson loss function.

    Parameters
    ----------
    y_true: tf.Tensor
        True values. Shape is (batch_size, time_steps, n_features)
    y_pred: tf.Tensor
        Predicted values. Shape is (batch_size, time_steps, n_features)

    Returns
    -------
    tf.Tensor
        Pearson loss.
        Shape is (batch_size, 1, n_features)
    """
    return -pearson_tf(y_true[:, : tf.shape(y_pred)[1], :], y_pred, axis=axis)


@tf.function
def pearson_metric_cut(y_true, y_pred, axis=1):
    """Pearson metric function.

    Parameters
    ----------
    y_true: tf.Tensor
        True values. Shape is (batch_size, time_steps, n_features)
    y_pred: tf.Tensor
        Predicted values. Shape is (batch_size, time_steps, n_features)

    Returns
    -------
    tf.Tensor
        Pearson metric.
        Shape is (batch_size, 1, n_features)
    """
    return pearson_tf(y_true[:, : tf.shape(y_pred)[1], :], y_pred, axis=axis)


def simple_linear_model(integration_window=32, nb_filters=1, nb_channels=64):
    inp = tf.keras.layers.Input(
        (
            None,
            nb_channels,
        )
    )
    out = tf.keras.layers.Conv1D(nb_filters, integration_window)(inp)
    model = tf.keras.models.Model(inputs=[inp], outputs=[out])
    model.compile(
        tf.keras.optimizers.Adam(),
        loss=pearson_loss_cut,
        metrics=[pearson_metric_cut]
    )
    return model


In [None]:
"""
Sample code to generate test labels (reconstructed envelopes) for
the regression task. The requested format for submitting the reconstructed envelopes is
as follows:
for each subject a json file containing a python dictionary in the
format of  ==> {'sample_id': reconstructed_envelope, ... }.
"""

import os
import glob
import json
import numpy as np
# from task2_regression.models.linear import simple_linear_model


# Parameters
window_length = 60*64  # one minute
# Root dataset directory containing test set
# Change the path to the downloaded test dataset dir
dataset_dir = "/content/drive/MyDrive/CAPSTONE_626_645_648_651/Data/OpenMIIR/eeg/64_InputChannels/Sub"+PNUM+'/P' + PNUM + '-formatted.json'
# Path to your pretrained model
pretrained_model = '/content/drive/MyDrive/CAPSTONE_626_645_648_651/Data/ICASSP_Auditory_Challenge/vlaai_baseline_task2_regression/pretrained_model/vlaai.h5'
# Define and load the pretrained model
model = vlaai()
model.load_weights(pretrained_model)

# test_data_path = glob.glob(os.path.join(dataset_dir, '*.json'))
print("test_data_path: ",dataset_dir)

with open(dataset_dir, 'r') as f:
  sub_dictionary = json.load(f)



# Get test data from subject dictionary
id_list, sub_data = zip(*sub_dictionary.items())




test_data_path:  /content/drive/MyDrive/CAPSTONE_626_645_648_651/Data/OpenMIIR/eeg/64_InputChannels/Sub14/P14-formatted.json


In [None]:
list_3D=[]
for key,value in sub_dictionary.items():
  # print(len(value), type(value) ,len(value[0]),type(value[0]))
  list_3D.append(value)

print(len(list_3D), len(list_3D[0]) , len(list_3D[0][0]))
array_3D = np.array(list_3D)
array_3D.shape


100 618 64


(100, 618, 64)

In [None]:

# Normalize data
data_mean = np.expand_dims(np.mean(array_3D, axis=1), axis=1)
data_std = np.expand_dims(np.std(array_3D, axis=1), axis=1)
array_3D = (array_3D - data_mean) / data_std

predictions = model.predict(array_3D)
# Make predictions json-serializable
predictions = [np.array(value).tolist() for value in np.squeeze(predictions)]






In [None]:
print(len(predictions), type(predictions) , len(predictions[0]) , type(predictions[0]) ,type(predictions[0][0]))

100 <class 'list'> 618 <class 'list'> <class 'float'>


In [None]:
# Create dictionary from id_list and predictions
sub = dict(zip(id_list, predictions))



In [None]:
for key,value in sub.items():
  print(key, len(value))

e0_s12_c1 618
e1_s12_c2 618
e2_s12_c3 618
e3_s3_c1 618
e4_s3_c2 618
e5_s3_c3 618
e6_s23_c1 618
e7_s23_c2 618
e8_s13_c1 618
e9_s13_c2 618
e10_s13_c3 618
e11_s14_c1 618
e12_s14_c3 618
e13_s11_c1 618
e14_s11_c3 618
e15_s1_c1 618
e16_s1_c2 618
e17_s1_c3 618
e18_s2_c1 618
e19_s2_c2 618
e20_s4_c1 618
e21_s4_c2 618
e22_s21_c1 618
e23_s21_c2 618
e24_s21_c3 618
e25_s22_c1 618
e26_s22_c2 618
e27_s22_c3 618
e28_s24_c1 618
e29_s22_c1 618
e30_s22_c2 618
e31_s22_c3 618
e32_s4_c1 618
e33_s4_c3 618
e34_s11_c1 618
e35_s11_c2 618
e36_s2_c1 618
e37_s2_c2 618
e38_s21_c1 618
e39_s21_c2 618
e40_s13_c1 618
e41_s13_c2 618
e42_s13_c3 618
e43_s3_c1 618
e44_s3_c3 618
e45_s23_c1 618
e46_s23_c3 618
e47_s12_c1 618
e48_s12_c2 618
e49_s12_c3 618
e50_s24_c1 618
e51_s24_c2 618
e52_s1_c1 618
e53_s1_c2 618
e54_s1_c3 618
e55_s14_c1 618
e56_s14_c3 618
e57_s14_c1 618
e58_s22_c1 618
e59_s22_c2 618
e60_s22_c3 618
e61_s23_c1 618
e62_s23_c2 618
e63_s23_c3 618
e64_s12_c1 618
e65_s12_c2 618
e66_s12_c3 618
e67_s21_c1 618
e68_s21_c

In [None]:
prediction_dir = "/content/drive/MyDrive/CAPSTONE_626_645_648_651/Data/OpenMIIR/eeg/Testing/64Hz_vlaai_temp/P" + PNUM + '-predictions.json'
# os.makedirs(prediction_dir, exist_ok=True)
with open(prediction_dir, 'w') as f:
    json.dump(sub, f)