In [11]:
# This test computes the sign of (z - theta) using a FSS sign gate in a 2PC setting.
import funshade
import numpy as np
import crypten

#%%
#==============================================================================#
#                              PARAMETER SELECTION                             #
#==============================================================================#
K = 1        # Number of elements in the input vector (single comparison)
theta = 5    # Threshold to compare it with

#%%
#==============================================================================#
#                             EXPERIMENT PREPARATION                           #
#==============================================================================#
# Create integer vector of length K
rng = np.random.default_rng(seed=42)
a = 6  # Tensor a
b = 5   # Tensor b
z = np.array([a], dtype=funshade.DTYPE)  # Input vector z

# Create parties
class party:
    def __init__(self, j: int):
        self.j = j
P0 = party(0)
P1 = party(1)

#%%
#==============================================================================#
#                                 OFFLINE PHASE                                #
#==============================================================================#
# (1) Generate input masks and fss keys (Semi-Honest third party, TEE, 2PC interaction)
r_in0, r_in1, k0, k1 = funshade.FssGenSign(K, theta)
# Distribute randomness to (P0, P1)
P0.r_in_j = r_in0;           P1.r_in_j = r_in1
P0.k_j    = k0;              P1.k_j    = k1

# (2) Generate secret shares of the input vector (z)
z_0 = rng.integers(np.iinfo(funshade.DTYPE).min,
                   np.iinfo(funshade.DTYPE).max, size=K, dtype=funshade.DTYPE)
z_1 = z - z_0
# Distribute shares to (P0, P1)
P0.z_j = z_0;                P1.z_j = z_1


#%%
#==============================================================================#
#                                  ONLINE PHASE                                #
#==============================================================================#
# (3) Mask the input vector (z) with the input masks (r_in)
P0.z_hat_j = P0.z_j + P0.r_in_j
P1.z_hat_j = P1.z_j + P1.r_in_j

# (4) Compute the comparison to the threshold theta
P0.o_j = funshade.FssEvalSign(K, P0.j, P0.k_j, P0.z_hat_j)
P1.o_j = funshade.FssEvalSign(K, P1.j, P1.k_j, P1.z_hat_j)

# (5) Reconstruct the final result
o = P0.o_j + P1.o_j

#%%
#==============================================================================#
#                              CHECK CORRECTNESS                               #
#==============================================================================#
# Check the final result
o_ground = (z > theta)                         # Ground truth
# assert np.allclose(o_ground, o)
print("o_ground:", o_ground)
print("o:", o)
# %%

o_ground: [ True]
o: [-1133853683]


In [19]:
import torch
import numpy as np
import crypten
import crypten.communicator as comm
import torch.distributed as dist

#%%
#==============================================================================#
#                              PARAMETER SELECTION                             #
#==============================================================================#
K = 1        # Number of elements in the input vector (single comparison)
theta = 6    # Threshold to compare it with

#%%
#==============================================================================#
#                             EXPERIMENT PREPARATION                           #
#==============================================================================#
# Create integer vector of length K
rng = np.random.default_rng(seed=42)
a = 6  # Tensor a
z = torch.tensor([a], dtype=torch.int64)  # Input vector z
# z = crypten.cryptensor(z)

# Create parties
class party:
    def __init__(self, j: int):
        self.j = j
P0 = party(0)
P1 = party(1)

#%%
#==============================================================================#
#                                 OFFLINE PHASE                                #
#==============================================================================#
# (1) Generate input masks and fss keys (Semi-Honest third party, TEE, 2PC interaction)
r_in0, r_in1, k0, k1 = funshade.FssGenSign(K, theta)
# Distribute randomness to (P0, P1)

P0.r_in_j = torch.tensor(r_in0, dtype=torch.int64)
P1.r_in_j = torch.tensor(r_in1, dtype=torch.int64)
P0.k_j    = k0
P1.k_j    = k1
# crypten
rank = comm.get().get_rank()
if rank == 0:
    r_in_j = torch.tensor(r_in0, dtype=torch.int64)
    k_j = k0
else:
    r_in_j = torch.tensor(r_in1, dtype=torch.int64)
    k_j = k1

# (2) Generate secret shares of the input vector (z)
z_0 = torch.tensor(rng.integers(np.iinfo(funshade.DTYPE).min,
                                np.iinfo(funshade.DTYPE).max, size=K, dtype=funshade.DTYPE), dtype=torch.int64)
