Final Security Project by Elsana Kanybek and Joseph Lehman

In [None]:
# Install the Python packages and the steghide command-line tool used by the cells below.
# NOTE(review): pycryptodome is installed here but no visible cell imports it — confirm it is needed.
!pip install gradio
!pip install pycryptodome
!pip install cryptography
!sudo apt-get install steghide
!pip install ipdb
# Turn on automatic post-mortem debugging for uncaught exceptions.
%pdb on


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
steghide is already the newest version (0.5.1-15).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.
Automatic pdb calling has been turned ON


In [None]:
# Encryption and decryption logic for RSA

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes

# Generate a private key with 2048 bits
def generate_rsa_keys():
    """Create a fresh RSA key pair.

    Returns:
        tuple: ``(private_key, public_key)``. The private key is a 2048-bit
        RSA key using the conventional public exponent 65537; the public key
        is derived from it.
    """
    rsa_private = rsa.generate_private_key(key_size=2048, public_exponent=65537)
    return rsa_private, rsa_private.public_key()

# Encrypting the message using RSA
def encrypt_rsa_message(message, public_key):
    """Encrypt ``message`` for the holder of ``public_key`` using RSA-OAEP.

    OAEP is configured with SHA-256 both as the main hash and inside the MGF1
    mask generation function, and with no label.

    Args:
        message: Plaintext to encrypt (callers in this notebook pass bytes).
        public_key: Key object exposing ``encrypt(data, padding)``.

    Returns:
        Whatever ``public_key.encrypt`` returns (the ciphertext).
    """
    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return public_key.encrypt(message, oaep_sha256)

# Decrypting the message using RSA
def decrypt_rsa_message(encrypted_message, private_key):
    """Decrypt an RSA-OAEP ciphertext with ``private_key``.

    The padding parameters (SHA-256 hash, MGF1 with SHA-256, no label) mirror
    the ones used by ``encrypt_rsa_message``.

    Args:
        encrypted_message: Ciphertext to decrypt.
        private_key: Key object exposing ``decrypt(data, padding)``.

    Returns:
        Whatever ``private_key.decrypt`` returns (the recovered plaintext).
    """
    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_key.decrypt(encrypted_message, oaep_sha256)

In [None]:
# Encryption and decryption for ECC

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicNumbers
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os

# Module-level 12-byte constant named as an initialization vector.
# NOTE(review): AES-GCM nonces must never repeat under the same key; prefer a
# fresh random nonce per message over a fixed value like this one.
iv = b"123456789012"

# Generate a private key using the secp384r1 curve
def generate_ecc_keys():
    """Create a fresh elliptic-curve key pair on the SECP384R1 curve.

    Returns:
        tuple: ``(private_key, public_key)`` with the public key derived from
        the newly generated private key.
    """
    curve = ec.SECP384R1()
    ecc_private = ec.generate_private_key(curve)
    return ecc_private, ecc_private.public_key()

# Length in bytes of the random AES-GCM nonce stored in front of each ECC ciphertext.
_ECC_GCM_NONCE_SIZE = 12


# Encrypting the message using the ECC-derived symmetric key (AES-GCM)
def encrypt_ecc_message(message, ephemeral_private_key, recipient_public_key):
    """Encrypt ``message`` with AES-GCM under a key derived via ECDH + HKDF.

    The ECDH shared secret between ``ephemeral_private_key`` and
    ``recipient_public_key`` is stretched with HKDF-SHA256 into a 32-byte AES
    key. A fresh random nonce is generated for every call instead of using the
    module-level ``iv`` constant: with long-lived keys the derived AES key is
    the same for every message, and reusing a GCM nonce under one key breaks
    both confidentiality and authenticity.

    Args:
        message: Plaintext bytes to encrypt.
        ephemeral_private_key: Sender's EC private key.
        recipient_public_key: Recipient's EC public key.

    Returns:
        tuple: ``(ciphertext, auth_tag)``. ``ciphertext`` is the random nonce
        followed by the AES-GCM output, so ``decrypt_ecc_message`` can recover
        the nonce without an extra parameter. Data produced by the earlier
        fixed-nonce version of this function is not compatible.
    """
    # Generating a shared key
    shared_key = ephemeral_private_key.exchange(ec.ECDH(), recipient_public_key)

    derived_key = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"encryption key"
    ).derive(shared_key) # symmetric encryption key from the shared key

    nonce = os.urandom(_ECC_GCM_NONCE_SIZE)
    cipher = Cipher(algorithms.AES(derived_key), modes.GCM(nonce))
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()

    auth_tag = encryptor.tag
    return nonce + ciphertext, auth_tag

