In [17]:
import subprocess

import os
key_hex = os.urandom(64).hex()
print(key_hex)


def batch_encrypt(codes: list[str], key_hex: str) -> list[str]:
    proc = subprocess.Popen(
        ["wsl", "./aes_siv_tool/target/release/aes_siv_tool", "encrypt", "--key", key_hex, "--batch"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    input_data = "\n".join(codes) + "\n"
    stdout, stderr = proc.communicate(input=input_data)
    if proc.returncode != 0:
        raise RuntimeError(f"Encryption failed:\n{stderr}")
    return stdout.strip().splitlines()

def batch_decrypt(ciphertexts: list[str], key_hex: str) -> list[str]:
    proc = subprocess.Popen(
        ["wsl", "./aes_siv_tool/target/release/aes_siv_tool", "decrypt", "--key", key_hex, "--batch"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    input_data = "\n".join(ciphertexts) + "\n"
    stdout, stderr = proc.communicate(input=input_data)
    if proc.returncode != 0:
        raise RuntimeError(f"Decryption failed:\n{stderr}")
    return stdout.strip().splitlines()



8f5cc21d394fff4145717222a01046d5645ff614524c68c33c7f2d5e4ea339fdf5c9ed66494012f3e7cc697effd30001841c9d97dbc254bbac93900944a170dd


In [29]:
codes = ["USER001", "USER002", "ADMIN001"]
encrypted = batch_encrypt(codes, key_hex)
print("Encrypted:", encrypted)

# decrypted = batch_decrypt(encrypted, key_hex)
# print("Decrypted:", decrypted)


Encrypted: ['3s+WuBJEx7Yrm7jWFvXZKjkZF6x0mCg=', 'g4mcPThWmzkcekN7Yp3Fl/dGKCTKGGU=', 'IdAY0ldznwmOLjBOcpuKMVtNDSmpfmss']


In [5]:
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESSIV

zero_block = b"\x00" * 16

def pad_b64(s: str) -> bytes:
    """
    Decode a Base64 string, adding padding if necessary.
    """
    return base64.b64decode(s + '=' * (-len(s) % 4))


def encrypt_code(code: str, key_hex: str) -> str:
    """
    Encrypt a UTF-8 string using AES-SIV with a hex-encoded key.

    Key must be 256, 384, or 512 bits (hex length 64, 96, or 128 characters).
    Returns a Base64-encoded ciphertext.
    """
    key = bytes.fromhex(key_hex)
    if len(key) not in (32, 48, 64):
        raise ValueError(
            "Invalid key length: must be 256, 384, or 512 bits (hex length 64, 96, or 128 chars)"
        )
    siv = AESSIV(key)
    # AESSIV.encrypt(data: bytes, associated_data: Sequence[bytes])
    ct = siv.encrypt(code.encode("utf-8"), [zero_block, zero_block])
    return base64.b64encode(ct).decode("utf-8")


def decrypt_code(ciphertext_b64: str, key_hex: str) -> str:
    """
    Decrypt a Base64-encoded ciphertext using AES-SIV with a hex-encoded key.

    Accepts inputs with or without '=' padding. Key must be 256, 384, or 512 bits.
    Returns the decrypted UTF-8 string.
    """
    key = bytes.fromhex(key_hex)
    if len(key) not in (32, 48, 64):
        raise ValueError(
            "Invalid key length: must be 256, 384, or 512 bits (hex length 64, 96, or 128 chars)"
        )
    siv = AESSIV(key)
    ct = pad_b64(ciphertext_b64.strip())
    # AESSIV.decrypt(data: bytes, associated_data: Sequence[bytes])
    pt = siv.decrypt(ct, [zero_block, zero_block])
    return pt.decode("utf-8")


In [11]:
encrypt_code('aaaaa55555', "41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba")

'ZtMnE9Yp20f9zSU/PfiJVUzBNWdfsh/C2bc='

In [3]:
import pandas
import datetime


df = pandas.DataFrame({"test1":[f"CODE{i:05d}" for i in range(1000000)]})


In [6]:
now = datetime.datetime.now()
df['encrypted'] = df['test1'].apply(encrypt_code, key_hex="41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba")

print(datetime.datetime.now()-now)

0:00:12.566971


In [30]:
df['encrypted']

0        sDidWDQqFFn187SVcb0nAmWWnfIdpTptig==
1        Vee3UEv4RaNKzsqUjmUDW0IQz0Pd74muPQ==
2        nYLxqzXP1UjwGLwiiu9YaygsypkLk0kACQ==
3        cyRljjNNjkJniG8ZDQ1leXRw8T6tcZfYQw==
4        Rqvg6alkz+2nvT78BK+Ub/t6esbecy5G/w==
                         ...                 
99995    uUgCsRbC5a0E16ymybnW/K5YIq/MR3QsLw==
99996    deTCI4wgxfb+pqSXCWwt94hPaKhudad7uw==
99997    2NJvq4L2P86BwpPGYJ0/HF2Xm6OEFPlWmw==
99998    iiRWl+e5qHAFnbI6SNhVwY1psuIyM51KkQ==
99999    3YDzHdNJpxOl8IThy2HxMn9g0kgib0G1vA==
Name: encrypted, Length: 100000, dtype: object

In [12]:
df

Unnamed: 0,test1
0,CODE00000
1,CODE00001
2,CODE00002
3,CODE00003
4,CODE00004
...,...
99995,CODE99995
99996,CODE99996
99997,CODE99997
99998,CODE99998


In [21]:
decrypt_code('TE226ueCrN5mLfrnDyE9BaB56LhNuGHEukU=', "41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba")

InvalidTag: 

In [78]:
s = 'mzyBVeqShbPHiSR0TPg6fKZxAKvz'
s + '=' * (-len(s) % 4)

'mzyBVeqShbPHiSR0TPg6fKZxAKvz'

In [None]:
import subprocess
from typing import List

RUST_AES_SIV_TOOL = "./target/release/aes_siv_tool"

def run_rust(mode: str, codes: List[str], key_hex: str) -> List[str]:
    """
    Calls the Rust AES-SIV tool in batch mode for either encrypt or decrypt.
    Returns a list of output lines corresponding to each input code.
    """
    # Build command: e.g. ["./aes_siv_tool", "encrypt", "--key", key_hex, "--batch"]
    cmd = ["wsl", RUST_AES_SIV_TOOL, mode, "--key", key_hex, "--batch"]
    # cmd = ["wsl", "echo"]
    
    # Launch subprocess with pipes
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    # Send all codes as newline-separated, ending with a newline
    input_data = "\n".join(codes) + "\n"
    print(input_data)
    stdout, stderr = proc.communicate(input_data)
    print("enc done")
    if proc.returncode != 0:
        raise RuntimeError(f"Rust tool error ({mode}):\n{stderr.strip()}")
    
    # Split on lines, remove any trailing empties
    return [line for line in stdout.splitlines() if line]

def encrypt_batch(codes: List[str], key_hex: str, chunk_size: int = 5000) -> List[str]:
    """
    Encrypts a list of codes in chunks to manage memory and subprocess limits.
    """
    encrypted = []
    for i in range(0, len(codes), chunk_size):
        chunk = codes[i:i+chunk_size]
        encrypted.extend(run_rust("encrypt", chunk, key_hex))
    return encrypted

def decrypt_batch(ciphertexts: List[str], key_hex: str, chunk_size: int = 5000) -> List[str]:
    """
    Decrypts a list of ciphertexts in chunks.
    """
    decrypted = []
    for i in range(0, len(ciphertexts), chunk_size):
        chunk = ciphertexts[i:i+chunk_size]
        decrypted.extend(run_rust("decrypt", chunk, key_hex))
    return decrypted

# Example usage with 100,000 rows:
if __name__ == "__main__":
    import os
    from datetime import datetime

    
    # Generate dummy codes
    codes = [f"CODE{i:05d}" for i in range(10000)]
    key_hex = os.getenv("AES_SIV_KEY", "41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba")  # set your 128-hex-digit key in env

    now = datetime.now()
    print(now)
    # Encrypt in batches
    encrypted_codes = encrypt_batch(codes, key_hex)
    print(f"Encrypted {len(encrypted_codes)} codes.")

    # Decrypt in batches to verify
    decrypted_codes = decrypt_batch(encrypted_codes, key_hex)
    assert decrypted_codes == codes
    print("Round-trip successful!")
    print(datetime.now() - now)


2025-07-25 09:13:50.964203
CODE00000
CODE00001
CODE00002
CODE00003
CODE00004
CODE00005
CODE00006
CODE00007
CODE00008
CODE00009
CODE00010
CODE00011
CODE00012
CODE00013
CODE00014
CODE00015
CODE00016
CODE00017
CODE00018
CODE00019
CODE00020
CODE00021
CODE00022
CODE00023
CODE00024
CODE00025
CODE00026
CODE00027
CODE00028
CODE00029
CODE00030
CODE00031
CODE00032
CODE00033
CODE00034
CODE00035
CODE00036
CODE00037
CODE00038
CODE00039
CODE00040
CODE00041
CODE00042
CODE00043
CODE00044
CODE00045
CODE00046
CODE00047
CODE00048
CODE00049
CODE00050
CODE00051
CODE00052
CODE00053
CODE00054
CODE00055
CODE00056
CODE00057
CODE00058
CODE00059
CODE00060
CODE00061
CODE00062
CODE00063
CODE00064
CODE00065
CODE00066
CODE00067
CODE00068
CODE00069
CODE00070
CODE00071
CODE00072
CODE00073
CODE00074
CODE00075
CODE00076
CODE00077
CODE00078
CODE00079
CODE00080
CODE00081
CODE00082
CODE00083
CODE00084
CODE00085
CODE00086
CODE00087
CODE00088
CODE00089
CODE00090
CODE00091
CODE00092
CODE00093
CODE00094
CODE00095
CODE00096
COD

In [1]:
import aes_siv_py

In [22]:
aes_siv_py.decrypt_batch()

TypeError: decrypt_batch() missing 2 required positional arguments: 'cts' and 'key_hex'

In [7]:
now = datetime.datetime.now()
# aes_siv_py.decrypt_batch(cts = aes_siv_py.encrypt_batch(codes = df['test1'], key_hex = "41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba"), key_hex = "41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba")
encrypted = aes_siv_py.encrypt_batch(codes = df['test1'], key_hex = "41c4fce8128d376642accfda50a1c22f63f4659ae4cd42ec05038c1bdc48c72e646b84265cd13db63d86fc5e826339dc4b2cbc51d0351a8610d6c235cbd8adba")
print(datetime.datetime.now()-now)
# print(encrypted[100])

0:00:00.602071
