# **Full Demo**

In [1]:
import numpy as np
import random
import hashlib
import matplotlib.pyplot as plt

Simulate a user sampling `n` objects (emojis) from the pools

In [2]:
def simulate_user_behavior(n, gamma, delta, preferred_pool, pools=None):
    """Sample ``n`` emojis according to a simple user-interest model.

    For every draw:

    * with probability ``gamma`` the user picks from an interest pool
      (any pool whose label is not ``"N"``):
        * with probability ``delta`` from ``preferred_pool``,
        * otherwise from one of the *other* interest pools, chosen
          uniformly at random;
    * with probability ``1 - gamma`` the user picks from the neutral
      pool ``pools["N"]``.

    Args:
        n: Number of emojis to sample.
        gamma: Probability of drawing from an interest pool.
        delta: Probability of drawing from the preferred pool, given that
            an interest pool is used.
        preferred_pool: Label (key of ``pools``) of the preferred pool.
        pools: Mapping ``label -> list of emojis``; must contain the key
            ``"N"`` for the neutral pool. Defaults to the notebook-level
            ``pools`` variable.

    Returns:
        A list with ``n`` sampled emojis.
    """
    if pools is None:
        # Backward compatible: fall back to the notebook-level definition.
        pools = globals()["pools"]

    # Every interest pool other than the preferred one is a candidate.
    # (Previously only "A"/"B" were considered, so pool "C" was never
    # sampled as a non-preferred pool.)
    other_labels = [label for label in pools if label not in ("N", preferred_pool)]

    samples = []
    for _ in range(n):
        if random.random() < gamma:
            # Pick from an interest pool.
            if random.random() < delta or not other_labels:
                # Preferred pool (also the only option if there is no other pool).
                source = pools[preferred_pool]
            else:
                # Uniformly chosen non-preferred interest pool.
                source = pools[random.choice(other_labels)]
        else:
            # Pick from the neutral pool.
            source = pools["N"]
        samples.append(random.choice(source))
    return samples

In [3]:
# TODO: the universe and pools should also be set dynamically

# Seed both RNGs used in this notebook (`random` for the user sampling,
# `np.random` for the bit flips) so that Restart & Run All reproduces
# the same samples and outputs.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Universe and pools - adversary's choice
pool_A = ["👍🏻", "👋🏻"]
pool_B = ["👍🏿", "👋🏿"]
pool_C = ["👍🏽", "👋🏽"]
neutral_pool = ["😎", "😂"]

all_emojis = pool_A + pool_B + pool_C + neutral_pool
pools = {"A": pool_A, "B": pool_B, "C": pool_C, "N": neutral_pool}

# Reverse lookup emoji -> pool label, derived from `pools` so the two
# structures cannot drift apart.
emoji_pool = {emoji: label for label, members in pools.items() for emoji in members}

print("Emoji pool:")
print(emoji_pool)

# Settings for user sampling - user behaviour
n = 7                 # number of emojis sampled per user
gamma = 0.6           # probability of drawing from an interest pool
delta = 0.8           # probability of the preferred pool, given an interest pool
preferred_pool = "A"
epsilon = 4           # privacy parameter used for the bit-flip probability
m = 16                # length of the hashed one-hot vectors

true_emojis = simulate_user_behavior(n, gamma, delta, preferred_pool)

print("Sampled emojis:")
print(true_emojis)

Emoji pool:
{'👍🏻': 'A', '👋🏻': 'A', '👍🏿': 'B', '👋🏿': 'B', '👍🏽': 'C', '👋🏽': 'C', '😎': 'N', '😂': 'N'}
Sampled emojis:
['👋🏻', '👍🏻', '😎', '👋🏻', '👋🏻', '😎', '👍🏻']


Apply (dummy) CMS to n objects

In [4]:
def hash_emoji(emoji, m):
    """Map an emoji string to a stable bucket index in ``range(m)``.

    The bucket is the MD5 digest of the UTF-8 encoded string, read as one
    big-endian integer and reduced modulo ``m``.
    """
    digest = hashlib.md5(emoji.encode()).digest()
    return int.from_bytes(digest, "big") % m

