In [1]:
import numpy as np

from sklearn.neural_network import MLPRegressor

from Chempy.parameter import ModelParameters

import sbi.utils as utils
from sbi.inference.base import infer

import torch
from torch.distributions.normal import Normal
from torch.distributions.uniform import Uniform

import time as t
import pickle

  from .autonotebook import tqdm as notebook_tqdm


# Train Neural Network to simulate Chempy

In [None]:
# ------ Load & prepare the data ------

# --- Load in training data ---
# NOTE(review): mmap_mode is documented for .npy files; confirm it has any
# effect when the target is an .npz archive.
path_training = '../ChempyMulti/tutorial_data/TNG_Training_Data.npz'
training_data = np.load(path_training, mmap_mode='r')

# Pull the element labels, the inputs and the targets out of the archive.
elements, train_x, train_y = (
    training_data[key] for key in ('elements', 'params', 'abundances')
)


# ---  Load in the validation data ---
path_test = '../ChempyMulti/tutorial_data/TNG_Test_Data.npz'
val_data = np.load(path_test, mmap_mode='r')

# Same keys as the training archive, minus the element labels.
val_x, val_y = (val_data[key] for key in ('params', 'abundances'))


# --- Clean the data ---
def clean_data(x, y):
    """Drop samples whose target row is unusable.

    A row is discarded when every entry of ``y`` in that row equals zero, or
    when any entry of ``y`` in that row is non-finite (``np.isfinite`` is
    False for both inf and NaN). The same rows are removed from ``x`` so the
    two arrays stay aligned.

    Returns the filtered ``(x, y)`` pair as new arrays, original order kept.
    """
    all_zero = (y == 0).all(axis=1)
    all_finite = np.isfinite(y).all(axis=1)

    # An all-zero row is always finite, so a single combined mask selects
    # exactly the rows that survive both checks.
    keep = all_finite & ~all_zero

    return x[keep], y[keep]


# Apply the same row filter to both splits.
train_x, train_y = clean_data(train_x, train_y)
val_x, val_y = clean_data(val_x, val_y)


# --- Normalize the data ---
# Column-wise statistics are taken from the cleaned training split only;
# the validation split is later scaled with these same values.
x_mean = train_x.mean(axis=0)
x_std = train_x.std(axis=0)
y_mean = train_y.mean(axis=0)
y_std = train_y.std(axis=0)


def normalize_data(x, y, x_mean=x_mean, x_std=x_std, y_mean=y_mean, y_std=y_std):
    """Standardize inputs and targets.

    Each array has the matching mean subtracted and is then divided by the
    matching standard deviation. The default statistics are whatever the
    module-level ``x_mean``/``x_std``/``y_mean``/``y_std`` held at the moment
    this ``def`` was executed.

    NOTE(review): there is no guard against a zero standard deviation.

    Returns the standardized ``(x, y)`` pair.
    """
    x_scaled = (x - x_mean) / x_std
    y_scaled = (y - y_mean) / y_std
    return x_scaled, y_scaled


# Standardize both splits using normalize_data's default statistics.
# NOTE: the names are rebound to the scaled arrays, so executing these two
# lines again would scale data that has already been scaled.
train_x, train_y = normalize_data(train_x, train_y)
val_x, val_y     = normalize_data(val_x, val_y)


# add time squared as parameter
def add_time_squared(x):
    """Append the square of the last column of ``x`` as one extra column.

    ``x`` must be two-dimensional; the result has the same number of rows
    and one more column than ``x``.
    """
    # Slicing with -1: keeps the column axis, so no reshape is needed.
    last_squared = x[:, -1:] ** 2
    return np.hstack((x, last_squared))


# NOTE: each execution appends one more column to the arrays bound to these
# names, so these lines are meant to run exactly once after normalization.
train_x = add_time_squared(train_x)
val_x = add_time_squared(val_x)

In [None]:
# ----- Train the neural network -----

# --- Define the neural network ---
def single_regressor(x, y, neurons=40, epochs=3000, verbose=False, random_state=None):
    """Fit a one-hidden-layer tanh MLP to ``(x, y)`` and report its error on ``x``.

    Parameters
    ----------
    x : array, inputs passed to ``MLPRegressor.fit``.
    y : array, regression target for a single output.
    neurons : int, width of the single hidden layer.
    epochs : int, forwarded as ``max_iter``.
    verbose : bool, forwarded to ``MLPRegressor``.
    random_state : int, RandomState instance or None, forwarded to
        ``MLPRegressor``. The default ``None`` keeps the unseeded behaviour;
        pass an int to make a fit repeatable.

    Returns
    -------
    score : float
        Mean squared error of the fitted model evaluated on the same ``x`` it
        was given for fitting (an in-sample figure, not an out-of-sample one).
    diff : array
        Absolute residuals ``|y - prediction|`` on ``x``.
    weights : list
        ``[w0, w1, b0, b1]``: hidden-layer and output-layer weight matrices
        followed by the matching intercept vectors.
    """
    model = MLPRegressor(solver='adam', alpha=0.001, max_iter=epochs, learning_rate='adaptive', tol=1e-13,
                         hidden_layer_sizes=(neurons,), activation='tanh', verbose=verbose,
                         shuffle=True, early_stopping=True, random_state=random_state)

    model.fit(x, y)

    # Evaluated on the inputs passed in; early_stopping holds part of them out
    # during fitting, but no separate validation set is scored here.
    model_pred = model.predict(x)
    residual = model_pred - y
    score = np.mean(residual ** 2.)
    diff = np.abs(residual)

    # Exactly one hidden layer, so there are two weight matrices and two
    # intercept vectors to unpack.
    w0, w1 = model.coefs_
    b0, b1 = model.intercepts_

    return score, diff, [w0, w1, b0, b1]


