# Tilable terrain gan

- DC GAN for now
- pending tiling features

In [1]:
%pip install tensorflow h5py pyyaml rasterio

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.2.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
# Bringing in tensorflow
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

2023-08-19 03:52:14.306626: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-19 03:52:15.614976: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-08-19 03:52:15.624069: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-08-

In [3]:
import rasterio
import glob
import numpy as np
import random

# Constants
BUFFER_SIZE = 10000
BATCH_SIZE = 32
IMAGE_SIZE = 28
MAX_ALTITUDE = 1105

# 1. Read the GeoTIFF files
file_list = glob.glob("TesisCode/Dataset/download2/*.tif")
ds = tf.data.Dataset.from_tensor_slices(file_list)

# 2. Decode and preprocess the images
def decode_image(filename):
    with rasterio.open(filename.numpy().decode('utf-8')) as src:
        image = src.read()
        image = image[0]
        
    # Resize to desired image size using TensorFlow
    size_rows, size_cols = np.shape(image)
    # random 28x28 crop of np array image
    start_row = random.randint(0, size_rows - IMAGE_SIZE)
    start_col = random.randint(0, size_cols - IMAGE_SIZE)
    image = image[start_row:start_row+IMAGE_SIZE, start_col:start_col+IMAGE_SIZE]

    # Normalize the pixel values to [0,1]
    image = image / MAX_ALTITUDE
    
    # reshape to 28x28x1
    image = np.expand_dims(image, axis=-1)

    # Convert to float32
    image = tf.cast(image, tf.float32)

    return image

# Use tf.py_function to call the rasterio function in a tensorflow map function
def tf_decode_image(filename):
    return tf.py_function(decode_image, [filename], tf.float32)

ds = ds.map(tf_decode_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

# 3. Shuffle and batch the dataset for training
## Geotif pixel values are altitude in meters, so we need to normalize them to [0,1] based on the max altitude
# ds = ds.shuffle(BUFFER_SIZE).repeat().batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE).cache()
ds = ds.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE).cache()


2023-08-19 03:52:15.679073: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-08-19 03:52:15.679261: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-08-19 03:52:15.679338: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysf

In [4]:
ds.element_spec
ds.as_numpy_iterator().next()[0].shape

StopIteration: 

##  2. Viz data and build dataset

In [None]:
# do data transformation
import numpy as np

In [None]:
# setup connection aka iterator
dataiterator = ds.as_numpy_iterator()

In [None]:
# Bringing in matplotlib for viz stuff
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

In [None]:
# Setup the subplot formatting 
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
sample = dataiterator.next()
# Loop four times and get images 
for idx in range(4): 
    # Grab an image and label
    # Plot the image using a specific subplot
    ax[idx].imshow(np.squeeze(sample[idx]), cmap='gray')

## Build NN

In [None]:
# Bring in the sequential api for the generator and discriminator
from tensorflow.keras.models import Sequential
# Bring in the layers for the neural network
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Reshape, LeakyReLU, Dropout, UpSampling2D

In [None]:
def build_generator(): 
    model = Sequential()
    
    # Takes in random values and reshapes it to 7x7x128
    # Beginnings of a generated image
    model.add(Dense(7*7*128, input_dim=128))
    model.add(LeakyReLU(0.2))
    model.add(Reshape((7,7,128)))
    
    # Upsampling block 1 
    model.add(UpSampling2D())
    model.add(Conv2D(128, 5, padding='same'))
    model.add(LeakyReLU(0.2))
    
    # Upsampling block 2 
    model.add(UpSampling2D())
    model.add(Conv2D(128, 5, padding='same'))
    model.add(LeakyReLU(0.2))
    
    # Convolutional block 1
    model.add(Conv2D(128, 4, padding='same'))
    model.add(LeakyReLU(0.2))
    
    # Convolutional block 2
    model.add(Conv2D(128, 4, padding='same'))
    model.add(LeakyReLU(0.2))
    
    # Conv layer to get to one channel
    model.add(Conv2D(1, 4, padding='same', activation='sigmoid'))
    
    return model

In [None]:
generator = build_generator()

In [None]:
generator.summary()

In [None]:
img = generator.predict(np.random.randn(4, 128, 1))

In [None]:
# Generate new fashion
img = generator.predict(np.random.randn(4,128,1))
# Setup the subplot formatting 
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
# Loop four times and get images 
for idx, img in enumerate(img): 
    # Plot the image using a specific subplot 
    ax[idx].imshow(np.squeeze(img))
    # Appending the image label as the plot title 
    ax[idx].title.set_text(idx)

In [None]:
def build_discriminator():
    model = Sequential()

    # first conv block
    model.add(Conv2D(32, 5, input_shape = (28, 28, 1)))
    model.add(LeakyReLU(0.2))
    model.add(Dropout(0.4))

    # Second conv block
    model.add(Conv2D(64, 5))
    model.add(LeakyReLU(0.2))
    model.add(Dropout(0.4))

    # Second conv block
    model.add(Conv2D(128, 5))
    model.add(LeakyReLU(0.2))
    model.add(Dropout(0.4))

    # Second conv block
    model.add(Conv2D(256, 5))
    model.add(LeakyReLU(0.2))
    model.add(Dropout(0.4))

    # Flatten then pass to dense layer
    model.add(Flatten())
    model.add(Dropout(0.4)) # Might be best to not have
    model.add(Dense(1, activation='sigmoid'))

    return model

In [None]:
discriminator = build_discriminator()

In [None]:
discriminator.summary()

In [None]:
## Pass generated iamge into discriminator to prototype ;)
discriminator.predict(np.expand_dims(img, 0))

