# Реализация модели защищенного канала передачи данных по спецификации TLS 1.3

In [1]:
IS_DEBUG = 1

In [2]:
def trace(*args, **kwargs):
    """
    Отладочная трассировка
    """
    
    global IS_DEBUG
    if IS_DEBUG:
        print('[TRACE]', end=' ')
        print(*args, **kwargs)

----

In [3]:
# Криптография из различных ГОСТ'ов
from pygost import mgm
from pygost import gost3412
from pygost import gost3410
from pygost import gost34112012256

# Криптография из западных стандартов
from Crypto.Cipher import AES
from Crypto.Hash import SHA256

# HKDF
from Crypto.Protocol.KDF import HKDF

# КГПСЧ
from Crypto.Random import get_random_bytes, random
from Crypto.Util.number import getStrongPrime

# Различные утилиты
from collections import namedtuple
from Crypto.Util import number
from functools import partial
from binascii import hexlify
from enum import Enum
import uuid
import gmpy2
import pickle

----

## Реализация оберток над криптографическими алгоритмами

In [4]:
def multiply_by_scalar(curve, scalar, point):
    """
    Умножение точки эллиптической кривой на скаляр 
    """
    
    return curve.exp(scalar, *point)


def add_points(curve, lhs, rhs):
    """
    Сложение двух точек эллиптической кривой
    """
    
    return curve._add(*lhs, *rhs)

---

In [5]:
#
# Сертификат это простая структурка, поэтому обойдусь именованным кортежем
#

Certificate = namedtuple('Certificate', ['id', 'public_key', 'signature'])

----

In [6]:
class KuznyechikMGM(object):
    """
    Класс-обертка над Кузнечиком в режиме MGM
    """
    
    KEY_SIZE = 32
    
    def __init__(self, key: bytes):
        """
        Инициализация - сохраняю ключ
        """

        self._key = key
    
    
    def encrypt(self, plaintext: bytes):
        """
        Зашифрование шифром Кузнечик в режиме MGM
        """
        
        encrypter = partial(KuznyechikMGM._encrypt_block, self._key)
        cipher = mgm.MGM(encrypter, KuznyechikMGM._block_size())
        
        nonce = KuznyechikMGM._generate_nonce()
        ct_with_tag = cipher.seal(nonce, plaintext, b'')
        
        return nonce, ct_with_tag[:-cipher.tag_size], ct_with_tag[-cipher.tag_size:]
    
    
    def decrypt(self, nonce: bytes, ciphertext: bytes, tag: bytes):
        """
        Расшифрование шифром Кузнечик в режиме MGM
        """
        
        encrypter = partial(KuznyechikMGM._encrypt_block, self._key)
        cipher = mgm.MGM(encrypter, KuznyechikMGM._block_size())
            
        ct_with_tag = ciphertext + tag
            
        return cipher.open(nonce, ct_with_tag, b'')
        
        
    @staticmethod
    def _block_size():
        return gost3412.GOST3412Kuznechik.blocksize
        
        
    @staticmethod
    def _encrypt_block(key: bytes, block: bytes):
        cipher = gost3412.GOST3412Kuznechik(key)
        return cipher.encrypt(block)
    
    
    @staticmethod
    def _generate_nonce():
        nonce = get_random_bytes(KuznyechikMGM._block_size())
        while bytearray(nonce)[0] & 0x80 > 0:
            nonce = get_random_bytes(KuznyechikMGM._block_size())
            
        return nonce

In [7]:
class AESGCM(object):
    """
    Класс-обертка над AES в режиме GCM
    """
    
    KEY_SIZE = 16
    
    def __init__(self, key: bytes):
        """
        Инициализация - сохраняю ключ
        """

        self._key = key
        
        
    def encrypt(self, plaintext: bytes):
        """
        Зашифрование шифром AES в режиме GCM
        """
        
        cipher = AES.new(self._key, AES.MODE_GCM)
        return cipher.nonce, *cipher.encrypt_and_digest(plaintext)
    
    
    def decrypt(self, nonce: bytes, ciphertext: bytes, tag: bytes):
        """
        Зашифрование шифром AES в режиме GCM
        """
        
        cipher = AES.new(self._key, AES.MODE_GCM, nonce=nonce)
        return cipher.decrypt_and_verify(ciphertext, tag)

----

In [8]:
class Streebog(gost34112012256.GOST34112012256):
    """
    pycryptodome-совместимая обертка над хэш-функцией Стрибог
    """
    
    digest_size = 32
    
    def __init__(self, data=None):
        """
        Инициализация и обновление состояния (если надо)
        """
        
        super(Streebog, self).__init__()
        if data is not None:
            self.update(data)
    
    @staticmethod
    def new(data=None):
        """
        Создание нового объекта
        """
        
        return Streebog(data)

In [9]:
def hash_digest(hasher_type, *args):
    """
    Вычисление хэша при помощи объекта-обертки
    """
    
    hasher = hasher_type.new()
    
    for arg in args:
        hasher.update(arg)
        
    return hasher.digest()

----

In [10]:
class DiffieHellmanProcessor(object):
    """
    Обработчик обмена ключами по протоколу Диффи-Хеллмана
    """
    
    def __init__(self, group=None, generator=None):
        """
        Инициализация - выбор группы, образующего элемента
        Возможно конструирование по готовой группе и образующему
        """

        self._group = Integers(getStrongPrime(512)) if group is None else group
        self._generator = random.randint(int(2), int(self._group.order() - 2)) if generator is None \
            else generator
        
        self._modulus = self._group.order()
        
        trace('[DiffieHellmanProcessor]', f'Using group of order {self._modulus}')
        
    
    def generate_private_key(self):
        """
        Генерирование закрытого ключа
        """
        
        return random.randint(int(2), int(self.order - 2))
    
    
    def get_public_key(self, private_key):
        """
        Получение открытого ключа по закрытому
        """
        
        return self.exponentiate(private_key, self._generator)
    
    
    def exponentiate(self, scalar, element):
        """
        Возведение в степень элемента группы
        """
        
        return self._group(int(gmpy2.powmod(int(element), int(scalar), self.order)))
    
    
    def element_to_bytes(self, element):
        """
        Конвертация элемента группы в байтовое представление
        """
        
        return number.long_to_bytes(int(element))
    
    
    @property
    def group(self):
        """
        Получение группы
        """
        
        return self._group
    
        
    @property
    def generator(self):
        """
        Получение образующего элемента
        """
        
        return self._generator
    
    
    @property
    def order(self):
        """
        Получение порядка группы
        """
        
        return self._modulus

