# SteganoGAN in Keras
This notebook contains code attempting to reimplement SteganoGAN in Keras, for the purpose of better understanding (and scrutinizing) it.

*Based on https://github.com/DAI-Lab/SteganoGAN/tree/master/steganogan*

### Modules

In [1]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"

import tensorflow as tf
import numpy as np
from keras.optimizers import Adam
from keras.losses import BinaryCrossentropy
from keras.utils import plot_model

from models import steganogan_encoder_dense_model, steganogan_decoder_dense_model, steganogan_critic_model
from keras_steganogan import KerasSteganoGAN

2024-06-18 01:23:33.739773: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


### Constants

In [2]:
# Image dimensions
IMAGE_HEIGHT = 128
IMAGE_WIDTH = 128
IMAGE_CHANNELS = 3

IMAGE_SHAPE = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)
MESSAGE_DEPTH = 2
BATCH_SIZE = 4
MODEL_PATH = 'pretrained_models/steganoGAN.keras'

### Call main encode and decode functions (with creating steganographic image and decoding it)

In [3]:
steganoGAN = KerasSteganoGAN(image_shape=IMAGE_SHAPE, data_depth=MESSAGE_DEPTH, model_path=MODEL_PATH)
steganoGAN.encode('input.png', 'images/output.png', 'Hello, World!')
steganoGAN.decode('images/output.png')

'Í¿N}ÿÁ\x8fW@ØïÏ\x99ðÿþ\x7fÿÿ\x94ªª»ÆU£ï~¿c\x0eù>4\x87ü\x04\x93<\x0e±=\x19\x06Ðì\x08\x08\x81\x02\x10ºöÿUL\x89×ÆQSI\x9cÚ[\x90Þn«$\x1b\x00Ð\x120\x83\x90©\x95\x15Ý\x85\x17\x0cd\x00\x08\x8fºÏ«º\x90É\x8e1ö,\x91\x85«\x0eÇ\x8eCzgÃ\x889zböF\x034ûÖôCwL2(µæs¸\x89<©\x04\x15ÇÍìa1\x7fC\x908\x92yà\x8e\x0f\x0cd-ÐÎ*4\x92Ê\x1c\x96\x92\x19ü³Î(\x82Í²\x8d\xad8d;øxyñÿî\x87¬ã\x01-\x0c0üÒ\x86\x08FÈø}CùÃ\x1fFÓ:Æ\x9d3\x87\x12xïñáÞÇ½m7º\x8bwl³¼ü\x87u9\x16ßfÓ+c,\x919\x16³GâxÜ#Á^d\x13\x1eA\x18\x96*%Ú\xa0\x9cõbùÃi\x86é\'a"\x0f\x18}\x1f\x07"8°qÖì\x1ey\x9cöß\x8c\x1c\x7f\x8aX\x9bö´Üû\x91eË.\x03ÞÆ[¾\x837Ï:XùâÞ&\x14YaàÌøÈ_D\x19\x06zP1\xad`\x87vç\x1clôpÓQÍ\x11;ðåM\x0e[\xad\x03<\x8e\x15U\x17\x95Ù¹\x94Î\x03.\x84h$\x8a\x8a Ê¨Vâ¡À£qòÄ\x0fi\x9eëûÎ\nª¨\x02\x8b<<¼¯\x95\x7f\x8d\x8au×Z7N´dxò\x94\x07õÎ\r0G\x90\x83Î÷wUUZu\x1bgAä\x01Ç\x1di³\x8c¬s§¬\nÀ\x7fA\x04O\x9d\x1b\x19¡rF \x02j\xa0¬;8l\xa0NAÙ\x92\x95\x10\r\x8f4ÈöÔúþéîî\x1a"\x86ª\xa0N\x11D\x08PÞ>ÜÁôÙÇ2q\x9a»³H\x81\xadP©\x1cÝw}UÅ\x15eQ_\x8f:º¬ø+r%\x8el0É¿ 0R:ñüX\x87QI\x00\x00"ª\x

### SteganoGAN predict random data with metrics

In [8]:
cover_image = tf.random.uniform([1, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS], -1, 1, dtype=tf.float32)
message = tf.cast(tf.random.uniform([1, IMAGE_HEIGHT, IMAGE_WIDTH, MESSAGE_DEPTH], 0, 2, dtype=tf.int32), tf.float32)

stego_img, recovered_msg = steganoGAN.predict([cover_image, message])