# 4. Construct training loop

# 4.1 Setup losses and optimizers

In [None]:
# Optimizer for both
from tensorflow.keras.optimizers import Adam
# Loss for both
from tensorflow.keras.losses import BinaryCrossentropy

In [None]:
g_opt = Adam(learning_rate=0.0001)
d_opt = Adam(learning_rate=0.00001)

g_loss = BinaryCrossentropy()
d_loss = BinaryCrossentropy()

# 4.2 Subclass model

In [None]:
# Import base model class to subclass our training step
from tensorflow.keras.models import Model

In [None]:
# 3 key functions

class TerrainTileGan(Model):
    def __init__(self, generator, discriminator, *args, **kwargs):
        # Pass to base class
        super().__init__(*args, **kwargs)
        
        # Create attributes for gen end disc
        self.generator = generator
        self.discriminator = discriminator

    def compile(self, g_opt, d_opt, g_loss, d_loss, *args, **kwargs):
        ## Compile w/ baseclass
        super().compile(*args, **kwargs)

        # Attributes for losses and optimizers
        self.g_opt = g_opt
        self.d_opt = d_opt
        self.g_loss = g_loss
        self.d_loss = d_loss

    def train_step(self, batch):
        # get the data
        real_images = batch
        # 128 because we start with a linear layer w/ 128 neurons. Gets reshaped lated
        fake_images = self.generator(tf.random.normal((128, 128, 1)), training = False)

        # Train discriminator
        with tf.GradientTape() as d_tape:
            # 1. pass real and fake imgs to discriminaotr
            yhat_real = self.discriminator(real_images, training=True)
            yhat_fake = self.discriminator(fake_images, training=True)
            yhat_realfake = tf.concat([yhat_real, yhat_fake], axis=0)
            
            # 2. cresate labels for real and fakes
            y_realfake = tf.concat([tf.zeros_like(yhat_real), tf.ones_like(yhat_fake)], axis=0)

            # 3. add noise to output to avoid learning too fast
            noise_real = 0.15*tf.random.uniform(tf.shape(yhat_real))
            ## Not sure if - is good idea.
            noise_fake = -0.15*tf.random.uniform(tf.shape(yhat_fake))
            y_realfake += tf.concat([noise_real, noise_fake], axis=0)

            # 4. Calculate loss - BINARYCROSSENTROPY
            total_d_loss = self.d_loss(y_realfake, yhat_realfake)

        # 5. Backpropagate -- nn learn
        dgrad = d_tape.gradient(total_d_loss, self.discriminator.trainable_variables)
        self.d_opt.apply_gradients(zip(dgrad, self.discriminator.trainable_variables))

        # Train gen
        with tf.GradientTape() as g_tape:
            # Generate new images
            gen_images = self.generator(tf.random.normal((128, 128, 1)), training=True)

            # Create predicted labels
            predicted_labels = self.discriminator(gen_images, training=False)

            # Calculate loss (rewared when disc says fake is real - hence zeros)
            total_g_loss = self.g_loss(tf.zeros_like(predicted_labels), predicted_labels)
            # No need to pass real images bc we're not training discriminator

        # Apply backprop
        ggrad = g_tape.gradient(total_g_loss, self.generator.trainable_variables)
        self.g_opt.apply_gradients(zip(ggrad, self.generator.trainable_variables))

        return {"d_loss":total_d_loss, "g_loss": total_g_loss}
            
    # Also option for train_test

In [None]:
## Alt
# @tf.function
# def train_step():
#     pass

In [None]:
# Create instance of subclassed model
terrainTileGan = TerrainTileGan(generator, discriminator)

In [None]:
# Compile model
terrainTileGan.compile(g_opt, d_opt, g_loss, d_loss)

## 4.3 Build callback

In [None]:
import os
from tensorflow.keras.preprocessing.image import array_to_img
from tensorflow.keras.callbacks import Callback

In [None]:
## Create images dir for model
!mkdir -p images

In [None]:
class ModelMonitor(Callback):
    def __init__(self, num_img=3, latent_dim=128):
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        random_latent_vectors = tf.random.uniform((self.num_img, self.latent_dim,1))
        generated_images = self.model.generator(random_latent_vectors)
        generated_images *= 255
        generated_images.numpy()
        for i in range(self.num_img):
            img = array_to_img(generated_images[i])
            img.save(os.path.join('images', f'generated_img_{epoch}_{i}.png'))

## Train

In [None]:
checkpoint_path = "training_1/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Cresate checkpoint callback
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path,
                                                 save_weights_only=True,
                                                 verboxe=1)

In [None]:
## Recomended 2000
hist = terrainTileGan.fit(ds, epochs=500, callbacks=[ModelMonitor(), cp_callback])

In [None]:
## Recomended 2000
hist = terrainTileGan.fit(ds, epochs=500, callbacks=[ModelMonitor(), cp_callback])

In [None]:
plt.suptitle('Loss')
plt.plot(hist.history['d_loss'], label='d_loss')
plt.plot(hist.history['g_loss'], label='g_loss')
plt.legend()
plt.show()

##  5. Test out generator

### 5.1 Generate Images

In [None]:
imgs = generator.predict(tf.random.normal((16, 128, 1)))

In [None]:
fig, ax = plt.subplots(ncols=4, nrows=4, figsize=(10,10))
for r in range(4): 
    for c in range(4): 
        ax[r][c].imshow(imgs[(r+1)*(c+1)-1])

### 5.2 Save the model

In [None]:
generator.save('generator.h5')
discriminator.save('discriminator.h5')