In [None]:
# Скачать MNIST с сайта Kaggle
# https://www.kaggle.com/datasets/aadeshkoirala/mnist-784
# Скачать аудио-датасет free-spoken-digit-dataset 
# git clone https://github.com/Jakobovski/free-spoken-digit-dataset

In [None]:
# !apt-get install imagemagick
# !pip install matplotlib==3.9.2
# !pip install pandas==2.2.2
# !pip install numpy==1.26.4
# !pip install librosa==0.10.2.post1
# !pip install brian2==2.7.1
# !pip install soundfile==0.12.1
# !pip install ipython==8.27.0

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import pandas as pd
from brian2 import *
import librosa
import glob
import soundfile as sf
from IPython.display import HTML, display

# Воспроизведение звуков в jupyter-notebook
try:
    import IPython.display as ipd
    from IPython.display import display
    in_notebook = True
except ImportError:
    in_notebook = False

start_scope()

# Основные параметры
tau = 10*ms
dt = 1*ms
defaultclock.dt = dt
num_samples = 4 
eta = 1e-3 * Hz       # Скорость обучения (1/секунду)
decay = 1e-2          # Скорость распада (безразмерная)
sample_duration_image = 200*ms   # Продолжительность одного образца изображения
sample_duration_audio = 500*ms   # Продолжительность одного образца звука

# Убедитесь, что оба имеют одинаковое количество временных шагов для синхронизации
sample_duration = max(sample_duration_image, sample_duration_audio)
num_time_steps_per_sample = int(sample_duration / dt)
total_duration = num_samples * sample_duration

def load_preprocess_mnist(num_samples):
    df = pd.read_csv('mnist_784.csv')
    X = df.iloc[:num_samples, 0:784].values / 255.0 # Нормализация [0-1]
    return X

def load_preprocess_audio(num_samples, target_length=5000):
    audio_files = glob.glob('free-spoken-digit-dataset/recordings/*.wav')
    selected_files = audio_files[:num_samples]
    
    X = []
    sr_list = []
    for file in selected_files:
        signal, sr = librosa.load(file, sr=None)
        sr_list.append(sr)
        
        # Обрезка или приведение сигналов к фиксированной длине        
        if len(signal) > target_length:
            signal = signal[:target_length]
        else:
            signal = np.pad(signal, (0, target_length - len(signal)), 'constant')
        
        # Нормализация [0-1]
        signal = (signal - np.min(signal)) / (np.max(signal) - np.min(signal))
        X.append(signal)
    
    X = np.array(X)
    # Предполагается, что все частоты дискретизации одинаковы
    sr = sr_list[0] if sr_list else 22050  # По умолчанию 22050, если пусто
    return X, sr

def prepare_input_data(X_image, X_audio, num_time_steps_per_sample):
    num_samples = X_image.shape[0]
    N_input_image = X_image.shape[1]
    N_input_audio = X_audio.shape[1]
    
    # Инициализация входных массивов
    input_image_array = np.zeros((int(total_duration / dt), N_input_image))
    input_audio_array = np.zeros((int(total_duration / dt), N_input_audio))
    
    for i in range(num_samples):
        start_idx = i * num_time_steps_per_sample
        end_idx = (i + 1) * num_time_steps_per_sample
        input_image_array[start_idx:end_idx, :] = X_image[i]
        input_audio_array[start_idx:end_idx, :] = X_audio[i]
    
    # Создать временные массивы 
    Image_ext = TimedArray(input_image_array, dt=dt)
    Audio_ext = TimedArray(input_audio_array, dt=dt)
    
    return Image_ext, Audio_ext

# Загрузка и предварительная обработка изображений и аудиоданных
X_image = load_preprocess_mnist(num_samples=num_samples)
X_audio, sr = load_preprocess_audio(num_samples=num_samples, target_length=5000)

# Убедитесь, что оба набора данных имеют одинаковое количество образцов.
assert X_image.shape[0] == X_audio.shape[0], "Несоответствие количества сэмплов между изображением и аудиоданными."

# Подготовка входных данных
Image_ext, Audio_ext = prepare_input_data(X_image, X_audio, num_time_steps_per_sample)

