# Comprehensive Technical Methodology for Malicious Payload Detection


### 1. Imports and Setup

In [None]:
import os
import shutil
import warnings
import zipfile
import kagglehub
import cv2

import albumentations as A
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.layers import (
    Input, Dense, GlobalAveragePooling1D, GlobalAveragePooling2D, Conv2D,
    MaxPooling2D, Flatten, Reshape, GRU, LSTM, Dropout
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical, Sequence
from transformers import TFAutoModel
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

from transformers import ViTImageProcessor, TFViTForImageClassification

# Suppress warnings
warnings.filterwarnings('ignore')

### 2. Configuration

In [None]:
class Config:
    """Central place for project paths, training hyper-parameters and model names."""

    # --- Filesystem layout (all paths are relative to BASE_DIR) ---
    BASE_DIR = 'payload_detection_project'
    DATA_DIR = os.path.join(BASE_DIR, 'data')
    TRAIN_DATA_DIR = os.path.join(BASE_DIR, 'data', 'train')
    TEST_DATA_DIR = os.path.join(BASE_DIR, 'data', 'test')
    # The 'val' split directory is created, but it is not used for final model training.
    VAL_DATA_DIR = os.path.join(BASE_DIR, 'data', 'val')
    MODEL_DIR = os.path.join(BASE_DIR, 'models')
    RESULTS_DIR = os.path.join(BASE_DIR, 'results')

    # --- Training hyper-parameters ---
    BATCH_SIZE = 32      # samples per generator batch
    EPOCHS = 10          # maximum epochs per fold
    N_FOLDS = 5          # number of stratified cross-validation folds
    RANDOM_STATE = 42    # seed for the fold split

    # --- Hugging Face identifiers of the transformer checkpoints ---
    DEIT_MODEL_NAME = 'facebook/deit-base-distilled-patch16-224'
    VIT_MODEL_NAME = 'google/vit-base-patch16-224-in21k'

### 3. Data Preparation: Segregation and Loading

In [None]:
def segregate_data_by_payload(original_data_path, new_base_path):
    """
    Copy the dataset into one sub-directory per class under ``new_base_path``.

    For each of the 'train', 'test' and 'val' splits, the source is expected at
    ``<original_data_path>/<split>/<split>/{stego,clean}``:

    - every stego file whose name has more than two '_'-separated parts is copied to
      ``<new_base_path>/<split>/<third part>``; other stego files are reported and skipped;
    - every regular file under ``clean`` is copied to ``<new_base_path>/<split>/Benign``.

    Splits or sub-folders that do not exist are silently ignored. Per-file errors in the
    stego folder are printed and do not abort the run.
    """
    print("\nSegregating data by payload type...")

    for split_name in ('train', 'test', 'val'):
        # The dataset nests each split inside a folder of the same name.
        split_root = os.path.join(original_data_path, split_name, split_name)
        stego_dir = os.path.join(split_root, 'stego')
        clean_dir = os.path.join(split_root, 'clean')
        dest_split_dir = os.path.join(new_base_path, split_name)

        if os.path.exists(stego_dir):
            print(f"Processing stego images in {split_name} split...")
            for file_name in os.listdir(stego_dir):
                try:
                    # Expected name layout: carrier_method_payload_index.png
                    tokens = file_name.split("_")
                    if len(tokens) <= 2:
                        print(f"Skipping file with unexpected name format in stego: {file_name}")
                        continue
                    # The third token is the payload type and becomes the class folder.
                    target_dir = os.path.join(dest_split_dir, tokens[2])
                    os.makedirs(target_dir, exist_ok=True)
                    shutil.copy(os.path.join(stego_dir, file_name), target_dir)
                except Exception as e:
                    print(f"Error processing file {file_name}: {e}")

        if os.path.exists(clean_dir):
            print(f"Processing clean images in {split_name} split...")
            # Clean images form the 'Benign' class.
            benign_dir = os.path.join(dest_split_dir, 'Benign')
            os.makedirs(benign_dir, exist_ok=True)
            # Files are copied one by one so an already-existing target folder is fine.
            for file_name in os.listdir(clean_dir):
                source_file = os.path.join(clean_dir, file_name)
                if os.path.isfile(source_file):
                    shutil.copy(source_file, benign_dir)

    print("Data segregation complete.")

import shutil
def setup_environment():
    """
    Create the project directory tree and, if needed, download and segregate the dataset.

    Steps:
      1. Create every directory listed in ``Config`` that does not exist yet.
      2. Return early when the class folders 'Benign' and 'HiddenPayloadA' already exist
         under ``Config.TRAIN_DATA_DIR`` (treated as "dataset already prepared").
      3. Otherwise download 'marcozuppelli/stegoimagesdataset' through ``kagglehub``, copy
         it to ``./stegoimagesdataset_unzipped`` and run ``segregate_data_by_payload``.

    Raises:
        Exception: any error from the download or copy step is printed and re-raised.
    """
    print("--- Setting up project environment ---")

    # Create base project directories
    project_dirs = [
        Config.BASE_DIR, Config.DATA_DIR, Config.TRAIN_DATA_DIR, Config.TEST_DATA_DIR,
        Config.VAL_DATA_DIR, Config.MODEL_DIR, Config.RESULTS_DIR,
    ]
    for dir_path in project_dirs:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
            print(f"Created directory: {dir_path}")

    # Check if data is already segregated.
    # NOTE(review): only these two class folders are probed; presumably they are always
    # present in the dataset's train split - confirm against the dataset.
    expected_train_classes = ['Benign', 'HiddenPayloadA']
    data_already_segregated = all(
        os.path.exists(os.path.join(Config.TRAIN_DATA_DIR, cls))
        for cls in expected_train_classes
    )

    if data_already_segregated:
        print("Dataset already downloaded and segregated.")
        return

    print("Dataset not found or not organized. Starting download and segregation...")

    # Download data from Kaggle Hub and copy it next to the notebook.
    download_path = 'stegoimagesdataset_unzipped'
    try:
        cached_dataset_path = kagglehub.dataset_download('marcozuppelli/stegoimagesdataset')
        # dirs_exist_ok=True: an earlier run that failed after copying (but before
        # segregation finished) leaves this directory behind; without the flag the
        # re-run would die with FileExistsError instead of resuming.
        shutil.copytree(cached_dataset_path, download_path, dirs_exist_ok=True)
        print("Path to copyied dataset files:", download_path)
    except Exception as e:
        print(f"Could not download or extract from Kaggle Hub. Error: {e}")
        raise

    segregate_data_by_payload(download_path, Config.DATA_DIR)

    print("\nEnvironment setup complete.")

def load_and_combine_data():
    """
    Build one DataFrame from the segregated train and test folders for cross-validation.

    Each class sub-folder of ``Config.TRAIN_DATA_DIR`` / ``Config.TEST_DATA_DIR`` contributes
    one row per contained entry, with columns ``image_path`` and ``label`` (the folder
    name). The two frames are concatenated and an integer ``encoded_label`` column is added
    with a freshly fitted ``LabelEncoder``.

    Returns:
        tuple: ``(combined_df, label_encoder)``.

    Raises:
        ValueError: if either the train or the test frame turns out empty.
    """
    print("\n--- Loading segregated data into DataFrames ---")

    def scan_class_folders(root):
        # A missing split directory yields an empty (but correctly shaped) frame.
        if not os.path.exists(root):
            print(f"Warning: Directory not found: {root}. Skipping.")
            return pd.DataFrame({'image_path': [], 'label': []})

        paths, class_names = [], []
        for entry in os.listdir(root):
            entry_dir = os.path.join(root, entry)
            if not os.path.isdir(entry_dir):
                continue
            members = os.listdir(entry_dir)
            paths.extend(os.path.join(entry_dir, member) for member in members)
            class_names.extend([entry] * len(members))
        return pd.DataFrame({'image_path': paths, 'label': class_names})

    train_df = scan_class_folders(Config.TRAIN_DATA_DIR)
    test_df = scan_class_folders(Config.TEST_DATA_DIR)

    if train_df.empty or test_df.empty:
        raise ValueError("Training or testing DataFrame is empty. Data segregation may have failed.")

    # Train and test are merged because the folds are re-drawn by cross-validation.
    combined_df = pd.concat([train_df, test_df], ignore_index=True)

    label_encoder = LabelEncoder()
    encoded = label_encoder.fit_transform(combined_df['label'])
    combined_df['encoded_label'] = encoded

    print(f"Total combined data shape: {combined_df.shape}")
    print("Class distribution in combined set:\n", combined_df['label'].value_counts())

    return combined_df, label_encoder

# Execute the data preparation pipeline
# 1) Create the project directories and, if needed, download and segregate the dataset.
setup_environment()
# 2) Build the combined train+test DataFrame and the fitted label encoder.
df, label_encoder = load_and_combine_data()
# 3) Preview the first rows of the combined DataFrame.
df.head()

### 4. Data Generator and Augmentation

In [None]:
class PayloadDataGenerator(Sequence):
    """Keras ``Sequence`` that loads payload images from disk batch by batch.

    Args:
        df: DataFrame with an ``image_path`` and an ``encoded_label`` column. It may be a
            slice of a larger frame; rows are addressed by index label, not position.
        batch_size: Maximum number of samples per batch.
        image_size: Target ``(height, width)`` of every returned image.
        num_classes: Width of the one-hot label vectors.
        augment: When True, apply the Albumentations pipeline to every image.
        shuffle: When True, reshuffle the sample order at construction time and at the
            end of every epoch.

    Each item is ``(X, y)`` where ``X`` is float32 of shape ``(n, height, width, 3)`` in
    RGB order and ``y`` is the one-hot encoded label matrix. ``n`` equals ``batch_size``
    except for the final batch, which holds the remaining samples.
    """

    def __init__(self, df, batch_size, image_size, num_classes, augment=False, shuffle=True):
        super().__init__()
        self.df = df
        self.batch_size = batch_size
        self.image_size = image_size
        self.num_classes = num_classes
        self.shuffle = shuffle
        self.augment = augment
        # Use the actual index labels of the (possibly sliced) input DataFrame.
        self.indices = self.df.index.tolist()

        if self.augment:
            height, width = image_size
            # NOTE(review): RandomCrop runs on the raw image before any resizing, so it
            # presumably requires source images at least 80% of the target size - confirm.
            self.augmentation = A.Compose([
                A.RandomCrop(width=int(width * 0.8), height=int(height * 0.8), p=0.5),
                A.HorizontalFlip(p=0.5),
                A.RandomBrightnessContrast(p=0.3),
                A.RandomGamma(p=0.3),
                A.ElasticTransform(p=0.3, alpha=120, sigma=120 * 0.05, alpha_affine=120 * 0.03),
                A.GridDistortion(p=0.3),
                A.OpticalDistortion(p=0.3, distort_limit=2, shift_limit=0.5),
                A.Resize(height, width)  # Resize back to target size
            ])
        else:
            self.augmentation = None

        # Apply the initial shuffle (no-op when shuffle=False).
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch, counting a final partial batch."""
        # ceil, not floor: dropping the tail would silently exclude samples from
        # validation and make predictions shorter than the label array.
        return int(np.ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(X, y)`` tuple."""
        start = index * self.batch_size
        batch_indices = self.indices[start:start + self.batch_size]
        X, y = self.__data_generation(batch_indices)
        return X, y

    def on_epoch_end(self):
        """Reshuffle the sample order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

    # The hook was originally misspelled, so it was never invoked and the data was never
    # shuffled. The old name is kept as an alias for any external caller.
    on_epoche_end = on_epoch_end

    def __data_generation(self, batch_indices):
        """Load, optionally augment, and resize the images for ``batch_indices``.

        Raises:
            ValueError: if an image file cannot be read by OpenCV.
        """
        # .loc[] because batch_indices are index labels, not positions.
        batch_df = self.df.loc[batch_indices]
        n_samples = len(batch_df)
        height, width = self.image_size
        X = np.empty((n_samples, height, width, 3), dtype=np.float32)
        y = np.empty(n_samples, dtype=int)

        rows = zip(batch_df['image_path'], batch_df['encoded_label'])
        for i, (img_path, encoded_label) in enumerate(rows):
            image = cv2.imread(img_path)
            if image is None:
                # cv2.imread signals failure by returning None rather than raising.
                raise ValueError(f"Could not read image: {img_path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            if self.augmentation:
                image = self.augmentation(image=image)['image']

            # cv2.resize takes dsize as (width, height), the reverse of array shape order.
            image = cv2.resize(image, (int(width), int(height)))

            # NOTE: pixels are stored as float32 in their original 0-255 range; no
            # rescaling/normalization is applied by this generator.
            X[i] = image
            y[i] = encoded_label

        return X, to_categorical(y, num_classes=self.num_classes)

### 5. Training and Evaluation Utilities

In [None]:
!nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits

In [None]:
def train_and_evaluate_model(model_builder, experiment_name, image_size, df, num_classes,
                             save_dir="/content/drive/MyDrive/payload_new_models"):
    """Train and evaluate a model with stratified K-fold cross-validation.

    Args:
        model_builder: Callable ``(image_size, num_classes) -> uncompiled Keras model``;
            invoked once per fold so every fold starts from fresh weights.
        experiment_name: Label used in log output and in the saved model file names.
        image_size: ``(height, width)`` passed to the data generators and the builder.
        df: DataFrame with ``image_path`` and ``encoded_label`` columns.
        num_classes: Number of target classes.
        save_dir: Directory receiving ``<experiment_name>_fold_<k>.keras`` for each fold.
            Defaults to the Google Drive folder this notebook has always written to.

    Returns:
        tuple: ``(histories, cv_scores)`` - the Keras ``History`` object and the
        validation accuracy of every fold.
    """
    print(f"\n--- Starting Experiment: {experiment_name} ---")

    skf = StratifiedKFold(n_splits=Config.N_FOLDS, shuffle=True, random_state=Config.RANDOM_STATE)
    cv_scores = []
    histories = []

    X = df['image_path'].values
    y = df['encoded_label'].values

    os.makedirs(save_dir, exist_ok=True)

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        print(f'\n----- Fold {fold+1}/{Config.N_FOLDS} -----')

        # The split yields positions, hence iloc.
        train_df = df.iloc[train_idx]
        val_df = df.iloc[val_idx]

        train_generator = PayloadDataGenerator(
            df=train_df,
            batch_size=Config.BATCH_SIZE,
            image_size=image_size,
            num_classes=num_classes,
            augment=False
        )

        val_generator = PayloadDataGenerator(
            df=val_df,
            batch_size=Config.BATCH_SIZE,
            image_size=image_size,
            num_classes=num_classes,
            augment=False,
            shuffle=False
        )

        # Fresh callbacks per fold so no state is shared between folds.
        callbacks = [
            EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)
        ]

        model = model_builder(image_size, num_classes)

        optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

        model.compile(
            optimizer=optimizer,
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        history = model.fit(
            train_generator,
            validation_data=val_generator,
            epochs=Config.EPOCHS,
            callbacks=callbacks,
            verbose=1
        )
        histories.append(history)

        loss, accuracy = model.evaluate(val_generator, verbose=0)
        print(f'Fold {fold+1} Validation Accuracy: {accuracy:.4f}')
        cv_scores.append(accuracy)

        # Save the model for this fold
        model_save_path = os.path.join(save_dir, f"{experiment_name}_fold_{fold+1}.keras")
        model.save(model_save_path)
        print(f"Model for fold {fold+1} saved to {model_save_path}")

    print(f"\n--- Results for {experiment_name} ---")
    print(f"Scores for each fold: {[round(s, 4) for s in cv_scores]}")
    print(f"Mean Accuracy: {np.mean(cv_scores):.4f}")
    print(f"Standard Deviation: {np.std(cv_scores):.4f}\n")
    return histories, cv_scores

In [None]:
# Colab-only: mount Google Drive at /content/drive so files written there persist.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Ensure the Drive folder that receives the saved per-fold models exists.
os.makedirs("/content/drive/MyDrive/payload_new_models", exist_ok=True)

In [None]:
# Number of distinct labels known to the fitted encoder; passed to every experiment below.
num_classes = len(label_encoder.classes_)

### 7. [NEW] Experiment 2: CNN (512x512)

In [None]:
def create_cnn_model(image_size, num_classes):
    """Build a plain CNN classifier and print its summary.

    Four Conv2D(3x3, relu, same) + MaxPooling2D(2x2) stages with 32/64/128/256 filters
    feed a Flatten -> Dense(512) -> Dropout(0.5) -> softmax head.

    Args:
        image_size: ``(height, width)`` of the RGB input.
        num_classes: Number of softmax outputs.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    stack = [Input(shape=(*image_size, 3))]

    # Convolutional feature extractor: the filter count doubles at every stage.
    for n_filters in (32, 64, 128, 256):
        stack.append(Conv2D(n_filters, (3, 3), activation='relu', padding='same'))
        stack.append(MaxPooling2D((2, 2)))

    # Classification head.
    stack.extend([
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ])

    model = Sequential(stack)
    model.summary()
    return model

# Cross-validate the plain CNN on 512x512 inputs; keeps the per-fold histories and scores.
cnn_512_histories, cnn_512_scores = train_and_evaluate_model(
    model_builder=create_cnn_model,
    experiment_name='CNN_512x512',
    image_size=(512, 512),
    df=df,
    num_classes=num_classes
)

In [None]:
# Cross-validate the same CNN architecture on 224x224 inputs.
cnn_224_histories, cnn_224_scores = train_and_evaluate_model(
    model_builder=create_cnn_model,
    experiment_name='CNN_224x224',
    image_size=(224, 224),
    df=df,
    num_classes=num_classes
)

### 8. [NEW] Experiment 3 & 4: CNN-GRU (512x512 and 224x224)

In [None]:
def create_cnn_gru_model(image_size, num_classes):
    """Build a CNN feature extractor followed by a GRU classifier and print its summary.

    The CNN output feature map is reshaped so that its first spatial axis becomes the
    time axis and the remaining two axes are flattened into the per-step feature vector.

    Args:
        image_size: ``(height, width)`` of the RGB input.
        num_classes: Number of softmax outputs.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    conv_stack = [Input(shape=(*image_size, 3))]
    for n_filters in (32, 64, 128):
        conv_stack.append(Conv2D(n_filters, (3, 3), activation='relu', padding='same'))
        conv_stack.append(MaxPooling2D((2, 2)))
    cnn_base = Sequential(conv_stack, name='cnn_base')

    # output_shape is (batch, dim1, dim2, dim3); dim1 is used as the sequence length.
    _, timesteps, dim2, dim3 = cnn_base.output_shape
    features = dim2 * dim3

    model = Sequential([
        cnn_base,
        Reshape((timesteps, features)),
        GRU(128, return_sequences=False),
        Dropout(0.5),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')
    ])
    model.summary()
    return model

# Cross-validate the CNN-GRU model on 512x512 inputs.
cnn_gru_512_histories, cnn_gru_512_scores = train_and_evaluate_model(
    model_builder=create_cnn_gru_model,
    experiment_name='CNN-GRU_512x512',
    image_size=(512, 512),
    df=df,
    num_classes=num_classes
)



### 9. [NEW] Experiment 5 & 6: CNN-LSTM (512x512 and 224x224)

In [None]:
def create_cnn_lstm_model(image_size, num_classes):
    """Build a CNN feature extractor followed by an LSTM classifier and print its summary.

    Three Conv2D + MaxPooling2D stages (32/64/128 filters) produce a feature map whose
    first spatial axis is treated as the sequence axis for the LSTM.

    Args:
        image_size: ``(height, width)`` of the RGB input.
        num_classes: Number of softmax outputs.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    rgb_input_shape = tuple(image_size) + (3,)

    feature_layers = [Input(shape=rgb_input_shape)]
    for width in (32, 64, 128):
        feature_layers += [
            Conv2D(width, (3, 3), activation='relu', padding='same'),
            MaxPooling2D((2, 2)),
        ]
    cnn_base = Sequential(feature_layers, name='cnn_base')

    # Fold the last two axes of the feature map into one feature vector per time step.
    feature_map_shape = cnn_base.output_shape
    sequence_length = feature_map_shape[1]
    step_features = feature_map_shape[2] * feature_map_shape[3]

    recurrent_head = [
        Reshape((sequence_length, step_features)),
        LSTM(128, return_sequences=False),
        Dropout(0.5),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]

    model = Sequential([cnn_base, *recurrent_head])
    model.summary()
    return model

In [None]:
# Cross-validate the CNN-LSTM model on 512x512 inputs.
cnn_lstm_512_histories, cnn_lstm_512_scores = train_and_evaluate_model(
    model_builder=create_cnn_lstm_model,
    experiment_name='CNN-LSTM_512x512',
    image_size=(512, 512),
    df=df,
    num_classes=num_classes
)

In [None]:
# Cross-validate the CNN-LSTM model on 224x224 inputs.
cnn_lstm_224_histories, cnn_lstm_224_scores = train_and_evaluate_model(
    model_builder=create_cnn_lstm_model,
    experiment_name='CNN-LSTM_224x224',
    image_size=(224, 224),
    df=df,
    num_classes=num_classes
)

### 11. Universal Prediction Function

#### Demonstrate Prediction Function

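Here, we load each saved model and evaluate it on the combined dataset, both as a multi-class problem and as a binary (Benign vs. Not Benign) problem, reporting the classification report, confusion matrix, and ROC-AUC.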

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical

def _finish_roc_plot(title):
    """Add the chance diagonal, axis limits, labels and legend to the current figure and show it."""
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc="lower right")
    plt.show()


def evaluate_model(model_path, experiment_name, df, label_encoder_classes, is_binary=False):
    """Evaluate one saved model on ``df`` and plot its confusion matrix and ROC curves.

    Args:
        model_path: Path of the saved Keras model to load.
        experiment_name: Label used in log output and plot titles.
        df: DataFrame with ``image_path``, ``label`` and ``encoded_label`` columns.
        label_encoder_classes: Class names ordered by encoded label.
        is_binary: When True, collapse the task to Benign (0) vs. Not Benign (1); the
            positive-class score is the summed probability of all non-Benign classes.

    Returns:
        tuple: ``(report, confusion_matrix, roc_auc, predicted_labels)``, or four ``None``
        values when the model cannot be loaded.
    """
    task_name = 'Binary' if is_binary else 'Multi-class'
    print(f"\n--- Evaluating Model: {experiment_name} ({task_name}) ---")

    try:
        model = load_model(model_path, compile=False)
    except Exception as e:
        print(f"Error loading model {model_path}: {e}")
        return None, None, None, None

    # The spatial input size is read from the model itself.
    input_shape = model.input_shape[1:3]
    class_names = list(label_encoder_classes)

    # The whole DataFrame is evaluated.
    # NOTE(review): if the model was trained on part of this same DataFrame, these
    # metrics include training samples and are presumably optimistic - confirm intent.
    eval_df = df.copy()

    if is_binary:
        # Binary labels: 0 for Benign, 1 for everything else.
        eval_df['binary_label'] = eval_df['label'].apply(lambda x: 0 if x == 'Benign' else 1)
        true_labels = eval_df['binary_label'].values
        target_names = ['Benign', 'Not Benign']
    else:
        true_labels = eval_df['encoded_label'].values
        target_names = class_names
    num_eval_classes = len(target_names)
    label_ids = list(range(num_eval_classes))

    # The generator always one-hot encodes against the full multi-class label set,
    # even for the binary evaluation (its labels are not used by predict()).
    eval_generator = PayloadDataGenerator(
        df=eval_df,
        batch_size=Config.BATCH_SIZE,
        image_size=input_shape,
        num_classes=len(class_names),
        augment=False,
        shuffle=False
    )

    # One probability per class for every generated sample.
    predictions_probs = model.predict(eval_generator)
    predicted_labels_encoded = np.argmax(predictions_probs, axis=1)

    # A generator that drops a trailing partial batch yields fewer predictions than rows.
    # With shuffle=False the predictions follow row order, so trim the labels to match.
    if len(predictions_probs) < len(true_labels):
        print(f"Warning: only {len(predictions_probs)} of {len(true_labels)} samples were "
              f"predicted; evaluating on the predicted subset.")
        true_labels = true_labels[:len(predictions_probs)]

    if is_binary:
        # Map each multi-class prediction to 0 (Benign) or 1 (anything else).
        predicted_labels = [0 if class_names[pred] == 'Benign' else 1
                            for pred in predicted_labels_encoded]
        # Positive-class score = total probability mass outside the Benign column.
        benign_index = class_names.index('Benign')
        not_benign_probs = np.sum(np.delete(predictions_probs, benign_index, axis=1), axis=1)
        roc_auc_score_val = roc_auc_score(true_labels, not_benign_probs)
    else:
        predicted_labels = predicted_labels_encoded
        # Multi-class ROC-AUC: macro average over one-vs-rest problems.
        true_labels_one_hot = to_categorical(true_labels, num_classes=num_eval_classes)
        roc_auc_score_val = roc_auc_score(true_labels_one_hot, predictions_probs, average='macro')

    # Classification report; labels= pins the rows to the full class list so that
    # target_names stays aligned even if a class is absent from the data.
    report = classification_report(true_labels, predicted_labels,
                                   labels=label_ids, target_names=target_names)
    print("\nClassification Report:")
    print(report)

    # Confusion matrix, pinned to the same label order as the heatmap tick labels.
    cm = confusion_matrix(true_labels, predicted_labels, labels=label_ids)
    print("\nConfusion Matrix:")
    print(cm)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    # task_name is precomputed: nesting the same quote type inside an f-string is a
    # syntax error on Python versions before 3.12.
    plt.title(f'Confusion Matrix - {experiment_name} ({task_name})')
    plt.show()

    # ROC curves: a single curve for the binary task, one-vs-rest curves otherwise.
    if is_binary:
        fpr, tpr, _ = roc_curve(true_labels, not_benign_probs)
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc_score_val:.2f})')
        _finish_roc_plot(f'Receiver Operating Characteristic - {experiment_name} (Binary)')
    else:
        plt.figure(figsize=(10, 8))
        for i, target_name in enumerate(target_names):
            is_class_i = true_labels == i
            class_scores = predictions_probs[:, i]
            fpr, tpr, _ = roc_curve(is_class_i, class_scores)
            class_auc = roc_auc_score(is_class_i, class_scores)
            plt.plot(fpr, tpr, lw=2, label=f'ROC curve of class {target_name} (area = {class_auc:.2f})')
        _finish_roc_plot(f'Receiver Operating Characteristic - {experiment_name} (Multi-class)')

    return report, cm, roc_auc_score_val, predicted_labels

# --- Evaluate all models ---
# Maps the saved model's base file name to its nominal input size. Only the keys are
# used below: the size is bound to `image_size` in the loop but never passed on.
models_to_evaluate = {
    'CNN_512x512_fold_5': (512, 512),
    'CNN_224x224_fold_5': (224, 224),
    'CNN-GRU_512x512_fold_5': (512, 512),
    'CNN-GRU_224x224_fold_5': (224, 224),
    'CNN-LSTM_512x512_fold_5': (512, 512),
    'CNN-LSTM_224x224_fold_5': (224, 224),
}

# model name -> {'multi_class': {...}, 'binary': {...}} with report, confusion matrix,
# ROC-AUC and predictions for each task.
evaluation_results = {}

for model_name, image_size in models_to_evaluate.items():
    model_path = os.path.join("/content/drive/MyDrive/payload_new_models", model_name + ".keras") # Assume .keras extension
    # Models whose file is missing are reported and skipped rather than failing the run.
    if os.path.exists(model_path):
        # Evaluate Multi-class
        report_mc, cm_mc, roc_auc_mc, preds_mc = evaluate_model(
            model_path,
            model_name,
            df,
            label_encoder.classes_,
            is_binary=False
        )

        # Evaluate Binary
        report_b, cm_b, roc_auc_b, preds_b = evaluate_model(
            model_path,
            model_name,
            df,
            label_encoder.classes_,
            is_binary=True
        )

        evaluation_results[model_name] = {
            'multi_class': {'report': report_mc, 'confusion_matrix': cm_mc, 'roc_auc': roc_auc_mc, 'predictions': preds_mc},
            'binary': {'report': report_b, 'confusion_matrix': cm_b, 'roc_auc': roc_auc_b, 'predictions': preds_b}
        }
    else:
        print(f"Model not found at {model_path}. Skipping evaluation for {model_name}.")

# Optional: You can now further process or display evaluation_results
# For example, you could create a summary table of ROC-AUC scores
print("\n--- Summary of ROC-AUC Scores ---")
for model_name, results in evaluation_results.items():
    print(f"{model_name}:")
    # roc_auc is None when evaluate_model could not load the model.
    if results['multi_class']['roc_auc'] is not None:
        print(f"  Multi-class ROC-AUC (macro avg): {results['multi_class']['roc_auc']:.4f}")
    if results['binary']['roc_auc'] is not None:
         print(f"  Binary ROC-AUC: {results['binary']['roc_auc']:.4f}")

In [None]:
! wget https://webpages.tuni.fi/imaging/tampere17/tampere17_color.zip

In [None]:
!unzip tampere17_color.zip -d samples_to_embed/

In [None]:
import matplotlib.pyplot as plt
import os
from PIL import Image

# Directory holding the extracted sample images; only .png files are collected.
sample_dir = "/content/samples_to_embed/color"
image_files = [os.path.join(sample_dir, f) for f in os.listdir(sample_dir) if f.endswith('.png')]

# Display a few sample images
num_samples_to_show = 5
# Slicing tolerates fewer than num_samples_to_show files (the extra subplot slots stay unused).
selected_samples = image_files[:num_samples_to_show]

plt.figure(figsize=(15, 5))
for i, img_path in enumerate(selected_samples):
    try:
        img = Image.open(img_path)
        # One row of subplots; matplotlib subplot positions are 1-based.
        plt.subplot(1, num_samples_to_show, i + 1)
        plt.imshow(img)
        plt.title(os.path.basename(img_path))
        plt.axis('off')
    except Exception as e:
        # A file that fails to load is reported and the remaining ones are still shown.
        print(f"Could not load image {img_path}: {e}")

plt.tight_layout()
plt.show()

In [None]:
from PIL import Image
import numpy as np
import struct

def _text_to_bits(data: bytes) -> np.ndarray:
    """Convert bytes to a 1-D numpy array of bits (0/1)."""
    bits = np.unpackbits(np.frombuffer(data, dtype=np.uint8))
    return bits.astype(np.uint8)

def _bits_to_bytes(bits: np.ndarray) -> bytes:
    """Convert 1-D numpy array of bits to bytes."""
    # pad to a multiple of 8
    pad = (-len(bits)) % 8
    if pad:
        bits = np.concatenate([bits, np.zeros(pad, dtype=np.uint8)])
    arr = np.packbits(bits)
    return arr.tobytes()

def embed_text_lsb(image_path: str, text: str, out_path: str) -> None:
    """
    Embed text into the least significant bits of an image and save it as PNG.

    The payload is a 4-byte big-endian length header followed by the UTF-8 bytes of
    ``text``. Its bits overwrite the LSB of consecutive channel values of the flattened
    RGB pixel array, starting at the first value.

    - image_path: path to input image (PNG recommended).
    - text: string to embed (e.g., Ethereum address).
    - out_path: path to save stego image (always written in PNG format).

    Raises:
        ValueError: if the text is too long for the 32-bit header or the image has too
            few channel values to hold the payload.
    """
    # The context manager closes the source file deterministically; the pixel data is
    # fully loaded into the array before the handle is released.
    with Image.open(image_path) as source_img:
        arr = np.array(source_img.convert("RGB"))
    h, w, c = arr.shape
    if c != 3:
        raise ValueError("Expected RGB image with 3 channels")

    # Prepare payload: 32-bit length header (number of bytes) + UTF-8 bytes
    payload_bytes = text.encode("utf-8")
    length = len(payload_bytes)
    if length >= 2**32:
        raise ValueError("Text too long to encode")
    header = struct.pack(">I", length)  # big-endian 4 bytes
    bits = _text_to_bits(header + payload_bytes)  # 8 * (4 + length) bits
    n_bits = bits.size

    # Capacity: one bit per color channel per pixel
    capacity = h * w * 3
    if n_bits > capacity:
        raise ValueError(f"Image too small; need {n_bits} bits but have {capacity}")

    # flatten() returns a copy, so the source array is left untouched.
    flat = arr.flatten()
    # Clear the LSB of the first n_bits values, then set it to the payload bit.
    flat[:n_bits] = (flat[:n_bits] & 0xFE) | bits
    stego_arr = flat.reshape(arr.shape)

    stego_img = Image.fromarray(stego_arr.astype(np.uint8), "RGB")
    # Save as PNG to avoid compression losses
    stego_img.save(out_path, format="PNG")

def extract_text_lsb(stego_path: str) -> str:
    """
    Extract embedded text from LSBs of an image saved by embed_text_lsb.

    Reads a 32-bit big-endian length header from the LSBs of the first 32 channel values
    of the flattened RGB array, then that many payload bytes from the LSBs that follow.

    - stego_path: path to stego image (PNG recommended).
    Returns the extracted UTF-8 string.

    Raises:
        ValueError: if the image is too small to hold the header, if the declared length
            exceeds the image capacity, or if the payload is not valid UTF-8
            (``UnicodeDecodeError`` is a ``ValueError`` subclass).
    """
    header_bit_count = 32  # 4-byte length header

    # The context manager closes the file once the pixels are copied into the array.
    with Image.open(stego_path) as stego_img:
        flat = np.array(stego_img.convert("RGB")).flatten()

    # Without this guard a tiny image fails later with an opaque struct.error.
    if flat.size < header_bit_count:
        raise ValueError("Image too small to contain the payload length header")

    header_bytes = _bits_to_bytes(flat[:header_bit_count] & 1)
    length = struct.unpack(">I", header_bytes[:4])[0]

    total_bits = (4 + length) * 8
    if total_bits > flat.size:
        raise ValueError("Declared payload length exceeds image capacity or image corrupted")

    # Payload bits start right after the header bits.
    payload_bits = flat[header_bit_count:total_bits] & 1
    payload_bytes = _bits_to_bytes(payload_bits)[:length]
    return payload_bytes.decode("utf-8")
