<a href="https://colab.research.google.com/github/Anjasfedo/eceg-lsb-lzw-huffman/blob/main/scenarios_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prepare Data

In [63]:
!pip install faker -q

In [64]:
from faker import Faker
import random

class DummyKTPGenerator:
    def __init__(self):
        self.faker = Faker('id_ID')  # Use Indonesian locale
        self.indonesian_jobs = [
            "Guru", "Dokter", "Petani", "Nelayan", "Pegawai Negeri", "Karyawan Swasta",
            "Wiraswasta", "Mahasiswa", "Pelajar", "Pengacara", "Arsitek", "Insinyur",
            "Pedagang", "Polisi", "Tentara", "Seniman", "Penulis", "Pilot", "Supir",
            "Teknisi", "Pemadam Kebakaran", "Apoteker"
        ]

    def generate_ktp(self):
        """Generate a single dummy KTP record."""
        nik = self.generate_nik()
        name = self.faker.name()
        birth_place = self.faker.city()
        birth_date = self.faker.date_of_birth().strftime('%d-%m-%Y')
        gender = random.choice(['Laki-Laki', 'Perempuan'])
        blood_type = random.choice(['A', 'B', 'AB', 'O'])
        address = self.faker.address().replace('\n', ', ')
        rt_rw = f"{random.randint(1, 20)}/{random.randint(1, 20)}"
        kelurahan = self.faker.city_suffix()
        religion = random.choice(['Islam', 'Kristen', 'Katolik', 'Hindu', 'Buddha', 'Konghucu'])
        marital_status = random.choice(['Belum Kawin', 'Kawin', 'Cerai Hidup', 'Cerai Mati'])
        occupation = random.choice(self.indonesian_jobs)  # Select random Indonesian job
        nationality = 'WNI'  # Assuming all generated data is Indonesian
        valid_until = 'SEUMUR HIDUP'

        return {
            'NIK': nik,
            'Nama': name,
            'Tempat/Tgl Lahir': f"{birth_place}, {birth_date}",
            'Jenis Kelamin': gender,
            'Gol Darah': blood_type,
            'Alamat': address,
            'RT/RW': rt_rw,
            'Kel/Desa': kelurahan,
            'Agama': religion,
            'Status Perkawinan': marital_status,
            'Pekerjaan': occupation,
            'Kewarganegaraan': nationality,
            'Berlaku Hingga': valid_until,
        }

    def generate_nik(self):
        """Generate a dummy NIK (Indonesian identity number)."""
        province_code = random.randint(10, 34)  # Random province code
        regency_code = random.randint(1, 99)   # Random regency code
        district_code = random.randint(1, 99) # Random district code
        date_of_birth = self.faker.date_of_birth()
        birth_date_part = date_of_birth.strftime('%d%m%y')  # Format DDMMYY
        random_sequence = random.randint(1000, 9999)       # Random sequence number
        return f"{province_code:02}{regency_code:02}{district_code:02}{birth_date_part}{random_sequence:04}"

    def generate_multiple_ktps(self, count=1):
        """Generate multiple dummy KTP records."""
        return [self.generate_ktp() for _ in range(count)]

    @staticmethod
    def merge_ktp_data(ktp):
        """
        Merge a single KTP dictionary into a formatted string with '#' as a separator.
        Replace spaces with '%'.
        """
        fields = [
            ktp.get('NIK', ''),
            ktp.get('Nama', ''),
            ktp.get('Tempat/Tgl Lahir', ''),
            ktp.get('Jenis Kelamin', ''),
            ktp.get('Gol Darah', ''),
            ktp.get('Alamat', ''),
            ktp.get('RT/RW', ''),
            ktp.get('Kel/Desa', ''),
            ktp.get('Agama', ''),
            ktp.get('Status Perkawinan', ''),
            ktp.get('Pekerjaan', ''),
            ktp.get('Kewarganegaraan', ''),
            ktp.get('Berlaku Hingga', '')
        ]
        merged = '#'.join(fields)
        return merged.replace(' ', '%')

    @staticmethod
    def merge_multiple_ktps(ktps):
        """
        Merge multiple KTP dictionaries into formatted strings with '#' as a separator.
        Replace spaces with '%'.
        """
        return [DummyKTPGenerator.merge_ktp_data(ktp) for ktp in ktps]


In [65]:
generator = DummyKTPGenerator()

# Generate multiple dummy KTPs
dummy_ktps = generator.generate_multiple_ktps(count=5)

# Merge single KTP
merged_ktp = generator.merge_ktp_data(dummy_ktps[0])
print("Merged Single KTP:", merged_ktp)

# Merge multiple KTPs
merged_ktps = generator.merge_multiple_ktps(dummy_ktps)
print("Merged Multiple KTPs:")
for m_ktp in merged_ktps:
    print(m_ktp)

Merged Single KTP: 1075571201538811#R.A.%Victoria%Saptono#Metro,%31-10-1941#Laki-Laki#B#Jalan%Suniaraja%No.%2,%Madiun,%ST%59656#4/3#Ville#Hindu#Cerai%Mati#Karyawan%Swasta#WNI#SEUMUR%HIDUP
Merged Multiple KTPs:
1075571201538811#R.A.%Victoria%Saptono#Metro,%31-10-1941#Laki-Laki#B#Jalan%Suniaraja%No.%2,%Madiun,%ST%59656#4/3#Ville#Hindu#Cerai%Mati#Karyawan%Swasta#WNI#SEUMUR%HIDUP
1925620510365098#Puti%Padma%Yuliarti,%M.Ak#Sungai%Penuh,%30-10-1914#Perempuan#AB#Gg.%Ahmad%Dahlan%No.%60,%Bekasi,%KI%97295#6/14#Ville#Hindu#Cerai%Mati#Karyawan%Swasta#WNI#SEUMUR%HIDUP
2264171801661721#Anita%Haryanto#Cirebon,%14-09-1960#Perempuan#A#Gg.%Setiabudhi%No.%3,%Binjai,%Kalimantan%Timur%75239#18/3#Ville#Buddha#Cerai%Hidup#Seniman#WNI#SEUMUR%HIDUP
1862522306694339#drg.%Gambira%Winarno,%S.IP#Madiun,%09-05-1984#Perempuan#AB#Gang%Suryakencana%No.%16,%Lubuklinggau,%NB%49183#3/17#Ville#Islam#Cerai%Hidup#Teknisi#WNI#SEUMUR%HIDUP
2218933006122533#Ghaliyati%Simanjuntak#Subulussalam,%10-12-1936#Perempuan#A#Jl.%Cikutr

