# Open Private Join and Activation Cryptography Overview

This document provides an overview of the cryptographic primitives, operations, and workflows found in the IAB Tech Lab [Open Private Join and Activation](https://iabtechlab.com/datacleanrooms/) (OPJA) standard. This is accomplished by presenting high-level template implementations (written using Python 3) of encryption/decryption operations, protocol participants, and (for context) some data management workflows. The underlying cryptographic primitives required for those workflows are invoked via the interface provided by the [cryptography.io](https://cryptography.io) and [oblivious](https://oblivious.readthedocs.io) Python libraries.

The workflow implementations in this document are only *illustrations* that can serve as a guide and aid in understanding the OPJA standard (*e.g.*, when assembling a prototype, development, or production implementation of a component that conforms to OPJA). These illustrations do not exhaustively acknowledge or address all security, privacy, performance, scalability, software engineering, and information technology issues that may be considered in production implementations.

The definitions and implementations found in this document are organized according to common encapsulation, modularity, and reuse principles drawn from the practice of software engineering. Thus, the order in which they appear may not match the OPJA standard.

## Dependencies

This document requires Python 3.11 and is designed to be viewed and executed using [Jupyter Notebook](https://jupyter.org/). The document also relies on a few additional dependencies. All required dependencies can be found in the accompanying `requirements.txt` file.

The Python class and function definitions in this document are annotated with their types.

In [1]:
from __future__ import annotations
from typing import Optional, Tuple, Sequence

A number of built-in libraries are used throughout this document.

In [2]:
import os
import struct
import random
import secrets
import base64
import uuid
import collections

Cryptographic primitives within this document are invoked via the interface provided by the [cryptography.io](https://cryptography.io) Python library. In many environments, installing the library should be sufficient. However, in some cases, there may be a mismatch between the latest [cryptography.io](https://cryptography.io) release and the particular version of OpenSSL (or equivalent) that is installed or against which the installed version of Python is compiled.

In [3]:
import cryptography

## Common Cryptographic Operations

OPJA workflows rely on both a symmetric-key block cipher and a hybrid public-key cryptosystem. The sections below present implementation variants for each of these two schemes.

### Symmetric-Key Block Cipher: AES-128 GCM

Minimal implementations of AES-128 GCM encryption and decryption functions are presented below, based on an [example](https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption/#cryptography.hazmat.primitives.ciphers.modes.GCM) found in the [cryptography.io](https://cryptography.io) documentation.

In [4]:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey

def aes_128_gcm_encrypt(
        key_shared: bytes,
        plaintext: bytes,
        associated_data: bytes,
        nonce: bytes
    ) -> Tuple[bytes, bytes]:
    """
    Encrypt a plaintext (coupled with unencrypted associated data and
    using the specified nonce); return the ciphertext and accompanying tag.
    """
    # Construct an AES-GCM ``Cipher`` object with the given ``key_shared``
    # and the caller-supplied ``nonce``.
    encryptor = Cipher(algorithms.AES(key_shared), modes.GCM(nonce)).encryptor()

    # ``associated_data`` will be authenticated but not encrypted,
    # it must also be passed in on decryption.
    encryptor.authenticate_additional_data(associated_data)

    # Encrypt the plaintext and get the associated ciphertext.
    # GCM does not require padding.
    ciphertext = encryptor.update(plaintext) + encryptor.finalize()

    return (ciphertext, encryptor.tag)

def aes_128_gcm_decrypt(
        key_shared: bytes,
        associated_data: bytes,
        ciphertext: bytes,
        tag: bytes,
        nonce: bytes
    ) -> bytes:
    """
    Decrypt a ciphertext (coupled with the associated data, tag, and
    nonce that were involved in the original encryption of the ciphertext).
    """
    # Construct a ``Cipher`` object with ``key_shared``, ``nonce``, and
    # the GCM ``tag`` used for authenticating the message.
    decryptor = Cipher(algorithms.AES(key_shared), modes.GCM(nonce, tag)).decryptor()

    # Put ``associated_data`` back or the ``tag`` will fail to verify
    # when the decryptor is finalized.
    decryptor.authenticate_additional_data(associated_data)

    # Decryption gets us the authenticated plaintext.
    # If the tag does not match, an ``InvalidTag`` exception will be raised.
    return decryptor.update(ciphertext) + decryptor.finalize()

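Below is a simple test of the two functions defined above. The sender and receiver each generate their own X25519 key pair. Each then combines their own secret key with the other party's public key to obtain their own copy of the same shared key. Note that the raw output of an X25519 exchange is 32 bytes long and `algorithms.AES` selects the cipher variant from the key length, so this particular test exercises the two functions with a 256-bit key. A 16-byte key (such as the one returned by the HPKE key schedule later in this document) is needed for AES-128.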

In [5]:
sender_key_secret = X25519PrivateKey.generate()
sender_key_public = sender_key_secret.public_key()

receiver_key_secret = X25519PrivateKey.generate()
receiver_key_public = receiver_key_secret.public_key()

sender_key_shared = sender_key_secret.exchange(receiver_key_public)
receiver_key_shared = receiver_key_secret.exchange(sender_key_public)

nonce = secrets.token_bytes(12)
(ciphertext, tag) = aes_128_gcm_encrypt(sender_key_shared, b'message', b'assoc data', nonce)
assert (
    b'message'
    ==
    aes_128_gcm_decrypt(receiver_key_shared, b'assoc data', ciphertext, tag, nonce)
)
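
Because the AES variant is determined by the key length, a 32-byte exchanged secret has to be reduced to 16 bytes before it can serve as an AES-128 key. One common way to do this is HKDF ([RFC 5869](https://www.rfc-editor.org/rfc/rfc5869.html)). The sketch below implements HKDF-SHA256 using only the Python standard library. The function name `hkdf_sha256`, the random stand-in for the exchanged secret, and the `info` label are illustrative assumptions and are not taken from the OPJA specification.

```python
import hashlib
import hmac
import secrets

HASH_LEN = hashlib.sha256().digest_size  # 32 bytes for SHA-256.

def hkdf_sha256(ikm: bytes, length: int, salt: bytes = b'', info: bytes = b'') -> bytes:
    """
    Derive ``length`` bytes from input keying material (RFC 5869).
    """
    if length > 255 * HASH_LEN:
        raise ValueError('requested output is too long')

    # Extract: an absent salt defaults to a string of zero bytes.
    prk = hmac.new(salt or bytes(HASH_LEN), ikm, hashlib.sha256).digest()

    # Expand: chain HMAC blocks until enough output is available.
    (okm, block, counter) = (b'', b'', 1)
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1

    return okm[:length]

# Stand-in for the 32-byte output of an X25519 exchange (assumption).
exchanged_secret = secrets.token_bytes(32)

# Hypothetical context label; both parties would need to use the same one.
key_aes_128 = hkdf_sha256(exchanged_secret, 16, info=b'example aes-128-gcm key')
assert len(key_aes_128) == 16
```

A key derived this way could be passed as `key_shared` to the two functions above by both parties, provided they agree on the `salt` and `info` values.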

### Hybrid Cryptosystem: Hybrid Public Key Encryption (HPKE)

Minimal implementations of HPKE authenticated encryption and decryption functions (as defined in [RFC 9180](https://www.rfc-editor.org/rfc/rfc9180.html)) are presented below. The implementations below correspond to the ciphersuite found within the OPJA specification:

* mode: **Auth**,
* Key Encapsulation Mechanism (KEM): **DHKEM (X25519, HKDF-SHA256)**,
* Key Derivation Function (KDF): **HKDF-SHA256**, and
* Authenticated Encryption with Associated Data (AEAD): **AES-128-GCM**.
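
In RFC 9180, each of these choices has a numeric code point, and the code points are packed into the `suite_id` byte strings that label every key derivation step. The short sketch below shows how the two identifiers for this ciphersuite are laid out. The variable names are illustrative.

```python
import struct

# RFC 9180 code points for the ciphersuite listed above.
KEM_ID = 0x0020   # DHKEM(X25519, HKDF-SHA256)
KDF_ID = 0x0001   # HKDF-SHA256
AEAD_ID = 0x0001  # AES-128-GCM

# Identifier used inside the KEM (encapsulation and decapsulation).
suite_id_kem = b'KEM' + struct.pack('>H', KEM_ID)

# Identifier used by the key schedule.
suite_id_hpke = b'HPKE' + struct.pack('>HHH', KEM_ID, KDF_ID, AEAD_ID)

print(suite_id_kem.hex())   # 4b454d0020
print(suite_id_hpke.hex())  # 48504b45002000010001
```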

In [6]:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.hazmat.primitives.ciphers import aead
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey

HPKE_VERSION = b'HPKE-v1'
AUTH = 2
DHKEM_X25519_HKDF_SHA256 = 0x0020
HKDF_SHA256 = 0x0001
AES128_GCM = 0x0001

AEAD_KEY_SIZE = 16
DHKEM_X25519_HKDF_SHA256_SECRET_SIZE = 32

def labeled_extract(suite_id: bytes, salt: bytes, label: bytes, ikm: bytes) -> bytes:
    ctx = hmac.HMAC(salt, hashes.SHA256(), backend=default_backend())
    ctx.update(HPKE_VERSION + suite_id + label + ikm) # Labeled ``ikm``.
    return ctx.finalize()

def labeled_expand(suite_id: bytes, prk: bytes, label: bytes, info: bytes, length: int) -> bytes:
    labeled_info = struct.pack('>H', length) + HPKE_VERSION + suite_id + label + info
    assert length <= 255 * hashes.SHA256().digest_size
    t_n_minus_1 = b''
    n = 1
    data = b''
    while len(data) < length:
        ctx = hmac.HMAC(prk, hashes.SHA256(), backend=default_backend())
        ctx.update(t_n_minus_1 + labeled_info + n.to_bytes(1, byteorder='big'))
        t_n_minus_1 = ctx.finalize()
        data += t_n_minus_1
        n += 1
    return data[:length]

def extract_and_expand(suite_id: bytes, dh: bytes, kem_context: bytes, length: int) -> bytes:
    eae_prk = labeled_extract(suite_id, b'', b'eae_prk', dh)
    return labeled_expand(suite_id, eae_prk, b'shared_secret', kem_context, length)

def key_schedule(shared_secret: bytes, psk_id: bytes = b'') -> bytes:
    suite_id = b'HPKE' + struct.pack('>HHH', DHKEM_X25519_HKDF_SHA256, HKDF_SHA256, AES128_GCM)
    psk_id_hash = labeled_extract(suite_id, b'', b'psk_id_hash', psk_id)
    info_hash = labeled_extract(suite_id, b'', b'info_hash', b'')
    secret = labeled_extract(suite_id, shared_secret, b'secret', b'')
    key_schedule_context = bytes([AUTH]) + psk_id_hash + info_hash
    return labeled_expand(suite_id, secret, b'key', key_schedule_context, AEAD_KEY_SIZE)

def hpke_x25519_sha256_sha256_aes_128_gcm_auth_encap(pkr: X25519PublicKey, sks: X25519PrivateKey) -> Tuple[bytes, bytes]:
    suite_id = b'KEM' + struct.pack('>H', DHKEM_X25519_HKDF_SHA256)
    ek_secret = X25519PrivateKey.generate()
    ek_public = ek_secret.public_key()
    enc = ek_public.public_bytes(Encoding.Raw, PublicFormat.Raw)
    dh = ek_secret.exchange(pkr) + sks.exchange(pkr)
    pks = sks.public_key()
    kem_context = (
        enc +
        pkr.public_bytes(Encoding.Raw, PublicFormat.Raw) +
        pks.public_bytes(Encoding.Raw, PublicFormat.Raw)
    )
    shared_secret = extract_and_expand(suite_id, dh, kem_context, DHKEM_X25519_HKDF_SHA256_SECRET_SIZE)
    return (enc, key_schedule(shared_secret))

def hpke_x25519_sha256_sha256_aes_128_gcm_auth_decap(enc: bytes, skr: X25519PrivateKey, pks: X25519PublicKey) -> bytes:
    suite_id = b'KEM' + struct.pack('>H', DHKEM_X25519_HKDF_SHA256)
    pke = X25519PublicKey.from_public_bytes(enc)
    pkr = skr.public_key()
    dh = skr.exchange(pke) + skr.exchange(pks)
    kem_context = (
        enc +
        pkr.public_bytes(Encoding.Raw, PublicFormat.Raw) +
        pks.public_bytes(Encoding.Raw, PublicFormat.Raw)
    )
    shared_secret = extract_and_expand(suite_id, dh, kem_context, DHKEM_X25519_HKDF_SHA256_SECRET_SIZE)
    return key_schedule(shared_secret)

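Below is a simple test of the two functions defined above. The sender and receiver each generate their own key pair. The sender then uses their own secret key and the receiver's public key to produce an encapsulated key together with their own copy of the shared key. The receiver uses the encapsulated key, their own secret key, and the sender's public key to reconstruct the same shared key.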

In [7]:
sender_key_secret = X25519PrivateKey.generate()
sender_key_public = sender_key_secret.public_key()

receiver_key_secret = X25519PrivateKey.generate()
receiver_key_public = receiver_key_secret.public_key()

(key_encapsulated, sender_key_shared) =\
    hpke_x25519_sha256_sha256_aes_128_gcm_auth_encap(
        receiver_key_public,
        sender_key_secret
    )
receiver_key_shared =\
    hpke_x25519_sha256_sha256_aes_128_gcm_auth_decap(
        key_encapsulated,
        receiver_key_secret,
        sender_key_public
    )

nonce = secrets.token_bytes(12)
(ciphertext, tag) = aes_128_gcm_encrypt(sender_key_shared, b'message', b'assoc data', nonce)
assert (
    b'message'
    ==
    aes_128_gcm_decrypt(receiver_key_shared, b'assoc data', ciphertext, tag, nonce)
)
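
The encapsulation and decapsulation functions agree on a shared key because each Diffie-Hellman term can be computed from either side of the exchange. The sketch below isolates that step: it builds the concatenated Diffie-Hellman value once as the sender would and once as the receiver would, and checks that a receiver expecting a different sender obtains a different value (which is what makes the **Auth** mode authenticate the sender). The variable names are illustrative, and the sketch omits the key derivation steps that follow in the full functions.

```python
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey

# Long-term private keys of the sender and the receiver.
sk_sender = X25519PrivateKey.generate()
sk_receiver = X25519PrivateKey.generate()

# Ephemeral private key generated by the sender for one encapsulation.
sk_ephemeral = X25519PrivateKey.generate()

# Sender side: two private keys combined with the receiver's public key.
dh_sender = (
    sk_ephemeral.exchange(sk_receiver.public_key()) +
    sk_sender.exchange(sk_receiver.public_key())
)

# Receiver side: one private key combined with the sender's two public keys.
dh_receiver = (
    sk_receiver.exchange(sk_ephemeral.public_key()) +
    sk_receiver.exchange(sk_sender.public_key())
)
assert dh_sender == dh_receiver

# A receiver that expects some other sender computes a different value.
sk_other = X25519PrivateKey.generate()
dh_wrong_sender = (
    sk_receiver.exchange(sk_ephemeral.public_key()) +
    sk_receiver.exchange(sk_other.public_key())
)
assert dh_wrong_sender != dh_receiver
```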

## Matching Protocol

The OPJA specification presents two possible reference designs for the matching system: one based on private set intersection (PSI) and one relying on a trusted execution environment (TEE). The example workflow presented in this section illustrates the relevant cryptographic steps within the PSI reference design using the [oblivious](https://oblivious.readthedocs.io) library, which provides high-level wrappers for [ristretto255](https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-ristretto255-decaf448) objects and operations.

In [8]:
from oblivious.ristretto import point, scalar

publisher_key = scalar()
publisher_ids = [
    'alice@example.org',
    'bob@example.org',
    'carla@example.org'
]

advertiser_key = scalar()
advertiser_ids = [
    'bob@example.org',
    'carla@example.org',
    'dan@example.org'
]

# Publisher and advertiser each mask their own data.
publisher_ids_masked = [
    publisher_key * point.hash(publisher_id.encode('utf-8'))
    for publisher_id in publisher_ids
]
advertiser_ids_masked = [
    advertiser_key * point.hash(advertiser_id.encode('utf-8'))
    for advertiser_id in advertiser_ids
]

# Publisher masks advertiser's masked data and shuffles it.
advertiser_ids_masked_twice = [
    publisher_key * advertiser_id_masked
    for advertiser_id_masked in advertiser_ids_masked
]
random.shuffle(advertiser_ids_masked_twice)

# Advertiser masks publisher's masked data but does not shuffle it.
publisher_ids_masked_twice = [
    advertiser_key * publisher_id_masked
    for publisher_id_masked in publisher_ids_masked
]

# The matching system can now compare the twice-masked
# data sets. The sizes of the intersections are the same.
assert(
    len(set(publisher_ids) & set(advertiser_ids))
    ==
    len(
        set(publisher_ids_masked_twice) 
        &
        set(advertiser_ids_masked_twice)
    )
)

# Because the publisher's data was not shuffled, the matching
# system can determine the label for each of the publisher's
# identifiers by position.
[
    (
        # Publisher identifier. It appears in the clear here only to
        # keep the output readable; the matching system itself would
        # work with the corresponding entry of ``publisher_ids_masked``.
        publisher_id,

        # Label (encrypted before delivery to publisher).
        publisher_id_masked_twice in advertiser_ids_masked_twice
    )
    for (publisher_id, publisher_id_masked_twice) in zip(
        publisher_ids,
        publisher_ids_masked_twice
    )
]

[('alice@example.org', False),
 ('bob@example.org', True),
 ('carla@example.org', True)]
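
The comparison of twice-masked values works because masking commutes: applying the publisher's key and then the advertiser's key yields the same value as applying them in the opposite order. The sketch below illustrates only that property, using modular exponentiation from the standard library in place of ristretto255. The modulus, function names, and key generation are assumptions made for illustration; this toy group is not secure and is not what the [oblivious](https://oblivious.readthedocs.io) library uses.

```python
import hashlib
import secrets

# A Mersenne prime used as a toy modulus. This group is for
# illustration only and is NOT a secure substitute for ristretto255.
P = 2**127 - 1

def hash_to_group(identifier: str) -> int:
    """Map an identifier to a square modulo ``P``."""
    digest = hashlib.sha256(identifier.encode('utf-8')).digest()
    return pow(int.from_bytes(digest, 'big') % P, 2, P)

def mask(key: int, element: int) -> int:
    """Mask a group element by raising it to a secret exponent."""
    return pow(element, key, P)

# Secret exponents for the two parties.
key_a = secrets.randbelow(P - 3) + 2
key_b = secrets.randbelow(P - 3) + 2

ids_a = ['alice@example.org', 'bob@example.org', 'carla@example.org']
ids_b = ['bob@example.org', 'carla@example.org', 'dan@example.org']

# Each party masks its own identifiers; the other party masks them again.
twice_a = [mask(key_b, mask(key_a, hash_to_group(i))) for i in ids_a]
twice_b = [mask(key_a, mask(key_b, hash_to_group(i))) for i in ids_b]

# Masking commutes, so shared identifiers coincide after both masks.
lookup = set(twice_b)
matches = [t in lookup for t in twice_a]
print(matches)  # [False, True, True]
```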

## Activation Protocol

The activation protocol within the OPJA specification consists of a number of component operations and workflows. This section presents implementations of some of those components and a simplified example scenario that illustrates how the components interoperate. It is important to note that the scenario should be viewed as abstract: practical considerations (such as how to implement asynchronous communication channels between different participants) are ignored.

### Label Encryption and Decryption

The activation protocol involves the preparation and delivery of *encrypted labels*. Below are minimal implementations of functions for encrypting and decrypting an ordered collection of labels. These implementations adhere to the OPJA specification.

In [9]:
def encrypt_labels(
        key: bytes,
        match_transaction_id: bytes,
        ls: Sequence[bool]
    ) -> Sequence[str]:
    """
    Encrypt a sequence of boolean labels and return the sequence of
    ciphertexts.
    """
    es = []
    nonce_base = os.urandom(12)

    for (i, l) in enumerate(ls):

        # The boolean label encoded as a bytes-like object.
        b = bytes([255 if l else 0])

        # Derive the label-specific nonce by XORing the label index
        # into the base nonce.
        nonce = bytes([x ^ y for (x, y) in zip(i.to_bytes(12, 'big'), nonce_base)])

        # Create encrypted label from ``b``.
        (c, t) = aes_128_gcm_encrypt(
            key,
            b,
            match_transaction_id,
            nonce
        )
        
        # Add Base64-encoded encrypted label to result list.
        e = base64.standard_b64encode(nonce + c + t).decode()
        es.append(e)

    return es

def decrypt_labels(
        key: bytes,
        match_transaction_id: bytes,
        cs: Sequence[str]
    ) -> Sequence[bool]:
    """
    Decrypt a sequence of encrypted label ciphertexts and return the
    original labels.
    """
    ls = []

    for e in cs:

        # Retrieve raw byte vector from Base64-encoded string.
        raw = base64.standard_b64decode(e.encode())

        # Disassemble raw byte vector into constituent parts.
        (nonce, c, tag) = (raw[:12], raw[12:-16], raw[-16:])

        # Decrypt, convert to a boolean value, and add to result list.
        l = 255 == aes_128_gcm_decrypt(key, match_transaction_id, c, tag, nonce)[0]
        ls.append(l)

    return ls

Below is a simple test of the functions defined above.

In [10]:
sender_key_secret = X25519PrivateKey.generate()
sender_key_public = sender_key_secret.public_key()

receiver_key_secret = X25519PrivateKey.generate()
receiver_key_public = receiver_key_secret.public_key()

sender_key_shared = sender_key_secret.exchange(receiver_key_public)
receiver_key_shared = receiver_key_secret.exchange(sender_key_public)

ls = [True, False, True, False]
match_transaction_id = b'1234567890'
es = encrypt_labels(sender_key_shared, match_transaction_id, ls)
ls_ = decrypt_labels(receiver_key_shared, match_transaction_id, es)
assert ls == ls_
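
Two details of the functions above can be examined without any encryption: how each label obtains a distinct nonce from a single random base, and how the nonce, ciphertext, and tag are laid out inside the Base64-encoded string. The sketch below reproduces both using only the standard library. The helper names (`label_nonce`, `pack`, `unpack`) and the placeholder ciphertext and tag bytes are assumptions made for illustration.

```python
import base64
import secrets
from typing import Tuple

NONCE_SIZE = 12  # Bytes in a GCM nonce as used above.
TAG_SIZE = 16    # Bytes in a GCM tag as used above.

def label_nonce(nonce_base: bytes, index: int) -> bytes:
    """XOR the big-endian label index into the random base nonce."""
    counter = index.to_bytes(NONCE_SIZE, 'big')
    return bytes(x ^ y for (x, y) in zip(counter, nonce_base))

def pack(nonce: bytes, ciphertext: bytes, tag: bytes) -> str:
    """Concatenate the three parts and Base64-encode the result."""
    return base64.standard_b64encode(nonce + ciphertext + tag).decode()

def unpack(encoded: str) -> Tuple[bytes, bytes, bytes]:
    """Split a Base64-encoded string back into nonce, ciphertext, and tag."""
    raw = base64.standard_b64decode(encoded)
    return (raw[:NONCE_SIZE], raw[NONCE_SIZE:-TAG_SIZE], raw[-TAG_SIZE:])

nonce_base = secrets.token_bytes(NONCE_SIZE)
nonces = [label_nonce(nonce_base, i) for i in range(4)]

# Index 0 leaves the base unchanged; every index yields a distinct nonce.
assert nonces[0] == nonce_base
assert len(set(nonces)) == 4

# Placeholder ciphertext (one byte) and tag (sixteen zero bytes).
encoded = pack(nonces[1], b'\x00', bytes(TAG_SIZE))
assert unpack(encoded) == (nonces[1], b'\x00', bytes(TAG_SIZE))
```

Since XOR with distinct indices is injective, no nonce repeats under the same key within a single call, which matters because GCM must never reuse a nonce with the same key.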

### Participants

All participants (publishers, advertisers, matching systems, DSPs, and SSPs) must have the capacity to perform some basic cryptographic key management operations. The class definition below includes methods corresponding to these operations.

In [11]:
class Participant:
    """
    Functionalities common to all participants.
    """
    def __init__(self: Participant, identifier: Optional[str] = None):
        """
        Each participant has a unique identifier.
        """
        self.identifier = str(uuid.uuid4()) if identifier is None else identifier

    def generate_key_pair(self: Participant) -> Tuple[X25519PublicKey, X25519PrivateKey]:
        """
        Generate an individual public-private key pair.
        """
        key_private = X25519PrivateKey.generate()
        key_public = key_private.public_key()
        return (key_public, key_private)

    def initial_key_pairs(self: Participant):
        """
        Prepare an ordered collection of five public-private key pairs.
        The "first" key is at the right-most end of the ordered collection.
        """
        self.key_pairs = collections.deque([self.generate_key_pair() for _ in range(5)])
        
    def rotate_key_pairs(self: Participant):
        """
        Rotate the key pairs by removing the "last" (left-most end) key pair
        in the collection and adding a newly generated key pair.
        """
        self.key_pairs.popleft()
        self.key_pairs.append(self.generate_key_pair())

    def first_key_pair(self: Participant) -> Tuple[X25519PublicKey, X25519PrivateKey]:
        """
        Return the newest key pair (*i.e.*, right-most end).
        """
        return self.key_pairs[-1]

    def first_key_public(self: Participant) -> X25519PublicKey:
        """
        Return the public key from the newest key pair (*i.e.*, right-most end).
        """
        return self.first_key_pair()[0]

    def first_key_private(self: Participant) -> X25519PrivateKey:
        """
        Return the private key from the newest key pair (*i.e.*, right-most end).
        """
        return self.first_key_pair()[1]
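
The rotation behavior of the key pair collection can be seen more easily with placeholder values. The sketch below uses increasing integers in place of real key pairs and relies on the `maxlen` argument of `collections.deque`, a possible alternative to calling `popleft` explicitly. The use of integers and of `maxlen` are assumptions made for illustration and are not part of the class above.

```python
import collections
import itertools

# Placeholder "key pairs": increasing integers stand in for real
# (public, private) pairs so that the rotation order is visible.
counter = itertools.count(1)

# With ``maxlen`` set, appending to a full deque discards the
# left-most (oldest) entry automatically.
key_pairs = collections.deque((next(counter) for _ in range(5)), maxlen=5)
print(list(key_pairs))  # [1, 2, 3, 4, 5]

# One rotation: the oldest pair (1) is dropped and a new pair (6)
# becomes the "first" (right-most) entry.
key_pairs.append(next(counter))
print(list(key_pairs))  # [2, 3, 4, 5, 6]
print(key_pairs[-1])    # 6
```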


### Example Workflow

Below is a sketch (using the functions and classes defined above) of a simplified activation workflow between a matching system and a DSP. Two steps are simplified and folded into the method that decrypts the encrypted labels: the delivery of the encapsulated key to the activation system (via the advertiser), and the distinct offline phase in which the activation system derives and caches the shared key.

In [12]:
class MatchingSystem(Participant):
    """
    Functionalities specific to matching systems.
    """
    def encrypt_labels_for(
            self: MatchingSystem,
            participant: Participant,
            match_transaction_id: bytes,
            ls: Sequence[bool]
        ) -> Tuple[bytes, Sequence[str]]:
        (key_encapsulated, key_shared) = hpke_x25519_sha256_sha256_aes_128_gcm_auth_encap(
            participant.first_key_public(),
            self.first_key_private()
        )
        return (
            key_encapsulated,
            encrypt_labels(key_shared, match_transaction_id, ls)
        )

class ActivationSystem(Participant):
    """
    Functionalities specific to activation systems.
    """
    def decrypt_labels_from(
            self: ActivationSystem,
            participant: Participant,
            key_encapsulated: bytes,
            match_transaction_id: bytes,
            es: Sequence[str]
        ) -> Sequence[bool]:
        key_shared = hpke_x25519_sha256_sha256_aes_128_gcm_auth_decap(
            key_encapsulated,
            self.first_key_private(),
            participant.first_key_public()
        )
        return decrypt_labels(key_shared, match_transaction_id, es)

In [13]:
matching_system = MatchingSystem()
matching_system.initial_key_pairs()

dsp = ActivationSystem()
dsp.initial_key_pairs()

labels = [True, False, True, False]
match_transaction_id = b'1234567890'

# Obtain both the encapsulated key and the encrypted labels
# from the matching system.
(key_encapsulated, labels_encrypted) = matching_system.encrypt_labels_for(
    dsp,
    match_transaction_id, 
    labels
)

# Provide both the encapsulated key and the encrypted labels
# to the activation system.
labels_decrypted = dsp.decrypt_labels_from(
    matching_system,
    key_encapsulated,
    match_transaction_id,
    labels_encrypted
)

assert labels == labels_decrypted
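
The workflow above derives the shared key every time labels are decrypted. A deployment that separates the offline phase from label decryption might instead derive the key once and store it. The sketch below shows one way such a cache could be organized. Everything in it is hypothetical: the class `SharedKeyCache`, the choice to index entries by sender identifier and encapsulated key, and the stub that stands in for HPKE decapsulation are assumptions and are not taken from the OPJA specification.

```python
import hashlib
from typing import Callable, Dict, List, Tuple

class SharedKeyCache:
    """
    Hypothetical cache that separates key derivation (offline phase)
    from label decryption (online phase).
    """
    def __init__(self, derive: Callable[[str, bytes], bytes]):
        self._derive = derive
        self._keys: Dict[Tuple[str, bytes], bytes] = {}

    def prepare(self, sender_id: str, key_encapsulated: bytes) -> None:
        """Offline phase: derive the shared key once and store it."""
        self._keys[(sender_id, key_encapsulated)] = self._derive(sender_id, key_encapsulated)

    def lookup(self, sender_id: str, key_encapsulated: bytes) -> bytes:
        """Online phase: fetch the shared key without deriving it again."""
        return self._keys[(sender_id, key_encapsulated)]

# Stub standing in for HPKE decapsulation; it records each invocation.
calls: List[str] = []

def derive_stub(sender_id: str, key_encapsulated: bytes) -> bytes:
    calls.append(sender_id)
    return hashlib.sha256(sender_id.encode() + key_encapsulated).digest()[:16]

cache = SharedKeyCache(derive_stub)
cache.prepare('matching-system', b'\x01' * 32)

# Repeated lookups reuse the stored key; the stub runs only once.
key_first = cache.lookup('matching-system', b'\x01' * 32)
key_second = cache.lookup('matching-system', b'\x01' * 32)
assert key_first == key_second
assert len(calls) == 1
```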

In [14]:
# End of file.