z_1 = z - z_0
# Distribute shares to (P0, P1)
P0.z_j = z_0
P1.z_j = z_1

# crypten
if rank == 0:
    z_j = z_0
else:
    z_j = z_1

#%%
#==============================================================================#
#                                  ONLINE PHASE                                #
#==============================================================================#
# (3) Mask the input vector (z) with the input masks (r_in)
P0.z_hat_j = P0.z_j + P0.r_in_j
P1.z_hat_j = P1.z_j + P1.r_in_j
# crypten
z_hat_j = z_j + r_in_j

# (4) Compute the comparison to the threshold theta
P0.o_j = funshade.FssEvalSign(K, P0.j, P0.k_j, P0.z_hat_j.numpy().astype(funshade.DTYPE))
P1.o_j = funshade.FssEvalSign(K, P1.j, P1.k_j, P1.z_hat_j.numpy().astype(funshade.DTYPE))
# crypten
if rank == 0:
    o_j_1 = funshade.FssEvalSign(K, P0.j, P0.k_j, P0.z_hat_j.numpy().astype(funshade.DTYPE))
    o_j_2 = torch.zeros_like(torch.tensor(o_j_1))
else:
    o_j_2 = funshade.FssEvalSign(K, P1.j, P1.k_j, P1.z_hat_j.numpy().astype(funshade.DTYPE))
    o_j_1 = torch.zeros_like(torch.tensor(o_j_2))
dist.barrier()
# dist.broadcast(torch.tensor(o_j_1), src=0)
# dist.broadcast(torch.tensor(o_j_2), src=1)
# (5) Reconstruct the final result
o = P0.o_j + P1.o_j
# crypten
# o = o_j_1 + o_j_2

#%%
#==============================================================================#
#                              CHECK CORRECTNESS                               #
#==============================================================================#
# Check the final result
o_ground = (z > theta).int()                         # Ground truth
# assert np.allclose(o_ground, o)
print("o_ground:", o_ground.item())
print("o:", o)
# %%

o_ground: 0
o: [1438673861]


### Funshade 应用

In [2]:
import funshade
import numpy as np

#%%
#==============================================================================#
#                              PARAMETER SELECTION                             #
#==============================================================================#
K = 1        # Number of vectors in the database
l = 512         # Length (number of elements) of each vector (2**8)
theta = 0     # Threshold for the matching
max_el = 2**12  # Maximum value for vector elements. To avoid overflows:
                #       2*log2(max_el) + log2(l) <= 32 bits

#%%
#==============================================================================#
#                             EXPERIMENT PREPARATION                           #
#==============================================================================#
# Create normalized reference templates (Y) and live template (x)
rng = np.random.default_rng(seed=42)
def sample_biometric_template(K: int, l: int):
    """Sample a set of K biometric templates of length l."""
    templates = rng.uniform(-1, 1, size=(K,l))
    return templates / np.linalg.norm(templates, axis=1, keepdims=True)
x_float = sample_biometric_template(1, l).flatten()
Y_float = sample_biometric_template(K, l)

# Convert to integers via fixed-point scaling (multiply by a scaling factor max_el)
x = (x_float*max_el).astype(funshade.DTYPE)
Y = (Y_float*max_el).astype(funshade.DTYPE)
theta_fp = int(theta*(max_el**2))          # threshold must be upscaled twice

# Alternatively, you could directly input integer templates:
# x = np.random.randint(-max_el, max_el, size=l,     dtype=funshade.DTYPE)
# Y = np.random.randint(-max_el, max_el, size=(K,l), dtype=funshade.DTYPE)

# Create parties
class party:
    def __init__(self, j: int):
        self.j = j
BP   = party(0) # P0
Gate = party(1) # P1
P0 = party(0)
P1 = party(1)

#%%
#==============================================================================#
#                                 OFFLINE PHASE                                #
#==============================================================================#
# (1) Generate correlated randomness (Semi-Honest third party, TEE, 2PC interaction)
d_x0, d_x1, d_y0, d_y1, d_xy0, d_xy1, r_in0, r_in1, k0, k1 = funshade.setup(K, l, theta_fp)

