In [None]:
!pip install scikit-image

In [None]:
# Neural Network Steganography: Image Key Embedding & Extraction

# 1. Install and Import Required Libraries
!pip install pillow matplotlib 

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from PIL import Image
import matplotlib.pyplot as plt
import os
from skimage.metrics import peak_signal_noise_ratio as psnr

In [None]:
# 2. Utility Functions: Image I/O and Message Conversion
def load_image(path, target_size=(128, 128)):
    """Load one image file as a normalized RGB array.

    Args:
        path: Path of the image file to open.
        target_size: Size tuple handed to PIL's ``resize`` (width, height).

    Returns:
        Float array with three channels, pixel values divided by 255.
    """
    # The context manager releases the file handle as soon as the pixels
    # have been copied; a bare Image.open() keeps it open until GC.
    with Image.open(path) as img:
        rgb = img.convert('RGB').resize(target_size)
        return np.asarray(rgb) / 255.0

def save_image(img_arr, path):
    """Write a float image (expected range [0, 1]) to disk as 8-bit pixels.

    Args:
        img_arr: Array of pixel values; values outside [0, 1] are clipped.
        path: Destination file path; PIL picks the format from the suffix.
    """
    scaled = np.asarray(img_arr, dtype=np.float64) * 255.0
    # Clip first so out-of-range values cannot wrap around in the uint8 cast,
    # and round to nearest instead of truncating to halve quantization error.
    img_uint8 = np.rint(np.clip(scaled, 0.0, 255.0)).astype(np.uint8)
    Image.fromarray(img_uint8).save(path)

def text_to_bits(text, length=128*128):
    """Encode text as a square bit plane suitable for the message input.

    Each character becomes 8 bits (most significant bit first). The bit
    stream is zero-padded or truncated to ``length`` bits.

    Args:
        text: Message to encode; every character must have a code point
            below 256 so that it fits in one byte.
        length: Total number of bits; must be a perfect square.

    Returns:
        float32 array of shape (1, side, side, 1) with side*side == length.
        The default length gives (1, 128, 128, 1).

    Raises:
        ValueError: If a character does not fit in 8 bits, or if ``length``
            is negative or not a perfect square.
    """
    try:
        # latin-1 maps code points 0-255 to single bytes, which is exactly
        # what an 8-bit-per-character stream can represent.
        payload = text.encode('latin-1')
    except UnicodeEncodeError as exc:
        raise ValueError(
            "text_to_bits only supports characters with code points below 256"
        ) from exc

    if length < 0:
        raise ValueError(f"length must be non-negative, got {length}")
    side = int(round(length ** 0.5))
    if side * side != length:
        raise ValueError(f"length must be a perfect square, got {length}")

    bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))[:length]
    bit_arr = np.zeros(length, dtype=np.float32)
    bit_arr[:bits.size] = bits
    return bit_arr.reshape((1, side, side, 1))

def bits_to_text(bit_array):
    """Decode an array of (soft) bits back into text.

    Values above 0.5 count as 1, everything else as 0. The bits are read
    in groups of eight, each group becoming one character, and NUL
    characters at either end of the result are stripped.
    """
    binary = ''.join('1' if value > 0.5 else '0' for value in bit_array.flatten())
    decoded = []
    for start in range(0, len(binary), 8):
        decoded.append(chr(int(binary[start:start + 8], 2)))
    return ''.join(decoded).strip('\x00')

In [None]:
# 3. Neural Network Architectures

def build_encoder(input_shape=(128, 128, 3), message_shape=(128, 128, 1)):
    """Build the embedding network.

    The cover image and the message plane are concatenated along the
    channel axis, passed through two 3x3 ReLU convolutions (64 then 32
    filters) and projected to three sigmoid channels by a 1x1 convolution.

    Returns:
        A Keras model named 'encoder' taking [cover, message] inputs.
    """
    cover_in = layers.Input(shape=input_shape)
    message_in = layers.Input(shape=message_shape)
    features = layers.Concatenate()([cover_in, message_in])
    for n_filters in (64, 32):
        features = layers.Conv2D(n_filters, (3,3), activation='relu', padding='same')(features)
    to_rgb = layers.Conv2D(3, (1,1), activation='sigmoid', padding='same')
    return keras.Model([cover_in, message_in], to_rgb(features), name='encoder')

