In [1]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.append('.')


In [2]:
from load_miller import split_classes, continues_classes, continues_classes_idle

# Load data

In [3]:
import numpy as np
# Archive path is relative to the notebook's working directory.
fname = 'motor_imagery.npz'
# SECURITY: allow_pickle=True lets np.load unpickle object arrays, which can
# execute arbitrary code from a crafted file. Only load this archive from a
# trusted source.
# The 'dat' entry is indexed as data[i][j] -> dict in the cells below.
data = np.load(fname, allow_pickle=True)['dat']

In [11]:
data[0][0].keys()

dict_keys(['t_off', 'stim_id', 't_on', 'srate', 'V', 'scale_uv', 'locs', 'hemisphere', 'lobe', 'gyrus', 'Brodmann_Area'])

In [4]:
a = np.zeros((10,10))
len(a.shape)

2

In [None]:
import torch
import torch.nn as nn
import numpy as np

class AdjustableMeanPooling(nn.Module):
    """Average-pool the last two axes of a ``(..., D, C)`` array.

    ``D`` is the time axis and ``C`` the channel axis. ``kernel_size`` is
    ``(k_channels, k_time)``: ``kernel_size[0]`` pools along ``C`` and
    ``kernel_size[1]`` pools along ``D`` (the layout ``nn.AvgPool2d`` sees is
    ``(B, 1, C, D)``). Any number of leading batch axes (including none) is
    supported; they are flattened for pooling and restored afterwards.
    """

    def __init__(self, kernel_size=(1, 10)):
        super(AdjustableMeanPooling, self).__init__()
        self.kernel_size = kernel_size
        self.pool = nn.AvgPool2d(kernel_size=kernel_size)

    def forward(self, x):
        """Pool ``x`` and return the result as a NumPy array.

        Args:
            x (np.ndarray | torch.Tensor): Input of shape ``(..., D, C)``.
                NumPy input is converted to a float32 tensor.

        Returns:
            np.ndarray: Array of shape
            ``(..., D // kernel_size[1], C // kernel_size[0])``.

        Raises:
            ValueError: If ``x`` has fewer than two dimensions.
        """
        if isinstance(x, np.ndarray):
            x = torch.tensor(x, dtype=torch.float32)

        if x.dim() < 2:
            raise ValueError(
                f"expected input with at least 2 dims (..., D, C), got shape {tuple(x.shape)}"
            )

        *batch_dims, D, C = x.shape

        # Flatten the batch axes, then *transpose* (not view) to get the
        # (B, 1, C, D) layout. A plain .view(B, 1, C, D) would only relabel the
        # memory and mix time and channel samples together.
        x = x.reshape(-1, D, C).transpose(1, 2).unsqueeze(1)

        x = self.pool(x)  # (B, 1, C', D')

        # Undo the layout change: back to (B, D', C'), then restore batch axes.
        x = x.squeeze(1).transpose(1, 2)
        x = x.reshape(*batch_dims, x.shape[-2], x.shape[-1])

        return x.detach().cpu().numpy()

# Smoke test: pool a random array whose last two axes are (16, 20) with a
# (1, 5) kernel. The recorded output below shows the result shape
# (10, 11, 12, 13, 3, 20): the 16-long axis shrinks to 3, the 20-long axis is kept.
kernel_size = (1, 5) 
input_tensor = np.random.randn(10,11,12, 13, 16, 20)  

pooling_layer = AdjustableMeanPooling(kernel_size=kernel_size)
output_tensor = pooling_layer(input_tensor)
print(output_tensor.shape) 


torch.Size([17160, 1, 20, 3])
(10, 11, 12, 13, 3, 20)


In [90]:
import numpy as np

def threshold_eeg_data(ecog_data, threshold):
    '''
    Binarise a recording against a fixed threshold.

    Args:
    - ecog_data (numpy.ndarray): Input array of any shape, e.g. (..., D, C)
                                  with D timepoints and C channels.
    - threshold (float): Cut-off value. Entries strictly greater than it
                         become 1; everything else becomes 0.

    Returns:
    - numpy.ndarray: Integer array of 0s and 1s with the same shape as the input.
    '''
    # Strict comparison first, then cast the boolean mask to integers.
    above_threshold = ecog_data > threshold
    return np.asarray(above_threshold).astype(int)