# Decrypting the message using ECC
def decrypt_ecc_message(ciphertext, auth_tag, ephemeral_public_key, recipient_private_key):
    """Decrypt data produced by ``encrypt_ecc_message``.

    Args:
        ciphertext: Nonce-prefixed AES-GCM ciphertext bytes.
        auth_tag: GCM authentication tag returned at encryption time.
        ephemeral_public_key: Sender's EC public key.
        recipient_private_key: Recipient's EC private key.

    Returns:
        The recovered plaintext bytes.

    Raises:
        ValueError: If ``ciphertext`` is too short to contain the nonce.
        Errors raised by the underlying cipher (for example on tag mismatch)
        propagate unchanged.
    """
    if len(ciphertext) < _ECC_GCM_NONCE_SIZE:
        raise ValueError("Ciphertext is too short to contain the AES-GCM nonce")
    nonce = ciphertext[:_ECC_GCM_NONCE_SIZE]
    encrypted_body = ciphertext[_ECC_GCM_NONCE_SIZE:]

    # Get the shared key using the recipient's private key and the sender's ephemeral public key
    shared_key = recipient_private_key.exchange(ec.ECDH(), ephemeral_public_key)

    derived_key = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"encryption key"
    ).derive(shared_key) # symmetric encryption key

    # Decrypt the message
    cipher = Cipher(algorithms.AES(derived_key), modes.GCM(nonce, auth_tag))
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(encrypted_body) + decryptor.finalize()
    return plaintext


In [None]:
# Encryption and decryption for X25519

from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os

# Module-level 12-byte constant named as an initialization vector (same value as in the ECC cell).
# NOTE(review): AES-GCM nonces must never repeat under the same key; prefer a
# fresh random nonce per message over a fixed value like this one.
iv = b"123456789012"


# Generating X25519 Keys
def generate_x25519_keys():
    """Create a fresh X25519 key pair.

    Returns:
        tuple: ``(private_key, public_key)`` with the public key derived from
        the newly generated private key.
    """
    x_private = x25519.X25519PrivateKey.generate()
    return x_private, x_private.public_key()


# Length in bytes of the random AES-GCM nonce stored in front of each X25519 ciphertext.
_X25519_GCM_NONCE_SIZE = 12


# Encrypting a Message with shared key using X25519 key exchange
def encrypt_x25519_message(message, sender_private_key, recipient_public_key):
    """Encrypt ``message`` with AES-GCM under a key derived via X25519 + HKDF.

    The X25519 shared secret is stretched with HKDF-SHA256 into a 32-byte AES
    key. A fresh random nonce is generated for every call instead of using the
    module-level ``iv`` constant: the same key pair always yields the same AES
    key, and reusing a GCM nonce under one key breaks both confidentiality and
    authenticity.

    Args:
        message: Plaintext bytes to encrypt.
        sender_private_key: Sender's X25519 private key.
        recipient_public_key: Recipient's X25519 public key.

    Returns:
        tuple: ``(ciphertext, tag)``. ``ciphertext`` is the random nonce
        followed by the AES-GCM output, so ``decrypt_x25519_message`` can
        recover the nonce without an extra parameter. Data produced by the
        earlier fixed-nonce version of this function is not compatible.
    """
    shared_key = sender_private_key.exchange(recipient_public_key)

    derived_key = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"encryption",
    ).derive(shared_key)

    # Encrypt the message
    nonce = os.urandom(_X25519_GCM_NONCE_SIZE)
    cipher = Cipher(algorithms.AES(derived_key), modes.GCM(nonce))
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()
    return nonce + ciphertext, encryptor.tag


# Decrypting a Message
def decrypt_x25519_message(ciphertext, tag, recipient_private_key, sender_public_key):
    """Decrypt data produced by ``encrypt_x25519_message``.

    Args:
        ciphertext: Nonce-prefixed AES-GCM ciphertext bytes.
        tag: GCM authentication tag returned at encryption time.
        recipient_private_key: Recipient's X25519 private key.
        sender_public_key: Sender's X25519 public key.

    Returns:
        The recovered plaintext bytes.

    Raises:
        ValueError: If ``ciphertext`` is too short to contain the nonce.
        Errors raised by the underlying cipher (for example on tag mismatch)
        propagate unchanged.
    """
    if len(ciphertext) < _X25519_GCM_NONCE_SIZE:
        raise ValueError("Ciphertext is too short to contain the AES-GCM nonce")
    nonce = ciphertext[:_X25519_GCM_NONCE_SIZE]
    encrypted_body = ciphertext[_X25519_GCM_NONCE_SIZE:]

    shared_key = recipient_private_key.exchange(sender_public_key)

    derived_key = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"encryption",
    ).derive(shared_key)

    # Decrypt the message
    cipher = Cipher(algorithms.AES(derived_key), modes.GCM(nonce, tag))
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(encrypted_body) + decryptor.finalize()
    return plaintext