def build_decoder(stego_shape=(128, 128, 3)):
    """Build the extraction network.

    Two 3x3 ReLU convolutions (64 then 32 filters) followed by a 1x1
    sigmoid convolution that emits one channel per pixel.

    Returns:
        A Keras model named 'decoder' mapping a stego image to a bit plane.
    """
    stego_in = layers.Input(shape=stego_shape)
    hidden = stego_in
    for n_filters in (64, 32):
        hidden = layers.Conv2D(n_filters, (3,3), activation='relu', padding='same')(hidden)
    bit_head = layers.Conv2D(1, (1,1), activation='sigmoid', padding='same')
    return keras.Model(stego_in, bit_head(hidden), name='decoder')

encoder = build_encoder()
decoder = build_decoder()

In [None]:
def create_folder(folder_name, base_path=None):
    """Create ``folder_name`` under ``base_path`` and return its full path.

    When ``base_path`` is None the current working directory is used.
    An already existing folder is not an error.
    """
    root = os.getcwd() if base_path is None else base_path
    target = os.path.join(root, folder_name)
    os.makedirs(target, exist_ok=True)
    return target

def load_all_imgs(path, size=(128,128)):
    """Load every image in a folder as a stacked, normalized RGB array.

    Args:
        path: Directory to scan (non-recursive).
        size: Size tuple handed to PIL's ``resize``.

    Returns:
        Array of shape (N, height, width, 3) with values divided by 255,
        ordered by file name.

    Raises:
        ValueError: If the folder contains no file with a supported suffix.
    """
    extensions = ('.png','.jpg','.jpeg','.bmp','.tiff')
    imgs = []
    # os.listdir returns entries in arbitrary order; sorting keeps indices
    # stable between runs and machines.
    for filename in sorted(os.listdir(path)):
        if not filename.lower().endswith(extensions):
            continue
        # Close each file handle promptly instead of leaving it to the GC.
        with Image.open(os.path.join(path, filename)) as img:
            imgs.append(np.asarray(img.convert('RGB').resize(size)) / 255.0)
    if not imgs:
        raise ValueError(f"No image files found in {path!r}")
    return np.stack(imgs)

# 3) Demo
train_dir = r'C:/Users/User/Desktop/stable_signature/data/train/train'
secret_message  = "InvisibleKey!"
message_arr     = text_to_bits(secret_message)

In [None]:
# # —– Load and split your data —–
# # (Fill in with your actual directories or data-loading logic)
train_covers = load_all_imgs(train_dir)         # shape: (N_train, H, W, C)
# train_bits   = np.stack([text_to_bits(msg) for msg in train_messages])  # shape: (N_train, msg_len)
# val_covers   = load_all_imgs(val_dir)
# val_bits     = np.stack([text_to_bits(msg) for msg in val_messages])

import tensorflow as tf

# Precompute once:
message_arr = text_to_bits(secret_message)                   # float32 numpy array; text_to_bits reshapes it to (1, 128, 128, 1)
message_tensor = tf.constant(message_arr, dtype=tf.float32)  # same values as a constant TF tensor

def make_dataset(image_dir, batch_size, shuffle=True):
    """Build a batched tf.data pipeline of (image, message_tensor) pairs.

    Relies on the module-level ``parse_image`` and ``message_tensor``.

    Args:
        image_dir: Folder scanned (non-recursively) for image files.
        batch_size: Number of examples per batch.
        shuffle: Shuffle the file paths with a buffer covering all files.

    Raises:
        ValueError: If ``image_dir`` contains no supported image file.
    """
    # tf.image.decode_image (used by parse_image) decodes BMP, GIF, JPEG and
    # PNG only, so TIFF files are not collected here.
    extensions = ('.png', '.jpg', '.jpeg', '.bmp')
    # Sorted for a reproducible order; os.listdir order is arbitrary.
    files = sorted(
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(extensions)
    )
    if not files:
        # An empty list would otherwise reach shuffle(buffer_size=0).
        raise ValueError(f"No image files found in {image_dir!r}")

    ds = tf.data.Dataset.from_tensor_slices(files)
    if shuffle:
        ds = ds.shuffle(buffer_size=len(files))

    # Every image is paired with the same constant message tensor.
    ds = ds.map(
        lambda p: (parse_image(p), message_tensor),
        num_parallel_calls=tf.data.AUTOTUNE
    )

    return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# %% [code]
import os
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint

# —— Hyper-params & checkpoints ——
batch_size     = 16
epochs         = 50
checkpoint_dir = "checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# —— Precompute your secret message tensor ——
message_arr    = text_to_bits(secret_message)                # float32 numpy bit plane, shape (1, 128, 128, 1) per text_to_bits
message_tensor = tf.constant(message_arr, dtype=tf.float32)  # constant TF tensor with the same shape and values

