In [51]:
import secrets
import time

from py_ecc.optimized_bls12_381 import G1, G2, curve_order,add, multiply, pairing
#from mpyc.runtime import mpc
from mpyc.runtime import mpc

In [52]:
class BBSPlus:
    """Single-signer BBS+ signatures over BLS12-381 using py_ecc.

    Public parameters are ``g1`` (G1 generator), ``g2`` (G2 generator) and
    ``ell + 1`` points ``H[0..ell]`` in G1: ``H[0]`` is multiplied by the
    signature nonce ``s`` and ``H[i]`` (i >= 1) by message ``i``.  The secret
    key is the scalar ``x`` and the public key is ``X = x * g2``.

    A signature on messages ``m_1..m_ell`` is ``(A, e, s)`` with
    ``A = (x + e)^-1 * B`` and ``B = g1 + s*H[0] + sum_i m_i*H[i]``.

    NOTE(review): each ``H[i]`` is produced as ``k_i * G1`` from a locally
    sampled scalar ``k_i``.  Presumably fine for a benchmark, but a deployment
    would normally derive the generators so that nobody knows ``k_i`` --
    confirm before using outside this notebook.
    """

    def __init__(self, ell: int):
        """Generate fresh public parameters and a key pair for ``ell`` messages."""
        self.ell = ell
        self.g1 = G1
        self.g2 = G2
        # Scalars are drawn from [1, curve_order - 1]; a zero scalar would
        # collapse a generator (or the public key) to the point at infinity.
        self.H = [multiply(G1, self._random_nonzero_scalar()) for _ in range(ell + 1)]
        self.x = self._random_nonzero_scalar()
        self.X = multiply(G2, self.x)

    @staticmethod
    def _random_nonzero_scalar() -> int:
        """Return a uniformly random integer in [1, curve_order - 1]."""
        return 1 + secrets.randbelow(curve_order - 1)

    def _commit(self, messages: list[int], s: int):
        """Return B = g1 + s*H[0] + sum_i m_i*H[i], all scalars reduced mod curve_order."""
        B = add(self.g1, multiply(self.H[0], s % curve_order))
        for mi, Hi in zip(messages, self.H[1:]):
            B = add(B, multiply(Hi, mi % curve_order))
        return B

    def sign(self, messages: list[int]) -> tuple:
        """Sign ``messages`` and return the signature ``(A, e, s)``.

        Raises:
            ValueError: if ``len(messages)`` differs from ``self.ell``.
        """
        if len(messages) != self.ell:
            raise ValueError(f"Expected {self.ell} messages, got {len(messages)}")
        s = secrets.randbelow(curve_order)
        # Resample e in the (negligibly likely) case x + e == 0 mod curve_order,
        # for which the modular inverse below does not exist.
        while True:
            e = secrets.randbelow(curve_order)
            denom = (self.x + e) % curve_order
            if denom:
                break
        B = self._commit(messages, s)
        A = multiply(B, pow(denom, -1, curve_order))
        return (A, e, s)

    def verify(self, messages: list[int], signature: tuple) -> bool:
        """Check ``pairing(X + e*g2, A) == pairing(g2, B)`` for the given messages.

        Returns False when the number of messages is not ``self.ell``.
        """
        A, e, s = signature
        if len(messages) != self.ell:
            return False
        B = self._commit(messages, s)
        # py_ecc's pairing takes the G2 element first and the G1 element second.
        left = pairing(add(self.X, multiply(self.g2, e % curve_order)), A)
        right = pairing(self.g2, B)
        return left == right

In [53]:
def lagrange(j, J, q=curve_order):
    """Return the Lagrange coefficient of node ``j`` for interpolation at 0, mod ``q``.

    Computes the product over every ``m`` in ``J`` with ``m != j`` of
    ``(-m) / (j - m)`` in the integers modulo ``q``.  ``pow`` raises
    ValueError if the denominator is not invertible modulo ``q``.
    """
    numerator = 1
    denominator = 1
    for node in (m for m in J if m != j):
        numerator = numerator * (-node) % q
        denominator = denominator * (j - node) % q
    return numerator * pow(denominator, -1, q) % q

