In [74]:
import numpy as np
import random
import math
import hashlib
import requests

In [75]:
class BBS:
    """Blum-Blum-Shub style byte generator.

    The generator keeps an integer state, repeatedly squares it modulo
    ``n = p*q`` and emits the low byte of every new state.

    Parameters:
        p, q:  factors of the modulus.
               NOTE(review): BBS normally requires primes congruent to
               3 mod 4; this class does not check that - confirm the inputs.
        state: initial state (seed).  ``0`` means "no seed supplied", in
               which case a random seed in [2, n - 1] is drawn.
    """

    def __init__(self, p, q, state = 0):
        self.n = p*q
        self.state = state

        if state == 0:
            self.state = random.randint(2, self.n - 1)

    def generate_bytes(self, n: int):
        """Return the next ``n`` output bytes as a ``np.uint8`` array.

        The state is advanced (squared mod ``self.n``) *before* each byte is
        emitted, so the seed itself is never output and consecutive calls
        continue the stream without repeating the boundary byte:
        ``generate_bytes(a)`` followed by ``generate_bytes(b)`` yields the same
        bytes as a single ``generate_bytes(a + b)``.

        Raises:
            ValueError: if ``n`` is negative.
        """
        if n < 0:
            raise ValueError("number of bytes must be non-negative")

        out = np.empty(n, dtype=np.uint8)
        x = self.state

        for i in range(n):
            x = pow(x, 2, self.n)
            out[i] = x & 0xFF  # low byte of the new state

        # Remember where the stream stopped so the next call continues from it.
        self.state = x

        return out
    
def bytes_to_num(byte_seq):
    """Fold a sequence of byte values into one integer, first element most significant.

    Each element is converted with ``int()`` and appended as the next base-256
    digit; an empty sequence gives 0.
    """
    total = 0
    for octet in byte_seq:
        # Shifting left by 8 bits makes room for the next base-256 digit.
        total = (total << 8) + int(octet)

    return total


In [76]:
# Small primes tried by the trial-division pre-filter.
OPTIMUS_PRIMES = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]

# R[d] lists the residues 2**0, 2**1, 2**2, ... modulo d, cut off just before
# the first residue that would appear a second time.
R = {}

for d in OPTIMUS_PRIMES:
    residues = [1]
    while True:
        nxt = (residues[-1] * 2) % d
        if nxt in residues:
            # The sequence starts repeating here; keep only the distinct prefix.
            break
        residues.append(nxt)
    R[d] = residues

# ### Метод пробних ділень
def petod_drobnyx_mylen(num):
    """Trial division of ``num`` by the primes in ``OPTIMUS_PRIMES``.

    Works on the binary expansion of ``num``: for every odd prime ``d`` the
    bits are weighted with the cached residues ``R[d]`` (the cycle of powers
    of two modulo ``d``) and summed modulo ``d``.

    Returns:
        2 if the lowest bit of ``num`` is zero, otherwise the first prime of
        ``OPTIMUS_PRIMES`` whose weighted bit sum is 0, otherwise 1.
    """
    # Binary digits of num as characters, least significant first ("0b" stripped).
    lsb_first = bin(num)[:1:-1]

    if lsb_first.startswith('0'):
        return 2

    for prime in OPTIMUS_PRIMES[1:]:
        cycle = R[prime]
        period = len(cycle)
        residue = 0
        for pos, digit in enumerate(lsb_first):
            residue = (residue + int(digit) * cycle[pos % period]) % prime

        if residue == 0:
            return prime

    return 1
    