# —— Image loader from path → normalized tensor ——
def parse_image(path):
    """Read an image file and return a 128x128 RGB float tensor scaled by 1/255.

    Args:
        path: Scalar string tensor (or str) holding the file path.
    """
    raw = tf.io.read_file(path)
    # expand_animations=False makes decode_image always return a 3-D tensor
    # (GIFs are cut to their first frame). Without it the result has unknown
    # rank inside Dataset.map and tf.image.resize raises.
    img = tf.image.decode_image(raw, channels=3, expand_animations=False)
    img = tf.image.resize(img, [128, 128])
    # Pin the static shape so batching and the model see (128, 128, 3).
    img.set_shape([128, 128, 3])
    return img / 255.0

# —— Build a streaming Dataset that zips each image with the same message_tensor ——
def make_streaming_dataset(image_dir, batch_size, shuffle=True):
    """Stream (image, message_tensor) batches from the files in a folder.

    Relies on the module-level ``parse_image`` and ``message_tensor``.

    Args:
        image_dir: Folder whose files are listed lazily via a glob.
        batch_size: Number of examples per batch.
        shuffle: Passed to ``Dataset.list_files`` to shuffle the paths.
    """
    # 1) List candidate paths; nothing is read from disk yet.
    files_ds = tf.data.Dataset.list_files(os.path.join(image_dir, "*.*"), shuffle=shuffle)
    # 2) "*.*" also matches non-image files (e.g. .txt, .db); keep only the
    #    suffixes tf.image.decode_image can decode, case-insensitively.
    files_ds = files_ds.filter(
        lambda p: tf.strings.regex_full_match(
            tf.strings.lower(p), r".*\.(png|jpe?g|bmp)"
        )
    )
    # 3) Map path -> (image_tensor, message_tensor)
    ds = files_ds.map(
        lambda p: (parse_image(p), message_tensor),
        num_parallel_calls=tf.data.AUTOTUNE
    )
    # 4) Batch & prefetch
    return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# —— Instantiate train & val streams ——
# Build the training and validation streams.
# NOTE(review): `val_dir` is not assigned in any cell visible in this
# notebook — it must be defined before this line can run.
train_ds = make_streaming_dataset(train_dir, batch_size, shuffle=True)
val_ds   = make_streaming_dataset(val_dir,   batch_size, shuffle=False)

# Checkpoint callback: weights only, written when the monitored val_loss
# improves; the epoch number and val_loss are formatted into the file name.
ckpt_path = os.path.join(
    checkpoint_dir,
    "stego_epoch{epoch:02d}_val{val_loss:.4f}.weights.h5"
)
checkpoint_cb = ModelCheckpoint(
    filepath=ckpt_path,
    save_weights_only=True,
    save_best_only=True,
    monitor="val_loss",
    verbose=1
)

# Train from the streaming datasets.
# NOTE(review): no compile() call on `encoder` is visible in this notebook,
# and the datasets yield (image, message) pairs while build_encoder declares
# two inputs [cover, message] — confirm the intended model, inputs and loss
# before relying on this cell.
history = encoder.fit(
    train_ds,
    epochs=epochs,
    validation_data=val_ds,
    callbacks=[checkpoint_cb],
    verbose=2
)

# Save the final weights in addition to the best-epoch checkpoints.
encoder.save_weights(os.path.join(checkpoint_dir, "stego_final.weights.h5"))
print("✅ Done training; checkpoints in", checkpoint_dir)


In [None]:
# %% [code]
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint

# ——— Precompute your message tensor ———
message_arr    = text_to_bits(secret_message)                # float32 numpy bit plane, shape (1, 128, 128, 1) per text_to_bits
message_tensor = tf.constant(message_arr, dtype=tf.float32)  # constant tensor reused for every example

# ——— Image parsing helper ———
def parse_image(path):
    """Read an image file and return a 128x128 RGB float tensor scaled by 1/255.

    Args:
        path: Scalar string tensor (or str) holding the file path.
    """
    raw = tf.io.read_file(path)
    # expand_animations=False makes decode_image always return a 3-D tensor
    # (GIFs are cut to their first frame). Without it the result has unknown
    # rank inside Dataset.map and tf.image.resize raises.
    img = tf.image.decode_image(raw, channels=3, expand_animations=False)
    img = tf.image.resize(img, [128, 128])
    # Pin the static shape so batching and the model see (128, 128, 3).
    img.set_shape([128, 128, 3])
    return img / 255.0