In [None]:
# Encryption and decryption general methods

from cryptography.hazmat.primitives import serialization
import base64

def private_key_to_string(key):
    """Serialize a private key to Base64 text.

    The key is exported unencrypted as PKCS#8 DER and the resulting bytes are
    Base64-encoded so they can be shown in a text field.

    Args:
        key: Private key object exposing ``private_bytes``.

    Returns:
        str: Base64 text of the DER-encoded key.
    """
    der_bytes = key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    b64_bytes = base64.b64encode(der_bytes)
    return b64_bytes.decode('utf-8')

def public_key_to_string(key):
    """Serialize a public key to Base64 text.

    The key is exported as SubjectPublicKeyInfo DER and the resulting bytes
    are Base64-encoded so they can be shown in a text field.

    Args:
        key: Public key object exposing ``public_bytes``.

    Returns:
        str: Base64 text of the DER-encoded key.
    """
    der_bytes = key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    b64_bytes = base64.b64encode(der_bytes)
    return b64_bytes.decode('utf-8')


def string_to_private_key(key_string):
    """Rebuild a private key from the text produced by ``private_key_to_string``.

    Args:
        key_string: Base64 text of an unencrypted DER private key.

    Returns:
        The private key object loaded from the decoded DER bytes.
    """
    # Base64 text -> raw DER bytes -> key object (no password: the key is stored unencrypted)
    der_bytes = base64.b64decode(key_string)
    loaded_key = serialization.load_der_private_key(der_bytes, password=None)
    return loaded_key


def string_to_public_key(key_string):
    """Rebuild a public key from the text produced by ``public_key_to_string``.

    Args:
        key_string: Base64 text of a DER public key.

    Returns:
        The public key object loaded from the decoded DER bytes.
    """
    # Base64 text -> raw DER bytes -> key object
    der_bytes = base64.b64decode(key_string)
    loaded_key = serialization.load_der_public_key(der_bytes)
    return loaded_key

In [None]:
# Steganography
import subprocess

# Passphrase passed to steghide (via -p) for both embedding and extraction.
# NOTE(review): hardcoded credential — consider reading it from the environment or prompting for it.
steganography_password = "password"

