# Протокол SIGMA

In [1]:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from hashlib import sha256
import hmac

In [2]:
class Alice:
    def __init__(self, name):
        self.ecdsa_secret_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
        self.ecdh_secret_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
        self.data = {'A_id': name.encode()}
        self.r_size = 128
        self.aes_block_size = 16
        self.tag_size = 32
        
        
    def receive_data(self, data):
        self.data.update(data)
        
        
    def step_0(self):
        self.data['ECDSA_SK_A'] = self.ecdsa_secret_key
        self.data['ECDSA_PK_A'] = self.ecdsa_secret_key.public_key()
        return {'ECDSA_PK_A': self.data['ECDSA_PK_A']}
    
    
    def step_1(self):
        self.data['DH_SK_A'] = self.ecdh_secret_key
        self.data['DH_PK_A'] = self.ecdh_secret_key.public_key()
        self.data['r_A'] = os.urandom(self.r_size)
        return {'DH_PK_A': self.data['DH_PK_A'], 'r_A': self.data['r_A']}
    
    
    def _transform_public_key_to_bytes(self, public_key):
        return public_key.public_bytes(encoding=serialization.Encoding.PEM, 
                                       format=serialization.PublicFormat.SubjectPublicKeyInfo)

    
    def step_3(self):
        # verify ECDSA sign
        self.data['ECDSA_PK_B'].verify(self.data['ECDSA_SIGN_B'], self._transform_public_key_to_bytes(self.data['DH_PK_A']) +
                                    self._transform_public_key_to_bytes(self.data['DH_PK_B']), ec.ECDSA(hashes.SHA256()))
        
        shared_key = hmac.new(self.data['r_A'] + self.data['r_B'], 
                             self.data['DH_SK_A'].exchange(ec.ECDH(), self.data['DH_PK_B']),
                             sha256).digest()
        
        self.data['k_m'] = shared_key[:len(shared_key) // 2]
        self.data['k_e'] = shared_key[len(shared_key) // 2:]
        
        #verify HMAC
        if self.data['MAC_B'] != hmac.new(self.data['k_m'], self.data['B_id'], sha256).digest():
            raise ValueError('MAC_B is not correct')
            
        self.data['ECDSA_SIGN_A'] = self.ecdsa_secret_key.sign(self._transform_public_key_to_bytes(self.data['DH_PK_A']) +
                                                              self._transform_public_key_to_bytes(self.data['DH_PK_B']),
                                                              ec.ECDSA(hashes.SHA256()))
        
        self.data['MAC_A'] = hmac.new(self.data['k_m'], self.data['A_id'], sha256).digest()
        return {'A_id': self.data['A_id'],
               'ECDSA_SIGN_A': self.data['ECDSA_SIGN_A'], 
               'MAC_A': self.data['MAC_A']}
    
    
    def _add_block(self, block, is_final=False):            
        update_part = self.aes_ctr.update(block)
        self.tag.update(update_part)

        if is_final:
            update_part += self.aes_ctr.finalize() + self.tag.digest()
        return update_part 
    
    
    def process_data(self, data):
        iv = os.urandom(self.aes_block_size)
        self.tag = hmac.new(self.data['k_m'], iv, digestmod=sha256)
        self.ciphertext = iv
        self.aes_ctr = Cipher(algorithms.AES(self.data['k_e']), modes.CTR(iv), 
                              backend=default_backend()).encryptor()

        for i in range(0, len(data), self.aes_block_size):
            self.ciphertext += self._add_block(data[i:i+self.aes_block_size], i + self.aes_block_size >= len(data))

        return self.ciphertext

In [3]:
class Bob:
    def __init__(self, name):
        self.ecdsa_secret_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
        self.ecdh_secret_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
        self.data = {'B_id': name.encode()}
        self.r_size = 128
        self.aes_block_size = 16
        self.tag_size = 32
    
    
    def receive_data(self, data):
        self.data.update(data)
    
    
    def step_0(self):
        self.data['ECDSA_SK_B'] = self.ecdsa_secret_key
        self.data['ECDSA_PK_B'] = self.ecdsa_secret_key.public_key()
        return {'ECDSA_PK_B': self.data['ECDSA_PK_B']}
    
    
    def _transform_public_key_to_bytes(self, public_key):
        return public_key.public_bytes(encoding=serialization.Encoding.PEM, 
                                       format=serialization.PublicFormat.SubjectPublicKeyInfo)
    
    
    def step_2(self):
        self.data['DH_SK_B'] = self.ecdh_secret_key
        self.data['DH_PK_B'] = self.ecdh_secret_key.public_key()
        self.data['r_B'] = os.urandom(self.r_size)
        shared_key = hmac.new(self.data['r_A'] + self.data['r_B'],
                              self.data['DH_SK_B'].exchange(ec.ECDH(), self.data['DH_PK_A']), 
                              sha256).digest()
        self.data['k_m'] = shared_key[:len(shared_key) // 2]
        self.data['k_e'] = shared_key[len(shared_key) // 2:]
        
        self.data['ECDSA_SIGN_B'] = self.ecdsa_secret_key.sign(self._transform_public_key_to_bytes(self.data['DH_PK_A']) +
                                                              self._transform_public_key_to_bytes(self.data['DH_PK_B']),
                                                              ec.ECDSA(hashes.SHA256()))
        
        self.data['MAC_B'] = hmac.new(self.data['k_m'], self.data['B_id'], sha256).digest()
        return {'DH_PK_B': self.data['DH_PK_B'], 
               'r_B': self.data['r_B'], 
               'B_id': self.data['B_id'],
               'ECDSA_SIGN_B': self.data['ECDSA_SIGN_B'],
               'MAC_B': self.data['MAC_B']}
    
    
    def step_4(self):
        # verify ECDSA sign
        self.data['ECDSA_PK_A'].verify(self.data['ECDSA_SIGN_A'], self._transform_public_key_to_bytes(self.data['DH_PK_A']) +
                                    self._transform_public_key_to_bytes(self.data['DH_PK_B']), ec.ECDSA(hashes.SHA256()))
        
        #verify HMAC
        if self.data['MAC_A'] != hmac.new(self.data['k_m'], self.data['A_id'], sha256).digest():
            raise ValueError('MAC_A is not correct')
        
        print('Общие ключи для аутентификации и шифрования успешно получены')
        
    
    def _add_block(self, block, is_final=False):            
        plaintext = self.aes_ctr.update(block)
        if is_final:
            plaintext += self.aes_ctr.finalize()
        return plaintext
    
    
    def process_data(self, data):
        tag = data[-self.tag_size:]
        data = data[:-self.tag_size]
        if tag != hmac.new(self.data['k_m'], data, sha256).digest():
            raise ValueError('Unsuccessful MAC check')

        self.aes_ctr = Cipher(algorithms.AES(self.data['k_e']), modes.CTR(data[:self.aes_block_size]), 
                              backend=default_backend()).decryptor()

        data = data[self.aes_block_size:]
        self.plaintext = bytearray()
        for i in range(0, len(data), self.aes_block_size):
            self.plaintext += self._add_block(data[i:i+self.aes_block_size], i + self.aes_block_size >= len(data))

        return self.plaintext

In [4]:
alice = Alice('alice')
bob = Bob('bob')

bob.receive_data(alice.step_0())
alice.receive_data(bob.step_0())

bob.receive_data(alice.step_1())

alice.receive_data(bob.step_2())

bob.receive_data(alice.step_3())

bob.step_4()

Общие ключи для аутентификации и шифрования успешно получены


In [5]:
message = b'hello-bob-it-is-alice'

In [6]:
cipher = alice.process_data(message)

In [7]:
bob.process_data(cipher) == message

True