# ——— Dataset builder ———
def make_dataset(image_dir, batch_size, shuffle=True):
    """Build a batched tf.data pipeline of (image, message_tensor) pairs.

    Relies on the module-level ``parse_image`` and ``message_tensor``.

    Args:
        image_dir: Folder scanned (non-recursively) for image files.
        batch_size: Number of examples per batch.
        shuffle: Shuffle the file paths with a buffer covering all files.

    Raises:
        ValueError: If ``image_dir`` contains no supported image file.
    """
    # tf.image.decode_image (used by parse_image) decodes BMP, GIF, JPEG and
    # PNG only, so TIFF files are not collected here.
    extensions = ('.png', '.jpg', '.jpeg', '.bmp')
    # Sorted for a reproducible order; os.listdir order is arbitrary.
    files = sorted(
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(extensions)
    )
    if not files:
        # An empty list would otherwise reach shuffle(buffer_size=0).
        raise ValueError(f"No image files found in {image_dir!r}")
    ds = tf.data.Dataset.from_tensor_slices(files)
    if shuffle:
        ds = ds.shuffle(buffer_size=len(files))
    # Every image is paired with the same constant message tensor.
    ds = ds.map(
        lambda p: (parse_image(p), message_tensor),
        num_parallel_calls=tf.data.AUTOTUNE
    )
    return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# ——— Config & checkpoint callback ———
batch_size     = 16
epochs         = 50
checkpoint_dir = "checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

ckpt_path = os.path.join(
    checkpoint_dir,
    "stego_epoch{epoch:02d}_val{val_loss:.4f}.weights.h5"
)
checkpoint_cb = ModelCheckpoint(
    filepath=ckpt_path,
    save_weights_only=True,
    save_best_only=True,
    monitor="val_loss",
    verbose=1
)

# ——— Build & train ———
# Build the training and validation datasets.
# NOTE(review): `val_dir` is not assigned in any cell visible in this
# notebook — it must be defined before this line can run.
train_ds = make_dataset(train_dir, batch_size, shuffle=True)
val_ds   = make_dataset(val_dir,   batch_size, shuffle=False)

# NOTE(review): no compile() call on `encoder` is visible in this notebook,
# and the datasets yield (image, message) pairs while build_encoder declares
# two inputs [cover, message] — confirm the intended model, inputs and loss.
history = encoder.fit(
    train_ds,
    epochs=epochs,
    validation_data=val_ds,
    callbacks=[checkpoint_cb],
    verbose=2
)

# Save the final weights in addition to the best-epoch checkpoints.
encoder.save_weights(
    os.path.join(checkpoint_dir, "stego_final.weights.h5")
)
print("✅ Training complete; checkpoints in", checkpoint_dir)


In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint

# —— Config —— 
batch_size     = 16
epochs         = 50
checkpoint_dir = "checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# —— Helper: parse & preprocess one image path → tensor —— 
def parse_image(path):
    """Read an image file and return a 128x128 RGB float32 tensor scaled by 1/255.

    Args:
        path: Scalar string tensor (or str) holding the file path.
    """
    raw = tf.io.read_file(path)
    # expand_animations=False makes decode_image always return a 3-D tensor
    # (GIFs are cut to their first frame). Without it the result has unknown
    # rank inside Dataset.map and tf.image.resize raises.
    img = tf.image.decode_image(raw, channels=3, expand_animations=False)
    img = tf.image.resize(img, [128, 128])
    # Pin the static shape so batching and the model see (128, 128, 3).
    img.set_shape([128, 128, 3])
    return tf.cast(img, tf.float32) / 255.0

# —— Build a tf.data.Dataset from a folder + list of messages —— 
def make_dataset(image_dir, messages, batch_size, shuffle=True):
    """Build a batched tf.data pipeline pairing each image with its own message.

    Files are taken in sorted file-name order and paired positionally with
    ``messages``. Relies on the module-level ``parse_image`` and
    ``text_to_bits``.

    Args:
        image_dir: Folder scanned (non-recursively) for image files.
        messages: One text message per image file.
        batch_size: Number of examples per batch.
        shuffle: Shuffle (path, bits) pairs with a buffer covering all files.

    Raises:
        ValueError: If the folder has no supported image, or the number of
            messages differs from the number of image files.
    """
    # '.jpeg' needs its dot (a bare 'jpeg' also matches e.g. "notajpeg");
    # TIFF is left out because tf.image.decode_image cannot decode it.
    extensions = ('.png', '.jpg', '.jpeg', '.bmp')
    # Sorted so the positional pairing with `messages` is deterministic.
    files = sorted(
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(extensions)
    )
    if len(files) != len(messages):
        raise ValueError(
            f"Found {len(files)} image files in {image_dir!r} "
            f"but {len(messages)} messages"
        )
    if not files:
        raise ValueError(f"No image files found in {image_dir!r}")

    # text_to_bits already returns a leading batch axis of 1, so concatenate
    # along it: the result is (N, 128, 128, 1), one (128, 128, 1) plane per
    # example, matching the message input shape used by build_encoder.
    bits = np.concatenate([text_to_bits(msg) for msg in messages], axis=0)

    ds = tf.data.Dataset.from_tensor_slices((files, bits))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(files))
    ds = ds.map(lambda p, b: (parse_image(p), b),
                num_parallel_calls=tf.data.AUTOTUNE)
    return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# —— Instantiate train & val datasets —— 
