In [4]:
# Run in Google Colab (Python 3.10) or local Python (>=3.8)
# Writes CSV to ./ec_lwe_result.csv and ./ec_lwe_result_summary.csv

import time
import secrets
import hashlib
import csv
from statistics import mean, stdev
from collections import defaultdict

# -----------------------
# Curve parameters (secp256k1-like)
# -----------------------
p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
a = 0
b = 7
Gx = 55066263022277343669578718895168534326250603453777594175500187360389116729240
Gy = 32670510020758816978083085130507043184471273380659243275938904335757337482424
P = (Gx, Gy)
n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141

# -----------------------
# Field and EC arithmetic (prototype)
# -----------------------
def mod_inv(x):
    return pow(x % p, p - 2, p)

def ec_add(P1, P2):
    if P1 is None:
        return P2
    if P2 is None:
        return P1
    x1, y1 = P1
    x2, y2 = P2
    if x1 == x2 and (y1 + y2) % p == 0:
        return None
    if P1 == P2:
        lam = (3 * x1 * x1 + a) * mod_inv(2 * y1) % p
    else:
        lam = (y2 - y1) * mod_inv((x2 - x1) % p) % p
    x3 = (lam * lam - x1 - x2) % p
    y3 = (lam * (x1 - x3) - y1) % p
    return (x3, y3)

def ec_mul(k, Pt):
    R = None
    Q = Pt
    k = k % n
    while k:
        if k & 1:
            R = ec_add(R, Q)
        Q = ec_add(Q, Q)
        k >>= 1
    return R

# -----------------------
# Point compression / decompression
# -----------------------
def compress_point(Pt):
    x, y = Pt
    prefix = b'\x02' if y % 2 == 0 else b'\x03'
    return prefix + x.to_bytes(32, 'big')