In [11]:
class EllipticCurveDiffieProcessor(object):
    """
    Обработчик обмена ключами по протоколу Диффи-Хеллмана
    в группе точек эллиптической кривой
    """
    
    def __init__(self, curve=None, generator=None):
        """
        Инициализация - выбор кривой, образующего элемента
        Возможно конструирование по готовой кривой и образующему
        """
        
        #
        # Для простоты буду использовать кривую из ГОСТ 34.10 
        #
        
        self._curve = gost3410.CURVES['id-tc26-gost-3410-2012-512-paramSetA'] if curve is None else curve
        self._generator = (self._curve.x, self._curve.y) if generator is None else generator
        self._modulus = self._curve.p
        
        trace('[EllipticCurveDiffieProcessor]', 
              f'Using group of order {self._modulus} over curve {self._curve.name}')
        
    
    def generate_private_key(self):
        """
        Генерирование закрытого ключа
        """
        
        return random.randint(int(2), int(self.order - 2))
    
    
    def get_public_key(self, private_key):
        """
        Получение открытого ключа по закрытому
        """
        
        return self.exponentiate(private_key, self._generator)
    
    
    def exponentiate(self, scalar, element):
        """
        Умножение точки на скаляр
        """
        
        if not self._curve.contains(element):
            return None
        
        return multiply_by_scalar(self._curve, scalar, element)
    
    
    def element_to_bytes(self, element):
        """
        Конвертация элемента группы в байтовое представление
        """
        
        return gost3410.pub_marshal(element)
    
    
    @property
    def group(self):
        """
        Получение группы
        """
        
        return self._curve
    
    
    @property
    def generator(self):
        """
        Получение образующего элемента
        """
        
        return self._generator
    
    
    @property
    def order(self):
        """
        Получение порядка группы
        """
        
        return self._modulus

----

In [12]:
cipher_suites = ['ECDHE_KUZNYECHIK_MGM_STREEBOG', 'DHE_AES_GCM_SHA256']

In [13]:
def resolve_cipher_suite(cipher_suite):
    """
    Получение примитивов по строковому идентификатору
    """
        
    #
    # Пока захардкодил это все
    #
        
    if cipher_suite == 'ECDHE_KUZNYECHIK_MGM_STREEBOG':
        return EllipticCurveDiffieProcessor, KuznyechikMGM, Streebog
        
    elif cipher_suite == 'DHE_AES_GCM_SHA256':
        return DiffieHellmanProcessor, AESGCM, SHA256
        
    raise ValueError(f'Unsupported cipher suite {cipher_suite}')

----

In [14]:
def hmac(key, *args, hasher=Streebog):
    """
    Вычисление HMAC (согласно Р 50.1.113 - 2016)
    Информационная технология
    КРИПТОГРАФИЧЕСКАЯ ЗАЩИТА ИНФОРМАЦИИ
    Криптографические алгоритмы, сопутствующие применению алгоритмов 
    электронной цифровой подписи и функции хэширования
    """
    
    def xor(lhs, rhs):
        return bytes([l ^^ r for l, r in zip(lhs, rhs)])
    
    IPAD = b'\x36' * 64
    OPAD = b'\x5c' * 64
    
    K_star = key + (b'\x00' * (512 - len(key)))
    
    inner_hash = hash_digest(hasher, xor(K_star, IPAD), *args)
    return hash_digest(hasher, xor(K_star, OPAD), inner_hash)

In [15]:
def test_hmac():
    """
    Р 50.1.113 - 2016
    Информационная технология
    КРИПТОГРАФИЧЕСКАЯ ЗАЩИТА ИНФОРМАЦИИ
    Криптографические алгоритмы, сопутствующие применению алгоритмов 
    электронной цифровой подписи и функции хэширования
    
    4.1.1 HMAC_GOSTR3411_2012_256
    
    Приложение А. Контрольные примеры
    """
    
    K = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f' + \
        b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
    
    T = b'\x01\x26\xbd\xb8\x78\x00\xaf\x21\x43\x41\x45\x65\x63\x78\x01\x00'
    
    expected = b'\xa1\xaa\x5f\x7d\xe4\x02\xd7\xb3\xd3\x23\xf2\x99\x1c\x8d\x45\x34' + \
        b'\x01\x31\x37\x01\x0a\x83\x75\x4f\xd0\xaf\x6d\x7c\xd4\x92\x2e\xd9'
    
    assert hmac(K, T, hasher=Streebog) == expected

In [16]:
#
# Протестирую HMAC на контрольном примере из рекомендаций по стандартизации
#

test_hmac()

----

In [17]:
class GOST3410Signer(object):
    """
    Объект-создатель подписей по ГОСТ 34.10 - 2012
    """
    
    def __init__(self):
        """
        Инициализация кривой и ключевой пары
        """
        
        self._curve = gost3410.CURVES['id-tc26-gost-3410-2012-512-paramSetA']
        self._private_key = gost3410.prv_unmarshal(get_random_bytes(64))
        self._public_key = gost3410.public_key(self._curve, self._private_key)
        
        
    def sign(self, *args):
        """
        Подпись произвольного набора байтовых объектов
        """
        
        digest = hash_digest(Streebog, *args)
        return gost3410.sign(self._curve, self._private_key, digest)
        
        
    @property
    def public_key(self):
        """
        Получение открытого ключа
        """
        
        return self._public_key
    
    
    @property
    def private_key(self):
        """
        Получение закрытого ключа
        """
        
        return self._private_key

    
    @property
    def curve(self):
        """
        Получение используемой кривой
        """
        
        return self._curve
    
    
    @property
    def generator(self):
        """
        Получение образующего группы точек кривой
        """
        
        return self._curve.x, self._curve.y

In [18]:
def verify_signature(curve, public_key, signature, *args):
    """
    Проверка подписи по ГОСТ 34.10 - 2012
    """
    
    digest = hash_digest(Streebog, *args)
    return gost3410.verify(curve, public_key, digest, signature)

----

## Реализация участников протокола

In [19]:
Offer = namedtuple('Offer', ['cipher_suite', 'group', 'generator'])

