## Prepare environment

In [None]:
import os

# Working directory captured on the first prepare_env() call; later calls
# change back to it before doing anything else.
ORIGINAL_DIRECTORY = None

def prepare_env():
    """Change into the project's Python directory, install requirements, enable autoreload.

    The first call stores os.getcwd() in the global ORIGINAL_DIRECTORY; every
    call starts by changing to that directory. When './python-code' is not a
    file there, the voxel-nn repository (branch ``dev``) is cloned if the
    './voxel-nn' directory is missing, then './voxel-nn/Python' is entered and
    ``git pull`` is run. Finally ./requirements.txt is installed with %pip and
    the autoreload extension is loaded in mode 2.
    """
    global ORIGINAL_DIRECTORY
    if ORIGINAL_DIRECTORY is None:
        ORIGINAL_DIRECTORY = os.getcwd()
    %cd {ORIGINAL_DIRECTORY}
    # NOTE(review): presumably './python-code' marks a local checkout — confirm.
    running_locally = os.path.isfile('./python-code')
    if not running_locally:
        if not os.path.isdir('./voxel-nn'):
            !git clone -b dev https://github.com/Sotakebk/voxel-nn.git
        %cd ./voxel-nn/Python
        !git pull
    
    %pip install -r ./requirements.txt
    %load_ext autoreload
    %autoreload 2
prepare_env()

In [None]:
# `os` is imported first so this cell does not depend on an earlier cell's import.
import os

# Set before TensorFlow is imported so the log level is already in the
# environment when the library loads.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # or any {'0', '1', '2'}

import tensorflow as tf
from voxelnn import tensorflow_tools as tft

# Only an explicit False from maybe_use_tpu() triggers the GPU listing.
if tft.maybe_use_tpu() is False:
    gpu = tf.config.list_physical_devices('GPU')
    print(gpu)

## Prepare dataset for further operations

In [None]:
from voxelnn import dataset_manipulation as dm

# Name passed to jt.save_model / jt.load_model for every model in this notebook.
shared_model_path = 'simple-terrain-2d-16x16'

# Dataset file names, then the same names prefixed with the dataset directory.
files_to_load = ['simple-terrain-2d-16x16-dataset.json']
files_to_load = ['../Dataset/' + filename for filename in files_to_load]

(
    tag_names,
    block_names,
    entry_names,
    tags_multilabel,
    blocks,
) = dm.load_dataset(files_to_load)

# Quick summary of what was loaded.
print(f'Blocks: {block_names} ({len(block_names)})')
print(f'Tags: {tag_names} ({len(tag_names)})')
print(f'Data shape: {blocks.shape}')

In [None]:
import seaborn as sns

# Matrix produced by dm.cooccurence_matrix for the multilabel tags,
# drawn as an annotated heatmap with the tag names on both axes.
tag_cooccurence_matrix = dm.cooccurence_matrix(tags_multilabel)
_ = sns.heatmap(
    data=tag_cooccurence_matrix,
    xticklabels=tag_names,
    yticklabels=tag_names,
    annot=True,
    vmin=0,
)

In [None]:
import numpy as np
import seaborn as sns

# Count how many times every block id occurs anywhere in the dataset.
unique, counts = np.unique(blocks, return_counts=True)

# One bar per block id, labelled with the block's name.
ax = sns.barplot(
    x=[block_names[block_id] for block_id in unique],
    y=counts,
)
_ = ax.set_title("Occurence of block types in the dataset.")

## Shared variables

In [None]:
# Number of axes per entry (everything after the leading entry axis): 2D or 3D.
real_dims = len(blocks.shape) - 1
# Number of separate block classes.
block_type_count = len(block_names)
# Latent channels used to encode a block type: floor of sqrt(class count).
latent_dims = int(block_type_count ** 0.5)

print(f'real dimensions in data: {real_dims}')
print(f'block types in data: {block_type_count}')
print(f'latent dimensions to encode blocks in: {latent_dims}')

## Encoders

In [None]:
import tensorflow as tf
import keras
from keras import layers
from voxelnn.coding.custom_layers import OneHot, Sampling

