# Easy

In [8]:
import base64
import json
import os
from hashlib import sha256

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

# ── CONFIG ────────────────────────────────────────────────────────────────
# Use a 32-byte key for AES-256
RAW_KEY   = b"super-secret-audit-key"        # <- replace with your secret
KEY       = sha256(RAW_KEY).digest()         # 32-byte key
BLOCKSIZE = 16                               # AES block size (bytes)

# ── CORE HELPERS ──────────────────────────────────────────────────────────
def _encrypt(plaintext: bytes) -> bytes:
    iv     = os.urandom(BLOCKSIZE)
    cipher = AES.new(KEY, AES.MODE_CBC, iv)
    return iv + cipher.encrypt(pad(plaintext, BLOCKSIZE))   # prepend IV

def _decrypt(ciphertext: bytes) -> bytes:
    iv, c  = ciphertext[:BLOCKSIZE], ciphertext[BLOCKSIZE:]
    cipher = AES.new(KEY, AES.MODE_CBC, iv)
    return unpad(cipher.decrypt(c), BLOCKSIZE)

# ── API ───────────────────────────────────────────────────────────────────
def append_id(chain_b64: str | None, new_id: str) -> str:
    """
    Add one source-record ID to the provenance chain.
    Returns the *new* composite ID (base64).
    """
    payload = {
        "id"  : new_id,
        "next": chain_b64         # None on the very first layer
    }
    plaintext  = json.dumps(payload, separators=(",", ":")).encode()
    ciphertext = _encrypt(plaintext)
    return base64.urlsafe_b64encode(ciphertext).decode()

def unravel_chain(chain_b64: str) -> list[str]:
    """
    Peel the chain layer-by-layer, yielding the source IDs in *reverse*
    (outermost first).  Stops automatically when 'next' is null.
    """
    ids, current = [], chain_b64
    while current:
        print("current:", current)
        blob       = base64.urlsafe_b64decode(current.encode())
        plaintext  = _decrypt(blob)
        payload    = json.loads(plaintext)
        ids.append(payload["id"])
        current    = payload["next"]
    return ids


In [9]:

source_ids = ["9b1deb4d-b5c6-4a2a-a8ff-1086db0dbeda",
                "9b1deb4d-b5c6-4a2a-a8ff-1086db0dbee0",
                "9b1deb4d-b5c6-4a2a-a8ff-1086db0dbee1"]

# Build the composite ID (fast, one layer per source)
composite = None
for sid in source_ids:
    composite = append_id(composite, sid)

print("Composite provenance ID:\n", composite, "\n")

# Audit / unravel (slow: one decrypt per source)
recovered = unravel_chain(composite)
print("Recovered IDs (outer → inner):")
for r in recovered:
    print("  •", r)

Composite provenance ID:
 ox-aosQ7rOikwaeE4rJtUp84Uk2EAhjlERKQV-N3BDrFKXOevN8_hl80a-f0z0XK2q6KrSuw3cJCcBMp_ddNX0QIzL-RF54UnkSXAPNtFOEZdEsf4kzU9_NIyDNA3MewPaR8mNuVNekjorPNwgf3PqQiioXDCeasOWudcn7rsGDxQGlo5iUWQE_0Ata6AuP7s_YRLyHANhv2YwJYxiG1oP1w-YuLIxVzmxFKFj1IAY19tROhhcM7ylaWqNx5LK0X9C5EvXlRFXaKCz2JeJu7MieeVihmdAOG0_5YSxlSADxRRwy-z_9Bv1_9gfu6wjW0NPfN1tJaBpauLQPi7h8XrDVue2vbwgsxcELTszhiSd6ijdaNGi0BGrqR13JS6L-4SIP6vPq9WkUgB9vXMElUbN3rj6giuwiAhKJiWGqkEdg_LQWUXTqSQiTxeKLIVdtB 

current: ox-aosQ7rOikwaeE4rJtUp84Uk2EAhjlERKQV-N3BDrFKXOevN8_hl80a-f0z0XK2q6KrSuw3cJCcBMp_ddNX0QIzL-RF54UnkSXAPNtFOEZdEsf4kzU9_NIyDNA3MewPaR8mNuVNekjorPNwgf3PqQiioXDCeasOWudcn7rsGDxQGlo5iUWQE_0Ata6AuP7s_YRLyHANhv2YwJYxiG1oP1w-YuLIxVzmxFKFj1IAY19tROhhcM7ylaWqNx5LK0X9C5EvXlRFXaKCz2JeJu7MieeVihmdAOG0_5YSxlSADxRRwy-z_9Bv1_9gfu6wjW0NPfN1tJaBpauLQPi7h8XrDVue2vbwgsxcELTszhiSd6ijdaNGi0BGrqR13JS6L-4SIP6vPq9WkUgB9vXMElUbN3rj6giuwiAhKJiWGqkEdg_LQWUXTqSQiTxeKLIVdtB
current: KeUpuXkuOjEehDqwfNqazGRN8CWZSx0A3aXMC0k8D5Q4ZyLseViEZjSH

