In [2]:
%matplotlib qt
%load_ext autoreload
%autoreload 2

import sys; sys.path.insert(0, r'../../invert')
import mne
import pickle as pkl

from time import time
from copy import deepcopy
import os
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from invert.forward import get_info, create_forward_model
from invert.simulate import generator
from invert import Solver
from config import forward_models
# Seed handed to the simulation generator through generator_args_baseline.
random_seed = 42

# Folder that receives the pickled simulation sets; created here if missing
# so the dump cells further down can open their output files.
base_path = "D:/data/flex_ssm/simulation"
os.makedirs(base_path, exist_ok=True)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Load Files

In [5]:
# Both file locations are taken from the first entry of `forward_models`
# and resolved against the same data directory.
path_fwd, path_info = (
    os.path.join("D:/data/flex_ssm", forward_models[0][key])
    for key in ("path_fwd", "path_info")
)

# Read the forward solution (quietly) and convert it to fixed orientations.
fwd = mne.convert_forward_solution(
    mne.read_forward_solution(path_fwd, verbose=0),
    force_fixed=True,
)
# Divide the gain matrix by its norms taken along axis 0, in place.
fwd["sol"]["data"] /= np.linalg.norm(fwd["sol"]["data"], axis=0)

# NOTE(review): unpickling can execute arbitrary code -- only load info files
# from a trusted source.
fn = path_info
with open(fn, "rb") as f:
    info = pkl.load(f)

    No patch info available. The standard source space normals will be employed in the rotation to the local surface coordinates....
    Changing to fixed-orientation forward solution with surface-based source orientations...
    [done]


# Simulate Data

In [6]:
# Baseline keyword arguments for `generator`. Most figure cells below deep-copy
# this dict and override only the parameter(s) they vary.
generator_args_baseline = {
    "use_cov": False,
    "return_mask": False,
    "n_sources": 2,  # two simultaneous sources
    "n_orders": 0,  # neighborhood orders (source extent)
    "snr_range": (1, 1),  # signal-to-noise ratio (actual ratio, not dB)
    "amplitude_range": (0.5, 1),  # range of source time course gain
    "batch_repetitions": 1,
    "batch_size": 500,  # number of samples
    "scale_data": False,
    "n_timepoints": 50,  # number of time points per sample
    "beta_range": (1, 1),  # frequency spectrum of each simulated time course (1/f**beta)
    "return_info": True,  # return information on the simulated sources
    "inter_source_correlation": 0.5,  # inter-source correlation coefficient rho
    "diffusion_parameter": 0.1,  # diffusion parameter alpha: speed of diffusion, thus shape of extended sources
    "random_seed": random_seed,  # random seed for replicability
    "iid_noise": True,  # sensor noise is IID, i.e. not correlated among channels
}

# Figure 1

In [None]:
# Figure 1: sweep the inter-source correlation with n_orders fixed at 0.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0)
sim_condition = "figure-1"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

iscs = [0.1, 0.5, 0.9, 0.95, 0.975, 0.99, 1.0]
x_tests, y_tests, sim_infos = [], [], []
for isc in iscs:
    print(isc)
    generator_args.update(inter_source_correlation=isc)
    # Build a new generator for this condition and draw its first batch only.
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

# Figure 1.2 - 3 sources

In [5]:
# Figure 12: same inter-source-correlation sweep, but with three sources.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0, n_sources=3)
sim_condition = "figure-12"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

iscs = [0.1, 0.5, 0.9, 0.95, 0.975, 0.99, 1.0]
x_tests, y_tests, sim_infos = [], [], []
for isc in iscs:
    print(isc)
    generator_args.update(inter_source_correlation=isc)
    # Build a new generator for this condition and draw its first batch only.
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

0.1
0.5
0.9
0.95
0.975
0.99
1.0


# Figure 1.3 - equal magnitude sources

In [6]:
# Figure 13: inter-source-correlation sweep with amplitude_range pinned to (1, 1).
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0, amplitude_range=(1, 1))
sim_condition = "figure-13"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

iscs = [0.1, 0.5, 0.9, 0.95, 0.975, 0.99, 1.0]
x_tests, y_tests, sim_infos = [], [], []
for isc in iscs:
    print(isc)
    generator_args.update(inter_source_correlation=isc)
    # Build a new generator for this condition and draw its first batch only.
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

0.1
0.5
0.9
0.95
0.975
0.99
1.0


# Figure 1.4 - white spectrum signals

In [7]:
# Figure 14: inter-source-correlation sweep with beta_range set to (0, 0),
# i.e. exponent 0 in the 1/f**beta spectrum.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0, beta_range=(0, 0))
sim_condition = "figure-14"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

iscs = [0.1, 0.5, 0.9, 0.95, 0.975, 0.99, 1.0]
x_tests, y_tests, sim_infos = [], [], []
for isc in iscs:
    print(isc)
    generator_args.update(inter_source_correlation=isc)
    # Build a new generator for this condition and draw its first batch only.
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

0.1
0.5
0.9
0.95
0.975
0.99
1.0


# Figure 2

In [None]:
# Figure 2: sweep the signal-to-noise ratio with n_orders fixed at 0.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0)
sim_condition = "figure-2"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

