# Setup (dependencies and data)

In [1]:
!pip show tensorflow tensorflow-probability
!git clone https://github.com/Marcos-L/MI-EEG-ClassMeth
!pip install tensorflow-probability==0.24.0
!pip install -U git+https://github.com/UN-GCPDS/python-gcpds.databases #Package for database reading.
!pip install -U git+https://github.com/UN-GCPDS/python-gcpds.visualizations.git
!pip install mne

Name: tensorflow
Version: 2.17.1
Summary: TensorFlow is an open source machine learning framework for everyone.
Home-page: https://www.tensorflow.org/
Author: Google Inc.
Author-email: packages@tensorflow.org
License: Apache 2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: absl-py, astunparse, flatbuffers, gast, google-pasta, grpcio, h5py, keras, libclang, ml-dtypes, numpy, opt-einsum, packaging, protobuf, requests, setuptools, six, tensorboard, tensorflow-io-gcs-filesystem, termcolor, typing-extensions, wrapt
Required-by: dopamine_rl, tensorflow-text, tensorflow_decision_forests, tf_keras
---
Name: tensorflow-probability
Version: 0.24.0
Summary: Probabilistic modeling and statistical inference in TensorFlow
Home-page: http://github.com/tensorflow/probability
Author: Google LLC
Author-email: no-reply@google.com
License: Apache 2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: absl-py, cloudpickle, decorator, dm-tree, gast, numpy, six
Required-by: dopamine_r

# Imports and utilities

In [3]:
import numpy as np
import os
import itertools
import random
import pickle
import mne
import h5py
import pandas as pd
import tensorflow as tf
import tensorflow_probability as tfp
import matplotlib.pyplot as plt


from typing import Sequence, Tuple
from scipy.signal import iirnotch, filtfilt, butter, freqz
import matplotlib.pyplot as plt
from scipy.spatial import distance
import networkx as nx
from tqdm import tqdm
from mne.preprocessing import compute_current_source_density
from mne.channels import make_standard_montage, read_custom_montage
from scipy.signal import butter, filtfilt, resample, iirnotch
from gcpds.visualizations.series import plot_eeg
from scipy.stats import norm
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score
from tqdm import tqdm

from tensorflow.keras.utils import plot_model
from tensorflow.keras.optimizers import Adam
from gcpds.databases import GIGA_MI_ME
from sklearn.metrics import accuracy_score, f1_score, cohen_kappa_score
from sklearn.model_selection import KFold, train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger
from sklearn.model_selection import train_test_split, StratifiedKFold
from tensorflow.keras.utils import register_keras_serializable
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import LeavePGroupsOut, StratifiedGroupKFold
from tensorflow.keras.models import Model
from scipy.spatial.distance import cdist
from sklearn.model_selection import GroupKFold
from tensorflow.keras.models import load_model

# Custom layers and metrics

In [4]:
@register_keras_serializable(package="CustomLayers")
class TakensConv1D(tf.keras.layers.Layer):
    def __init__(self, dx=4, dy=3, tau=1, mu=4, **kwargs):
        super().__init__(**kwargs)
        self.dx = int(dx)
        self.dy = int(dy)
        self.tau = int(tau)
        self.mu = int(mu)
        self.num_filters = self.dx + self.dy + 1

    def build(self, input_shape):
        kernel_size = self.mu + (self.dx - 1) * self.tau + 1
        kernel_shape = (kernel_size, 1, self.num_filters)
        kernel = tf.zeros(kernel_shape, dtype=tf.float32)

        offsets_x = self.mu + tf.range(self.dx) * self.tau
        offsets_y = tf.range(1, self.dy + 1) * self.tau
        offset_y_t = tf.constant([0], dtype=tf.int32)

        filas_x = tf.range(self.dx)
        filas_y = self.dx + tf.range(self.dy)
        fila_y_t = tf.constant([self.dx + self.dy])

        cols_x = filas_x
        cols_y = filas_y
        col_y_t = fila_y_t

        idx_x = tf.stack([filas_x, offsets_x, cols_x], axis=1)
        idx_y = tf.stack([filas_y, offsets_y, cols_y], axis=1)
        idx_yt = tf.stack([fila_y_t, offset_y_t, col_y_t], axis=1)

        indices_total = tf.concat([idx_x, idx_y, idx_yt], axis=0)
        updates = tf.ones([tf.shape(indices_total)[0]], dtype=tf.float32)

        sort_order = tf.argsort(indices_total[:, 0])
        indices_sorted = tf.gather(indices_total, sort_order)

        row_for_scatter = tf.cast(indices_sorted[:, 1], tf.int32)
        col_for_scatter = tf.cast(indices_sorted[:, 2], tf.int32)

        final_indices = tf.stack([row_for_scatter,
                                  tf.zeros_like(row_for_scatter),
                                  col_for_scatter], axis=1)

        kernel = tf.scatter_nd(final_indices, updates, kernel_shape)
        self.kernel = kernel.numpy()[::-1]

        self.conv1d = tf.keras.layers.Conv1D(
            filters=self.num_filters,
            kernel_size=kernel_size,
            strides=self.tau,
            padding="valid",
            use_bias=False
        )
        self.conv1d.build((None, input_shape[2], 1))
        self.conv1d.set_weights([self.kernel])
        self.conv1d.trainable=False

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        channels = tf.shape(inputs)[1]
        time_steps = tf.shape(inputs)[2]

        reshaped = tf.reshape(inputs, (-1, time_steps, 1))
        conv_output = self.conv1d(reshaped)
        new_time = tf.shape(conv_output)[1]

        output = tf.reshape(conv_output, 
                            (batch_size, channels, new_time, self.num_filters))

        x_sub_t_minus_mu = output[..., :self.dx]
        y_sub_t_minus_1 = output[..., self.dx:self.dx + self.dy]
        y_sub_t = output[..., -1:]

        return x_sub_t_minus_mu, y_sub_t_minus_1, y_sub_t
       