In [5]:
def one_hot_vector(emoji, m):
    """Return a length-``m`` integer vector that is 1 only at the emoji's hash bucket."""
    bucket = hash_emoji(emoji, m)
    # Compare every position with the bucket index, then cast the boolean
    # mask to integers to obtain the 0/1 encoding.
    return (np.arange(m) == bucket).astype(int)

In [6]:
def flip_bits(vec, epsilon):
    """Return a copy of the 0/1 vector ``vec`` with randomly flipped bits.

    Each position is flipped independently with probability
    ``1 / (1 + exp(epsilon / 2))``, using NumPy's global random state.
    """
    flip_probability = 1 / (1 + np.exp(epsilon / 2))
    draws = np.random.rand(len(vec))
    flips = (draws < flip_probability).astype(int)
    # XOR with 1 flips a bit, XOR with 0 leaves it unchanged.
    return np.asarray(vec) ^ flips

In [7]:
# Bucket index of every sampled emoji.
print("one-hot-vector is 1 at position:")
hashed_emojis = [hash_emoji(emoji, m) for emoji in true_emojis]
print(hashed_emojis)

# Clean one-hot encoding of every sampled emoji.
print("one-hot-vector per n:")
hashed_vector_emojis = [one_hot_vector(emoji, m) for emoji in true_emojis]
print(hashed_vector_emojis)

# Noisy version of each encoding after random bit flips.
print("flipped vector:")
flipped_vector_emojis = [flip_bits(clean, epsilon) for clean in hashed_vector_emojis]
print(flipped_vector_emojis)

# Hamming distance between each clean vector and its noisy counterpart,
# stored as plain Python ints so the list prints compactly.
print("distance between one-hot-vector and flipped vector")
distances = [
    int(np.sum(clean != noisy))
    for clean, noisy in zip(hashed_vector_emojis, flipped_vector_emojis)
]
print(distances)

one-hot-vector is 1 at position:
[11, 1, 3, 11, 11, 3, 1]
one-hot-vector per n:
[array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]), array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]), array([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])]
flipped vector:
[array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0]), array([0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]), array([0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0]), array([0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0]), array([0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0]), array([1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0])]
distance between one-hot-vector and flipped vector
[3, 2, 2, 1, 4, 3, 6]


Attack

In [8]:
def pr_observation_given_z(x_t, z_vec, epsilon):
    """Likelihood of one bit vector given another under independent bit flips.

    Every position is assumed to flip independently with probability
    ``xi = 1 / (1 + exp(epsilon / 2))``. With ``d`` the number of positions
    where the two vectors differ and ``L`` the vector length, the result is
    ``xi**d * (1 - xi)**(L - d)``.

    The length ``L`` is taken from the vectors themselves rather than from
    a notebook-level ``m``, so the function also works for vectors of any
    other length.

    Args:
        x_t: First bit vector (array-like of 0/1).
        z_vec: Second bit vector, same shape as ``x_t``.
        epsilon: Privacy parameter controlling the flip probability.

    Returns:
        The likelihood as a float.

    Raises:
        ValueError: If the two vectors do not have the same shape.
    """
    x_t = np.asarray(x_t)
    z_vec = np.asarray(z_vec)
    if x_t.shape != z_vec.shape:
        raise ValueError(
            f"vectors must have the same shape, got {x_t.shape} and {z_vec.shape}"
        )

    xi = 1 / (1 + np.exp(epsilon / 2))
    length = x_t.size
    d = np.sum(x_t != z_vec)  # Hamming distance
    return (xi ** d) * ((1 - xi) ** (length - d))

In [9]:
# Print the bit-flip likelihood for every pairing of a clean one-hot vector
# (outer loop) with a noisy, flipped vector (inner loop).
for clean_idx, clean_vec in enumerate(hashed_vector_emojis):
    for noisy_idx, noisy_vec in enumerate(flipped_vector_emojis):
        prob = pr_observation_given_z(clean_vec, noisy_vec, epsilon)
        print(f"Pr(x_t[{clean_idx}] | z_vec[{noisy_idx}]) = {prob:.5f}")