# True when the dataset entries have three axes; selects Conv3D over Conv2D below.
is3d = real_dims == 3

def _conv(filters: int, kernel_size=1, activation='elu', padding='same', kernel_initializer='glorot_normal', **kwargs):
    """Return a callable that applies a Conv2D/Conv3D layer to a tensor.

    The layer class is chosen from the module-level `is3d` flag at the moment
    the returned callable is invoked. All arguments, including extra
    keyword arguments, are forwarded to the layer constructor.
    """
    def apply(x):
        conv_cls = layers.Conv3D if is3d else layers.Conv2D
        conv_layer = conv_cls(
            filters=filters,
            kernel_size=kernel_size,
            activation=activation,
            padding=padding,
            kernel_initializer=kernel_initializer,
            **kwargs,
        )
        return conv_layer(x)
    return apply

def _encoder_input():
    """Create the encoder's uint8 input layer with unspecified sizes on every axis.

    Three axes are declared when `is3d` is set, otherwise two.
    """
    axis_count = 3 if is3d else 2
    return layers.Input(shape=(None,) * axis_count, dtype='uint8', name='encoder_input')

def _decoder_input(latent_dimensions):
    """Create the decoder's input layer: unspecified sizes plus a latent channel axis.

    Three unspecified axes are declared when `is3d` is set, otherwise two;
    the last axis always has `latent_dimensions` entries.
    """
    axis_count = 3 if is3d else 2
    input_shape = (None,) * axis_count + (latent_dimensions,)
    return layers.Input(shape=input_shape, name='decoder_input')

def create_encoder(block_type_count, latent_dimensions):
    """Build the encoder model: block ids -> [mean, var, z].

    The uint8 input is passed through OneHot(block_type_count) and three
    1x1 convolutions (40, 30, 20 filters, ELU). Two linear 1x1 heads with
    `latent_dimensions` filters produce `mean` and `var` (the `var` kernel is
    zero-initialised); the Sampling layer combines them into `z`.

    Returns:
        keras.Model named "encoder" whose outputs are [mean, var, z].
    """
    # Local name avoids shadowing the builtin `input`.
    encoder_in = _encoder_input()
    oh = OneHot(block_type_count)(encoder_in)
    c1  = _conv(filters=40, kernel_size=1, padding='same', activation='elu', kernel_initializer='glorot_normal')(oh)
    c2  = _conv(filters=30, kernel_size=1, padding='same', activation='elu', kernel_initializer='glorot_normal')(c1)
    c3  = _conv(filters=20, kernel_size=1, padding='same', activation='elu', kernel_initializer='glorot_normal')(c2)
    # Both heads read the last hidden layer. `mean` was previously wired to c2,
    # which left c3 feeding only `var`.
    mean = _conv(filters=latent_dimensions, kernel_size=1, padding='same', activation='linear', name='mean')(c3)
    var = _conv(filters=latent_dimensions, kernel_size=1, padding='same', activation='linear', kernel_initializer='zeros', name='var')(c3)
    z = Sampling(name='encoder_output')([mean, var])
    encoder = keras.Model(encoder_in, [mean, var, z], name="encoder")
    return encoder

def create_decoder(block_type_count, latent_dimensions):
    """Build the decoder model: latent tensor -> softmax over block types.

    Three 1x1 ELU convolutions (20, 30, 40 filters) are followed by a linear
    1x1 convolution with `block_type_count` filters named 'decoder_output'
    and a Softmax over the channel axis (4 in 3D, 3 in 2D).

    Returns:
        keras.Model named "decoder".
    """
    decoder_in = _decoder_input(latent_dimensions)

    hidden = decoder_in
    for width in (20, 30, 40):
        hidden = _conv(filters=width, kernel_size=1, padding='same', activation='elu')(hidden)

    logits = _conv(block_type_count, kernel_size=1, padding='same', activation='linear', name='decoder_output')(hidden)

    channel_axis = 4 if is3d else 3
    probabilities = layers.Softmax(axis=channel_axis)(logits)
    return keras.Model(decoder_in, probabilities, name="decoder")

