In [None]:
# ================= INSTALL PYTHON PACKAGES =================
!pip install -q \
    opencv-python-headless \
    opencv-python \
    pydicom \
    scikit-image \
    pillow \
    matplotlib \
    kagglehub[pandas-datasets]

# ================= INSTALL SYSTEM DEPENDENCIES =================
!apt-get -qq update
!apt-get -qq install -y cmake ninja-build gcc g++ libssl-dev

# ================= INSTALL liboqs-python =================
%cd /content
!rm -rf liboqs-python
!git clone --depth 1 https://github.com/open-quantum-safe/liboqs-python.git
%cd liboqs-python
!pip install -q .

In [None]:


import oqs


with oqs.KeyEncapsulation("ML-KEM-768") as kem_receiver:
    public_key = kem_receiver.generate_keypair()


    with oqs.KeyEncapsulation("ML-KEM-768") as kem_sender:
        ciphertext, shared_secret_sender = kem_sender.encap_secret(public_key)


    shared_secret_receiver = kem_receiver.decap_secret(ciphertext)

    print("ML-KEM-768 works")
    print("Shared secret match:", shared_secret_sender == shared_secret_receiver)
    print("Shared secret length:", len(shared_secret_sender))


In [None]:
#download dataset from kaggle
import os
import kagglehub


dataset_path = kagglehub.dataset_download("unidpro/brain-cancer-dataset")
print("Dataset path:", dataset_path)


dicom_files = []

for root, _, files in os.walk(dataset_path):
    for f in files:
        if f.lower().endswith(".dcm"):
            dicom_files.append(os.path.join(root, f))

print(f"Total DICOM images found: {len(dicom_files)}")


dicom_files[:5]


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import pydicom
import os
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
from typing import Dict, Tuple
from PIL import Image
import io
import hashlib
import json
from datetime import datetime
import sys




class Block:
    def __init__(self, timestamp, data, previous_hash):
        self.timestamp = timestamp
        self.data = data
        self.previous_hash = previous_hash
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        """Calculates the hash of the block."""
        block_string = str(self.timestamp) + str(self.data) + str(self.previous_hash)
        return hashlib.sha256(block_string.encode()).hexdigest()

class Blockchain:
    def __init__(self):

        self.chain = [self.create_genesis_block()]

    def create_genesis_block(self):
        """Creates the very first block in the chain."""
        return Block(datetime.now(), "Genesis Block", "0")

    def get_latest_block(self):
        """Returns the most recent block in the chain."""
        return self.chain[-1]

    def add_block(self, new_data):
        """Adds a new block to the chain."""
        latest_block = self.get_latest_block()
        new_block = Block(
            timestamp=datetime.now(),
            data=new_data,
            previous_hash=latest_block.hash
        )
        self.chain.append(new_block)
        print(" New block added to the chain with key hash:", new_data)

    def is_chain_valid(self):
        """Validates the integrity of the entire blockchain."""
        for i in range(1, len(self.chain)):
            current_block = self.chain[i]
            previous_block = self.chain[i-1]

            if current_block.hash != current_block.calculate_hash():
                return False
            if current_block.previous_hash != previous_block.hash:
                return False
        return True