In [20]:
class GenericParticipant(object):
    """
    Класс-абстракция над участником протокола, который получает сертификат на свой открытый
    ключ и может осуществлять подпись сообщений
    """
    
    def __init__(self, ca, *, fake_data=None, demo_cert_error=False):
        """
        Инициализация - получение идентификатора, генерирование ключевой пары, 
        получение сертификата
        """
        
        self._id = uuid.uuid4()
        self._signer = GOST3410Signer()
        
        if fake_data is not None:
            #
            # Демонстрация неудачного выполнения протокола
            #
            
            if demo_cert_error:
                #
                # Тут демонстрация неудачи получения серификата
                #
                
                self._signer._public_key = fake_data
                self._certificate = ca.issue_certificate(self)
            else:
                #
                # Тут как бы сертификат и открытый ключ правильные, а закрытый ключ противник не угадал
                #
                
                self._certificate = ca.issue_certificate(self)
                self._signer._public_key = fake_data
        else:
            #
            # Стандартное выполнение
            #
            
            self._certificate = ca.issue_certificate(self)
        
        self._n = None
        
        trace('[GenericParticipant]', f'Certificate = {self._certificate}')
        
        
    def sign(self, *args):
        """
        Подписывание произвольных данных
        """
        
        return self._signer.sign(*args)
        
        
    @property
    def public_key(self):
        """
        Получение открытого ключа подписи
        """
        
        return self._signer.public_key
    
    
    @property
    def identifier(self):
        """
        Получение идентификатора
        """
        
        return self._id
    
    
    @property
    def certificate(self):
        """
        Получение сертификата
        """
        
        return self._certificate
    
    
    @property
    def curve(self):
        """
        Получение кривой, используемой для подписи
        """
        
        return self._signer.curve
    
    
    def _get_point_for_schnorr_proof(self):
        #
        # Костыль, но все же работает
        #
        
        self._n = gost3410.prv_unmarshal(get_random_bytes(64))
        return multiply_by_scalar(self._signer.curve, 
                                  self._n, self._signer.generator)
    
    
    def _get_response(self, chellenge):
        return self._n + chellenge * self._signer.private_key

In [21]:
class TLS13Participant(GenericParticipant):
    """
    Обобщенный участник общения по TLS 1.3
    """
    
    def __init__(self, ca, *, fake_data=None, demo_cert_error=False):
        """
        Инициализация - по факту просто сохраняю ссылку на СА
        """
        
        super(TLS13Participant, self).__init__(ca, 
                                               fake_data=fake_data, 
                                               demo_cert_error=demo_cert_error)
        self._ca = ca
    
    
    def check_certificate(self, identifier: uuid.UUID, public_key, cert: Certificate):
        """
        Проверка полученного сертификата
        """
        
        if self._ca.is_revoked(cert):
            trace('[TLS13Participant]', f'Revoked certificate')
            return False
            
        id_as_bytes = str(identifier).encode()
        pk_as_bytes = gost3410.pub_marshal(public_key)
        
        if not verify_signature(self._ca.curve, 
                                self._ca.public_key, 
                                cert.signature, 
                                id_as_bytes, 
                                pk_as_bytes):
            trace('[TLS13Participant]', f'Invalid certificate')
            return False
        
        return True
        
        
    @staticmethod
    def check_mac(key, mac, *args):
        """
        Проверка имитовставки
        """
        
        return mac == hmac(key, *args)
    
    
    @staticmethod
    def check_signature(curve, public_key, signature, *args):
        """
        Проверка подписи
        """
        
        return verify_signature(curve, public_key, signature, *args)

In [22]:
class TLS13Client(TLS13Participant):
    """
    Клиент для TLS 1.3
    """
        
    S2C = 0
    C2S = 1
    
    class AuthMode(Enum):
        """
        Режим аутентицикации
        """
        
        OWA = 1  # Односторонняя
        TWA = 2  # Двусторонняя
    
    
    def __init__(self, ca, cipher_suite, *, fake_data=None, demo_cert_error=False):
        """
        Инициализация - разбор набора примитивов
        """
        
        super(TLS13Client, self).__init__(ca, fake_data=fake_data, 
                                          demo_cert_error=demo_cert_error)
        
        self._key_exchange_type, self._cipher_type, self._hasher = resolve_cipher_suite(cipher_suite)
        self._cipher_suite = cipher_suite
        self._session_keys = {}
        
        trace('[TLS13Client]', f'Using cipher suite {cipher_suite}')
        
        
    def establish_connection(self, server, auth_mode: AuthMode):
        """
        Установление соединения с сервером
        """
        
        key_exchange = self._key_exchange_type()
        
        ephemeral_dh_sk = key_exchange.generate_private_key()
        ephemeral_dh_pk = key_exchange.get_public_key(ephemeral_dh_sk)
        client_nonce    = get_random_bytes(64)
        
        trace('[TLS13Client]', f'Ephemeral DH public key: {ephemeral_dh_pk}')
        trace('[TLS13Client]', f'Nonce: {hexlify(client_nonce)}')
        
        offer = Offer(self._cipher_suite, 
                      key_exchange.group, 
                      key_exchange.generator)
        
        #
        # А теперь запрос на сервер
        #
        
        is_one_way = auth_mode == TLS13Client.AuthMode.OWA
        server_hello = server.get_hello(self.identifier, ephemeral_dh_pk, client_nonce, 
                                        offer, is_one_way)
        
        if server_hello is None:
            trace('[TLS13Client]', f'Server refused the connection')
            return False
        
        #
        # Распакую ответ
        #
        
        ephemeral_dh_server_pk, server_nonce, cipher_suite, c1, c2, c3, c4 = server_hello
        
        if cipher_suite != self._cipher_suite:
            trace('[TLS13Client]', f'Unsupported cipher suite {cipher_suite}')
            return False
        
        #
        # Общий секрет DH
        #
        
        ephemeral_dh_common = key_exchange.exponentiate(ephemeral_dh_sk, ephemeral_dh_server_pk)
        if ephemeral_dh_common is None:
            trace(f'[TLS13Client]', 'Server ECDH public key is not on curve')
        
        #
        # Вырабатываю ключи для симметричного шифрования и имитовставки
        #
        
        client_packet = b''.join([key_exchange.element_to_bytes(ephemeral_dh_pk),
                                  client_nonce, str(offer).encode()])
        
        server_packet = b''.join([key_exchange.element_to_bytes(ephemeral_dh_server_pk),
                                  server_nonce, cipher_suite.encode()])
        
        key_derivation_data = b''.join([key_exchange.element_to_bytes(ephemeral_dh_common),
                                        client_packet, server_packet])
        
        encryption_key, mac_key = HKDF(key_derivation_data, self._cipher_type.KEY_SIZE, 
                                       b'\x00' * 16, self._hasher, 2)
        
        #
        # Расшифровываю полученные пакеты
        #
        
        cipher = self._cipher_type(encryption_key)
        
        try:
            cert_requested     = cipher.decrypt(*c1)
            server_certificate = cipher.decrypt(*c2)
            server_signature   = cipher.decrypt(*c3)
            server_mac         = cipher.decrypt(*c4)
        except ValueError:
            trace('[TLS13Client]', f'Malformed server response')
            return False
        
        #
        # Проверяю полученнные данные
        #
        
        if is_one_way == cert_requested:
            #
            # Запросили сертификат в ходе односторонней аутентификации или наоборот
            #
            
            trace('[TLS13Client]', f'Invalid client certificate request')
            return False
        
        if not self.check_certificate(server.identifier, server.public_key, 
                                      pickle.loads(server_certificate)):
            trace('[TLS13Client]', f'Invalid server certificate')
            return False
        
        if not TLS13Participant.check_signature(server.curve, server.public_key, server_signature, 
                                                client_packet, server_packet, c1[1], c2[1]):
            trace('[TLS13Client]', f'Malformed server response')
            return False
        
        if not TLS13Participant.check_mac(mac_key, server_mac, 
                                          client_packet, server_packet, c1[1], c2[1], c3[1]):
            trace('[TLS13Client]', f'Malformed server response')
            return False
            
        #
        # Если требуется двусторонняя аутентификация, то зашлю на сервер сертификат
        #
        
        if not is_one_way:
            c5 = cipher.encrypt(pickle.dumps(self.certificate))
            
            signature = self.sign(client_packet, server_packet, c1[1], c2[1], c3[1], c4[1], c5[1])
            c6 = cipher.encrypt(signature)
            
            mac = hmac(mac_key, client_packet, server_packet, c1[1], c2[1], c3[1], c4[1], c5[1], c6[1])
            c7 = cipher.encrypt(mac)
            
            if not server.process_client_certificate(self, c5, c6, c7):
                trace('[TLS13Client]', f'Server refused the connection')
                return False
        
        #
        # Теперь я генерирую ключи шифрования
        #
        
        key_derivation_data = b''.join([key_exchange.element_to_bytes(ephemeral_dh_common),
                                        client_packet, server_packet, c1[1], c2[1], c3[1], c4[1]])
            
        self._session_keys[server.identifier] = HKDF(key_derivation_data, self._cipher_type.KEY_SIZE,
                                                     b'\x00' * 16, self._hasher, 2)
        
        trace('[TLS13Client]', f'Server accepted the connection')
        
        trace('[TLS13Client]', 
              f's -> c key: {hexlify(self._session_keys[server.identifier][TLS13Client.S2C])}')
        trace('[TLS13Client]', 
              f'c -> s key: {hexlify(self._session_keys[server.identifier][TLS13Client.C2S])}')
        
        return True
    
    
    def send_wait_receive_message(self, server, message: str):
        """
        Отправка сообщения на сервер и получение ответа
        """
        
        if server.identifier not in self._session_keys:
            trace('[TLS13Client]', f'Connection with server {server.identifier} is not established yet')
            return
        
        cipher = self._cipher_type(self._session_keys[server.identifier][TLS13Client.C2S])
        
        response = server._receive_message_and_respond(self, cipher.encrypt(message.encode()))
        if response is None:
            trace('[TLS13Client]', f'Malformed server response')
        
        try:
            cipher = self._cipher_type(self._session_keys[server.identifier][TLS13Client.S2C])
            return cipher.decrypt(*response).decode()
        except ValueError:
            trace('[TLS13Client]', f'Malformed server response')
            
            
    def change_keys(self, server):
        """
        Смена ключей по инициативе клиента
        """
        
        if server.identifier not in self._session_keys:
            trace('[TLS13Client]', f'Connection with server {server.identifier} is not established yet')
            return
        
        #
        # Меняю у себя
        #
        
        self._change_keys_internal(server.identifier)
        
        #
        # Меняю на сервере
        #
        
        server._change_keys_internal(self.identifier)
        
        
    def _change_keys_internal(self, server_id):
        s2c_key = HKDF(self._session_keys[server_id][TLS13Client.S2C], self._cipher_type.KEY_SIZE, 
                       b'\x00' * 16, self._hasher, 1)
        
        c2s_key = HKDF(self._session_keys[server_id][TLS13Client.C2S], self._cipher_type.KEY_SIZE, 
                       b'\x00' * 16, self._hasher, 1)
        
        trace('[TLS13Client]', f'New s -> c key: {hexlify(s2c_key)}')
        trace('[TLS13Client]', f'New c -> s key: {hexlify(c2s_key)}')
        
        self._session_keys[server_id] = (s2c_key, c2s_key)