# Build the training and validation datasets.
# NOTE(review): `val_dir`, `train_messages` and `val_messages` are not
# assigned in any cell visible in this notebook (only in commented-out
# lines) — they must be defined before these lines can run.
train_ds = make_dataset(train_dir, train_messages, batch_size, shuffle=True)
val_ds   = make_dataset(val_dir,   val_messages,   batch_size, shuffle=False)

# Checkpoint callback: weights only, written when the monitored val_loss
# improves; the file name carries the .weights.h5 suffix.
ckpt_path = os.path.join(checkpoint_dir, "stego_epoch{epoch:02d}_val{val_loss:.4f}.weights.h5")
checkpoint_cb = ModelCheckpoint(
    filepath=ckpt_path,
    save_weights_only=True,
    save_best_only=True,
    monitor="val_loss",
    verbose=1
)

# Train from the streaming datasets.
# NOTE(review): no compile() call on `encoder` is visible in this notebook,
# and the datasets yield (image, bits) pairs while build_encoder declares
# two inputs [cover, message] — confirm the intended model, inputs and loss.
history = encoder.fit(
    train_ds,
    epochs=epochs,
    validation_data=val_ds,
    callbacks=[checkpoint_cb],
    verbose=2
)

# Save the final weights in addition to the best-epoch checkpoints.
encoder.save_weights(os.path.join(checkpoint_dir, "stego_final.weights.h5"))
print("✅ Training complete; checkpoints in", checkpoint_dir)



In [None]:
test_image_path = r"C:/Users/User/Desktop/Watermarking_Project/stable_signature_2_0/test_images/"      # where your test images are
secret_message  = "InvisibleKey!"
message_arr     = text_to_bits(secret_message)

In [None]:
import glob
import numpy as np
import matplotlib.pyplot as plt

# 1) Pick the most recently modified .h5 file in the checkpoints folder
ckpt_files = glob.glob(os.path.join(checkpoint_dir, "*.h5"))
if not ckpt_files:
    # max() on an empty list raises an unhelpful "empty sequence" error.
    raise FileNotFoundError(
        f"No .h5 checkpoint found in {checkpoint_dir!r}; run a training cell first"
    )
latest_ckpt = max(ckpt_files, key=os.path.getmtime)
print("Loading weights from:", latest_ckpt)
encoder.load_weights(latest_ckpt)

# 2) Load a test image (first of your set)
test_imgs = load_all_imgs(test_image_path)
cover = test_imgs[0]

# 3) Embed then extract
# NOTE(review): only encoder weights are loaded above; confirm the decoder
# has been trained before trusting the recovered text.
stego     = encoder.predict([np.expand_dims(cover, 0), message_arr])[0]
pred_bits = decoder.predict(np.expand_dims(stego, 0))[0]
recovered = bits_to_text(pred_bits)

# 4) Plot cover, stego, and recovered text side by side
fig, axes = plt.subplots(1, 3, figsize=(8, 4))

axes[0].set_title("Cover")
axes[0].imshow(cover)
axes[0].axis("off")

axes[1].set_title("Stego")
axes[1].imshow(stego)
axes[1].axis("off")

axes[2].set_title(f"Recovered:\n{repr(recovered)}")
blank = np.ones_like(cover) * 0.8
axes[2].imshow(blank)
# Axes-fraction coordinates centre the text in the panel; in data
# coordinates (0.5, 0.5) is the top-left pixel of the 128x128 image.
axes[2].text(0.5, 0.5, recovered, ha="center", va="center", fontsize=12,
             transform=axes[2].transAxes)
axes[2].axis("off")

fig.tight_layout()
plt.show()


In [None]:
# 4. Steganography Model: Combined Encoder-Decoder