# Example usage:
# Simulated data of shape (5, 5, 5, 13, 46, 3000), drawn by np.random.rand.
# NOTE(review): that is 224,250,000 float64 values (~1.8 GB) before
# thresholding - shrink the leading dims if memory is tight.
ecog_data = np.random.rand(5,5,5,13, 46, 3000)  # Simulating ECoG data
threshold = 0.5  # Example threshold

binary_ecog_data = threshold_eeg_data(ecog_data, threshold)
print(binary_ecog_data.shape)  # Same shape as the input: (5, 5, 5, 13, 46, 3000)


(5, 5, 5, 13, 46, 3000)


In [None]:
import torch
import torch.nn as nn
import numpy as np

class AdjustableModePooling(nn.Module):
    """Mode-pool (most frequent value) the last two axes of a ``(..., D, C)`` array.

    ``D`` is the time axis and ``C`` the channel axis. ``kernel_size`` is
    ``(k_channels, k_time)``: ``kernel_size[0]`` is the window extent along
    ``C`` and ``kernel_size[1]`` the extent along ``D``. Windows do not overlap
    (stride equals the kernel), and trailing samples that do not fill a whole
    window are dropped.
    """

    def __init__(self, kernel_size):
        super(AdjustableModePooling, self).__init__()
        self.kernel_size = kernel_size

    def forward(self, x):
        """Pool ``x`` and return the result as a NumPy array.

        Args:
            x (np.ndarray | torch.Tensor): Input of shape ``(..., D, C)``.
                NumPy input is converted to a float32 tensor.

        Returns:
            np.ndarray: Array of shape
            ``(..., D // kernel_size[1], C // kernel_size[0])`` holding the
            mode of every window.

        Raises:
            ValueError: If ``x`` has fewer than two dimensions or is smaller
                than one pooling window along ``D`` or ``C``.
        """
        if isinstance(x, np.ndarray):
            x = torch.tensor(x, dtype=torch.float32)

        if x.dim() < 2:
            raise ValueError(
                f"expected input with at least 2 dims (..., D, C), got shape {tuple(x.shape)}"
            )

        k_c, k_d = self.kernel_size
        *batch_dims, D, C = x.shape
        n_d, n_c = D // k_d, C // k_c
        if n_d == 0 or n_c == 0:
            raise ValueError(
                f"input (D={D}, C={C}) is smaller than one window (k_time={k_d}, k_channels={k_c})"
            )

        # Flatten batch axes and drop the incomplete trailing windows.
        x = x.reshape(-1, D, C)[:, :n_d * k_d, :n_c * k_c]

        # Split each pooled axis into (window index, offset inside window) and
        # gather all offsets of one window on the last axis. Using
        # reshape/permute keeps the real (D, C) layout; a bare .view to
        # (B, 1, C, D) would scramble time and channel samples.
        windows = (
            x.reshape(-1, n_d, k_d, n_c, k_c)
            .permute(0, 1, 3, 2, 4)
            .reshape(-1, n_d, n_c, k_d * k_c)
        )

        # One mode per window, taken over every element of that window.
        pooled = torch.mode(windows, dim=-1).values  # (B, n_d, n_c)

        return pooled.reshape(*batch_dims, n_d, n_c).detach().cpu().numpy()

# Smoke test for the mode pooling layer on the same random-input layout as the
# mean-pooling demo: last two axes (16, 20), kernel (1, 5). The recorded output
# below shows the result shape (10, 11, 12, 13, 3, 20).
kernel_size = (1, 5) 
input_tensor = np.random.randn(10,11,12, 13, 16, 20)  

pooling_layer = AdjustableModePooling(kernel_size=kernel_size)
output_tensor = pooling_layer(input_tensor)
print(output_tensor.shape)  


torch.Size([20, 3, 17160])
(10, 11, 12, 13, 3, 20)


In [38]:
from scipy.signal import welch
import numpy as np

def compute_psd(data, fs, log_scale=True, nperseg=256):
    """
    Computes the Power Spectral Density (PSD) of the input data with Welch's method.

    The PSD is estimated along the second-to-last axis (the time axis of a
    ``(..., D, C)`` array), so the output keeps the batch and channel axes and
    replaces ``D`` with the number of frequency bins.

    Parameters:
        data (np.ndarray): Input data, typically shape (..., D, C).
        fs (float): Sampling frequency.
        log_scale (bool): If True, returns ``10 * log10(PSD)`` (decibels);
            bins with zero power become ``-inf``.
        nperseg (int): Welch segment length. The frequency bin spacing is
            ``fs / nperseg``, so increase it when narrow bands need more than
            one bin. Defaults to 256, the previously hard-coded value.

    Returns:
        tuple: ``(frequencies, psd)`` where ``frequencies`` has shape (F,) and
        ``psd`` has shape (..., F, C).
    """
    f, Pxx = welch(data, fs, nperseg=nperseg, axis=-2)
    if log_scale:
        Pxx = 10 * np.log10(Pxx)
    return f, Pxx