In [23]:
class TLS13Server(TLS13Participant):
    """
    Сервер для TLS 1.3
    """
    
    CIPHER_TYPE            = 0
    HASHER                 = 1
    HANDSHAKE_ENC_KEY      = 2
    HANDSHAKE_MAC_KEY      = 3
    HANDSHAKE_VALUES       = 4
    HANDSHAKE_KEY_EXCHANGE = 5
    
    S2C = 0
    C2S = 1
    
    
    def __init__(self, ca, cipher_suites):
        """
        Инициализация - сохранение настроек
        """
        
        super(TLS13Server, self).__init__(ca)
        
        self._cipher_suites = cipher_suites
        self._session_keys  = {}
        self._mapped_states = {}
        
        
    def get_hello(self, client_id, ephemeral_dh_client_pk, client_nonce, offer, is_one_way):
        """
        Получение ServerHello
        """
        
        #
        # Провверяю, что поддерживается вообще набор примитивов
        #
        
        cipher_suite, group, generator = offer
        
        if cipher_suite not in self._cipher_suites:
            trace('[TLS13Server]', f'Unsupported cipher suite {cipher_suite}')
            return None
        
        trace('[TLS13Server]', f'Using cipher suite {cipher_suite} with client {client_id}')
        
        #
        # Получаю примитивы
        #
        
        key_exchange_type, cipher_type, hasher = resolve_cipher_suite(cipher_suite)
        key_exchange = key_exchange_type(group, generator)
        
        #
        # Вырабатываю ключи DH и nonce, после чего получаю общий секрет DH
        #
        
        ephemeral_dh_sk = key_exchange.generate_private_key()
        ephemeral_dh_pk = key_exchange.get_public_key(ephemeral_dh_sk)
        server_nonce    = get_random_bytes(64)
        
        trace('[TLS13Server]', f'Ephemeral DH public key: {ephemeral_dh_pk}')
        trace('[TLS13Server]', f'Nonce: {hexlify(client_nonce)}')
        
        ephemeral_dh_common = key_exchange.exponentiate(ephemeral_dh_sk, ephemeral_dh_client_pk)
        if ephemeral_dh_common is None:
            trace(f'[TLS13Client]', 'Client ECDH public key is not on curve')
        
        #
        # Вырабатываю ключи для симметричного шифрования и имитовставки
        #
        
        client_packet = b''.join([key_exchange.element_to_bytes(ephemeral_dh_client_pk),
                                  client_nonce, str(offer).encode()])
        
        server_packet = b''.join([key_exchange.element_to_bytes(ephemeral_dh_pk),
                                  server_nonce, cipher_suite.encode()])
        
        key_derivation_data = b''.join([key_exchange.element_to_bytes(ephemeral_dh_common),
                                        client_packet, server_packet])
        
        encryption_key, mac_key = HKDF(key_derivation_data, cipher_type.KEY_SIZE, 
                                       b'\x00' * 16, hasher, 2)
        
        #
        # Рассчитываю нужные значения
        # 
        
        cipher = cipher_type(encryption_key)
        
        cert_request = bytes([not is_one_way])
        c1 = cipher.encrypt(cert_request)
        
        cert = pickle.dumps(self.certificate)
        c2 = cipher.encrypt(cert)
        
        signature = self.sign(client_packet, server_packet, c1[1], c2[1])
        c3 = cipher.encrypt(signature)
        
        mac = hmac(mac_key, client_packet, server_packet, c1[1], c2[1], c3[1])
        c4 = cipher.encrypt(mac)
        
        #
        # Тут будут сохранены нужные значения
        #
        
        self._mapped_states[client_id] = [None, None, None, None, None, None]
        
        #
        # Теперь выработаю ключи (если это односторонняя аутентификация)
        #
        
        if is_one_way:
            self._session_keys[client_id] = TLS13Server._generate_keys(key_exchange, cipher_type.KEY_SIZE, 
                                                                       hasher, ephemeral_dh_common, 
                                                                       client_packet, server_packet, 
                                                                       c1, c2, c3, c4)
        
        #
        # Сохраняю остальное и возвращаю клиенту
        #
        
        self._mapped_states[client_id][TLS13Server.CIPHER_TYPE]       = cipher_type
        self._mapped_states[client_id][TLS13Server.HASHER]            = hasher
        self._mapped_states[client_id][TLS13Server.HANDSHAKE_ENC_KEY] = encryption_key
        self._mapped_states[client_id][TLS13Server.HANDSHAKE_MAC_KEY] = mac_key
        self._mapped_states[client_id][TLS13Server.HANDSHAKE_VALUES]  = (ephemeral_dh_common, client_packet, 
                                                                         server_packet, c1, c2, c3, c4)
        self._mapped_states[client_id][TLS13Server.HANDSHAKE_KEY_EXCHANGE] = key_exchange
        
        return ephemeral_dh_pk, server_nonce, cipher_suite, c1, c2, c3, c4
    
    
    def process_client_certificate(self, client, c5, c6, c7):
        """
        Проверка сертификата клиента
        """
        
        client_id = client.identifier
        
        #
        # Достану ранее сохраненные параметры
        #
        
        cipher_type    = self._mapped_states[client_id][TLS13Server.CIPHER_TYPE]
        hasher         = self._mapped_states[client_id][TLS13Server.HASHER]
        encryption_key = self._mapped_states[client_id][TLS13Server.HANDSHAKE_ENC_KEY]
        mac_key        = self._mapped_states[client_id][TLS13Server.HANDSHAKE_MAC_KEY]
        
        ephemeral_dh_common, client_packet, server_packet, c1, c2, c3, c4 = \
            self._mapped_states[client_id][TLS13Server.HANDSHAKE_VALUES]
        
        key_exchange   = self._mapped_states[client_id][TLS13Server.HANDSHAKE_KEY_EXCHANGE]
        
        #
        # Расшифрую полученный пакет
        #
        
        cipher = cipher_type(encryption_key)
        
        try:
            client_certificate = cipher.decrypt(*c5)
            client_signature   = cipher.decrypt(*c6)
            client_mac         = cipher.decrypt(*c7)
        except ValueError:
            trace('[TLS13Server]', f'Malformed client response')
            return False
        
        #
        # проверю все полученные данные
        #
        
        if not self.check_certificate(client.identifier, client.public_key, 
                                      pickle.loads(client_certificate)):
            trace('[TLS13Server]', f'Invalid client certificate')
            return False
        
        if not TLS13Participant.check_signature(client.curve, client.public_key, client_signature, 
                                                client_packet, server_packet, 
                                                c1[1], c2[1], c3[1], c4[1], c5[1]):
            trace('[TLS13Server]', f'Malformed client response')
            return False
        
        if not TLS13Participant.check_mac(mac_key, client_mac, client_packet, server_packet, 
                                          c1[1], c2[1], c3[1], c4[1], c5[1], c6[1]):
            trace('[TLS13Server]', f'Malformed client response')
            return False
        
        #
        # Все чудесно - генерирую ключи
        #
        
        key_derivation_data = b''.join([key_exchange.element_to_bytes(ephemeral_dh_common),
                                        client_packet, server_packet, c1[1], c2[1], c3[1], c4[1]])
            
        self._session_keys[client_id] = TLS13Server._generate_keys(key_exchange, cipher_type.KEY_SIZE, 
                                                                   hasher, ephemeral_dh_common, client_packet, 
                                                                   server_packet, c1, c2, c3, c4)
        
        return True
            
            
    def change_keys(self, client):
        """
        Смена ключей по инициативе сервера
        """
        
        if client.identifier not in self._session_keys:
            trace('[TLS13Client]', f'Connection with client {client.identifier} is not established yet')
            return
        
        #
        # Меняю у себя
        #
        
        self._change_keys_internal(client.identifier)
        
        #
        # Меняю на сервере
        #
        
        client._change_keys_internal(self.identifier)
        
        
    def _change_keys_internal(self, client_id):
        s2c_key = HKDF(self._session_keys[client_id][TLS13Client.S2C], 
                       self._mapped_states[client_id][TLS13Server.CIPHER_TYPE].KEY_SIZE, 
                       b'\x00' * 16, self._mapped_states[client_id][TLS13Server.HASHER], 1)
        
        c2s_key = HKDF(self._session_keys[client_id][TLS13Client.C2S], 
                       self._mapped_states[client_id][TLS13Server.CIPHER_TYPE].KEY_SIZE, 
                       b'\x00' * 16, self._mapped_states[client_id][TLS13Server.HASHER], 1)
        
        trace('[TLS13Client]', f'New s -> c key: {hexlify(s2c_key)}')
        trace('[TLS13Client]', f'New c -> s key: {hexlify(c2s_key)}')
        
        self._session_keys[client_id] = (s2c_key, c2s_key)
    
    
    def _receive_message_and_respond(self, client, encrypted_message):
        client_id = client.identifier
        
        try:
            cipher = self._mapped_states[client_id][TLS13Server.CIPHER_TYPE](
                self._session_keys[client_id][TLS13Server.C2S])
            
            message = cipher.decrypt(*encrypted_message).decode()
            print(f'Received message "{message}"')
            
            cipher = self._mapped_states[client_id][TLS13Server.CIPHER_TYPE](
                self._session_keys[client_id][TLS13Server.S2C])
            
            return cipher.encrypt(f'Hello, {message}!'.encode())
        except ValueError:
            trace('[TLS13Server]', f'Malformed client message')
    
    
    @staticmethod
    def _generate_keys(key_exchange, key_size, hasher, ephemeral_dh_common, 
                       client_packet, server_packet, c1, c2, c3, c4):
        key_derivation_data = b''.join([key_exchange.element_to_bytes(ephemeral_dh_common),
                                        client_packet, server_packet, c1[1], c2[1], c3[1], c4[1]])
            
        s2c_key, c2s_key = HKDF(key_derivation_data, key_size, 
                                b'\x00' * 16, hasher, 2)
        
        trace('[TLS13Server]', f's -> c key: {hexlify(s2c_key)}')
        trace('[TLS13Server]', f'c -> s key: {hexlify(c2s_key)}')
        
        return s2c_key, c2s_key