# Chain encoder -> decoder into one model so both are trained together.
input_cover = keras.Input(shape=(128,128,3))
input_msg = keras.Input(shape=(128,128,1))
stego = encoder([input_cover, input_msg])   # symbolic stego image produced by the encoder
decoded = decoder(stego)                    # message plane recovered from that stego image

# Two outputs: the stego image and the decoded message.
steganography_model = keras.Model([input_cover, input_msg], [stego, decoded])

# Losses are listed in output order: MSE for the stego output, binary
# cross-entropy for the decoded message, weighted equally.
steganography_model.compile(
    optimizer='adam',
    loss=['mse', 'binary_crossentropy'],
    loss_weights=[0.5, 0.5]
)

# 5. Prepare Training Data

def generate_batch(batch_size=8, image_files=None):
    """Sample a batch of cover images with random binary messages.

    Args:
        batch_size: Number of (cover, message) pairs to produce.
        image_files: Optional sequence of image paths; one is drawn at
            random per example and loaded with ``load_image``. When empty
            or None, uniform random noise of shape (128, 128, 3) is used.

    Returns:
        Tuple (covers, messages) of numpy arrays; each message is a
        float32 (128, 128, 1) plane of zeros and ones.
    """
    covers, messages = [], []
    for _ in range(batch_size):
        # Draw the cover before the message to keep the RNG call order.
        cover = load_image(np.random.choice(image_files)) if image_files else np.random.rand(128,128,3)
        covers.append(cover)
        messages.append(np.random.randint(0, 2, size=(128,128,1)).astype(np.float32))
    return np.array(covers), np.array(messages)

# Update with your image paths:
# image_folder = 'images'
# os.makedirs(image_folder, exist_ok=True)
image_folder = r'C:/Users/User/Desktop/stable_signature/data/train/train'
# Place or generate some images in the folder before running!

# Collect training image paths from image_folder (non-recursive).
image_exts = ('.png','.jpg','.jpeg')
image_files = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.lower().endswith(image_exts)]
if not image_files:
    # Generate dummy images for demonstration
    for i in range(4):
        dummy = (np.random.rand(128,128,3)*255).astype(np.uint8)
        Image.fromarray(dummy).save(os.path.join(image_folder, f'dummy_{i}.png'))
    # Re-scan with the same suffix filter: the folder may hold non-image
    # files, and an unfiltered listing would hand them to load_image.
    image_files = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.lower().endswith(image_exts)]

# Draw one batch up front as a quick check of the data path.
cover_batch, message_batch = generate_batch(8, image_files=image_files)

In [None]:
# 6. Train the Model
# NOTE: each "epoch" below is a single train_on_batch step on one freshly
# sampled batch of 8, not a full pass over the image folder.
epochs = 100
for epoch in range(epochs):
    cover_batch, message_batch = generate_batch(8, image_files=image_files)
    # Targets mirror the inputs: the stego output is compared with the cover
    # and the decoded output with the embedded message.
    loss = steganography_model.train_on_batch([cover_batch, message_batch], [cover_batch, message_batch])
    print(f"Epoch {epoch+1}/{epochs} - Loss: {loss}")


In [None]:
# 7. Demo: Embedding and Extracting a Text Key

def create_folder(folder_name, base_path=None):
    """Create ``folder_name`` under ``base_path`` and return its full path.

    When ``base_path`` is None the current working directory is used.
    An already existing folder is not an error.
    """
    root = os.getcwd() if base_path is None else base_path
    target = os.path.join(root, folder_name)
    os.makedirs(target, exist_ok=True)
    return target

def load_all_imgs(path, size=(128,128)):
    """Load every image in a folder as a stacked, normalized RGB array.

    Args:
        path: Directory to scan (non-recursive).
        size: Size tuple handed to PIL's ``resize``.

    Returns:
        Array of shape (N, height, width, 3) with values divided by 255,
        ordered by file name.

    Raises:
        ValueError: If the folder contains no file with a supported suffix.
    """
    extensions = ('.png','.jpg','.jpeg','.bmp','.tiff')
    imgs = []
    # os.listdir returns entries in arbitrary order; sorting keeps indices
    # stable between runs and machines.
    for filename in sorted(os.listdir(path)):
        if not filename.lower().endswith(extensions):
            continue
        # Close each file handle promptly instead of leaving it to the GC.
        with Image.open(os.path.join(path, filename)) as img:
            imgs.append(np.asarray(img.convert('RGB').resize(size)) / 255.0)
    if not imgs:
        raise ValueError(f"No image files found in {path!r}")
    return np.stack(imgs)