In [54]:
def poly_eval(coeffs, x, q=curve_order):
    """Evaluate ``sum(coeffs[i] * x**i)`` modulo ``q``.

    ``coeffs[0]`` is the constant term.  An empty coefficient sequence
    evaluates to 0.
    """
    acc = 0
    # Horner's rule: fold in the coefficients from highest degree to lowest.
    for coeff in reversed(list(coeffs)):
        acc = (acc * x + coeff) % q
    return acc

In [55]:
class ThresholdBBS:
    """BBS+ with the secret key split into ``n`` Shamir shares of threshold ``t``.

    Public parameters and the master secret come from a fresh ``BBSPlus``
    instance.  The master secret is the constant term of a random polynomial
    of degree ``t - 1``; share ``i`` (0-based) is that polynomial evaluated at
    ``i + 1``.

    Note that ``threshold_sign`` reconstructs the master secret in one place
    from the selected shares and then signs exactly like ``BBSPlus.sign``;
    the polynomial (including the master secret) stays in ``self._poly``.
    """

    def __init__(self, ell: int, n: int, t: int):
        """Create parameters for ``ell`` messages and deal ``n`` shares.

        Raises:
            ValueError: unless ``1 <= t <= n``.
        """
        if not (1 <= t <= n):
            raise ValueError("Threshold t must satisfy 1 <= t <= n")
        self.ell = ell
        self.n = n
        self.t = t
        # Use BBSPlus to generate public params and master secret x0
        base = BBSPlus(ell)
        self.g1 = base.g1
        self.g2 = base.g2
        self.H = base.H
        self.X = base.X
        # Master secret to be shared
        x0 = base.x
        self._poly = [x0] + [secrets.randbelow(curve_order) for _ in range(t - 1)]
        self.shares = [poly_eval(self._poly, i + 1) for i in range(n)]

    def _commit(self, messages: list[int], s: int):
        """Return B = g1 + s*H[0] + sum_i m_i*H[i], all scalars reduced mod curve_order."""
        B = add(self.g1, multiply(self.H[0], s % curve_order))
        for mi, Hi in zip(messages, self.H[1:]):
            B = add(B, multiply(Hi, mi % curve_order))
        return B

    def threshold_sign(self, messages: list[int], subset: list[int]) -> tuple:
        """Sign ``messages`` using the shares whose 0-based indices are in ``subset``.

        Returns the signature ``(A, e, s)``.

        Raises:
            ValueError: if ``subset`` does not hold exactly ``t`` distinct
                indices in ``[0, n)``, or if the number of messages is not
                ``self.ell``.
        """
        if len(subset) != self.t:
            raise ValueError(f"Need exactly {self.t} shares to sign")
        if any(i < 0 or i >= self.n for i in subset):
            raise ValueError("Subset indices out of range")
        # A repeated index would be counted twice in the interpolation sum and
        # reconstruct the wrong key, yielding a signature that never verifies.
        if len(set(subset)) != len(subset):
            raise ValueError("Subset indices must be distinct")
        # Without this check zip() would silently drop or ignore messages.
        if len(messages) != self.ell:
            raise ValueError(f"Expected {self.ell} messages, got {len(messages)}")

        # Shares were evaluated at i + 1, so interpolate over those points.
        points = [i + 1 for i in subset]
        x_recon = 0
        for j in points:
            x_recon = (x_recon + lagrange(j, points) * self.shares[j - 1]) % curve_order

        # Standard BBS+ signing with the reconstructed key.
        s = secrets.randbelow(curve_order)
        # Resample e if x_recon + e == 0 mod curve_order (no inverse exists).
        while True:
            e = secrets.randbelow(curve_order)
            denom = (x_recon + e) % curve_order
            if denom:
                break
        B = self._commit(messages, s)
        A = multiply(B, pow(denom, -1, curve_order))
        return (A, e, s)

    def verify(self, messages: list[int], signature: tuple) -> bool:
        """Check ``pairing(X + e*g2, A) == pairing(g2, B)`` for the given messages.

        Returns False when the number of messages is not ``self.ell``.
        """
        A, e, s = signature
        if len(messages) != self.ell:
            return False
        B = self._commit(messages, s)
        left = pairing(add(self.X, multiply(self.g2, e % curve_order)), A)
        right = pairing(self.g2, B)
        return left == right

