# Imports

In [None]:
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import jwt
from datetime import datetime, timezone
import re
import hashlib
from typing import Any
import requests
import json
import pickle
import base64
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from typing import cast
from time import sleep

# Token client class definition (mimics mobile app)

In [None]:
DEV_URL = "http://127.0.0.1:8000"
DEV_URL = "http://192.168.203.132:8000"
DEV_URL = "https://zaqzxcswsde.ru"

In [None]:
class TokenClient:

    app_version = "v0.0.5"
    api_url = "/mainrequest/"
    base_url = DEV_URL


    def __init__(self, pin = "1234567890", saved_token = None):
        self.private_key: RSAPrivateKey
        self.public_key: RSAPublicKey

        if not saved_token:
            self.private_key, self.public_key = self._generate_keypair()
            self.private_bytes, self.public_bytes = self._get_key_bytes()
            self.ticket = ""
        else:
            decoded_data = self._restore_saved_token(saved_token)
            if decoded_data:
                self.private_bytes, self.public_bytes, self.ticket = decoded_data
                self.private_key, self.public_key = self._get_keys_from_bytes()

        self.fingerprint = self._get_fingerprint()
        self.pin = self._get_pin(pin)

    def _get_url(self):
        return self.base_url + self.api_url

    def _generate_keypair(self) -> tuple[RSAPrivateKey, RSAPublicKey]:
        private_key: RSAPrivateKey =  rsa.generate_private_key(key_size=2048, public_exponent=65537)
        public_key: RSAPublicKey = private_key.public_key()
        return (private_key, public_key)
    
    def _get_key_bytes(self) -> tuple[str, str]:
        private_bytes = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            ).decode()
        # private_bytes = re.sub(r'(\r\n)|\n', '', private_bytes)

        public_bytes = self.public_key.public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo,
                ).decode()
        self._raw_public_bytes = public_bytes
        public_bytes = re.sub(r'(\r\n)|\n', '', public_bytes)

        return (private_bytes, public_bytes)
    
    def _get_keys_from_bytes(self) -> tuple[RSAPrivateKey, RSAPublicKey]:
        private_key = cast(RSAPrivateKey,
            serialization.load_pem_private_key(
                self.private_bytes.encode(),
                password=None
            )
        )
        public_key = cast(RSAPublicKey,
            serialization.load_pem_public_key(
                self.public_bytes.encode()
            )
        )
        return (private_key, public_key)
    
    def _get_fingerprint(self):
        return ' '.join(f'{byte:02X}' for byte in hashlib.sha256(self.public_bytes.encode('utf-8')).digest()[:6])
    
    def _get_pin(self, pin):
        return hashlib.sha256(f"{pin}{self.fingerprint}".encode('utf-8')).hexdigest()
    
    def _get_jwt_dict(self):
        jwt_dict = {
            "version": self.app_version,
            "request_time": datetime.now(timezone.utc).isoformat(),
            "public_key": self.public_bytes,
            "pin": self.pin,
            "ticket": self.ticket
        }
        return jwt_dict
    
    def _encode_jwt_token(self, jwt_dict : dict[str, Any]):
        return jwt.encode(jwt_dict, self.private_key, algorithm="RS256")
    
    def _get_request_data(self, jwt_token):
        return {'token': jwt_token}
    
    def _get_ticket_from_response(self, response):
        return json.loads(response)['ticket']

    def send_mainrequest(self):
        jwt_dict = self._get_jwt_dict()
        jwt_token = self._encode_jwt_token(jwt_dict)
        request_data = self._get_request_data(jwt_token)

        response = requests.post(self._get_url(), json=request_data)

        if response.status_code == 200:
            self.ticket = self._get_ticket_from_response(response.content)

        return (response.status_code, response.content)

    def save_token(self):
        pickled_data = pickle.dumps((self.private_bytes, self._raw_public_bytes, self.ticket))
        base64_bytes = base64.b64encode(pickled_data)
        return base64_bytes.decode()


    def _restore_saved_token(self, encoded_string : str):
        base64_bytes = encoded_string.encode()
        pickled_data = base64.b64decode(base64_bytes)
        unpickled_data: tuple[str, str, str] = pickle.loads(pickled_data)
        if (type(unpickled_data) != tuple): return None
        if len(unpickled_data) != 3: return None
        unpickled_data = (
            unpickled_data[0],
            re.sub(r'(\r\n)|\n', '', unpickled_data[1]),
            unpickled_data[2]
        )
        return unpickled_data
    
    def __str__(self):
        return self.fingerprint
    
    def __repr__(self):
        return f"Token({self.fingerprint})"