Pr(x_t[0] | z_vec[0]) = 0.00033
Pr(x_t[0] | z_vec[1]) = 0.00004
Pr(x_t[0] | z_vec[2]) = 0.00004
Pr(x_t[0] | z_vec[3]) = 0.01776
Pr(x_t[0] | z_vec[4]) = 0.00004
Pr(x_t[0] | z_vec[5]) = 0.00001
Pr(x_t[0] | z_vec[6]) = 0.00000
Pr(x_t[1] | z_vec[0]) = 0.00001
Pr(x_t[1] | z_vec[1]) = 0.00240
Pr(x_t[1] | z_vec[2]) = 0.00004
Pr(x_t[1] | z_vec[3]) = 0.00033
Pr(x_t[1] | z_vec[4]) = 0.00004
Pr(x_t[1] | z_vec[5]) = 0.00001
Pr(x_t[1] | z_vec[6]) = 0.00000
Pr(x_t[2] | z_vec[0]) = 0.00001
Pr(x_t[2] | z_vec[1]) = 0.00004
Pr(x_t[2] | z_vec[2]) = 0.00240
Pr(x_t[2] | z_vec[3]) = 0.00033
Pr(x_t[2] | z_vec[4]) = 0.00000
Pr(x_t[2] | z_vec[5]) = 0.00033
Pr(x_t[2] | z_vec[6]) = 0.00000
Pr(x_t[3] | z_vec[0]) = 0.00033
Pr(x_t[3] | z_vec[1]) = 0.00004
Pr(x_t[3] | z_vec[2]) = 0.00004
Pr(x_t[3] | z_vec[3]) = 0.01776
Pr(x_t[3] | z_vec[4]) = 0.00004
Pr(x_t[3] | z_vec[5]) = 0.00001
Pr(x_t[3] | z_vec[6]) = 0.00000
Pr(x_t[4] | z_vec[0]) = 0.00033
Pr(x_t[4] | z_vec[1]) = 0.00004
Pr(x_t[4] | z_vec[2]) = 0.00004
Pr(x_t[4

In [10]:
def phi_bar_eq2(x, preferred_pool, gamma, delta, p_hat, pools, emoji_pool):
    """Prior probability of emoji ``x`` for a user whose preferred pool is given.

    The probability mass assigned to the pool containing ``x`` is

    * ``gamma * delta`` if that pool is the preferred pool,
    * ``gamma * (1 - delta) / (k - 1)`` if it is another non-neutral pool
      (``k`` = number of pools whose label is not ``'N'``),
    * ``1 - gamma`` otherwise (neutral pool ``'N'``).

    Inside the pool the mass is split proportionally to ``p_hat``.
    """
    label = emoji_pool[x]

    if label == preferred_pool:
        weight = gamma * delta
        members = pools[preferred_pool]
    elif label != 'N' and label in pools:
        interest_pool_count = sum(1 for name in pools if name != 'N')
        weight = gamma * (1 - delta) / (interest_pool_count - 1)
        members = pools[label]
    else:
        weight = 1 - gamma
        members = pools['N']

    pool_mass = sum(p_hat[z] for z in members)
    return weight * p_hat[x] / pool_mass

In [11]:
def infer_pool(observations, m=16, epsilon=4, gamma_grid=10, delta_grid=10, p_hat=None, pools=None, emoji_pool=None):
    """Estimate which non-neutral pool is the user's preferred pool.

    For every candidate pool the likelihood of all observed noisy vectors is
    summed over a regular grid of ``gamma`` in ``[0, 1]`` and ``delta`` in
    ``[1/k, 1]`` (``k`` = number of non-neutral pools), which is a grid-based
    stand-in for the integral over both parameters. The per-pool scores are
    then normalised so they sum to 1.

    Args:
        observations: Iterable of noisy bit vectors of length ``m``.
        m: Length of the hashed one-hot vectors.
        epsilon: Privacy parameter used for the bit-flip likelihood.
        gamma_grid: Number of grid points for ``gamma``.
        delta_grid: Number of grid points for ``delta``.
        p_hat: Mapping ``emoji -> weight`` used inside each pool; defaults
            to a uniform weight over all emojis in ``pools``.
        pools: Mapping ``label -> list of emojis``; the label ``'N'`` marks
            the neutral pool. Required.
        emoji_pool: Mapping ``emoji -> label``; derived from ``pools`` when
            omitted.

    Returns:
        Dict mapping each non-neutral pool label to its confidence.

    Raises:
        ValueError: If ``pools`` is missing or has no non-neutral pool.
    """
    if pools is None:
        raise ValueError("infer_pool requires `pools` (label -> list of emojis)")

    all_pool_labels = [p for p in pools.keys() if p != 'N']
    k = len(all_pool_labels)
    if k == 0:
        raise ValueError("`pools` must contain at least one non-neutral pool")

    if emoji_pool is None:
        emoji_pool = {e: label for label, members in pools.items() for e in members}

    all_emojis = sum(pools.values(), [])
    if p_hat is None:
        p_hat = {e: 1 / len(all_emojis) for e in all_emojis}

    # The likelihood Pr(observation | emoji) does not depend on the candidate
    # pool, gamma or delta, so compute it once:
    # likelihoods[t, j] = Pr(observations[t] | all_emojis[j]).
    observations = list(observations)
    likelihoods = np.empty((len(observations), len(all_emojis)))
    for col, z in enumerate(all_emojis):
        z_vec = one_hot_vector(z, m)
        for row, x_t in enumerate(observations):
            likelihoods[row, col] = pr_observation_given_z(x_t, z_vec, epsilon)

    # The integral over (gamma, delta) is approximated on a grid.
    gammas = np.linspace(0, 1, gamma_grid)
    deltas = np.linspace(1 / k, 1, delta_grid)

    pool_scores = {}
    for pool_id in all_pool_labels:
        score = 0.0
        for gamma in gammas:
            for delta in deltas:
                # Prior over emojis for this (pool, gamma, delta) hypothesis.
                phi = np.array([
                    phi_bar_eq2(z, pool_id, gamma, delta, p_hat, pools, emoji_pool)
                    for z in all_emojis
                ])
                # Per-observation marginal likelihoods, multiplied together.
                score += np.prod(likelihoods @ phi)
        pool_scores[pool_id] = score

    total = sum(pool_scores.values())
    if not np.isfinite(total) or total <= 0:
        # Every hypothesis has zero (underflowed) likelihood: the data carry
        # no usable information, so report a uniform confidence instead of NaN.
        return {label: 1 / k for label in all_pool_labels}
    return {label: value / total for label, value in pool_scores.items()}


In [12]:
# Run the attack on the noisy vectors produced above. The vectors were
# generated with the notebook settings `m` and `epsilon`, so pass those
# variables instead of repeating their values as literals.
obfuscated_vectors = flipped_vector_emojis
conf = infer_pool(obfuscated_vectors, m=m, epsilon=epsilon,
                  pools=pools, emoji_pool=emoji_pool)
print(conf)

{'A': np.float64(0.653268627656341), 'B': np.float64(0.1434327829855271), 'C': np.float64(0.20329858935813194)}


**Quick simulation**

In [13]:
# User-behaviour settings for the quick simulation
n, gamma, delta = 7, 0.5, 0.5
preferred_pool = "A"
epsilon, m = 4, 16

true_emojis = simulate_user_behavior(n, gamma, delta, preferred_pool)

print("Sampled emojis:")
print(true_emojis)

# Bucket index of every sampled emoji.
print("one-hot-vector is 1 at position:")
hashed_emojis = [hash_emoji(emoji, m) for emoji in true_emojis]
print(hashed_emojis)

# Clean one-hot encoding of every sampled emoji.
print("one-hot-vector per n:")
hashed_vector_emojis = [one_hot_vector(emoji, m) for emoji in true_emojis]
print(hashed_vector_emojis)

# Noisy version of each encoding after random bit flips.
print("flipped vector:")
flipped_vector_emojis = [flip_bits(clean, epsilon) for clean in hashed_vector_emojis]
print(flipped_vector_emojis)

# Hamming distance between each clean vector and its noisy counterpart,
# stored as plain Python ints.
print("distance between one-hot-vector and flipped vector")
distances = [
    int(np.sum(clean != noisy))
    for clean, noisy in zip(hashed_vector_emojis, flipped_vector_emojis)
]
print(distances)

Sampled emojis:
['😂', '👍🏿', '👍🏻', '😎', '😂', '😎', '👋🏿']
one-hot-vector is 1 at position:
[15, 14, 1, 3, 15, 3, 1]
one-hot-vector per n:
[array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]), array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]), array([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])]
flipped vector:
[array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]), array([0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1]), array([0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1])]
distance between one-hot-vector and flipped vecto