# Levels are written in dB and converted to plain ratios via 10**(dB/10).
# NOTE(review): that is the power-ratio convention -- confirm it matches the
# SNR definition used inside `generator`.
snr_ranges = [10**(x/10) for x in [-10, -5, 0.]]
x_tests, y_tests, sim_infos = [], [], []
for snr_range in snr_ranges:
    print(snr_range)
    # Lower and upper bound coincide, so the SNR is fixed per condition.
    generator_args.update(snr_range=(snr_range, snr_range))
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

# Figure 3

In [16]:
# Figure 3: sweep the number of time points per sample with n_orders fixed at 0.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0)
sim_condition = "figure-3"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

time_samples = [10, 20, 50]

x_tests, y_tests, sim_infos = [], [], []
for n_timepoints in time_samples:
    print(n_timepoints)
    generator_args.update(n_timepoints=n_timepoints)
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    # Unlike the other sweeps, samples are collected in flat lists and never
    # concatenated -- presumably because batches with different n_timepoints
    # cannot be stacked into one array (TODO confirm).
    x_tests.extend(x_test)
    y_tests.extend(csr_matrix(yy) for yy in y_test)
    sim_infos.append(sim_info)

sim_info = pd.concat(sim_infos)

# The flat sample lists themselves are pickled; generator_args is saved as
# left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_tests, y_tests, sim_info, generator_args], f)

10
20
50


# Figure 4

In [21]:
# Figure 4: three sources, swept over three inter-source correlations.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(n_orders=0, n_sources=3)
sim_condition = "figure-4"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

iscs = [0.1, 0.5, 0.9]
x_tests, y_tests, sim_infos = [], [], []
for isc in iscs:
    print(isc)
    generator_args.update(inter_source_correlation=isc)
    # Build a new generator for this condition and draw its first batch only.
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

0.1
0.5
0.9


# Figure 4.2 (Amir's simulation paradigm)

In [13]:
from invert.simulate import generator_simple

# Figure 4.2: the same three-source correlation sweep as Figure 4, but drawn
# from `generator_simple`, which takes its own set of keyword arguments.
# Batch size and number of time points are copied from the baseline so both
# paradigms produce equally sized data sets.
generator_args = dict(
    batch_size=generator_args_baseline["batch_size"],
    corrs=(0, 1),  # placeholder; replaced per condition inside the loop
    T=generator_args_baseline["n_timepoints"],
    n_sources=3,
    # NOTE(review): presumably expressed in dB here (0 dB), unlike the plain
    # ratio used by `generator` -- confirm against generator_simple.
    SNR_range=(0, 0),
    # Use the notebook-wide seed instead of a second hard-coded 42 so that
    # changing `random_seed` in the config cell also affects this figure.
    random_seed=random_seed)


sim_condition = "figure-42"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

iscs = [0.1, 0.5, 0.9]
x_tests = []
y_tests = []
sim_infos = []
for isc in iscs:
    print(isc)
    # Identical lower/upper bound fixes the correlation for this condition.
    generator_args["corrs"] = [isc, isc]
    gen_test = generator_simple(fwd, **generator_args)
    # Only the first batch of each freshly built generator is used.
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append([csr_matrix(yy) for yy in y_test])
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)
# Added after generation, so it is never passed to generator_simple; it only
# ends up in the saved argument dict.
generator_args["n_orders"] = 0

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

0.1
0.5
0.9


In [21]:
# Spot check: display the first element of the first entry of y_tests, as left
# by whichever simulation cell ran last (assumes the nested per-condition /
# per-sample list built by the sweep cells -- TODO confirm which cell).
y_tests[0][0]

<5124x50 sparse matrix of type '<class 'numpy.float64'>'
	with 150 stored elements in Compressed Sparse Row format>

In [14]:
# Display the argument dict as left by the most recently executed simulation cell.
generator_args

{'batch_size': 500,
 'corrs': [0.9, 0.9],
 'T': 50,
 'n_sources': 3,
 'SNR_range': (0, 0),
 'random_seed': 42,
 'n_orders': 0}

# Figure 5

In [9]:
# Figure 5: sweep the neighborhood order (source extent) at a fixed
# inter-source correlation of 0.5.
generator_args = deepcopy(generator_args_baseline)
generator_args.update(inter_source_correlation=0.5)

sim_condition = "figure-5"
fullpath = os.path.join(base_path, f"sim_{sim_condition}.pkl")

n_orders = [0, 2, 4, 6]
x_tests, y_tests, sim_infos = [], [], []
for n_order in n_orders:
    print(f"{n_order} orders")
    # Passed as a two-element list with identical entries.
    generator_args.update(n_orders=[n_order, n_order])
    gen_test = generator(fwd, **generator_args)
    x_test, y_test, sim_info = next(gen_test)
    x_tests.append(x_test)
    # Store every element of the batch's y output as a CSR sparse matrix.
    y_tests.append(list(map(csr_matrix, y_test)))
    sim_infos.append(sim_info)

# Join the per-condition batches along the first axis.
x_test = np.concatenate(x_tests, axis=0)
y_test = np.concatenate(y_tests, axis=0)
sim_info = pd.concat(sim_infos)

# generator_args is saved as left by the last loop iteration.
with open(fullpath, "wb") as f:
    pkl.dump([x_test, y_test, sim_info, generator_args], f)

0 orders
2 orders
4 orders
6 orders