# 3) Demo
test_image_path = r"C:/Users/User/Desktop/Watermarking_Project/stable_signature_2_0/test_images/"
all_imgs        = load_all_imgs(test_image_path)
secret_message  = "InvisibleKey!"
message_arr     = text_to_bits(secret_message)

# create once, outside the loop
results_dir = create_folder("stegnography_test_results")

# Embed the key into the first four test images, save and display each result.
for idx, cover_img in enumerate(all_imgs[:4]):
    stego_img = encoder.predict([np.expand_dims(cover_img, 0), message_arr])[0]

    out_path = os.path.join(results_dir, f'stego_image_{idx}.png')
    save_image(stego_img, out_path)

    # display
    print(f"Image #{idx} — Original vs Stego:")
    plt.figure(figsize=(8,4))
    for i, im in enumerate((cover_img, stego_img), 1):
        plt.subplot(1,2,i)
        plt.title('Original' if i==1 else 'Stego')
        plt.imshow(im)
        plt.axis('off')
    plt.show()

    # Report PSNR for every image, not only the last one. Both images live
    # in [0, 1], so the range is stated explicitly instead of being inferred
    # from mismatched dtypes.
    print("PSNR:", psnr(cover_img, stego_img, data_range=1.0))


In [None]:
# 8. Extract the Message
# stego_img_loaded = load_image('/stegnography_test_results/stego_image.png')
# Reload the stego PNGs written to results_dir and decode each one.
all_test_imgs = load_all_imgs(results_dir)
# NOTE: the loop variable is named cover_img but holds the stego images
# loaded from results_dir.
for idx, cover_img in enumerate(all_test_imgs[:4]):
    pred_msg = decoder.predict(np.expand_dims(cover_img, 0))[0]
    recovered_text = bits_to_text(pred_msg)
    print("Recovered message:", repr(recovered_text))



In [None]:
# 9. Batch Processing Example
batch_secret = "BatchKey"
# The message plane is identical for every image, so build it once.
secret_msg = text_to_bits(batch_secret)
for fname in sorted(os.listdir(test_image_path)):
    # Skip anything that is not an image; load_image would fail on it.
    if not fname.lower().endswith(('.png','.jpg','.jpeg','.bmp','.tiff')):
        continue
    img_path = os.path.join(test_image_path, fname)
    cover_img = load_image(img_path)
    stego_img = encoder.predict([np.expand_dims(cover_img,0), secret_msg])[0]
    # NOTE(review): the output format follows the original suffix; a lossy
    # format such as JPEG would re-compress the pixels that carry the
    # message — consider always writing PNG.
    save_image(stego_img, f'stego_{fname}')
    # Extraction example (decodes the in-memory stego image, not the file)
    pred_msg = decoder.predict(np.expand_dims(stego_img,0))[0]
    recovered = bits_to_text(pred_msg)
    print(f"{fname} → Extracted: {repr(recovered[:len(batch_secret)])}")



In [None]:
# 10. Save Models

# Persist both networks to files in the current working directory.
# NOTE(review): the .h5 suffix presumably selects Keras's HDF5 format —
# confirm it is still the preferred format for the installed Keras version.
encoder.save('encoder_model.h5')
decoder.save('decoder_model.h5')

In [None]:
# 7. Demo: Embedding and Extracting a Text Key (with binary display)

test_image_path = r"C:/Users/User/Desktop/Watermarking_Project/stable_signature_2_0/test_images/"
secret_message = "InvisibleKey!"

new_all_img = load_all_imgs(test_image_path)
message_arr = text_to_bits(secret_message)
stegno_dir = create_folder("stegnography_images")
# Embed the key into the first four images, save and display each result.
for idx, cover_img in enumerate(new_all_img[:4]):
    stego_img = encoder.predict([np.expand_dims(cover_img, 0), message_arr])[0]
    out_path = os.path.join(stegno_dir, f'stego_{idx}.png')
    save_image(stego_img, out_path)

    # Display images
    print("Original vs Stego Image:")
    plt.figure(figsize=(8,4))
    plt.subplot(1,2,1)
    plt.title('Original')
    plt.imshow(cover_img)
    plt.axis('off')
    plt.subplot(1,2,2)
    plt.title('Stego')
    plt.imshow(stego_img)
    plt.axis('off')
    plt.show()

    # Both images live in [0, 1]; stating the range avoids relying on
    # inference from the (mismatched) input dtypes.
    print("PSNR:", psnr(cover_img, stego_img, data_range=1.0))