# Distribute randomness to (P0, P1)
BP.d_x_j  = d_x0;            Gate.d_x_j  = d_x1
BP.d_y_j  = d_y0;            Gate.d_y_j  = d_y1
BP.d_xy_j = d_xy0;           Gate.d_xy_j = d_xy1
BP.r_in_j = r_in0;           Gate.r_in_j = r_in1
BP.k_j    = k0;              Gate.k_j    = k1
BP.d_y    = d_y0 + d_y1;     Gate.d_x    = d_x0 + d_x1
# (2) Get and secret share the reference DB (Y)
BP.Y    = Y.flatten()                       # Biometric Provider (BP) receives reference DB (enrollment)
BP.D_y = funshade.share(K, l, BP.Y, BP.d_y) # BP generates Delta share of Y
Gate.D_y = BP.D_y                           # BP: Send(D_y) --> Gate   
del BP.Y                                    # Delete the plaintext reference DB

#%%
#==============================================================================#
#                                  ONLINE PHASE                                #
#==============================================================================#
# (3) Get and secret share the live template
Gate.x   = np.tile(x, K)                         # Gate captures the live template (x)
Gate.D_x = funshade.share(K, l, Gate.x, Gate.d_x)# Gate generates Delta share of x
BP.D_x   = Gate.D_x                              # Gate: Send(D_x) --> BP
del Gate.x                                       # Delete the plaintext live template

# (4) Compute the masked matching score (z_hat) shares
BP.z_hat_j   = funshade.eval_dist(K, l, BP.j, 
        BP.r_in_j,   BP.D_x,    BP.D_y,     BP.d_x_j,   BP.d_y_j,   BP.d_xy_j)
Gate.z_hat_j = funshade.eval_dist(K, l, Gate.j, 
        Gate.r_in_j, Gate.D_x,  Gate.D_y,   Gate.d_x_j, Gate.d_y_j, Gate.d_xy_j)

# Exchange z_hat_j shares to reconstruct z_hat
BP.z_hat_nj   = Gate.z_hat_j                 # BP: Send(z_hat_j) --> Gate
Gate.z_hat_nj = BP.z_hat_j                   # Gate: Send(z_hat_j) --> BP
# Compute the comparison to the threshold theta
BP.o_j     = funshade.eval_sign(K, BP.j, BP.k_j, BP.z_hat_j, BP.z_hat_nj)
Gate.o_j   = funshade.eval_sign(K, Gate.j, Gate.k_j, Gate.z_hat_j, Gate.z_hat_nj)

# (5) Reconstruct the final result
o = BP.o_j + Gate.o_j

##########################################
r_in0, r_in1, k0, k1 = funshade.FssGenSign(K, theta)
z = -10
z_0 = z + r_in0
z_1 = z + z_0

P0.o_j = funshade.eval_sign(K, P0.j, k0, z_0, z_1)
P1.o_j = funshade.eval_sign(K, P1.j, k1, z_1, z_0)
fss = P0.o_j + P1.o_j
print("fss:", fss)
print("fss's dtype:", fss.dtype)
##########################################

#%%
#==============================================================================#
#                              CHECK CORRECTNESS                               #
#==============================================================================#
# Check the matching score (z) for the live template (x) and the reference DB (Y)
z_ground = x@Y.T                                               # Ground truth
z_exper  = (BP.z_hat_j+Gate.z_hat_j) - (BP.r_in_j+Gate.r_in_j) # Experimental result
assert np.allclose(z_ground, z_exper)

# Check the final result
o_ground = (x_float@Y_float.T > theta)                         # Ground truth
print("o_ground:", o_ground)
print("o:", o)
assert np.allclose(o_ground, o)
print("Funshade executed correctly")

fss: [0]
fss's dtype: int32
o_ground: [False]
o: [0]
Funshade executed correctly


In [None]:
import time

import funshade
import numpy as np

from outer.Communication import api
import secSkyline_funshade.Thread.thread_simulate as thread_simulator



def MSB(batch_num, in_share, party, filename):
    # check > 0
    local_r_key = party.local_recv(filename=filename)
    r_in = local_r_key[0]
    key = local_r_key[1]
    mask_in_share = in_share + r_in
    tic = time.time()
    party.send(mask_in_share)
    recv = party.recv()
    during = time.time() - tic
    print("test one round cost (in MSB): ", during)
    out_share = funshade.eval_sign(batch_num, party.party_id, key, mask_in_share, recv)
    return out_share


In [None]:
import funshade
import numpy as np

from outer.Communication.dealer import TrustedDealer
from outer.Communication.semi_honest_party import SemiHonestParty
from secSkyline_funshade.Protocals.multi_protocal import gen_beaver_triplet

