<a href="https://colab.research.google.com/github/gracemaria321/AI-Integrated-Steganalysis/blob/trials/DCT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
# -*- coding: utf-8 -*-
"""
Created on Wed Oct 22 13:10:19 2025

@author: Grace Maria IIT
"""

import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
from scipy.fftpack import dct, idct
import kagglehub
import csv
import warnings
warnings.filterwarnings("ignore")


# Helper functions for DCT
def blockwise_dct(img_array, block_size=8):
    h, w = img_array.shape
    dct_blocks = np.zeros_like(img_array, dtype=np.float32)
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            block = img_array[i:i+block_size, j:j+block_size]
            if block.shape == (block_size, block_size):
                dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
                dct_blocks[i:i+block_size, j:j+block_size] = dct_block
    return dct_blocks

def blockwise_idct(dct_array, block_size=8):
    h, w = dct_array.shape
    img_array = np.zeros_like(dct_array, dtype=np.float32)
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            block = dct_array[i:i+block_size, j:j+block_size]
            if block.shape == (block_size, block_size):
                idct_block = idct(idct(block.T, norm='ortho').T, norm='ortho')
                img_array[i:i+block_size, j:j+block_size] = idct_block
    return np.clip(img_array, 0, 255).astype(np.uint8)

# DCT-based encoding
def encode_dct(image_path, message, output_path=None):
    img = Image.open(image_path).convert("L")
    img_array = np.array(img, dtype=np.float32)
    h, w = img_array.shape

    message += chr(0)  # Delimiter
    msg_binary = ''.join([format(ord(char), '08b') for char in message])
    msg_index = 0

    dct_array = blockwise_dct(img_array)
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            if msg_index >= len(msg_binary):
                break
            block = dct_array[i:i+8, j:j+8]
            if block.shape == (8, 8):
                coeff = int(block[4, 3])
                coeff = (coeff & ~1) | int(msg_binary[msg_index])
                block[4, 3] = coeff
                dct_array[i:i+8, j:j+8] = block
                msg_index += 1
        if msg_index >= len(msg_binary):
            break

    encoded_array = blockwise_idct(dct_array)
    encoded_img = Image.fromarray(encoded_array)
    if output_path is None:
        base, ext = os.path.splitext(image_path)
        output_path = f"{base}_dct_encoded.png"
    encoded_img.save(output_path)
    return output_path

# DCT-based decoding
def decode_dct(image_path):
    img = Image.open(image_path).convert("L")
    img_array = np.array(img, dtype=np.float32)
    h, w = img_array.shape

    dct_array = blockwise_dct(img_array)
    msg_binary = ''
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            block = dct_array[i:i+8, j:j+8]
            if block.shape == (8, 8):
                coeff = int(block[4, 3])
                msg_binary += str(coeff & 1)

    chars = [chr(int(msg_binary[i:i+8], 2)) for i in range(0, len(msg_binary), 8)]
    message = ''.join(chars)
    return message.split(chr(0))[0]

# Evaluation metrics
def compute_ssim(original_path, stego_path):
    original = np.array(Image.open(original_path).convert("L"))
    stego = np.array(Image.open(stego_path).convert("L"))
    return ssim(original, stego)

def compute_psnr(original_path, stego_path):
    original = np.array(Image.open(original_path).convert("L"))
    stego = np.array(Image.open(stego_path).convert("L"))
    return psnr(original, stego)

def detection_accuracy(original_message, decoded_message):
    correct = sum(o == d for o, d in zip(original_message, decoded_message))
    return (correct / len(original_message)) * 100 if original_message else 0


# Dataset processing
def process_dct_dataset(dataset_path, message, output_base_folder="/content/"):
    for root, dirs, files in os.walk(dataset_path):
        # Process only immediate subdirectories of the dataset path
        if root == dataset_path:
            for sub_dir in dirs:
                image_folder = os.path.join(root, sub_dir)
                output_folder = os.path.join(output_base_folder, sub_dir)
                print(f"Processing folder: {image_folder}")
                process_images_in_folder(image_folder, message, output_folder)