def filter_psd(frequencies, psd_values, bands):
    """
    Slice a PSD array into one sub-array per frequency band.

    Parameters:
        frequencies (np.ndarray): Frequency of each bin, shape (F,).
        psd_values (np.ndarray): PSD values whose second-to-last axis is the
            frequency axis, i.e. shape (..., F, C).
        bands (list of tuples): Bands as (low, high); both edges are inclusive.

    Returns:
        list: One array per band, in the order given, each of shape
        (..., F_band, C) where F_band is the number of bins inside the band.
    """
    def _select(lo, hi):
        # Boolean mask over the frequency axis, inclusive on both ends.
        in_band = np.logical_and(frequencies >= lo, frequencies <= hi)
        return psd_values[..., in_band, :]

    return [_select(lo, hi) for lo, hi in bands]


In [41]:
# Example signal: seeded white noise shaped (N, D, C) to exercise the PSD helpers.
fs = 1000  # Sampling frequency
D = 3000  # Time points
C = 46  # Channels
N = 10  # Number of samples

np.random.seed(42)
signal = np.random.randn(N, D, C)

# Compute PSD (the time axis D is replaced by frequency bins)
freqs, psd = compute_psd(signal, fs)
print(signal.shape)
print(psd.shape)

# Define frequency bands as (low, high) pairs
# With fs = 1000 and 256-sample Welch segments the bin spacing is
# 1000 / 256 ~= 3.9, so the three narrowest bands below each contain a single
# bin (see the printed shapes).
bands = [(0.5, 4), (4, 8), (8, 13), (13, 30), (30, 50)]

# Filter PSD based on bands
filtered_psd_list = filter_psd(freqs, psd, bands)

# Output shapes and example content
print("Number of bands:", len(filtered_psd_list))
for i, band_psd in enumerate(filtered_psd_list):
    print(f"Band {bands[i]} - PSD shape: {band_psd.shape}")


(10, 3000, 46)
(10, 129, 46)
Number of bands: 5
Band (0.5, 4) - PSD shape: (10, 1, 46)
Band (4, 8) - PSD shape: (10, 1, 46)
Band (8, 13) - PSD shape: (10, 1, 46)
Band (13, 30) - PSD shape: (10, 4, 46)
Band (30, 50) - PSD shape: (10, 5, 46)


In [5]:
# For each data[i][j] entry print: length of V's first axis, the last t_off,
# the first t_on, and the inclusive span (last t_off - first t_on + 1).
# NOTE(review): the loop bounds 7 and 2 are hard-coded; presumably they match
# the dimensions of `data` - confirm.
for i in range(7):
    for j in range(2):
        print(data[i][j]['V'].shape[0], data[i][j]['t_off'][-1], data[i][j]['t_on'][0], data[i][j]['t_off'][-1] - data[i][j]['t_on'][0] + 1)

376400 369440 10160 359281
376600 369640 10160 359481
390680 383720 10120 373601
390320 383360 10160 373201
376240 369280 10120 359161
376840 369880 10160 359721
390720 383760 10120 373641
390200 383240 10120 373121
390640 383680 10200 373481
390160 383200 10200 373001
390720 383760 10120 373641
390200 383240 10120 373121
390240 383280 10120 373161
390920 383960 10120 373841


In [8]:
390920  - 376240

14680

In [10]:
(390920  - 376240) // 2

7340

In [None]:
data[0][0]['stim_id']

In [None]:
data[0][0]['V'].shape

In [None]:
np.zeros(data[0][0]['V'][:,:46].shape).shape

In [None]:
data[0][0]['t_on'].shape[0]

In [7]:
# Build the three dataset variants for the data[1][0] recording.
dat1 = data[1][0] 

# NOTE(review): split_classes is unpacked into TWO values here, but the
# equivalent call on data[2][0] later in this notebook unpacks THREE
# (tongue, hand, extra_info). One of the two cells will raise on a fresh
# Restart & Run All - confirm the current return signature in load_miller.
discrete_data, extra_info_dat1 = split_classes(data=dat1)

