In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_compression as tfc
import tensorflow_probability as tfp

from time import time
from tqdm.notebook import tqdm

import os
import h5py

from BalleFFP import BalleFFP
from read_data import read_data_numpy

import constants

2023-03-04 22:49:03.765515: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-04 22:49:03.900707: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-03-04 22:49:04.449033: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: :/home/ubuntu/data/miniconda3/envs/tf2/lib/
2023-03-04 22:49:04.449120: W tensorflow/co

## Functions

In [2]:
def gpu_settings() -> tf.distribute.MirroredStrategy:
    gpus = tf.config.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    strategy = tf.distribute.MirroredStrategy(['/gpu:0', '/gpu:1']) # gpu distribution strategy
    return strategy

def read_data(ch_format='channels_first') -> np.ndarray:
    data_path = os.path.join(constants.DATA_FOLDER, constants.DATA_FILE)
    data      = read_data_numpy(data_path, ch_format)
    return data

def split_train_test(data: np.ndarray):
    train_images = data[:constants.TRAINING_SET_SIZE]
    test_images  = data[constants.TRAINING_SET_SIZE:constants.TRAINING_SET_SIZE+constants.VALIDATION_SET_SIZE]
    return train_images, test_images

## Read Data

In [3]:
strategy  = gpu_settings()

print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

print('Reading data...')
ch_format = 'channels_first'
data      = read_data(ch_format)
data      = data.astype('float32') # / 255.0
print('Data shape: {}'.format(data.shape))

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


2023-03-04 22:49:17.172045: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-03-04 22:49:17.173487: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-03-04 22:49:17.182052: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-03-04 22:49:17.183310: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-03-04 22:49:17.184588: I tensorflow/compiler/xla/stream_executo

Number of devices: 2
Reading data...
Data shape: (100000, 3, 96, 96)


## Training/Test distribution

In [4]:
train_images, test_images = split_train_test(data)
del data

buffer_size       = len(train_images) # buffer size for shuffling
global_batch_size = constants.BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync # global batch size (in our case 2gpu * BATCH_SIZE_PER_REPLICA)

train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(buffer_size).batch(global_batch_size)
test_dataset  = tf.data.Dataset.from_tensor_slices(test_images).batch(global_batch_size)

print('Distributing data...')
train_dataset_dist = strategy.experimental_distribute_dataset(train_dataset)
test_dataset_dist  = strategy.experimental_distribute_dataset(test_dataset)

del train_dataset, test_dataset

Distributing data...


2023-03-04 22:49:51.037455: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_1"
op: "TensorSliceDataset"
input: "Placeholder/_0"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 80000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 3
        }
        dim {
          size: 96
        }
        dim {
          size: 96
        }
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT


## Model

In [5]:
with strategy.scope():
    vae       = BalleFFP(N=128, M=192, k2=3, c=3, format=ch_format)
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4) 

## Training/Test steps

In [6]:
with strategy.scope():
    
    @tf.function
    def Loss(inputs, outputs):
        return tf.reduce_mean(tf.square(inputs - outputs[0])) + 0.5*(tf.reduce_mean(outputs[1]))

    @tf.function # compile the function to a graph for faster execution
    def train_step(inputs, vae):
        with tf.GradientTape() as tape: # create a tape to record operations
            reconstructed, rateb = vae(inputs) # forward pass
            loss = Loss(inputs, (reconstructed, rateb)) # MSE loss (maybe put this in a function)
        gradients = tape.gradient(loss, vae.trainable_variables) # compute gradients    
        optimizer.apply_gradients(zip(gradients, vae.trainable_variables)) # gradient descent
        return loss # return loss for logging
    
    @tf.function
    def val_step(inputs, vae):
        outputs = vae(inputs, training=False) # forward pass
        loss    = Loss(inputs, outputs)       
        return loss 


def train_step_dist(inputs, vae, strategy):
    loss = strategy.run(train_step, args=(inputs, vae))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, loss, axis=None)

def val_step_dist(inputs, vae, strategy):
    loss = strategy.run(val_step, args=(inputs, vae))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, loss, axis=None)

## Training loop

In [7]:
training_losses = []
test_losses     = []

print('Training started...')

for epoch in range(constants.EPOCHS):
    total_loss  = 0.0 
    num_batches = 0 
    for inputs in tqdm(train_dataset_dist, 'training steps'): 
        total_loss += train_step_dist(inputs, vae, strategy) # type: ignore # sum losses across replicas (type: ignore is for mypy
        num_batches += 1 # count number of batches
        
    train_loss = total_loss / num_batches # compute average loss
    training_losses.append(train_loss)
    
    print('Epoch {} train loss: {}'.format(epoch, train_loss))

    total_loss  = 0.0
    num_batches = 0
    for inputs in tqdm(test_dataset_dist, 'validation steps'): 
        total_loss  += val_step_dist(inputs, vae, strategy) # type: ignore # sum losses across replicas
        num_batches += 1 # count number of batches
        
    test_loss = total_loss / num_batches # compute average loss
    test_losses.append(test_loss)
    
    print('Epoch {} test loss: {}'.format(epoch, test_loss))
    
print('Training finished!')

Training started...


training steps: 0it [00:00, ?it/s]

Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
INFO:tensorflow:batch_all_reduce: 36 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:batch_all_reduce: 36 all-reduces with algorithm = nccl, num_packs = 1


2023-03-04 22:50:12.468987: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:428] Loaded cuDNN version 8100
2023-03-04 22:50:13.129854: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:428] Loaded cuDNN version 8100
2023-03-04 22:50:14.784216: I tensorflow/compiler/xla/service/service.cc:173] XLA service 0x7f5370975970 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2023-03-04 22:50:14.784257: I tensorflow/compiler/xla/service/service.cc:181]   StreamExecutor device (0): Tesla T4, Compute Capability 7.5
2023-03-04 22:50:14.784262: I tensorflow/compiler/xla/service/service.cc:181]   StreamExecutor device (1): Tesla T4, Compute Capability 7.5
2023-03-04 22:50:14.790711: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2023-03-04 22:50:14.917299: I tensorflow/compiler/jit/xla_compilation_cache.cc:477] Compiled cluster using

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Redu

KeyboardInterrupt: 

In [None]:
print('Saving model and losses...')
current_time = time()

# save model
model_name = f"model_{current_time}.h5"
model_path = constants.MODEL_FOLDER + model_name
vae.save_weights(model_path)

# save losses in .h5 file
losses_name = f"losses_{current_time}.h5"
losses_path = constants.MODEL_FOLDER + losses_name
with h5py.File(losses_path, 'w') as f:
    f.create_dataset('train', data=training_losses)
    f.create_dataset('test', data=test_losses)