print("stego_img min: {0}, max: {1}".format(tf.reduce_min(stego_img), tf.reduce_max(stego_img)))
print("recovered_msg min: {0}, max: {1}".format(tf.reduce_min(recovered_msg), tf.reduce_max(recovered_msg)))

print("BinaryCrossentropy: {0}".format(BinaryCrossentropy(from_logits=True)(message, recovered_msg)))
print("PSNR: {0}".format(tf.reduce_mean(tf.image.psnr(cover_image, stego_img, 1))))
print("SSIM: {0}".format(tf.reduce_mean(tf.image.ssim(cover_image, stego_img, 1))))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
stego_img min: -1.0, max: 1.0
recovered_msg min: 0.0, max: 1.0
BinaryCrossentropy: 0.7435410022735596
PSNR: 14.657017707824707
SSIM: 0.7204346656799316


----

### Build model for future train

In [4]:
encoder = steganogan_encoder_dense_model(IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS, MESSAGE_DEPTH)
decoder = steganogan_decoder_dense_model(IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS, MESSAGE_DEPTH)
critic  = steganogan_critic_model(IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)

steganoGAN = KerasSteganoGAN(
  encoder=encoder,
  decoder=decoder,
  critic=critic,
  image_shape=IMAGE_SHAPE,
  data_depth=MESSAGE_DEPTH,
  model_path=MODEL_PATH
)

steganoGAN.compile(
  encoder_optimizer = Adam(learning_rate=1e-4, beta_1=0.5),
  decoder_optimizer = Adam(learning_rate=1e-4, beta_1=0.5),
  critic_optimizer = Adam(learning_rate=1e-4, beta_1=0.5),
  loss_fn = BinaryCrossentropy(from_logits=True)
)

#steganoGAN.models_summary()
#steganoGAN.summary()
#plot_model(steganoGAN.encoder, to_file='model_images/encoder.png', show_shapes=True, show_layer_names=True)
#plot_model(steganoGAN.decoder, to_file='model_images/decoder.png', show_shapes=True, show_layer_names=True)
#plot_model(steganoGAN.critic, to_file='model_images/critic.png', show_shapes=True, show_layer_names=True)

### Download div2k dataset and complete it with random message dataset of {0, 1}

In [5]:
train_dir = '/Users/dmitryhoma/Projects/phd_dissertation/state_2/SteganoGAN/research/data/div2k/train'
val_dir = '/Users/dmitryhoma/Projects/phd_dissertation/state_2/SteganoGAN/research/data/div2k/val'

train_image_ds = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir, 
    label_mode=None, 
    color_mode='rgb',
    batch_size=BATCH_SIZE,
    seed=123,
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    shuffle=True
)

val_image_ds = tf.keras.preprocessing.image_dataset_from_directory(
    val_dir, 
    label_mode=None, 
    color_mode='rgb',
    batch_size=BATCH_SIZE,
    seed=123,
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    shuffle=True
)

def normalize_img(img):
    return (img / 127.5) - 1

train_image_ds = train_image_ds.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
val_image_ds = val_image_ds.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)

def create_message_tensor_for_training(batch_size, width, height, data_depth):
    message = tf.random.uniform([batch_size, width, height, data_depth], 0, 2, dtype=tf.int32)
    message = tf.cast(message, tf.float32)
    return message

def create_message_dataset(batch_size, num_batches, width, height, data_depth):
    message_tensors = [create_message_tensor_for_training(batch_size, width, height, data_depth) for _ in range(num_batches)]
    return tf.data.Dataset.from_tensor_slices(tf.concat(message_tensors, axis=0)).batch(batch_size)

train_message_ds = create_message_dataset(BATCH_SIZE, len(train_image_ds), IMAGE_HEIGHT, IMAGE_WIDTH, MESSAGE_DEPTH)
val_message_ds = create_message_dataset(BATCH_SIZE, len(val_image_ds), IMAGE_HEIGHT, IMAGE_WIDTH, MESSAGE_DEPTH)

train_ds = tf.data.Dataset.zip((train_image_ds, train_message_ds)).prefetch(buffer_size=tf.data.AUTOTUNE)
val_ds = tf.data.Dataset.zip((val_image_ds, val_message_ds)).prefetch(buffer_size=tf.data.AUTOTUNE)

Found 800 files.
Found 100 files.


