In [20]:
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
from cryptography.fernet import Fernet
import time

# Generate RSA Keys
def generate_key_pair():
    """Create a fresh 2048-bit RSA key pair with public exponent 65537.

    Returns:
        A ``(private_key, public_key)`` tuple; the public key is derived
        from the newly generated private key.
    """
    key = rsa.generate_private_key(key_size=2048, public_exponent=65537)
    return key, key.public_key()

# RSA key pairs for the two demo parties, A and B.
A_private, A_public = generate_key_pair()
B_private, B_public = generate_key_pair()

# ACL Roles: maps each lower-case role name to the list of actions it may
# perform. authorize() lower-cases the requested role before looking it up.
ACL = {
    "admin": ["read", "write", "delete"],
    "analyst": ["read", "write"],
    "guest": ["read"]
}

# Authentication Function
def authenticate(signer_private, verifier_public, message):
    """Sign ``message`` and immediately verify the signature.

    The signature uses RSA-PSS with SHA-256 (MGF1/SHA-256, maximum salt
    length) and is checked against ``verifier_public`` with the same
    parameters.

    Args:
        signer_private: Private key used to produce the signature.
        verifier_public: Public key the signature is verified against.
        message: Bytes to sign.

    Returns:
        ``(True, signature)`` when signing and verification both succeed,
        ``(False, None)`` when any step raises.
    """
    try:
        # Everything stays inside the try: any failure, not only a bad
        # signature, is reported as an authentication failure.
        digest = hashes.SHA256()
        pss = padding.PSS(
            mgf=padding.MGF1(digest),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        signature = signer_private.sign(message, pss, digest)
        verifier_public.verify(signature, message, pss, digest)
    except Exception:
        return False, None
    return True, signature

# Authorization Function
def authorize(role, action):
    """Tell whether ``role`` is permitted to perform ``action``.

    The role is matched case-insensitively against the ACL table; a role
    that is not listed has no permissions.

    Returns:
        True if ``action`` is listed for the role, otherwise False.
    """
    permitted = ACL.get(role.lower())
    if permitted is None:
        return False
    return action in permitted

# AES Encryption
def aes_communication(shared_key, message):
    """Round-trip ``message`` through Fernet using ``shared_key``.

    Args:
        shared_key: Fernet key used for both encryption and decryption.
        message: Plaintext bytes.

    Returns:
        ``(token_text, plaintext_text)``: the encrypted token and the
        decrypted message, each decoded to ``str``.
    """
    fernet = Fernet(shared_key)
    token = fernet.encrypt(message)
    plaintext = fernet.decrypt(token)
    token_text = token.decode()
    plaintext_text = plaintext.decode()
    return token_text, plaintext_text

# Replay Protection
def verify_timestamped_message(message_with_time, window=30):
    """Check that a ``payload||timestamp`` message is fresh.

    Args:
        message_with_time: Bytes of the form ``b"<payload>||<unix time>"``.
        window: Maximum allowed difference, in seconds, between the embedded
            timestamp and the current time.

    Returns:
        ``(True, payload_text)`` when the timestamp lies within ``window``
        seconds of now; ``(False, "Replay attack detected!")`` when it lies
        outside the window; ``(False, "Invalid message format!")`` when the
        message cannot be parsed.
    """
    import math  # local import: only needed for the NaN guard below

    try:
        # Split on the LAST separator so a payload that itself contains
        # b'||' is kept intact instead of being mistaken for the timestamp.
        msg, raw_timestamp = message_with_time.rsplit(b'||', 1)
        timestamp = float(raw_timestamp.decode())
    except (AttributeError, TypeError, ValueError):
        # Non-bytes input, missing separator, or a non-numeric timestamp.
        return False, "Invalid message format!"

    # float() accepts "nan", and every comparison with NaN is False, so a
    # NaN timestamp would otherwise slip past the freshness check below.
    if math.isnan(timestamp):
        return False, "Invalid message format!"

    if abs(time.time() - timestamp) > window:
        return False, "Replay attack detected!"

    try:
        return True, msg.decode()
    except UnicodeDecodeError:
        return False, "Invalid message format!"
# Timestamp of the most recent message accepted by run_scenario's replay
# check; stays 0 until a message has been accepted.
last_timestamp = 0

def run_scenario(title, sender_priv, receiver_priv, role, action, message, spoofed=False, replay=False):
    """Run one demo exchange and print the outcome of every security check.

    The message is stamped with a timestamp, both parties are authenticated
    with authenticate(), the role/action pair is checked with authorize(),
    the message is round-tripped through aes_communication(), and finally
    the timestamp is checked for replay.

    Args:
        title: Heading printed above the results.
        sender_priv: Sender's RSA private key.
        receiver_priv: Receiver's RSA private key.
        role: Role name passed to authorize().
        action: Requested action passed to authorize().
        message: Message payload as bytes.
        spoofed: When True, each signature is verified against the *other*
            party's public key instead of the signer's own.
        replay: When True, the last accepted timestamp is reused instead of
            the current time, simulating a replayed message.

    Side effects:
        Prints the results and, when the message passes the replay check,
        stores its timestamp in the module-level ``last_timestamp``.
    """
    global last_timestamp

    # Use old timestamp if simulating replay attack
    timestamp = last_timestamp if replay else time.time()
    timestamped = message + b'||' + str(timestamp).encode()

    # Authentication
    if spoofed:
        sender_verifier = receiver_priv.public_key()
        receiver_verifier = sender_priv.public_key()
    else:
        sender_verifier = sender_priv.public_key()
        receiver_verifier = receiver_priv.public_key()
    auth_A_to_B, _ = authenticate(sender_priv, sender_verifier, timestamped)
    auth_B_to_A, _ = authenticate(receiver_priv, receiver_verifier, timestamped)
    authenticated = auth_A_to_B and auth_B_to_A

    # Authorization
    if authenticated:
        authz = authorize(role, action)
    else:
        authz = "Skipped (authentication failed)"

    # AES
    aes_key = Fernet.generate_key()
    encrypted, decrypted = aes_communication(aes_key, message)
    # authz may be the "Skipped ..." string, so test for the True object
    # itself rather than for truthiness or equality.
    if not (authenticated and authz is True):
        decrypted = "Not allowed"

    # Replay Check
    if authenticated:
        try:
            # Split on the LAST separator so a payload containing b'||'
            # does not get mistaken for the timestamp field.
            msg, ts_field = timestamped.rsplit(b'||', 1)
            ts = float(ts_field.decode())
            current_time = time.time()

            # "<=" (not "<"): a replayed message carries exactly the last
            # accepted timestamp, so equality must count as a replay.
            # This requires strictly increasing timestamps between
            # accepted messages.
            if ts <= last_timestamp:
                replay_valid = False
                replay_result = "⚠️ Detected replay attack! Hacker reused an old message."
            elif abs(current_time - ts) > 30:
                replay_valid = False
                replay_result = "Replay attack detected (expired)."
            else:
                replay_valid = True
                replay_result = msg.decode()
                last_timestamp = ts  # only update if valid
        except ValueError:
            # Missing separator, non-numeric timestamp, or undecodable bytes.
            replay_valid = False
            replay_result = "Invalid timestamp format!"
    else:
        replay_valid = False
        replay_result = "Authentication failed — Replay check not performed"

    # Output
    print(f"{title}:")
    print(f"Authentication A -> B: {auth_A_to_B}\n")
    print(f"Authentication B -> A: {auth_B_to_A}\n")
    print(f"Role: {role}\n")
    print(f"Requested Action: {action}\n")
    print(f"Authorization Result: {authz}\n")
    print(f"Encrypted AES Message: {encrypted}\n")
    print(f"Decrypted AES Message: {decrypted}\n")
    print(f"Replay Check Valid: {replay_valid}\n")
    print(f"Replay Check Result: {replay_result}\n")


In [24]:
# Scenario 1: analyst requests "write", which the ACL lists for analysts.
run_scenario("Scenario 1", A_private, B_private, "analyst", "write", b"Hello") 
print()
# Scenario 2: analyst requests "delete", which the ACL lists only for admin.
run_scenario("Scenario 2", A_private, B_private, "analyst", "delete", b"Hello 2")
print()

# Scenario 3: spoofed=True verifies each signature against the other
# party's public key instead of the signer's own.
run_scenario("Scenario 3", B_private, A_private, "guest", "read", b"Hello 3", spoofed=True)
print()

# Scenario 4: replay=True reuses the last accepted timestamp rather than
# the current time. The scenarios share last_timestamp, so order matters.
run_scenario("Scenario 4 (Replay Attack)", A_private, B_private, "analyst", "write", b"Hello", replay=True)


Scenario 1:
Authentication A -> B: True

Authentication B -> A: True

Role: analyst

Requested Action: write

Authorization Result: True

Encrypted AES Message: gAAAAABoE0LuLLnzx1Vs3XPYX4iBVOTuVTJkr7U3Fy5WaL8rtDba31codRrsO659ENQt_U5F3nfs5SN6AIz_rmcgw57F8WApgQ==

Decrypted AES Message: Hello

Replay Check Valid: True

Replay Check Result: Hello


Scenario 2:
Authentication A -> B: True

Authentication B -> A: True

Role: analyst

Requested Action: delete

Authorization Result: False

Encrypted AES Message: gAAAAABoE0LulTQJiKBDhMgABX2BmtPxMMV7tVd9VhP3x8J8a_99OYrMGBsxbA_7g6WIul7livoCNR5yeUnguZR6DpMqkIkqcg==

Decrypted AES Message: Not allowed

Replay Check Valid: True

Replay Check Result: Hello 2


Scenario 3:
Authentication A -> B: False

Authentication B -> A: False

Role: guest

Requested Action: read

Authorization Result: Skipped (authentication failed)

Encrypted AES Message: gAAAAABoE0Lu2oMoWJMj8mJCZPA8TwysdLA0ZNIBXzXhcLwXZfZLI5lSh8ZFbH-RtNUBsadu-bWXUTrgvfRskQcAPMPTDlWGtg==

Decry