In [24]:
class TLS13CertificationAuthority(object):
    """
    Удостоверющий центр для TLS 1.3
    """
    
    def __init__(self):
        """
        Инициализация - генерирование ключевой пары
        """
        
        self._signer = GOST3410Signer()
        
        self._certificates = set()
        self._revoked_certificates = set()
        
        trace('[TLS13CertificationAuthority]', 
              f'Public key = {self._signer.public_key}')
        
        
    @property
    def public_key(self):
        """
        Получение открытого ключа
        """
        
        return self._signer.public_key
    
    
    @property
    def curve(self):
        """
        Получение кривой, используемой для подписи
        """
        
        return self._signer.curve
        
        
    def issue_certificate(self, participant: GenericParticipant):
        """
        Выдача сертификата на открытый ключ
        """
        
        #
        # Проверю, что клиент действительно владеет закрытым ключом
        #
        
        if not self._check_participant(participant):
            trace('[TLS13CertificationAuthority]', 
                  f'Participant cannot prove a valid private key possession')
            
            return None
        
        #
        # Все чудесно, подписываю ключ и отдаю сертификат
        #
        
        id_as_bytes = str(participant.identifier).encode()
        pk_as_bytes = gost3410.pub_marshal(participant.public_key)
        
        cert = Certificate(participant.identifier, participant.public_key, 
                           self._signer.sign(id_as_bytes, pk_as_bytes))
        
        self._certificates.add(cert)
        
        trace('[TLS13CertificationAuthority]', 
              f'Issued certificate for {participant.identifier}. Total certificates: {len(self._certificates)}')
        
        return cert
    
    
    def revoke_certificate(self, participant: GenericParticipant):
        """
        Отзыв сертификата
        """
        
        #
        # А можно ли вообще отзывать сертификат?
        #
        
        if not self._check_participant(participant):
            trace('[TLS13CertificationAuthority]', 
                  f'Participant cannot prove a valid private key possession')
            
            return 
        
        if participant.certificate not in self._certificates:
            #
            # А отзывать то и нечего
            #
            
            return
            
        self._revoked_certificates.add(participant.certificate)
        self._certificates.discard(participant.certificate)
        
        trace('[TLS13CertificationAuthority]', 
              f'List of revoked certificates now has {len(self._revoked_certificates)} elements')
        
        
    def is_revoked(self, cert: Certificate):
        """
        Проверка на факт отзыва сертификата
        """
        
        return cert in self._revoked_certificates
    
    
    def _check_participant(self, participant: GenericParticipant) -> bool:
        Q = participant._get_point_for_schnorr_proof()
        c = gost3410.prv_unmarshal(get_random_bytes(64))
        t = participant._get_response(c)
        
        tP  = multiply_by_scalar(self._signer.curve, t, self._signer.generator)
        cPK = multiply_by_scalar(self._signer.curve, c, participant.public_key)
        
        return tP == add_points(self._signer.curve, Q, cPK)