In [None]:
from voxelnn.coding import SAE, VAE
from voxelnn import filter_kernels as fk
from voxelnn import jupyter_tools as jt

# Build and compile the encoder (e) and decoder (d), then wrap both in the
# SAE trainer together with a filter kernel created for this data's
# dimensionality and latent size.
e = create_encoder(block_type_count, latent_dims)
d = create_decoder(block_type_count, latent_dims)
e.compile()
d.compile()
# NOTE(review): the variable is called `vae` but the trainer comes from the SAE
# module; the imported VAE module is not used in this cell — confirm intended.
vae = SAE.SAETrainer(e, d,
                     filter_array=fk.create_kernel(3, real_dims, latent_dims),
                     kld_loss_weight=0.1,
                     str_loss_weight=0.1
                     )
vae.compile()

In [None]:
# Index separating training entries (first 90%) from validation entries (rest).
split = int(0.9 * blocks.shape[0])
print(f'training on {split} entries')

# The trainer receives the block data only; the validation tuple carries None as its second element.
history = vae.fit(blocks[:split,...], epochs=100, validation_data=(blocks[split:,...], None))

jt.plot_history(history)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def display_some_encoded_samples(with_sampling: bool):
  """Encode the first rows*columns dataset entries and show them as RGB images.

  Args:
    with_sampling: when True the sampled latent `z` is shown, otherwise the
      encoder's `mean` output.

  Up to three latent channels are mapped onto the R, G, B channels; colour
  channels without a latent counterpart stay zero. Values are rescaled to
  [0, 1] using the global min/max of the resulting image stack.
  Only works for 2D data (the latent tensor is indexed as batch, y, x, channel).
  """
  rows = 2
  columns = 5
  size_per_unit = 4
  sample_count = rows * columns
  # Encode only the samples that are actually drawn.
  z_mean, _, z = e.predict(blocks[:sample_count, ...])
  data = np.asarray(z if with_sampling else z_mean)
  shown_channels = min(data.shape[-1], 3)
  new_array = np.zeros((*data.shape[:3], 3))
  new_array[..., :shown_channels] = data[..., :shown_channels]
  min_val = np.min(new_array)
  max_val = np.max(new_array)
  print(f'min, max: {(min_val, max_val)}')
  # A constant image would otherwise divide by zero.
  normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)
  f, axarr = plt.subplots(rows, columns, sharex=True, sharey=True, figsize=(columns * size_per_unit, rows * size_per_unit))
  for i in range(sample_count):
    panel = axarr[i//columns, i%columns]
    panel.axis('off')
    # The dataset may hold fewer entries than the grid has panels.
    if i < normalized_array.shape[0]:
      panel.imshow(normalized_array[i])

In [None]:
# The display helper indexes the latent tensor with exactly three leading axes,
# so it is only called for 2D data.
if real_dims == 2:
  display_some_encoded_samples(with_sampling=True)

In [None]:
# Same view as the previous cell, with with_sampling=False; 2D data only.
if real_dims == 2:
  display_some_encoded_samples(with_sampling=False)

In [None]:
# Encode the whole dataset; keep the encoder's first and third outputs (mean and sampled z).
z_mean, _, z = e.predict(blocks)

In [None]:
# Per-channel normalisation statistics are adapted on z_mean ...
normalization = keras.layers.Normalization(axis=-1)
normalization.adapt(z_mean)
print(f'mean: {normalization.mean}')
print(f'variance: {normalization.variance}')

# ... and applied to the sampled latents z.
# NOTE(review): statistics come from z_mean but are applied to z — confirm intended.
norm_z = normalization(z, training=False)

# Inverse of the normalisation above (mean + value * std).
denorm_z = normalization.mean + norm_z * normalization.variance ** 0.5

data = norm_z
# Map up to three latent channels onto RGB; unused colour channels stay zero.
shown_channels = min(int(data.shape[-1]), 3)
new_array = np.zeros((*data.shape[:3], 3))
new_array[..., :shown_channels] = np.asarray(data)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print((min_val, max_val))
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)
f, axarr = plt.subplots(10, 5, sharex=True, sharey=True, figsize=(20, 40))
# Draw at most 50 samples; panels without a sample are left blank.
shown_count = min(50, normalized_array.shape[0])
for i in range(50):
  axarr[i//5, i%5].axis('off')
  if i < shown_count:
    axarr[i//5, i%5].imshow(normalized_array[i])

In [None]:
from voxelnn import jupyter_tools as jt
# Persist encoder and decoder under the shared model path.
jt.save_model(shared_model_path, "encoder.keras", e)
jt.save_model(shared_model_path, "decoder.keras", d)

In [None]:
from voxelnn import jupyter_tools as jt

# Replace the in-memory encoder and decoder with the saved copies.
e = jt.load_model(shared_model_path, "encoder.keras")
d = jt.load_model(shared_model_path, "decoder.keras")

## Diffusion

In [None]:
from voxelnn.diffusion import unet_builder

# Unit count of the first U-Net level; deeper levels use integer multiples of it.
layer_base_units = 32

# The network is sized from the dataset entry shape and the latent channel count.
unet = unet_builder.build_model(
    input_shape=blocks.shape[1:],
    latent_dims=latent_dims,
    layer_units=[layer_base_units * multiplier for multiplier in (1, 2, 3, 4)],
    layer_attn_heads=[0, 1, 2, 2],
    layer_res_blocks=[1, 1, 1, 1],
    norm_groups=4,
    first_conv_channels=48,
    time_embedding_dims=32,
    max_time_emb_frequency=200.0,
)
unet.compile()

In [None]:
from voxelnn.diffusion.diffusion_model import DiffusionModel
import keras

# Wrap the U-Net in the diffusion model and compile it with AdamW and a
# mean-absolute-error loss.
diffusion_model = DiffusionModel(unet)
diffusion_optimizer_args = None  # placeholder removed below; kept out of the namespace
del diffusion_optimizer_args
diffusion_model.compile(
    optimizer=keras.optimizers.AdamW(learning_rate=1e-3, weight_decay=1e-4),
    loss=keras.losses.mean_absolute_error,
)

In [None]:
# Diffusion inputs: the dataset plus a copy flipped along axis 1, every entry
# repeated 10 times along axis 0, all passed through the encoder.
z_mean, z_var, z = e.predict(np.repeat(np.concatenate([blocks, np.flip(blocks, axis=1)], axis=0), 10, axis=0))

In [None]:
# NOTE(review): the sampled latents z are used here while training below uses z_mean — confirm intended.
diffusion_model.learn_data_distribution(z)

In [None]:
# Train the diffusion model on the encoder means for 100 epochs and plot the history.
# NOTE(review): z_mean comes from an input where every entry is repeated 10 times — confirm the repetition is wanted here.
history = diffusion_model.fit(z_mean, epochs=100)

jt.plot_history(history)

In [None]:
import voxelnn.jupyter_tools as jt

# Persist the diffusion model under the shared model path.
jt.save_model(shared_model_path, "diffusion.keras", diffusion_model)

In [None]:
import voxelnn.jupyter_tools as jt

# Replace the in-memory diffusion model with the saved copy.
diffusion_model = jt.load_model(shared_model_path, "diffusion.keras")

In [None]:
import voxelnn.jupyter_tools as jt

# Persist the bare U-Net under the shared model path.
jt.save_model(shared_model_path, "unet.keras", unet)

In [None]:
import voxelnn.jupyter_tools as jt

# Replace the in-memory U-Net with the saved copy.
# NOTE(review): diffusion_model was constructed from the previous `unet` object; rebinding the name here does not change that instance — confirm intended.
unet = jt.load_model(shared_model_path, "unet.keras")

In [None]:
import math

# One pure-noise sample shaped like a single encoded entry: the dataset's
# per-entry shape plus the latent channel axis (previously hard-coded to 16x16x2).
noise = tf.random.normal(shape=(1, *blocks.shape[1:], latent_dims))
# Noise and signal powers sum to 1; the rates are their square roots.
noise_power = 0.5
noise_rate = math.sqrt(noise_power)
signal_rate = math.sqrt(1-noise_power)
noise_rates = np.array([noise_rate], dtype=np.float32)
signal_rates = np.array([signal_rate], dtype=np.float32)

# Calls the model's private denoising step directly, in inference mode.
pred_noises, pred_data = diffusion_model._denoise(noisy_data=noise, noise_rates=noise_rates, signal_rates=signal_rates, training=False)

In [None]:
# Forward pass on the first encoded entry (the list index keeps the batch axis).
# Rebinds pred_noises and pred_data from the manual _denoise cell.
pred_noises, pred_data, noises, normalized_data = diffusion_model(z_mean[[0],...], training=False)

In [None]:
# Show the predicted noise as an RGB image: up to three latent channels are
# mapped onto R, G, B and unused colour channels stay zero.
new_array = np.zeros((*pred_noises.shape[:3], 3))
shown_channels = min(int(pred_noises.shape[-1]), 3)
new_array[..., :shown_channels] = np.asarray(pred_noises)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print(f'min, max: {(min_val, max_val)}')
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)

# Trailing semicolon keeps the AxesImage repr out of the cell output.
plt.imshow(normalized_array[0]);

In [None]:
# Show predicted data plus the noise tensor as an RGB image: up to three latent
# channels are mapped onto R, G, B and unused colour channels stay zero.
# NOTE(review): the sum is unweighted (no signal/noise rates) — confirm this is the intended view.
new_array = np.zeros((*pred_data.shape[:3], 3))
shown_channels = min(int(pred_data.shape[-1]), 3)
new_array[..., :shown_channels] = np.asarray(pred_data + noises)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print(f'min, max: {(min_val, max_val)}')
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)