In [56]:
class ThresholdBBSMPC:
    """Prototype BBS+ signer whose key ``x`` is held as an MPyC secure field element.

    ``n`` and ``t`` are stored but not otherwise used by this class.  The
    public key ``X`` is NOT computed here: callers must assign ``self.X``
    before calling ``verify``.

    Review notes (behaviour left unchanged, flagged for follow-up):
      * ``H``, ``e`` and ``s`` are sampled with ``secrets`` inside each process.
        NOTE(review): with more than one MPyC party these values would
        presumably differ between parties -- confirm how they are meant to be
        agreed on.
      * ``x_share`` is built as ``secfld(x0)`` from a per-party local value.
        NOTE(review): confirm this yields a consistent sharing when parties
        pass different values; MPyC's ``mpc.input(..., senders=0)`` looks like
        the intended way to input a private value from party 0.
      * ``threshold_sign`` opens ``inv = (x + e)^-1`` and returns ``e``, so
        anyone holding both can compute ``x = inv^-1 - e (mod curve_order)``.
    """

    def __init__(self, ell, n, t):
        """Sample public generators and the (party-0) secret key for ``ell`` messages."""
        self.ell, self.n, self.t = ell, n, t
        # Public BBS+ params
        self.g1, self.g2 = G1, G2
        self.H = [multiply(G1, secrets.randbelow(curve_order)) for _ in range(ell + 1)]
        # Party 0 picks the master secret x; others pick 0
        if mpc.pid == 0:
            x0 = secrets.randbelow(curve_order)
        else:
            x0 = 0
        # Wrap x0 as a secure element of the field Z_q (q = curve_order).
        secfld = mpc.SecFld(curve_order)
        self.x_share = secfld(x0)  # P0's input = x0; others' input = 0
        # Public key X = x * g2.  It is not derived here; it must be assigned
        # by the caller.  Initialised to None so verify() can report a clear
        # error instead of failing with a missing attribute.
        self.X = None

    def _commit(self, messages, s):
        """Return B = g1 + s*H[0] + sum_i m_i*H[i], all scalars reduced mod curve_order."""
        B = add(self.g1, multiply(self.H[0], s % curve_order))
        for mi, Hi in zip(messages, self.H[1:]):
            B = add(B, multiply(Hi, mi % curve_order))
        return B

    async def threshold_sign(self, messages):
        """Sign ``messages`` and return ``(A, e, s)``.

        Raises:
            ValueError: if the number of messages is not ``self.ell``.
        """
        # Without this check zip() would silently drop or ignore messages.
        if len(messages) != self.ell:
            raise ValueError(f"Expected {self.ell} messages, got {len(messages)}")
        # 1. Generate nonces (returned in the clear as part of the signature)
        e = secrets.randbelow(curve_order)
        s = secrets.randbelow(curve_order)
        # 2. Compute the BBS+ base point B = g1 + s*H[0] + sum(m_i*H[i])
        B = self._commit(messages, s)
        # 3. Lift e into the secure field so x + e can be formed on shared values
        secfld = mpc.SecFld(curve_order)
        e_share = secfld(e)
        # 4. Shared denominator x + e
        denom_share = self.x_share + e_share
        # 5. Shared reciprocal 1 / (x + e), computed by MPyC on the secure value
        inv_share = denom_share ** -1
        # 6. Open inv = (x + e)^-1.
        #    SECURITY: together with the public e this reveals x (see class notes).
        inv = await mpc.output(inv_share)
        # 7. Locally compute A = inv * B
        A = multiply(B, int(inv))
        return (A, e, s)

    def verify(self, messages, signature):
        """Check ``pairing(X + e*g2, A) == pairing(g2, B)`` for the given messages.

        Returns False when the number of messages is not ``self.ell``.

        Raises:
            RuntimeError: if ``self.X`` has not been assigned yet.
        """
        A, e, s = signature
        if self.X is None:
            raise RuntimeError("Public key X is not set; assign it before calling verify")
        if len(messages) != self.ell:
            return False
        B = self._commit(messages, s)
        left = pairing(add(self.X, multiply(self.g2, e % curve_order)), A)
        right = pairing(self.g2, B)
        return left == right