----
## Демонстрация удачного выполнения протоколов

In [25]:
# Создание удостоверяющего центра
ca = TLS13CertificationAuthority()

[TRACE] [TLS13CertificationAuthority] Public key = (9237264371806511014013829797689107727684273464315529815963174839145489040913886641304691031597668626243643676499436396099470213306903270077232065018767052, 11248603154821226733167199500457017201390443518998373497952095335261467697828680467265199154840998677250801436514308795673323270928268934129282354951328341)


In [26]:
# Создание сервера
srv = TLS13Server(ca, cipher_suites)

[TRACE] [TLS13CertificationAuthority] Issued certificate for cff281f3-ba55-4780-b7dd-6d77fd16640f. Total certificates: 1
[TRACE] [GenericParticipant] Certificate = Certificate(id=UUID('cff281f3-ba55-4780-b7dd-6d77fd16640f'), public_key=(2952434368131604414648296804023549923273811297696477250255516490798054910082503155931995941052067446942158873958304268828751675726422546846256864980909435, 10976801285619484879344581361681581175939838093166147570369944311411393717047378526587962277273009206201681335355812723621680348299545013542229410386804093), signature=b"\x0c\xed*!\r3+\xf0u\x9c\xd5\xdb\xba<\xe5\x13\xd6nY\x8a@\xd0L\x1e\x19\xa4\x06\x95i\xc9%s\x89\x16\xc0\xd3\xf2\x189\xe4/\x9c\xad\xea\xe1\xf3\xba\x86\xe0\x98\x95\xa6/\xebO\xac\xa5\x1a\xa1\xf9\x9e' \x06uY\x9f\x8f^J3Pt\x12?\xe7\xcd\xefW7\xc1\x1b\x95w\x91\xaeh\x1d\xde\xbfx.\xd5\xc1US\r\xf7c\xfb\xbe\x10PQ\x152pOQ\x08-\x86B<\xd8!C\xaa\x1e;\x93\x8aK\xd8\xe6R'\x8a")


In [27]:
# Первый валидный пользователь (использует ECDH, Кузнечик в режиме MGM и Стрибог)
alice = TLS13Client(ca, cipher_suite=cipher_suites[0])

[TRACE] [TLS13CertificationAuthority] Issued certificate for 5489b7e7-55e3-4a6f-b470-d20334ecad7f. Total certificates: 2
[TRACE] [GenericParticipant] Certificate = Certificate(id=UUID('5489b7e7-55e3-4a6f-b470-d20334ecad7f'), public_key=(9907912646195752283518542507064829984787530324540992783354668365799807003812636970162196544219328064831353791558926325471671618735117058377291989481903197, 7940910856002180916797251648072974792742713835381753211360711402915576032055313250541853596244396163736690810638291394792882147812297336449016262648588954), signature=b'\xe12+\x81,\x14\xc0B\x08\x8d\xfb>\xb2\x93p\xc3]\x9f\xc6\x98=\xe3\xd8P\x8aJ\x89\x1f\xcd\t&_@H\xc6\x9a\xd4\xc8\xc4\x8cb\x85\xb9\xb9\x07\xdc\xab\x90S\x82\x96W\xaeC\x8f\xcc\x800\xf5DLe(\xd52o0\xcc\xb9_\x034\x9as\x99z\xf9v\xc5< ^\xc1\xaa\xd7\x8b[\xb0\x8a\xb9R\xc6\x0e?\xc0\x8e\x9b\xd9\xec|Z\x17m\xcaU\xcc\x99_p\xfa\x19\xd3\x12\xbdIx@\xac\xe9A\xc5!\xc3G\xfb\xb9\xee>')
[TRACE] [TLS13Client] Using cipher suite ECDHE_KUZNYECHIK_MGM_STREEBOG