@register_keras_serializable(package="CustomLayers")
class GaussianKernelLayer(tf.keras.layers.Layer):
    def __init__(self, 
                 amplitude=1.0,
                 trainable_amplitude=False, 
                 length_scale=1.0,
                 trainable_length_scale=False,
                 **kwargs):
        super(GaussianKernelLayer, self).__init__(**kwargs)

        self.init_amplitude = amplitude
        self.trainable_amplitude = trainable_amplitude
        self.init_length_scale = length_scale
        self.trainable_length_scale = trainable_length_scale

    def build(self, input_shape):
        
        self.amplitude = self.add_weight(
            name="amplitude",
            shape=(),
            initializer=tf.constant_initializer(self.init_amplitude),
            trainable=self.trainable_amplitude,
            dtype=self.dtype
        )
        self.length_scale = self.add_weight(
            name="length_scale",
            shape=(),
            initializer=tf.constant_initializer(self.init_length_scale),
            trainable=self.trainable_length_scale,
            dtype=self.dtype
        )
        
        super(GaussianKernelLayer, self).build(input_shape)

    def call(self, X):
        
        kernel = tfp.math.psd_kernels.ExponentiatedQuadratic(
            amplitude=self.amplitude,
            length_scale=self.length_scale
        )
        return kernel.matrix(X, X)

        
@register_keras_serializable(package="CustomLayers")
class TransferEntropyLayer(tf.keras.layers.Layer):
    def __init__(self, alpha=2, **kwargs):

        super().__init__(**kwargs)
        self.alpha = int(alpha)

    def compute_entropy(self, K_hadamard):
        
        trace_hadamard = tf.reduce_sum(tf.linalg.diag_part(K_hadamard), axis=-1) 
        trace_hadamard = tf.expand_dims(tf.expand_dims(trace_hadamard, axis=-1), axis=-1)
        K_normalized = K_hadamard / trace_hadamard

        # Ejecuta la multiplicación en CPU para poder obtener gradientes en ambos operandos
        with tf.device('/CPU:0'):
            K_power = tf.linalg.matmul(K_normalized, K_normalized)
    
        trace_power = tf.reduce_sum(tf.linalg.diag_part(K_power), axis=-1)  
        H_alpha = (1 / (1 - self.alpha)) * tf.math.log(trace_power)
        return H_alpha

    def call(self, K_x, K_y_minus_1, K_y):
        
        K_x_exp = tf.expand_dims(K_x, axis=2)
        
        K_y_minus_1_exp = tf.expand_dims(K_y_minus_1, axis=1)
        K_y_exp = tf.expand_dims(K_y, axis=1)

        
        H_1 = self.compute_entropy(K_y_minus_1_exp * K_x_exp)
        H_2 = self.compute_entropy(K_y_exp * K_y_minus_1_exp * K_x_exp)
        H_3 = self.compute_entropy(K_y_exp * K_y_minus_1_exp)
        H_4 = self.compute_entropy(K_y_minus_1_exp)

        TE = H_1 - H_2 + H_3 - H_4
        
        return TE
        
@register_keras_serializable(package="CustomLayers")      
class RemoveDiagonalFlatten(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
    
        super().__init__(**kwargs)

    def call(self, inputs):
        
        shape_dyn = tf.shape(inputs)  
        batch_size = shape_dyn[0]
        c = tf.shape(inputs)[1]

        
        tf.debugging.assert_equal(
            tf.shape(inputs)[1], tf.shape(inputs)[2],
            message="RemoveDiagonalFlatten: la matriz de entrada no es cuadrada."
        )  
        diag_mask = tf.eye(c, dtype=inputs.dtype)  
        inputs_no_diag = inputs * (1 - diag_mask) 
        flattened = tf.reshape(inputs_no_diag, [batch_size, -1])  
        non_diag = tf.boolean_mask(flattened, tf.reshape(1 - diag_mask, [-1]), axis=1)
        num_features = c * (c - 1)  
        result = tf.reshape(non_diag, [batch_size, num_features])  

        return result

In [5]:
custom_objects = {
    "TakensConv1D": TakensConv1D,
    "GaussianKernelLayer": GaussianKernelLayer,
    "TransferEntropyLayer": TransferEntropyLayer,
    "RemoveDiagonalFlatten": RemoveDiagonalFlatten
}

model_path = "/kaggle/input/best-model-te/best_model_3_fold2.keras"
model = tf.keras.models.load_model(model_path, custom_objects=custom_objects)
model.summary()

In [None]:
# Retrieve the DepthwiseConv1D layer by its name
depthwise_layer = model.get_layer("block1_depthwise_conv1d")

# Get the layer weights; this usually returns a list where:
# - The first element is the kernel (filter weights)
# - The second element (if present) is the bias vector
weights = depthwise_layer.get_weights()

# Extract the depthwise kernel
# Shape: (kernel_size, number_of_input_channels, depth_multiplier)
kernel = weights[0]
print("Filters:", kernel.shape)

# If biases exist, extract them as well
if len(weights) > 1:
    bias = weights[1]
    print("Bias:", bias.shape)

Filtros: (22, 22, 1)
 Bias: (22,)