def decompress_point(data):
    prefix = data[0]
    x = int.from_bytes(data[1:], 'big')
    rhs = (pow(x, 3, p) + b) % p
    y = pow(rhs, (p + 1) // 4, p)
    if (y % 2 == 0 and prefix == 0x02) or (y % 2 == 1 and prefix == 0x03):
        return (x, y)
    return (x, (-y) % p)

# -----------------------
# RFC9380-like hash-to-curve (prototype)
# -----------------------
def hash_to_curve(msg):
    h = hashlib.sha256(msg).digest()
    x = int.from_bytes(h, 'big') % p
    y = pow((x**3 + 7) % p, (p + 1) // 4, p)
    return (x, y)

# -----------------------
# Key derivation (key separation)
# -----------------------
def kdf_enc_mac(shared_bytes):
    K_enc = hashlib.sha256(shared_bytes + b'\x00').digest()
    K_mac = hashlib.sha256(shared_bytes + b'\x01').digest()
    return K_enc, K_mac

# -----------------------
# EC-LWE core
# -----------------------
def key_generation():
    k_B = secrets.randbelow(n - 1) + 1
    Q_B = ec_mul(k_B, P)
    return k_B, Q_B

def encode_shared_point(S):
    return S[0].to_bytes(32, 'big') + S[1].to_bytes(32, 'big')

def encrypt_chunk(chunk, SID, nonce, Q_B):
    M = int.from_bytes(chunk, 'big') % p
    r = secrets.randbelow(n - 1) + 1
    C1 = ec_mul(r, P)
    S = ec_mul(r, Q_B)

    shared = encode_shared_point(S) + SID.encode() + nonce
    K_enc, K_mac = kdf_enc_mac(shared)

    e = hash_to_curve(K_enc)
    C2 = (M + e[0]) % p

    C1_ser = compress_point(C1)
    mac_data = C1_ser + C2.to_bytes(32, 'big') + SID.encode() + nonce
    tag = hashlib.sha256(K_mac + mac_data).digest()

    return C1_ser, C2, nonce, tag

def decrypt_chunk(C1_ser, C2, nonce, tag, k_B, SID):
    C1 = decompress_point(C1_ser)
    S = ec_mul(k_B, C1)

    shared = encode_shared_point(S) + SID.encode() + nonce
    K_enc, K_mac = kdf_enc_mac(shared)

    mac_data = C1_ser + C2.to_bytes(32, 'big') + SID.encode() + nonce
    expected = hashlib.sha256(K_mac + mac_data).digest()
    if not secrets.compare_digest(tag, expected):
        return None

    e = hash_to_curve(K_enc)
    M = (C2 - e[0]) % p
    return M.to_bytes(32, 'big')

# -----------------------
# Chunking
# -----------------------
def chunk_message(msg, size=32):
    out = []
    for i in range(0, len(msg), size):
        block = msg[i:i+size]
        if len(block) < size:
            block += b'\x00' * (size - len(block))
        out.append(block)
    return out

# -----------------------
# Benchmark
# -----------------------
def run_benchmark(message_sizes, runs_per_size):
    records = []
    for size in message_sizes:
        for run in range(1, runs_per_size + 1):
            msg = secrets.token_bytes(size)
            SID = secrets.token_hex(8)
            nonce0 = secrets.token_bytes(16)

            t0 = time.perf_counter()
            k_B, Q_B = key_generation()
            keygen = time.perf_counter() - t0

            chunks = chunk_message(msg)
            ciphertext = []

            t0 = time.perf_counter()
            for i, ch in enumerate(chunks):
                nonce = (int.from_bytes(nonce0, 'big') + i).to_bytes(16, 'big')
                ciphertext.append(encrypt_chunk(ch, SID, nonce, Q_B))
            enc = time.perf_counter() - t0

            t0 = time.perf_counter()
            recovered = b''
            for C1, C2, nonce, tag in ciphertext:
                pt = decrypt_chunk(C1, C2, nonce, tag, k_B, SID)
                if pt is None:
                    recovered = b''
                    break
                recovered += pt
            dec = time.perf_counter() - t0

            recovered = recovered[:len(msg)]
            status = "OK" if recovered == msg else "FAIL"

            ct_size = len(ciphertext) * (33 + 32 + 32)
            records.append([size, run, keygen, enc, dec, ct_size, status])
    return records

# -----------------------
# CSV output
# -----------------------
def write_csv(records):
    with open("ec_lwe_result.csv", "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["MsgSize","Run","KeyGen","Enc","Dec","CiphertextSize","Status"])
        w.writerows(records)

    summary = defaultdict(list)
    for r in records:
        summary[r[0]].append(r)

    with open("ec_lwe_result_summary.csv", "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["MsgSize","KeyGenMean","EncMean","DecMean","CiphertextSize"])
        for size, vals in summary.items():
            w.writerow([
                size,
                f"{mean(v[2] for v in vals):.6f}",
                f"{mean(v[3] for v in vals):.6f}",
                f"{mean(v[4] for v in vals):.6f}",
                vals[0][5]
            ])

# -----------------------
# Main
# -----------------------
if __name__ == "__main__":
    SIZES = [16, 32, 64, 128, 256, 512, 1024]
    RUNS = 50
    recs = run_benchmark(SIZES, RUNS)
    write_csv(recs)
    print("Benchmark complete.")


Benchmark complete.


In [None]:
import secrets
import hashlib
import math
from collections import Counter

# ============================================================
#  Elliptic-curve parameters (secp256k1-like)
# ============================================================
p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
a, b = 0, 7

Gx = 55066263022277343669578718895168534326250603453777594175500187360389116729240
Gy = 32670510020758816978083085130507043184471273380659243275938904335757337482424
P = (Gx, Gy)

# ============================================================
#  Field & EC operations (prototype)
# ============================================================
def mod_inv(x):
    return pow(x % p, p - 2, p)

def ec_add(P1, P2):
    if P1 is None:
        return P2
    if P2 is None:
        return P1
    x1, y1 = P1
    x2, y2 = P2
    if x1 == x2 and (y1 + y2) % p == 0:
        return None
    if P1 == P2:
        lam = (3 * x1 * x1) * mod_inv(2 * y1) % p
    else:
        lam = (y2 - y1) * mod_inv((x2 - x1) % p) % p
    x3 = (lam * lam - x1 - x2) % p
    y3 = (lam * (x1 - x3) - y1) % p
    return (x3, y3)

def ec_mul(k, P):
    R = None
    Q = P
    while k:
        if k & 1:
            R = ec_add(R, Q)
        Q = ec_add(Q, Q)
        k >>= 1
    return R

# ============================================================
# RFC 9380 Simplified SWU (prototype)
# ============================================================
def hash_to_field(msg, count=2):
    dst = b"EC-LWE_XMD:SHA-256_SSWU_"
    b0 = hashlib.sha256(msg + dst).digest()
    b1 = hashlib.sha256(b0 + dst).digest()
    uniform = (b0 + b1)[:count * 32]
    return [
        int.from_bytes(uniform[32*i:32*(i+1)], 'big') % p
        for i in range(count)
    ]

def map_to_curve_simple_swu(u):
    Z = -11 % p
    tv1 = (u * u) % p
    tv1 = (tv1 * Z) % p
    x1 = mod_inv((tv1 + 1) % p)
    x1 = (-b * x1) % p
    gx = (x1 * x1 * x1 + b) % p
    y = pow(gx, (p + 1) // 4, p)
    return (x1, y)

def hash_to_curve_rfc9380(msg):
    u0, u1 = hash_to_field(msg)
    Q1 = map_to_curve_simple_swu(u0)
    Q2 = map_to_curve_simple_swu(u1)
    R = ec_add(Q1, Q2)
    return R if R is not None else P

# ============================================================
# Key separation (aligned with main EC-LWE)
# ============================================================
def kdf_enc(shared_bytes):
    return hashlib.sha256(shared_bytes + b'\x00').digest()

# ============================================================
# EC-LWE (noise-only encryption for entropy test)
# ============================================================
def key_generation():
    k = secrets.randbelow(p - 1) + 1
    Q = ec_mul(k, P)
    return k, Q

def encrypt_single_chunk(chunk, SID, nonce, Q_B):
    M = int.from_bytes(chunk, 'big') % p
    r = secrets.randbelow(p - 1) + 1

    C1 = ec_mul(r, P)
    S = ec_mul(r, Q_B)

    shared = (
        S[0].to_bytes(32, 'big') +
        S[1].to_bytes(32, 'big') +
        SID.encode() +
        nonce
    )

    K_enc = kdf_enc(shared)
    e = hash_to_curve_rfc9380(K_enc)

    C2 = (M + e[0]) % p
    return C2.to_bytes(32, 'big')

# ============================================================
# Shannon entropy computation
# ============================================================
def shannon_entropy(data_bytes):
    counts = Counter(data_bytes)
    total = len(data_bytes)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())

# ============================================================
# Ciphertext diversity experiment
# ============================================================
def test_ciphertext_diversity(runs=100):
    SID = "EntropySession"
    nonce0 = secrets.token_bytes(16)
    nonce0_int = int.from_bytes(nonce0, 'big')

    message = b"HelloEC-LWE!!"  # fixed plaintext
    chunk = message.ljust(32, b'\x00')

    _, Q_B = key_generation()
    masked_chunks = []

    for i in range(runs):
        nonce_i = (nonce0_int + i).to_bytes(16, 'big')
        C2_bytes = encrypt_single_chunk(chunk, SID, nonce_i, Q_B)
        masked_chunks.append(C2_bytes)

    flat = b''.join(masked_chunks)
    entropy = shannon_entropy(flat)
    unique = len(set(masked_chunks))

    print(f"Runs                : {runs}")
    print(f"Unique C2 values    : {unique}/{runs}")
    print(f"Entropy (bits/byte) : {entropy:.4f}")

# ============================================================
# Run tests
# ============================================================
if __name__ == "__main__":
    for r in [100, 200, 500, 1000]:
        test_ciphertext_diversity(r)
        print("-" * 50)


Runs                : 100
Unique C2 values    : 100/100
Entropy (bits/byte) : 7.9382
--------------------------------------------------
Runs                : 200
Unique C2 values    : 200/200
Entropy (bits/byte) : 7.9746
--------------------------------------------------
Runs                : 500
Unique C2 values    : 500/500
Entropy (bits/byte) : 7.9889
--------------------------------------------------
Runs                : 1000
Unique C2 values    : 1000/1000
Entropy (bits/byte) : 7.9938
--------------------------------------------------


In [None]:
import secrets

# --------------------------------------------------
# Assumes the following functions already exist
# from your authenticated EC-LWE implementation:
#
# key_generation()
# encrypt_chunk(chunk, SID, nonce, Q_B)
# decrypt_chunk(C1_ser, C2, nonce, tag, k_B, SID)
# --------------------------------------------------

def tampering_test():
    print("=== Tampering / Malleability Test ===")

    # Fixed plaintext (one chunk)
    message = b"AttackTestBlock".ljust(32, b'\x00')
    SID = "TamperSession"
    nonce = secrets.token_bytes(16)

    # Key generation
    k_B, Q_B = key_generation()

    # Encrypt
    C1_ser, C2, nonce, tag = encrypt_chunk(message, SID, nonce, Q_B)

    # Tamper with ciphertext (flip 1 bit in C2)
    tampered_C2 = C2 ^ 1  # flip least significant bit

    # Attempt decryption
    result = decrypt_chunk(C1_ser, tampered_C2, nonce, tag, k_B, SID)

    if result is None:
        print("[PASS] Tampering detected — decryption aborted.")
    else:
        print("[FAIL] Tampering NOT detected!")

# --------------------------------------------------
# Run the test
# --------------------------------------------------
if __name__ == "__main__":
    tampering_test()


=== Tampering / Malleability Test ===
[PASS] Tampering detected — decryption aborted.
