# Baseline LSB

### imports

In [1]:
from PIL import Image
import numpy as np
from collections import Counter
import pandas as pd

In [2]:
def calculate_prr(original_payload: str, extracted_payload: str) -> float:
    """
    Calculate Payload Recovery Rate (PRR) for string inputs.

    :param original_payload: The original payload as a string.
    :param extracted_payload: The extracted payload as a string.
    :return: PRR as a percentage.
    """
    # Convert strings to bytes
    original_payload_bytes = original_payload.encode('utf-8')
    extracted_payload_bytes = extracted_payload.encode('utf-8')

    # Total bits in the original payload
    total_bits = len(original_payload_bytes) * 8

    # Edge case: If original payload is empty
    if total_bits == 0:
        return 100.0 if len(extracted_payload_bytes) == 0 else 0.0

    # Compare overlapping bytes bit by bit
    matching_bits = 0
    for orig_byte, ext_byte in zip(original_payload_bytes, extracted_payload_bytes):
        # Count matching bits in each byte
        matching_bits += 8 - bin(orig_byte ^ ext_byte).count('1')

    # Add penalty for bits in missing bytes
    unmatched_bits = (len(original_payload_bytes) - len(extracted_payload_bytes)) * 8

    # Total matching bits
    total_matching_bits = matching_bits

    # Calculate PRR
    prr = (total_matching_bits / total_bits) * 100
    return prr

def calculate_bit_error_rate(original: str, recovered: str) -> float:
    """
    Calculate the Bit Error Rate (BER) between the original and recovered strings.
    
    Parameters:
        original (str): The original string.
        recovered (str): The recovered string.
    
    Returns:
        float: The bit error rate as a percentage.
    """
    # Convert strings to binary representation
    original_bits = ''.join(format(ord(c), '08b') for c in original)
    recovered_bits = ''.join(format(ord(c), '08b') for c in recovered)

    # Make both binary strings the same length by trimming or padding
    max_length = max(len(original_bits), len(recovered_bits))
 
    original_bits = original_bits.ljust(max_length, '0')  # Pad original with '0' if shorter
    recovered_bits = recovered_bits[:max_length]  # Trim recovered if longer

    # Count the number of bit errors
    errors = sum(1 for o, r in zip(original_bits, recovered_bits) if o != r)

    # Compute BER as a percentage
    ber = (errors / max_length) * 100  
    
    return ber

In [8]:
import png 

def baseline_gery_encode_message(input_img_path, message, output, img_format="jpg"):
    """
    Encodes a message into an image using LSB steganography. Supports grayscale and color images.

    Parameters:
        input_img_path (str): Path to the input image.
        message (str): The message to encode. 
        output (str): Path to save the encoded image.
        img_format (str): Format to save the image (jpg or other).
    """
    message_binary = ''.join(format(ord(char), '08b') for char in message) + '00000000'

    image = Image.open(input_img_path)
    image_array = np.array(image, dtype=np.uint8)

    # Handle grayscale images:
    if image.mode == 'L':  # 'L' mode indicates grayscale
        flat_pixels = image_array.flatten()
    else:  # Color image (RGB or RGBA)
        flat_pixels = image_array.flatten()  # Flatten all color channels
    
    if len(message_binary) > len(flat_pixels):
        raise ValueError("Message is too large to encode in the given image.")

    for i, bit in enumerate(message_binary):
        flat_pixels[i] = (flat_pixels[i] & 0xFE) | int(bit)

    if image.mode == 'L':
        encoded_array = flat_pixels.reshape(image_array.shape)
    else:
        encoded_array = flat_pixels.reshape(image_array.shape)


    encoded_image = Image.fromarray(encoded_array.astype(np.uint8))
    if img_format == 'png':
        w = png.Writer(width=encoded_image.width, height=encoded_image.height, bitdepth=8, greyscale=True if image.mode == 'L' else False) # Set greyscale parameter
        with open(output, 'wb') as f:
            w.write(f, encoded_array)
    elif img_format == 'jpg':
        encoded_image.save(output, "JPEG", quality=95)
    else:
        encoded_image.save(output)

    print(f"Message encoded and saved to {output}")

ModuleNotFoundError: No module named 'png'