if __name__ == "__main__":
    print("gennerate FSS based on funshade...")
    """
        input：xi + r_ini
        function：if(x>theta) == if(x+r>theta+r), 其中 x+r 两方各持一份share
        return：0 or 1 的算数 share
    """
    # TODO 1.run genSign 生成 {k0,r_in0}  {k1,r_in1}（for map vector）
    K = 1000  # Number of elements in the input vector
    theta = 0  # Threshold to compare it with
    r_in0, r_in1, k0, k1 = funshade.FssGenSign(K, theta)  # numpy array

    # TODO 2.将 {k0,r_in0} 存储到本地文件
    dealer = TrustedDealer()
    file_name0, file_name1 = f'gen_MSB_{K}_key0.key', f'gen_MSB_{K}_key1.key'
    dealer.send([r_in0, k0], name=file_name0)
    dealer.send([r_in1, k1], name=file_name1)

    # TODO 3.生成beaver乘法三元组
    gen_beaver_triplet(dealer)

    # TODO 4.run genSign 生成 {k0,r_in0}  {k1,r_in1} （for fetch one）
    r_in0, r_in1, k0, k1 = funshade.FssGenSign(1, theta)  # numpy array

    # TODO 5.将 {k0,r_in0} 存储到本地文件
    file_name0, file_name1 = f'gen_MSB_one_key0.key', f'gen_MSB_one_key1.key'
    dealer.send([r_in0, k0], name=file_name0)
    dealer.send([r_in1, k1], name=file_name1)



### Fss 大于等于实现(也就实现了小于)

one key generation of DCF

two evalulation of DCF

In [3]:
import funshade
import numpy as np
import torch
import crypten
import crypten.communicator as comm
import torch.distributed as dist
import crypten.mpc as mpc

crypten.init()

#%%
#==============================================================================#
#                              PARAMETER SELECTION                             #
#==============================================================================#
K = 3       # Number of elements in the input vector
theta = 0    # Threshold to compare it with
beta = 2

@mpc.run_multiprocess(world_size=2)
def run():
    """ 
    DCF: f_a,b <
        ouputs b if x<a and 0 otherwise.
    """
    # Create integer vector of length K
    rng = np.random.default_rng(seed=42)
    a = torch.tensor([[[[1,-2,0]]]])  # Tensor a
    
    z = crypten.cryptensor(a)  # Input vector z
    shape = z.size()
    crypten.print(f"z's num:{(z.size())[-1]}")
    z = z._tensor.data

    ##########################################
    r_in0, r_in1, k0, k1 = funshade.FssGenSign(K, theta)

    rank = comm.get().get_rank()
    if rank == 0:
        z_0 = z + r_in0
        z_0 = z_0.numpy().astype(np.int32)
        z_0 = torch.tensor(z_0)  # Convert to torch.Tensor
        z_1 = torch.zeros_like(z_0)  # Initialize z_1 to zeros
    else:
        z_1 = z + r_in1
        z_1 = z_1.numpy().astype(np.int32)
        z_1 = torch.tensor(z_1)  # Convert to torch.Tensor
        z_0 = torch.zeros_like(z_1)  # Initialize z_0 to zeros
    dist.barrier()
    dist.broadcast(z_0, src=0)
    dist.broadcast(z_1, src=1)
    z_0 = z_0.view(-1)
    z_1 = z_1.view(-1)
    # Compute the comparison to the threshold theta
    if rank == 0:
        o_j_1 = funshade.eval_sign(K, rank, k0, z_0.numpy(), z_1.numpy())
        o_j_1 = torch.tensor(o_j_1)  # Convert to torch.Tensor
        o_j_2 = torch.zeros_like(o_j_1)
    else:
        o_j_2 = funshade.eval_sign(K, rank, k1, z_1.numpy(), z_0.numpy())
        o_j_2 = torch.tensor(o_j_2)  # Convert to torch.Tensor
        o_j_1 = torch.zeros_like(o_j_2)

    dist.barrier()
    dist.broadcast(o_j_1, src=0)
    dist.broadcast(o_j_2, src=1)

    o = (o_j_1 + o_j_2)*beta
    crypten.print("o:", o)
    # Check the final result

    o_ground = (a >= theta)  # Convert o_ground to integer
    crypten.print("o_ground:", o_ground)
    crypten.print("FSS gate executed correctly")

run()

z's num:3
o: tensor([2, 0, 2], dtype=torch.int32)
o_ground: tensor([[[[ True, False,  True]]]])
FSS gate executed correctly


[None, None]