In [66]:
message_ktp = merged_ktps[0]
message_ktp

'1075571201538811#R.A.%Victoria%Saptono#Metro,%31-10-1941#Laki-Laki#B#Jalan%Suniaraja%No.%2,%Madiun,%ST%59656#4/3#Ville#Hindu#Cerai%Mati#Karyawan%Swasta#WNI#SEUMUR%HIDUP'

# Prepare Image

In [67]:
import requests
from PIL import Image
import io
import numpy as np

# URL to the raw image file
url = "https://raw.githubusercontent.com/mikolalysenko/lena/master/lena.png"

# Download the image
response = requests.get(url)
if response.status_code == 200:
    # Load the image using PIL
    lena_image = Image.open(io.BytesIO(response.content))
    lena_image.show()  # Display the image (optional)
    lena_image.save("lena.png")  # Save the image locally
else:
    print("Failed to download the image.")

In [68]:
import os
LENA_IMG = 'lena.png'

if not os.path.exists(LENA_IMG):
    raise FileNotFoundError(f"Image not found at {LENA_IMG}")

# Stego Metrics

In [69]:
import cv2
import numpy as np
from skimage.metrics import structural_similarity as skimage_ssim


class StegoMetrics:
    def __init__(self, ori_image):
        """
        Initialize the StegoMetrics class with the original image path.

        Args:
            ori_image (str): Path to the original image.
        """
        self.ori_image = ori_image

    @staticmethod
    def calculate_mse(original, stego):
        """
        Calculate the Mean Squared Error (MSE) between two images.

        Args:
            original (numpy.ndarray): Original image array.
            stego (numpy.ndarray): Stego image array.

        Returns:
            float: The MSE value.
        """
        return np.mean((original - stego) ** 2)

    def calculate_psnr(self, stego_image_path):
        """
        Calculate the Peak Signal-to-Noise Ratio (PSNR) between the original and stego image.

        Args:
            stego_image_path (str): Path to the stego image.

        Returns:
            float: The PSNR value.
        """
        # Read images
        original = cv2.imread(self.ori_image)
        stego = cv2.imread(stego_image_path)

        if original is None:
            raise ValueError(f"Failed to load original image from {self.ori_image}. Ensure the file exists and is a valid image format.")
        if stego is None:
            raise ValueError(f"Failed to load stego image from {stego_image_path}. Ensure the file exists and is a valid image format.")

        # Calculate MSE
        mse = self.calculate_mse(original, stego)
        if mse == 0:  # If images are identical
            return float('inf')

        # Calculate PSNR
        max_pixel_value = 255.0
        psnr = 10 * np.log10((max_pixel_value ** 2) / mse)
        return psnr

    @staticmethod
    def calculate_ssim(original, stego):
        """
        Calculate the Structural Similarity Index (SSIM) between two images.

        Args:
            original (numpy.ndarray): Original image array.
            stego (numpy.ndarray): Stego image array.

        Returns:
            float: The SSIM value.
        """
        original_gray = cv2.cvtColor(original, cv2.COLOR_BGR2GRAY)
        stego_gray = cv2.cvtColor(stego, cv2.COLOR_BGR2GRAY)

        ssim_value, _ = skimage_ssim(original_gray, stego_gray, full=True)
        return ssim_value


    def calculate_metrics(self, stego_image_path):
        """
        Calculate MSE, PSNR, and SSIM between the original and stego image.

        Args:
            stego_image_path (str): Path to the stego image.

        Returns:
            tuple: MSE, PSNR, and SSIM values.
        """
        # Read images
        original = cv2.imread(self.ori_image)
        stego = cv2.imread(stego_image_path)

        if original is None:
            raise ValueError(f"Failed to load original image from {self.ori_image}. Ensure the file exists and is a valid image format.")
        if stego is None:
            raise ValueError(f"Failed to load stego image from {stego_image_path}. Ensure the file exists and is a valid image format.")

        # Calculate metrics
        mse_value = self.calculate_mse(original, stego)
        psnr_value = self.calculate_psnr(stego_image_path)
        ssim_value = self.calculate_ssim(original, stego)

        print(f'Metrics between original ({self.ori_image}) and stego image ({stego_image_path}):')
        print(f'MSE: {mse_value}')
        print(f'PSNR: {psnr_value}')
        print(f'SSIM: {ssim_value}')

        return mse_value, psnr_value, ssim_value

In [70]:
original_image_path = 'lena.png'
stego_metrics = StegoMetrics(ori_image=original_image_path)

# Bits Information

In [71]:
def bits_information(bits):
    """
    Convert bits to kilobytes (KB), megabytes (MB), gigabytes (GB), and terabytes (TB).
    """
    bytes_value = bits / 8
    kb_value = bytes_value / 1024
    mb_value = kb_value / 1024
    gb_value = mb_value / 1024
    tb_value = gb_value / 1024

    print(f"{bits} bits is:")
    print(f"{kb_value:.2f} KB")
    print(f"{mb_value:.4f} MB")
    print(f"{gb_value:.6f} GB")
    print(f"{tb_value:.9f} TB")  # More precision for TB
    print()

