In [1]:
from noknow.core import ZK, ZKSignature, ZKParameters, ZKData, ZKProof
import copy

In [2]:
class ZKServer:
    def __init__(self, password, features):
        self.password = password
        self.features = features
        self.client_representations = {}
        self.tokens = {}
        self.initialized_zk_features = False
        for el in features:
            self.client_representations[el] = []
            self.tokens[el] = []
        self.zk = ZK.new(curve_name="secp384r1", hash_alg="sha3_512")
        self.signature: ZKSignature = self.zk.create_signature(password)
            
    def load_client_signature(self, signature):
        #load client signature
        client_signature = ZKSignature.load(signature)
        return ZK(client_signature.params)
    
    def create_token(self, feature, signature):
        #load client signature
        client_signature = ZKSignature.load(signature)
        client_zk = ZK(client_signature.params)
        # Create a signed token and send to the client
        token = self.zk.sign(self.password, client_zk.token()).dump(separator=":")
        self.tokens[feature] += [token]
        self.client_representations[feature] += [{'signature' : client_signature, 'zk' : client_zk}]
        assert token in self.tokens[feature]
        return token
        
    def verify_clients(self, feature, signature):
        # Get the token from the client
        proof = ZKData.load(signature)
        token = ZKData.load(proof.data, ":")
        
        proofs = []
        # In this example, the server signs the token so it can be sure it has not been modified
        for label in self.client_representations[feature]:
            client_signature = label['signature']
            client_zk = label['zk']
            proofs += [self.verify_client(proof, token, client_signature, client_zk)]
            
        return sum(proofs) >= 1
    
    def verify_client(self, proof, token, client_signature, client_zk):
        return self.zk.verify(token, self.signature) and \
                client_zk.verify(proof, client_signature, data=token)
    
    def verify(self, feature, signatures):
        return sum([self.verify_clients(feature, s) for s in signatures]) >= 1
    
    def init_zk(self, clients):
        if self.initialized_zk_features:
            print('Server already initialized')
            return
        for c in clients:
            self.create_token(c.feature, c.signature)
        #print('Server initialized with ' + str(len(clients)) + ' features')
        self.initialized_zk_features = True

In [3]:
class ZKInitClient:
    def __init__(self, feature, label):
        self.zk = ZK.new(curve_name="secp256k1", hash_alg="sha3_256")
        self.label = label
        self.feature = feature
        # Create signature and send to server
        self.signature = self.zk.create_signature(label).dump()
        
    def set_label(self, label):
        self.label = label
        self.signature = self.zk.create_signature(label).dump()
            
    def create_proof(self, server_token):
        # Create a proof that signs the provided token and sends to server
        proof = self.zk.sign(self.label, server_token).dump()
        return proof
    
    def create_proofs(self, server_tokens):
        return [self.create_proof(t) for t in server_tokens]

In [4]:
class ServerInitializer:
    def __init__(self, features, client_prototype):
        self.client_prototype = client_prototype
        self.features = features
        
    def create_clients(self):
        clients = []
        for feature in self.features:
            for label in self.features[feature]:
                client = copy.copy(self.client_prototype)
                client.feature = feature
                client.set_label(label)
                clients += [client]
        return clients

In [7]:
features = {
    'ETHNICITY': ['ASIAN', 'BLACK', 'INDIAN', 'WHITE'],
    'GENDER': ['FEMALE', 'MALE']
}

not_features = {
    'ETHNICITY': ['ROCK', 'PAPER', 'SCISSORS', 'UNCATEGORIZED'],
    'GENDER': ['ZERO', 'ONE']
}

class ZKFramework:
    def __init__(self, features, not_features, data=[]):
        self.client_prototype = ZKInitClient('', '')
        self.features = features
        self.not_features = not_features #TESTING
        self.server = ZKServer('password', features)
        self.server_initializer = ServerInitializer(features, self.client_prototype)
        self.server.init_zk(self.server_initializer.create_clients())
        self.test_server()
        
    def test_server(self):
        #Create a mock server initializer with actual features
        si_f = ServerInitializer(self.features, self.client_prototype)
        
        #Create a mock server initializer with unauthorized features (not_features)
        si_n = ServerInitializer(self.not_features, self.client_prototype)
        
        #Create a mock server
        server = ZKServer('password', features)
        
        #Register authrized clients in the mock server
        server.init_zk(si_f.create_clients())
        
        #Create proofs for authorized users
        mock_clients_f = si_f.create_clients()
        proofs_f = [server.verify(c.feature, c.create_proofs(server.tokens[c.feature])) 
                  for c 
                  in mock_clients_f]
        
        #Create proofs for unauthorized users
        mock_clients_n = si_n.create_clients()
        proofs_n = [server.verify_clients(c.feature, c.create_proof(server.tokens[c.feature][0])) 
                  for c 
                  in mock_clients_n]
        
        #Assert that all the registered clients get access to the server
        assert sum(proofs_f) == len(mock_clients_f)
        
        #Assert that none of the non-registered clients get access to the server
        assert sum(proofs_n) == 0

In [8]:
ZKFramework(features, not_features)

<__main__.ZKFramework at 0x1bf15d82f28>