# Кол-во нейронов в слоях
N_input_image = 784
N_input_audio = X_audio.shape[1]
N_hidden = 100

# Инициализация весов
w_input_image_hidden_init = np.random.randn(N_input_image * N_hidden) * 0.01
w_input_audio_hidden_init = np.random.randn(N_input_audio * N_hidden) * 0.01
w_hidden_image_input_init = np.random.randn(N_hidden * N_input_image) * 0.01
w_hidden_audio_input_init = np.random.randn(N_hidden * N_input_audio) * 0.01

# Создание групп нейронов
input_image_neurons = NeuronGroup(N_input_image, '''
    dv/dt = (-v + V_error) / tau : 1
    V_error = Image_ext(t, i) - v + V_feedback : 1
    V_feedback : 1
    ''',
    threshold='v > 0.5', reset='v = 0', method='euler')

input_audio_neurons = NeuronGroup(N_input_audio, '''
    dv/dt = (-v + V_error) / tau : 1
    V_error = Audio_ext(t, i) - v + V_feedback : 1
    V_feedback : 1
    ''',
    threshold='v > 0.5', reset='v = 0', method='euler')

# Группа скрытых нейронов с раздельными входами**
hidden_neurons = NeuronGroup(N_hidden, '''
    dv/dt = (-v + V_input_image + V_input_audio) / tau : 1
    V_input_image : 1
    V_input_audio : 1
    ''',
    threshold='v > 0.5', reset='v = 0', method='euler')

# Создание и подключение синапсов от входа изображения к скрытому слою с помощью обучения
syn_input_image_hidden = Synapses(input_image_neurons, hidden_neurons, '''
    V_input_image_post = w * v_pre : 1 (summed)
    dw/dt = eta * (v_post * v_pre - w * decay) : 1 (clock-driven)
    ''')
syn_input_image_hidden.connect()
syn_input_image_hidden.w = w_input_image_hidden_init 

# Создание и подключение синапсов от аудиовхода к скрытому слою с помощью обучения
syn_input_audio_hidden = Synapses(input_audio_neurons, hidden_neurons, '''
    V_input_audio_post = w * v_pre : 1 (summed)
    dw/dt = eta * (v_post * v_pre - w * decay) : 1 (clock-driven)
    ''')
syn_input_audio_hidden.connect()
syn_input_audio_hidden.w = w_input_audio_hidden_init

# Создание и подключение синапсов от скрытого слоя к входу изображения (обратная связь) с помощью обучения
syn_hidden_image_input = Synapses(hidden_neurons, input_image_neurons, '''
    V_feedback_post = w * v_pre : 1 (summed)
    dw/dt = eta * (v_post * v_pre - w * decay) : 1 (clock-driven)
    ''')
syn_hidden_image_input.connect()
syn_hidden_image_input.w = w_hidden_image_input_init

# Создание и подключение синапсов от скрытого слоя к аудиовходу (обратная связь) с помощью обучения
syn_hidden_audio_input = Synapses(hidden_neurons, input_audio_neurons, '''
    V_feedback_post = w * v_pre : 1 (summed)
    dw/dt = eta * (v_post * v_pre - w * decay) : 1 (clock-driven)
    ''')
syn_hidden_audio_input.connect()
syn_hidden_audio_input.w = w_hidden_audio_input_init

# Мониторинг мембранных потенциалов
mon_hidden = StateMonitor(hidden_neurons, 'v', record=True)
mon_input_image = StateMonitor(input_image_neurons, 'v', record=True)
mon_input_audio = StateMonitor(input_audio_neurons, 'v', record=True)

# Мониторинг спайков
spikes_input_image = SpikeMonitor(input_image_neurons)
spikes_input_audio = SpikeMonitor(input_audio_neurons)
spikes_hidden = SpikeMonitor(hidden_neurons)

print("Выполнение симуляции...")
run(total_duration) # Запуск симуляции
print("Симуляция завершена")

