In [None]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow_probability import distributions as tfd
from tensorflow.keras import layers as kl

In [None]:
# Simplest example: a Gaussian output head with unit variance.
# `loc` keeps the trailing unit axis of the Dense output on purpose, so the
# distribution's batch shape is (batch, 1) and lines up with targets of shape
# (batch, 1), which is what Keras' fit() hands to the loss.
# Indexing with t[..., 0] instead yields batch shape (batch,); log_prob of
# (batch, 1)-shaped targets then broadcasts to (batch, batch) and training is
# pulled towards predicting the mean of the targets.
distr_layer = tfp.layers.DistributionLambda(lambda t: tfd.Normal(loc=t, scale=1))
# TODO: replace tfd.Normal with tfd.NegativeBinomial here; for that one, 2 parameters are to be estimated.

In [None]:
# Minimal probabilistic model: a bias-free linear layer whose single output
# parametrizes the distribution produced by `distr_layer`.
model = tf.keras.models.Sequential()
model.add(kl.Dense(units=1, use_bias=False))
model.add(distr_layer)


def negloglik(x, rv_x):
    """Return the negative log-likelihood of targets `x` under distribution `rv_x`."""
    return -rv_x.log_prob(x)


# Maximum-likelihood training; 'mean_squared_error' would be the plain
# (non-probabilistic) alternative loss.
model.compile(optimizer='adam', loss=negloglik)

In [None]:
# Synthetic regression problem: N observations of 3 regressors.
N = 100_000  # sample size
# True coefficients of the linear regression (= the Dense layer's kernel), as a 3 x 1 column.
coefs = np.array([[1], [2], [3]])
tf.random.set_seed(1)
# Random regression matrix (model inputs) with entries drawn from Normal(1, 1).
X = tfd.Normal(loc=1, scale=1).sample(sample_shape=[N, 3])
# TODO: change coefs to a 3 x 2 matrix here.

In [None]:
# Linear predictor of the data-generating model; used as the means of the normal distributions of Y.
true_distr_params = X @ coefs
# Reuse the network's own distribution layer so the targets come from the model family being fitted.
true_distr = distr_layer(true_distr_params)
# Re-seed immediately before sampling so the drawn targets are reproducible.
tf.random.set_seed(1)
Y = true_distr.sample()  # draw the regression targets

In [None]:
# Reference value: mean negative log-likelihood of Y under the data-generating model.
nll_under_true_model = negloglik(Y, true_distr)
np.mean(nll_under_true_model)

In [None]:
# Same metric for the untrained network (weights still at their random initialization).
nll_at_init = negloglik(Y, model(X))
np.mean(nll_at_init)

In [None]:
model.weights  # weights at initialization, i.e. before any training has been run

In [None]:
# Train with Keras' built-in loop.
# As originally written, this run was observed NOT to recover the true coefficients.
# NOTE(review): a hidden quadratic prior is unlikely; the probable cause is a shape
# mismatch -- if the loss receives targets shaped (batch, 1) while the distribution's
# batch shape is (batch,), log_prob broadcasts to (batch, batch) and the optimum moves
# towards predicting the mean of Y. Confirm by checking both shapes inside the loss.
model.fit(X, Y, batch_size=128, epochs=100)

In [None]:
model.weights  # weights after model.fit; compare with the true coefs [1, 2, 3] (the original run gave a poor match)

In [None]:
# Effective linear coefficients of the network: the matrix product of all kernels.
# Indexing model.weights[1] directly raises IndexError here, because this model has a
# single weight tensor (one bias-free Dense layer). Chaining over whatever kernels
# exist handles that case and also a stack of several bias-free Dense layers.
effective_coefs = model.weights[0]
for kernel in model.weights[1:]:
    effective_coefs = effective_coefs @ kernel
effective_coefs

In [None]:
# Objective for the hand-written ("manual") optimization.
def get_loss():
    """Return the mean negative log-likelihood of Y under the model's output for X.

    Evaluated on the full data set in one go; reads the notebook-level names
    `model`, `X`, `Y` and `negloglik`.
    """
    predicted_distr = model(X)
    per_sample_nll = negloglik(Y, predicted_distr)
    return tf.reduce_mean(per_sample_nll)

In [None]:
# Adam with default settings -- the same optimizer as the one passed to model.compile.
optimizer = tf.keras.optimizers.Adam()

In [None]:
# One hand-rolled optimization step on the model weights.
@tf.function
def train_step():
    """Apply a single optimizer update to the model and return the loss it was computed from.

    The loss comes from `get_loss()`; gradients are taken with respect to
    `model.trainable_weights` and applied through the notebook-level `optimizer`.
    """
    params = model.trainable_weights
    with tf.GradientTape() as tape:
        current_loss = get_loss()

    gradients = tape.gradient(current_loss, params)
    optimizer.apply_gradients(zip(gradients, params))
    return current_loss

In [None]:
# Run the manual optimization; raise the step count to train for longer.
for step in range(1000):
    loss = train_step()
    # Report the current loss on every 100th step.
    if not step % 100:
        print(loss)

In [None]:
model.trainable_weights  # weights after the manual loop; the original run found these to match the true coefs