# More Advanced


In [10]:
"""
Production-style onion provenance chain.
  • AES-256-GCM  (authenticated, nonce = 12B, tag = 16B)
  • Each layer = 12-byte nonce || ciphertext || 16-byte tag
  • Plaintext  = 16-byte UUID  || next_layer (bytes or b"")
  • Storage    = base64url( layer_bytes )
Requires:  pip install cryptography
"""
import uuid

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# ── CONFIG ──────────────────────────────────────────────────────────────
# 32-byte master key:  derive from KMS / Vault in real life
_RAW_KEY = os.getenv("PROVENANCE_KEY", "demo-insecure-key").encode()
KEY = hashes.Hash(hashes.SHA256(), backend=default_backend())
KEY.update(_RAW_KEY)
AES_KEY = KEY.finalize()                      # 32 bytes
AEAD    = AESGCM(AES_KEY)

UUID_SIZE = 16                               # raw bytes
NONCE_SIZE = 12                              # GCM standard
TAG_SIZE = 16                                # GCM standard


# ── LOW-LEVEL HELPERS ───────────────────────────────────────────────────
def _encrypt_layer(plaintext: bytes) -> bytes:
    nonce = os.urandom(NONCE_SIZE)
    ct    = AEAD.encrypt(nonce, plaintext, associated_data=None)
    # ct already contains the 16-byte auth tag at its end
    return nonce + ct                        # total = 12 + len(ct)

def _decrypt_layer(blob: bytes) -> bytes:
    nonce, ct = blob[:NONCE_SIZE], blob[NONCE_SIZE:]
    return AEAD.decrypt(nonce, ct, associated_data=None)


# ── PUBLIC API ──────────────────────────────────────────────────────────
def append_id(chain_b64: str | None, new_uuid: str | uuid.UUID) -> str:
    """
    Return a new composite provenance ID (base64url string) by
    wrapping `new_uuid` around the existing chain (or start fresh).
    """
    if isinstance(new_uuid, str):
        new_uuid = uuid.UUID(new_uuid)
    next_layer = base64.urlsafe_b64decode(chain_b64.encode()) if chain_b64 else b""

    plaintext  = new_uuid.bytes + next_layer  # 16B + prior_cipher
    cipher     = _encrypt_layer(plaintext)    # bytes
    return base64.urlsafe_b64encode(cipher).decode()


def unravel_chain(chain_b64: str) -> list[str]:
    """
    Fully audit the composite ID, peeling layers until empty.
    Returns UUID strings from outermost → innermost.
    """
    ids, blob = [], base64.urlsafe_b64decode(chain_b64.encode())

    while blob:
        plaintext = _decrypt_layer(blob)
        uid_bytes, blob = plaintext[:UUID_SIZE], plaintext[UUID_SIZE:]
        ids.append(str(uuid.UUID(bytes=uid_bytes)))
    return ids

In [11]:
sources = [uuid.uuid4() for _ in range(4)]

comp = None
for s in sources:
    comp = append_id(comp, s)
print("[Composite ID]\n", comp, "\n")

recovered = unravel_chain(comp)
print("[Recovered IDs]")
for r in recovered:
    print(" •", r)

[Composite ID]
 adsRgoIy-NoXt6PYJWyQms_feEx2LnMamkdqafly6HUUnN-oO8IOp34v62XSjemX6LPDuLPnEZZSYgOJ0yHAZ5teYSdTgIK_zqONvISS-qpxlKOu1RxScSipDWWUZysbZNzOR6Whruyz0-1lhgF2UVMIotFcUQv52KakudCvLPGm7Y_oIPXd9A1XfAFhM31kW0x1rXMnJiczkQ6QHok-_-jtV8tq72JBalq-p6oTcKY= 

[Recovered IDs]
 • c4405fb7-1164-4793-81eb-15dc2dbd7021
 • 91e1b6eb-121d-42ac-8d57-7a93749b5341
 • 47ef63cf-0e19-4db1-ab75-f989a9e45fa8
 • 02f2b229-8885-47d2-b2f2-aeaab04a99d2


# Onion Style?

In [17]:
"""
 - Deterministic nonce, no per-layer nonce stored
 - Depth (uint32) stored once so decoder can regenerate nonces
 - AES-256-GCM, 128-bit tag
 Layout: 4B salt | 4B depth | ciphertext
   ciphertext layer i   = AES_GCM( nonce(depth=i), UUID || inner_cipher )
   nonce(salt, i)       = HMAC_SHA256(master, salt || i)[:12]
"""
import hashlib
import hmac
import os
import struct
import uuid

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