In [14]:
# Ground truth for this run and the adversary's view of the data.
true_pool = preferred_pool
obfuscated_vectors = flipped_vector_emojis
confidences = infer_pool(
    obfuscated_vectors,
    m=m,
    epsilon=epsilon,
    pools=pools,
    emoji_pool=emoji_pool,
)

print("📊 Pool confidences:", confidences)

# The adversary guesses the pool with the highest confidence.
guessed_pool = max(confidences, key=lambda label: confidences[label])

verdict = (
    f"✅ ADV WINS: Guessed pool = {guessed_pool}, True pool = {true_pool}"
    if guessed_pool == true_pool
    else f"❌ ADV LOSES: Guessed pool = {guessed_pool}, True pool = {true_pool}"
)
print(verdict)


📊 Pool confidences: {'A': np.float64(0.24065135004663313), 'B': np.float64(0.5557648152858883), 'C': np.float64(0.20358383466747856)}
❌ ADV LOSES: Guessed pool = B, True pool = A


In [15]:
def apply_cms(emoji, m=16, epsilon=4, d=2):
    """Encode ``emoji`` with ``multi_hot_vector`` and randomly flip its entries.

    Each of the ``m`` positions is flipped independently with probability
    ``1 / (1 + exp(epsilon / 2))``, using NumPy's global random state.

    NOTE(review): ``multi_hot_vector`` is not defined in the visible part of
    this notebook; calling this function will fail unless it is defined
    elsewhere. Presumably it returns a length-``m`` 0/1 vector built from
    ``d`` hash functions (TODO: confirm).
    """
    flip_probability = 1 / (1 + np.exp(epsilon / 2))
    encoded = multi_hot_vector(emoji, m, d)
    flips = (np.random.rand(m) < flip_probability).astype(int)
    # |v - f| equals v where f is 0 and 1 - v where f is 1 (for 0/1 entries).
    return np.abs(encoded - flips).astype(int)

