---

# EchoKey: Quantum-Inspired Adaptive Encryption System

**EchoKey** is an experimental cryptographic framework that combines quantum-inspired randomness, recursive fractal regression, and acoustic modulation to encrypt and decrypt data. Its keystream parameters adapt as data is processed, and the per-byte loop is compiled with Numba for speed.

---

## 🔑 **Key Features**

- **Dynamic Key Evolution**: Keystream parameters are re-derived for every byte, so the effective key changes continuously; the design goal is resistance to both classical and quantum attacks.
- **Fractal Regression Algorithms**: Recursive, self-similar update rules intended to make recovering the key from the output computationally infeasible.
- **Acoustic Modulation**: A simulated environmental signal perturbs the parameters at each step to add unpredictability.
- **Optimized Performance**: The per-byte loop is compiled with Numba's `@njit` for fast encryption and decryption.
- **Robust Security**: Handles large data sets in fixed-size batches, with rolling synergy windows carrying state across batches.
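
The rolling synergy windows reduce to a simple statistic in the code later in this document: the mean of the window divided by its standard deviation, mapped to a bit-rotation count between 1 and 7. The following is a minimal pure-Python sketch of that mapping, assuming `statistics.pstdev` as a stand-in for NumPy's population `std()`. The names `synergy` and `rotation_for` are illustrative and not part of EchoKey.

```python
from statistics import fmean, pstdev

def synergy(window):
    """Mean divided by population standard deviation (epsilon avoids division by zero)."""
    if not window:
        return 1.0
    return fmean(window) / (pstdev(window) + 1e-10)

def rotation_for(window):
    """Clamp int(synergy * 7) into the range 1..7, as the batch loop does."""
    return max(1, min(7, int(synergy(window) * 7)))

print(rotation_for([1.0] * 20))  # constant window: very large synergy, clamps to 7
print(rotation_for([0.0] * 20))  # all zeros: synergy 0, clamps to 1
```

A window with little variation therefore pushes the rotation toward 7, while a window whose mean is small relative to its spread pushes it toward 1.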

---

## ⚙️ **Installation**

1. Clone the repository:
   ```bash
   git clone https://github.com/your-username/EchoKey.git
   cd EchoKey
   ```
2. Install the required dependencies:
   ```bash
   pip install -r requirements.txt
   ```
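
After installing, a quick check that the dependencies resolve can save a confusing traceback later. The sketch below assumes the three third-party packages imported by the code in this document (`numpy`, `numba`, `tqdm`); the actual contents of `requirements.txt` are not shown here, and `missing_packages` is a hypothetical helper.

```python
from importlib.util import find_spec

def missing_packages(names):
    """Return the top-level package names that cannot be found on this system."""
    return [name for name in names if find_spec(name) is None]

# Assumed dependency list, taken from the imports used in this document.
missing = missing_packages(["numpy", "numba", "tqdm"])
print("Missing:", ", ".join(missing) if missing else "none")
```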

---

## 🚀 **Usage**

### Encrypt Data
```python
from echokey import OptimizedEncryption

encryptor = OptimizedEncryption(seed=123456789)
data = b"Your message here"
encrypted_data = encryptor.encrypt(data)
```

### Decrypt Data
```python
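# Continues from the encryption example above.
# Decryption needs a fresh instance created with the same seed,
# because the cipher state advances with every byte processed.
decryptor = OptimizedEncryption(seed=123456789)
decrypted_data = decryptor.decrypt(encrypted_data)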
assert decrypted_data == data
```
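
Note that the constructor in the code later in this document keeps only the low 32 bits of the seed (`seed & ((1 << 32) - 1)`), so two seeds that differ only above bit 31 initialize the same state. A small illustration of that masking, with `effective_seed` as a hypothetical helper name:

```python
SEED_MASK = (1 << 32) - 1  # same mask the constructor applies

def effective_seed(seed: int) -> int:
    """Return the 32-bit value that actually seeds the initial state."""
    return seed & SEED_MASK

a = 123456789
b = a + (1 << 32)  # differs from `a` only above bit 31
print(effective_seed(a) == effective_seed(b))  # True
```

In practice this caps the key space at 2^32 distinct seeds, regardless of how large the supplied integer is.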

---

## 📜 **How It Works**

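1. **Fractal Regression**:
   - The oscillator state evolves through a recursive nonlinear map (cubic feedback with cosine forcing), and its output is XORed into each byte.
2. **Synergy Metrics**:
   - The ratio of mean to standard deviation over a rolling window of recent plaintext bytes selects a bit rotation between 1 and 7.
3. **Acoustic Modulation**:
   - A position-dependent value in [0, 1) is added to each byte and re-derives the `alpha`, `beta`, and `omega` parameters at every step.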
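
The steps above correspond to per-byte recurrences in `process_batch_numba`. The sketch below restates them in plain Python, without Numba or the rolling windows, to show how one step advances the state. `State` and `step` are illustrative names, not part of EchoKey, and the starting values are arbitrary.

```python
import math
from dataclasses import dataclass

@dataclass
class State:
    x: float          # oscillator position
    y: float          # oscillator velocity
    acoustic: float   # value in [0, 1)
    alpha: float = 0.4
    beta: float = 0.4
    omega: float = 1.0

def step(s: State, position: int) -> State:
    """Advance the state by one byte position."""
    # Cubic oscillator map with cosine forcing.
    new_x = s.y
    new_y = -s.alpha * s.y - s.x ** 3 + s.beta * math.cos(s.omega * position)
    # "Acoustic" value: a recurrence with irrational multipliers, reduced mod 1.
    acoustic = (s.acoustic * math.pi + position * math.e) % 1.0
    # The next step's parameters are derived from the acoustic value.
    return State(
        x=new_x,
        y=new_y,
        acoustic=acoustic,
        alpha=0.2 + 0.1 * math.sin(acoustic * math.pi),
        beta=0.2 + 0.1 * math.cos(acoustic * math.e),
        omega=1.0 + 0.1 * math.sin(acoustic * math.sqrt(2)),
    )

s = State(x=0.1, y=-0.2, acoustic=0.5)
for pos in range(3):
    s = step(s, pos)
    print(pos, round(s.x, 4), round(s.y, 4), round(s.acoustic, 4))
```

Because every quantity depends only on the previous state and the byte position, an encryptor and a decryptor started from the same seed stay in lockstep.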

### Code Highlights

- **`RollingWindow`**: Efficiently manages dynamic data buffers for synergy calculations.
- **`process_batch_numba`**: Accelerates processing with Numba's just-in-time compilation.
- **`OptimizedEncryption`**: Manages encryption/decryption workflows with dynamic parameter updates.
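
At the byte level, the encryption step in `process_batch_numba` XORs the input with an oscillator-derived value, adds an acoustic-derived value modulo 256, and rotates the result left; decryption applies the inverse operations in reverse order. A minimal sketch of that pair follows, assuming fixed example values for `osc_val`, `ac_val`, and `rotate` (in the real code they change for every byte). `encrypt_byte` and `decrypt_byte` are hypothetical names.

```python
def rotl8(b: int, r: int) -> int:
    """Rotate an 8-bit value left by r bits (1 <= r <= 7)."""
    return ((b << r) | (b >> (8 - r))) & 0xFF

def rotr8(b: int, r: int) -> int:
    """Rotate an 8-bit value right by r bits (1 <= r <= 7)."""
    return ((b >> r) | (b << (8 - r))) & 0xFF

def encrypt_byte(b: int, osc_val: int, ac_val: int, rotate: int) -> int:
    """XOR, add modulo 256, then rotate left."""
    return rotl8(((b ^ osc_val) + ac_val) & 0xFF, rotate)

def decrypt_byte(c: int, osc_val: int, ac_val: int, rotate: int) -> int:
    """Rotate right, subtract modulo 256, then XOR."""
    return ((rotr8(c, rotate) - ac_val) & 0xFF) ^ osc_val

# Round-trip check over every byte value for one example keystream triple.
osc_val, ac_val, rotate = 0x5A, 0x21, 3
assert all(
    decrypt_byte(encrypt_byte(b, osc_val, ac_val, rotate), osc_val, ac_val, rotate) == b
    for b in range(256)
)
```

Each of the three operations is a bijection on 8-bit values, which is why the round trip holds for any choice of the three parameters.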

---

## 🔍 **Examples**

Run the script to test encryption and decryption:

- A single-byte round-trip test (`test_single_byte()`) runs first as a basic correctness check.
- `main()` then encrypts and decrypts a message you enter and writes a 100 MB test data file (`test_data.bin`) for analysis.
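
Because the generated test file is the encryption of an all-zero input, its bytes depend only on the keystream, which makes it a convenient target for simple statistical checks. One such check, offered as an assumption about how the file might be analyzed rather than as part of EchoKey, is the Shannon entropy of the byte histogram. `byte_entropy` is a hypothetical helper.

```python
import math
from collections import Counter

def byte_entropy(data: bytes) -> float:
    """Shannon entropy of the byte distribution, in bits per byte (0 to 8)."""
    if not data:
        return 0.0
    total = len(data)
    counts = Counter(data)
    return sum((c / total) * math.log2(total / c) for c in counts.values())

print(byte_entropy(bytes(range(256))))  # uniform over all byte values: 8.0
print(byte_entropy(b"\x00" * 1000))     # a single repeated value: 0.0
```

A value close to 8 bits per byte is necessary but not sufficient evidence of randomness, since a histogram cannot reveal ordering or correlation between bytes.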

---

## 🛠️ **Development**

Contributions are welcome! Please fork the repository and submit a pull request.

---

## ⚠️ **Disclaimer**

This project is for **educational and testing purposes only**. It may contain flaws and is not recommended for sensitive or production environments without rigorous testing and review.

---

## 📞 **Contact**

For questions or feedback, reach out at Jonpoplett@JGPTech.net.

---

import numpy as np
from dataclasses import dataclass
from typing import Tuple
import secrets
import sys
from tqdm import tqdm  # For progress bar
import logging
from numba import njit
import math

#=============================================================================#
#                            Configurable Variables                           #
#=============================================================================#

# General Settings
WINDOW_SIZE = 20          # Size of the rolling windows
BATCH_SIZE = 100        # Size of each processing batch
DEBUG_MODE = False          # Enable debug mode

# Encryption Parameters
PARAMS_ALPHA_INITIAL = 0.4   # Initial alpha parameter
PARAMS_BETA_INITIAL = 0.4   # Initial beta parameter
PARAMS_OMEGA_INITIAL = 1.0   # Initial omega parameter

# Numba Processing Parameters
CHUNK_SIZE = 256            # Chunk size for processing in numba

# Seed Settings
SEED = 985419863519517897816516351321981789             # Default seed value (only the low 32 bits are used by OptimizedEncryption)

# File Settings
ZERO_DATA_FILE = 'zero_data.bin'         # Filename for zero data
TEST_DATA_FILE = 'test_data.bin'  # Filename for test data

#=============================================================================#
#                                End of Config                                #
#=============================================================================#

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

@dataclass
class RollingWindow:
    """Efficient rolling window using NumPy arrays."""
    data: np.ndarray
    size: int
    index: int = 0

    def update(self, value: float):
        self.data[self.index % self.size] = value
        self.index += 1

    def get_recent(self, n: int = None) -> np.ndarray:
        """
        Retrieve the most recent 'n' elements from the rolling window,
        oldest first. Slots not yet written are returned as zeros.
        """
        if n is None or n > self.size:
            n = self.size
        # Rotate the ring buffer so the oldest slot comes first; this stays
        # correct after the write index has wrapped past the buffer size.
        ordered = np.roll(self.data, -(self.index % self.size))
        # Keep only the last n entries (an empty array when n == 0).
        return ordered[self.size - n:]

@njit
def calculate_synergy_numba(data_block: np.ndarray) -> float:
    """Calculate synergy using Numba for speed."""
    if data_block.size == 0:
        return 1.0
    alpha = data_block.mean()
    beta = data_block.std() + 1e-10  # Avoid division by zero
    return alpha / beta

@njit
def process_batch_numba(
    data_block: np.ndarray,
    synergy_window: np.ndarray,
    osc_x_window: np.ndarray,
    osc_y_window: np.ndarray,
    acoustic_window: np.ndarray,
    params_alpha: float,
    params_beta: float,
    params_omega: float,
    position_start: int,
    is_encrypt: bool,
    chunk_size: int  # Added chunk_size as a parameter
) -> Tuple[np.ndarray, float, float, float, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    n = len(data_block)
    processed = np.empty(n, dtype=np.uint8)
    
    # NOTE: chunk_size is accepted but not used yet. Bytes are processed one
    # at a time in the loop below because each step depends on the state
    # left by the previous one.

    for i in range(n):
        position = position_start + i
        synergy = calculate_synergy_numba(synergy_window)

        prev_osc_x = osc_x_window[-1]
        prev_osc_y = osc_y_window[-1]

        new_osc_x_val = prev_osc_y
        new_osc_y_val = -params_alpha * prev_osc_y - (prev_osc_x ** 3) + params_beta * math.cos(params_omega * position)

        new_acoustic_val = (acoustic_window[-1] * math.pi + position * math.e) % 1.0

        new_alpha = 0.2 + 0.1 * math.sin(new_acoustic_val * math.pi)
        new_beta = 0.2 + 0.1 * math.cos(new_acoustic_val * math.e)
        new_omega = 1.0 + 0.1 * math.sin(new_acoustic_val * math.sqrt(2))

        rotate = max(1, min(7, int(synergy * 7)))

        osc_val = int((new_osc_x_val + 1) * 128) & 0xFF
        ac_val = int(new_acoustic_val * 256) & 0xFF
        byte = data_block[i]

        if is_encrypt:
            transformed_byte = ((byte ^ osc_val) + ac_val) & 0xFF
            transformed_byte = ((transformed_byte << rotate) | (transformed_byte >> (8 - rotate))) & 0xFF
        else:
            transformed_byte = ((byte >> rotate) | (byte << (8 - rotate))) & 0xFF
            transformed_byte = (transformed_byte - ac_val) & 0xFF
            transformed_byte = (transformed_byte ^ osc_val) & 0xFF

        processed[i] = transformed_byte

        synergy_window = np.roll(synergy_window, -1)
        synergy_window[-1] = byte if is_encrypt else transformed_byte

        osc_x_window = np.roll(osc_x_window, -1)
        osc_x_window[-1] = new_osc_x_val

        osc_y_window = np.roll(osc_y_window, -1)
        osc_y_window[-1] = new_osc_y_val

        acoustic_window = np.roll(acoustic_window, -1)
        acoustic_window[-1] = new_acoustic_val

        params_alpha = new_alpha
        params_beta = new_beta
        params_omega = new_omega

    return processed, params_alpha, params_beta, params_omega, synergy_window, osc_x_window, osc_y_window, acoustic_window
        
class OptimizedEncryption:
    def __init__(self, seed: int = None, window_size: int = WINDOW_SIZE, batch_size: int = BATCH_SIZE, debug: bool = DEBUG_MODE):
        self.window_size = window_size
        self.batch_size = batch_size
        self.debug = debug

        self.synergy_window = RollingWindow(np.zeros(window_size, dtype=np.float64), window_size)
        self.oscillator_x_window = RollingWindow(np.zeros(window_size, dtype=np.float64), window_size)
        self.oscillator_y_window = RollingWindow(np.zeros(window_size, dtype=np.float64), window_size)
        self.acoustic_window = RollingWindow(np.zeros(window_size, dtype=np.float64), window_size)

        self.params_alpha = PARAMS_ALPHA_INITIAL
        self.params_beta = PARAMS_BETA_INITIAL
        self.params_omega = PARAMS_OMEGA_INITIAL

        if seed is None:
            seed = secrets.randbits(32)
        self.seed = seed & ((1 << 32) - 1)

        self.init_system_state()
        self.position = 0

    def init_system_state(self):
        rng = np.random.RandomState(self.seed)
        initial_x = rng.uniform(-1, 1)
        initial_y = rng.uniform(-1, 1)
        initial_acoustic = rng.uniform(0, 1)

        for _ in range(self.window_size):
            self.oscillator_x_window.update(initial_x)
            self.oscillator_y_window.update(initial_y)
            self.acoustic_window.update(initial_acoustic)
            self.synergy_window.update(1.0)

    def process(self, data: bytes, is_encrypt: bool) -> bytes:
        processed = bytearray(len(data))
        total_batches = (len(data) + self.batch_size - 1) // self.batch_size

        with tqdm(total=total_batches, desc="Encrypting" if is_encrypt else "Decrypting", unit="batch", dynamic_ncols=True) as pbar:
            for idx in range(0, len(data), self.batch_size):
                chunk = data[idx:idx + self.batch_size]
                chunk_array = np.frombuffer(chunk, dtype=np.uint8)

                processed_chunk, self.params_alpha, self.params_beta, self.params_omega, \
                synergy_window, osc_x_window, osc_y_window, acoustic_window = process_batch_numba(
                    chunk_array,
                    self.synergy_window.get_recent(self.window_size),
                    self.oscillator_x_window.get_recent(self.window_size),
                    self.oscillator_y_window.get_recent(self.window_size),
                    self.acoustic_window.get_recent(self.window_size),
                    self.params_alpha,
                    self.params_beta,
                    self.params_omega,
                    self.position,
                    is_encrypt,
                    CHUNK_SIZE  # Pass chunk_size
                )

                self.synergy_window.data = synergy_window
                self.oscillator_x_window.data = osc_x_window
                self.oscillator_y_window.data = osc_y_window
                self.acoustic_window.data = acoustic_window

                processed[idx:idx + len(processed_chunk)] = processed_chunk.tobytes()

                self.position += len(processed_chunk)
                pbar.update(1)

        return bytes(processed)

    def encrypt(self, data: bytes) -> bytes:
        return self.process(data, is_encrypt=True)

    def decrypt(self, data: bytes) -> bytes:
        return self.process(data, is_encrypt=False)

# Test the functionality
def test_single_byte():
    """Test encryption and decryption of a single byte."""
    seed = 123456789  # Fixed seed for consistency
    encryptor = OptimizedEncryption(seed=seed)
    original = bytes([42])  # Test with a single byte
    encrypted = encryptor.encrypt(original)

    decryptor = OptimizedEncryption(seed=seed)
    decrypted = decryptor.decrypt(encrypted)

    assert original == decrypted, "Decryption failed!"
    print(f"Original: {original.hex()}, Encrypted: {encrypted.hex()}, Decrypted: {decrypted.hex()}")

def generate_test_data(size: int = 100_000_000, batch_size: int = BATCH_SIZE, debug: bool = DEBUG_MODE, seed: int = SEED) -> bytes:
    """Generate test data using optimized encryption with a progress bar."""
    logging.info("Loading or generating zero data...")
    
    # Try to load existing zero data file
    try:
        with open(ZERO_DATA_FILE, 'rb') as f:
            data = f.read(size)
            if len(data) < size:
                raise ValueError("Zero data file too small")
    except (FileNotFoundError, ValueError):
        # Generate and save zero data if file doesn't exist or is too small
        logging.info("Generating new zero data file...")
        data_np = np.zeros(size, dtype=np.uint8)
        data = data_np.tobytes()
        with open(ZERO_DATA_FILE, 'wb') as f:
            f.write(data)
        logging.info(f"Zero data saved to {ZERO_DATA_FILE}")
    
    # Encrypt using optimized system
    encryptor = OptimizedEncryption(window_size=WINDOW_SIZE, batch_size=batch_size, debug=debug, seed=seed)
    logging.info("Starting encryption of test data...")
    encrypted_data = encryptor.encrypt(data)
    logging.info("Encryption of test data completed.")
    
    return encrypted_data

def main():
    import time

    # Example usage with debug mode enabled
    debug_mode = DEBUG_MODE  # Set via config
    seed = SEED  # Set via config

    try:
        message = input("Enter message to encrypt: ").encode()
    except KeyboardInterrupt:
        print("\nEncryption canceled.")
        sys.exit(0)

    # Create encryption system with optimized settings
    encryptor = OptimizedEncryption(window_size=WINDOW_SIZE, batch_size=BATCH_SIZE, debug=debug_mode, seed=seed)

    try:
        # Encrypt
        logging.info("Starting encryption...")
        start_time = time.time()
        encrypted = encryptor.encrypt(message)
        encrypt_time = time.time() - start_time
        logging.info(f"Encryption Time: {encrypt_time:.2f} seconds")
        print(f"\nEncrypted (hex): {encrypted.hex()}")

        # Decrypt using the same seed
        decryptor = OptimizedEncryption(window_size=WINDOW_SIZE, batch_size=BATCH_SIZE, debug=debug_mode, seed=seed)
        logging.info("Starting decryption...")
        start_time = time.time()
        decrypted = decryptor.decrypt(encrypted)
        decrypt_time = time.time() - start_time
        logging.info(f"Decryption Time: {decrypt_time:.2f} seconds")
        try:
            print(f"Decrypted: {decrypted.decode()}")
        except UnicodeDecodeError:
            print("Decryption resulted in invalid UTF-8 bytes.")

        # Generate test data
        print("\nGenerating test data...")
        start_time = time.time()
        test_data = generate_test_data(size=100_000_000, batch_size=BATCH_SIZE, debug=debug_mode, seed=seed)
        gen_time = time.time() - start_time
        logging.info(f"Test Data Generation Time: {gen_time:.2f} seconds")
        with open(TEST_DATA_FILE, 'wb') as f:
            f.write(test_data)
        logging.info(f"Test data generated and saved to '{TEST_DATA_FILE}'")
        
    finally:
        pass

if __name__ == "__main__":
    # Run single byte test to ensure correctness
    print("Running single byte test...")
    test_single_byte()
    print("\nSingle byte test passed.\n")

    # Proceed to main encryption and decryption
    main()


Running single byte test...

Encrypting: 100%|██████████████████████████████| 1/1 [00:00<00:00,  3.30batch/s]
Decrypting: 100%|███████████████████████████| 1/1 [00:00<00:00, 18001.30batch/s]

Original: 2a, Encrypted: 9d, Decrypted: 2a

Single byte test passed.

Enter message to encrypt:  Hello World!

[2024-11-21 04:09:04,656] [INFO] Starting encryption...

Encrypting: 100%|███████████████████████████| 1/1 [00:00<00:00, 12748.64batch/s]

[2024-11-21 04:09:04,657] [INFO] Encryption Time: 0.00 seconds

Encrypted (hex): d0784d74cb747fe876449896
[2024-11-21 04:09:04,657] [INFO] Starting decryption...

Decrypting: 100%|███████████████████████████| 1/1 [00:00<00:00, 23967.45batch/s]

[2024-11-21 04:09:04,658] [INFO] Decryption Time: 0.00 seconds
Decrypted: Hello World!

Generating test data...
[2024-11-21 04:09:04,658] [INFO] Loading or generating zero data...
[2024-11-21 04:09:04,686] [INFO] Starting encryption of test data...

Encrypting: 100%|███████████████| 1000000/1000000 [00:26<00:00, 37475.49batch/s]

[2024-11-21 04:09:31,423] [INFO] Encryption of test data completed.

[2024-11-21 04:09:31,426] [INFO] Test Data Generation Time: 26.77 seconds
[2024-11-21 04:09:31,456] [INFO] Test data generated and saved to 'test_data.bin'
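
The timings in the log above give a rough throughput figure. The arithmetic below uses only the numbers shown in the output (100,000,000 bytes generated in 26.77 seconds, which covers loading the zero data and encrypting it); the result is specific to the machine that produced this log. `throughput_mb_per_s` is a hypothetical helper.

```python
def throughput_mb_per_s(num_bytes: int, seconds: float) -> float:
    """Throughput in megabytes (10**6 bytes) per second."""
    return num_bytes / seconds / 1e6

# Figures taken from the log: 100,000,000 bytes in 26.77 s.
print(f"{throughput_mb_per_s(100_000_000, 26.77):.2f} MB/s")  # 3.74 MB/s
```

This agrees with the progress bar, which reports about 37,475 batches per second at 100 bytes per batch.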