In [57]:
class ThresholdBBSMPCBenchmark:
    """Benchmarking and Real Data Test for ThresholdBBSMPC

    ``mpc`` is the MPyC runtime used by this wrapper to open the key; the
    wrapped ``ThresholdBBSMPC`` instance refers to the module-level ``mpc``.
    """

    def __init__(self, ell, n, t, mpc):
        """Build a ``ThresholdBBSMPC(ell, n, t)`` instance to be measured."""
        self.ell = ell
        self.n = n
        self.t = t
        self.mpc = mpc
        self.tbbsmpc = ThresholdBBSMPC(ell, n, t)

    async def _open_public_key(self):
        """Open the shared key x and install X = x * g2 on the scheme instance.

        SECURITY: this outputs the secret key itself in order to derive the
        public key, which is only acceptable in a benchmark/demo setting.
        """
        x_open = await self.mpc.output(self.tbbsmpc.x_share)
        self.tbbsmpc.X = multiply(G2, int(x_open))

    async def benchmark(self, repeat=3):
        """Time ``repeat`` sign and verify rounds and return the averaged results.

        ``sig_size`` is the summed length of ``str()`` of each signature
        component (a character count, not a serialized byte size).

        Raises:
            ValueError: if ``repeat`` is smaller than 1.
        """
        if repeat < 1:
            raise ValueError("repeat must be at least 1")
        messages = [42 + i for i in range(self.ell)]

        await self._open_public_key()

        # perf_counter is the clock intended for measuring short intervals.
        sign_start = time.perf_counter()
        for _ in range(repeat):
            signature = await self.tbbsmpc.threshold_sign(messages)
        sign_time = (time.perf_counter() - sign_start) / repeat

        verify_start = time.perf_counter()
        for _ in range(repeat):
            valid = self.tbbsmpc.verify(messages, signature)
        verify_time = (time.perf_counter() - verify_start) / repeat

        sig_size = sum(len(str(x)) for x in signature)
        print(f"ThresholdBBSMPC ell={self.ell} n={self.n} t={self.t}: sign:{sign_time:.4f}s verify:{verify_time:.4f}s size:{sig_size} valid:{valid}")
        return {
            "scheme": "ThresholdBBSMPC",
            "ell": self.ell,
            "n": self.n,
            "t": self.t,
            "sign_time": sign_time,
            "verify_time": verify_time,
            "sig_size": sig_size,
            "valid": valid
        }

    async def real_data_test(self, real_messages):
        """Sign and verify string messages; print the details and return the verdict.

        Each string is UTF-8 encoded, read as a big-endian integer and reduced
        modulo ``curve_order``.
        """
        messages_int = [int.from_bytes(s.encode('utf8'), 'big') % curve_order for s in real_messages]

        await self._open_public_key()

        signature = await self.tbbsmpc.threshold_sign(messages_int)
        valid = self.tbbsmpc.verify(messages_int, signature)

        print("==== ThresholdBBSMPC Real Data Test ====")
        print("Messages (raw):", real_messages)
        print("Messages (int):", messages_int)
        print("Signature:", signature)
        print("Verify:", valid)
        return valid