In [16]:
import numpy as np
import random

# Settings for user sampling - user behaviour.
# These values are held fixed for every simulated user.
# NOTE(review): delta = 0.1 is below 1/k for the three interest pools, i.e.
# outside the delta range [1/k, 1] that infer_pool integrates over, so the
# "preferred" pool is sampled less often than the others - confirm this is
# intended.
gamma = 0.1
delta = 0.1
preferred_pool = "A"

# Configuration
N_USERS = 100
n = 7  # number of emojis per user
m = 16  # CMS vector length
epsilon = 4
d = 2  # number of hash functions (only needed by the apply_cms variant)

# Non-neutral pools the true pool is drawn from; constant across users.
candidate_pools = [p for p in pools if p != 'N']
k = len(candidate_pools)

# Record wins
wins = 0

# Simulate over N users
for user_id in range(N_USERS):

    # To randomise the behaviour per user instead of using the fixed values:
    #   gamma = np.random.uniform(0, 1)
    #   delta = np.random.uniform(1 / k, 1)

    # Pick true pool randomly
    true_pool = random.choice(candidate_pools)

    true_emojis = simulate_user_behavior(n, gamma, delta, true_pool)

    # Obfuscate
    hashed_vector_emojis = [one_hot_vector(e, m) for e in true_emojis]
    obfuscated_vectors = [flip_bits(e, epsilon) for e in hashed_vector_emojis]

    # Infer
    confidences = infer_pool(obfuscated_vectors, m=m, epsilon=epsilon,
                             p_hat=None, pools=pools, emoji_pool=emoji_pool)

    guessed_pool = max(confidences, key=confidences.get)

    # Evaluate win
    if guessed_pool == true_pool:
        wins += 1
        print(f"[{user_id+1}] ✅ ADV WINS: guessed = {guessed_pool}, true = {true_pool}")
    else:
        print(f"[{user_id+1}] ❌ ADV LOSES: guessed = {guessed_pool}, true = {true_pool}")

# Final stats
print(f"\n📊 ADV Win Rate: {wins}/{N_USERS} = {wins / N_USERS:.2%}")


[1] ❌ ADV LOSES: guessed = B, true = C
[2] ❌ ADV LOSES: guessed = A, true = B
[3] ❌ ADV LOSES: guessed = B, true = A
[4] ✅ ADV WINS: guessed = A, true = A


KeyboardInterrupt: 