In [28]:
# Второй валидный пользователь (использует DH, AES в режиме GCM и SHA256)
bob = TLS13Client(ca, cipher_suite=cipher_suites[1])

[TRACE] [TLS13CertificationAuthority] Issued certificate for c7e4bb9c-daad-4994-8593-7d9b5294b5a3. Total certificates: 3
[TRACE] [GenericParticipant] Certificate = Certificate(id=UUID('c7e4bb9c-daad-4994-8593-7d9b5294b5a3'), public_key=(2251878083096053887122890837339579300839842972941162754894340064492973075060530028700628568516407911382398518337421717175273504296092231969738657741429604, 10797531670825744352710383177024072323027560053001158677177060222586005463984629295409563417545334108698494247713700695572300471555014410227563719416267331), signature=b'\xca\xd8\xbe\xfa^\xa1\x81\xfc\x1f\xf6e\xfe\x13\x1e\x9af!\xcf&\xb5\xc2\x15\x0c\xc8S\x89\xd6J\x17\xcfEM\x84\xe6\xc9%\x1b\xdb\x1b\xf3\xf75\x91\x84\xb2\xfb\x99\xaa\x9c\x9d\x94L\xe3b\x0c\x11\xb2\x11\xf5\xe3\x9d\x93<\xaco`SA\x9a\xb2<\xc9\x037;\xc0\xa6{b\xa2\xfbX\\\x874\xf9\xd2\xe0r\x0f%#s\xb8U\xbd\x05\x9b\xc6UW\xbc\xd2\xc5\xe6&l`w\xd0$\x96\x90\xc8G\xf3\x1f/_l\xd6U\xd2\xb6\xd7\x00\x13\xa4')
[TRACE] [TLS13Client] Using cipher suite DHE_AES_G

In [29]:
# Устанавливаю соединение с односторонней аутентификацией
alice.establish_connection(srv, TLS13Client.AuthMode.OWA)

[TRACE] [EllipticCurveDiffieProcessor] Using group of order 13407807929942597099574024998205846127479365820592393377723561443721764030073546976801874298166903427690031858186486050853753882811946569946433649006083527 over curve id-tc26-gost-3410-2012-512-paramSetA
[TRACE] [TLS13Client] Ephemeral DH public key: (3096513813853603336133943633852197160438449553693664964199034785997120539064963954377747883483870354601118312309704933450977953292291866559845105767902185, 9607120987651802974574891289366619888465951454340537061761011666131589055583981856437023849057646569955810500859250877136488299994485884083804278447204886)
[TRACE] [TLS13Client] Nonce: b'8a0d1166e93a1baf9190e7d8aa0d77640fa292eed37bd918e53ef3954a718afa1b83c12af13eb800d1f1969f82b6937c295675c7aeb90146923124b439890224'
[TRACE] [TLS13Server] Using cipher suite ECDHE_KUZNYECHIK_MGM_STREEBOG with client 5489b7e7-55e3-4a6f-b470-d20334ecad7f
[TRACE] [EllipticCurveDiffieProcessor] Using group of order 13407807929942597099574024998205846

True

In [30]:
# Устанавливаю соединение с двусторонней аутентификацией
bob.establish_connection(srv, TLS13Client.AuthMode.TWA)

[TRACE] [DiffieHellmanProcessor] Using group of order 9506979994000095416554690044573828952066204242557158362409027675371121829223238249823308878975915671259387923406113220510606236315183900269250077428160271
[TRACE] [TLS13Client] Ephemeral DH public key: 1126643822731507912847953834093188016825941957864015263464159780794485770562272968528144285906204070244698564860822605692735473136459531965347801941133417
[TRACE] [TLS13Client] Nonce: b'4ad5d71de519ca4930f2a8aa53512b70584502dd4efe0269dc266d7b1ff412a0689571c6c748eec44dda91e4c0d72982db63789c7abce0e18d5d66927ab46146'
[TRACE] [TLS13Server] Using cipher suite DHE_AES_GCM_SHA256 with client c7e4bb9c-daad-4994-8593-7d9b5294b5a3
[TRACE] [DiffieHellmanProcessor] Using group of order 9506979994000095416554690044573828952066204242557158362409027675371121829223238249823308878975915671259387923406113220510606236315183900269250077428160271
[TRACE] [TLS13Server] Ephemeral DH public key: 193786648123508604997408627692646156907892861382932679994510574

True

In [31]:
# Отправлю сообщение
alice.send_wait_receive_message(srv, 'world')

Received message "world"


'Hello, world!'

In [32]:
# Сменю ключи по инициативе клиента
alice.change_keys(srv)

[TRACE] [TLS13Client] New s -> c key: b'fb806eb415d7366ee98cac1ad829c8f631dc29ddea3a3b41622f63897ad0b3b3'
[TRACE] [TLS13Client] New c -> s key: b'9f290de527efc1f41bfbad2da88fed3931d6d863f75eb496abfbf9de5de47d88'
[TRACE] [TLS13Client] New s -> c key: b'fb806eb415d7366ee98cac1ad829c8f631dc29ddea3a3b41622f63897ad0b3b3'
[TRACE] [TLS13Client] New c -> s key: b'9f290de527efc1f41bfbad2da88fed3931d6d863f75eb496abfbf9de5de47d88'


In [33]:
# И еще раз отправлю сообщение, чтобы было видно, что ключи успешно поменялись
alice.send_wait_receive_message(srv, 'world')

Received message "world"


'Hello, world!'

In [34]:
# Отправлю сообщение другим клиентом
bob.send_wait_receive_message(srv, 'crypto')

Received message "crypto"


'Hello, crypto!'

In [35]:
# Смена ключей по инициативе сервера
srv.change_keys(bob)

[TRACE] [TLS13Client] New s -> c key: b'8ffca0d5436ecd939d66028674550b6f'
[TRACE] [TLS13Client] New c -> s key: b'bfba8c1b064a5cb388808cefe3a4d6fa'
[TRACE] [TLS13Client] New s -> c key: b'8ffca0d5436ecd939d66028674550b6f'
[TRACE] [TLS13Client] New c -> s key: b'bfba8c1b064a5cb388808cefe3a4d6fa'


In [36]:
# Все по-прежнему успешно отправляется
bob.send_wait_receive_message(srv, 'crypto')

Received message "crypto"


'Hello, crypto!'

----
## Отзыв сертификата

In [37]:
# Третий валидный пользователь
carol = TLS13Client(ca, cipher_suite=cipher_suites[0])