In [6]:
steganoGAN.build([(1, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS), (1, IMAGE_HEIGHT, IMAGE_WIDTH, MESSAGE_DEPTH)])
steganoGAN.fit(train_ds, epochs=10, validation_data=val_ds)
steganoGAN.save(MODEL_PATH)

Epoch 1/10
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m241s[0m 1s/step - bpp: 2.0000 - critic_loss: -5.1265e-04 - decoding_loss: 0.7417 - encoder_decoder_total_loss: 21.2169 - psnr: 7.5500 - realism_loss: -6.0168e-05 - similarity_loss: 20.4753 - ssim: 0.3535 - val_bpp: 2.0000 - val_critic_loss: 0.0477 - val_decoding_loss: 0.6832 - val_encoder_decoder_total_loss: 7.4260 - val_psnr: 11.7181 - val_realism_loss: -0.0630 - val_similarity_loss: 6.8058 - val_ssim: 0.4928
Epoch 2/10
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m234s[0m 1s/step - bpp: 2.0000 - critic_loss: -0.0116 - decoding_loss: 0.6630 - encoder_decoder_total_loss: 4.7502 - psnr: 14.2472 - realism_loss: 0.0038 - similarity_loss: 4.0835 - ssim: 0.6260 - val_bpp: 2.0000 - val_critic_loss: 0.0214 - val_decoding_loss: 0.6696 - val_encoder_decoder_total_loss: 2.4416 - val_psnr: 17.5421 - val_realism_loss: -0.0775 - val_similarity_loss: 1.8496 - val_ssim: 0.7411
Epoch 3/10
[1m200/200[0m [32m━━━━━━━━

----

### Testing different functions 

In [None]:
from tensorflow.keras.losses import BinaryCrossentropy

# Create two random tensors
tensor1 = tf.random.uniform((4, 128, 128, 2), minval=0, maxval=1, dtype=tf.int32)
tensor2 = tf.random.uniform((4, 128, 128, 2), minval=0, maxval=1, dtype=tf.float32)

# Calculate binary crossentropy
loss = BinaryCrossentropy(from_logits=False)
loss = loss(tf.constant([3, 0, -3]), tf.constant([1, 1, 1]))
#loss = loss(tensor1, tensor2)

loss.numpy().astype(np.uint8)

In [None]:
first_value = next(iter(train_ds.take(1)))
first, second = first_value
#print(first)
#print(second)

tf.Tensor(
[[[[1. 0.]
   [1. 0.]
   [0. 0.]
   ...
   [0. 1.]
   [1. 0.]
   [0. 1.]]

  [[1. 1.]
   [1. 0.]
   [0. 1.]
   ...
   [1. 1.]
   [0. 1.]
   [0. 1.]]

  [[0. 0.]
   [0. 0.]
   [1. 0.]
   ...
   [1. 0.]
   [0. 1.]
   [1. 0.]]

  ...

  [[1. 0.]
   [1. 0.]
   [0. 0.]
   ...
   [1. 1.]
   [0. 1.]
   [0. 0.]]

  [[1. 1.]
   [1. 0.]
   [1. 0.]
   ...
   [0. 1.]
   [0. 1.]
   [0. 0.]]

  [[0. 1.]
   [1. 0.]
   [1. 1.]
   ...
   [1. 0.]
   [0. 0.]
   [0. 1.]]]


 [[[0. 0.]
   [1. 1.]
   [1. 1.]
   ...
   [1. 1.]
   [0. 1.]
   [1. 1.]]

  [[0. 1.]
   [0. 0.]
   [0. 1.]
   ...
   [1. 1.]
   [0. 0.]
   [1. 1.]]

  [[1. 0.]
   [1. 1.]
   [1. 0.]
   ...
   [0. 1.]
   [0. 0.]
   [0. 0.]]

  ...

  [[1. 0.]
   [0. 0.]
   [0. 1.]
   ...
   [0. 0.]
   [1. 0.]
   [1. 0.]]

  [[1. 1.]
   [0. 1.]
   [0. 1.]
   ...
   [0. 1.]
   [0. 1.]
   [0. 1.]]

  [[1. 1.]
   [1. 1.]
   [0. 1.]
   ...
   [1. 1.]
   [1. 1.]
   [1. 1.]]]


 [[[0. 1.]
   [1. 0.]
   [0. 1.]
   ...
   [1. 0.]
   [0. 1.]
   [0. 1.