# ### Ймовірнісний алгоритм Міллера-Рабіна та загальний алгоритм для знаходження простих чисел
def miller_rabin(num, base):
    """Run one Miller-Rabin round on ``num`` with the given ``base``.

    Writes ``num - 1 = d * 2**k`` with ``d`` odd and checks whether
    ``base**d == 1`` or ``base**(d * 2**j) == num - 1`` (mod ``num``) for some
    ``0 <= j < k``.

    Parameters:
        num:  integer under test.
        base: witness candidate; callers should pick it from [2, num - 2].
              A base that is a multiple of ``num`` fails the test for every
              ``num > 3``, prime or not.

    Returns:
        True if ``num`` passes the round (2 and 3 always pass), False if it
        fails.  Numbers below 2 and even numbers above 2 always return False.
    """
    # Degenerate inputs: num == 1 would make the factorisation loop below
    # spin forever (0 is divisible by every power of two), and an even num
    # has k == 0, which would reduce the round to a plain Fermat check.
    if num < 2:
        return False
    if num < 4:
        return True
    if num % 2 == 0:
        return False

    # Split num - 1 into d * 2**k with d odd.
    d = num - 1
    k = 0
    while d % 2 == 0:
        d //= 2
        k += 1

    x = pow(base, d, num)
    if x == 1 or x == num - 1:
        return True

    # Square up to k - 1 more times, looking for -1 (mod num).
    for _ in range(k - 1):
        x = pow(x, 2, num)
        if x == num - 1:
            return True

    return False


def check_prime(num, error_prob = 0.005):
    """Probabilistic primality test: trial division followed by Miller-Rabin.

    Parameters:
        num:        integer to test.
        error_prob: target bound on accepting a composite; the number of
                    Miller-Rabin rounds is ``ceil(log_4(1 / error_prob))``
                    (at least one).

    Returns:
        True if ``num`` is (probably) prime, False otherwise.
    """
    if num < 2:
        return False

    divisor = petod_drobnyx_mylen(num)
    if divisor != 1:
        # A small prime divides num, so num is prime only if it *is* that prime.
        return divisor == num

    # From here on num is odd and has no prime factor up to 47, hence num >= 53
    # and the base range below is never empty.
    rounds = max(1, int(math.ceil(math.log(1 / error_prob, 4))))
    for _ in range(rounds):
        # Valid Miller-Rabin bases lie in [2, num - 2]; a base equal to num
        # would fail even for a prime.
        base = random.randrange(2, num - 1)
        if not miller_rabin(num, base):
            # A single failed round proves compositeness - no voting needed.
            return False

    return True