In [3]:
def baseline_encode_message(input_img_path, message, output , img_format = "jpg"):
    """
    Encodes a message into an image using LSB steganography.

    Parameters:
        input_img_path (str): Path to the input image.
        message (str): The message to encode. 
        output_jpeg (str): Path to save the JPEG-encoded image.
        jpeg_quality (int): JPEG quality for saving the image.
    """
    # Binary conversion of the message
    message_binary = ''.join(format(ord(char), '08b') for char in message) + '00000000' 

    # Read the image from location
    image = Image.open(input_img_path)
    
    # Array representation of the image
    image_array = np.array(image, dtype=np.uint8)   

    # Converting array into one dimension for processing
    flat_pixels = image_array.flatten()

    # Check if the message fits into the image
    if len(message_binary) > len(flat_pixels):
        raise ValueError("Message is too large to encode in the given image.")

    # Encode the message into the least significant bits of the image
    for i, bit in enumerate(message_binary):
        flat_pixels[i] = (flat_pixels[i] & 0xFE) | int(bit)  # Ensure LSB is set to the message bit

    # Reshape the array back to the original image shape
    encoded_array = flat_pixels.reshape(image_array.shape)

    # Save as PNG
    encoded_image = Image.fromarray(encoded_array)
    # encoded_image.save(output_png)

    # Convert PNG to given format
    if(img_format == 'jpg'):
        encoded_image.save(output, "JPEG", quality=95)
    else:
        encoded_image.save(output)

    print(f"Message encoded and saved to {output}")

def baseline_decode_message(image_path):
    """
    Decodes a message from an image using LSB steganography.

    Parameters:
        image_path (str): Path to the image containing the hidden message.

    Returns:
        str: The decoded message.
    """
    # Read the image from location
    image = Image.open(image_path)

    # Array representation of the image
    image_array = np.array(image, dtype=np.uint8)  # Ensure the array is uint8

    # Converting array into one dimension for processing
    flat_pixels = image_array.flatten()

    # Extract the least significant bits
    binary_message = ''.join(str(pixel & 1) for pixel in flat_pixels)

    # Convert binary to string, stopping at the null terminator
    chars = [chr(int(binary_message[i:i+8], 2)) for i in range(0, len(binary_message), 8)]
    message = ''.join(chars).split('\x00', 1)[0]  # Stop at the first null character

    return message


def hamming_encode(data, redundancy_bits=3):
    """
    Encodes a binary string using Hamming code with parameterized redundancy bits.

    Parameters:
        data (str): Binary string to encode (length must be a multiple of 2^redundancy_bits - redundancy_bits - 1).
        redundancy_bits (int): Number of redundancy bits (default is 3 for Hamming (7,4)).

    Returns:
        str: Hamming-encoded binary string.
    """
    # Determine the number of data bits per block
    data_bits = (2 ** redundancy_bits) - redundancy_bits - 1

    if len(data) % data_bits != 0:
        # Pad data to make it a multiple of data_bits
        padding_length = data_bits - (len(data) % data_bits)
        data += '0' * padding_length

    encoded = ""
    for i in range(0, len(data), data_bits):
        # Extract a block of data bits
        data_block = data[i:i+data_bits]
        data_block = list(map(int, data_block))  # Convert to a list of integers

        # Determine total bits (data + redundancy)
        total_bits = data_bits + redundancy_bits
        hamming_block = [0] * total_bits

        # Fill in the data bits (skipping parity bit positions)
        j = 0
        for bit_index in range(1, total_bits + 1):
            if not (bit_index & (bit_index - 1)) == 0:  # Check if not a power of 2
                hamming_block[bit_index - 1] = data_block[j]
                j += 1

        # Calculate the parity bits
        for parity_index in range(redundancy_bits):
            parity_position = 2 ** parity_index
            parity_value = 0
            for bit_index in range(1, total_bits + 1):
                if bit_index & parity_position and bit_index != parity_position:
                    parity_value ^= hamming_block[bit_index - 1]
            hamming_block[parity_position - 1] = parity_value

        # Append encoded block to the result
        encoded += ''.join(map(str, hamming_block))

    return encoded


def hamming_decode(encoded, redundancy_bits=3):
    """
    Decodes a Hamming encoded binary string with parameterized redundancy bits.

    Parameters:
        encoded (str): Hamming-encoded binary string.
        redundancy_bits (int): Number of redundancy bits (default is 3 for Hamming (7,4)).

    Returns:
        str: Decoded binary string.
    """
    # Determine the number of data bits per block
    data_bits = (2 ** redundancy_bits) - redundancy_bits - 1
    total_bits = data_bits + redundancy_bits

    if len(encoded) % total_bits != 0:
        # Pad the encoded message to make it a multiple of total_bits
        padding_length = total_bits - (len(encoded) % total_bits)
        encoded += '0' * padding_length

    decoded = ""
    for i in range(0, len(encoded), total_bits):
        # Extract a block of encoded bits
        encoded_block = list(map(int, encoded[i:i + total_bits]))

        # Check for errors using parity bits
        error_pos = 0
        for parity_index in range(redundancy_bits):
            parity_position = 2 ** parity_index
            parity_value = 0
            for bit_index in range(1, total_bits + 1):
                if bit_index & parity_position:
                    parity_value ^= encoded_block[bit_index - 1]
            # Update error position
            if parity_value != 0:
                error_pos += parity_position

        # Correct the error if any
        if error_pos != 0:
            encoded_block[error_pos - 1] ^= 1

        # Extract data bits (skip parity bit positions)
        decoded_block = []
        for bit_index in range(1, total_bits + 1):
            if not (bit_index & (bit_index - 1)) == 0:  # Not a power of 2
                decoded_block.append(encoded_block[bit_index - 1])

        # Convert data bits to string and append to the result
        decoded += ''.join(map(str, decoded_block))

    return decoded