# Embedding the encrypted message into an image
def embed_data(cover_file, data_file, output_file):
    """Hide ``data_file`` inside ``cover_file`` using the ``steghide`` CLI.

    Args:
        cover_file: Path of the cover file that will carry the payload.
        data_file: Path of the file to embed.
        output_file: Path where the resulting stego file is written.

    Returns:
        bool: True if steghide completed successfully, False if it exited
        with an error or could not be started. Failures are also printed,
        as before, so existing callers that ignore the result keep working.
    """
    command = [
        "steghide", "embed",
        "-cf", cover_file,
        "-ef", data_file,
        "-sf", output_file,
        "-p", steganography_password
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error during embedding: {e}")
        return False
    except OSError as e:
        # Raised when the steghide executable is missing or cannot be run.
        print(f"Error during embedding: {e}")
        return False
    print(f"Data embedded successfully into {output_file}")
    return True


def extract_data(stego_file, output_file):
    """Extract the payload hidden in ``stego_file`` using the ``steghide`` CLI.

    Args:
        stego_file: Path of the file that carries the embedded payload.
        output_file: Path where the extracted payload is written.

    Returns:
        bool: True if steghide completed successfully, False if it exited
        with an error or could not be started. Failures are also printed,
        as before, so existing callers that ignore the result keep working.
    """
    command = [
        "steghide", "extract",
        "-sf", stego_file,
        "-xf", output_file,
        "-p", steganography_password
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error during extraction: {e}")
        return False
    except OSError as e:
        # Raised when the steghide executable is missing or cannot be run.
        print(f"Error during extraction: {e}")
        return False
    print(f"Data extracted successfully to {output_file}")
    return True

In [None]:
# Enums for encryption types
from enum import Enum

# The encryption schemes offered by the notebook, numbered in declaration order.
EncryptionType = Enum(
    "EncryptionType",
    {"RSA": 1, "ECC": 2, "X25519": 3},
)

In [None]:
import gradio as gr
import ipdb
import os

# Depends on the user's choice on the encryption logic
def update_encryption_logic(selected_type):
    """Translate a dropdown label into the matching ``EncryptionType`` member.

    Args:
        selected_type: One of ``"RSA"``, ``"ECC"`` or ``"X25519"``.

    Returns:
        The corresponding ``EncryptionType`` member, or ``None`` when the
        label is not one of the three known names.
    """
    for label in ("RSA", "ECC", "X25519"):
        if selected_type == label:
            return EncryptionType[label]
    return None

# Clears all the fields passed
def clear_fields():
    """Build the updates that blank every input/output field of the UI.

    The dropdown change handler lists fourteen components to clear after the
    four visibility targets; the previous version returned sixteen updates,
    which made Gradio reject the handler's result (20 values for 18 outputs).
    The values are returned in the same order as those fourteen components:
    image and file components are reset to ``None``, textboxes to ``""``.

    Returns:
        tuple: Fourteen ``gr.update`` objects.
    """
    cleared_values = (
        None, "", "", "",   # encrypt image, message, sender private key, recipient public key
        None, "", "", "",   # decrypt image, recipient private key, sender public key, auth tag
        None, "",           # encrypted file download, auth key
        "", "", "", "",     # decrypted message, private key, public key, encrypted message display
    )
    return tuple(gr.update(value=value) for value in cleared_values)

# Encrypting using RSA
def encrypt_image(image, message, recipient_public_key):
    """Encrypt ``message`` with RSA and hide it inside ``image``.

    The ciphertext is Base64-encoded, written to ``message.txt`` and embedded
    into a copy of the cover image saved as ``encrypted-image.jpg``.

    Args:
        image: Path of the cover image.
        message: Text to encrypt.
        recipient_public_key: Base64 text of the recipient's public key.

    Returns:
        tuple: Always three values, matching the three output components
        wired to the Encrypt button: ``(image_output, text, auth_tag)``.
        On success ``text`` is the Base64 ciphertext; on failure
        ``image_output`` is ``None`` and ``text`` is an error message.
        ``auth_tag`` is always ``""`` because RSA produces no tag.
    """
    if not image:
        return None, "Please upload an image.", ""
    if not message or not message.strip():
        return None, "Message cannot be empty.", ""
    if not recipient_public_key or not recipient_public_key.strip():
        return None, "Recipient's public key cannot be empty.", ""

    try:
        encrypted_message = encrypt_rsa_message(message.encode(), string_to_public_key(recipient_public_key))
    except Exception as e:
        # Malformed keys and messages too long for RSA-OAEP end up here.
        return None, f"Encryption error: {e}", ""

    encrypted_message_b64 = base64.b64encode(encrypted_message).decode('utf-8')
    with open("message.txt", "w") as file:
        file.write(encrypted_message_b64)

    if os.path.exists("encrypted-image.jpg"):
        os.remove("encrypted-image.jpg")

    embed_data(image, "message.txt", "encrypted-image.jpg")
    if not os.path.exists("encrypted-image.jpg"):
        # embed_data only prints its errors, so detect failure by checking for the output file.
        return None, "Failed to embed the message into the image.", ""

    secret_image = gr.Image(value="encrypted-image.jpg", label="Image with message")
    return secret_image, encrypted_message_b64, ""  # Empty third output: RSA has no auth tag

# Decryption using RSA
def decrypt_image(image, recipient_private_key):
    """Extract the hidden RSA ciphertext from ``image`` and decrypt it.

    Args:
        image: Path of the stego image.
        recipient_private_key: Base64 text of the recipient's private key.

    Returns:
        str: The decrypted text, or a human-readable error message when the
        image is missing or reading/decoding/decryption fails.
    """
    extracted_path = "output.txt"

    if not image:
        return "Please upload an encrypted image."

    # Remove any payload left over from a previous run before extracting.
    if os.path.exists(extracted_path):
        os.remove(extracted_path)
    extract_data(image, extracted_path)

    try:
        with open(extracted_path, "r") as payload_file:
            payload_b64 = payload_file.read().strip()

        ciphertext = base64.b64decode(payload_b64)
        rsa_private_key = string_to_private_key(recipient_private_key)
        plaintext = decrypt_rsa_message(ciphertext, rsa_private_key)
        return plaintext.decode('utf-8')
    except Exception as e:
        return f"Decryption error: {str(e)}"


#Encryption using ECC
def encrypt_image_ecc(image, message, recipient_public_key, sender_private_key):
    """Encrypt ``message`` with the ECC scheme and hide it inside ``image``.

    The ciphertext is Base64-encoded, written to ``message.txt`` and embedded
    into a copy of the cover image saved as ``encrypted-image.jpg``.

    Args:
        image: Path of the cover image.
        message: Text to encrypt.
        recipient_public_key: Base64 text of the recipient's public key.
        sender_private_key: Base64 text of the sender's private key.

    Returns:
        tuple: Always three values, matching the three output components
        wired to the Encrypt button: ``(image_output, text, auth_tag)``.
        On success ``text`` is the Base64 ciphertext and ``auth_tag`` the
        Base64 GCM tag; on failure ``image_output`` is ``None``, ``text`` is
        an error message and ``auth_tag`` is ``""``.
    """
    if not image:
        return None, "Please upload an image.", ""
    if not message or not message.strip():
        return None, "Message cannot be empty.", ""
    if not recipient_public_key or not recipient_public_key.strip():
        return None, "Recipient's public key cannot be empty.", ""
    if not sender_private_key or not sender_private_key.strip():
        return None, "Sender's private key is needed for ecc encryption", ""

    try:
        private_key = string_to_private_key(sender_private_key)
        public_key = string_to_public_key(recipient_public_key)
        encrypted_message, auth_tag = encrypt_ecc_message(message.encode(), private_key, public_key)
    except Exception as e:
        # Malformed or mismatched keys end up here.
        return None, f"Encryption error: {e}", ""

    encrypted_message_b64 = base64.b64encode(encrypted_message).decode('utf-8')
    with open("message.txt", "w") as file:
        file.write(encrypted_message_b64)

    if os.path.exists("encrypted-image.jpg"):
        os.remove("encrypted-image.jpg")

    embed_data(image, "message.txt", "encrypted-image.jpg")
    if not os.path.exists("encrypted-image.jpg"):
        # embed_data only prints its errors, so detect failure by checking for the output file.
        return None, "Failed to embed the message into the image.", ""

    secret_image = gr.Image(value="encrypted-image.jpg", label="Image with message")
    return secret_image, encrypted_message_b64, base64.b64encode(auth_tag).decode('utf-8')

# Decryption using ECC
def decrypt_image_ecc(image, recipient_private_key, sender_public_key, auth_tag):
    """Extract the hidden ECC ciphertext from ``image`` and decrypt it.

    Args:
        image: Path of the stego image.
        recipient_private_key: Base64 text of the recipient's private key.
        sender_public_key: Base64 text of the sender's public key.
        auth_tag: Base64 text of the GCM authentication tag.

    Returns:
        str: The decrypted text, or a human-readable error message. A single
        string is returned on every path because the Decrypt button is wired
        to exactly one output component (the validation branches previously
        returned two values).
    """
    if not image:
        return "Please upload an encrypted image."
    if not recipient_private_key:
        return "Recipient private key is required"
    if not sender_public_key:
        return "Sender public key is required"
    if not auth_tag:
        return "Auth tag is required"

    if os.path.exists("output.txt"):
        os.remove("output.txt")

    extract_data(image, "output.txt")
    try:
        # Read the encrypted message from the output file
        with open("output.txt", "r") as file:
            encrypted_message_b64 = file.read().strip()

        encrypted_message = base64.b64decode(encrypted_message_b64) # Base64 to bytes

        private_key = string_to_private_key(recipient_private_key)

        public_key = string_to_public_key(sender_public_key)

        decrypted_message = decrypt_ecc_message(encrypted_message, base64.b64decode(auth_tag), public_key, private_key)

        return decrypted_message.decode('utf-8')  # Convert bytes to string
    except Exception as e:
        return f"Decryption error: {str(e)}"


# Encryption using x25519
def encrypt_image_x25519(image, message, recipient_public_key, sender_private_key):
    """Encrypt ``message`` with the X25519 scheme and hide it inside ``image``.

    The ciphertext is Base64-encoded, written to ``message.txt`` and embedded
    into a copy of the cover image saved as ``encrypted-image.jpg``.

    Args:
        image: Path of the cover image.
        message: Text to encrypt.
        recipient_public_key: Base64 text of the recipient's public key.
        sender_private_key: Base64 text of the sender's private key.

    Returns:
        tuple: Always three values, matching the three output components
        wired to the Encrypt button: ``(image_output, text, auth_tag)``.
        On success ``text`` is the Base64 ciphertext and ``auth_tag`` the
        Base64 GCM tag; on failure ``image_output`` is ``None``, ``text`` is
        an error message and ``auth_tag`` is ``""``.
    """
    if not image:
        return None, "Please upload an image.", ""
    if not message or not message.strip():
        return None, "Message cannot be empty.", ""
    if not recipient_public_key or not recipient_public_key.strip():
        return None, "Recipient's public key cannot be empty.", ""
    if not sender_private_key or not sender_private_key.strip():
        # The original message said "ecc" here, copied from the ECC variant.
        return None, "Sender's private key is needed for x25519 encryption", ""

    try:
        private_key = string_to_private_key(sender_private_key)
        public_key = string_to_public_key(recipient_public_key)
        encrypted_message, auth_tag = encrypt_x25519_message(message.encode(), private_key, public_key)
    except Exception as e:
        # Malformed or mismatched keys end up here.
        return None, f"Encryption error: {e}", ""

    encrypted_message_b64 = base64.b64encode(encrypted_message).decode('utf-8')
    with open("message.txt", "w") as file:
        file.write(encrypted_message_b64)

    if os.path.exists("encrypted-image.jpg"):
        os.remove("encrypted-image.jpg")

    embed_data(image, "message.txt", "encrypted-image.jpg")
    if not os.path.exists("encrypted-image.jpg"):
        # embed_data only prints its errors, so detect failure by checking for the output file.
        return None, "Failed to embed the message into the image.", ""

    secret_image = gr.Image(value="encrypted-image.jpg", label="Image with message")
    return secret_image, encrypted_message_b64, base64.b64encode(auth_tag).decode('utf-8')

# Decryption using x25519
def decrypt_image_x25519(image, recipient_private_key, sender_public_key, auth_tag):
    """Extract the hidden X25519 ciphertext from ``image`` and decrypt it.

    Args:
        image: Path of the stego image.
        recipient_private_key: Base64 text of the recipient's private key.
        sender_public_key: Base64 text of the sender's public key.
        auth_tag: Base64 text of the GCM authentication tag.

    Returns:
        str: The decrypted text, or a human-readable error message. A single
        string is returned on every path because the Decrypt button is wired
        to exactly one output component (the validation branches previously
        returned two values).
    """
    if not image:
        return "Please upload an encrypted image."
    if not recipient_private_key:
        return "Recipient private key is required"
    if not sender_public_key:
        return "Sender public key is required"
    if not auth_tag:
        return "Auth tag is required"

    if os.path.exists("output.txt"):
        os.remove("output.txt")

    extract_data(image, "output.txt")
    try:
        # Read the encrypted message from the output file
        with open("output.txt", "r") as file:
            encrypted_message_b64 = file.read().strip()

        encrypted_message = base64.b64decode(encrypted_message_b64) #Base64 to bytes

        private_key = string_to_private_key(recipient_private_key)

        public_key = string_to_public_key(sender_public_key)

        decrypted_message = decrypt_x25519_message(encrypted_message, base64.b64decode(auth_tag), private_key, public_key)

        return decrypted_message.decode('utf-8')  # Convert bytes to string
    except Exception as e:
        return f"Decryption error: {str(e)}"


def generate_keys(private_key_textbox, public_key_textbox):
    """Generate an RSA key pair and return both keys as Base64 text.

    Args:
        private_key_textbox: Accepted but not used.
        public_key_textbox: Accepted but not used.

    Returns:
        tuple: ``(private_key_string, public_key_string)``.
    """
    rsa_private, rsa_public = generate_rsa_keys()
    private_text = private_key_to_string(rsa_private)
    public_text = public_key_to_string(rsa_public)
    return private_text, public_text


def generate_ecc_keys_for_user(private_key_textbox, public_key_textbox):
    """Generate an ECC key pair and return both keys as Base64 text.

    Args:
        private_key_textbox: Accepted but not used.
        public_key_textbox: Accepted but not used.

    Returns:
        tuple: ``(private_key_string, public_key_string)``.
    """
    ecc_private, ecc_public = generate_ecc_keys()
    private_text = private_key_to_string(ecc_private)
    public_text = public_key_to_string(ecc_public)
    return private_text, public_text


def generate_x25519_keys_for_user(private_key_textbox, public_key_textbox):
    """Generate an X25519 key pair and return both keys as Base64 text.

    Args:
        private_key_textbox: Accepted but not used.
        public_key_textbox: Accepted but not used.

    Returns:
        tuple: ``(private_key_string, public_key_string)``.
    """
    x_private, x_public = generate_x25519_keys()
    private_text = private_key_to_string(x_private)
    public_text = public_key_to_string(x_public)
    return private_text, public_text


# Test RSA encryption
def test_rsa_encryption(message, public_key_string):
    """Encrypt ``message`` with RSA without touching any image.

    Args:
        message: Text to encrypt.
        public_key_string: Base64 text of the recipient's public key.

    Returns:
        str: Base64 ciphertext, or ``"Encryption error: ..."`` if anything fails.
    """
    try:
        rsa_public = string_to_public_key(public_key_string)
        ciphertext = encrypt_rsa_message(message.encode(), rsa_public)
        ciphertext_b64 = base64.b64encode(ciphertext)
        return ciphertext_b64.decode('utf-8')
    except Exception as e:
        return f"Encryption error: {e}"

# Test ECC encryption
def test_ecc_encryption(message, recipient_public_key, sender_private_key):
    """Encrypt ``message`` with the ECC scheme without touching any image.

    Args:
        message: Text to encrypt.
        recipient_public_key: Base64 text of the recipient's public key.
        sender_private_key: Base64 text of the sender's private key.

    Returns:
        On success a tuple ``(ciphertext_b64, auth_tag_b64)``; on failure a
        single ``"Encryption error: ..."`` string.
    """
    try:
        ecc_private = string_to_private_key(sender_private_key)
        ecc_public = string_to_public_key(recipient_public_key)
        ciphertext, tag = encrypt_ecc_message(message.encode(), ecc_private, ecc_public)
        ciphertext_b64 = base64.b64encode(ciphertext).decode('utf-8')
        tag_b64 = base64.b64encode(tag).decode('utf-8')
        return ciphertext_b64, tag_b64
    except Exception as e:
        return f"Encryption error: {e}"

# Test x25519 encryption
def test_x25519_encryption(message, recipient_public_key, sender_private_key):
    """Encrypt ``message`` with the X25519 scheme without touching any image.

    Args:
        message: Text to encrypt.
        recipient_public_key: Base64 text of the recipient's public key.
        sender_private_key: Base64 text of the sender's private key.

    Returns:
        On success a tuple ``(ciphertext_b64, auth_tag_b64)``; on failure a
        single ``"Encryption error: ..."`` string.
    """
    try:
        x_private = string_to_private_key(sender_private_key)
        x_public = string_to_public_key(recipient_public_key)
        ciphertext, tag = encrypt_x25519_message(message.encode(), x_private, x_public)
        ciphertext_b64 = base64.b64encode(ciphertext).decode('utf-8')
        tag_b64 = base64.b64encode(tag).decode('utf-8')
        return ciphertext_b64, tag_b64
    except Exception as e:
        return f"Encryption error: {e}"


# Generate encryption keys
def generate_keys_logic(choice, private_key_textbox=None, public_key_textbox=None):
    """Generate a key pair for the scheme named by ``choice``.

    Args:
        choice: ``"RSA"``, ``"ECC"`` or ``"X25519"``.
        private_key_textbox: Forwarded to the scheme-specific generator.
        public_key_textbox: Forwarded to the scheme-specific generator.

    Returns:
        tuple: ``(private_key, public_key)`` as text; two empty strings when
        ``choice`` is not a known scheme.
    """
    if choice == "RSA":
        key_pair = generate_keys(private_key_textbox, public_key_textbox)
    elif choice == "ECC":
        key_pair = generate_ecc_keys_for_user(private_key_textbox, public_key_textbox)
    elif choice == "X25519":
        key_pair = generate_x25519_keys_for_user(private_key_textbox, public_key_textbox)
    else:
        # Unknown option: hand back blanks rather than failing.
        key_pair = ("", "")

    private_text, public_text = key_pair
    return private_text, public_text

# Gradio Interface
# Gradio UI: one column for encrypting/embedding, one for extracting/decrypting
# and key generation. A dropdown selects the scheme (RSA, ECC or X25519).
with gr.Blocks() as demo:
    gr.Markdown("## Image Steganography: Encrypt and Decrypt Images")

    # Dropdown for selecting encryption logic
    dropdown = gr.Dropdown(
        label="Choose an encryption logic",
        choices=["RSA", "ECC", "X25519"],
        value="RSA",  # Default value
    )

    with gr.Row():
        with gr.Column():
            gr.Markdown("### Encrypt Image")
            encrypt_image_input = gr.Image(type="filepath", label="Upload Image")
            encrypt_message_input = gr.Textbox(label="Message to Encrypt")
            sender_private_key_input = gr.Textbox(label="Your Private Key", visible=False)  # Initially hidden
            recipient_public_key_input = gr.Textbox(label="Recipient's Public Key")
            encrypt_button = gr.Button("Encrypt")
            encrypted_image_output = gr.File(label="Download Encrypted Image")
            auth_key_output = gr.Textbox(label="Auth Key:", visible=False)  # Initially hidden
            encrypted_message_display = gr.Textbox(label="Encrypted Message (for testing)", interactive=False)

        with gr.Column():
            gr.Markdown("### Decrypt Image")
            decrypt_image_input = gr.Image(type="filepath", label="Upload Encrypted Image")
            recipient_private_key_input = gr.Textbox(label="Recipient's Private Key")
            sender_public_key_input = gr.Textbox(label="Sender's Public Key", visible=False)  # Initially hidden
            auth_tag_input = gr.Textbox(label="Auth Tag", visible=False)  # Initially hidden
            decrypt_button = gr.Button("Decrypt")
            decrypted_message_output = gr.Textbox(label="Decrypted Message")

            gr.Markdown("### Encryption Keys")
            private_key_textbox = gr.Textbox(label="Private Key")
            public_key_textbox = gr.Textbox(label="Public Key")
            generate_key_button = gr.Button("Generate")

    # Logic to handle dropdown selection
    def update_ui(choice):
        """Return visibility updates for the four ECC/X25519-only fields.

        Any choice other than "ECC" or "X25519" (including a cleared
        dropdown) hides the fields, so a tuple of four updates is always
        returned.
        """
        show_extra = choice in ["ECC", "X25519"]
        return tuple(gr.update(visible=show_extra) for _ in range(4))

    def reset_for_choice(choice):
        """Clear every field and toggle the ECC/X25519-only ones.

        Returns exactly one update per component listed in the dropdown
        handler's ``outputs`` below, in the same order. Visibility and value
        changes for a component are merged into a single update, so no
        component appears twice and the counts always match.
        """
        show_extra = choice in ["ECC", "X25519"]
        return (
            gr.update(value=None),                       # encrypt_image_input
            gr.update(value=""),                         # encrypt_message_input
            gr.update(value="", visible=show_extra),     # sender_private_key_input
            gr.update(value=""),                         # recipient_public_key_input
            gr.update(value=None),                       # decrypt_image_input
            gr.update(value=""),                         # recipient_private_key_input
            gr.update(value="", visible=show_extra),     # sender_public_key_input
            gr.update(value="", visible=show_extra),     # auth_tag_input
            gr.update(value=None),                       # encrypted_image_output
            gr.update(value="", visible=show_extra),     # auth_key_output
            gr.update(value=""),                         # decrypted_message_output
            gr.update(value=""),                         # private_key_textbox
            gr.update(value=""),                         # public_key_textbox
            gr.update(value=""),                         # encrypted_message_display
        )

    # Dropdown change event: update UI and clear fields
    dropdown.change(
        reset_for_choice,
        inputs=[dropdown],
        outputs=[
            encrypt_image_input, encrypt_message_input, sender_private_key_input, recipient_public_key_input,
            decrypt_image_input, recipient_private_key_input, sender_public_key_input, auth_tag_input,
            encrypted_image_output, auth_key_output,
            decrypted_message_output, private_key_textbox, public_key_textbox, encrypted_message_display
        ]
    )

    def run_encrypt(choice, image, message, recipient_key, sender_key=None):
        """Dispatch the Encrypt button to the handler for the selected scheme."""
        if choice == "RSA":
            return encrypt_image(image, message, recipient_key)
        if choice == "ECC":
            return encrypt_image_ecc(image, message, recipient_key, sender_key)
        return encrypt_image_x25519(image, message, recipient_key, sender_key)

    def run_decrypt(choice, image, recipient_key, sender_key=None, auth_tag=None):
        """Dispatch the Decrypt button to the handler for the selected scheme."""
        if choice == "RSA":
            return decrypt_image(image, recipient_key)
        if choice == "ECC":
            return decrypt_image_ecc(image, recipient_key, sender_key, auth_tag)
        return decrypt_image_x25519(image, recipient_key, sender_key, auth_tag)

    # Button actions
    # The outputs list is fixed when the UI is built, so it cannot depend on
    # the dropdown's runtime value; every scheme fills the same three outputs
    # (RSA leaves the auth key empty).
    encrypt_button.click(
        run_encrypt,
        inputs=[dropdown, encrypt_image_input, encrypt_message_input, recipient_public_key_input, sender_private_key_input],
        outputs=[encrypted_image_output, encrypted_message_display, auth_key_output],
    )

    decrypt_button.click(
        run_decrypt,
        inputs=[dropdown, decrypt_image_input, recipient_private_key_input, sender_public_key_input, auth_tag_input],
        outputs=[decrypted_message_output],
    )

    generate_key_button.click(
        generate_keys_logic,
        inputs=[dropdown],
        outputs=[private_key_textbox, public_key_textbox],
    )

demo.launch(debug=True)


Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://7cd00faf3679df90b1.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Data embedded successfully into encrypted-image.jpg
Data extracted successfully to output.txt


    Output components:
        [textbox, textbox, textbox, textbox, image, textbox, textbox, textbox, image, textbox, textbox, textbox, file, textbox, textbox, textbox, textbox, textbox]
    Output values returned:
        [{'__type__': 'update', 'visible': True}, {'__type__': 'update', 'visible': True}, {'__type__': 'update', 'visible': True}, {'__type__': 'update', 'visible': True}, {'value': None, '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': None, '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': None, '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}, {'value': '', '__type__': 'update'}]
    Output com

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://7cd00faf3679df90b1.gradio.live