In [58]:
# =================== Benchmarking ===================
# Benchmarking measures the signing and verification speed, signature size, and correctness for both the standard BBS+ and the threshold BBS+ signature schemes.
# It compares how signature generation and verification behave under different parameters (e.g., number of messages, threshold settings, number of participants).

def benchmark_bbsplus(ell: int, repeat=3):
    """Time BBS+ signing and verification of ``ell`` messages.

    Runs ``repeat`` sign calls and then ``repeat`` verify calls on the last
    signature, and returns a dict with the per-call averages in seconds.
    ``sig_size`` is the summed length of ``str()`` of each signature
    component (a character count, not a serialized byte size).

    Raises:
        ValueError: if ``repeat`` is smaller than 1.
    """
    if repeat < 1:
        raise ValueError("repeat must be at least 1")
    messages = [42 + i for i in range(ell)]
    bbs = BBSPlus(ell)

    # perf_counter is the clock intended for measuring short intervals.
    sign_start = time.perf_counter()
    for _ in range(repeat):
        signature = bbs.sign(messages)
    sign_time = (time.perf_counter() - sign_start) / repeat

    verify_start = time.perf_counter()
    for _ in range(repeat):
        valid = bbs.verify(messages, signature)
    verify_time = (time.perf_counter() - verify_start) / repeat

    sig_size = sum(len(str(x)) for x in signature)
    return {"scheme": "BBS+", "ell": ell, "sign_time": sign_time, "verify_time": verify_time, "sig_size": sig_size, "valid": valid}

def benchmark_thresholdbbs(ell, n, t, repeat=3):
    """Time threshold BBS+ signing and verification of ``ell`` messages.

    Signs with the first ``t`` of ``n`` shares.  Runs ``repeat`` sign calls
    and then ``repeat`` verify calls on the last signature, and returns a
    dict with the per-call averages in seconds.  ``sig_size`` is the summed
    length of ``str()`` of each signature component (a character count, not a
    serialized byte size).

    Raises:
        ValueError: if ``repeat`` is smaller than 1.
    """
    if repeat < 1:
        raise ValueError("repeat must be at least 1")
    tbbs = ThresholdBBS(ell, n, t)
    messages = [42 + i for i in range(ell)]
    subset = list(range(t))

    # perf_counter is the clock intended for measuring short intervals.
    sign_start = time.perf_counter()
    for _ in range(repeat):
        signature = tbbs.threshold_sign(messages, subset)
    sign_time = (time.perf_counter() - sign_start) / repeat

    verify_start = time.perf_counter()
    for _ in range(repeat):
        valid = tbbs.verify(messages, signature)
    verify_time = (time.perf_counter() - verify_start) / repeat

    sig_size = sum(len(str(x)) for x in signature)
    return {"scheme": "ThresholdBBS+", "ell": ell, "n": n, "t": t, "sign_time": sign_time, "verify_time": verify_time, "sig_size": sig_size, "valid": valid}

def run_benchmarks():
    """Print one result line per configuration for BBS+ and threshold BBS+.

    BBS+ is measured for 1, 3, 10 and 30 messages; threshold BBS+ for 1, 3
    and 10 messages with (n, t) in (3, 2), (5, 3) and (10, 5).
    """
    print("==== BBS+ vs Threshold BBS+ Benchmark ====")

    print("---- BBS+ ----")
    for message_count in (1, 3, 10, 30):
        row = benchmark_bbsplus(message_count)
        print(f"BBS+ ell={row['ell']}: sign:{row['sign_time']:.4f}s verify:{row['verify_time']:.4f}s size:{row['sig_size']} valid:{row['valid']}")

    print("---- Threshold BBS+ ----")
    party_settings = ((3, 2), (5, 3), (10, 5))
    configurations = [
        (message_count, parties, threshold)
        for message_count in (1, 3, 10)
        for parties, threshold in party_settings
    ]
    for message_count, parties, threshold in configurations:
        row = benchmark_thresholdbbs(message_count, parties, threshold)
        print(f"ThresholdBBS+ ell={row['ell']} n={row['n']} t={row['t']}: sign:{row['sign_time']:.4f}s verify:{row['verify_time']:.4f}s size:{row['sig_size']} valid:{row['valid']}")
    
