In [1]:
from pyNAVIS import Loaders, MainSettings, Functions
from pathlib import Path
from scipy.spatial import distance
from math import exp, sqrt
import numpy as np
# from graficas import graph, plotConnectivity
from os import makedirs, path
# from winsound import Beep

In [2]:
def load_file(file_path: Path, settings: MainSettings):
    """Load a single AEDAT recording with pyNAVIS.

    Args:
        file_path: Location of the ``.aedat`` file; converted to ``str``
            before being handed to the loader.
        settings: pyNAVIS settings object forwarded unchanged to the loader.

    Returns:
        Whatever ``Loaders.loadAEDAT`` returns for this file, untouched.
    """
    # The timestamps are returned exactly as loaded: the optional
    # Functions.adapt_timestamps(...) post-processing step is intentionally
    # not applied here.
    return Loaders.loadAEDAT(path=str(file_path), settings=settings)

def load_directory(data_folder: Path, settings: MainSettings, file_extension: str = ".aedat", verbose:bool=False):
    """Load every recording in a folder and summarise each one as a dict.

    Files are matched with ``*{file_extension}`` and processed in sorted
    order, so the position of a recording in the result is stable.

    Args:
        data_folder: Directory to scan (non-recursive glob).
        settings: pyNAVIS settings forwarded to ``load_file``.
        file_extension: Suffix used to build the glob pattern.
        verbose: Accepted for interface compatibility; currently unused
            (the summary message is always printed).

    Returns:
        A list with one dict per file, holding ``file_name``, ``addresses``,
        ``timestamps`` and ``max_timestamps`` (0 when the file has no events).
    """

    def describe(file_path):
        # One record per recording; the max is guarded because np.max
        # raises on an empty array.
        data = load_file(file_path, settings)
        stamps = data.timestamps
        return {
            "file_name": file_path.name,
            "addresses": data.addresses,
            "timestamps": stamps,
            "max_timestamps": np.max(stamps) if stamps.size > 0 else 0,
        }

    pattern = f"*{file_extension}"
    all_data = [describe(file_path) for file_path in sorted(data_folder.glob(pattern))]

    print(f"Se cargaron {len(all_data)} archivos exitosamente desde {data_folder}")

    return all_data

In [3]:
# pyNAVIS settings shared by every recording that is loaded below.
# NOTE(review): ts_tick / bin_size units are defined by pyNAVIS - confirm
# against its documentation before changing them.
settings = MainSettings(
    mono_stereo=0,
    address_size=2,
    ts_tick=0.2,
    bin_size=2000,
    num_channels=64,
)

# Only the "off" recordings are loaded for now.
off_folder = Path("On_Off", "off_aedats")
all_data_off = load_directory(off_folder, settings)

# on_folder = Path("On_Off") / "on_aedats"
# all_data_on = load_directory(on_folder, settings)

