In [109]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.optimizers import SGD
from scipy.integrate import quad, nquad

## Set hyperparameters

In [134]:
dim = 1
initilizer = RandomNormal(mean=0.0, stddev=25.0)

activation_func = "relu"

def create_model(input_dim):
    hidden_dim = 4 * input_dim
    output_dim = 1

    model = Sequential()
    model.add(Input(shape=(input_dim,)))
    #model.add(Dense(units=hidden_dim, activation=activation_func, kernel_initializer=initilizer))
    #model.add(Dense(units=hidden_dim, activation=activation_func, kernel_initializer=initilizer))
    #model.add(Dense(units=hidden_dim, activation=activation_func, kernel_initializer=initilizer))
    model.add(Dense(units=output_dim, kernel_initializer=initilizer))

    return model

lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.01,
    decay_steps=10000, decay_rate=0.9
)

opt = keras.optimizers.Adam(learning_rate=lr_schedule)

In [179]:
lam = 1.0  # Tbc
rho = 1.0  # Tbc

def f(x):
    return 1

def h_i(x, i):
    return x ** 2  # Tbc

def g(x):
    return x ** 2  # tbc

def mu_i_density(x, i):
    return np.exp(-x**2 / 2) / np.sqrt(2 * np.pi)  # tbc

def sample_mu(n_samples, dim):
    return tf.random.normal((n_samples, dim))  # tbc

def beta_gamma():
    return 1

def cost_function(x, y):
    return x+y

def theta(x, y):
    return 1

## Create models

In [180]:
g_model = create_model(dim)
h_models = [create_model(1) for _ in range(dim)]
lambda_model = Sequential([Input(shape=(1,)),
                           Dense(units=1, activation="relu", kernel_initializer=tf.constant_initializer(1), bias_initializer=tf.constant_initializer(np.pi))])


class CombinedModel(tf.keras.Model):
    def __init__(self, g_model, h_models, lambda_model):
        super(CombinedModel, self).__init__()
        self.lambda_model = lambda_model
        self.h_models = h_models
        self.g_model = g_model

    def call(self, inputs):
        h_outputs = [h_model(inputs[:, i:i+1]) for i, h_model in enumerate(self.h_models)]
        g_output = self.g_model(inputs[:, dim:-1])
        lambda_output = self.lambda_model(inputs[:, -1:])
        return g_output, h_outputs, lambda_output


combined_model = CombinedModel(g_model, h_models, lambda_model)

## Loss function

In [181]:
n_samples = 10 ** 5
y_samples = tf.random.normal((n_samples, 1))

def phi_theta_gamma(samples):
    h_integral = tf.reduce_sum([tf.reduce_mean(h_model(samples[:, i:i+1])) for i, h_model in enumerate(h_models)])
    g_integral = tf.reduce_mean(g_model(samples[:, dim:-1]))
    lamda_value = lambda_model(samples[:, -1:])
    
    # theta_integral = tf.constant(0.0, dtype=tf.float32)
    # for x in samples:
    #     for y in y_values:
    #         integral_value += beta_gamma(f(y) - tf.reduce_sum([h_i(y, i) for i in tf.range(dim)]) - lam * cost_function(x, y) - g(x)) * theta(x, y)

    theta_integral = 0

    return lamda_value * rho + h_integral + g_integral + theta_integral

# def phi_theta_gamma(samples):
#     return (h_models[0](samples[:, 0]) + 1) ** 2 + (g_model(samples[:, 1]) - 1) ** 2

class CustomLoss(tf.keras.losses.Loss):
    def __init__(self):
        super().__init__()
    
    def call(self, x_pred, x_true):
        return phi_theta_gamma(x_pred)
    
combined_model.compile(optimizer='adam', loss=CustomLoss())

combined_model.summary()
combined_model.g_model.summary()
for model in combined_model.h_models:
    model.summary()
combined_model.lambda_model.summary()

In [182]:
x_samples = sample_mu(n_samples, 2 * dim).numpy()

lambda_samples = np.zeros(n_samples)
lambda_samples = lambda_samples.reshape(-1, 1)

x_samples = np.hstack((x_samples, lambda_samples))
x_true_irrelevant = np.zeros_like(x_samples)

print(np.shape(x_samples))

combined_model.fit(x=x_samples,y=x_true_irrelevant, epochs=5, batch_size=32)

print(combined_model.get_weights())
for model in combined_model.h_models:
    print(model.get_weights())
print(combined_model.g_model.get_weights())

(100000, 3)
Epoch 1/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 3ms/step - loss: 0.7979
Epoch 2/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - loss: -7.8120
Epoch 3/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - loss: -14.0619
Epoch 4/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 3ms/step - loss: -20.3134
Epoch 5/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 3ms/step - loss: -26.5660
[array([[1.]], dtype=float32), array([-0.00940881], dtype=float32), array([[21.075235]], dtype=float32), array([-15.627683], dtype=float32), array([[21.075235]], dtype=float32), array([-15.627683], dtype=float32)]
[array([[21.075235]], dtype=float32), array([-15.627683], dtype=float32)]
[array([[21.075235]], dtype=float32), array([-15.627683], dtype=float32)]


In [184]:
# Testdaten erstellen
x_test = tf.linspace(-10.0, 10.0, 500)
x_test = tf.reshape(x_test, (-1, 2 * dim))

# Vorhersagen mit dem kombinierten Modell
g_predictions, h_predictions = combined_model.predict(x_test)

# Visualization of g_predictions
plt.plot(x_test[:, 0], g_predictions, label='Approximierte Funktion g(x)')
plt.xlabel('x')
plt.ylabel('g(x)')
plt.legend()
plt.show()

# Visualization of h_i predictions
for i in range(dim):
    plt.plot(x_test[:, 0], h_predictions[i], label=f'h_{i}(x)')

plt.xlabel('x')
plt.ylabel('h_i(x)')
plt.legend()
plt.show()

ValueError: Exception encountered when calling Sequential.call().

[1mInput 0 of layer "dense_167" is incompatible with the layer: expected axis -1 of input shape to have value 1, but received input with shape (32, 0)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(32, 0), dtype=float32)
  • training=None
  • mask=None