async def run_benchmarks_mpc(mpc):
    """Benchmark ``ThresholdBBSMPC`` over several (ell, n, t) settings.

    For every configuration the key is opened to derive the public key, then
    signing and verification are each timed over a fixed number of rounds.
    Result lines are printed by party 0 only; the header and footer lines are
    printed by every party.

    SECURITY: deriving X by outputting ``x_share`` reveals the secret key,
    which is only acceptable in a benchmark/demo setting.
    """
    repeat = 3  # timed rounds per measurement
    print("---- ThresholdBBSMPC ----")
    for ell in [1, 3, 10]:
        for n, t in [(3, 2), (5, 3), (10, 5)]:
            tbbs = ThresholdBBSMPC(ell, n, t)
            # Open x from its shares and install the public key X = x * g2.
            x_open = await mpc.output(tbbs.x_share)
            tbbs.X = multiply(G2, int(x_open))
            messages = [10 + i for i in range(ell)]

            # perf_counter is the clock intended for measuring short intervals.
            sign_start = time.perf_counter()
            for _ in range(repeat):
                signature = await tbbs.threshold_sign(messages)
            sign_time = (time.perf_counter() - sign_start) / repeat

            verify_start = time.perf_counter()
            for _ in range(repeat):
                valid = tbbs.verify(messages, signature)
            verify_time = (time.perf_counter() - verify_start) / repeat

            # Character count of the components' str() forms, not a byte size.
            sig_size = sum(len(str(x)) for x in signature)

            if mpc.pid == 0:
                print(f"ThresholdBBSMPC ell={ell} n={n} t={t}: sign:{sign_time:.4f}s verify:{verify_time:.4f}s size:{sig_size} valid:{valid}")
    print("========================")


# Run the BBS+ / threshold BBS+ benchmarks (no MPyC involved), then drive the
# ThresholdBBSMPC benchmark coroutine to completion through the MPyC runtime.
run_benchmarks()
mpc.run(run_benchmarks_mpc(mpc))

==== BBS+ vs Threshold BBS+ Benchmark ====
---- BBS+ ----
BBS+ ell=1: sign:0.0148s verify:0.7989s size:504 valid:True
BBS+ ell=3: sign:0.0135s verify:0.8098s size:504 valid:True
BBS+ ell=10: sign:0.0157s verify:0.7977s size:504 valid:True
BBS+ ell=30: sign:0.0193s verify:0.8139s size:502 valid:True
---- Threshold BBS+ ----
ThresholdBBS+ ell=1 n=3 t=2: sign:0.0133s verify:0.8176s size:505 valid:True
ThresholdBBS+ ell=1 n=5 t=3: sign:0.0134s verify:0.8114s size:504 valid:True
ThresholdBBS+ ell=1 n=10 t=5: sign:0.0131s verify:0.8055s size:505 valid:True
ThresholdBBS+ ell=3 n=3 t=2: sign:0.0140s verify:0.8322s size:505 valid:True
ThresholdBBS+ ell=3 n=5 t=3: sign:0.0137s verify:0.8009s size:503 valid:True
ThresholdBBS+ ell=3 n=10 t=5: sign:0.0142s verify:0.8198s size:503 valid:True
ThresholdBBS+ ell=10 n=3 t=2: sign:0.0147s verify:0.8028s size:503 valid:True
ThresholdBBS+ ell=10 n=5 t=3: sign:0.0162s verify:0.7956s size:504 valid:True
ThresholdBBS+ ell=10 n=10 t=5: sign:0.0153s verify:0.81

