<a href="https://colab.research.google.com/github/albim72/ML_ZAAWANSOWANY_11/blob/main/autoenkoder_wariacyjny_v3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tensorflow-probability



In [2]:
!pip install imageio



In [3]:
!pip install git+https://github.com/tensorflow/docs

Collecting git+https://github.com/tensorflow/docs
  Cloning https://github.com/tensorflow/docs to /tmp/pip-req-build-4p1x9l73
  Running command git clone --filter=blob:none --quiet https://github.com/tensorflow/docs /tmp/pip-req-build-4p1x9l73
  Resolved https://github.com/tensorflow/docs to commit af33301a434ea70e104865b9d2e93e230494c1cb
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting astor (from tensorflow-docs==2023.10.27.81990)
  Downloading astor-0.8.1-py2.py3-none-any.whl (27 kB)
Building wheels for collected packages: tensorflow-docs
  Building wheel for tensorflow-docs (setup.py) ... [?25l[?25hdone
  Created wheel for tensorflow-docs: filename=tensorflow_docs-2023.10.27.81990-py3-none-any.whl size=184142 sha256=44240a88b6b1dc3139093c964008724293dc2fe24f022ac0c0080ab1aa4aac4d
  Stored in directory: /tmp/pip-ephem-wheel-cache-hv4biypi/wheels/86/0f/1e/3b62293c8ffd0fd5a49508e6871cdb7554abe9c62afd35ec53
Successfully built tensorflow-docs
Installing collected packag

In [4]:
from IPython import display
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
import tensorflow_probability as tfp
import time

In [5]:
#ładowanie zbioru danych
(train_images, _),(test_images, _) = tf.keras.datasets.mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [6]:
def preprocess_images(images):
  images = images.reshape((images.shape[0],28,28,1))/255.
  return np.where(images>.5,1.0,0.0).astype('float32')

train_images = preprocess_images(train_images)
test_images = preprocess_images(test_images)

In [7]:
train_size = 60000
batch_size = 32
test_size = 10000

In [8]:
#użyjemy tf.data do grupowania i tasowania danych
train_dataset = (tf.data.Dataset.from_tensor_slices(train_images).shuffle(train_size).batch(batch_size))
test_dataset = (tf.data.Dataset.from_tensor_slices(test_images).shuffle(test_size).batch(batch_size))

In [9]:
#Architektura sieci
class CVAE(tf.keras.Model):
  """Konwolucyjny enkoder wariacyjny"""
  def __init__(self,latent_dim):
    super(CVAE,self).__init__()
    self.latent_dim=latent_dim
    self.encoder=tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(28,28,1)),
            tf.keras.layers.Conv2D(filters=32, kernel_size=3,strides=(2,2),activation='relu'),
            tf.keras.layers.Conv2D(filters=64, kernel_size=3,strides=(2,2),activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(latent_dim + latent_dim),
        ]
    )

    self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
            tf.keras.layers.Reshape(target_shape=(7,7,32)),
            tf.keras.layers.Conv2DTranspose(filters=64,kernel_size=3, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv2DTranspose(filters=32,kernel_size=3, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv2DTranspose(filters=1,kernel_size=3, strides=1, padding='same'),
        ]
    )

  @tf.function
  def sample(self,eps=None):
    if eps is None:
      eps = tf.random.normal(shape=(100,self.latest_dim))
    return self.decode(eps, apply_sigmoid = True)

  def encode(self,x):
    mean,logvar = tf.split(self.encoder(x),num_or_size_splits=2,axis=1)
    return mean,logvar

  def reparametrize(self,mean,logvar):
    eps = tf.random.normal(shape=mean.shape)
    return eps*tf.exp(logvar*.5) + mean

  def decode(self,z,apply_sigmoid=False):
    logits = self.decode(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs
    return logits

In [10]:
optimizer = tf.keras.optimizers.Adam(1e-4)

def log_normal_pdf(sample,mean,logvar,raxis=1):
  log2pi = tf.math.log(2*np.pi)
  return tf.reduce_sum(
      -.5*((sample - mean)**2*tf.exp(-logvar)+log2pi),axis=raxis
  )

def compute_loss(model,x):
  mean,logvar = model.encode(x)
  z = model.reparametrize(mean,logvar)
  x_logit = model.decode(z)
  cross_en = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit,labels=x)
  logpx_z = -tf.reduce_sum(cross_en,axis=[1,2,3])
  logpz = log_normal_pdf(z,0.,0.)
  logqz_x = log_normal_pdf(z,mean,logvar)
  return -tf.reduce_mean(logpx_z + logpz - logqz_x)

def train_step(model, x, optimizer):
  with tf.GradientTape() as tape:
    loss = compute_loss(model,x)
  gradients = tape.gradient(loss,model.trainable_variables)
  optimizer.apply_gradients(zip(gradients,model.trainable_variables))

Trening:
1. iteracja po zbiorze danych
2. podczas każdej iteracji przekazanie obrazu do kodera
3. reparametryzacja do próbki z q
4. ostatecznie reparametryzacja próbki do dekodera w celu uzyskania logitów rozkładu generatywnego


In [11]:
epochs = 10
latent_dim = 2
num_examples_togenerate = 16

random_vector_for_generation = tf.random.normal(shape=[num_examples_togenerate,latent_dim])

model = CVAE(latent_dim)

In [12]:
def generate_and_save_images(model,epoch,test_sample):
  mean,logvar = model.encode(test_sample)
  z = model.reparametrize(mean,logvar)
  predictions = model.sample(z)
  fig = plt.figure(figsize=(4,4))

  for i in range(predictions.shape[0]):
    plt.subplot(4,4,i+1)
    plt.imshow(predictions[i,:,:,0],cmap='gray')
    plt.axis('off')

  plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))

In [13]:
assert batch_size >= num_examples_togenerate
for test_batch in test_dataset.take(1):
  test_sample = test_batch[0:num_examples_togenerate,:,:,:]


In [14]:
generate_and_save_images(model,0,test_sample)

for epoch in range(1,epochs+1):
  start_time = time.time()
  for train_x in train_dataset:
    train_step(model,train_x, optimizer)
  end_time = time.time()

  loss = tf.keras.metrics.Mean()
  for test_x in test_dataset:
    loss(compute_loss(model,test_x))
  elbo = -loss.result()
  display.clear_output(wait=False)
  print(f'Epoch: {epoch}, Test zbioru ELBO: {elbo}, czas wykonania aktualnej epoki: {end_time - start_time} s ')
  generate_and_save_images(model,epoch,test_sample)

StagingError: ignored