def encode_message_with_hamming(image_path, message, output_path, img_format="jpg", redundancy_bits=3):
    """
    Encodes a message with Hamming code into an image.

    Parameters:
        image_path (str): Path to the input image.
        message (str): Message to encode.
        output_path (str): Path to save the encoded image.
    """
    # Convert the message to binary and Hamming-encode it
    message_binary = ''.join(format(ord(char), '08b') for char in message)
    message_hamming = hamming_encode(message_binary , redundancy_bits) + '0000000'  # Null terminator

    # Load the image
    image = Image.open(image_path)
    image_array = np.array(image, dtype=np.uint8)
    flat_pixels = image_array.flatten()

    if len(message_hamming) > len(flat_pixels):
        raise ValueError("Message is too large to encode in the given image.")

    # Encode the message
    for i, bit in enumerate(message_hamming):
        flat_pixels[i] = (flat_pixels[i] & 0xFE) | int(bit)

    encoded_array = flat_pixels.reshape(image_array.shape)
    encoded_image = Image.fromarray(encoded_array)
    

    if(img_format == 'jpg'):
        encoded_image.save(output_path, "JPEG", quality=95)
    else:
        encoded_image.save(output_path)
    

    print(f"Message encoded and saved to {output_path}")


def decode_message_with_hamming(image_path,redundancy_bits=3):
    """
    Decodes a Hamming-encoded message from an image.

    Parameters:
        image_path (str): Path to the encoded image.

    Returns:
        str: The decoded message.
    """
    # Load the image
    image = Image.open(image_path)
    image_array = np.array(image, dtype=np.uint8)
    flat_pixels = image_array.flatten()

    # Extract bits from the least significant bits
    binary_data = ''.join(str(pixel & 1) for pixel in flat_pixels)

    # Decode the Hamming-encoded binary data
    try:
        binary_message = hamming_decode(binary_data, redundancy_bits)
        chars = [chr(int(binary_message[i:i+8], 2)) for i in range(0, len(binary_message), 8)]
        message = ''.join(chars).split('\x00', 1)[0]  # Stop at the first null character
        return message
    except Exception as e:
        print(f"Decoding error: {e}")
        return None


def encode_message_with_redundancy(image_path, message, output_path, bit_position=1, redundancy=8, img_format="jpg"):
    """
    Encodes a message into an image with robustness against JPEG compression.

    Parameters:
        image_path (str): Path to the input image.
        message (str): Message to encode.
        output_path (str): Path to save the encoded image.
        bit_position (int): Bit position to embed the data (0 for LSB, 1 for 2nd LSB, etc.).
        redundancy (int): Number of times to repeat each bit for robustness.
    """
    # Validate bit position
    if not (0 <= bit_position < 8):
        raise ValueError("bit_position must be between 0 and 7.")

    # Convert the message to binary and add a null terminator
    message_binary = ''.join(format(ord(char), '08b') for char in message) + '00000000'

    # Repeat each bit for redundancy
    redundant_message = ''.join(bit * redundancy for bit in message_binary)

    # Load the image
    image = Image.open(image_path)
    image_array = np.array(image, dtype=np.uint8)
    flat_pixels = image_array.flatten()

    if len(redundant_message) > len(flat_pixels):
        raise ValueError("Message is too large to encode in the given image.")

    # Create a mask to clear the target bit
    mask = 255 - (1 << bit_position)

    # Encode the message
    for i, bit in enumerate(redundant_message):
        flat_pixels[i] = np.uint8((flat_pixels[i] & mask) | (int(bit) << bit_position))

    # Reshape and save the encoded image
    encoded_array = flat_pixels.reshape(image_array.shape)
    encoded_image = Image.fromarray(encoded_array)
    if img_format == 'jpg':
        encoded_image.save(output_path, format="JPEG", quality=90)  # Save as JPEG to test robustness
    else:
        encoded_image.save(output_path)  # Save as JPEG to test robustness
    print(f"Message encoded and saved to {output_path}")