# Trailing semicolon keeps the AxesImage repr out of the cell output.
plt.imshow(normalized_array[0]);

In [None]:
# Show the predicted data as an RGB image: up to three latent channels are
# mapped onto R, G, B and unused colour channels stay zero.
new_array = np.zeros((*pred_data.shape[:3], 3))
shown_channels = min(int(pred_data.shape[-1]), 3)
new_array[..., :shown_channels] = np.asarray(pred_data)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print(f'min, max: {(min_val, max_val)}')
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)

# Trailing semicolon keeps the AxesImage repr out of the cell output.
plt.imshow(normalized_array[0]);

In [None]:
# Show the model's normalized input as an RGB image: up to three latent channels
# are mapped onto R, G, B and unused colour channels stay zero.
new_array = np.zeros((*normalized_data.shape[:3], 3))
shown_channels = min(int(normalized_data.shape[-1]), 3)
new_array[..., :shown_channels] = np.asarray(normalized_data)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print(f'min, max: {(min_val, max_val)}')
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)

# Trailing semicolon keeps the AxesImage repr out of the cell output.
plt.imshow(normalized_array[0]);

In [None]:
def show_many_generated_samples(rows=2, columns=2, size_per_unit=4, diffusion_steps=1000):
    """Generate rows*columns latent samples and show them in a grid of RGB images.

    Args:
        rows: number of grid rows.
        columns: number of grid columns.
        size_per_unit: figure size (in inches) of one grid cell.
        diffusion_steps: second argument passed to generate_many_samples.

    Up to three latent channels are mapped onto R, G, B; unused colour
    channels stay zero. Values are rescaled to [0, 1] using the global
    min/max of the resulting image stack. Only works for 2D data (the
    result is indexed as batch, y, x, channel).
    """
    result = np.asarray(diffusion_model.generate_many_samples(rows * columns, diffusion_steps))
    shown_channels = min(result.shape[-1], 3)
    new_array = np.zeros((*result.shape[:3], 3))
    new_array[..., :shown_channels] = result[..., :shown_channels]
    min_val = np.min(new_array)
    max_val = np.max(new_array)
    print(f'min, max: {(min_val, max_val)}')
    # A constant image would otherwise divide by zero.
    normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)
    # squeeze=False keeps axarr two-dimensional even for a single row or column.
    f, axarr = plt.subplots(rows, columns, sharex=True, sharey=True, figsize=(columns * size_per_unit, rows * size_per_unit), squeeze=False)
    for i in range(rows*columns):
      panel = axarr[i//columns, i%columns]
      panel.axis('off')
      if i < normalized_array.shape[0]:
        panel.imshow(normalized_array[i])

show_many_generated_samples()

In [None]:
# Generate one sample and keep the intermediate histories returned by the model.
# NOTE(review): only 2 diffusion steps are requested, while the history plots further down lay out a 10x5 grid — confirm the step count.
result, data_history, noises_history, noisy_data_history = diffusion_model.generate_one_sample_with_history(diffusion_steps=2)

In [None]:
import json
import keras
import numpy as np
import voxelnn.dataset_manipulation as dm

def decode_many_to_json(latent_data: np.ndarray, decoder: keras.Model, name_pattern: str = None):
    """Decode a batch of latent samples and serialise them as a JSON list.

    Args:
        latent_data: batch of latent samples; the first axis enumerates elements.
        decoder: model whose prediction is reduced with argmax over the last axis.
        name_pattern: format string that receives the 1-based element index.
            When falsy, "Unnamed element ({}/<element count>)" is used.

    Returns:
        JSON string of the entries built by dm.construct_entry_dto.

    Reads the notebook-level `tag_names` and `block_names`; every entry is
    given np.zeros(len(tag_names)) as its third argument.
    """
    elements = latent_data.shape[0]
    if not name_pattern:
        name_pattern = f"Unnamed element ({{}}/{elements})"

    # Index of the largest decoder output along the last axis.
    block_ids = np.argmax(decoder.predict(latent_data), axis=-1)

    entries = [
        dm.construct_entry_dto(
            name_pattern.format(index + 1),
            tag_names,
            np.zeros(len(tag_names)),
            block_names,
            block_ids[index, :])
        for index in range(elements)
    ]
    return json.dumps(entries)

In [None]:
# Decode the generated sample with the decoder and hand the JSON to the result widget.
print("result:")
voxels_result = decode_many_to_json(result, d)
jt.result_json_widget(voxels_result)

In [None]:
# Decode every entry of the noisy-data history and hand the JSON to the result widget.
print("noisy data history:")
voxels_noisy_data_history = decode_many_to_json(noisy_data_history, d)
jt.result_json_widget(voxels_noisy_data_history)

In [None]:
# Decode every entry of the data history and hand the JSON to the result widget.
print("data history:")
voxels_data_history = decode_many_to_json(data_history, d)
jt.result_json_widget(voxels_data_history)

In [None]:
import matplotlib.pyplot as plt

# render data_history
# Up to three latent channels are mapped onto R, G, B; unused colour channels stay zero.

new_array = np.zeros((*data_history.shape[:3], 3))
shown_channels = min(int(data_history.shape[-1]), 3)
new_array[..., :shown_channels] = np.asarray(data_history)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print((min_val, max_val))
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)
f, axarr = plt.subplots(10, 5, sharex=True, sharey=True, figsize=(20, 40))
# The history may hold fewer than 50 entries: draw what exists, blank the rest.
shown_count = min(50, normalized_array.shape[0])
for i in range(50):
  axarr[i//5, i%5].axis('off')
  if i < shown_count:
    axarr[i//5, i%5].imshow(normalized_array[i])

In [None]:
import matplotlib.pyplot as plt

# render noisy_data_history
# Up to three latent channels are mapped onto R, G, B; unused colour channels stay zero.

new_array = np.zeros((*noisy_data_history.shape[:3], 3))
shown_channels = min(int(noisy_data_history.shape[-1]), 3)
new_array[..., :shown_channels] = np.asarray(noisy_data_history)[..., :shown_channels]
min_val = np.min(new_array)
max_val = np.max(new_array)
print((min_val, max_val))
# A constant image would otherwise divide by zero.
normalized_array = (new_array - min_val) / ((max_val - min_val) or 1.0)
f, axarr = plt.subplots(10, 5, sharex=True, sharey=True, figsize=(20, 40))
# The history may hold fewer than 50 entries: draw what exists, blank the rest.
shown_count = min(50, normalized_array.shape[0])
for i in range(50):
  axarr[i//5, i%5].axis('off')
  if i < shown_count:
    axarr[i//5, i%5].imshow(normalized_array[i])