def generate_prime(len: int, excl = []):
    """Draw BBS output until it forms a number accepted by ``check_prime``.

    Parameters:
        len:  requested size in bits; ``len // 8`` bytes are drawn per candidate.
              The top bit of a candidate is not forced, so the returned value
              can be shorter than ``len`` bits.
        excl: values that must not be returned (only read, never modified).

    Returns:
        The first candidate that passes ``check_prime`` and is not in ``excl``.
    """
    bbs_p = int('425D2B9BFDB25B9CF6C416CC6E37B59C1F', 16)
    bbs_q = int('D5BBB96D30086EC484EBA3D7F9CAEB07', 16)
    source = BBS(bbs_p, bbs_q)

    while True:
        candidate = bytes_to_num(source.generate_bytes(len // 8))
        if not check_prime(candidate):
            continue
        if candidate in excl:
            continue
        return candidate

    
def generate_safe_prime(len: int, excl = []):
    """Draw BBS output until it forms a safe prime ``p`` (``(p - 1) // 2`` also passes).

    Parameters:
        len:  requested size in bits; ``len // 8`` bytes are drawn per candidate.
              The top bit of a candidate is not forced, so the returned value
              can be shorter than ``len`` bits.
        excl: values that must not be returned (only read, never modified).

    Returns:
        The first candidate ``p`` such that ``check_prime(p)`` holds, ``p`` is
        not in ``excl`` and ``check_prime((p - 1) // 2)`` holds.
    """
    bbs_p = int('425D2B9BFDB25B9CF6C416CC6E37B59C1F', 16)
    bbs_q = int('D5BBB96D30086EC484EBA3D7F9CAEB07', 16)
    source = BBS(bbs_p, bbs_q)

    while True:
        p = bytes_to_num(source.generate_bytes(len // 8))
        # Short-circuit order matters for cost: the half-size primality test
        # only runs for candidates that are themselves prime and not excluded.
        if check_prime(p) and p not in excl and check_prime((p - 1) // 2):
            return p


In [77]:
# Size in bits requested for each of the user's primes p and q; User asks the
# server for a key of KEY_LENGTH * 2 bits.
KEY_LENGTH = 256


class Server:
    """HTTP client for the remote RSA demo service rooted at ``__base_url``.

    Every public method issues one GET request through the shared session
    ``s``, prints the decoded JSON reply and returns the relevant field(s).
    The server's public key is cached on the instance as ``n`` / ``e`` after
    ``set_server_key`` has been called.

    Query parameters are handed to ``requests`` via ``params=`` so that message
    text containing ``&``, ``#``, spaces or non-ASCII characters is URL-encoded
    instead of corrupting the request.
    """
    __base_url = 'http://asymcryptwebservice.appspot.com/rsa/'
    # Class-level session, shared by every Server instance.
    # NOTE(review): presumably the service ties its key pair to the session
    # cookie - confirm against the service documentation.
    s = requests.Session()
    n = None
    e = None
    # Seconds to wait for the service before requests raises a timeout error.
    _timeout = 30

    def _get(self, endpoint: str, params: dict, echo_url: bool = False) -> dict:
        """GET ``endpoint`` with ``params``; return the decoded JSON body.

        Prints the JSON reply (and the final URL when ``echo_url`` is set).

        Raises:
            RuntimeError: if the HTTP status code is not 200.
        """
        r = self.s.get(f'{self.__base_url}{endpoint}', params=params, timeout=self._timeout)
        if echo_url:
            print(r.url)
        if r.status_code != 200:
            raise RuntimeError(f"Incorrect server status code {r.status_code} for request {r.url}")
        payload = r.json()
        print(payload)
        return payload

    # Setup server private key and receive server pub key
    def set_server_key(self, key_l: int) -> tuple:
        """Request a server key of ``key_l`` bits; cache and return ``(n, e)`` as ints."""
        r = self._get('serverKey', {'keySize': key_l})
        self.n, self.e = (int(r['modulus'], 16), int(r['publicExponent'], 16))
        return (self.n, self.e)

    # Ask server to encrypt
    def encrypt(self, M: str, rec_n, rec_e, type='TEXT'):
        """Have the server encrypt ``M`` for the public key ``(rec_n, rec_e)``; return the ciphertext field."""
        r = self._get('encrypt', {
            'modulus': format(rec_n, "X"),
            'publicExponent': format(rec_e, "X"),
            'message': M,
            'type': type,
        })
        return r['cipherText']

    # Ask server to decrypt this message with his private keys
    def decrypt(self, C: str, type='TEXT'):
        """Have the server decrypt ``C`` with its private key; return the message field."""
        r = self._get('decrypt', {'message': C, 'expectedType': type})
        return r['message']

    # Ask server to sign this message with his private keys
    def sign(self, M: str, type='TEXT'):
        """Have the server sign ``M`` with its private key; return the signature field.

        Like every other method, a non-200 reply now raises RuntimeError
        instead of failing later on a missing ``'signature'`` key.
        """
        r = self._get('sign', {'message': M, 'type': type})
        return r['signature']

    # Verify the message using this public key
    def verify(self, M: str, sign: str, u_n, u_e, type='TEXT'):
        """Have the server verify ``sign`` on ``M`` under the public key ``(u_n, u_e)``; return the verified field."""
        r = self._get('verify', {
            'message': M,
            'type': type,
            'signature': sign,
            'modulus': format(u_n, "X"),
            'publicExponent': format(u_e, "X"),
        })
        return r['verified']

    # Receive a pair (64bit encrypted key, signature for this key) from the server
    def sendKey(self, rec_n, rec_e) -> tuple:
        """Ask the server to send a key to ``(rec_n, rec_e)``; return the ``(key, signature)`` fields."""
        r = self._get('sendKey', {
            'modulus': format(rec_n, "X"),
            'publicExponent': format(rec_e, "X"),
        }, echo_url=True)
        return (r["key"], r['signature'])

    # Ask server to decrypt and verify key encrypted with user's modulo and publicExponent
    def receiveKey(self, K_enc: str, sign, u_n, u_e):
        """Send an encrypted key and its signature to the server; return the verified field."""
        r = self._get('receiveKey', {
            'key': K_enc,
            'signature': sign,
            'modulus': format(u_n, "X"),
            'publicExponent': format(u_e, "X"),
        })
        return r['verified']


In [83]:
def str2hex(s: str):
    """Encode ``s`` as a lowercase hex string, one code point after another.

    Every character contributes at least two hex digits, so characters below
    0x10 (tab, newline, ...) keep their leading zero and do not shift the rest
    of the encoding.  Code points above 0xFF still produce more than two
    digits, exactly as ``hex(ord(c))`` would.
    """
    return "".join(format(ord(c), "02x") for c in s)

def num2str(n: int):
    """Decode ``n`` as base-256 digits into a string, most significant digit first.

    Each digit becomes the character with that code point; ``0`` decodes to
    the empty string.
    """
    chars = []
    while n != 0:
        n, low = divmod(n, 256)
        chars.append(chr(low))

    # Digits were collected least significant first.
    return "".join(reversed(chars))


class User:
    """RSA participant that exchanges messages, signatures and keys with a ``Server``.

    Holds the key pair built from ``p`` and ``q``: modulus ``n = p*q``, public
    exponent ``e`` and private exponent ``d = e**-1 mod (p-1)(q-1)``.
    Constructing a user also asks the server for a fresh key of
    ``KEY_LENGTH * 2`` bits.

    Every protocol method returns its verification result as a bool.
    """

    # NOTE: the default ``Server()`` is evaluated once, so all users created
    # without an explicit ``serv`` share one instance.  That is kept on purpose:
    # the HTTP session is class-level on Server, so a shared instance keeps the
    # cached server key consistent with the most recent ``set_server_key`` call.
    def __init__(self, p, q, e = 2**16 + 1, serv = Server()):
        """Validate the parameters, derive the key pair and fetch the server key.

        Raises:
            RuntimeError: if ``p`` or ``q`` fails ``check_prime`` or ``e`` is
                          not invertible modulo ``(p-1)(q-1)``.
        """
        if not check_prime(p) or not check_prime(q):
            raise RuntimeError("p or q is not a prime number.")

        if math.gcd(e, (p-1)*(q-1)) != 1:
            raise RuntimeError("e is not invertible modulo phi(n)")

        self.serv = serv
        self.p = p
        self.q = q
        self.e = e
        self.n = p*q
        self.d = pow(e, -1, (self.p - 1)*(self.q - 1))
        self.get_server_public_key(KEY_LENGTH * 2)

    def get_server_public_key(self, len: int):
        """Ask the server to set up a key of ``len`` bits; it is cached on ``self.serv``."""
        self.serv.set_server_key(len)

    def send_message(self, M: str):
        """Encrypt ``M`` for the server locally and check that the server decrypts it back.

        Returns:
            True if the server's decryption equals ``M``.
        """
        s_n = self.serv.n
        s_e = self.serv.e

        C = pow(int(str2hex(M), 16), s_e, s_n)

        # The service exchanges numbers as hex strings, so send C in hex too.
        M1 = self.serv.decrypt(format(C, "X"))

        return M1 == M

    def send_message_sign(self, M: str):
        """Sign ``M`` with the user's private key and have the server verify it.

        Returns:
            The server's ``verified`` answer.
        """
        S = pow(int(str2hex(M), 16), self.d, self.n)

        # Signature goes over the wire in hex, like modulus and exponent.
        return self.serv.verify(M, format(S, "X"), self.n, self.e)

    def receive_message(self, M: str):
        """Have the server encrypt ``M`` for the user, decrypt it locally and compare.

        Prints the comparison result, the decrypted text and ``M``.

        Returns:
            True if the local decryption equals ``M``.
        """
        C = self.serv.encrypt(M, self.n, self.e)
        M1 = num2str(pow(int(C, 16), self.d, self.n))

        check = (M1 == M)
        print(check)
        print(M1)
        print(M)
        return check

    def receive_message_sign(self, M: str):
        """Have the server sign ``M`` and verify the signature with the server's public key.

        Prints the verification result, the recovered text and ``M``.

        Returns:
            True if the text recovered from the signature equals ``M``.
        """
        S = self.serv.sign(M)
        M1 = num2str(pow(int(S, 16), self.serv.e, self.serv.n))

        auth_check = (M1 == M)
        print(auth_check)
        print(M1)
        print(M)
        return auth_check

    def recieve_secret_key(self):
        """Receive a signed secret key from the server and verify the signature.

        Prints the verification result.

        Returns:
            True if the decrypted signature, opened with the server's public
            key, equals the decrypted key.

        Raises:
            RuntimeError: if the user's modulus is smaller than the server's.
        """
        if self.n < self.serv.n:
            raise RuntimeError("Cannot receive secret key")

        # <E(K), E(S(K))>
        s_K, s_S = self.serv.sendKey(self.n, self.e)
        K = pow(int(s_K, 16), self.d, self.n)
        S = pow(int(s_S, 16), self.d, self.n)

        auth_check = (pow(S, self.serv.e, self.serv.n) == K)
        print(auth_check)
        return auth_check

    def send_secret_key(self, K: str):
        """Send the hex-encoded secret key ``K`` to the server, encrypted and signed.

        Returns:
            The server's ``verified`` answer.

        Raises:
            RuntimeError: if the user's modulus is larger than the server's.
        """
        if self.n > self.serv.n:
            raise RuntimeError("Cannot send secret key")

        key_num = int(K, 16)
        EK = pow(key_num, self.serv.e, self.serv.n)
        # Sign with the user's private key first, then encrypt the signature
        # for the server.
        ES = pow(pow(key_num, self.d, self.n), self.serv.e, self.serv.n)

        return self.serv.receiveKey(format(EK, "X"), format(ES, "X"), self.n, self.e)


In [79]:
# Generate the user's first safe prime (candidates come from a randomly
# seeded BBS generator, so the value differs between runs).
p = generate_safe_prime(KEY_LENGTH)
print(p)

# Second safe prime; p is passed as an exclusion so that q != p.
q = generate_safe_prime(KEY_LENGTH, [p])
print(q)


11400796500694357019777089912014309300047609920694224038906550774965572951219
69255514385209446989141652696070457462256500802508808189347176455744534343479


In [84]:
# Build the user's key pair from p and q; the constructor also requests a
# fresh server key over HTTP.
u = User(p, q)

# u.recieve_secret_key()
# NOTE(review): str.capitalize() upper-cases only the first character and
# lower-cases the rest; .upper() was presumably intended. It makes no
# difference here because send_secret_key parses K with int(K, 16).
u.send_secret_key("dfb0cd9586cf2d9f".capitalize())
# u.receive_message("Pipa")
# u.receive_message_sign("Pipa")


{'modulus': '8208CF086C81A99199F013CE0C93D023A97DB11E7411B3CD5A5B66B16C0C229DDD8CD714EC47D0D819C9ECBEF0B4A9BEF553CF0799BCA8E6120C51E4B281B46F', 'publicExponent': '10001'}
{'key': 'DFB0CD9586CF2D9F', 'verified': True}