# --- Train the neural network ---
# Fit one independent network per element and keep each returned
# (score, diff, weights) tuple in `output`.
output = []
neurons = 40
n_elements = len(elements)
for el_i, el in enumerate(elements):
    print("Running net %d of %d" % (el_i + 1, n_elements))
    # Column el_i of the targets is the only output this network learns.
    result = single_regressor(train_x, train_y[:, el_i], neurons=neurons, epochs=3000, verbose=False)
    print("Score for element %s is %.3f" % (el, result[0]))
    output.append(result)


# --- Collect the neural network outputs ---
# Each entry of `output` is (score, diff, [w0, w1, b0, b1]).
scores = [entry[0] for entry in output]
diffs = [entry[1] for entry in output]
coeffs = [entry[2] for entry in output]

# Place the per-element hidden layers side by side.
w0 = np.hstack([weights[0] for weights in coeffs])
b0 = np.hstack([weights[2] for weights in coeffs])
b1 = np.hstack([weights[3] for weights in coeffs])

# Output weights: column i is non-zero only on the rows of network i's hidden
# units, so the stacked network keeps the per-element nets independent.
w1 = np.zeros([w0.shape[1], b1.shape[0]])
assert w0.shape[1] / len(coeffs) == neurons
for idx, weights in enumerate(coeffs):
    row_start = int(neurons * idx)
    row_stop = int(neurons * (idx + 1))
    w1[row_start:row_stop, idx] = weights[1][:, 0]

In [None]:
# --- Save the weights and normalization parameters ---
# Everything needed to evaluate the stacked network goes into one archive:
# layer weights, input/output scaling, and the architecture settings.
weights_payload = dict(
    w0=w0, w1=w1, b0=b0, b1=b1,
    in_mean=x_mean, in_std=x_std, out_mean=y_mean, out_std=y_std,
    activation='tanh', neurons=neurons,
)
np.savez('data/tutorial_weights.npz', **weights_payload)


# Train SBI

In [None]:
# ----- Load the Network -----
# Load network weights trained in train_chempyNN.py
x = np.load('data/tutorial_weights.npz')

# Layer weights and intercepts of the stacked network.
w0, w1, b0, b1 = (x[key] for key in ('w0', 'w1', 'b0', 'b1'))

# Scaling applied to the inputs and outputs.
in_mean, in_std = x['in_mean'], x['in_std']
out_mean, out_std = x['out_mean'], x['out_std']

# Architecture settings stored alongside the weights.
activation, neurons = x['activation'], x['neurons']

In [None]:
# ----- Set-up the Simulator -----
def add_time_squared(x):
    """Return ``x`` with one extra column holding the squared last column.

    ``x`` must be two-dimensional; rows are left untouched.
    """
    squared = x[:, -1] ** 2
    # Indexing with None turns the 1-D result into a single column.
    return np.concatenate((x, squared[:, None]), axis=1)


def stacked_net_output(in_par):
    """Evaluate the stacked network on a 2-D array of parameter rows.

    The input is scaled with ``in_mean``/``in_std``, extended with the squared
    last column, passed through the tanh hidden layer (``w0``, ``b0``) and the
    linear output layer (``w1``, ``b1``), and finally mapped back with
    ``out_std``/``out_mean``.

    NOTE(review): tanh is hard-coded here; the ``activation`` value loaded
    with the weights is not consulted.
    """
    scaled = add_time_squared((in_par - in_mean) / in_std)

    hidden = np.tanh(scaled @ w0 + b0)
    prediction = hidden @ w1 + b1

    return prediction * out_std + out_mean

In [None]:
# ----- Set-up priors -----
a = ModelParameters()
priors = torch.tensor([[a.priors[opt][0], a.priors[opt][1]] for opt in a.to_optimize])

# One Normal per optimized parameter: the first stored value is used as loc
# and the second as scale. Multiplying by a length-1 tensor gives each
# distribution a one-element event, as in the Uniform below.
unit = torch.ones(1)
normal_priors = [Normal(loc * unit, scale * unit) for loc, scale in priors]

# Extra final dimension with a flat prior on [2.0, 12.8]
# (presumably the time parameter — confirm units with the Chempy setup).
time_prior = Uniform(torch.tensor([2.0]), torch.tensor([12.8]))

combined_priors = utils.MultipleIndependent(
    normal_priors + [time_prior],
    validate_args=False)

In [None]:
# ----- sbi setup -----
num_sim = 100000
method = 'SNPE' #SNPE or SNLE or SNRE

# Time the run with a monotonic clock: the wall clock returned by time() can
# be adjusted while this long-running cell executes, which would distort the
# reported duration.
start = t.perf_counter()
posterior = infer(
    stacked_net_output,
    combined_priors,
    method=method,
    num_simulations=num_sim)

print(f'Time taken to train the posterior with {num_sim} samples: {round(t.perf_counter() - start, 4)}s')

In [None]:
# ----- Save the posterior -----
# NOTE(review): the file name is fixed to "SNPE" and does not follow the
# `method` variable chosen in the training cell.
with open("data/posterior_SNPE.pickle", "wb") as handle:
    pickle.dump(posterior, handle)


# Evaluate the posterior

In [2]:
# ----- Load the posterior -----
# NOTE: unpickling can execute arbitrary code stored in the file, so only load
# a pickle that was produced by a trusted run of this notebook.
with open("data/posterior_SNPE.pickle", "rb") as handle:
    posterior = pickle.load(handle)

In [None]:
# ----- Evaluate the posterior -----
