In [None]:
#IBlink old algo

import os
import time
import hashlib
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, x448
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazma ddt.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend

class QuantumEclipse:
    """Base class for quantum-resistant encryption system."""
    def __init__(self, security_level=1):
        self.security_level = security_level
        self.keys = self._generate_keys()
        self.public_keys = [info["public_key"] for info in self.keys]

    def _generate_keys(self):
        """Generate key pairs based on security level."""
        keys = []
        if self.security_level >= 1:
            keys.append({
                "type": "EC",
                "curve": ec.SECP384R1(),
                "private_key": ec.generate_private_key(ec.SECP384R1(), default_backend()),
            })
        if self.security_level >= 2:
            keys.append({
                "type": "EC",
                "curve": ec.SECP521R1(),
                "private_key": ec.generate_private_key(ec.SECP521R1(), default_backend()),
            })
        if self.security_level >= 3:
            keys.append({
                "type": "X448",
                "curve": None,
                "private_key": x448.X448PrivateKey.generate(),
            })
        for key_info in keys:
            private_key = key_info["private_key"]
            key_info["public_key"] = private_key.public_key()
        return keys

class QuantumEclipseV2(QuantumEclipse):
    """Enhanced version with hybrid key exchange and AES-GCM."""
    def _hybrid_key_exchange(self, peer_public_keys):
        """Enhanced hybrid key exchange with type checking.

        Args:
            peer_public_keys (list): Peer public keys.

        Returns:
            bytes: Mixed shared secret.
        """
        shared_secrets = []
        for key_info in self.keys:
            local_key = key_info["private_key"]
            key_type = key_info["type"]
            for peer_key in peer_public_keys:
                try:
                    if key_type == "EC" and isinstance(peer_key, ec.EllipticCurvePublicKey):
                        if local_key.curve.name == peer_key.curve.name:
                            shared_secret = local_key.exchange(ec.ECDH(), peer_key)
                            shared_secrets.append(shared_secret)
                    elif key_type == "X448" and isinstance(peer_key, x448.X448PublicKey):
                        shared_secret = local_key.exchange(peer_key)
                        shared_secrets.append(shared_secret)
                except Exception as e:
                    print(f"Key exchange error: {e}")
                    continue
        if not shared_secrets:
            raise ValueError("No compatible key pairs found")
        print(f"Number of shared secrets generated: {len(shared_secrets)}")
        return self._secret_mixer(shared_secrets)

    def _secret_mixer(self, secrets):
        """Mix shared secrets deterministically.

        Args:
            secrets (list): List of shared secrets.

        Returns:
            bytes: 32-byte mixed secret for AES-256.
        """
        mixed_secret = hashlib.shake_256()
        for secret in secrets:
            mixed_secret.update(secret)
        return mixed_secret.digest(32)

    def encrypt(self, peer_public_keys, plaintext, associated_data=None):
        """Encrypt data using hybrid key exchange and AES-GCM.

        Args:
            peer_public_keys (list): Peer public keys.
            plaintext (bytes): Data to encrypt.
            associated_data (bytes, optional): Data for authentication.

        Returns:
            tuple: (ciphertext, tag, metadata)
        """
        nonce = os.urandom(12)
        encryption_key = self._hybrid_key_exchange(peer_public_keys)
        print(f"Encryption key hash: {hashlib.sha256(encryption_key).hexdigest()}")
        cipher = Cipher(
            algorithms.AES(encryption_key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()
        if associated_data:
            cipher.authenticate_additional_data(associated_data)
        ciphertext = cipher.update(plaintext) + cipher.finalize()
        metadata = {
            'keys_used': [info["curve"].name if info["type"] == "EC" else "X448" for info in self.keys],
            'security_level': self.security_level,
            'nonce': nonce,
            'timestamp': time.time()
        }
        return ciphertext, cipher.tag, metadata

    def decrypt(self, peer_public_keys, ciphertext, tag, metadata=None):
        """Decrypt data using hybrid key exchange and AES-GCM.

        Args:
            peer_public_keys (list): Peer public keys.
            ciphertext (bytes): Encrypted data.
            tag (bytes): Authentication tag.
            metadata (dict, optional): Encryption metadata.

        Returns:
            bytes: Decrypted plaintext.
        """
        if not metadata or 'nonce' not in metadata:
            raise ValueError("Missing nonce in metadata")
        encryption_key = self._hybrid_key_exchange(peer_public_keys)
        print(f"Decryption key hash: {hashlib.sha256(encryption_key).hexdigest()}")
        cipher = Cipher(
            algorithms.AES(encryption_key),
            modes.GCM(metadata['nonce'], tag),
            backend=default_backend()
        ).decryptor()
        try:
            plaintext = cipher.update(ciphertext) + cipher.finalize()
            return plaintext
        except Exception as e:
            print(f"Decryption failed with error: {e}")
            raise

def fog_computing_demo():
    """Demonstrate QuantumEclipseV2 in a fog computing scenario."""
    print("\nInitializing QuantumEclipseV2 Encryption System...")
    edge_node = QuantumEclipseV2(security_level=3)
    fog_node1 = QuantumEclipseV2(security_level=3)
    fog_node2 = QuantumEclipseV2(security_level=3)  # Unused
    sensor_data = b"Critical sensor readings: Temperature=25.6C, Pressure=1013hPa, Humidity=65%"
    print(f"\nOriginal Data: {sensor_data.decode()}")

    print("\nPerforming encryption with hybrid scheme...")
    start_time = time.time()
    try:
        print("Starting encryption...")
        encrypted_data, tag, metadata = edge_node.encrypt(fog_node1.public_keys, sensor_data)
        encryption_time = time.time() - start_time
        print(f"Encryption completed in {encryption_time*1000:.2f}ms")
        print(f"Encrypted Size: {len(encrypted_data)} bytes")
        print(f"Compression Ratio: {len(sensor_data)/len(encrypted_data):.2f}x")
        print(f"Keys used: {metadata['keys_used']}")

        print("\nPerforming decryption at fog node...")
        start_time = time.time()
        print("Starting decryption...")
        decrypted_data = fog_node1.decrypt(edge_node.public_keys, encrypted_data, tag, metadata)
        decryption_time = time.time() - start_time
        print(f"Decryption completed in {decryption_time*1000:.2f}ms")
        print(f"\nDecrypted Data at Fog Node 1: {decrypted_data.decode()}")

        print(f"\nPerformance Metrics:")
        print(f"Average Round-Trip Time: {(encryption_time + decryption_time)*1000:.2f}ms")
        print(f"Throughput: {len(sensor_data)/(encryption_time + decryption_time)/1024:.2f} KB/s")
    except Exception as e:
        print(f"Error during encryption/decryption: {e}")

if __name__ == "__main__":
    fog_computing_demo()


Initializing QuantumEclipseV2 Encryption System...

Original Data: Critical sensor readings: Temperature=25.6C, Pressure=1013hPa, Humidity=65%

Performing encryption with hybrid scheme...
Starting encryption...
Number of shared secrets generated: 3
Encryption key hash: 60e9f65a55c2b4f3e1cd508d3c647e978571d21c5951c21632eb55f8ec469992
Encryption completed in 2.23ms
Encrypted Size: 75 bytes
Compression Ratio: 1.00x
Keys used: ['secp384r1', 'secp521r1', 'X448']

Performing decryption at fog node...
Starting decryption...
Number of shared secrets generated: 3
Decryption key hash: 60e9f65a55c2b4f3e1cd508d3c647e978571d21c5951c21632eb55f8ec469992
Decryption completed in 1.57ms

Decrypted Data at Fog Node 1: Critical sensor readings: Temperature=25.6C, Pressure=1013hPa, Humidity=65%

Performance Metrics:
Average Round-Trip Time: 3.80ms
Throughput: 19.27 KB/s


In [3]:
#Iblink Main

import os
import time
import hashlib
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, x448
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

class IBlinkBase:
    """Base class for quantum-resistant encryption system."""
    def __init__(self, security_level=1):
        self.security_level = security_level
        self.keys = self._generate_keys()
        self.public_keys = [info["public_key"] for info in self.keys]

    def _generate_keys(self):
        """Generate key pairs based on security level."""
        keys = []
        if self.security_level >= 1:
            keys.append({
                "type": "EC",
                "curve": ec.SECP384R1(),
                "private_key": ec.generate_private_key(ec.SECP384R1(), default_backend()),
            })
        if self.security_level >= 2:
            keys.append({
                "type": "EC",
                "curve": ec.SECP521R1(),
                "private_key": ec.generate_private_key(ec.SECP521R1(), default_backend()),
            })
        if self.security_level >= 3:
            keys.append({
                "type": "X448",
                "curve": None,
                "private_key": x448.X448PrivateKey.generate(),
            })
        for key_info in keys:
            private_key = key_info["private_key"]
            key_info["public_key"] = private_key.public_key()
        return keys

class IBlink(IBlinkBase):
    """Enhanced version with hybrid key exchange, AES-GCM, and periodic key rotation."""
    def __init__(self, security_level=3, key_rotation_interval=3600):  # Default: 1 hour
        """Initialize with key rotation interval.

        Args:
            security_level (int): Security level (1-3).
            key_rotation_interval (int): Time in seconds between key rotations.
        """
        super().__init__(security_level)
        self.key_rotation_interval = key_rotation_interval
        self.last_key_rotation_time = time.time()

    def _hybrid_key_exchange(self, peer_public_keys):
        """Enhanced hybrid key exchange with periodic key rotation.

        Args:
            peer_public_keys (list): Peer public keys.

        Returns:
            bytes: Mixed shared secret.
        """
        current_time = time.time()
        # Check if it's time to rotate keys
        if current_time - self.last_key_rotation_time > self.key_rotation_interval:
            print(f"Rotating keys after {self.key_rotation_interval} seconds...")
            self.keys = self._generate_keys()
            self.public_keys = [info["public_key"] for info in self.keys]
            self.last_key_rotation_time = current_time
            print("Key rotation completed")

        shared_secrets = []
        for key_info in self.keys:
            local_key = key_info["private_key"]
            key_type = key_info["type"]
            for peer_key in peer_public_keys:
                try:
                    if key_type == "EC" and isinstance(peer_key, ec.EllipticCurvePublicKey):
                        if local_key.curve.name == peer_key.curve.name:
                            shared_secret = local_key.exchange(ec.ECDH(), peer_key)
                            shared_secrets.append(shared_secret)
                    elif key_type == "X448" and isinstance(peer_key, x448.X448PublicKey):
                        shared_secret = local_key.exchange(peer_key)
                        shared_secrets.append(shared_secret)
                except Exception as e:
                    print(f"Key exchange error: {e}")
                    continue
        if not shared_secrets:
            raise ValueError("No compatible key pairs found")
        print(f"Number of shared secrets generated: {len(shared_secrets)}")
        return self._secret_mixer(shared_secrets)

    def _secret_mixer(self, secrets):
        """Mix shared secrets deterministically.

        Args:
            secrets (list): List of shared secrets.

        Returns:
            bytes: 32-byte mixed secret for AES-256.
        """
        mixed_secret = hashlib.shake_256()
        for secret in secrets:
            mixed_secret.update(secret)
        return mixed_secret.digest(32)

    def encrypt(self, peer_public_keys, plaintext, associated_data=None):
        """Encrypt data using hybrid key exchange and AES-GCM.

        Args:
            peer_public_keys (list): Peer public keys.
            plaintext (bytes): Data to encrypt.
            associated_data (bytes, optional): Data for authentication.

        Returns:
            tuple: (ciphertext, tag, metadata)
        """
        nonce = os.urandom(12)
        encryption_key = self._hybrid_key_exchange(peer_public_keys)
        print(f"Encryption key hash: {hashlib.sha256(encryption_key).hexdigest()}")
        cipher = Cipher(
            algorithms.AES(encryption_key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()
        if associated_data:
            cipher.authenticate_additional_data(associated_data)
        ciphertext = cipher.update(plaintext) + cipher.finalize()
        metadata = {
            'keys_used': [info["curve"].name if info["type"] == "EC" else "X448" for info in self.keys],
            'security_level': self.security_level,
            'nonce': nonce,
            'timestamp': time.time()
        }
        return ciphertext, cipher.tag, metadata

    def decrypt(self, peer_public_keys, ciphertext, tag, metadata=None):
        """Decrypt data using hybrid key exchange and AES-GCM.

        Args:
            peer_public_keys (list): Peer public keys.
            ciphertext (bytes): Encrypted data.
            tag (bytes): Authentication tag.
            metadata (dict, optional): Encryption metadata.

        Returns:
            bytes: Decrypted plaintext.
        """
        if not metadata or 'nonce' not in metadata:
            raise ValueError("Missing nonce in metadata")
        encryption_key = self._hybrid_key_exchange(peer_public_keys)
        print(f"Decryption key hash: {hashlib.sha256(encryption_key).hexdigest()}")
        cipher = Cipher(
            algorithms.AES(encryption_key),
            modes.GCM(metadata['nonce'], tag),
            backend=default_backend()
        ).decryptor()
        try:
            plaintext = cipher.update(ciphertext) + cipher.finalize()
            return plaintext
        except Exception as e:
            print(f"Decryption failed with error: {e}")
            raise

def fog_computing_demo():
    print("\nInitializing IBlink...")
    # Use a short interval (5 seconds) for demo purposes
    edge_node = IBlink(security_level=3, key_rotation_interval=5)
    fog_node1 = IBlink(security_level=3, key_rotation_interval=5)
    fog_node2 = IBlink(security_level=3, key_rotation_interval=5)  # Unused
    sensor_data = b"Critical sensor readings: Temperature=25.6C, Humidity=65%"
    print(f"\nOriginal Data: {sensor_data.decode()}")

    # Initial encryption/decryption with original keys
    print("\nPerforming initial encryption with original keys...")
    encrypted_data_initial, tag_initial, metadata_initial = edge_node.encrypt(fog_node1.public_keys, sensor_data)
    decrypted_data_initial = fog_node1.decrypt(edge_node.public_keys, encrypted_data_initial, tag_initial, metadata_initial)
    print(f"Initial Decrypted Data: {decrypted_data_initial.decode()}")

    # Wait to trigger key rotation
    print("\nWaiting 3 seconds to trigger key rotation...")
    time.sleep(3)

    # Simulate key rotation and synchronize public keys
    print("\nSynchronizing keys after rotation...")
    # Force rotation on edge_node
    edge_node._hybrid_key_exchange(fog_node1.public_keys)
    # Force rotation on fog_node1 and update peer keys
    fog_node1._hybrid_key_exchange(edge_node.public_keys)
    # Exchange new public keys (simulating a handshake)
    edge_node_public_keys = edge_node.public_keys
    fog_node1_public_keys = fog_node1.public_keys

    # Encryption and decryption with rotated, synchronized keys
    print("\nPerforming encryption with rotated keys...")
    start_time = time.time()
    try:
        print("Starting encryption...")
        encrypted_data, tag, metadata = edge_node.encrypt(fog_node1_public_keys, sensor_data)
        encryption_time = time.time() - start_time
        print(f"Encryption completed in {encryption_time*1000:.2f}ms")
        print(f"Encrypted Size: {len(encrypted_data)} bytes")
        print(f"Compression Ratio: {len(sensor_data)/len(encrypted_data):.2f}x")
        print(f"Keys used: {metadata['keys_used']}")

        print("\nPerforming decryption with rotated keys...")
        start_time = time.time()
        print("Starting decryption...")
        decrypted_data = fog_node1.decrypt(edge_node_public_keys, encrypted_data, tag, metadata)
        decryption_time = time.time() - start_time
        print(f"Decryption completed in {decryption_time*1000:.2f}ms")
        print(f"\nDecrypted Data at Fog Node 1: {decrypted_data.decode()}")

        print(f"\nPerformance Metrics:")
        print(f"Average Round-Trip Time: {(encryption_time + decryption_time)*1000:.2f}ms")
        print(f"Throughput: {len(sensor_data)/(encryption_time + decryption_time)/1024:.2f} KB/s")
    except Exception as e:
        print(f"Error during encryption/decryption: {e}")

if __name__ == "__main__":
    fog_computing_demo()


Initializing IBlink...

Original Data: Critical sensor readings: Temperature=25.6C, Humidity=65%

Performing initial encryption with original keys...
Number of shared secrets generated: 3
Encryption key hash: cd7ff3b18869ca8f4f57436a8b67c729c8aff5e34288666af1c7cfa03f7b1e1e
Number of shared secrets generated: 3
Decryption key hash: cd7ff3b18869ca8f4f57436a8b67c729c8aff5e34288666af1c7cfa03f7b1e1e
Initial Decrypted Data: Critical sensor readings: Temperature=25.6C, Humidity=65%

Waiting 3 seconds to trigger key rotation...

Synchronizing keys after rotation...
Number of shared secrets generated: 3
Number of shared secrets generated: 3

Performing encryption with rotated keys...
Starting encryption...
Number of shared secrets generated: 3
Encryption key hash: cd7ff3b18869ca8f4f57436a8b67c729c8aff5e34288666af1c7cfa03f7b1e1e
Encryption completed in 2.18ms
Encrypted Size: 57 bytes
Compression Ratio: 1.00x
Keys used: ['secp384r1', 'secp521r1', 'X448']

Performing decryption with rotated keys.