def decode_message_with_redundancy(image_path, bit_position=1, redundancy=8):
    """
    Decodes a message from an image with robustness against JPEG compression.

    Parameters:
        image_path (str): Path to the encoded image.
        bit_position (int): Bit position to read the data from (0 for LSB, 1 for 2nd LSB, etc.).
        redundancy (int): Number of times each bit was repeated.
    """
    from collections import Counter

    # Load the image
    image = Image.open(image_path)
    image_array = np.array(image, dtype=np.uint8)
    flat_pixels = image_array.flatten()

    # Extract the embedded bits
    extracted_bits = [(flat_pixels[i] >> bit_position) & 1 for i in range(len(flat_pixels))]

    # Group bits by redundancy
    grouped_bits = [extracted_bits[i:i + redundancy] for i in range(0, len(extracted_bits), redundancy)]

    # Decode each group using majority voting and validate consistency
    decoded_bits = []
    for group in grouped_bits:
        if len(group) == redundancy:
            most_common_bit, count = Counter(group).most_common(1)[0]
            # Include the bit only if it appears in the majority
            if count > redundancy // 2:
                decoded_bits.append(most_common_bit)
            else:
                # Inconsistent group - discard
                break
        else:
            # Incomplete group - stop decoding
            break

    # Convert bits to characters
    decoded_message = ''.join(chr(int(''.join(map(str, decoded_bits[i:i + 8])), 2)) for i in range(0, len(decoded_bits), 8))

    # Stop at the null terminator
    decoded_message = decoded_message.split('\x00', 1)[0]
    #print(f"Decoded Message: {decoded_message}")
    return decoded_message

In [7]:
secret_message = "JPEG robustness! daraz"

color_space="RGB"
resolution = "high" #high , mid , low
algo="resized" # ground , hamming , redundancy  , resized
app="whatsapp" # fb , whatsapp
redundancy_bits = 3
img_format = "png"
img_format_list = ["jpg","png"]

#ENCODING
# for current_format in img_format_list:
#     for file in range (10): 
        
#         #input_image = "LSB\\" + color_space + "\\" + str(resolution) +"\\original\\" + str(file+1)  + "." + str(current_format)
#         #resize
#         input_image = "LSB\\" + color_space + "\\" + str(resolution) +"\\original\\resized\\" + str(file+1)  + "." + str(current_format)
        
#         if algo == 'ground':
#             output_image = "LSB\\" + color_space + "\\" + str(resolution) +"\\encoded\\" + str(algo) + "\\" + str(file+1) + "_encoded." + str(current_format)
#         elif algo == 'resized':
#             output_image = "LSB\\" + color_space + "\\" + str(resolution) +"\\encoded\\" + str(algo) + "\\" + str(redundancy_bits) + "\\" + str(file+1) + "_encoded."  + str(current_format) 
#         else:
#             output_image = "LSB\\" + color_space + "\\" + str(resolution) +"\\encoded\\" + str(algo) + "\\" + str(redundancy_bits) + "\\" + str(file+1) + "_encoded."  + str(current_format) 
        
#         # Embed message
#         if algo == 'ground':
#             if color_space == 'RGB':
#                 baseline_encode_message(input_image, secret_message , output_image , current_format)
#             else:
#                 baseline_gery_encode_message(input_image, secret_message , output_image , current_format)
#         elif algo == 'hamming':
#             encode_message_with_hamming(input_image, secret_message, output_image, current_format ,redundancy_bits) 
#         elif algo == 'redundancy':
#             encode_message_with_redundancy(input_image, secret_message, output_image, 7, redundancy_bits, current_format) 
#         elif algo == 'resized':
#             encode_message_with_redundancy(input_image, secret_message, output_image, 7, redundancy_bits, current_format) 
         





In [8]:
#DECODING
stats = [];

path = "LSB\\" + color_space + "\\" + str(resolution) +"\\" + str(app) + "\\" + str(img_format) + "\\" + str(algo) + "\\" + str(redundancy_bits) + "\\"
print(path)
for file in range (10):    
    
    #downloaded = "LSB\\" + color_space + "\\" + str(resolution) +"\\encoded\\" + str(algo) + "\\" + str(redundancy_bits) + "\\" + str(file+1)  + "_encoded" + "." + str(img_format)    
    downloaded = "LSB\\" + color_space + "\\" + str(resolution) +"\\" + str(app) + "\\" + str(img_format) + "\\" + str(algo) + "\\" + str(redundancy_bits) + "\\" + str(file+1) + ".jpg"    
    #print(downloaded)
    decoded_message = decode_message_with_redundancy(downloaded, 7, redundancy_bits)
    #print(downloaded)
    prr = calculate_prr(secret_message, decoded_message)    
    # ber = calculate_bit_error_rate(secret_message, decoded_message)   
    print(str(prr))
    # obj = {}
    # obj['path'] = path
    # obj['PRR'] = '{0:.2f}'.format(prr)
    # # obj['BER'] = '{0:.2f}'.format(ber)
    # stats.append(obj)  

#print(stats)

# df=pd.DataFrame(stats)
# df
 

LSB\RGB\high\whatsapp\png\resized\3\
100.0
100.0
100.0
100.0
100.0
100.0
100.0
100.0
100.0
100.0