RAW_KEY  = os.getenv("PROV_KEY", "demo-key").encode()
MASTER_K = hashlib.sha256(RAW_KEY).digest()          # 32 B AES key
AEAD     = AESGCM(MASTER_K)                          # 16-byte tag
UUID_LEN = 16
TAG_LEN  = 16
NONCE_LEN = 12

def _nonce(salt: bytes, depth: int) -> bytes:
    msg = salt + struct.pack(">I", depth)
    return hmac.new(MASTER_K, msg, hashlib.sha256).digest()[:NONCE_LEN]

# ── API ────────────────────────────────────────────────────────────────
def append_id(chain_b64: str | None, new_uuid: str | uuid.UUID) -> str:
    uid = uuid.UUID(str(new_uuid)).bytes

    if chain_b64 is None:                       # first / innermost layer
        salt   = os.urandom(4)                  # record-level salt
        depth  = 0                              # this new layer's index
        inner  = b""
    else:
        raw    = base64.urlsafe_b64decode(chain_b64.encode())
        salt   = raw[:4]
        depth0 = struct.unpack(">I", raw[4:8])[0]
        inner  = raw[8:]
        depth  = depth0 + 1                     # outer layer gets +1

    nonce      = _nonce(salt, depth)
    ciphertext = AEAD.encrypt(nonce, uid + inner, None)

    header = salt + struct.pack(">I", depth)
    return base64.urlsafe_b64encode(header + ciphertext).decode()

def unravel_chain(chain_b64: str) -> list[str]:
    raw    = base64.urlsafe_b64decode(chain_b64.encode())
    salt   = raw[:4]
    depth  = struct.unpack(">I", raw[4:8])[0]
    blob   = raw[8:]
    ids    = []

    while True:
        nonce      = _nonce(salt, depth)
        plaintext  = AEAD.decrypt(nonce, blob, None)
        ids.append(str(uuid.UUID(bytes=plaintext[:UUID_LEN])))

        blob  = plaintext[UUID_LEN:]
        if not blob:                            # reached sentinel inner ""
            break
        depth -= 1
    return ids

In [19]:
if __name__ == "__main__":
    src_ids = [uuid.uuid4() for _ in range(10)]

    comp = None
    for s in src_ids:
        print("Appending:", s, "to", comp)
        comp = append_id(comp, s)

    print("Composite ID length:", len(comp), "chars (base64url)")
    print("Recovered (outer ➜ inner):")
    for sid in unravel_chain(comp):
        print(" •", sid)

Appending: e327eec6-d77a-4d1b-8d94-05877c6bf8ce to None
Appending: 14270013-7954-4e77-a219-e9e527679e5a to kbQy9QAAAAC417AYx8Af8rU0bKbaUqIeJ45Lnx9TqCpvZsVk--v5qQ==
Appending: 193e86e0-f9ba-41c3-97aa-ec7a8c4d5883 to kbQy9QAAAAEljE4Xnpisn9H786EobMydnB-x2jptOmrC7nXCYLXkklVVmwsKzB7oTbK_axcc2Bl1s1pwPD-gFxNJZ_51HwQ3
Appending: 7b529adf-725c-444e-a9a9-329eb0adefef to kbQy9QAAAALh3wTs7YtFiDWJKzbbaiWKbdlhS-Y870c4OfoGMvBIsZF9j6YYerBOQDqjwCZGUWvi7RMlNaVJyigtZ2S6is3Pk13Oha2HxS_DRfzk1Np_B9tV5KvOaOBnwbSgDUqdJYo=
Appending: aaa5ec05-9fdc-40ba-912c-b5b6e6608370 to kbQy9QAAAANQ4L897REdKFbrpy4_TYiT9FgpFj1towH_5UuzaC8A97xBhIWEdVdem0gbmyHH5GdeH2DgL77WfTrI3sYkyhsAgCwIUdUfE5BkkqdTkqcB3m73049OJK8g4PIXu8Je2-monNEtkPdBkbtLXTvjGMoZBIGM7Hy3rLQN9hl3wFN9mQ==
Appending: 799bfca6-a4c7-47b3-a91c-8629042c872c to kbQy9QAAAAQmG45Lsrw8ExnaWZBk32oQF-NomfzV89E54dEGjeNoii7dQE-oDzOcn13ArmYr0R5bzUbyNb9FlMpxnIG7MMkjysVWNtcp0f5vv0uuZXl3pHBu6Q57R72wAZwWWOFs-1p1L0VJjeKNb2DDQJCXOOMZTnDOi8u4yPOJR0K5gP85kYH9sAjg_R-MzuCAw7zDKadv5O6gC