In [None]:
# Extract the message as bits
# Load your stego images
stego_all_img = load_all_imgs(stegno_dir)

# Prepare the input key in binary once
input_bin = ''.join(format(ord(c), '08b') for c in secret_message)

# Function to turn a decoded bit-array into an 0/1 string
def bits_to_binstr(bit_array, bit_len):
    """Return the first ``bit_len`` decoded bits as a string of '0'/'1'.

    The array is flattened and each value is thresholded at 0.5 (strictly
    greater counts as 1). Fewer characters are returned when the array
    holds fewer than ``bit_len`` values.
    """
    hits = bit_array.flatten()[:bit_len] > 0.5
    return ''.join('1' if hit else '0' for hit in hits)

# Process exactly 4 images (or fewer if you have <4)
# Process exactly 4 images (or fewer if fewer are available)
for idx, image in enumerate(stego_all_img[:4]):
    # 1) Decode
    pred_msg = decoder.predict(np.expand_dims(image, 0))[0]

    # 2) Convert to binary string
    extracted_bin = bits_to_binstr(pred_msg, len(secret_message) * 8)

    # 3) Recover text
    recovered_text = bits_to_text(pred_msg)
    if recovered_text == secret_message:
        print(f"Image #{idx}: Perfect recovery ✅")
    else:
        print(f"Image #{idx}: MISMATCH 🚨")
        print("  Expected:", repr(secret_message))
        print("  Got:     ", repr(recovered_text))

    # 4) Print results for this image (inside the loop so every image is
    #    reported, not just the last one)
    print(f"Image #{idx}")
    print(f"  Input (bin):     {input_bin}")
    print(f"  Extracted (bin): {extracted_bin}")
    print(f"  Recovered text:  {repr(recovered_text)}\n")


In [None]:
# Bit error rate between the embedded key and the decoder output.
# NOTE: extracted_bin holds the value left by the last iteration of the
# extraction loop, so this covers a single image only.
input_bits     = [int(b) for b in input_bin]
extracted_bits = [int(b) for b in extracted_bin]
errors = sum(i!=j for i,j in zip(input_bits, extracted_bits))  # zip stops at the shorter sequence
ber = errors / len(input_bits)
print(f"  Bit errors: {errors}/{len(input_bits)} ({ber:.2%})\n")

In [None]:
import time
import numpy as np

# Prepare test image and secret
# test_image_path is a directory, so take the first image found in it;
# passing the folder itself to load_image cannot open a file.
cover_img = load_all_imgs(test_image_path)[0]
secret_message = "InvisibleKey!"
message_arr = text_to_bits(secret_message)

# Test the model: Encrypt (Alice)
# perf_counter is a monotonic, high-resolution clock meant for timing.
# NOTE: the first predict() call may include one-off setup cost.
start_time = time.perf_counter()
stego_img = encoder.predict([np.expand_dims(cover_img, 0), message_arr])[0]
alice_encrypt_time = time.perf_counter() - start_time

# Test the model: Decrypt (Bob)
start_time = time.perf_counter()
bob_output = decoder.predict(np.expand_dims(stego_img, 0))[0]
bob_decrypt_time = time.perf_counter() - start_time

# Mean absolute difference between the embedded bits and Bob's soft output
bob_loss_test = np.mean(np.abs(message_arr[0] - bob_output))

print('Alice encrypt time:', alice_encrypt_time)
print('Bob decrypt time:', bob_decrypt_time)
print('Bob mean absolute error (loss):', bob_loss_test)

In [None]:
# Bob's predicted output, thresholded to hard 0/1 bits.
# Rounding to one decimal and then calling int() would turn 0.5-0.9 into 0;
# thresholding at 0.5 matches bits_to_text / bits_to_binstr.
b1 = (bob_output.ravel() > 0.5).astype(np.uint8)
print('bob_output == ', b1)

# Convert to binary string (only the bits that carry the key)
b2 = ''.join(str(int(bit)) for bit in b1[:len(secret_message)*8])
print('b2 (binary) =', b2)

# Convert binary string to text via its big-endian byte representation
n = int(b2, 2)
try:
    str2 = n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(errors='ignore')
except Exception as e:
    str2 = "Decode error: " + str(e)
print("Bob recovered plain text:", repr(str2))

In [None]:
# If you have an 'eve' model:
# eve_output = eve.predict(np.expand_dims(stego_img, 0))[0]
# eve_loss_test = np.mean(np.abs(message_arr[0] - eve_output))
# print('Eve mean absolute error (loss):', eve_loss_test)