def process_images_in_folder(image_folder, message, output_folder):
    image_files = sorted([f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.bmp', '.jpg', '.jpeg'))])
    log = []

    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    for img_file in image_files:
        img_path = os.path.join(image_folder, img_file)
        base, ext = os.path.splitext(img_file)
        stego_path = os.path.join(output_folder, f"{base}_dct_encoded.png")

        try:
            encode_dct(img_path, message, stego_path)
            ssim_val = compute_ssim(img_path, stego_path)
            psnr_val = compute_psnr(img_path, stego_path)
            decoded_msg = decode_dct(stego_path)
            accuracy = detection_accuracy(message, decoded_msg)

            log.append({
                "image": img_file,
                "stego_image": os.path.basename(stego_path),
                "message": message,
                "decoded": decoded_msg,
                "SSIM": round(ssim_val, 4),
                "PSNR": round(psnr_val, 2),
                "Accuracy": round(accuracy, 2)
            })
        except Exception as e:
            print(f"Error processing {img_file}: {e}")

    # Save log to CSV only if there are entries
    if log:
        csv_path = os.path.join(output_folder, "steganalysis_log.csv")
        with open(csv_path, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=["image", "stego_image", "message", "decoded", "SSIM", "PSNR", "Accuracy"])
            writer.writeheader()
            for entry in log:
                writer.writerow(entry)

        # Print log
        for entry in log:
            print(f"\nImage: {entry['image']}")
            print(f"Stego Image: {entry['stego_image']}")
            print(f"Original Message: {entry['message']}")
            print(f"Decoded Message: {entry['decoded']}")
            print(f"SSIM: {entry['SSIM']}")
            print(f"PSNR: {entry['PSNR']} dB")
            print(f"Detection Accuracy: {entry['Accuracy']}%")
        print(f"Processing successful and log saved to {csv_path}")
    else:
        print(f"No images were successfully processed in {image_folder}. The log file was not created.")


dataset_path = kagglehub.dataset_download("saadghojaria/sneakers-image-dataset-pinterest")
message = "Hello Grace! Welcome to IIT. Steganography is fun. AI enhances security. Python scripting rocks!"
process_dct_dataset(dataset_path, message)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Stego Image: 90db887b1ce9076a423161c60dfed400_dct_encoded.png
Original Message: Hello Grace! Welcome to IIT. Steganography is fun. AI enhances security. Python scripting rocks!
Decoded Message: ïëôÇïoÿòþôõ9¡þÿox}÷oöÿ&oØ;ç¿ôç÷.«ßþÿóïþrÿó/î|æ¹II.7ÿ~´Âceáêëú}wíöõÿ=ÏÝ½ü®|3zgütõö[ä÷oóêvYþ£
SSIM: 0.9891
PSNR: 28.31 dB
Detection Accuracy: 5.21%

Image: 91424a8567c29e3b5e3c9fa1d522b5c9.jpg
Stego Image: 91424a8567c29e3b5e3c9fa1d522b5c9_dct_encoded.png
Original Message: Hello Grace! Welcome to IIT. Steganography is fun. AI enhances security. Python scripting rocks!
Decoded Message: õ¤·ÿ¿ïÿÿßdÿï»þþz\¿êÿÿ÷ÿý{íãðÿÀ³«þ¿_g{ûO½þÏïúðãáÕ:[û¿PÞ¿ò§ÏÙznJvß:ÜV]Éyoÿ:läQð½þ÷ï>ïø¡
SSIM: 0.9993
PSNR: 50.8 dB
Detection Accuracy: 0.0%

Image: 91912fa4fa23b6ed15406dba966cda0d.png
Stego Image: 91912fa4fa23b6ed15406dba966cda0d_dct_encoded.png
Original Message: Hello Grace! Welcome to IIT. Steganography is fun. AI enhances security. 

In [9]:
# -*- coding: utf-8 -*-
"""
Created on Wed Oct 22 13:10:19 2025

@author: Grace Maria IIT
"""

import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
from scipy.fftpack import dct, idct
import kagglehub
import csv
import warnings
warnings.filterwarnings("ignore")


# Helper functions for DCT
def blockwise_dct(img_array, block_size=8):
    h, w = img_array.shape
    dct_blocks = np.zeros_like(img_array, dtype=np.float32)
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            block = img_array[i:i+block_size, j:j+block_size]
            if block.shape == (block_size, block_size):
                dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
                dct_blocks[i:i+block_size, j:j+block_size] = dct_block
    return dct_blocks

def blockwise_idct(dct_array, block_size=8):
    h, w = dct_array.shape
    img_array = np.zeros_like(dct_array, dtype=np.float32)
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            block = dct_array[i:i+block_size, j:j+block_size]
            if block.shape == (block_size, block_size):
                idct_block = idct(idct(block.T, norm='ortho').T, norm='ortho')
                img_array[i:i+block_size, j:j+block_size] = idct_block
    return np.clip(img_array, 0, 255).astype(np.uint8)

# DCT-based encoding
def encode_dct(image_path, message, output_path=None):
    img = Image.open(image_path).convert("L")
    img_array = np.array(img, dtype=np.float32)
    h, w = img_array.shape

    message += chr(0)  # Delimiter
    msg_binary = ''.join([format(ord(char), '08b') for char in message])
    msg_index = 0

    dct_array = blockwise_dct(img_array)
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            if msg_index >= len(msg_binary):
                break
            block = dct_array[i:i+8, j:j+8]
            if block.shape == (8, 8):
                coeff = int(block[4, 3])
                coeff = (coeff & ~1) | int(msg_binary[msg_index])
                block[4, 3] = coeff
                dct_array[i:i+8, j:j+8] = block
                msg_index += 1
        if msg_index >= len(msg_binary):
            break

    encoded_array = blockwise_idct(dct_array)
    encoded_img = Image.fromarray(encoded_array)
    if output_path is None:
        base, ext = os.path.splitext(image_path)
        output_path = f"{base}_dct_encoded.png"
    encoded_img.save(output_path)
    return output_path

# DCT-based decoding
def decode_dct(image_path):
    img = Image.open(image_path).convert("L")
    img_array = np.array(img, dtype=np.float32)
    h, w = img_array.shape

    dct_array = blockwise_dct(img_array)
    msg_binary = ''
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            block = dct_array[i:i+8, j:j+8]
            if block.shape == (8, 8):
                coeff = int(block[4, 3])
                msg_binary += str(coeff & 1)

    chars = [chr(int(msg_binary[i:i+8], 2)) for i in range(0, len(msg_binary), 8)]
    message = ''.join(chars)
    return message.split(chr(0))[0]

# Evaluation metrics
def compute_ssim(original_path, stego_path):
    original = np.array(Image.open(original_path).convert("L"))
    stego = np.array(Image.open(stego_path).convert("L"))
    return ssim(original, stego)

def compute_psnr(original_path, stego_path):
    original = np.array(Image.open(original_path).convert("L"))
    stego = np.array(Image.open(stego_path).convert("L"))
    return psnr(original, stego)

def detection_accuracy(original_message, decoded_message):
    correct = sum(o == d for o, d in zip(original_message, decoded_message))
    return (correct / len(original_message)) * 100 if original_message else 0


# Dataset processing
def process_dct_dataset(dataset_path, message, output_base_folder="/output/"):
    for root, dirs, files in os.walk(dataset_path):
        # Process only immediate subdirectories of the dataset path
        if root == dataset_path:
            for sub_dir in dirs:
                image_folder = os.path.join(root, sub_dir)
                output_folder = os.path.join(output_base_folder, sub_dir)
                print(f"Processing folder: {image_folder}")
                process_images_in_folder(image_folder, message, output_folder)

def process_images_in_folder(image_folder, message, output_folder):
    image_files = sorted([f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.bmp', '.jpg', '.jpeg'))])
    log = []

    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    for img_file in image_files:
        img_path = os.path.join(image_folder, img_file)
        base, ext = os.path.splitext(img_file)
        stego_path = os.path.join(output_folder, f"{base}_dct_encoded.png")

        try:
            encode_dct(img_path, message, stego_path)
            ssim_val = compute_ssim(img_path, stego_path)
            psnr_val = compute_psnr(img_path, stego_path)
            decoded_msg = decode_dct(stego_path)
            accuracy = detection_accuracy(message, decoded_msg)

            log.append({
                "image": img_file,
                "stego_image": os.path.basename(stego_path),
                "message": message,
                "decoded": decoded_msg,
                "SSIM": round(ssim_val, 4),
                "PSNR": round(psnr_val, 2),
                "Accuracy": round(accuracy, 2)
            })
        except Exception as e:
            print(f"Error processing {img_file}: {e}")

    # Save log to CSV only if there are entries
    if log:
        csv_path = os.path.join(output_folder, "steganalysis_log.csv")
        with open(csv_path, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=["image", "stego_image", "message", "decoded", "SSIM", "PSNR", "Accuracy"])
            writer.writeheader()
            for entry in log:
                writer.writerow(entry)

        # Print log
        print(f"Processing successful and log saved to {csv_path}")
    else:
        print(f"No images were successfully processed in {image_folder}. The log file was not created.")


dataset_path = kagglehub.dataset_download("saadghojaria/sneakers-image-dataset-pinterest")
message = "Hello Grace! Welcome to IIT. Steganography is fun. AI enhances security. Python scripting rocks!"
process_dct_dataset(dataset_path, message)

Using Colab cache for faster access to the 'sneakers-image-dataset-pinterest' dataset.
Processing folder: /kaggle/input/sneakers-image-dataset-pinterest/sneakers
Processing successful and log saved to /output/sneakers/steganalysis_log.csv
Processing folder: /kaggle/input/sneakers-image-dataset-pinterest/boots
Processing successful and log saved to /output/boots/steganalysis_log.csv


In [7]:
import os
import kagglehub

dataset_path = kagglehub.dataset_download("saadghojaria/sneakers-image-dataset-pinterest")

print(f"Dataset downloaded to: {dataset_path}")
print("Contents of the dataset directory:")
for root, dirs, files in os.walk(dataset_path):
    level = root.replace(dataset_path, '').count(os.sep)
    indent = ' ' * 4 * (level)
    print(f'{indent}{os.path.basename(root)}/')
    subindent = ' ' * 4 * (level + 1)
    for f in files:
        print(f'{subindent}{f}')

Using Colab cache for faster access to the 'sneakers-image-dataset-pinterest' dataset.
Dataset downloaded to: /kaggle/input/sneakers-image-dataset-pinterest
Contents of the dataset directory:
sneakers-image-dataset-pinterest/
    sneakers/
        793ab48642066e158453936c0c54833f.png
        446f45174cf8a1a6b6b3626ccbb82612.jpg
        418a27f280f78a2a9f998e707c81ae9e.jpg
        ac8b3744249cf38ec767019b11be638c.jpg
        ecb6f1cc1c915c908483bef754743b51.jpg
        ac10b778c45b130500224f70851a7a57.png
        0e1a215915632f5449e0d060120ee7a8.jpg
        80e20c1cf78a14ef6ec9ce9199ae05aa.jpg
        e048bc572b410a975422c4978aa6fa9f.jpg
        65b2267f8b72ded349ffbe6b1fe0d4a5.jpg
        66e3f85790201f16a9ec99e31461b71f.jpg
        ce6882a721d58b914cafc9837ff2ac85.jpg
        9ef5b71a944cbbfcad2e1bc3384cd4c5.jpg
        ec069fa14e0b27d554c2dde9f9b85a78.jpg
        c79969ba3d64430c65957e85088ecd1a.jpg
        1df27a99c2bed99f6a35a2663255a281.jpg
        6b68b2ed74b22f0f1665cc831c009dcf