# File Size Information

In [72]:
import os

def format_file_size(size_in_bytes):
    """
    Convert a file size in bytes to a human-readable format (KB, MB, GB, etc.).

    Args:
        size_in_bytes (int): The size of the file in bytes.

    Returns:
        str: The size in a human-readable format.
    """
    if size_in_bytes < 1024:
        return f"{size_in_bytes} Bytes"
    elif size_in_bytes < 1024 ** 2:
        return f"{size_in_bytes / 1024:.2f} KB"
    elif size_in_bytes < 1024 ** 3:
        return f"{size_in_bytes / (1024 ** 2):.2f} MB"
    elif size_in_bytes < 1024 ** 4:
        return f"{size_in_bytes / (1024 ** 3):.2f} GB"
    else:
        return f"{size_in_bytes / (1024 ** 4):.2f} TB"

def print_png_file_sizes(output_image_path, input_image_path='lena.png'):
    """
    Function to read and print the sizes of two PNG files in a human-readable format.

    Args:
        input_image_path (str): Full path to the input PNG file.
        output_image_path (str): Full path to the output PNG file.
    """
    for file_path in [input_image_path, output_image_path]:
        if os.path.isfile(file_path) and file_path.lower().endswith('.png'):
            file_size = os.path.getsize(file_path)
            human_readable_size = format_file_size(file_size)
            print(f"The size of the file '{file_path}' is: {human_readable_size}")
        else:
            print(f"File '{file_path}' not found or it is not a PNG file.")

# Bit & Message

In [73]:
def message_to_bit(message, bit_length=8):
    """
    Convert a text message into a binary string with a specified bit length per character.
    """
    return ''.join(format(ord(char), f'0{bit_length}b') for char in message)

def bit_to_message(bit_string, bit_length=8):
    """
    Convert a binary string back into a text message using the specified bit length per character.
    """
    if len(bit_string) % bit_length != 0:
        raise ValueError("Invalid bit string length for the specified bit length.")

    chars = [bit_string[i:i+bit_length] for i in range(0, len(bit_string), bit_length)]
    return ''.join(chr(int(char, 2)) for char in chars)

# Steganography

## Least Significant Bit

In [74]:
from PIL import Image
import numpy as np
import math


class LeastSignificantBit:
    def __init__(self, k_val=1):
        """
        Initialize the LeastSignificantBit class.

        Args:
            k_val (int): The number of least significant bits to use for embedding.
        """
        if not (1 <= k_val <= 8):
            raise ValueError("k_val must be between 1 and 8.")
        self.k_val = k_val

    def calculate_max_message_size(self, image_path):
        """
        Calculate the maximum number of characters that can be stored in an image
        using the dynamic encoding method.

        Returns:
            max_chars (int): The maximum number of characters that can be stored.
            max_bits (int): The total number of usable bits for message storage.
        """
        image = Image.open(image_path)
        img_data = np.array(image)

        # Calculate total bit capacity
        height, width, channels = img_data.shape
        total_capacity_bits = height * width * channels * self.k_val

        total_capacity_bits = total_capacity_bits - 32

        max_chars = total_capacity_bits // 8

        max_bits = total_capacity_bits

        # print(max_chars, max_bits)

        return max_chars, max_bits

    def message_to_bits(self, message_bits):
        """
        Convert a string message to a bit string, using a dynamically determined bit length for the message size.
        """
        message_length = len(message_bits)
        print(f"embbed message {message_length}")

        message_length_bits = format(message_length, f'032b')

        return message_length_bits + message_bits

    def bits_to_message(self, bits):
        """
        Convert a bit string back to a human-readable message, using dynamically encoded message length.
        """
        message_length_bit = bits[:32]

        message_length = int(message_length_bit, 2)
        print(f"extract message {message_length}")

        message_bits = bits[32:32 + message_length]

        print(len(bits))

        return message_bits

    @staticmethod
    def change_n_lsb(binary_number, new_lsbs, k_val):
        """
        Modify the n least significant bits (LSBs) of a binary number.
        """
        binary_literal = int(new_lsbs, 2)
        mask = ~((1 << k_val) - 1) & 0xFF
        return (binary_number & mask) | binary_literal

    def embed_message(self, input_image_path, output_image_path, message):
        """
        Modify pixel values of an image to embed a message.
        """
        message_bits = self.message_to_bits(message)

        image = Image.open(input_image_path)
        img_data = np.array(image)

        height, width, channels = img_data.shape
        max_chars, max_bits = self.calculate_max_message_size(input_image_path)
        if len(message_bits) > max_bits:
            raise ValueError(f"Message too long! max_bits: {max_bits} bits, Message: {len(message_bits)} bits.")

            # Calculate maximum capacity

        bit_idx = 0
        for h in range(height):
            for w in range(width):
                for c in range(channels):
                    if bit_idx < len(message_bits):
                        original_value = img_data[h, w, c]
                        bits_to_embed = message_bits[bit_idx:bit_idx + self.k_val]
                        bits_to_embed = bits_to_embed.ljust(self.k_val, '0')

                        img_data[h, w, c] = self.change_n_lsb(original_value, bits_to_embed, self.k_val)
                        bit_idx += self.k_val
                if bit_idx >= len(message_bits):
                    break
            if bit_idx >= len(message_bits):
                break

        stego_image = Image.fromarray(img_data)
        stego_image.save(output_image_path, format="PNG")
        return stego_image

    def extract_message(self, stego_image_path):
        """
        Extract a hidden message from an image that uses LSB encoding.
        """
        image = Image.open(stego_image_path)
        img_data = np.array(image)

        height, width, channels = img_data.shape
        extracted_bits = ""

        for h in range(height):
            for w in range(width):
                for c in range(channels):
                    pixel_value = img_data[h, w, c]
                    lsb_bits = format(pixel_value, '08b')[-self.k_val:]
                    extracted_bits += lsb_bits

        # Ensure that we are extracting the correct number of bits
        return self.bits_to_message(extracted_bits)