class dcmCompressor:
    def __init__(self, j2k_quality=90, epsilon=1/512):
        self.j2k_quality = j2k_quality
        self.epsilon = epsilon
        self.original_pixel_data = None
        self.metadata = {}


    def _extract_metadata(self, dcm):
        self.metadata = {
            'Modality': dcm.get('Modality', 'UN'),
            'BitsStored': dcm.get('BitsStored', 12),
            'TransferSyntaxUID': dcm.file_meta.TransferSyntaxUID if hasattr(dcm, 'file_meta') else '',
            'PhotometricInterpretation': dcm.get('PhotometricInterpretation', 'MONOCHROME2'),
            'Rows': dcm.Rows,
            'Columns': dcm.Columns
        }

    def _convert_complex_to_magnitude(self, pixel_array):
        if np.iscomplexobj(pixel_array):
            print("Converting complex SWI data to magnitude")
            return np.abs(pixel_array)
        return pixel_array

    def _normalize_pixel_data(self, pixel_array, bits_stored):
        max_val = 2**bits_stored - 1
        return pixel_array.astype(np.float32) / max_val

    def _denormalize_pixel_data(self, normalized_array, bits_stored):
        max_val = 2**bits_stored - 1
        return (normalized_array * max_val).astype(np.uint16)

    def _jpeg2000_compress(self, image, quality_mode='dB'):
        img_uint16 = self._denormalize_pixel_data(image, self.metadata['BitsStored'])
        img_pil = Image.fromarray(img_uint16)
        buf = io.BytesIO()
        img_pil.save(buf, format='JPEG2000',
            quality_mode=quality_mode,
            quality_layers=[self.j2k_quality])
        buf.seek(0)
        decompressed = np.array(Image.open(buf)).astype(np.float32)
        decompressed = self._normalize_pixel_data(decompressed, self.metadata['BitsStored'])
        return buf.getvalue(), decompressed

    def _calculate_residual(self, original, compressed):
        residual = original - compressed


        residual = np.clip(residual, -self.epsilon, +self.epsilon)


        max_abs = np.max(np.abs(residual))
        if max_abs > 0:
            residual_scaled = residual / max_abs
        else:
            residual_scaled = residual

        return residual_scaled


    def _reconstruct_from_residual(self, compressed, residual):
        residual_restored = residual * np.max(np.abs(self.original_pixel_data - compressed))
        return np.clip(compressed + residual_restored, 0, 1)

    def compress(self, filepath):
        try:
            dcm = pydicom.dcmread(filepath, force=True)
            self._extract_metadata(dcm)
            pixel_array = dcm.pixel_array
            pixel_array = self._convert_complex_to_magnitude(pixel_array)
            self.original_pixel_data = self._normalize_pixel_data(pixel_array, self.metadata['BitsStored'])

            j2k_data, j2k_recon = self._jpeg2000_compress(self.original_pixel_data)
            residual = self._calculate_residual(self.original_pixel_data, j2k_recon)
            residual_data, _ = self._jpeg2000_compress(residual, quality_mode='rates')

            return {
                'metadata': self.metadata,
                'j2k_primary': j2k_data,
                'j2k_residual': residual_data,
                'compression_ratio': len(dcm.PixelData) / (len(j2k_data) + len(residual_data))
            }
        except Exception as e:
            print(f"Compression failed for {filepath}: {str(e)}")
            return None

    def decompress(self, compressed_data):
        try:
            buf = io.BytesIO(compressed_data['j2k_primary'])
            primary_img = np.array(Image.open(buf)).astype(np.float32)
            primary_img = self._normalize_pixel_data(primary_img, compressed_data['metadata']['BitsStored'])

            buf = io.BytesIO(compressed_data['j2k_residual'])
            residual_img = np.array(Image.open(buf)).astype(np.float32)
            residual_img = residual_img / 32767.0

            reconstructed = self._reconstruct_from_residual(primary_img, residual_img)
            return reconstructed
        except Exception as e:
            print(f"Decompression failed: {str(e)}")
            return None

    def evaluate(self, filepath):
        compressed = self.compress(filepath)
        if not compressed:
            return None

        reconstructed = self.decompress(compressed)
        if reconstructed is None:
            return None

        metrics = {
            'filename': os.path.basename(filepath),
            'psnr': psnr(self.original_pixel_data, reconstructed, data_range=1.0),
            'ssim': ssim(self.original_pixel_data, reconstructed,
                data_range=1.0, win_size=3, channel_axis=None),
            'compression_ratio': compressed['compression_ratio'],
            'original_size': len(pydicom.dcmread(filepath).PixelData),
            'compressed_size': len(compressed['j2k_primary']) + len(compressed['j2k_residual'])
    }


        return metrics