continues_data_1_dat1, extra_info_1_dat1 = continues_classes(data=dat1)

continues_data_2_dat1, extra_info_2_dat1 = continues_classes_idle(data=dat1)

In [9]:
continues_data_1_dat1['stim_id'].shape

(60, 3000, 46)

In [37]:
a = [[11,11,12,11,11,12,12,12,11] , [1,1,2,1,1,2,11,2,13]]
a = np.array(a)

# (a == 12).astype(int)
a = a+2
a[a == 3] = 1
a

array([[13, 13, 14, 13, 13, 14, 14, 14, 13],
       [ 1,  1,  4,  1,  1,  4, 13,  4, 15]])

In [12]:
# Build the three dataset variants for the data[2][0] recording.
dat2 = data[2][0] 

# NOTE(review): split_classes is unpacked into THREE values here, but the
# earlier call on data[1][0] unpacks only TWO. Both cannot be right for one
# function signature - confirm against load_miller.split_classes.
tongue_data_dat2, hand_data_dat2, extra_info_dat2 = split_classes(data=dat2)

continues_data_1_dat2, extra_info_1_dat2 = continues_classes(data=dat2)

continues_data_2_dat2, extra_info_2_dat2 = continues_classes_idle(data=dat2)

In [13]:
# Training budget and embedding size passed to the CEBRA model below.
max_iterations = 10000
output_dimension = 3

# NOTE(review): mid-notebook import; consider moving it to the import cell at
# the top so the dependency is visible up front.
from cebra import CEBRA

# Model configuration only; no training happens in this cell.
# NOTE(review): the hyperparameter values are undocumented magic numbers -
# confirm they are intentional for this dataset.
cebra_posdir3_model = CEBRA(model_architecture='offset10-model',
                        batch_size=64,
                        learning_rate=3e-4,
                        temperature=1,
                        output_dimension=output_dimension,
                        max_iterations=max_iterations,
                        distance='cosine',
                        conditional='time_delta',
                        device='cuda_if_available',
                        verbose=True,
                        time_offsets=10)


In [None]:
# Fit on the continues_classes_idle output for data[1][0], using 'V' as the
# signal and 'stim_id' as the label, then embed the same 'V' array.
# NOTE(review): the embedding is computed on the training data itself; there
# is no held-out split in this cell.
cebra_posdir3_model.fit(continues_data_2_dat1['V'], continues_data_2_dat1['stim_id'])
cebra_posdir3 = cebra_posdir3_model.transform(continues_data_2_dat1['V'])

In [None]:
cebra_posdir3.shape

In [None]:
continues_data_2_dat1['stim_id'].shape

In [None]:
cebra_posdir3.shape

In [None]:
# import numpy as np
# import plotly.express as px

# # Example data
# # Replace these with your actual data
# points = cebra_posdir3  # Shape: (390680, 3)
# labels = continues_data_2_dat1['stim_id'][:,0]  # Shape: (390680,)

# # Create a DataFrame for easier handling (optional)
# import pandas as pd
# df = pd.DataFrame(points, columns=["X", "Y", "Z"])
# df["Label"] = labels

# # Create a dynamic 3D scatter plot
# fig = px.scatter_3d(
#     df,
#     x="X", 
#     y="Y", 
#     z="Z", 
#     color="Label",  # Color by labels
#     color_continuous_scale="rainbow",
#     title="Dynamic 3D Scatter Plot",
#     labels={"Label": "Classes"},
#     opacity=0.8
# )

# # Show the plot
# fig.show()


In [None]:
# import numpy as np
# import plotly.express as px

# # Example data
# # Replace these with your actual data
# points = cebra_posdir3  # Shape: (390680, 3)
# labels = continues_data_2_dat1['stim_id'][:,0]  # Shape: (390680,)

# # Create a DataFrame for easier handling (optional)
# import pandas as pd
# df = pd.DataFrame(points, columns=["X", "Y", "Z"])
# df["Label"] = labels

# # Create a dynamic 3D scatter plot
# fig = px.scatter_3d(
#     df,
#     x="X", 
#     y="Y", 
#     z="Z", 
#     color="Label",  # Color by labels
#     color_continuous_scale="rainbow",
#     title="Dynamic 3D Scatter Plot",
#     labels={"Label": "Classes"},
#     opacity=0.8
# )

# fig.update_traces(marker=dict(size=2))

# # Show the plot
# fig.show()