# Criptography

## Elliptic Curve El Gamal

In [75]:
import random

class Point:
    def __init__(self, x=None, y=None):
        self.x = x
        self.y = y

    def is_infinity(self):
        """Check if the point is the point at infinity."""
        return self.x is None and self.y is None

    def __eq__(self, other):
        """Custom equality check for Point objects."""
        if isinstance(other, Point):
            return self.x == other.x and self.y == other.y
        return False

    def __hash__(self):
        """Make Point hashable by defining a unique hash."""
        return hash((self.x, self.y))

    def __repr__(self):
        if self.is_infinity():
            return "Point at Infinity"
        return f"Point({self.x}, {self.y})"

class EllipticCurveElGamal:
  def __init__(self):
    self.a = 214
    self.b = 110
    self.p = 251
    # self.p = 233
    self.base_point = self.generate_random_valid_point()

    self.characters = [chr(i) for i in range(256)]
    # self.characters = [chr(i) for i in range(1, 256)]

    self.valid_points = self.get_all_points()
    self.point_to_char, self.char_to_point = self.create_mappings()

  def elliptic_curve_equation(self, x):
    return (x**3 + self.a*x + self.b) % self.p

  def is_on_curve(self, x, y):
        """Check if a point (x, y) lies on the curve."""
        if x is None or y is None:
            return True
        return (y**2 - (x**3 + self.a * x + self.b)) % self.p == 0

  def generate_random_valid_point(self):
        """Generate a random point that lies on the elliptic curve."""
        while True:
            x = random.randint(0, self.p - 1)
            y_squared = (x**3 + self.a * x + self.b) % self.p

            if pow(y_squared, (self.p - 1) // 2, self.p) == 1:
                for y in range(self.p):
                    if (y**2) % self.p == y_squared:
                        return Point(x, y)

  def calc_point_add(self, P, Q):
    """Calculate the addition of two points P and Q on the elliptic curve."""
    R = Point()

    if P.is_infinity():
        return Q
    if Q.is_infinity():
        return P

    if P.x == Q.x and (P.y != Q.y or P.y == 0):
        return Point()

    # Calculate slope
    if P.x == Q.x and P.y == Q.y:
        slope = (3 * P.x**2 + self.a) * pow(2 * P.y, -1, self.p) % self.p
    else:
        slope = (Q.y - P.y) * pow(Q.x - P.x, -1, self.p) % self.p

    R.x = (slope**2 - P.x - Q.x) % self.p

    R.y = (slope * (P.x - R.x) - P.y) % self.p

    return R

  def calc_point_doubling(self, P):
      """Calculate the point doubling 2P = P + P on the elliptic curve."""
      R = Point()

      if P.is_infinity() or P.y == 0:
          return Point()

      slope = (3 * P.x**2 + self.a) * pow(2 * P.y, -1, self.p) % self.p

      R.x = (slope**2 - 2 * P.x) % self.p

      R.y = (slope * (P.x - R.x) - P.y) % self.p

      return R

  def calc_point_subtraction(self, P, Q):
    """Calculate the subtraction of two points P - Q on the elliptic curve."""
    if Q.is_infinity():
        return P

    if P.is_infinity():
        Q_neg = Point(Q.x, (-Q.y) % self.p)
        return Q_neg

    Q_neg = Point(Q.x, (-Q.y) % self.p)

    return self.calc_point_add(P, Q_neg)


  def calc_point_multiplication(self, P, k):
    """Calculate kP using the double-and-add method."""
    R = Point()
    current_point = P

    while k > 0:
        if k % 2 == 1:
            R = self.calc_point_add(R, current_point)
        current_point = self.calc_point_add(current_point, current_point)
        k //= 2

    return R

  def generate_keys(self):
        """Generate a private and public key pair."""
        private_key = random.randint(1, self.p - 1)

        public_key = self.calc_point_multiplication(self.base_point, private_key)

        return private_key, public_key

  def encrypt(self, plaintext_point, public_key, k=None):
      """
      Encrypt a point on the elliptic curve using the public key.
      """
      if k is None:
          k = random.randint(1, self.p - 1)

      C1 = self.calc_point_multiplication(self.base_point, k)

      k_e2 = self.calc_point_multiplication(public_key, k)

      C2 = self.calc_point_add(plaintext_point, k_e2)

      return C1, C2


  def decrypt(self, C1, C2, private_key):
        """
        Decrypt a ciphertext pair (C1, C2) using the private key.
        """
        d_C1 = self.calc_point_multiplication(C1, private_key)

        plaintext_point = self.calc_point_subtraction(C2, d_C1)

        return plaintext_point

  def get_all_points(self):
      """
      Generate all valid points on the elliptic curve.
      """
      points = [Point()]
      for x in range(self.p):
          y_squared = self.elliptic_curve_equation(x)
          for y in range(self.p):
              if (y**2) % self.p == y_squared:
                  points.append(Point(x, y))

      return points

  def create_mappings(self):
    valid_points = [point for point in self.valid_points]

    if len(valid_points) != len(self.characters):
        raise ValueError("Mismatch between the number of valid points and characters.")

    point_to_char = {point: char for point, char in zip(valid_points, self.characters)}

    char_to_point = {char: point for point, char in point_to_char.items()}

    return point_to_char, char_to_point


  def encode_character(self, char):
        """Encode a character to a point on the elliptic curve."""
        if char not in self.char_to_point:
            raise ValueError(f"Character '{char}' not in mapping.")

        return self.char_to_point[char]

  def decode_point(self, point):
        """Decode a point on the elliptic curve to a character."""
        if point not in self.point_to_char:
            raise ValueError(f"Point '{point}' not in mapping.")

        return self.point_to_char[point]

  def encrypt_message(self, message, public_key):
      """
      Encrypt a message using the elliptic curve encryption scheme and return a character-based ciphertext.
      """
      ciphertext = ""

      for char in message:
          plaintext_point = self.encode_character(char)

          C1, C2 = self.encrypt(plaintext_point, public_key)

          encrypted_char_C1 = self.decode_point(C1)
          encrypted_char_C2 = self.decode_point(C2)

          ciphertext += encrypted_char_C1 + encrypted_char_C2

      return ciphertext

  # def decrypt_message(self, ciphertext, private_key):
  #     """
  #     Decrypt a ciphertext into its plaintext message using the private key.
  #     """
  #     plaintext = ""
  #     for i in range(0, len(ciphertext), 2):
  #         C1 = self.encode_character(ciphertext[i])
  #         C2 = self.encode_character(ciphertext[i + 1])

  #         decrypted_point = self.decrypt(C1, C2, private_key)
  #         char = self.decode_point(decrypted_point)
  #         plaintext += char

  #     return plaintext

  def decrypt_message(self, ciphertext, private_key):
    """
    Decrypt a ciphertext into its plaintext message using the private key.
    """
    if len(ciphertext) % 2 != 0:
        raise ValueError("Ciphertext length must be even to form valid (C1, C2) pairs.")

    plaintext = ""

    for i in range(0, len(ciphertext), 2):
        C1 = self.encode_character(ciphertext[i])
        C2 = self.encode_character(ciphertext[i + 1])

        decrypted_point = self.decrypt(C1, C2, private_key)
        char = self.decode_point(decrypted_point)
        plaintext += char

    return plaintext


# Compression

## Huffman Coding

In [76]:
import heapq
from collections import Counter


class HuffmanNode:
    def __init__(self, char, freq):
        self.char = char
        self.freq = freq
        self.left = None
        self.right = None

    def __lt__(self, other):
        return self.freq < other.freq


class HuffmanCoding:
    @staticmethod
    def build_frequency_table(text):
        """Build a frequency table for the given text."""
        return Counter(text)

    @staticmethod
    def build_huffman_tree(freq_table):
        """Build the Huffman Tree based on the frequency table."""
        heap = [HuffmanNode(char, freq) for char, freq in freq_table.items()]
        heapq.heapify(heap)

        while len(heap) > 1:
            left = heapq.heappop(heap)
            right = heapq.heappop(heap)
            merged = HuffmanNode(None, left.freq + right.freq)
            merged.left = left
            merged.right = right
            heapq.heappush(heap, merged)

        return heap[0] if heap else None

    @staticmethod
    def generate_huffman_codes(node, prefix='', codebook=None):
        """Recursively generate Huffman codes for each character."""
        if codebook is None:
            codebook = {}
        if node is None:
            return codebook

        if node.char is not None:
            codebook[node.char] = prefix
        else:
            HuffmanCoding.generate_huffman_codes(
                node.left, prefix + '0', codebook)
            HuffmanCoding.generate_huffman_codes(
                node.right, prefix + '1', codebook)

        return codebook

    @staticmethod
    def encode(text, codebook):
        """Encode the input text using the Huffman codebook."""
        bitstring = ''.join(codebook[char] for char in text)
        return bitstring

    @staticmethod
    def decode(bitstring, huffman_tree):
        """Decode the encoded bitstring back to the original text."""
        decoded_text = []
        node = huffman_tree

        for bit in bitstring:
            node = node.left if bit == '0' else node.right
            if node.char is not None:
                decoded_text.append(node.char)
                node = huffman_tree

        return ''.join(decoded_text)

    @staticmethod
    def build_huffman(text):
        """Build the Huffman tree, generate codes, and encode the text."""
        freq_table = HuffmanCoding.build_frequency_table(text)

        huffman_tree = HuffmanCoding.build_huffman_tree(freq_table)

        codebook = HuffmanCoding.generate_huffman_codes(huffman_tree)

        encoded_text = HuffmanCoding.encode(text, codebook)

        return encoded_text, huffman_tree

In [77]:
input_text = 'hello world!'

huffman = HuffmanCoding()

compressed_message, huffman_tree = huffman.build_huffman(input_text) # it take message and return bits

decoded_text = huffman.decode(compressed_message, huffman_tree) # it take bits and return message

assert input_text == decoded_text, "Decoded text does not match the original!"

input_text, decoded_text

('hello world!', 'hello world!')

In [78]:
compressed_message

'1110110101011011100001101000011111100'

## Lampel Ziv Welch

In [79]:
class LZW:
    def __init__(self):
        self.dictionary_size = 256

    def compress(self, input_string):
        """
        Compress a string using LZW algorithm and return a 32-bit encoded bit string.
        """
        if not input_string:
            return ""

        # Initialize dictionary
        dictionary = {chr(i): i for i in range(self.dictionary_size)}
        next_code = self.dictionary_size

        current_string = ""
        compressed_data = []

        for char in input_string:
            current_string_plus_char = current_string + char
            if current_string_plus_char in dictionary:
                current_string = current_string_plus_char
            else:
                compressed_data.append(dictionary[current_string])
                dictionary[current_string_plus_char] = next_code
                next_code += 1
                current_string = char

        print(f'code {next_code}')

        if current_string:
            compressed_data.append(dictionary[current_string])

        # Convert compressed codes to 32-bit binary strings
        bit_output = ''.join(format(code, '032b') for code in compressed_data)
        return bit_output

    def decompress(self, compressed_bits):
        """
        Decompress a 32-bit encoded bit string back into the original message.
        """
        if not compressed_bits:
            return ""

        # Convert 32-bit binary chunks back to integer codes
        compressed_data = [int(compressed_bits[i:i+32], 2) for i in range(0, len(compressed_bits), 32)]

        # Initialize dictionary
        dictionary = {i: chr(i) for i in range(self.dictionary_size)}
        next_code = self.dictionary_size

        current_code = compressed_data[0]
        decompressed_string = dictionary[current_code]
        current_string = decompressed_string

        for code in compressed_data[1:]:
            if code in dictionary:
                entry = dictionary[code]
            elif code == next_code:
                entry = current_string + current_string[0]
            decompressed_string += entry

            dictionary[next_code] = current_string + entry[0]
            next_code += 1
            current_string = entry

        return decompressed_string


# ASCII Bit Encoder

In [85]:
def encode_with_padding_info(bits):
    # Step 1: Calculate the padding needed to make the bit length a multiple of 8
    padding_needed = (8 - len(bits) % 8) % 8  # Number of bits to pad

    # Step 2: Embed the padding length in the first 8 bits
    padding_length_bits = format(padding_needed, '08b')  # 8-bit binary representation of padding length

    # Step 3: Pad the message with zeroes (if any padding is needed)
    padded_bits = bits + '0' * padding_needed

    # Step 4: Combine the padding information and the padded bits
    full_message = padding_length_bits + padded_bits

    # Step 5: Convert the full message into 8-bit chunks and encode as ASCII
    encoded_chars = [chr(int(full_message[i:i+8], 2)) for i in range(0, len(full_message), 8)]

    return ''.join(encoded_chars)

def decode_with_padding_info(encoded):
    # Step 1: Convert the encoded ASCII string back to a bit string
    bits = ''.join(format(ord(c), '08b') for c in encoded)

    # Step 2: Extract the first 8 bits as padding length
    padding_length_bits = bits[:8]
    padding_length = int(padding_length_bits, 2)

    # Step 3: Extract the actual message bits (without the padding)
    message_bits = bits[8:]  # Exclude the first 8 bits (padding length)

    # Step 4: Remove the padding bits
    message_bits = message_bits[:-padding_length] if padding_length > 0 else message_bits

    return message_bits

# Example usage:
# Input: a string of bits (no conversion from text)
bits = message_to_bit(repeated_data)
# bits = "0100010101010101010101000100101010101010100010111110100101"
# print(f"Original bits: {bits}")

# Encode the bit string with the embedded padding information
encoded_bits = encode_with_padding_info(bits)
# print(f"Encoded bits as ASCII: {encoded_bits}")

# Decode the encoded string to get the original bit string back (without padding)
decoded_bits = decode_with_padding_info(encoded_bits)
# print(f"Decoded bits: {decoded_bits}")

assert all(1 <= ord(c) <= 255 for c in encoded_bits), "Error: Non-ASCII-safe characters found!"
assert bits == decoded_bits

AssertionError: Error: Non-ASCII-safe characters found!

In [None]:
# def encode_with_padding_info(bits):
#     # Step 1: Calculate the padding needed to make the bit length a multiple of 8
#     padding_needed = (8 - len(bits) % 8) % 8  # Number of bits to pad

#     # Step 2: Embed the padding length in the first 8 bits
#     padding_length_bits = format(padding_needed, '08b')  # 8-bit binary representation of padding length

#     # Step 3: Pad the message with zeroes (if any padding is needed)
#     padded_bits = bits + '0' * padding_needed

#     # Step 4: Combine the padding information and the padded bits
#     full_message = padding_length_bits + padded_bits

#     # Step 5: Convert the full message into 8-bit chunks and encode as ASCII
#     encoded_chars = []
#     for i in range(0, len(full_message), 8):
#         byte = full_message[i:i+8]
#         if len(byte) == 8:  # Only create characters from full 8-bit chunks
#             encoded_chars.append(chr(int(byte, 2)))

#     # Join all encoded ASCII characters into a final string
#     return ''.join(encoded_chars)

# def decode_with_padding_info(encoded):
#     # Step 1: Convert the encoded ASCII string back to a bit string
#     bits = ''.join(format(ord(c), '08b') for c in encoded)

#     # Step 2: Extract the first 8 bits as padding length
#     padding_length_bits = bits[:8]
#     padding_length = int(padding_length_bits, 2)

#     # Step 3: Extract the actual message bits (without the padding)
#     message_bits = bits[8:]  # Exclude the first 8 bits (padding length)

#     # Step 4: Remove the padding bits
#     message_bits = message_bits[:-padding_length] if padding_length > 0 else message_bits

#     return message_bits

# def validate_ascii(encoded_bits):
#     invalid_chars = []
#     for char in encoded_bits:
#         if not (1 <= ord(char) <= 255):  # Check if the char is within valid ASCII range
#             invalid_chars.append(char)

#     if invalid_chars:
#         print(f"Invalid ASCII characters found: {invalid_chars}")
#         return False
#     return True

# # Example usage:
# # Define a bit string as an example (make sure it's a valid input)
# bits = message_to_bit(repeated_data)

# # Encode the bit string with the embedded padding information
# encoded_bits = encode_with_padding_info(bits)

# # Ensure that the encoded bits consist of valid ASCII characters
# assert validate_ascii(encoded_bits), "Error: Non-ASCII-safe characters found!"

# # Decode the encoded string to get the original bit string back (without padding)
# decoded_bits = decode_with_padding_info(encoded_bits)

# # Ensure that the original bit string is the same as the decoded bits
# assert bits == decoded_bits, "Error: Decoded bits do not match the original bits!"

In [None]:
# def validate_ascii(encoded_bits):
#     invalid_chars = []
#     for char in encoded_bits:
#         if not (1 <= ord(char) <= 255):  # Check if the char is within valid ASCII range
#             invalid_chars.append(char)

#     if invalid_chars:
#         print(f"Invalid ASCII characters found: {invalid_chars}")
#         return invalid_chars
#     return True

# lorem = validate_ascii(encoded_bits)[0]
# lorem

In [None]:
# def char_to_bits(char):
#     # Step 1: Get the ASCII value of the character using ord()
#     ascii_value = ord(char)

#     # Step 2: Convert the ASCII value to binary and pad it to 8 bits
#     bits = format(ascii_value, '08b')  # '08b' ensures it's padded to 8 bits

#     return bits

# # Example usage:
# char = lorem
# bit_representation = char_to_bits(char)
# print(f"Character: {char}, Bits: {bit_representation}")


In [81]:
# def encode_with_padding_info(bits):
#     # Step 1: Calculate the padding needed to make the bit length a multiple of 8
#     padding_needed = (8 - len(bits) % 8) % 8  # Number of bits to pad

#     # Step 2: Embed the padding length in the first 8 bits
#     padding_length_bits = format(padding_needed, '08b')  # 8-bit binary representation of padding length

#     # Step 3: Pad the message with zeroes (if any padding is needed)
#     padded_bits = bits + '0' * padding_needed

#     # Step 4: Combine the padding information and the padded bits
#     full_message = padding_length_bits + padded_bits

#     # Step 5: Convert the full message into 8-bit chunks and encode as ASCII
#     encoded_chars = []
#     for i in range(0, len(full_message), 8):
#         byte = full_message[i:i+8]
#         if len(byte) == 8:  # Only create characters from full 8-bit chunks
#             byte_value = int(byte, 2)
#             # Ensure the byte value is within the valid ASCII range (1-255)
#             if 1 <= byte_value <= 255:
#                 encoded_chars.append(chr(byte_value))
#             else:
#                 raise ValueError(f"Invalid ASCII value: {byte_value}")

#     # Join all encoded ASCII characters into a final string
#     return ''.join(encoded_chars)

# def decode_with_padding_info(encoded):
#     # Step 1: Convert the encoded ASCII string back to a bit string
#     bits = ''.join(format(ord(c), '08b') for c in encoded)

#     # Step 2: Extract the first 8 bits as padding length
#     padding_length_bits = bits[:8]
#     padding_length = int(padding_length_bits, 2)

#     # Step 3: Extract the actual message bits (without the padding)
#     message_bits = bits[8:]  # Exclude the first 8 bits (padding length)

#     # Step 4: Remove the padding bits
#     message_bits = message_bits[:-padding_length] if padding_length > 0 else message_bits

#     return message_bits

# # Helper function to ensure valid ASCII
# def validate_ascii(encoded_bits):
#     # Check if all characters in the encoded string are within the ASCII range (1-255)
#     return all(1 <= ord(c) <= 255 for c in encoded_bits)

# # Example usage:
# # Define a bit string as an example (make sure it's a valid input)
# bits = "0100010101010101010101000100101010101010100010111110100101"  # Example input bit string

# # Encode the bit string with the embedded padding information
# encoded_bits = encode_with_padding_info(bits)

# # Ensure that the encoded bits consist of valid ASCII characters
# if validate_ascii(encoded_bits):
#     print(f"Encoded bits as ASCII: {encoded_bits}")
# else:
#     print("Error: Non-ASCII-safe characters found!")

# # Decode the encoded string to get the original bit string back (without padding)
# decoded_bits = decode_with_padding_info(encoded_bits)

# # Ensure that the original bit string is the same as the decoded bits
# assert bits == decoded_bits, "Error: Decoded bits do not match the original bits!"

# # Output results for verification
# print(f"Original bits: {bits}")
# print(f"Decoded bits: {decoded_bits}")


Encoded bits as ASCII: EUTJªé@
Original bits: 0100010101010101010101000100101010101010100010111110100101
Decoded bits: 0100010101010101010101000100101010101010100010111110100101


In [None]:
# def encode_with_padding_info(bits):
#     # Step 1: Calculate the padding needed to make the bit length a multiple of 8
#     padding_needed = (8 - len(bits) % 8) % 8  # Number of bits to pad

#     # Step 2: Embed the padding length in the first 8 bits
#     padding_length_bits = format(padding_needed, '08b')  # 8-bit binary representation of padding length

#     # Step 3: Pad the message with zeroes (if any padding is needed)
#     padded_bits = bits + '0' * padding_needed

#     # Step 4: Combine the padding information and the padded bits
#     full_message = padding_length_bits + padded_bits

#     # Step 5: Convert the full message into 8-bit chunks and encode as ASCII
#     # encoded_chars = [chr(int(full_message[i:i+8], 2)) for i in range(0, len(full_message), 8)]

#     # return ''.join(encoded_chars)

#     return full_message

# def decode_with_padding_info(bits):
#     # Step 1: Convert the encoded ASCII string back to a bit string
#     # bits = ''.join(format(ord(c), '08b') for c in encoded)

#     # Step 2: Extract the first 8 bits as padding length
#     padding_length_bits = bits[:8]
#     padding_length = int(padding_length_bits, 2)

#     # Step 3: Extract the actual message bits (without the padding)
#     message_bits = bits[8:]  # Exclude the first 8 bits (padding length)

#     # Step 4: Remove the padding bits
#     message_bits = message_bits[:-padding_length] if padding_length > 0 else message_bits

#     return message_bits

# # Example usage:
# # Input: a string of bits (no conversion from text)
# bits = "0100010101010101010101000100101010101010100010111110100101"
# print(f"Original bits: {bits}")

# # Encode the bit string with the embedded padding information
# encoded_bits = encode_with_padding_info(bits)
# print(f"Encoded bits as ASCII: {encoded_bits}")

# # Decode the encoded string to get the original bit string back (without padding)
# decoded_bits = decode_with_padding_info(encoded_bits)
# print(f"Decoded bits: {decoded_bits}")

# assert all(1 <= ord(c) <= 255 for c in encoded_bits), "Error: Non-ASCII-safe characters found!"
# assert bits == decoded_bits
# assert len(encoded_bits) % 8 == 0

# Scenarios

## ECEGLSB

In [86]:
k = 4
lsb = LeastSignificantBit(k_val=k)

input_image_path = 'lena.png'
output_image_path = 'lsbeceg.png'

max_chars, max_bits = lsb.calculate_max_message_size(input_image_path)
repeated_data = (merged_ktps[0] * (((max_chars // 2) - 9000) // len(merged_ktps[0]) + 1))[:((max_chars // 2) - 9000)]

eceg = EllipticCurveElGamal()

private_key, public_key = eceg.generate_keys()

ciphertext = eceg.encrypt_message(repeated_data, public_key)

message = message_to_bit(ciphertext)

_ = lsb.embed_message(input_image_path, output_image_path, message)

extracted_message = lsb.extract_message(output_image_path)

return_message = bit_to_message(extracted_message)

decrypted_message = eceg.decrypt_message(return_message, private_key)

assert repeated_data == decrypted_message

bits_information(len(message_to_bit(repeated_data)))

stego_metrics.calculate_metrics(output_image_path)

print_png_file_sizes(output_image_path)

embbed message 3001696
extract message 3001696
3145728
1500848 bits is:
183.21 KB
0.1789 MB
0.000175 GB
0.000000171 TB

Metrics between original (lena.png) and stego image (lsbeceg.png):
MSE: 41.67235565185547
PSNR: 31.932323099502
SSIM: 0.8873803520231563
The size of the file 'lena.png' is: 468.53 KB
The size of the file 'lsbeceg.png' is: 542.42 KB


## HuffmanECEGLSB

In [87]:
k = 4
lsb = LeastSignificantBit(k_val=k)

input_image_path = 'lena.png'
output_image_path = 'lsbeceg.png'

max_chars, max_bits = lsb.calculate_max_message_size(input_image_path)
repeated_data = (merged_ktps[0] * (((max_chars // 2) + 80000) // len(merged_ktps[0]) + 1))[:((max_chars // 2) + 80000)]

huffman = HuffmanCoding()

compressed_message, huffman_tree = huffman.build_huffman(repeated_data)

valid_bit = encode_with_padding_info(compressed_message)

eceg = EllipticCurveElGamal()

private_key, public_key = eceg.generate_keys()

ciphertext = eceg.encrypt_message(valid_bit, public_key)

message = message_to_bit(ciphertext)

_ = lsb.embed_message(input_image_path, output_image_path, message)

extracted_message = lsb.extract_message(output_image_path)

assert message == extracted_message

return_message = bit_to_message(extracted_message)

decrypted_message = eceg.decrypt_message(return_message, private_key)

assert valid_bit == decrypted_message

decoded_bits = decode_with_padding_info(decrypted_message)

assert compressed_message == decoded_bits

decoded_text = huffman.decode(compressed_message, huffman_tree)

assert repeated_data == decoded_text

bits_information(len(message_to_bit(repeated_data)))

stego_metrics.calculate_metrics(output_image_path)

print_png_file_sizes(output_image_path)

embbed message 2884608
extract message 2884608
3145728
2212848 bits is:
270.12 KB
0.2638 MB
0.000258 GB
0.000000252 TB

Metrics between original (lena.png) and stego image (lsbeceg.png):
MSE: 40.09645462036133
PSNR: 32.099743874218966
SSIM: 0.8916505608695324
The size of the file 'lena.png' is: 468.53 KB
The size of the file 'lsbeceg.png' is: 539.60 KB


In [88]:
(2212848 - 1500848 ) / 2212848

0.3217573009985322

## LampelZivWelchECEGLSB

In [92]:
k = 4
lsb = LeastSignificantBit(k_val=k)

input_image_path = 'lena.png'
output_image_path = 'lsbeceg.png'

max_chars, max_bits = lsb.calculate_max_message_size(input_image_path)
repeated_data = (merged_ktps[0] * (((max_chars * 14) - 100) // len(merged_ktps[0]) + 1))[:((max_chars * 14) - 100)]
print(len(repeated_data))

lzw = LZW()

compressed_message = lzw.compress(repeated_data)

valid_bit = encode_with_padding_info(compressed_message)

eceg = EllipticCurveElGamal()

private_key, public_key = eceg.generate_keys()

ciphertext = eceg.encrypt_message(valid_bit, public_key)

message = message_to_bit(ciphertext)

_ = lsb.embed_message(input_image_path, output_image_path, message)

extracted_message = lsb.extract_message(output_image_path)

assert message == extracted_message

return_message = bit_to_message(message)

decrypted_message = eceg.decrypt_message(return_message, private_key)

assert valid_bit == decrypted_message

decoded_bits = decode_with_padding_info(decrypted_message)

assert compressed_message == decoded_bits

decoded_text = lzw.decompress(compressed_message)

assert repeated_data == decoded_text

bits_information(len(message_to_bit(repeated_data)))

stego_metrics.calculate_metrics(output_image_path)

print_png_file_sizes(output_image_path)

5504868
code 43147
embbed message 2745104
extract message 2745104
3145728
44038944 bits is:
5375.85 KB
5.2499 MB
0.005127 GB
0.000005007 TB

Metrics between original (lena.png) and stego image (lsbeceg.png):
MSE: 38.70089467366537
PSNR: 32.253593558659304
SSIM: 0.8986270877900612
The size of the file 'lena.png' is: 468.53 KB
The size of the file 'lsbeceg.png' is: 536.12 KB


In [93]:
(44038944 - 1500848 ) / 44038944

0.9659199820958468