class MedicalImageEncryptor:
    """
    Medical Image Encryption Framework using:
    - Enhanced Memristive Hyperchaotic Key Generator
    - Hybrid Permutation
    - Bidirectional Diffusion
    """

    def __init__(self):
        self.encryption_key = None

    def enhanced_memristive_chaotic_generator(self, x0, y0, z0, w0,a, b, c, d, e, size) -> np.ndarray:
        """
         Generate chaotic key stream using a 4D memristive hyperchaotic system.
        """
        x, y, z, w = x0, y0, z0, w0
        dt = 0.005


        for _ in range(2000):
            dx = a*(y - x) + w + e*x*z
            dy = b*x - x*z + c*y
            dz = x*y - d*z
            dw = -x - e*w
            x, y, z, w = x + dt*dx, y + dt*dy, z + dt*dz, w + dt*dw


        key_stream = np.zeros(size, dtype=np.uint8)
        for i in range(size):
            dx = a*(y - x) + w + e*x*z
            dy = b*x - x*z + c*y
            dz = x*y - d*z
            dw = -x - e*w
            x, y, z, w = x + dt*dx, y + dt*dy, z + dt*dz, w + dt*dw
            val = int(abs(x*1e5 + y*1e4 + z*1e3 + w*1e2)) % 256
            key_stream[i] = val
        return key_stream


    def hybrid_permutation(self, image_array: np.ndarray, key_stream: np.ndarray) -> np.ndarray:
        rows, cols = image_array.shape
        total = rows * cols
        flat = image_array.flatten()

        random_order = np.argsort(key_stream[:total])
        permuted = flat[random_order]
        return permuted.reshape(rows, cols)

    def reverse_hybrid_permutation(self, image_array: np.ndarray, key_stream: np.ndarray) -> np.ndarray:
        rows, cols = image_array.shape
        total = rows * cols
        flat = image_array.flatten()

        random_order = np.argsort(key_stream[:total])
        original = np.zeros_like(flat)
        original[random_order] = flat
        return original.reshape(rows, cols)


    def bidirectional_diffusion(self, image_array: np.ndarray, key_stream: np.ndarray, rounds=2) -> np.ndarray:
        diffused = image_array.copy().astype(np.int32)
        rows, cols = diffused.shape
        flat_size = rows * cols

        for _ in range(rounds):

            for i in range(flat_size):
              r, c = i // cols, i % cols
              key_val = int(key_stream[i % len(key_stream)])
              left = diffused[r, (c-1) % cols]
              up = diffused[(r-1) % rows, c]
              prev = diffused[(i-1) // cols, (i-1) % cols] if i > 0 else 0
              mix = (key_val ^ r ^ c ^ left ^ up ^ prev) & 0xFF
              diffused[r, c] = (diffused[r, c] ^ mix) & 0xFF


            for i in range(flat_size-1, -1, -1):
              r, c = i // cols, i % cols
              key_val = int(key_stream[i % len(key_stream)])
              right = diffused[r, (c+1) % cols]
              down = diffused[(r+1) % rows, c]
              nxt = diffused[(i+1) // cols, (i+1) % cols] if i < flat_size-1 else 0
              mix = (key_val ^ r ^ c ^ right ^ down ^ nxt) & 0xFF
              diffused[r, c] = (diffused[r, c] ^ mix) & 0xFF

        return diffused.astype(np.uint8)

    def reverse_bidirectional_diffusion(self, image_array: np.ndarray, key_stream: np.ndarray, rounds=2) -> np.ndarray:
        undiffused = image_array.copy().astype(np.int32)
        rows, cols = undiffused.shape
        flat_size = rows * cols

        for _ in range(rounds):

            for i in range(flat_size):
                r, c = i // cols, i % cols
                key_val = int(key_stream[i % len(key_stream)])
                right = undiffused[r, (c+1) % cols]
                down = undiffused[(r+1) % rows, c]
                nxt = undiffused[(i+1) // cols, (i+1) % cols] if i < flat_size-1 else 0
                mix = (key_val ^ r ^ c ^ right ^ down ^ nxt) & 0xFF
                undiffused[r, c] = (undiffused[r, c] ^ mix) & 0xFF

            for i in range(flat_size-1, -1, -1):
                r, c = i // cols, i % cols
                key_val = int(key_stream[i % len(key_stream)])
                left = undiffused[r, (c-1) % cols]
                up = undiffused[(r-1) % rows, c]
                prev = undiffused[(i-1) // cols, (i-1) % cols] if i > 0 else 0
                mix = (key_val ^ r ^ c ^ left ^ up ^ prev) & 0xFF
                undiffused[r, c] = (undiffused[r, c] ^ mix) & 0xFF

        return undiffused.astype(np.uint8)


    def encrypt_image(self, image_array: np.ndarray, key_params: Dict=None) -> Tuple[np.ndarray, Dict]:
        if len(image_array.shape) == 3:
            encrypted_channels, params = [], None
            for i in range(3):
                encrypted, params = self.encrypt_image(image_array[:, :, i], key_params)
                encrypted_channels.append(encrypted)
            return np.stack(encrypted_channels, axis=2), params

        if key_params is None:
            key_params = {
                'x0': random.random(), 'y0': random.random(),
                'z0': random.random(), 'w0': random.random(),
                'a': 35.0 + random.random()*5,
                'b': 3.0 + random.random(),
                'c': 12.0 + random.random()*2,
                'd': 7.0 + random.random(),
                'e': 0.1 + random.random()*0.1
        }


        seed = int(np.sum(image_array)) % 1000
        key_params = key_params.copy()
        key_params['seed'] = seed
        key_params['x0'] = (key_params['x0'] + seed) % 1
        key_params['y0'] = (key_params['y0'] + seed/2) % 1
        key_params['z0'] = (key_params['z0'] + seed/3) % 1
        key_params['w0'] = (key_params['w0'] + seed/4) % 1

        rows, cols = image_array.shape
        total_pixels = rows * cols
        key_stream = self.enhanced_memristive_chaotic_generator(
            key_params['x0'], key_params['y0'], key_params['z0'], key_params['w0'],
            key_params['a'], key_params['b'], key_params['c'], key_params['d'], key_params['e'],
            total_pixels*2
        )

        permuted = self.hybrid_permutation(image_array, key_stream[:total_pixels])
        encrypted = self.bidirectional_diffusion(permuted, key_stream[total_pixels:], rounds=2)
        self.encryption_key = key_params
        return encrypted, key_params

    def decrypt_image(self, encrypted_array: np.ndarray, key_params: Dict) -> np.ndarray:
        if len(encrypted_array.shape) == 3:
            decrypted_channels = []
            for i in range(3):
                decrypted = self.decrypt_image(encrypted_array[:, :, i], key_params)
                decrypted_channels.append(decrypted)
            return np.stack(decrypted_channels, axis=2)

        rows, cols = encrypted_array.shape
        total_pixels = rows * cols


        key_stream = self.enhanced_memristive_chaotic_generator(
            key_params['x0'], key_params['y0'], key_params['z0'], key_params['w0'],
            key_params['a'], key_params['b'], key_params['c'], key_params['d'], key_params['e'],
            total_pixels*2
        )

        undiffused = self.reverse_bidirectional_diffusion(encrypted_array, key_stream[total_pixels:], rounds=2)
        decrypted = self.reverse_hybrid_permutation(undiffused, key_stream[:total_pixels])
        return decrypted


    def calculate_metrics(self, original: np.ndarray, encrypted: np.ndarray, decrypted: np.ndarray) -> Dict:
        modified_original = original.copy()
        modified_original[0, 0] = (int(modified_original[0, 0]) + 1) % 256
        modified_encrypted, _ = self.encrypt_image(modified_original, key_params=self.encryption_key)

        diff_matrix = (encrypted != modified_encrypted).astype(np.int32)
        npcr = (np.sum(diff_matrix) / original.size) * 100
        uaci = (np.sum(np.abs(encrypted.astype(np.int32) - modified_encrypted.astype(np.int32)))
                    / (255 * original.size)) * 100

        def entropy(img):
            hist = np.histogram(img.flatten(), bins=256, range=[0, 256])[0]
            prob = hist / np.sum(hist)
            prob = prob[prob > 0]
            return -np.sum(prob * np.log2(prob))

        return {
            'NPCR': npcr,
            'UACI': uaci,
            'Original Entropy': entropy(original),
            'Encrypted Entropy': entropy(encrypted)
        }





In [None]:
#experiment for a single dicom image
import hmac
import hashlib
import secrets
import sys
import oqs
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pydicom
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
import time
import statistics


def hkdf(ikm: bytes, salt: bytes, info: bytes, length=64):
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()
    t = b""
    okm = b""
    c = 1
    while len(okm) < length:
        t = hmac.new(prk, t + info + bytes([c]), hashlib.sha256).digest()
        okm += t
        c += 1
    return okm[:length]

def derive_session_seed(shared_secret, nonce, receiver_id):
    return hkdf(shared_secret, nonce, receiver_id, 32)

def derive_image_key(session_seed, dcm):
    uid_blob = (
        dcm.StudyInstanceUID +
        dcm.SeriesInstanceUID +
        dcm.SOPInstanceUID +
        dcm.SOPClassUID
    ).encode()
    return hkdf(session_seed, b"DICOM", uid_blob, 64)

def derive_domain_keys(K_img):
    return {
        "K_perm_p": hkdf(K_img, b"", b"perm:primary", 32),
        "K_diff_p": hkdf(K_img, b"", b"diff:primary", 32),
        "K_perm_r": hkdf(K_img, b"", b"perm:residual", 32),
        "K_diff_r": hkdf(K_img, b"", b"diff:residual", 32),
        "K_mac":    hkdf(K_img, b"", b"mac", 32)
    }

def map_key_to_chaos_params(K_img):
    v = [int.from_bytes(K_img[i:i+4], 'big') for i in range(0, 36, 4)]
    return {
        'x0': 0.1 + (v[0] % 1000) / 10000,
        'y0': 0.1 + (v[1] % 1000) / 10000,
        'z0': 0.1 + (v[2] % 1000) / 10000,
        'w0': 0.1 + (v[3] % 1000) / 10000,
        'a': 35.0 + (v[4] % 50) / 10,
        'b': 3.0 + (v[5] % 10) / 10,
        'c': 12.0 + (v[6] % 20) / 10,
        'd': 7.0 + (v[7] % 10) / 10,
        'e': 0.1 + (v[8] % 10) / 100
    }

if __name__ == "__main__":

    print("Blockchain for Key Validation Initialized.")

    input_filepath = "image path "
    dcm = pydicom.dcmread(input_filepath, force=True)


    compressor = dcmCompressor(j2k_quality=90)
    encryptor = MedicalImageEncryptor()

    print("\n==================== COMPRESSION ====================")
    compression_results = compressor.compress(input_filepath)
    reconstructed = compressor.decompress(compression_results)

    print(f"PSNR : {psnr(compressor.original_pixel_data, reconstructed, data_range=1.0):.2f} dB")
    print(f"SSIM : {ssim(compressor.original_pixel_data, reconstructed, data_range=1.0):.4f}")
    print(f"Compression Ratio : {compression_results['compression_ratio']:.3f}")

    compressed_uint8 = cv2.normalize(
        reconstructed, None, 0, 255, cv2.NORM_MINMAX
    ).astype(np.uint8)


    kem_receiver = oqs.KeyEncapsulation("ML-KEM-768")
    pk_PQC = kem_receiver.generate_keypair()


    print("\n==================== SENDER SIDE ====================")

    nonce = secrets.token_bytes(16)
    receiver_id = b"Receiver-01"

    kem_sender = oqs.KeyEncapsulation("ML-KEM-768")
    ct_pqc, shared_secret = kem_sender.encap_secret(pk_PQC)

    session_seed = derive_session_seed(shared_secret, nonce, receiver_id)
    K_img = derive_image_key(session_seed, dcm)

    domain_keys = derive_domain_keys(K_img)
    chaos_params = map_key_to_chaos_params(K_img)

    encrypted_img, _ = encryptor.encrypt_image(
        compressed_uint8, key_params=chaos_params
    )

    ledger_hash = hashlib.sha256(K_img).hexdigest()


    print("\n==================== RECEIVER SIDE ====================")

    shared_secret_rx = kem_receiver.decap_secret(ct_pqc)
    assert shared_secret == shared_secret_rx, " ML-KEM shared secret mismatch"

    session_seed_rx = derive_session_seed(shared_secret_rx, nonce, receiver_id)
    K_img_rx = derive_image_key(session_seed_rx, dcm)

    chaos_params_rx = map_key_to_chaos_params(K_img_rx)

    if hashlib.sha256(K_img_rx).hexdigest() != ledger_hash:
        print(" Key verification failed. Abort.")
        sys.exit(1)

    decrypted_img = encryptor.decrypt_image(
        encrypted_img, chaos_params_rx
    )

    print(" Key verification successful. Decryption completed.")


    metrics = encryptor.calculate_metrics(
        compressed_uint8, encrypted_img, decrypted_img
    )

    print("\n==================== ENCRYPTION METRICS ====================")
    for k, v in metrics.items():
        print(f"{k}: {v:.4f}")




In [None]:
# running experiment for whole dataset
import pydicom
import numpy as np
import cv2
import json
import hashlib
from collections import defaultdict
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim



key_validator_chain = Blockchain()

fixed_keys = {
    'x0': 0.123456, 'y0': 0.654321, 'z0': 0.246810, 'w0': 0.135791,
    'a': 37.5, 'b': 3.5, 'c': 13.2, 'd': 7.8, 'e': 0.15
}

compressor = dcmCompressor()
encryptor = MedicalImageEncryptor()



TARGET_RESOLUTIONS = {256, 800, 960}
MAX_IMAGES_PER_RES = 20



metrics_by_resolution = defaultdict(lambda: {
    'psnr': [],
    'ssim': [],
    'compression_ratio': []
})

image_count_by_resolution = defaultdict(int)



for input_filepath in dicom_files:
    try:
        dcm = pydicom.dcmread(input_filepath, force=True)
        pixel_array = dcm.pixel_array.astype(np.float32)

        height, width = pixel_array.shape[:2]


        if height != width or height not in TARGET_RESOLUTIONS:
            continue

        if image_count_by_resolution[height] >= MAX_IMAGES_PER_RES:
            continue


        compression_results = compressor.compress(input_filepath)
        reconstructed = compressor.decompress(compression_results)

        psnr_val = psnr(
            compressor.original_pixel_data,
            reconstructed,
            data_range=1.0
        )

        ssim_val = ssim(
            compressor.original_pixel_data,
            reconstructed,
            data_range=1.0
        )


        reconstructed_uint8 = cv2.normalize(
            reconstructed, None, 0, 255, cv2.NORM_MINMAX
        ).astype(np.uint8)

        encrypted_img, encryption_keys_used = encryptor.encrypt_image(
            reconstructed_uint8, key_params=fixed_keys
        )

        key_hash = hashlib.sha256(
            json.dumps(encryption_keys_used, sort_keys=True).encode()
        ).hexdigest()

        key_validator_chain.add_block(key_hash)

        if key_hash != key_validator_chain.get_latest_block().data:
            continue


        metrics_by_resolution[height]['psnr'].append(psnr_val)
        metrics_by_resolution[height]['ssim'].append(ssim_val)
        metrics_by_resolution[height]['compression_ratio'].append(
            compression_results['compression_ratio']
        )

        image_count_by_resolution[height] += 1

    except Exception as e:
        print(f"Skipping {input_filepath}: {e}")



final_table = {}

for res, metrics in metrics_by_resolution.items():
    final_table[res] = {
        'count': len(metrics['psnr']),
        'PSNR_mean': np.mean(metrics['psnr']),
        'PSNR_std': np.std(metrics['psnr']),
        'SSIM_mean': np.mean(metrics['ssim']),
        'SSIM_std': np.std(metrics['ssim']),
        'CR_mean': np.mean(metrics['compression_ratio']),
        'CR_std': np.std(metrics['compression_ratio'])
    }



print("\n=== Compression Performance (20 Images per Resolution) ===")

for res in sorted(final_table.keys()):
    stats = final_table[res]
    print(f"\nResolution: {res} × {res}")
    print(f"Images used: {stats['count']}")
    print(f"PSNR: {stats['PSNR_mean']:.2f} ± {stats['PSNR_std']:.2f} dB")
    print(f"SSIM: {stats['SSIM_mean']:.4f} ± {stats['SSIM_std']:.4f}")
    print(f"CR:   {stats['CR_mean']:.2f} ± {stats['CR_std']:.2f}")


In [None]:
# Compression Performance & Per-Pixel Error Analysis
import pydicom
import numpy as np
import cv2
import json
import hashlib
import matplotlib.pyplot as plt
from collections import defaultdict
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim



key_validator_chain = Blockchain()

fixed_keys = {
    'x0': 0.123456, 'y0': 0.654321, 'z0': 0.246810, 'w0': 0.135791,
    'a': 37.5, 'b': 3.5, 'c': 13.2, 'd': 7.8, 'e': 0.15
}

compressor = dcmCompressor(j2k_quality=90)
encryptor = MedicalImageEncryptor()


TARGET_RESOLUTIONS = {256, 800, 960}
ERROR_PLOT_RES = {256, 800, 960}
MAX_IMAGES_PER_RES = 20



metrics_by_resolution = defaultdict(lambda: {
    'psnr': [],
    'ssim': [],
    'compression_ratio': [],
    'pixel_errors': []
})

image_count_by_resolution = defaultdict(int)



for input_filepath in dicom_files:
    try:
        dcm = pydicom.dcmread(input_filepath, force=True)
        pixel_array = dcm.pixel_array.astype(np.float32)

        if pixel_array.ndim != 2:
            continue

        height, width = pixel_array.shape

        if height != width or height not in TARGET_RESOLUTIONS:
            continue

        if image_count_by_resolution[height] >= MAX_IMAGES_PER_RES:
            continue


        compression_results = compressor.compress(input_filepath)
        reconstructed = compressor.decompress(compression_results)

        original = compressor.original_pixel_data


        psnr_val = psnr(original, reconstructed, data_range=1.0)
        ssim_val = ssim(original, reconstructed, data_range=1.0)

        abs_error = np.abs(original - reconstructed).flatten()

        metrics_by_resolution[height]['psnr'].append(psnr_val)
        metrics_by_resolution[height]['ssim'].append(ssim_val)
        metrics_by_resolution[height]['compression_ratio'].append(
            compression_results['compression_ratio']
        )
        metrics_by_resolution[height]['pixel_errors'].extend(abs_error.tolist())

        image_count_by_resolution[height] += 1

    except Exception as e:
        print(f"Skipping {input_filepath}: {e}")



print("\n=== Compression Performance & Per-Pixel Error Analysis ===")

for res in sorted(metrics_by_resolution.keys()):
    stats = metrics_by_resolution[res]

    if len(stats['psnr']) == 0:
        continue

    pixel_errors = np.array(stats['pixel_errors'])

    print(f"\nResolution: {res} × {res}")
    print(f"Images used: {len(stats['psnr'])}")
    print(f"PSNR: {np.mean(stats['psnr']):.2f} ± {np.std(stats['psnr']):.2f} dB")
    print(f"SSIM: {np.mean(stats['ssim']):.4f} ± {np.std(stats['ssim']):.4f}")
    print(f"CR:   {np.mean(stats['compression_ratio']):.2f} ± {np.std(stats['compression_ratio']):.2f}")

    print("Per-Pixel Error Statistics:")
    print(f"  Total pixels analyzed: {len(pixel_errors)}")
    print(f"  Mean absolute error:   {np.mean(pixel_errors):.6f}")
    print(f"  Max absolute error:    {np.max(pixel_errors):.6f}")
    print(f"  95th percentile:      {np.percentile(pixel_errors, 95):.6f}")
    print(f"  99th percentile:      {np.percentile(pixel_errors, 99):.6f}")


    if res in ERROR_PLOT_RES:
        plt.figure(figsize=(6, 4))
        plt.hist(pixel_errors, bins=100, log=True)
        plt.xlabel("Absolute Reconstruction Error")
        plt.ylabel("Pixel Count (log scale)")
        plt.title(f"Per-Pixel Error Distribution ({res}×{res})")
        plt.grid(True, linestyle="--", alpha=0.5)
        plt.tight_layout()
        plt.show()


In [None]:
# RUNTIME MEASUREMENT CELL
import time
import statistics
from collections import defaultdict
import cv2
import oqs
import secrets
import pydicom
import numpy as np


def timed(fn, *args, **kwargs):
    start = time.perf_counter()
    out = fn(*args, **kwargs)
    end = time.perf_counter()
    return out, (end - start) * 1000.0


runtime_by_size = defaultdict(lambda: {
    'count': 0,
    'compression_ms': [],
    'mlkem_ms': [],
    'encryption_ms': [],
    'decryption_ms': [],
    'total_ms': []
})


TARGET_RESOLUTIONS = {256, 800, 960}
MAX_IMAGES_PER_SIZE = 10

compressor = dcmCompressor(j2k_quality=90)
encryptor = MedicalImageEncryptor()

kem_receiver = oqs.KeyEncapsulation("ML-KEM-768")
pk_PQC = kem_receiver.generate_keypair()

receiver_id = b"Receiver-01"


for input_filepath in dicom_files:

        dcm = pydicom.dcmread(input_filepath, force=True)
        pixel_array = dcm.pixel_array.astype(np.float32)

        height, width = pixel_array.shape[:2]


        if height != width or height not in TARGET_RESOLUTIONS:
            continue

        image_size = (height, width)


        if runtime_by_size[image_size]['count'] >= MAX_IMAGES_PER_SIZE:
            continue


        if all(runtime_by_size[(r, r)]['count'] >= MAX_IMAGES_PER_SIZE for r in TARGET_RESOLUTIONS):
            break

        t_total_start = time.perf_counter()


        (_, t_comp) = timed(compressor.compress, input_filepath)
        compression_results = compressor.compress(input_filepath)
        reconstructed = compressor.decompress(compression_results)

        nonce = secrets.token_bytes(16)

        def kem_roundtrip():
            kem_sender = oqs.KeyEncapsulation("ML-KEM-768")
            ct, ss = kem_sender.encap_secret(pk_PQC)
            ss_rx = kem_receiver.decap_secret(ct)
            return ss, ss_rx

        ((shared_secret, shared_secret_rx), t_kem) = timed(kem_roundtrip)
        assert shared_secret == shared_secret_rx

        session_seed = derive_session_seed(shared_secret, nonce, receiver_id)
        K_img = derive_image_key(session_seed, dcm)
        chaos_params = map_key_to_chaos_params(K_img)

        compressed_uint8 = cv2.normalize(
            reconstructed, None, 0, 255, cv2.NORM_MINMAX
        ).astype(np.uint8)

        ((encrypted_img, _), t_enc) = timed(
            encryptor.encrypt_image,
            compressed_uint8,
            key_params=chaos_params
        )

        (_, t_dec) = timed(
            encryptor.decrypt_image,
            encrypted_img,
            chaos_params
        )

        t_total = (time.perf_counter() - t_total_start) * 1000.0

        runtime_by_size[image_size]['count'] += 1
        runtime_by_size[image_size]['compression_ms'].append(t_comp)
        runtime_by_size[image_size]['mlkem_ms'].append(t_kem)
        runtime_by_size[image_size]['encryption_ms'].append(t_enc)
        runtime_by_size[image_size]['decryption_ms'].append(t_dec)
        runtime_by_size[image_size]['total_ms'].append(t_total)



print("\n================ RUNTIME PERFORMANCE (PER IMAGE SIZE) ================\n")

for size in sorted(runtime_by_size.keys()):
    vals = runtime_by_size[size]
    if vals['count'] < 2:
        continue

    print(f"Image size: {size[0]}×{size[1]}  |  N = {vals['count']}")
    print(f"  Compression : {statistics.mean(vals['compression_ms']):.2f} ± {statistics.stdev(vals['compression_ms']):.2f} ms")
    print(f"  ML-KEM      : {statistics.mean(vals['mlkem_ms']):.2f} ± {statistics.stdev(vals['mlkem_ms']):.2f} ms")
    print(f"  Encryption  : {statistics.mean(vals['encryption_ms']):.2f} ± {statistics.stdev(vals['encryption_ms']):.2f} ms")
    print(f"  Decryption  : {statistics.mean(vals['decryption_ms']):.2f} ± {statistics.stdev(vals['decryption_ms']):.2f} ms")
    print(f"  End-to-End  : {statistics.mean(vals['total_ms']):.2f} ± {statistics.stdev(vals['total_ms']):.2f} ms\n")

In [None]:
#  NIST SP800-22 TESTS
import math


def image_to_bitstream(img_uint8):
    return ''.join(format(p, '08b') for p in img_uint8.flatten())



def monobit_test(bits):
    n = len(bits)
    s = sum(1 if b == '1' else -1 for b in bits)
    s_obs = abs(s) / math.sqrt(n)
    p = math.erfc(s_obs / math.sqrt(2))
    return p

def block_frequency_test(bits, M=128):
    n = len(bits)
    N = n // M
    chi_sq = 0
    for i in range(N):
        block = bits[i*M:(i+1)*M]
        pi = block.count('1') / M
        chi_sq += (pi - 0.5) ** 2
    chi_sq *= 4 * M
    p = math.exp(-chi_sq / 2)
    return p

def runs_test(bits):
    n = len(bits)
    pi = bits.count('1') / n
    if abs(pi - 0.5) > 2 / math.sqrt(n):
        return 0.0
    runs = 1 + sum(bits[i] != bits[i-1] for i in range(1, n))
    expected = 2 * n * pi * (1 - pi)
    variance = 2 * n * pi * (1 - pi) * (2 * pi * (1 - pi) - 1)
    z = abs(runs - expected) / math.sqrt(abs(variance))
    p = math.erfc(z / math.sqrt(2))
    return p

def approximate_entropy_test(bits, m=2):
    def phi(m):
        counts = {}
        for i in range(len(bits)):
            pattern = bits[i:i+m]
            if len(pattern) < m:
                pattern += bits[:m-len(pattern)]
            counts[pattern] = counts.get(pattern, 0) + 1
        total = sum(counts.values())
        return sum((v/total) * math.log(v/total) for v in counts.values())

    apen = phi(m) - phi(m+1)
    chi_sq = 2 * len(bits) * (math.log(2) - apen)
    p = math.exp(-chi_sq / 2)
    return p

def serial_test(bits, m=2):
    def count(m):
        c = {}
        for i in range(len(bits)):
            pattern = bits[i:i+m]
            if len(pattern) < m:
                pattern += bits[:m-len(pattern)]
            c[pattern] = c.get(pattern, 0) + 1
        return c

    c1 = count(m)
    c2 = count(m-1)
    psi1 = sum(v*v for v in c1.values()) * (2**m) / len(bits) - len(bits)
    psi2 = sum(v*v for v in c2.values()) * (2**(m-1)) / len(bits) - len(bits)
    delta = psi1 - psi2
    p = math.exp(-delta / 2)
    return p

# ===================== RUN NIST TESTS =====================
print("\n==================== NIST SP800-22 RANDOMNESS TESTS ====================")

bitstream = image_to_bitstream(encrypted_img)

nist_results = {
    "Monobit": monobit_test(bitstream),
    "Block Frequency": block_frequency_test(bitstream),
    "Runs": runs_test(bitstream),
    "Approximate Entropy": approximate_entropy_test(bitstream),
    "Serial": serial_test(bitstream)
}

for test, pval in nist_results.items():
    status = "PASS" if pval >= 0.01 else "FAIL"
    print(f"{test:22s} | p-value = {pval:.4f} | {status}")


In [None]:
#  UID COLLISION–BASED KEY SENSITIVITY TEST

import hmac, hashlib, secrets, sys, oqs, cv2, numpy as np, matplotlib.pyplot as plt
import pydicom
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim

def hkdf(ikm: bytes, salt: bytes, info: bytes, length=64):
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()
    t = b""
    okm = b""
    c = 1
    while len(okm) < length:
        t = hmac.new(prk, t + info + bytes([c]), hashlib.sha256).digest()
        okm += t
        c += 1
    return okm[:length]

def derive_session_seed(shared_secret, nonce, receiver_id):
    return hkdf(shared_secret, nonce, receiver_id, 32)

def derive_image_key(session_seed, dcm):
    uid_blob = (
        dcm.StudyInstanceUID +
        dcm.SeriesInstanceUID +
        dcm.SOPInstanceUID +
        dcm.SOPClassUID
    ).encode()
    return hkdf(session_seed, b"DICOM", uid_blob, 64)

def map_key_to_chaos_params(K_img):
    v = [int.from_bytes(K_img[i:i+4], 'big') for i in range(0, 36, 4)]
    return {
        'x0': 0.1 + (v[0] % 1000) / 10000,
        'y0': 0.1 + (v[1] % 1000) / 10000,
        'z0': 0.1 + (v[2] % 1000) / 10000,
        'w0': 0.1 + (v[3] % 1000) / 10000,
        'a': 35.0 + (v[4] % 50) / 10,
        'b': 3.0 + (v[5] % 10) / 10,
        'c': 12.0 + (v[6] % 20) / 10,
        'd': 7.0 + (v[7] % 10) / 10,
        'e': 0.1 + (v[8] % 10) / 100
    }


def compute_npcr_uaci(C1, C2):
    diff = C1 != C2
    npcr = np.sum(diff) / diff.size * 100
    uaci = np.mean(np.abs(C1.astype(np.float32) - C2.astype(np.float32)) / 255) * 100
    return npcr, uaci


print(" UID Collision–Based Key Sensitivity Test")

input_filepath = "image path"
dcm = pydicom.dcmread(input_filepath, force=True)

compressor = dcmCompressor(j2k_quality=90)
encryptor = MedicalImageEncryptor()


compression_results = compressor.compress(input_filepath)
reconstructed = compressor.decompress(compression_results)

compressed_uint8 = cv2.normalize(
    reconstructed, None, 0, 255, cv2.NORM_MINMAX
).astype(np.uint8)


kem_receiver = oqs.KeyEncapsulation("ML-KEM-768")
pk = kem_receiver.generate_keypair()

kem_sender = oqs.KeyEncapsulation("ML-KEM-768")
ct, shared_secret = kem_sender.encap_secret(pk)
shared_secret_rx = kem_receiver.decap_secret(ct)

assert shared_secret == shared_secret_rx

nonce = secrets.token_bytes(16)
receiver_id = b"Receiver-01"
session_seed = derive_session_seed(shared_secret, nonce, receiver_id)


K_img_1 = derive_image_key(session_seed, dcm)
params_1 = map_key_to_chaos_params(K_img_1)

cipher_1, _ = encryptor.encrypt_image(compressed_uint8, key_params=params_1)


dcm_uid_perturbed = dcm.copy()
dcm_uid_perturbed.SOPInstanceUID = dcm.SOPInstanceUID[:-1] + (
    "1" if dcm.SOPInstanceUID[-1] != "1" else "2"
)

K_img_2 = derive_image_key(session_seed, dcm_uid_perturbed)
params_2 = map_key_to_chaos_params(K_img_2)

cipher_2, _ = encryptor.encrypt_image(compressed_uint8, key_params=params_2)


npcr, uaci = compute_npcr_uaci(cipher_1, cipher_2)

print("\n==================== UID SENSITIVITY RESULTS ====================")
print(f"SOPInstanceUID change: 1 digit")
print(f"NPCR : {npcr:.4f}%")
print(f"UACI : {uaci:.4f}%")
print(f"Key hash 1: {hashlib.sha256(K_img_1).hexdigest()[:16]}...")
print(f"Key hash 2: {hashlib.sha256(K_img_2).hexdigest()[:16]}...")


plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.imshow(cipher_1, cmap='gray')
plt.title("Ciphertext (Original UID)")
plt.axis('off')

plt.subplot(1,2,2)
plt.imshow(cipher_2, cmap='gray')
plt.title("Ciphertext (Perturbed UID)")
plt.axis('off')
plt.tight_layout()
plt.show()


#security analysis


In [None]:
def plot_difference_maps_from_pipeline(original_plain, encryptor, fixed_keys):

    plain1 = original_plain.copy()
    plain2 = original_plain.copy()

    # Safe minimal modification (no overflow)
    if plain2[0, 0] < 255:
        plain2[0, 0] += 1
    else:
        plain2[0, 0] -= 1

    cipher1, _ = encryptor.encrypt_image(plain1, key_params=fixed_keys)
    cipher2, _ = encryptor.encrypt_image(plain2, key_params=fixed_keys)

    plaintext_diff = np.abs(plain1.astype(np.int16) - plain2.astype(np.int16))
    ciphertext_diff = np.abs(cipher1.astype(np.int16) - cipher2.astype(np.int16))

    plt.figure(figsize=(10, 4))

    plt.subplot(1, 2, 1)
    plt.imshow(plaintext_diff, cmap='hot')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.title("(a) Plaintext Difference Map")
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.imshow(ciphertext_diff, cmap='hot')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.title("(b) Ciphertext Difference Map")
    plt.axis("off")

    plt.suptitle("Difference Maps Under Known-Plaintext Scenario", fontsize=12)
    plt.tight_layout()
    plt.show()

plot_difference_maps_from_pipeline(
    compressed_uint8,
    encryptor,
    fixed_keys
)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import io
import cv2

def plot_encrypted_primary_vs_residual_histograms(
    compression_results,
    compressor,
    encryptor,
    fixed_keys
):
    """
    Plot histogram comparison between encrypted primary and encrypted residual streams.
    """


    primary_img = np.array(
        Image.open(io.BytesIO(compression_results['j2k_primary']))
    ).astype(np.float32)

    primary_img = compressor._normalize_pixel_data(
        primary_img, compressor.metadata['BitsStored']
    )
    primary_uint8 = cv2.normalize(primary_img, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)


    residual_img = np.array(
        Image.open(io.BytesIO(compression_results['j2k_residual']))
    ).astype(np.float32)

    residual_img = cv2.normalize(residual_img, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)


    encrypted_primary, _ = encryptor.encrypt_image(primary_uint8, key_params=fixed_keys)
    encrypted_residual, _ = encryptor.encrypt_image(residual_img, key_params=fixed_keys)


    plt.figure(figsize=(10, 4))


    plt.subplot(1, 2, 1)
    plt.hist(encrypted_primary.flatten(), bins=256, range=(0, 255), color='gray')
    plt.title("(a) Encrypted Primary Stream")
    plt.xlabel("Pixel Intensity")
    plt.ylabel("Frequency")


    plt.subplot(1, 2, 2)
    plt.hist(encrypted_residual.flatten(), bins=256, range=(0, 255), color='gray')
    plt.title("(b) Encrypted Residual Stream")
    plt.xlabel("Pixel Intensity")
    plt.ylabel("Frequency")

    plt.suptitle("Histogram Comparison of Encrypted Primary and Residual Streams", fontsize=12)
    plt.tight_layout()
    plt.show()
plot_encrypted_primary_vs_residual_histograms(
    compression_results,
    compressor,
    encryptor,
    fixed_keys
)