In [59]:
def string_to_int_list(str_list):
    """Map each string to an integer in [0, curve_order).

    A string is UTF-8 encoded, its bytes are read as one big-endian integer,
    and that integer is reduced modulo ``curve_order``.
    """
    scalars = []
    for text in str_list:
        raw = text.encode('utf8')
        scalars.append(int.from_bytes(raw, 'big') % curve_order)
    return scalars


# Round-trip check on string data: encode the strings as integers, sign them,
# and confirm that the resulting signature verifies.
# Real data test for BBSPlus and ThresholdBBS
print("==== BBS+ Real Data Test ====")
bbs_real = BBSPlus(3)
real_messages = ["yeshouxianbei", "114514", "z114514@ad.unsw.edu.au"]
messages_int = string_to_int_list(real_messages)
sig = bbs_real.sign(messages_int)
print("Messages (raw):", real_messages)
print("Messages (int):", messages_int)
print("Signature:", sig)
print("Verify:", bbs_real.verify(messages_int, sig))
print()

# Same messages with a 3-of-5 threshold instance.
print("==== Threshold BBS+ Real Data Test ====")
tbbs_real = ThresholdBBS(3, 5, 3)

# 0-based indices of the three shares used for signing.
subset = [0, 2, 4]
sig2 = tbbs_real.threshold_sign(messages_int, subset)
print("Messages (raw):", real_messages)
print("Subset:", subset)
print("Signature:", sig2)
print("Verify:", tbbs_real.verify(messages_int, sig2))


# Real data test for ThresholdBBSMPC

async def real_data_test_mpc(mpc):
    """Sign and verify three sample strings with ``ThresholdBBSMPC`` (ell=3, n=5, t=3).

    The blank line and header are printed by every party; the messages,
    signature and verification result are printed by party 0 only.

    SECURITY: deriving X by outputting ``x_share`` reveals the secret key,
    which is only acceptable in a demo setting.
    """
    print()
    print("==== ThresholdBBSMPC Real Data Test ====")
    ell, n, t = 3, 5, 3
    tbbsmpc = ThresholdBBSMPC(ell, n, t)

    # Open x from its shares and install the public key X = x * g2.
    x_open = await mpc.output(tbbsmpc.x_share)
    tbbsmpc.X = multiply(G2, int(x_open))

    real_messages = ["yeshouxianbei", "114514", "z114514@ad.unsw.edu.au"]
    # Reuse the notebook's encoder instead of repeating the conversion inline.
    messages_int = string_to_int_list(real_messages)

    signature = await tbbsmpc.threshold_sign(messages_int)
    valid = tbbsmpc.verify(messages_int, signature)

    if mpc.pid == 0:
        print("Messages (raw):", real_messages)
        print("Messages (int):", messages_int)
        print("Signature:", signature)
        print("Verify:", valid)

# Drive the ThresholdBBSMPC real-data test coroutine to completion via MPyC.
mpc.run(real_data_test_mpc(mpc))

==== BBS+ Real Data Test ====
Messages (raw): ['yeshouxianbei', '114514', 'z114514@ad.unsw.edu.au']
Messages (int): [9618005169869363825106802074985, 54087399059764, 45717513622168189320559085781022544088200214291046773]
Signature: ((3919052083682381681956655167868755180970017306489640948465621759883187566224785641965869821815957106301138827174373, 3399829629606604148402251472287838376382096347535016240762259088800740318395751250073306507920702034795207474581351, 1319869603576795035132686879511598718363910840793466443510469194273883051180652145515179659591921338988333789614027), 45394845351908492955224461707129836194139344229843944907749372277031294259856, 15702066001735322008340613206555035166548986226585290538636658082450525185664)
Verify: True

==== Threshold BBS+ Real Data Test ====
Messages (raw): ['yeshouxianbei', '114514', 'z114514@ad.unsw.edu.au']
Subset: [0, 2, 4]
Signature: ((23757532472657258922434265056446898128057394668569894709916310675839050356550368052545137702061624543