[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded SpikesFile file has been checked and it's OK
[Functions.check_SpikesFile] > The loaded S

OnOff tiene 64 canales, por lo que deben tenerse más de 64 neuronas.

In [4]:
def connProb(i, j, inh, shape, l):
    """Probability of creating the connection ``(i, j)`` inside the liquid.

    The result is ``C * exp(-(D / l) ** 2)`` where ``D`` is the Euclidean
    distance between the two neurons on the 3-D grid and ``C`` depends on
    whether each endpoint is inhibitory.

    Args:
        i: Flat index of the first neuron.
        j: Flat index of the second neuron.
        inh: Container of inhibitory neuron indices (anything supporting ``in``).
        shape: 3-tuple grid shape; only the last two sizes are needed to
            decode flat indices.
        l: Length scale dividing the distance.

    Returns:
        The connection probability as a float.
    """
    first_inh = i in inh
    second_inh = j in inh

    # Base probability for each (first, second) type combination.
    if first_inh:
        C = 0.1 if second_inh else 0.4
    else:
        C = 0.2 if second_inh else 0.3

    _, size_y, size_z = shape
    plane = size_y * size_z

    def grid_position(flat):
        # Row-major decoding of a flat index into (x, y, z).
        x, within_plane = divmod(flat, plane)
        return (x, within_plane // size_z, flat % size_z)

    D = distance.euclidean(grid_position(i), grid_position(j))

    return C * exp(- (D/l)**2)

def getConnMatrix(n_neurons, perc_inh, shape_liquid, l):
    """Draw the random signed connectivity of the liquid.

    ``int(n_neurons * perc_inh)`` neurons are picked (without replacement)
    as inhibitory. For every ordered pair ``(i, j)`` one uniform number is
    drawn and compared against ``connProb(i, j, ...)``; on success
    ``C_mat[i, j]`` is set to ``-1`` when ``i`` is inhibitory and ``1``
    otherwise. Self pairs ``(i, i)`` are drawn like any other pair.

    Uses the global NumPy random state; seed it beforehand for
    reproducible networks.

    Args:
        n_neurons: Total number of neurons.
        perc_inh: Fraction of neurons to mark as inhibitory.
        shape_liquid: 3-D grid shape forwarded to ``connProb``.
        l: Length scale forwarded to ``connProb``.

    Returns:
        ``(inh, exh, C_mat)``: the inhibitory indices as an ndarray, the
        remaining indices as an ascending list, and the
        ``(n_neurons, n_neurons)`` matrix with entries in ``{-1, 0, 1}``.
    """
    inh = np.random.choice(range(n_neurons), int(n_neurons * perc_inh), replace=False)

    # Membership is tested O(n_neurons**2) times below; a set of plain ints
    # makes each test O(1) instead of scanning the ndarray every time.
    inh_set = set(inh.tolist())

    # Explicit ascending order rather than relying on set iteration order.
    exh = [idx for idx in range(n_neurons) if idx not in inh_set]

    C_mat = np.zeros((n_neurons, n_neurons))

    for i in range(n_neurons):
        # The sign depends only on the row index, so compute it once per row.
        sign = -1 if i in inh_set else 1
        for j in range(n_neurons):

            umb = np.random.uniform(0, 1)
            if umb < connProb(i, j, inh_set, shape_liquid, l):
                C_mat[i, j] = sign

    return inh, exh, C_mat
    
def getInputLayer(n_neurons, n_channels, exh, prob_IL):
    """Randomly wire every input channel to the neurons listed in ``exh``.

    For each channel, and for each neuron in ``exh`` (in the given order),
    one uniform number is drawn from the global NumPy random state; the
    neuron is attached to the channel when the draw is below ``prob_IL``.

    Args:
        n_neurons: Number of rows of the returned matrix.
        n_channels: Number of input channels (columns).
        exh: Indices of the neurons that may receive input.
        prob_IL: Probability of attaching a neuron to a channel.

    Returns:
        A float ``(n_neurons, n_channels)`` matrix with 1 where the neuron
        (row) is attached to the channel (column) and 0 elsewhere.
    """
    mat = np.zeros((n_neurons, n_channels))

    # Channel-major, then neuron order: keeps the sequence of random draws
    # identical to drawing channel by channel.
    for channel in range(n_channels):
        for neuron in exh:
            if np.random.uniform(0, 1) < prob_IL:
                mat[neuron, channel] = 1

    return mat
    

In [5]:
# Liquid construction parameters.
shape_liquid = (5, 5, 5)   # 3-D grid of neurons; their product is the neuron count
n_channels = 64            # number of input channels fed to the liquid
t_factor = 0.01            # extra fraction of the recording length simulated after the input ends
l = 2.2                    # length scale dividing the distance in connProb
prob_IL = 0.3              # probability of attaching an excitatory neuron to an input channel
perc_inh = 0.2             # fraction of neurons drawn as inhibitory

In [6]:
# Seed the global NumPy generator so the randomly drawn network (inhibitory
# set, connectivity, input wiring and weights) is the same on every
# Restart & Run All.
SEED = 0
np.random.seed(SEED)

n_neurons = shape_liquid[0] * shape_liquid[1] * shape_liquid[2]
inh, exh, C_liquid = getConnMatrix(n_neurons, perc_inh, shape_liquid, l)

inputLayer = getInputLayer(n_neurons, n_channels, exh, prob_IL)

# getConnMatrix signs entry [i, j] by the type of neuron i and draws it with
# connProb(i, j), i.e. row = source neuron. The simulation (cambio) computes
# the drive of neuron i as sum_j WC[i][j] * Z[j], i.e. row = target neuron,
# which is also the orientation of inputLayer (neuron x channel). The
# recurrent block is therefore transposed before stacking; without this an
# inhibitory neuron would only *receive* negative weights and never inhibit
# any other neuron.
# Resulting layout: columns [0, n_neurons) are recurrent, columns
# [n_neurons, n_neurons + n_channels) are input connections.
C_eq = np.hstack([C_liquid.T, inputLayer])

W_random = np.random.uniform(0, 1, C_eq.shape)

# Signed weights: random magnitude in [0, 1) times the {-1, 0, 1} mask.
WC = W_random * C_eq

In [7]:
# Controller hyperparameters
N = WC.shape[0]         # number of neurons in the network (one row of WC per neuron)
gamma = 0.5             # factor applied to the previous V of a neuron that did not spike
i_ext = 0.1             # constant term added to every neuron at every step
theta = 1               # threshold: a neuron spikes (Z = 1) when V >= theta

In [8]:
def reset(duracion):
    """Allocate zeroed state arrays for a simulation of the given length.

    Args:
        duracion: Length of the input, in simulation steps.

    Returns:
        ``(K, V, Z)`` where ``K = int(duracion + t_factor*duracion)`` and
        ``V`` (float) and ``Z`` (int) are zero matrices of shape
        ``(N, K + 1)``. ``t_factor`` and ``N`` are read from module scope.
    """
    K = int(duracion + t_factor*duracion)   # number of simulation steps
    state_shape = (N, K + 1)

    V = np.zeros(state_shape)
    Z = np.zeros(state_shape, dtype=int)

    return K, V, Z

def cambio(i, k, K, V, Z, entrada = None):
    """Advance neuron ``i`` to step ``k``, writing ``V[i][k]`` and ``Z[i][k]`` in place.

    The new value is::

        V[i][k] = gamma * V[i][k-1] * (1 - Z[i][k-1]) + drive + i_ext

    where ``drive`` is the recurrent term ``sum_j WC[i][j] * Z[j][k-1]``
    over the ``N`` neurons plus, when ``entrada`` is given and non-empty,
    the input term ``sum_c WC[i][N + c] * entrada[c]``. The neuron spikes
    (``Z[i][k] = 1``) when ``V[i][k] >= theta``.

    ``WC``, ``N``, ``gamma``, ``i_ext`` and ``theta`` are read from module
    scope. Both sums are evaluated with ``np.dot`` instead of Python loops;
    results can differ from a sequential Python sum only by floating-point
    rounding.

    Args:
        i: Index of the neuron to update.
        k: Step to compute; step ``k-1`` is read (for ``k == 0`` that is the
            last column, by normal negative indexing).
        K: Unused; kept so existing callers do not break.
        V: ``(N, steps)`` array of membrane values, modified in place.
        Z: ``(N, steps)`` array of spikes (0/1), modified in place.
        entrada: Optional per-channel input vector for this step.
    """
    weights = np.asarray(WC[i])

    acum = 0
    # Keep the explicit None/length check: `if not entrada` would raise for
    # NumPy arrays with more than one element (ambiguous truth value).
    if entrada is not None and len(entrada) != 0:
        acum += np.dot(weights[N:N + len(entrada)], entrada)

    # Recurrent drive from every neuron's spike at the previous step.
    acum += np.dot(weights[:N], np.asarray(Z)[:N, k-1])

    # (1 - Z) zeroes the carried-over value of a neuron that just spiked.
    V[i][k] = float(gamma * V[i][k-1] * (1 - Z[i][k-1]) + acum + i_ext)
    Z[i][k] = 1 if V[i][k] >= theta else 0

def tiempos(dato, zero_vec):
    """Yield one input vector per integer tick of a recording.

    Ticks run from the first timestamp up to and including
    ``dato['max_timestamps']``. For a tick that has events, a fresh copy of
    ``zero_vec`` is yielded with a 1 at every address that fired on that
    tick; for a silent tick ``zero_vec`` itself is yielded (the same object
    every time, so consumers must not modify it).

    Timestamps are expected in non-decreasing order: events are consumed
    with a single forward pointer, so an event whose timestamp is lower
    than the current tick stops the matching of all later events.

    Args:
        dato: Mapping with ``'timestamps'``, ``'addresses'`` (parallel
            sequences) and ``'max_timestamps'``.
        zero_vec: Zero vector with one entry per input channel.

    Yields:
        The input vector for each tick, in order. Nothing is yielded for a
        recording without events.
    """
    stamps = dato['timestamps']
    addresses = dato['addresses']
    n = len(stamps)
    if n == 0:
        # No events: there is no first timestamp to start from.
        return

    # Work with Python ints. The stored values are fixed-width NumPy
    # integers, and incrementing such a scalar past its maximum wraps
    # around, which would make the loop below never terminate.
    t_curr = int(stamps[0])
    t_end = int(dato['max_timestamps'])

    i = 0
    while t_curr <= t_end:
        first = i

        # Consume every event stamped with the current tick.
        while i < n and stamps[i] == t_curr:
            i += 1

        if i > first:
            v = zero_vec.copy()
            v[list(addresses[first:i])] = 1
            yield v
        else:
            yield zero_vec
        t_curr += 1

def simu(dato):
    """Simulate the network on one recording and return its spike raster.

    The input is replayed tick by tick through ``tiempos``; afterwards the
    network runs without input until step ``K``, where
    ``K = int(duracion + t_factor*duracion)`` and ``duracion`` is the span
    between the first and the maximum timestamp.

    Every step applies the same rule as ``cambio`` to all neurons at once::

        V = gamma * V_prev * (1 - Z_prev) + W_in @ input + W_rec @ Z_prev + i_ext
        Z = V >= theta

    with ``W_rec = WC[:, :N]`` and ``W_in = WC[:, N:N + n_channels]``.
    Because each neuron only reads the previous step, updating them
    together is equivalent to updating them one by one; only floating-point
    rounding of the sums can differ. Only the previous membrane vector is
    kept, since the membrane history is not part of the result. The state
    before step 0 is all zeros.

    ``WC``, ``N``, ``n_channels``, ``t_factor``, ``gamma``, ``i_ext`` and
    ``theta`` are read from module scope.

    Args:
        dato: Recording dict with ``'timestamps'``, ``'addresses'`` and
            ``'max_timestamps'``.

    Returns:
        Integer array of shape ``(N, K + 1)`` with 1 where a neuron spiked.
    """
    duracion = int(dato['max_timestamps']) - int(dato['timestamps'][0])

    K = int(duracion + t_factor*duracion)                  # index of the last simulated step
    Z = np.zeros((N, K+1), dtype=int)

    weights = np.asarray(WC)
    W_rec = weights[:, :N]                   # neuron <- neuron
    W_in = weights[:, N:N + n_channels]      # neuron <- input channel

    zero_vec = np.zeros(n_channels)
    v_prev = np.zeros(N)
    z_prev = np.zeros(N, dtype=int)

    def advance(step, drive):
        # One network-wide update; also records the spikes of this step.
        nonlocal v_prev, z_prev
        v_prev = gamma * v_prev * (1 - z_prev) + drive + i_ext
        z_prev = (v_prev >= theta).astype(int)
        Z[:, step] = z_prev
        if (step+1)%1000 == 0:
            print(f'paso {step+1} de {K}')

    # Driven phase: one step per tick of the recording.
    k = -1  # keeps the free-running phase well defined if no tick is produced
    for k, estado in enumerate(tiempos(dato, zero_vec)):
        advance(k, W_in @ estado + W_rec @ z_prev)

    # Free-running phase: no external input, only recurrent activity.
    for k2 in range(k+1, K+1):
        advance(k2, W_rec @ z_prev)

    return Z

In [9]:
def simulacion(dataset):
    """Simulate every recording in ``dataset`` and save each spike raster.

    The raster of recording number ``idx`` is written as integer text to
    ``Simulacion/sim_{idx}.txt`` (relative to the working directory).

    Args:
        dataset: Sequence of recording dicts accepted by ``simu``.

    Returns:
        The list of rasters returned by ``simu``, in dataset order.
    """
    out_dir = "Simulacion"

    # Create the output folder if it does not exist yet.
    makedirs(out_dir, exist_ok=True)

    sim = []
    for idx, dato in enumerate(dataset):
        matriz = simu(dato)
        sim.append(matriz)
        destino = path.join(out_dir, f"sim_{idx}.txt")
        np.savetxt(destino, matriz, fmt="%d")  # rasters hold 0/1, so integers suffice

    return sim

In [10]:
# Run the simulation on the first "off" recording only. The result is bound
# to a name so the rasters stay available to later cells instead of being
# echoed as the cell output and then discarded.
sim_off = simulacion(all_data_off[:1])

paso 1000 de 2287055
paso 2000 de 2287055
paso 3000 de 2287055
paso 4000 de 2287055
paso 5000 de 2287055
paso 6000 de 2287055
paso 7000 de 2287055
paso 8000 de 2287055
paso 9000 de 2287055
paso 10000 de 2287055
paso 11000 de 2287055
paso 12000 de 2287055
paso 13000 de 2287055
paso 14000 de 2287055
paso 15000 de 2287055
paso 16000 de 2287055
paso 17000 de 2287055
paso 18000 de 2287055
paso 19000 de 2287055
paso 20000 de 2287055
paso 21000 de 2287055
paso 22000 de 2287055
paso 23000 de 2287055
paso 24000 de 2287055
paso 25000 de 2287055
paso 26000 de 2287055
paso 27000 de 2287055
paso 28000 de 2287055
paso 29000 de 2287055
paso 30000 de 2287055
paso 31000 de 2287055
paso 32000 de 2287055
paso 33000 de 2287055
paso 34000 de 2287055
paso 35000 de 2287055
paso 36000 de 2287055
paso 37000 de 2287055
paso 38000 de 2287055
paso 39000 de 2287055
paso 40000 de 2287055
paso 41000 de 2287055
paso 42000 de 2287055
paso 43000 de 2287055
paso 44000 de 2287055
paso 45000 de 2287055
paso 46000 de 22870

KeyboardInterrupt: 

In [None]:
# Audible notification that the previous (long) cell has finished.
try:
    from winsound import Beep
except ImportError:
    # winsound is unavailable: fall back to printing the bell character (BEL).
    print("\a")
else:
    Beep(500, 1000)

In [None]:
# plotConnectivity(C_eq, shape_liquid, show_arrows=True, show_labels=True)

In [None]:
# Inspect the first loaded recording: file name, event addresses,
# event timestamps and the maximum timestamp.
all_data_off[0]

{'file_name': '00176480_nohash_0.wav.aedat',
 'addresses': array([41, 34, 39, ..., 36, 14, 18], dtype='>u2'),
 'timestamps': array([12222788, 12223404, 12227117, ..., 14486661, 14486853, 14487199],
       dtype='>u4'),
 'max_timestamps': 14487199}