[TRACE] [TLS13CertificationAuthority] Issued certificate for d8ccf265-c605-48c2-9a2b-ddaf120c147d. Total certificates: 4
[TRACE] [GenericParticipant] Certificate = Certificate(id=UUID('d8ccf265-c605-48c2-9a2b-ddaf120c147d'), public_key=(5651857046109576868634162258423423862904656990671186695172839768621169277992526340506249013813737168031690777631838841622751226473468613245960991313431258, 5173318820680614609971939285420556608041895982692470205348305050367655111519605590503387535753417519991320380271247569289202680700176068628129735991616370), signature=b"#\x04\t\x13\xec5\xd8\xa3\x85d\xecf\x83\x9f\xfe\xc7\xcf5;izA\xa5HU%/\x02\xd0G\t\xc2\x13\x83\x9b\x9aD\xb5\xbf\xe1\xaa\x18\xb6\xd5\xf1\x1f\x11>S\xfa.\xa8\x8b!\xc0\xb0q\xa2q\xe2!U\xe6k\xd8o$}1d\x06\x01\xb2\xb6\xa0\x18\x9e\x92\xc6\xf1'\xb6\x9a\xfeGB\xd0\xd0\xe7\xc2\xb0+S\x0c\xb0!\xb9\xcf\xb4\xcd\xd3\xa8s\x95OrYm\xb3\xb9\xa5\xeaL\x16\x98\xad\x94\x03!L\xc6Y\xf9\xb4Du\xc8\x86")
[TRACE] [TLS13Client] Using cipher suite ECDHE_KUZNYECHIK_MGM_STR

In [38]:
# Отзову сертификат Кэрол
ca.revoke_certificate(carol)

[TRACE] [TLS13CertificationAuthority] List of revoked certificates now has 1 elements


In [39]:
# Попробую подключиться, используя двустороннюю аутентификацию
carol.establish_connection(srv, TLS13Client.AuthMode.TWA)

[TRACE] [EllipticCurveDiffieProcessor] Using group of order 13407807929942597099574024998205846127479365820592393377723561443721764030073546976801874298166903427690031858186486050853753882811946569946433649006083527 over curve id-tc26-gost-3410-2012-512-paramSetA
[TRACE] [TLS13Client] Ephemeral DH public key: (12594658663370633382711267812913627773960512710998102661944583507289935379568245130289050933098253477119131783582601944236396144783393499020112945399154176, 9034024238985285238215389485186311973727919691888933821252551384807341568105959032390277505573682226140431394117847014155212933218523588972466368726546447)
[TRACE] [TLS13Client] Nonce: b'69d078821e45c36e4980741632009737a2330df045deb5329b4aa7f80aa1c49a4f70fd876e6de3ac8b7e3a88363485bf5c7f0048ded5caea0db276ac018ebc69'
[TRACE] [TLS13Server] Using cipher suite ECDHE_KUZNYECHIK_MGM_STREEBOG with client d8ccf265-c605-48c2-9a2b-ddaf120c147d
[TRACE] [EllipticCurveDiffieProcessor] Using group of order 1340780792994259709957402499820584

False

----
## Демонстрация неудачного выполнения протоколов

In [40]:
# Тут Мэллори пытается установить подключение с двусторонней аутентификацией,
# используя сертификат не на свой ключ (я просто подменю тут открытый ключ для простоты)
mallory = TLS13Client(ca, cipher_suite=cipher_suites[0], 
                      fake_data=bob.public_key)

[TRACE] [TLS13CertificationAuthority] Issued certificate for 041896fe-5aca-418d-9cc3-25dc07aac785. Total certificates: 4
[TRACE] [GenericParticipant] Certificate = Certificate(id=UUID('041896fe-5aca-418d-9cc3-25dc07aac785'), public_key=(3586322229102105266401889042110175814571234773107558059101216573681974901740781715554681570496688614662106676084209624845155849775903037462176530323583270, 5294999387932722439183288501830100569456924475662708086536925275685673935219617005235789559364807446527014322147603489828688395778736855339631716385841238), signature=b"6'p\x8f+G\xc0\x9a\x0b\xbaK<\xeb\xb7\x10\xe9\xfb\xe0\x86\x07\xac%\xe6q\x05\xc4\xffv\xb4\xf8,\x7f|\x1f\x8c\xb4\xa5\x8d\xa0O\xa1\x19\xa7dr\x91\x90\xf1AT\xfb\x8dd$\x83\xa4\x03\xac2Dm9X\x91\xb4le\xef\x00V<\x1f\x95\x1d\x01\xb9e\x1e\xbd\xcb\x13u!\xbf\x95\xee\x1e\xe5\xbb\xc0R\x05\x90CP\xa0a\xb5\x87\xbeLg\xc5\n\xd0\x86pc4I\xdeEe\xbc\xab\xfb\x17\x0b\x92%\xf0\xcb\xc5\xff\xd3\xad\xd0+")
[TRACE] [TLS13Client] Using cipher suite ECDHE_KUZNYECHIK_MG

In [41]:
mallory.establish_connection(srv, TLS13Client.AuthMode.TWA)

[TRACE] [EllipticCurveDiffieProcessor] Using group of order 13407807929942597099574024998205846127479365820592393377723561443721764030073546976801874298166903427690031858186486050853753882811946569946433649006083527 over curve id-tc26-gost-3410-2012-512-paramSetA
[TRACE] [TLS13Client] Ephemeral DH public key: (8596238456982877358679431706711567787393417309021691700599374923578333553947701929063645646869402700435814407521606348876416783334045932971071012012866517, 10727307413366236823911381186193627997238997076856823623196156279722721753164422550573205058875197345073770604548390619935113948304372116365612595691535377)
[TRACE] [TLS13Client] Nonce: b'564bb1b8d5848ad19aa2736534bf6545c8a3bb3bf8e594b5f304be10a2fdedf2909f83a5bc7c6c55b3098371b402bbe2d707a842150ca395091c920fe9fcb409'
[TRACE] [TLS13Server] Using cipher suite ECDHE_KUZNYECHIK_MGM_STREEBOG with client 041896fe-5aca-418d-9cc3-25dc07aac785
[TRACE] [EllipticCurveDiffieProcessor] Using group of order 1340780792994259709957402499820584

False

In [42]:
# Мэллори теперь пытается получить сертификат на ключ Боба, не зная его закрытого ключа
mallory = TLS13Client(ca, cipher_suite=cipher_suites[0], 
                      fake_data=bob.public_key, 
                      demo_cert_error=True)

[TRACE] [TLS13CertificationAuthority] Participant cannot prove a valid private key possession
[TRACE] [GenericParticipant] Certificate = None
[TRACE] [TLS13Client] Using cipher suite ECDHE_KUZNYECHIK_MGM_STREEBOG
