# Autoencoders and diffusion models

Autoencoders are artificial neural networks that learn dense representations of their input data, called __latent representations__ or __codings__,
without any supervision. They work by learning a compressed representation (encoding) of the input, which is then reconstructed (decoded) to resemble the
original input. This makes them useful for data compression, feature extraction, and dimensionality reduction. Some autoencoders can also serve as
generative models, meaning that they can create new data that looks similar to the training data. GANs (__Generative Adversarial Networks__) are another
type of model capable of generating data; they are also used for __super resolution__ (increasing the resolution of an image), image editing,
video generation, and many other tasks. More recently, diffusion models were introduced for data generation, but they are much slower at generating data
than the other two.

An autoencoder simply learns to copy its inputs to its outputs. To keep it from making a trivial direct copy, we constrain it, for example by limiting
the size of the latent representation.

A GAN is composed of two neural networks: one that generates data and one that tries to tell whether the data it sees is real or generated. In this
chapter we will start by exploring in more depth how autoencoders work and how to use them for dimensionality reduction, feature extraction, unsupervised
pretraining, or as generative models. We will then look at GANs and diffusion models.

## Latent representation

The best way to make a model like an autoencoder learn useful patterns is to constrain its latent representation, so that it cannot just memorize the
inputs and regurgitate them. An autoencoder is always composed of two parts: an encoder (or recognition network) that converts the inputs into a latent
representation, and a decoder (or generative network) that converts the latent representation into outputs. It has roughly the same architecture as a
multilayer perceptron, except that it must have as many output neurons as inputs (the outputs are often called __reconstructions__). The cost function
contains a reconstruction loss that penalizes the model when the reconstructions differ from the inputs. When the codings have a lower dimension than the
inputs, we say that the autoencoder is __undercomplete__.

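## Performing PCA with an undercomplete linear autoencoder

If an autoencoder uses only linear activation functions and the mean squared error (MSE) as its loss function, then it ends up performing principal
component analysis (PCA): it learns to project the data onto the subspace spanned by the first principal components. The following code does just that: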

In [None]:
import tensorflow as tf
import numpy as np

encoder = tf.keras.Sequential([
    tf.keras.layers.Dense(2) # we don't need any activation function
])
decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(3)
])
autoencoder = tf.keras.Sequential([encoder, decoder])

optimizer = tf.keras.optimizers.SGD(learning_rate=0.5)
autoencoder.compile(loss="mse", optimizer=optimizer)

# Train it on a 3-dimensional dataset and reduce it to 2 dimensions
num_samples = 100
X = np.random.rand(num_samples, 3)

history = autoencoder.fit(X, X, epochs=500, verbose=False)
codings = encoder.predict(X)


print(codings)

4/4 ━━━━━━━━━━━━━━━━━━━━ 0s 34ms/step
[[ 0.16522342  0.32384145]
 [-0.23413214 -0.5091713 ]
 [ 0.40434515 -0.20902419]
 [ 0.06688297  0.27796385]
 [-0.14037907  0.17638358]
 [ 0.35200047  0.10714732]
 [ 0.0584873   0.32540306]
 [-0.36659646 -0.4886512 ]
 [ 0.10589898 -0.12928466]
 [ 0.04103708 -0.7495854 ]
 [-0.1117608  -0.59058994]
 [-0.6049117  -0.32423735]
 [-0.19678414 -0.3413568 ]
 [ 0.21883571  0.25637305]
 [ 0.3481742  -0.07300492]
 [-0.16636962 -0.73139524]
 [-0.06996274 -0.470938  ]
 [-0.11682546  0.24838498]
 [ 0.14649338  0.4748063 ]
 [ 0.21879315  0.08633684]
 [-0.27510154  0.0593375 ]
 [-0.37081712 -0.89605814]
 [ 0.33813936 -0.11352368]
 [ 0.28720796 -0.22949375]
 [ 0.38569105 -0.3845369 ]
 [-0.13480824 -0.13314119]
 [-0.5750909  -0.09549962]
 [ 0.31575763 -0.539759  ]
 [-0.5180564  -0.34529066]
 [-0.09816223 -0.10286307]
 [-0.10133952 -0.5949105 ]
 [-0.39297435  0.00253003]
 [ 0.38198626 -0.21721609]
 [-0.2501844  -0.6972898 ]
 ...
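As a point of comparison, PCA itself can be computed directly with a singular value decomposition. The sketch below uses plain NumPy; the variable names and the random seed are choices made here, not part of the cell above. It projects a 3D dataset onto its first two principal components and measures the reconstruction MSE. A linear autoencoder trained with MSE should approach the same reconstruction error, although its two coding axes need not coincide with the principal axes.

```python
import numpy as np

rng = np.random.default_rng(42)
X = rng.random((100, 3))             # same kind of data as above: 100 points in 3D

X_mean = X.mean(axis=0)
X_centered = X - X_mean              # PCA assumes centered data
U, s, Vt = np.linalg.svd(X_centered, full_matrices=False)

W = Vt[:2].T                         # first two principal axes, shape (3, 2)
codings_pca = X_centered @ W         # 2D projection, shape (100, 2)
X_reconstructed = codings_pca @ W.T + X_mean

# Mean squared reconstruction error over all 100 x 3 values
pca_mse = np.mean((X - X_reconstructed) ** 2)
print(codings_pca.shape, pca_mse)
```

The error left after the projection is exactly the variance carried by the discarded third component, which is why `pca_mse` equals `s[2] ** 2 / X.size`.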

## Stacked autoencoders

Like any other neural network, an autoencoder can have multiple hidden layers, in which case it is called a __stacked autoencoder__. Adding more layers
helps it learn more complex codings, but we have to be careful not to make it too powerful, otherwise it may overfit: it could reconstruct the training
data perfectly without learning any useful representation. Such a network is usually symmetrical around the central hidden layer (the coding layer).
Here is an implementation of a stacked autoencoder for Fashion MNIST:

In [2]:
# The model expects 28x28 images, so load Fashion MNIST and scale the pixels to [0, 1]
(X_train_full, _), _ = tf.keras.datasets.fashion_mnist.load_data()
X_train_full = X_train_full.astype(np.float32) / 255
X_train, X_valid = X_train_full[:-5000], X_train_full[-5000:]

stacked_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(30, activation="relu")  # coding layer
])
stacked_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])

stacked_ae = tf.keras.Sequential([stacked_encoder, stacked_decoder])
stacked_ae.compile(loss="mse", optimizer="nadam")
# The inputs are also the targets, since the model learns to reconstruct them
history = stacked_ae.fit(X_train, X_train, epochs=20, validation_data=(X_valid, X_valid))

When an autoencoder is neatly symmetrical, one way to speed up training and limit the risk of overfitting is to tie the weights of the decoder layers to
the weights of the encoder layers, which roughly halves the number of weights in the model. To do that we can create a custom layer:

In [None]:
class DenseTranspose(tf.keras.layers.Layer):
    def __init__(self, dense, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.dense = dense
        self.activation = tf.keras.activations.get(activation)

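    def build(self, batch_input_shape):
        # One bias per output unit, i.e. per input of the tied Dense layer. The size is read from
        # the Dense kernel, whose shape is (inputs, units), so that layer must already be built.
        self.biases = self.add_weight(name="bias", shape=[self.dense.kernel.shape[0]], initializer="zeros")
        super().build(batch_input_shape)

    def call(self, inputs):
        # Reuse the encoder layer's kernel, transposed
        Z = tf.matmul(inputs, self.dense.kernel, transpose_b=True)
        return self.activation(Z + self.biases)
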
# Now we can build an autoencoder with the weights of the decoder tied to the weights of the encoder
dense_1 = tf.keras.layers.Dense(100, activation="relu")
dense_2 = tf.keras.layers.Dense(30, activation="relu")

tied_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    dense_1,
    dense_2
])

tied_decoder = tf.keras.Sequential([
    DenseTranspose(dense_2, activation="relu"),
    DenseTranspose(dense_1),
    tf.keras.layers.Reshape([28, 28])
])

tied_ae = tf.keras.Sequential([tied_encoder, tied_decoder])
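To make the saving concrete, here is a minimal NumPy sketch of the same tied architecture (784 → 100 → 30 → 100 → 784). The weights are random and untrained, and all names are illustrative; the point is only to show how the decoder reuses the transposed encoder kernels and how the parameter counts compare.

```python
import numpy as np

rng = np.random.default_rng(0)
n_inputs, n_hidden, n_codings = 28 * 28, 100, 30

# Encoder kernels; the tied decoder reuses them transposed
W1 = rng.normal(scale=0.05, size=(n_inputs, n_hidden))
W2 = rng.normal(scale=0.05, size=(n_hidden, n_codings))
# Every layer keeps its own bias vector, including the tied decoder layers
b1, b2 = np.zeros(n_hidden), np.zeros(n_codings)
b3, b4 = np.zeros(n_hidden), np.zeros(n_inputs)

def relu(z):
    return np.maximum(z, 0)

def tied_autoencoder(X):
    h = relu(X @ W1 + b1)
    codings = relu(h @ W2 + b2)
    h = relu(codings @ W2.T + b3)    # plays the role of DenseTranspose(dense_2)
    return h @ W1.T + b4             # plays the role of DenseTranspose(dense_1)

X = rng.random((5, n_inputs))        # stand-in for 5 flattened images
reconstructions = tied_autoencoder(X)

n_biases = b1.size + b2.size + b3.size + b4.size
tied_params = W1.size + W2.size + n_biases
untied_params = 2 * (W1.size + W2.size) + n_biases
print(reconstructions.shape, tied_params, untied_params)
```

The biases are not shared, so the total is slightly more than half of the untied count.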

## Convolutional autoencoders

If we want to use autoencoders for images, we can use a __convolutional autoencoder__. The encoder is a regular CNN composed of convolutional layers and
pooling layers. It typically reduces the spatial dimensions of the inputs (the height and width of the images) while increasing the depth (the number of
feature maps). The decoder must do the reverse: upscale the image and reduce its depth back to the original number of channels. For that we can use
transposed convolutional layers in the decoder, or a combination of upsampling layers and convolutional layers.

In [None]:
conv_encoder = tf.keras.Sequential([
    tf.keras.layers.Reshape([28, 28, 1]),
    tf.keras.layers.Conv2D(16, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2),
    tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2),
    tf.keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2),
    tf.keras.layers.Conv2D(30, 3, padding="same", activation="relu"),
    tf.keras.layers.GlobalAvgPool2D()
])

conv_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(3 * 3 * 16),
    tf.keras.layers.Reshape((3, 3, 16)),
    tf.keras.layers.Conv2DTranspose(32, 3, strides=2, activation="relu"),
    tf.keras.layers.Conv2DTranspose(16, 3, strides=2, padding="same", activation="relu"),
    tf.keras.layers.Conv2DTranspose(1, 3, strides=2, padding="same"),
    tf.keras.layers.Reshape([28, 28])
])

conv_ae = tf.keras.Sequential([conv_encoder, conv_decoder])
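A quick way to check that the decoder really ends at 28 × 28 is to track the spatial size through its three transposed convolutions. The helper below is a hypothetical utility written for this check, not a Keras function; it follows the Keras output-size conventions for "same" and "valid" padding, assuming no dilation and no explicit output padding.

```python
def conv_transpose_output_size(size, kernel_size, strides, padding):
    """Output height/width of a transposed convolution (no dilation, no output padding)."""
    if padding == "same":
        return size * strides
    if padding == "valid":
        return size * strides + max(kernel_size - strides, 0)
    raise ValueError(f"unsupported padding: {padding!r}")

size = 3  # spatial size after Reshape((3, 3, 16))
sizes = [size]
# The first Conv2DTranspose uses the default "valid" padding, the next two use "same"
for padding in ("valid", "same", "same"):
    size = conv_transpose_output_size(size, kernel_size=3, strides=2, padding=padding)
    sizes.append(size)

print(sizes)  # → [3, 7, 14, 28]
```

The first layer deliberately uses "valid" padding: it is what turns 3 into 7 rather than 6, so that two further doublings land on 28.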

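Another way to force an autoencoder to learn useful features is to add noise to its inputs and train it to recover the original, noise-free inputs. Such a model is called a __denoising autoencoder__. The noise can be Gaussian noise added to the inputs, or randomly switched-off inputs, as with dropout. Here is an implementation that uses a Dropout layer: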

In [None]:
dropout_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(30, activation="relu")
])

dropout_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])

dropout_ae = tf.keras.Sequential([dropout_encoder, dropout_decoder])
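The two kinds of corruption can be sketched in NumPy as follows. The function names and the random data are illustrative assumptions; in the Keras model above the corruption happens inside the Dropout layer, and only while training.

```python
import numpy as np

rng = np.random.default_rng(0)

def dropout_corrupt(X, rate, rng):
    """Zero out a random fraction `rate` of the inputs and rescale the rest by 1 / (1 - rate),
    which mirrors what a dropout layer does at training time."""
    keep_mask = rng.random(X.shape) >= rate
    return X * keep_mask / (1.0 - rate)

def gaussian_corrupt(X, stddev, rng):
    """Add zero-mean Gaussian noise to the inputs."""
    return X + rng.normal(scale=stddev, size=X.shape)

X_clean = rng.random((1000, 28, 28))     # stand-in for a batch of images in [0, 1]
X_dropped = dropout_corrupt(X_clean, rate=0.5, rng=rng)
X_noisy = gaussian_corrupt(X_clean, stddev=0.2, rng=rng)

# A denoising autoencoder is trained on (corrupted inputs, clean targets) pairs
zero_fraction = np.mean(X_dropped == 0)
print(X_dropped.shape, X_noisy.shape, zero_fraction)
```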

We can also constrain the model by adding an appropriate term to the cost function that pushes the autoencoder to reduce the number of active neurons
in the coding layer, which often leads to good feature extraction. Such a model is called a __sparse autoencoder__.

In [None]:
sparse_l1_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(300, activation="sigmoid"),
    tf.keras.layers.ActivityRegularization(l1=1e-4)
])

sparse_l1_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])

sparse_l1_ae = tf.keras.Sequential([sparse_l1_encoder, sparse_l1_decoder])

This ActivityRegularization layer just returns its inputs, but as a side effect it adds a training loss proportional to the sum of the absolute values of
its inputs (here scaled by the factor l1=1e-4). It is equivalent to adding the _activity\_regularizer=tf.keras.regularizers.l1(1e-4)_ option to the
previous layer. This penalty pushes the model to output codings close to 0, but since the model is also penalized when it does not reconstruct the inputs
correctly, it has to output at least a few nonzero values.
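The effect of the penalty can be illustrated with a small NumPy sketch. The function below is a hypothetical helper: it computes the L1 term described above and ignores any per-batch normalization the framework may apply. Random values stand in for real codings.

```python
import numpy as np

def l1_activity_penalty(codings, l1=1e-4):
    """L1 activity penalty: l1 times the sum of the absolute activations."""
    return l1 * np.sum(np.abs(codings))

rng = np.random.default_rng(0)
dense_codings = rng.random((32, 300))            # every coding unit is active
active_mask = rng.random((32, 300)) < 0.05       # keep roughly 5% of the units
sparse_codings = dense_codings * active_mask

dense_penalty = l1_activity_penalty(dense_codings)
sparse_penalty = l1_activity_penalty(sparse_codings)
print(dense_penalty, sparse_penalty)
```

Codings with few active units pay a much smaller penalty, which is the pressure that drives the coding layer toward sparsity.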

## Variational Autoencoders

One of the most popular types of autoencoder is the __variational autoencoder__ (VAE). VAEs are quite different from the autoencoders discussed so far:
they are probabilistic (their outputs are partly determined by chance) and they are __generative autoencoders__ (they can generate new instances that
look like they were sampled from the training set). Variational autoencoders perform __variational Bayesian inference__, which is an approximation of
Bayesian inference. Recall that Bayesian inference means updating a probability distribution based on new data, using equations derived from Bayes'
theorem. The original distribution is called the _prior_, while the updated one is called the _posterior_. Instead of directly producing a coding for a
given input, the encoder produces a mean coding $\mathbf{\mu}$ and a standard deviation $\mathbf{\sigma}$. The actual coding is then sampled randomly
from a Gaussian distribution with mean $\mathbf{\mu}$ and standard deviation $\mathbf{\sigma}$. The loss function is composed of two parts: the usual
reconstruction loss, which pushes the autoencoder to reproduce its inputs, and the latent loss, which pushes the autoencoder to have codings that look
as if they were sampled from a simple Gaussian distribution. The latent loss can be computed using the following equation:
$$\mathbf{L} = -\frac{1}{2}\sum_{i=1}^{n}\left[1 + \log(\sigma_i^2) - \sigma_i^2 - \mu_i^2\right]$$
Where:
- __n__ is the coding's dimensionality.
- $\mu_i$ is the mean of the $i^{th}$ component of the codings.
- $\sigma_i$ is the standard deviation of the $i^{th}$ component of the codings.

A common variation is to make the encoder output $\gamma = \log(\sigma^2)$ rather than $\sigma$, which speeds up training:
$$\mathbf{L} = -\frac{1}{2}\sum_{i=1}^{n}\left[1 + \gamma_i - \exp(\gamma_i) - \mu_i^2\right]$$
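The two forms of the latent loss can be checked against each other numerically. This is a minimal NumPy sketch with made-up values for the means and standard deviations; the function names are illustrative.

```python
import numpy as np

def latent_loss_from_sigma(mu, sigma):
    """Latent loss written in terms of the standard deviation sigma."""
    return -0.5 * np.sum(1 + np.log(sigma ** 2) - sigma ** 2 - mu ** 2, axis=-1)

def latent_loss_from_gamma(mu, gamma):
    """Same loss written in terms of gamma = log(sigma^2)."""
    return -0.5 * np.sum(1 + gamma - np.exp(gamma) - mu ** 2, axis=-1)

rng = np.random.default_rng(0)
mu = rng.normal(size=(4, 10))                    # 4 instances, 10-dimensional codings
sigma = rng.uniform(0.5, 2.0, size=(4, 10))
gamma = np.log(sigma ** 2)

loss_sigma = latent_loss_from_sigma(mu, sigma)
loss_gamma = latent_loss_from_gamma(mu, gamma)
# The loss vanishes when the codings already follow a standard Gaussian (mu = 0, sigma = 1)
loss_at_prior = latent_loss_from_gamma(np.zeros(10), np.zeros(10))
print(loss_sigma, loss_gamma, loss_at_prior)
```

Both forms give the same values, the loss is never negative, and it is zero only when every component has mean 0 and variance 1.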
We are going to build a variational autoencoder for the Fashion MNIST dataset, but we first need a custom layer that samples a coding given the parameters $\mathbf{\mu}$ and $\gamma$.

In [7]:
class Sampling(tf.keras.layers.Layer):
    def call(self, inputs):
        mean, log_var = inputs
        return tf.random.normal(tf.shape(log_var)) * tf.exp(log_var / 2) + mean
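The same sampling step can be written in NumPy to check its statistics. The sketch below assumes two coding dimensions with hand-picked means and variances; drawing many samples should recover them.

```python
import numpy as np

rng = np.random.default_rng(0)

def sample_codings(mean, log_var, rng):
    """Draw standard Gaussian noise, then scale it by sigma = exp(log_var / 2) and shift by the mean."""
    epsilon = rng.standard_normal(np.shape(log_var))
    return epsilon * np.exp(log_var / 2) + mean

n_samples = 100_000
mean = np.tile([1.0, -2.0], (n_samples, 1))
log_var = np.tile(np.log([0.25, 4.0]), (n_samples, 1))   # i.e. sigma = 0.5 and 2.0

samples = sample_codings(mean, log_var, rng)
print(samples.mean(axis=0), samples.std(axis=0))
```

Because the randomness enters only through `epsilon`, the result is a differentiable function of the mean and log-variance, which is what lets gradients flow back to the encoder during training.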

Now we can create the model using the functional API.

In [None]:
import tensorflow as tf
import numpy as np

fashion_mnist = tf.keras.datasets.fashion_mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist
X_train_full = X_train_full.astype(np.float32) / 255
X_test = X_test.astype(np.float32) / 255
X_train, X_valid = X_train_full[:-5000], X_train_full[-5000:]
y_train, y_valid = y_train_full[:-5000], y_train_full[-5000:]

In [None]:
codings_size = 10

inputs = tf.keras.layers.Input(shape=[28, 28])
Z = tf.keras.layers.Flatten()(inputs)
Z = tf.keras.layers.Dense(150, activation="relu")(Z)
Z = tf.keras.layers.Dense(100, activation="relu")(Z)
codings_mean = tf.keras.layers.Dense(codings_size)(Z)
codings_log_var = tf.keras.layers.Dense(codings_size)(Z)
codings = Sampling()([codings_mean, codings_log_var])
variational_encoder = tf.keras.Model(inputs=[inputs], outputs=[codings_mean, codings_log_var, codings])

# The decoder is a regular decoder
decoder_input = tf.keras.layers.Input(shape=[codings_size])
x = tf.keras.layers.Dense(100, activation="relu")(decoder_input)
x = tf.keras.layers.Dense(150, activation="relu")(x)
x = tf.keras.layers.Dense(28 * 28)(x)
outputs = tf.keras.layers.Reshape([28, 28])(x)
variational_decoder= tf.keras.Model(inputs=[decoder_input], outputs=[outputs])

# Now the autoencoder
_, _, codings = variational_encoder(inputs)
reconstructions = variational_decoder(codings)
variational_ae = tf.keras.Model(inputs=[inputs], outputs=[reconstructions])

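# Add the latent loss; the reconstruction loss ("mse") is set in compile() below. The latent loss is
# divided by 784 so that it has the same scale as the MSE, which is a mean over the 784 pixels.
# Note: calling add_loss() on symbolic tensors like this relies on tf.keras 2.x behavior; with Keras 3
# the loss likely has to be computed inside a custom layer's call() instead.
latent_loss = -0.5 * tf.reduce_sum(1 + codings_log_var - tf.exp(codings_log_var) - tf.square(codings_mean), axis=-1)
variational_ae.add_loss(tf.reduce_mean(latent_loss) / 784.)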

variational_ae.compile(loss="mse", optimizer="nadam")
history = variational_ae.fit(X_train, X_train, epochs=25, batch_size=128, validation_data=(X_valid, X_valid))

# Now we can generate fashion images by decoding codings sampled from a standard Gaussian
codings = tf.random.normal(shape=[3 * 7, codings_size])
images = variational_decoder(codings).numpy()

## Generative Adversarial Networks (GANs)

The idea behind GANs is to make two neural networks compete against each other, in the hope that the competition will push both of them to excel. The two
networks are a __generator__, which takes random noise as input and outputs some data, and a __discriminator__, which tries to tell whether the data it
receives is real or was produced by the generator. Because the two networks have opposite goals, a GAN cannot be trained like a regular neural network.
Each training iteration has two phases. First, we train the discriminator on a batch made of real instances and an equal number of fake instances
produced by the generator (the labels are set to 1 for the real data and 0 for the fake data). Second, we train the generator: it produces another batch
of fake data and the discriminator is again asked to judge it, but this time all the labels are set to 1, because we want the generator to produce data
that the discriminator wrongly believes to be real (the weights of the discriminator are frozen during this phase so that backpropagation only affects
the generator). We are going to implement this on the Fashion MNIST dataset.

In [None]:
codings_size = 30

dense = tf.keras.layers.Dense
generator = tf.keras.Sequential([
    dense(100, activation="relu", kernel_initializer="he_normal"),
    dense(150, activation="relu", kernel_initializer="he_normal"),
    dense(28*28, activation="sigmoid"),
    tf.keras.layers.Reshape([28, 28])
])

discriminator = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    dense(150, activation="relu", kernel_initializer="he_normal"),
    dense(100, activation="relu", kernel_initializer="he_normal"),
    dense(1, activation="sigmoid")
])

gan = tf.keras.Sequential([generator, discriminator])

# Now we compile the GAN
discriminator.compile(loss="binary_crossentropy", optimizer="rmsprop")
discriminator.trainable = False
gan.compile(loss="binary_crossentropy", optimizer="rmsprop")

batch_size= 32
dataset = tf.data.Dataset.from_tensor_slices(X_train).shuffle(buffer_size=1000)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(1)

# Since the training steps are quite unusual, we need to write our own training loop
def train_gan(gan, dataset, batch_size, codings_size, n_epochs):
    generator, discriminator = gan.layers
    for epoch in range(n_epochs):
        for X_batch in dataset:
            # Phase 1: train the discriminator on fake images (label 0) and real images (label 1)
            noise = tf.random.normal(shape=[batch_size, codings_size])
            generated_imgs = generator(noise)
            X_fake_and_real = tf.concat([generated_imgs, X_batch], axis=0)
            y1 = tf.constant([[0.]] * batch_size + [[1.]] * batch_size)
            discriminator.trainable = True  # make sure the discriminator is not frozen in this phase
            discriminator.train_on_batch(X_fake_and_real, y1)
            # Phase 2: train the generator through the frozen discriminator, labeling the fakes as real
            noise = tf.random.normal(shape=[batch_size, codings_size])
            y2 = tf.constant([[1.]] * batch_size)
            discriminator.trainable = False
            gan.train_on_batch(noise, y2)


train_gan(gan, dataset, batch_size, codings_size, n_epochs=50)

And now we can sample random codings from a Gaussian distribution and ask the generator to create new images from them.

In [None]:
codings = tf.random.normal(shape=[batch_size, codings_size])
generated_imgs = generator.predict(codings)

During training, a GAN may reach a state called a __Nash equilibrium__, in which neither the generator nor the discriminator would be better off changing
its strategy, assuming the other one does not change its own. For a GAN, this happens when the generator produces perfectly realistic images and the
discriminator is forced to guess (50% real, 50% fake). Reaching this equilibrium is the goal, but nothing guarantees that training will get there. The
biggest difficulty is called __mode collapse__: the outputs of the generator gradually become less diverse. Suppose the generator becomes better at
producing one type of image, such as shoes. It will fool the discriminator a bit more with shoes, which in turn encourages it to generate even more
images of shoes, and it gradually forgets how to generate all the other types of images.

A popular technique called __experience replay__ consists of storing the images produced by the generator at each iteration in a replay buffer (gradually
dropping older generated images) and training the discriminator using real images plus fake images drawn from this buffer (rather than just fake images
produced by the current generator). This reduces the chances that the discriminator will overfit the latest generator's outputs. Another common technique
is called __mini-batch discrimination__: it measures how similar images are across the batch and provides this statistic to the discriminator, so it can
easily reject a whole batch of fake images that lack diversity. This encourages the generator to produce a greater variety of images, reducing the chance
of mode collapse.
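The replay buffer described above can be sketched in a few lines. This is one possible design, not a reference implementation: the class name, its methods, and the buffer size are assumptions, and constant arrays stand in for generator outputs so that the eviction of old images is easy to see.

```python
from collections import deque
import numpy as np

class ReplayBuffer:
    """Fixed-size store of generated images; the oldest ones are dropped first."""
    def __init__(self, max_size, seed=None):
        self.images = deque(maxlen=max_size)
        self.rng = np.random.default_rng(seed)

    def add(self, generated_batch):
        # Iterating over the batch stores each image separately
        self.images.extend(np.asarray(generated_batch))

    def sample(self, n):
        indices = self.rng.choice(len(self.images), size=n, replace=False)
        return np.stack([self.images[int(i)] for i in indices])

buffer = ReplayBuffer(max_size=1000, seed=0)
for step in range(50):
    # Stand-in for generator(noise): every pixel holds the step that produced the image
    fake_batch = np.full((32, 28, 28), step, dtype=np.float32)
    buffer.add(fake_batch)

replayed = buffer.sample(32)   # fake images to mix with real ones in the discriminator phase
print(len(buffer.images), replayed.shape, replayed.min())
```

In a training loop, the discriminator phase would draw its fake images with `buffer.sample(batch_size)` instead of using only the images the generator has just produced.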

## Deep convolutional GANs

Researchers have proposed the following guidelines for building stable deep convolutional GANs (DCGANs):
- Replace any pooling layers with strided convolutions in the discriminator and with transposed convolutions in the generator.
- Use batch normalization in both the generator and the discriminator, except in the output layer of the generator and the input layer of the discriminator.
- Remove fully connected hidden layers.
- Use ReLU activation in all the layers of the generator except the output layer, which should use tanh.
- Use LeakyReLU activation in all the layers of the discriminator.

Here is an example on the fashion MNIST:

In [None]:
codings_size = 100

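generator = tf.keras.Sequential([
    tf.keras.layers.Dense(7 * 7 * 128),
    tf.keras.layers.Reshape([7, 7, 128]),
    tf.keras.layers.BatchNormalization(),
    # 7x7x128 -> 14x14x64, ReLU as in every generator layer except the output
    tf.keras.layers.Conv2DTranspose(64, kernel_size=5, strides=2, padding="same", activation="relu"),
    tf.keras.layers.BatchNormalization(),
    # 14x14x64 -> 28x28x1; tanh outputs lie in [-1, 1], hence the rescaling of X_train below
    tf.keras.layers.Conv2DTranspose(1, kernel_size=5, strides=2, padding="same", activation="tanh"),
])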

discriminator = tf.keras.Sequential([
    tf.keras.layers.Conv2D(64, kernel_size=5, strides=2, padding="same", activation=tf.keras.layers.LeakyReLU(0.2)),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Conv2D(128, kernel_size=5, strides=2, padding="same", activation=tf.keras.layers.LeakyReLU(0.2)),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(1, activation="sigmoid")
])

deep_gan = tf.keras.Sequential([generator, discriminator])

X_train_dcgan = X_train.reshape(-1, 28, 28, 1) * 2. - 1.

This model produces fairly good images at this scale. To produce larger images, researchers suggested generating small images at the beginning of
training, then gradually adding layers at the end of the generator and at the beginning of the discriminator to produce larger and larger images. This
technique is called __progressive growing of GANs__.

## Diffusion models

Researchers found a way to generate very convincing images using diffusion models; the core method is called a
__denoising diffusion probabilistic model (DDPM)__. DDPMs are much easier to train than GANs and the images they generate are more diverse. A DDPM works
as follows. Suppose you have an image of a cat, noted $\mathbf{x}_0$, and at each time step _t_ you add a little bit of Gaussian noise to it, with mean 0
and variance $\beta_t$ (this noise is __isotropic__: it is independent for each pixel). You obtain the images $\mathbf{x}_1$, $\mathbf{x}_2$, and so on,
until the cat is completely hidden by the noise (the last time step is noted _T_). This is the _forward process_, summarized in the following equation:
$$q(\mathbf{x}_t \mid \mathbf{x}_{t-1}) = \mathcal{N}(\sqrt{1 - \beta_t}\,\mathbf{x}_{t-1}, \beta_t \mathbf{I})$$

Note that every pixel is rescaled at each time step by a factor of $\sqrt{1 - \beta_t}$. Since this factor is slightly smaller than 1, the mean of the
pixel values gradually approaches 0 (and the variance gradually converges to 1). There is also a shortcut: we can sample $\mathbf{x}_t$ directly from
$\mathbf{x}_0$, without having to compute $\mathbf{x}_1$, $\mathbf{x}_2$, ..., $\mathbf{x}_{t-1}$ first:
$$q(\mathbf{x}_t \mid \mathbf{x}_0) = \mathcal{N}(\sqrt{\bar{\alpha}_t}\,\mathbf{x}_0, (1 - \bar{\alpha}_t)\mathbf{I})$$
where $\alpha_t = 1 - \beta_t$ and $\bar{\alpha}_t = \prod_{i=1}^{t} \alpha_i$.
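The shortcut can be checked numerically. In the sketch below, the linear $\beta$ schedule, the number of steps, and the pixel value are arbitrary choices for illustration. It applies the forward step repeatedly to many copies of a single pixel, then compares the resulting mean and variance with a one-shot sample that uses the cumulative product of $1 - \beta_t$.

```python
import numpy as np

rng = np.random.default_rng(0)
T = 200
beta = np.linspace(1e-4, 0.05, T)        # hypothetical linear schedule, for illustration only
alpha = 1 - beta
alpha_bar = np.cumprod(alpha)            # cumulative product of (1 - beta)

x0 = np.full(50_000, 0.8)                # many copies of one pixel value

# Step-by-step forward process: rescale, then add noise of variance beta[t]
x = x0.copy()
for t in range(T):
    x = np.sqrt(1 - beta[t]) * x + np.sqrt(beta[t]) * rng.standard_normal(x.shape)

# One-shot sampling of the final step directly from x0
x_direct = np.sqrt(alpha_bar[-1]) * x0 + np.sqrt(1 - alpha_bar[-1]) * rng.standard_normal(x0.shape)

expected_mean = np.sqrt(alpha_bar[-1]) * 0.8
expected_var = 1 - alpha_bar[-1]
print(x.mean(), x_direct.mean(), expected_mean)
print(x.var(), x_direct.var(), expected_var)
```

Both routes give samples with the same mean and variance, up to sampling noise, so training can jump straight to any time step without simulating the whole chain.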

Now the goal of our model is to reverse this process. The first thing we need to do is to code the forward process, and for this we first need to
implement the variance schedule. How can we control how fast the cat disappears? Initially, 100% of the variance comes from the original cat image. Then
at each time step _t_, the variance gets multiplied by $1 - \beta_t$, as explained earlier, and noise gets added. So the part of the variance that comes
from the initial distribution shrinks by a factor of $1 - \beta_t$ at each step. If we define $\alpha_t = 1 - \beta_t$, then after _t_ time steps the cat
signal will have been multiplied by a factor of $\bar{\alpha}_t = \alpha_1 \times \alpha_2 \times \dots \times \alpha_t = \prod_{i=1}^{t} \alpha_i$. It is
this "cat signal" factor $\bar{\alpha}_t$ that we want to schedule so that it shrinks from 1 to 0 gradually between time steps 0 and _T_.

In [None]:
def variance_schedule(T, s=0.008, max_beta=0.999):
    t = np.arange(T + 1)
    f = np.cos((t / T + s) / (1 + s) * np.pi / 2)**2
    alpha = np.clip(f[1:] / f[:-1], 1 - max_beta, 1)
    alpha = np.append(1, alpha).astype(np.float32)  # prepend alpha_0 = 1 so that alpha[t] is the value at step t
    beta = 1 - alpha
    alpha_cumprod = np.cumprod(alpha)
    return alpha, alpha_cumprod, beta

T = 4000
alpha, alpha_cumprod, beta = variance_schedule(T)
# This function takes a batch of clean images and returns the noisy images, their time steps, and the noise to predict (the target)
def prepare_batch(X):
    X = tf.cast(X[..., tf.newaxis], tf.float32) * 2 - 1
    X_shape = tf.shape(X)
    t = tf.random.uniform([X_shape[0]], minval=1, maxval=T+1, dtype=tf.int32)
    alpha_cm = tf.gather(alpha_cumprod, t)
    alpha_cm = tf.reshape(alpha_cm, [X_shape[0]] + [1] * (len(X_shape) - 1))
    noise = tf.random.normal(X_shape)
    return {"X_noisy": alpha_cm ** 0.5 * X + (1 - alpha_cm) ** 0.5 * noise, "time": t}, noise

def prepare_dataset(X, batch_size=32, shuffle=False):
    ds = tf.data.Dataset.from_tensor_slices(X)
    if shuffle:
        ds = ds.shuffle(buffer_size=10_000)
    return ds.batch(batch_size).map(prepare_batch).prefetch(1)

class TimeEncoding(tf.keras.layers.Layer):
    def __init__(self, T, embed_size, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        assert embed_size % 2 == 0, "embed_size must be even"
        p, i = np.meshgrid(np.arange(T + 1), 2 * np.arange(embed_size // 2))
        t_emb = np.empty((T + 1, embed_size))
        t_emb[:, ::2] = np.sin(p / 10_000 ** (i / embed_size)).T
        t_emb[:, 1::2] = np.cos(p / 10_000 ** (i / embed_size)).T
        self.time_encodings = tf.constant(t_emb.astype(self.dtype))

    def call(self, inputs):
        return tf.gather(self.time_encodings, inputs)


train_set = prepare_dataset(X_train, batch_size=32, shuffle=True)
valid_set = prepare_dataset(X_valid, batch_size=32)

# Now time to build the diffusion model
embed_size = 64  # size of the time encoding; 64 is an assumed value (it must be even)

def build_diffusion_model():
    X_noisy = tf.keras.layers.Input(shape=[28, 28, 1], name="X_noisy")
    time_input = tf.keras.layers.Input(shape=[], dtype=tf.int32, name="time")
    time_enc = TimeEncoding(T, embed_size)(time_input)

    dim = 16
    Z = tf.keras.layers.ZeroPadding2D((3, 3))(X_noisy)
    Z = tf.keras.layers.Conv2D(dim, 3)(Z)
    Z = tf.keras.layers.BatchNormalization()(Z)
    Z = tf.keras.layers.Activation("relu")(Z)

    time = tf.keras.layers.Dense(dim)(time_enc)  # adapt time encoding
    Z = time[:, tf.newaxis, tf.newaxis, :] + Z  # add time data to every pixel

    skip = Z
    cross_skips = []  # skip connections across the down & up parts of the UNet

    for dim in (32, 64, 128):
        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.SeparableConv2D(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.SeparableConv2D(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        cross_skips.append(Z)
        Z = tf.keras.layers.MaxPooling2D(3, strides=2, padding="same")(Z)
        skip_link = tf.keras.layers.Conv2D(dim, 1, strides=2,
                                           padding="same")(skip)
        Z = tf.keras.layers.add([Z, skip_link])

        time = tf.keras.layers.Dense(dim)(time_enc)
        Z = time[:, tf.newaxis, tf.newaxis, :] + Z
        skip = Z

    for dim in (64, 32, 16):
        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.Conv2DTranspose(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.Conv2DTranspose(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        Z = tf.keras.layers.UpSampling2D(2)(Z)

        skip_link = tf.keras.layers.UpSampling2D(2)(skip)
        skip_link = tf.keras.layers.Conv2D(dim, 1, padding="same")(skip_link)
        Z = tf.keras.layers.add([Z, skip_link])

        time = tf.keras.layers.Dense(dim)(time_enc)
        Z = time[:, tf.newaxis, tf.newaxis, :] + Z
        Z = tf.keras.layers.concatenate([Z, cross_skips.pop()], axis=-1)
        skip = Z

    outputs = tf.keras.layers.Conv2D(1, 3, padding="same")(Z)[:, 2:-2, 2:-2]
    return tf.keras.Model(inputs=[X_noisy, time_input], outputs=[outputs])

# And finally, a function that generates images by running the reverse process, starting from pure noise
def generate(model, batch_size=32):
    X = tf.random.normal([batch_size, 28, 28, 1])
    for t in range(T, 0, -1):
        noise = (tf.random.normal if t > 1 else tf.zeros)(tf.shape(X))
        X_noise = model({"X_noisy": X, "time": tf.constant([t] * batch_size)})
        X = (1 / alpha[t] ** 0.5 * (X - beta[t] / (1 - alpha_cumprod[t]) ** 0.5 * X_noise) + (1 - alpha[t]) ** 0.5 * noise)
    return X
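The model is trained to predict the noise that was mixed into an image, and the closed-form expression of the forward process suggests why this is enough: if the noise is known, the clean image can be recovered exactly. Here is a minimal NumPy sketch of that idea. It assumes a cosine schedule like the one in variance_schedule(), re-implemented here with $\alpha_0 = 1$ placed first so that the block is self-contained, and it uses random stand-in data instead of a real image.

```python
import numpy as np

def cosine_alpha_cumprod(T, s=0.008, max_beta=0.999):
    """Cumulative products of alpha for t = 0..T under a cosine schedule."""
    t = np.arange(T + 1)
    f = np.cos((t / T + s) / (1 + s) * np.pi / 2) ** 2
    alpha = np.clip(f[1:] / f[:-1], 1 - max_beta, 1)
    alpha = np.append(1, alpha)          # alpha[0] = 1: no noise at step 0
    return np.cumprod(alpha)

rng = np.random.default_rng(0)
T = 4000
alpha_cumprod = cosine_alpha_cumprod(T)

x0 = rng.uniform(-1, 1, size=(28, 28, 1))      # stand-in for an image scaled to [-1, 1]
t = 2500
noise = rng.standard_normal(x0.shape)

# Forward process in one shot: mix the image and the noise
x_t = np.sqrt(alpha_cumprod[t]) * x0 + np.sqrt(1 - alpha_cumprod[t]) * noise

# With a perfect noise prediction, inverting the same equation gives back the clean image
x0_recovered = (x_t - np.sqrt(1 - alpha_cumprod[t]) * noise) / np.sqrt(alpha_cumprod[t])
print(np.allclose(x0, x0_recovered))
```

A trained model only approximates the noise, which is one reason generate() removes it a little at a time over many steps rather than in a single jump.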