# Code playground

In [None]:
class MassTokenCreator:

    tokens_dict : dict[str, TokenClient] = dict()

    def __init__(self, amount):
        for _ in range(amount):
            token = TokenClient()
            print(token.send_mainrequest())
            self.tokens_dict[token.fingerprint] = token
        print(list(self.tokens_dict.keys()))
        
    @classmethod
    def send_requests(cls, tokens: list[str] | None = None):
        if tokens:
            for fingerprint in tokens:
                print(fingerprint, cls.tokens_dict[fingerprint].send_mainrequest())
        else:
            for token in cls.tokens_dict.values():
                print(token.fingerprint, token.send_mainrequest())

    @classmethod
    def clear_tokens(cls, tokens: list[str] | None = None):
        if tokens:
            for token in tokens:
                cls.tokens_dict.pop(token)
        else:
            cls.tokens_dict.clear()


In [None]:
MassTokenCreator.clear_tokens()
MassTokenCreator(1)

In [None]:
MassTokenCreator.send_requests()

In [None]:
def get_login_url(user_id):
    return DEV_URL + "/canlogin/" + user_id

In [None]:
# создание нового экземпляра токена
token = TokenClient(pin="0123456789")
# отпечаток нового токена
print(token.fingerprint)
# регистрация токена, получаем первый тикет
print(token.send_mainrequest())

# сохранение токена в файл для переиспользования
# и удаление переменной
open("saved_token.txt", 'wt').write(token.save_token())
del token

In [None]:
# восстановление токена из файла
token = TokenClient(saved_token=open("saved_token.txt", 'rt').read())
# отпечаток восстановленного токена, не должен измениться
print(token.fingerprint)

# запрос
# завершится ошибкой, потому что токен создаётся неактивным
print(token.send_mainrequest())

In [None]:
# токен был активирован через веб-интерфейс
# теперь запрос завершится успехом
print(token.send_mainrequest())

# к токену был через веб-интерфейс привязан пользователь
USER_UUID = "4e0e82b7-836d-4e13-adfc-7837e55c0ee8"

# ждём, пока подтверждение истечёт
sleep(6)

# запрашиваем вход пользователя после ожидания
# получаем false
print(requests.get(get_login_url(USER_UUID)).content)

# теперь запрашиваем вход сразу после подтверждения
# получим true
token.send_mainrequest()
print(requests.get(get_login_url(USER_UUID)).content)

# пытаемся войти второй раз, сработает защита от двойного входа
# снова получим false
print(requests.get(get_login_url(USER_UUID)).content)

# Server Load Testing

In [None]:
import asyncio
from random import randint
from concurrent.futures import ThreadPoolExecutor
from asyncio import CancelledError, Task

tokens_array = list()

async def create_50_tokens(amount = 50) -> None:
    global tokens_array
    tokens_array = [TokenClient() for _ in range(amount)]
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as executor:
        for token in tokens_array:
            loop.run_in_executor(executor, token.send_mainrequest)

async def send_periodic_requests(token: TokenClient):
    try:
        while True:
            token.send_mainrequest()
            print(f"sent request from token {token.fingerprint}")
            await asyncio.sleep(randint(30,50)/10)
    except asyncio.CancelledError:
        print(f"cancelled requests from token {token.fingerprint}")

async def start_periodic_requests():
    tasks : list[Task] = list()
    try:
        for token in tokens_array:
            task = asyncio.create_task(send_periodic_requests(token))
            tasks.append(task)
        await asyncio.gather(*tasks) # infinite loop here
    except CancelledError:
        print("started cancellation procedure")
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        print("cancellation procedure finished")

In [None]:
await create_50_tokens()

In [None]:
await start_periodic_requests()