# Функция для анимации восстановления изображения
def animate_reconstruction_image(sample_index):
    # Получение временных индексов для образца
    start_time = sample_index * sample_duration
    end_time = (sample_index + 1) * sample_duration
    start_idx = int(start_time / dt)
    end_idx = int(end_time / dt)
    
    fig = plt.figure()
    plt.axis('off')
    ims = []
    
    for t in range(start_idx, end_idx):
        reconstructed_img = mon_input_image.v[:, t]
        im = plt.imshow(reconstructed_img.reshape(28,28), cmap='gray', animated=True)
        ims.append([im])
    
    ani = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=1000)
    ani.save(f'reconstruct_image{sample_index}.gif', writer='pillow', fps=20)
    plt.title(f'Реконструкция образца изображения {sample_index} по времени')
    plt.close(fig)  # Закрыть фигуру, чтобы избежать дополнительного вывода
    display(HTML(ani.to_jshtml()))


# Функция для анимирования реконструкции звука
def animate_reconstruction_audio(sample_index):
    start_time = sample_index * sample_duration
    end_time = (sample_index + 1) * sample_duration
    start_idx = int(start_time / dt)
    end_idx = int(end_time / dt)
    
    # Реконструкция аудиосигнала путем усреднения активности скрытых нейронов
    reconstructed_signal = mon_input_audio.v[:, start_idx:end_idx].mean(axis=1)
    original_signal = X_audio[sample_index]
    
    # Визуализация
    plt.figure(figsize=(12, 6))
    plt.plot(original_signal, label='Original Signal')
    plt.plot(reconstructed_signal, label='Reconstructed Signal')
    plt.legend()
    plt.title(f'Реконструкция аудиообразца {sample_index}')
    plt.xlabel('Время')
    plt.ylabel('Амплитуда')
    plt.show()
    
    # Сохранить аудиосигналы
    sf.write(f'original_signal_sample{sample_index}.wav', original_signal, sr)
    sf.write(f'reconstructed_signal_sample{sample_index}.wav', reconstructed_signal, sr)
    
    # Воспроизведение звука (если в ноутбуке)
    if in_notebook:
        print("Оригинальный сигнал:")
        display(ipd.Audio(original_signal, rate=sr))
        print("Реконструированный сигнал:")
        display(ipd.Audio(reconstructed_signal, rate=sr))
    else:
        print(f"Аудиофайлы, сохраненные как 'original_signal_sample{sample_index}.wav' and 'reconstructed_signal_sample{sample_index}.wav'.")

# Функция для построения графика мембранных потенциалов со смещением
def plot_membrane_potentials_with_offset(state_monitor, layer_name, step=10):
    plt.figure(figsize=(12, 8))
    neuron_indices = range(0, len(state_monitor.v), step)
    offset = 0
    for i in neuron_indices:
        plt.plot(state_monitor.t/ms, state_monitor.v[i] + offset, label=f'Neuron {i}')
        offset += 1.0
    plt.title(f'Мембранный потенциал в {layer_name} (Каждый {step} нейрон со смещением)')
    plt.xlabel('Время (мсек)')
    plt.ylabel('Мембранный потенциал + смещение')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()

# Функция для построения графика спайков
def plot_spikes(spike_monitor, layer_name):
    plt.figure(figsize=(12, 6))
    plt.plot(spike_monitor.t/ms, spike_monitor.i, '|')
    plt.title(f'Спайковая активность в {layer_name}')
    plt.xlabel('Время (мсек)')
    plt.ylabel('Индекс нейрона')
    plt.show()

for idx in range(num_samples):
    print(f'\nАнимационная реконструкция для образца изображения {idx}')
    animate_reconstruction_image(idx)
    print(f'\nАнимационная реконструкция для аудиосэмпла {idx}')
    animate_reconstruction_audio(idx)


# plot_membrane_potentials_with_offset(mon_input_image, 'Входний слой с изображениями', step=100)
# plot_membrane_potentials_with_offset(mon_input_audio, 'Входной аудиовход', step=500)
# plot_membrane_potentials_with_offset(mon_hidden, 'Скрытый слой', step=10)

plot_spikes(spikes_input_image, 'Входний слой с изображениями')
plot_spikes(spikes_input_audio, 'Входной аудиовход')
plot_spikes(spikes_hidden, 'Скрытый слой')
