# Citizen Measure: Vault Transit + Tink AEAD Study
You are helping design secure data handling for **Citizen Measure**, a global monitoring system where citizen-scientists collect sensitive evidence (water pollution, deforestation, child labor, trafficking). Devices may be confiscated or hacked; protecting encrypted data and keys is critical.

This notebook guides you through using **HashiCorp Vault Transit** (Key Encryption Key / KEK) to wrap a local **Tink AEAD keyset** (Data Encryption Key / DEK).

### Flow Overview
1. Generate a Transit key in the Vault UI (no code yet).
2. Demonstrate access to that key metadata in the terminal / notebook.
3. Generate local Tink keyset and envelope encrypt (wrap) with Transit key.
4. Encrypt application data with the unwrapped Tink primitive.
5. Rotate key in Vault UI; observe continued decrypt functionality.
6. Discuss rewrap (we do **not** implement; too complex for this study).

> Never paste production tokens or secrets into a shared notebook. All actions here are for an isolated dev instance.

## 0. Dependencies (run once if needed)

In [None]:
# Install required libs (skip if already installed).
# Quote the extra so zsh does not treat the brackets as a glob pattern.
%pip install --quiet requests "tink[hcvault]"




## 1. Create a Transit Key
Open the Vault UI via this [link](https://127.0.0.1:8200/ui/vault/dashboard).

**What is a Transit key?** The Vault *Transit* secrets engine does not store your data; it provides cryptographic operations as a service. A *Transit key* is managed by Vault and never leaves the server; in this design it serves as the key encryption key (KEK). When you ask Vault to encrypt something, it returns a ciphertext that embeds the key version (for example `vault:v1:...`). When you decrypt, Vault selects the matching key version automatically. This lets you rotate keys without breaking existing ciphertext.
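To make the version-embedding point concrete: Transit returns ciphertext as text of the form `vault:v<N>:<base64>`, and that `v<N>` is what lets Vault pick the right key version on decrypt. The minimal sketch below reads the version back out of such a string. `transit_key_version` is a hypothetical helper written for this notebook, not part of Vault or Tink, and the sample ciphertexts are placeholders.

```python
import re

# Transit ciphertext format: "vault:v<version>:<base64 payload>".
_VAULT_CT = re.compile(r"^vault:v(\d+):(.+)$")


def transit_key_version(ciphertext: str) -> int:
    """Return the Transit key version embedded in a Vault ciphertext string."""
    match = _VAULT_CT.match(ciphertext)
    if match is None:
        raise ValueError(f"not a Vault Transit ciphertext: {ciphertext[:16]!r}")
    return int(match.group(1))


# Hypothetical ciphertexts: one from before a rotation, one from after.
for ct in ("vault:v1:AAAA", "vault:v2:BBBB"):
    print(ct, "->", transit_key_version(ct))
```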

The Transit secrets engine has already been enabled on this dev instance (mounted at `transit/`), so you do not need to enable it yourself.

Use the Vault UI to create a Transit key named `cm-data-key` (the default type, `aes256-gcm96`, is fine). If the UI reports that the Transit engine is not enabled, enable it there first.

In [None]:
import getpass
import os

VAULT_ADDR = 'https://127.0.0.1:8200'  # change if remote

# Never hard-code tokens: read VAULT_TOKEN from the environment, or prompt for it.
VAULT_TOKEN = os.environ.get('VAULT_TOKEN') or getpass.getpass('Vault token: ')
VAULT_KEY_NAME = 'cm-data-key'         # must match UI-created key
DEV_SKIP_VERIFY = True                 # dev only; set False once Vault uses trusted certs
print('VAULT_ADDR:', VAULT_ADDR)

VAULT_ADDR: https://127.0.0.1:8200


## 2. Verify Transit Key Exists
Run the Python cell below to confirm that your token can read the Transit key's metadata (the key material itself never leaves Vault):

- The cell sends a GET request to `/v1/transit/keys/<VAULT_KEY_NAME>` and prints the HTTP status and JSON body.
- Expected: `Status: 200` and a JSON object whose `data` contains `keys` and `latest_version`.
- If you see errors:
  - 404: The key name is wrong or the key hasn't been created yet in the UI.
  - 403: The token lacks permission; it needs the `read` capability on `transit/keys/cm-data-key`.
  - TLS/verification errors: set `DEV_SKIP_VERIFY = True` on a dev instance, or configure a trusted CA in production.

Make sure the variables above are correct:
- `VAULT_ADDR` points to your Vault (e.g., https://127.0.0.1:8200).
- `VAULT_TOKEN` is set to a valid token.
- `VAULT_KEY_NAME` matches the key you created in the UI (default: `cm-data-key`).

In [3]:
import requests, json

# Uses VAULT_ADDR, VAULT_TOKEN, VAULT_KEY_NAME, DEV_SKIP_VERIFY from the config cell above
meta_resp = requests.get(
    f"{VAULT_ADDR}/v1/transit/keys/{VAULT_KEY_NAME}",
    headers={"X-Vault-Token": VAULT_TOKEN},
    verify=not DEV_SKIP_VERIFY,
)
print("Status:", meta_resp.status_code)
print(
    json.dumps(meta_resp.json(), indent=2)
    if meta_resp.status_code == 200
    else meta_resp.text
)


Status: 200
{
  "request_id": "1b42ed3b-5747-ed20-fe11-14c59365094f",
  "lease_id": "",
  "renewable": false,
  "lease_duration": 0,
  "data": {
    "allow_plaintext_backup": false,
    "auto_rotate_period": 0,
    "deletion_allowed": false,
    "derived": false,
    "exportable": false,
    "imported_key": false,
    "keys": {
      "1": 1763513503
    },
    "latest_version": 1,
    "min_available_version": 0,
    "min_decryption_version": 1,
    "min_encryption_version": 0,
    "name": "cm-data-key",
    "supports_decryption": true,
    "supports_derivation": true,
    "supports_encryption": true,
    "supports_signing": false,
    "type": "aes256-gcm96"
  },
  "wrap_info": null,
  "auth": null,
  "mount_type": "transit"
}
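The metadata above already answers which key versions are usable: Vault refuses to decrypt ciphertext whose version is below `min_decryption_version`, and `keys` lists the versions that exist. A small sketch, assuming the response shape printed above; `decryptable_versions` is a hypothetical helper, and the post-rotation dict is made up to show what you would expect after rotating the key in the UI.

```python
def decryptable_versions(metadata: dict) -> list:
    """Key versions Vault will still decrypt, given a /v1/transit/keys/<name> response."""
    data = metadata["data"]
    floor = data["min_decryption_version"]
    # "keys" maps version numbers (JSON object keys, so strings) to creation timestamps.
    return sorted(int(v) for v in data["keys"] if int(v) >= floor)


# Trimmed copy of the response printed above (before any rotation).
before = {"data": {"keys": {"1": 1763513503}, "latest_version": 1, "min_decryption_version": 1}}
# Hypothetical state after one rotation in the UI (second timestamp is made up).
after = {"data": {"keys": {"1": 1763513503, "2": 1763600000},
                  "latest_version": 2, "min_decryption_version": 1}}

print(decryptable_versions(before))  # [1]
print(decryptable_versions(after))   # [1, 2]
```

In the notebook itself you could call `decryptable_versions(meta_resp.json())` directly.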




## 3. VaultTransitAead Adapter (Tink <-> Vault Transit)

### Task: Set up the KEK adapter (Vault Transit → Tink AEAD)

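From here on, Google Tink encrypts the application data, and Vault Transit serves as the KEK that protects Tink's keyset.

What you'll do in the next code cell:
- Register Tink AEAD and define `VaultTransitAead`, a Tink `Aead` implementation that calls Vault Transit `/encrypt` and `/decrypt`, along with helpers that load a wrapped keyset and encrypt/decrypt files.
- Instantiate `remote_aead` using your `VAULT_ADDR`, `VAULT_TOKEN`, and `VAULT_KEY_NAME`.

Success criteria:
- The cell runs without error and prints `Remote AEAD ready: <key-name>`.

Notes:
- This cell does not encrypt application data yet; it prepares the KEK that wraps the Tink keyset in the following steps.
- Creating the adapter sends no request to Vault, so 403/404/TLS errors surface on the first encrypt or decrypt. If they do, revisit the key verification section above to confirm access and configuration.
- The adapter forwards Tink's associated data to Vault as `context`. Vault uses `context` only for key derivation, and `cm-data-key` was created with `derived: false` (see the metadata above), so Vault does not authenticate that associated data. To have it bound to the ciphertext, create the key with key derivation enabled (Vault then requires the same `context` on every call).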
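The adapter's wire format can be checked without a Vault server. Transit's JSON API cannot carry raw bytes, so plaintext and associated data go over as base64 text, and decrypt returns base64 that must be decoded. The sketch below mirrors the encoding that `VaultTransitAead` performs; the two function names are hypothetical, and the "response" is simulated locally rather than returned by Vault.

```python
import base64


def encrypt_request_body(plaintext: bytes, associated_data: bytes) -> dict:
    """JSON body that VaultTransitAead.encrypt posts to /v1/transit/encrypt/<key>."""
    # JSON cannot carry raw bytes, so Transit expects base64 text.
    body = {"plaintext": base64.b64encode(plaintext).decode("ascii")}
    if associated_data:
        body["context"] = base64.b64encode(associated_data).decode("ascii")
    return body


def plaintext_from_decrypt_response(response_json: dict) -> bytes:
    """Raw bytes from a /v1/transit/decrypt/<key> response (data.plaintext is base64)."""
    return base64.b64decode(response_json["data"]["plaintext"])


body = encrypt_request_body(b"keyset bytes", b"encrypted keyset example")
print(body)
# Simulate Vault handing the plaintext back on decrypt.
fake_response = {"data": {"plaintext": body["plaintext"]}}
print(plaintext_from_decrypt_response(fake_response))  # b'keyset bytes'
```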

In [None]:
 

import base64

import requests
import tink
from tink import aead

# One-time Tink AEAD registration
aead.register()

# Associated data used to encrypt/decrypt the keyset itself
_KEYSET_ENC_AD = b'encrypted keyset example'


class VaultTransitAead(aead.Aead):
    """Tink Aead whose key (the KEK) lives in Vault Transit and never leaves the server."""

    def __init__(self, addr: str, token: str, key_name: str, verify_tls: bool = True):
        self.addr = addr.rstrip('/')
        self.key_name = key_name
        self.verify = verify_tls
        self.session = requests.Session()
        self.session.headers.update({'X-Vault-Token': token})

    def _post(self, op: str, payload: dict) -> dict:
        # Turn HTTP/TLS failures into TinkError so Tink callers can handle them uniformly.
        try:
            resp = self.session.post(
                f"{self.addr}/v1/transit/{op}/{self.key_name}",
                json=payload,
                verify=self.verify,
                timeout=10,
            )
            resp.raise_for_status()
        except requests.RequestException as e:
            raise tink.TinkError(f"Vault transit {op} failed: {e}") from e
        return resp.json()['data']

    def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes:
        payload = {'plaintext': base64.b64encode(plaintext).decode('ascii')}
        if associated_data:
            # Vault uses 'context' only for key derivation (derived=true keys);
            # for a non-derived key it is not bound to the ciphertext.
            payload['context'] = base64.b64encode(associated_data).decode('ascii')
        # Vault returns text such as "vault:v1:..."; Tink stores it as opaque bytes.
        return self._post('encrypt', payload)['ciphertext'].encode('utf-8')

    def decrypt(self, ciphertext: bytes, associated_data: bytes) -> bytes:
        payload = {'ciphertext': ciphertext.decode('utf-8')}
        if associated_data:
            payload['context'] = base64.b64encode(associated_data).decode('ascii')
        return base64.b64decode(self._post('decrypt', payload)['plaintext'])


def _get_remote_aead(vault_addr: str, token: str, key_name: str, verify_tls: bool) -> aead.Aead:
    return VaultTransitAead(vault_addr, token, key_name, verify_tls=verify_tls)


def _load_keyset(keyset_path: str, vault_addr: str, token: str, key_name: str,
                 verify_tls: bool) -> tink.KeysetHandle:
    """Read a wrapped keyset file and unwrap it with the Vault KEK."""
    kek = _get_remote_aead(vault_addr, token, key_name, verify_tls)
    with open(keyset_path, "rt") as f:
        serialized = f.read()
    try:
        return tink.json_proto_keyset_format.parse_encrypted(serialized, kek, _KEYSET_ENC_AD)
    except tink.TinkError as e:
        raise RuntimeError(f"Keyset load/decrypt error: {e}") from e


def _to_bytes(associated_data) -> bytes:
    # Accept str or bytes for associated data and normalize to bytes.
    if isinstance(associated_data, str):
        return associated_data.encode()
    return associated_data or b""


def encrypt_file(input_path: str, output_path: str, keyset_path: str, vault_addr: str,
                 token: str, key_name: str, verify_tls: bool, associated_data=b""):
    """Encrypt input_path with the keyset's AEAD primitive (the DEK) into output_path."""
    cipher = _load_keyset(keyset_path, vault_addr, token, key_name, verify_tls).primitive(aead.Aead)
    with open(input_path, "rb") as f:
        pt = f.read()
    ct = cipher.encrypt(pt, _to_bytes(associated_data))
    print("Ciphertext:", ct)
    with open(output_path, "wb") as f:
        f.write(ct)


def decrypt_file(input_path: str, output_path: str, keyset_path: str, vault_addr: str,
                 token: str, key_name: str, verify_tls: bool, associated_data=b""):
    """Decrypt input_path with the keyset's AEAD primitive (the DEK) into output_path."""
    cipher = _load_keyset(keyset_path, vault_addr, token, key_name, verify_tls).primitive(aead.Aead)
    with open(input_path, "rb") as f:
        ct = f.read()
    pt = cipher.decrypt(ct, _to_bytes(associated_data))
    print("Plaintext:", pt)
    with open(output_path, "wb") as f:
        f.write(pt)


# Module-level KEK handle for use in later cells (values come from the config cell).
remote_aead = _get_remote_aead(VAULT_ADDR, VAULT_TOKEN, VAULT_KEY_NAME, verify_tls=not DEV_SKIP_VERIFY)
print('Remote AEAD ready:', VAULT_KEY_NAME)


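### Using the Tink + HashiCorp adapter

The cell below round-trips `plaintext.txt` through `encrypt_file` and `decrypt_file`. The keyset file (`me_keyset.json`) must contain a Tink keyset wrapped by the Transit key with `_KEYSET_ENC_AD` as associated data.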


In [None]:

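from pathlib import Path

# Reuses VAULT_ADDR, VAULT_TOKEN, VAULT_KEY_NAME and DEV_SKIP_VERIFY from the config cell.
print('VAULT_ADDR:', VAULT_ADDR)
print('VAULT_KEY_NAME:', VAULT_KEY_NAME)

DEMO_KEYSET_PATH = "me_keyset.json"
DEMO_AD = "file encryption example"

# Create a wrapped keyset on first run only; overwriting an existing one would make
# any data encrypted under it undecryptable.
if not Path(DEMO_KEYSET_PATH).exists():
    kek = VaultTransitAead(VAULT_ADDR, VAULT_TOKEN, VAULT_KEY_NAME, verify_tls=not DEV_SKIP_VERIFY)
    new_handle = tink.new_keyset_handle(aead.aead_key_templates.AES128_GCM)
    Path(DEMO_KEYSET_PATH).write_text(
        tink.json_proto_keyset_format.serialize_encrypted(new_handle, kek, _KEYSET_ENC_AD)
    )

# Sample input for the round trip.
if not Path("plaintext.txt").exists():
    Path("plaintext.txt").write_bytes(b"this is a test, encrypt me!")

encrypt_file(
    input_path="plaintext.txt",
    output_path="ciphertext.bin",
    keyset_path=DEMO_KEYSET_PATH,
    vault_addr=VAULT_ADDR,
    token=VAULT_TOKEN,
    key_name=VAULT_KEY_NAME,
    verify_tls=not DEV_SKIP_VERIFY,
    associated_data=DEMO_AD,
)

decrypt_file(
    input_path="ciphertext.bin",
    output_path="decrypted.txt",
    keyset_path=DEMO_KEYSET_PATH,
    vault_addr=VAULT_ADDR,
    token=VAULT_TOKEN,
    key_name=VAULT_KEY_NAME,
    verify_tls=not DEV_SKIP_VERIFY,
    associated_data=DEMO_AD,
)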

VAULT_ADDR: https://127.0.0.1:8200
VAULT_KEY_NAME: cm-data-key
Ciphertext: b'\x01~\xd4e\x9a\xfc\xec\xc4\x03\xc5\xa0\xf4\xdc\x84\xe7\x1fmh\x10 >\x7f|\x86n\xd5\x17m\x90\xfe\xd78\xb8\x1a\xcb\x86N\x15+\xef\x97\xceK\xa5\xc6\r\x85\xe7]\x98xY\xb7h\x83$\xfb\xb7\x82\xe1'
Plaintext: b'this is a test, encrypt me!'
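The ciphertext printed above is not raw AES-GCM output. With the `AES128_GCM` template, Tink prepends a 5-byte prefix (`0x01` plus the 4-byte ID of the keyset key that encrypted it), followed by a 12-byte IV, the encrypted body and a 16-byte tag. That key ID is the DEK-side counterpart of Vault's `vault:vN:` prefix: it lets a keyset holding several keys pick the right one. A minimal sketch that splits the sample ciphertext, assuming that layout; `split_tink_aes_gcm` is a hypothetical helper.

```python
import struct

TINK_PREFIX = 0x01                       # first byte under the TINK output prefix
PREFIX_LEN, IV_LEN, TAG_LEN = 5, 12, 16  # prefix, AES-GCM nonce, AES-GCM tag


def split_tink_aes_gcm(ciphertext: bytes) -> dict:
    """Split a TINK-prefixed AES-GCM ciphertext into key ID, IV, body length and tag."""
    if len(ciphertext) < PREFIX_LEN + IV_LEN + TAG_LEN or ciphertext[0] != TINK_PREFIX:
        raise ValueError("not a TINK-prefixed AES-GCM ciphertext")
    (key_id,) = struct.unpack(">I", ciphertext[1:PREFIX_LEN])
    return {
        "key_id": key_id,
        "iv": ciphertext[PREFIX_LEN:PREFIX_LEN + IV_LEN],
        "body_len": len(ciphertext) - PREFIX_LEN - IV_LEN - TAG_LEN,
        "tag": ciphertext[-TAG_LEN:],
    }


# Ciphertext printed by the cell above (equivalently: Path("ciphertext.bin").read_bytes()).
ct = (b'\x01~\xd4e\x9a\xfc\xec\xc4\x03\xc5\xa0\xf4\xdc\x84\xe7\x1fmh\x10 >\x7f|\x86n'
      b'\xd5\x17m\x90\xfe\xd78\xb8\x1a\xcb\x86N\x15+\xef\x97\xceK\xa5\xc6\r\x85\xe7]'
      b'\x98xY\xb7h\x83$\xfb\xb7\x82\xe1')
parts = split_tink_aes_gcm(ct)
print(hex(parts["key_id"]), parts["body_len"])  # 0x7ed4659a 27
```

The body length equals the 27-byte plaintext, a quick sanity check that the assumed layout holds.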




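## 4. Generate & Envelope Encrypt Tink Keyset

Envelope encryption keeps the data key local and the wrapping key in Vault: Tink generates an AES128-GCM keyset (the DEK) on this machine, `serialize_encrypted` sends the serialized keyset through the Transit KEK, and only the wrapped form is written to disk. Loading reverses this with `parse_encrypted`, so a confiscated device holds only a keyset it cannot unwrap without valid Vault access.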

In [23]:
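from pathlib import Path

import tink
from tink import aead

KEYSET_PATH = Path('encrypted_keyset.json')
# Must match the associated data the keyset was wrapped with.
KEYSET_ASSOCIATED_DATA = b'encrypted keyset example'


def _default_kek(remote=None) -> aead.Aead:
    # Use the given KEK, or build a Vault Transit one from the config cell's variables.
    if remote is not None:
        return remote
    return VaultTransitAead(VAULT_ADDR, VAULT_TOKEN, VAULT_KEY_NAME, verify_tls=not DEV_SKIP_VERIFY)


def generate_encrypted_keyset(keyset_path=KEYSET_PATH, associated_data=KEYSET_ASSOCIATED_DATA, remote=None):
    """Create a new AES128-GCM keyset (DEK), wrap it with the KEK, and save only the wrapped form."""
    keyset_handle = tink.new_keyset_handle(aead.aead_key_templates.AES128_GCM)
    encrypted_keyset = tink.json_proto_keyset_format.serialize_encrypted(
        keyset_handle, _default_kek(remote), associated_data
    )
    Path(keyset_path).write_text(encrypted_keyset)
    return keyset_handle


def load_encrypted_keyset(keyset_path=KEYSET_PATH, associated_data=KEYSET_ASSOCIATED_DATA, remote=None):
    """Read a wrapped keyset from disk and unwrap it through the KEK."""
    serialized = Path(keyset_path).read_text()
    return tink.json_proto_keyset_format.parse_encrypted(
        serialized, _default_kek(remote), associated_data
    )


def primitive_from_keyset(keyset_handle=None, **kwargs):
    """AEAD primitive for data encryption; loads the wrapped keyset if no handle is given."""
    handle = keyset_handle or load_encrypted_keyset(**kwargs)
    return handle.primitive(aead.Aead)


# Named differently from Section 3's encrypt_file/decrypt_file so those are not shadowed.
def encrypt_file_with_primitive(input_path, output_path, primitive, associated_data=b''):
    plaintext = Path(input_path).read_bytes()
    ciphertext = primitive.encrypt(plaintext, associated_data)
    Path(output_path).write_bytes(ciphertext)
    return ciphertext


def decrypt_file_with_primitive(input_path, output_path, primitive, associated_data=b''):
    ciphertext = Path(input_path).read_bytes()
    plaintext = primitive.decrypt(ciphertext, associated_data)
    Path(output_path).write_bytes(plaintext)
    return plaintext


# Flow steps 3-4: reuse the wrapped keyset if it exists (overwriting it would make data
# encrypted under it undecryptable), otherwise create one; then encrypt with the DEK.
handle = load_encrypted_keyset() if KEYSET_PATH.exists() else generate_encrypted_keyset()
cipher = primitive_from_keyset(handle)
record = cipher.encrypt(b'sample evidence record', b'file encryption example')
assert cipher.decrypt(record, b'file encryption example') == b'sample evidence record'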


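## 5. Rotate Transit Key in UI (No Code)

In the Vault UI, open the Transit key `cm-data-key` and rotate it. Re-running the key verification cell should then show `latest_version: 2`. Data and keysets wrapped under version 1 still decrypt, because each Vault ciphertext records its key version and version 1 remains at or above `min_decryption_version`. New encryptions use version 2.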
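The rotation can be observed on the wrapped keyset itself. Because `VaultTransitAead.encrypt` returns Vault's `vault:vN:...` string as bytes, the encrypted keyset file records which Transit version wrapped it, and that version does not change after a rotation until something rewraps it. The sketch below reads it back, assuming Tink's JSON keyset format stores the wrapped bytes base64-encoded under `encryptedKeyset`; both helpers are hypothetical and the sample file content is made up. It only detects when a rewrap would apply; it does not perform one.

```python
import base64
import json
import re


def keyset_wrap_version(encrypted_keyset_json: str) -> int:
    """Transit key version that wrapped a keyset written by serialize_encrypted.

    Assumes the "encryptedKeyset" field holds base64 of whatever the KEK returned,
    i.e. the "vault:v<N>:..." string produced by VaultTransitAead.encrypt.
    """
    wrapped = base64.b64decode(json.loads(encrypted_keyset_json)["encryptedKeyset"])
    match = re.match(rb"vault:v(\d+):", wrapped)
    if match is None:
        raise ValueError("keyset does not look wrapped by Vault Transit")
    return int(match.group(1).decode("ascii"))


def needs_rewrap(wrap_version: int, latest_version: int) -> bool:
    """True if the keyset is still wrapped under an older Transit key version."""
    return wrap_version < latest_version


# Hypothetical file contents (a real file may also carry other fields).
sample = json.dumps({"encryptedKeyset": base64.b64encode(b"vault:v1:AAAA").decode()})
version = keyset_wrap_version(sample)
print(version, needs_rewrap(version, latest_version=2))  # 1 True
```

Against the real files you would pass `Path("encrypted_keyset.json").read_text()` and `meta_resp.json()["data"]["latest_version"]` after re-running the key verification cell.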

### Discussion Prompts
1. Why does decryption keep working without a rewrap?
2. When would a scheduled rewrap be beneficial?
3. What should happen if key compromise is suspected?

### Further documentation

- [Tink key management overview (Google)](https://developers.google.com/tink/key-management-overview)
- [Vault Transit secrets engine API](https://developer.hashicorp.com/vault/api-docs/secret/transit)