# Dependencies

In [1]:
!pip3 install pyzmq cryptography sympy pycryptodome

Defaulting to user installation because normal site-packages is not writeable


# Initialization

In [5]:
import sympy
import random
import zmq
import threading
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import base64

In [6]:
# math
BITS_OF_PRIME_GROUP = 512

# socket
LOCAL_PORT = 4080
SERVER_HOST = "localhost"
SERVER_PORT = 4080

# Basics

## Primes and Modular arithmetics

In [7]:
def get_random_prime():
    """
    Generate a random prime number of the specified number of bits.
    """

    lower_bound = 2**(BITS_OF_PRIME_GROUP - 1)
    upper_bound = 2**BITS_OF_PRIME_GROUP - 1

    prime = sympy.randprime(lower_bound, upper_bound)
    
    return prime

def find_generator(prime):
    """
    For every prime divisor q of p -1 check if g^{(p-1)/q} ≡ 1 mod p. If that happens, discard g
    If it survives till the last prime divisor of p - 1 is a generator.
    """

    while 1:
        candidate = pow(random.randint(2, prime), 2, prime)

        # avoid g=2 because of Bleichenbacher's attack described # in "Generating
        # ElGamal signatures without knowning the secret key", 1996
        if candidate in (1, 2):
            continue

        # Discard g if it divides p-1 because of the attack described
        # in Note 11.67 (iii) in HAC
        if (prime - 1) % candidate == 0:
            continue

        # g^{-1} must not divide p-1 because of Khadir's attack described in
        # "Conditions of the generator for forging ElGamal signature", 2011
        ginv = inverse(candidate, prime)
        if (prime - 1) % ginv == 0:
            continue

        # found
        break

    return candidate

def inverse(num, prime):
    """
    Multiplicative inverse.
    """

    return pow(num, prime - 2, prime)

## Secret Key Encryption

The cryptography library contains Fernet, an implementation of symmetric (secret key) authenticated cryptography.

In [32]:
###############################################
# IMPLEMENTATION OF AES KEY ENCRYPTION SCHEME #
###############################################

def aes_generate_key():
    """
    Generate a 256-bit (32-byte) secret key for AES encryption.
    https://github.com/Legrandin/pycryptodome/blob/master/lib/Crypto/Cipher/AES.py
    https://pycryptodome.readthedocs.io/en/latest/src/cipher/aes.html
    """

    # AES-256 requires a 32-byte key
    key = get_random_bytes(32)

    return key

def pad(msg):
    """
    Pad message to be a multiple of AES block size (16 bytes).
    """

    # get how much padding
    padding_length = AES.block_size - len(msg) % AES.block_size

    # apply padding
    return msg + bytes([padding_length] * padding_length)

def unpad(msg):
    """
    Remove padding from decrypted message.
    """

    return msg[:-msg[-1]]

def aes_encrypt(msg, key):
    """
    Encrypts a message with a key using AES-256 in CTR mode.
    """

    # pad message
    msg = pad(msg)

    # init AES
    cipher = AES.new(key, AES.MODE_CTR)

    # encrypt message
    encrypted_msg = cipher.encrypt(msg)

    # return concatenation with nonce
    return cipher.nonce + encrypted_msg
    

def aes_decrypt(token, key):
    """
    Decrypts a message with a key using AES-256 in CBC mode.
    """

    # extract nonce and encrypted message
    nonce = token[:AES.block_size // 2]
    encrypted_msg = token[AES.block_size // 2:]

    # init AES
    cipher = AES.new(key, AES.MODE_CTR, nonce=nonce)

    # decrypt
    decrypted_msg = cipher.decrypt(encrypted_msg)

    # unpad
    decrypted_msg = unpad(decrypted_msg)
    return decrypted_msg

In [34]:
#####################################
# TEST OF AES KEY ENCRYPTION SCHEME #
#####################################

# test message
input_msg = b"Hello, World!"

# generate random key
key = aes_generate_key()

# encrypt the test message
token = aes_encrypt(input_msg, key)

# decrypt the test message
output_msg = aes_decrypt(token, key)

# print results
print("Original message: ", input_msg.decode("utf-8"))
print("Encrypted message: ", token)
print("Decrypted message: ", output_msg.decode("utf-8"))

Original message:  Hello, World!
Encrypted message:  b'\xf1\x9c\xd7AI\xa1\x8b\xf0\x14?\x0c\xcc)k1m\xd15\xba\xe9\x05X\x11\xc6'
Decrypted message:  Hello, World!


## Public Key Encryption

Implementation of ElGamal encryption scheme.

In [6]:
###############################################
# IMPLEMENTATION OF ELGAMAL ENCRYPTION SCHEME #
###############################################

def elgamal_generate_keys():
    """
    Generate public and private keys for the ElGamal encryption scheme.
    The naming scheme follows what has been used in the course slides.
    """

    # generate random prime that defines the group
    p = get_random_prime()

    # find a generator for the group
    g = find_generator(p)

    # generate a private key as a random number
    x = random.randint(1, p - 1)

    # calculate the public key as g^x mod p
    h = pow(g, x, p)

    return (g, p, h), x

def elgamal_encrypt(msg, public_key):
    """
    Encrypt a message with the ElGamal encryption scheme.
    """

    # from bytes to integer
    msg = int.from_bytes(msg, byteorder="big")

    # unpack public key
    g, p, h = public_key

    # generate a random number
    r = random.randint(1, p - 1)

    # calculate the two powers of the ciphertext
    gr = pow(g, r, p)
    hr = pow(h, r, p)

    # calculate the ciphertext
    c = (gr, hr * msg % p)

    return c

def elgamal_decrypt(c, private_key, public_key):
    """
    Decrypt a message with the ElGamal encryption scheme.
    """

    # unpack private key
    x = private_key

    # unpack public key
    g, p, h = public_key

    # unpack ciphertext
    c1, c2 = c

    # decrypt message
    m = (c2 * pow(c1, p - 1 - x, p)) % p

    # from integer to bytes
    m = m.to_bytes((m.bit_length() + 7) // 8, byteorder="big")

    return m

In [7]:
#####################################
# TEST OF ELGAMAL ENCRYPTION SCHEME #
#####################################

# test message
input_msg = b"Hello!"

# generate keys
public_key, private_key = elgamal_generate_keys()

# encrypt message
ciphertext = elgamal_encrypt(input_msg, public_key)

# decrypt message
output_msg = elgamal_decrypt(ciphertext, private_key, public_key)

# print results
print("Original message:", input_msg.decode())
print("Encrypted message:", ciphertext)
print("Decrypted message:", output_msg.decode())

Original message: Hello!
Encrypted message: (2231910908049189478948787713272429854670911340284814528581481336921463344875474317541623818896185702621584998700494469257338103838518197423879720937951687, 1251985525154637205235251735539389747284461852616611515780353742731893861858365380747296890948020504698694555757902524278052902188982594730809206169187411)
Decrypted message: Hello!


## Socket Communication

In [8]:
##########################################
# IMPLEMENTATION OF SOCKET COMMUNICATION #
##########################################

class Socket:
    def __init__(self, socket_type, address):
        """
        Initialize a ZeroMQ Socket.
        socket_type can be zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB
        """

        # init socket settings
        self.context = zmq.Context.instance()
        self.socket = self.context.socket(socket_type)
        self.address = address

        if socket_type == zmq.REP:
            self.bind()
        elif socket_type == zmq.REQ:
            self.connect()
    
    def bind(self):
        """
        Bind the socket to given address.
        """

        self.socket.bind(self.address)
    
    def connect(self):
        """
        Connect the socket to a given address.
        """

        self.socket.connect(self.address)
    
    def send(self, msg):
        """
        Send a message through the socket.
        """

        self.socket.send_pyobj(msg)
    
    def receive(self):
        """
        Receive a message from the socket.
        """

        return self.socket.recv_pyobj()
    
    def close(self):
        """
        Close the socket.
        """

        self.socket.close()
    
    def __del__(self):
        """
        Destructor to close the socket.
        """

        self.close()

In [9]:
################################
# TEST OF SOCKET COMMUNICATION #
################################

# create a server and a client sockets
server = Socket(socket_type=zmq.REP, address=f"tcp://{SERVER_HOST}:{SERVER_PORT}")
client = Socket(socket_type=zmq.REQ, address=f"tcp://{SERVER_HOST}:{SERVER_PORT}")

# send a message from client to server
client.send("Hello, Server!")
print("Client received:", server.receive())

# send a message from server to client
server.send("Hello, Client!")
print("Server received:", client.receive())

# close the sockets
server.close()
client.close()

Client received: Hello, Server!
Server received: Hello, Client!


# Oblivious Transfer

## Passive Security

Implementation of a 1-out-of-2 Oblivious Transfer protocol with passive security.

In [10]:
class ObliviousTransfer_PS:
    """
    Implement the oblivious transfer protocol with passive security.

    In passive security, Bob sends both public keys to Alice, the one he wants to receive
    normally and the other obliviously. Alice sends the ciphertexts of both messages to Bob,
    encrypted with the corresponding public key. Bob decrypts the chosen ciphertext,
    receiving the message he wants.
    """

    def __init__(self, socket):
        # to send and receive messages
        self.socket = socket

    def receiver_side(self, choice):
        """
        Implement Bob's side of the oblivious transfer protocol with passive security.
        """

        # Bob wants message 0
        if choice == 0:
            pk0, sk0 = elgamal_generate_keys()
            pk1, _ = elgamal_generate_keys() # oblivious of secret key
            
        # Bob wants message 1
        else:
            pk0, _ = elgamal_generate_keys() # oblivious of secret key
            pk1, sk1 = elgamal_generate_keys()

        # send public keys to Alice
        self.socket.send((pk0, pk1))

        # receive the ciphertexts
        c0, c1 = self.socket.receive()

        # decrypt the chosen ciphertext
        if choice == 0:
            m = elgamal_decrypt(c0, sk0, pk0)
        else:
            m = elgamal_decrypt(c1, sk1, pk1)

        # close the socket
        self.socket.close()

        return m
    
    def sender_side(self, msg0, msg1):
        """
        Implement Alice's side of the oblivious transfer protocol with passive security.
        """

        # get the public keys from Bob
        h0, h1 = self.socket.receive()

        # encrypt the messages
        c0 = elgamal_encrypt(msg0, h0)
        c1 = elgamal_encrypt(msg1, h1)

        # send the ciphertexts to Bob
        self.socket.send((c0, c1))

        # close the socket
        self.socket.close()

In [11]:
# example of Oblivious Transfer with passive security

socket_bob = Socket(socket_type=zmq.REQ, address=f"tcp://{SERVER_HOST}:{SERVER_PORT}")
socket_alice = Socket(socket_type=zmq.REP, address=f"tcp://{SERVER_HOST}:{SERVER_PORT}")

def run_bob(socket_bob):
    # initialize Bob's side
    ot_bob = ObliviousTransfer_PS(socket_bob)

    # Bob wants message 1
    choice = 1

    # run Bob's side of the protocol
    received_message = ot_bob.receiver_side(choice)

    # show the received message
    print("Bob received:", received_message.decode("utf-8"))

    # close the socket
    socket_bob.close()

def run_alice(socket_alice):
    # initialize Alice's side
    ot_alice = ObliviousTransfer_PS(socket_alice)

    # get Alice's messages
    msg0 = b"Hello"
    msg1 = b"World"

    # run Alice's side of the protocol
    ot_alice.sender_side(msg0, msg1)

    # close the socket
    socket_alice.close()

# run Alice and Bob in separate threads
bob_thread = threading.Thread(target=run_bob, args=(socket_bob,))
alice_thread = threading.Thread(target=run_alice, args=(socket_alice,))

bob_thread.start()
alice_thread.start()

bob_thread.join()
alice_thread.join()

Bob received: World


## Active Security

Implementation of a 1-out-of-2 Oblivious Transfer protocol with active security.

In [12]:
class ObliviousTransfer_AS:
    """
    Implement the oblivious transfer protocol with active security.

    In active security, Alice sends two public keys to Bob, Bob will use the one linked to the message
    he wants to recieve to encrypt his key. Alice will decrypt it using both keys, obtaining two different
    keys. Alice will then encrypt both messages with the two keys and send them to Bob. Bob will be able
    to decrypt only the message linked to the key he chose.
    """

    def __init__(self, socket):
        # to send and receive messages
        self.socket = socket

    def sender_side(self, msg0, msg1):
        """
        Implement Alice's side of the oblivious transfer protocol with active security.
        """

        # generate two key pairs
        pk0, sk0 = elgamal_generate_keys()
        pk1, sk1 = elgamal_generate_keys()

        # send public keys to Bob
        self.socket.send((pk0, pk1))

        # recieve encrypted key from Bob
        c = self.socket.receive()

        # decrypt the key with both private keys
        k0 = Fernet(elgamal_decrypt(c, sk0, pk0))
        k1 = Fernet(elgamal_decrypt(c, sk1, pk1))

        # encrypt the messages with the keys
        c0 = secret_encrypt(msg0, k0)
        c1 = secret_encrypt(msg1, k1)

        # send the ciphertexts to Bob
        self.socket.send((c0, c1))

        # close the socket
        self.socket.close()

    def receiver_side(self, choice):
        """
        Implement Bob's side of the oblivious transfer protocol with active security.
        """

        # generate secret key
        k = secret_generate_key()

        # receive the public keys
        pk0, pk1 = self.socket.receive()

        # encrypt with the chosen public key
        if choice == 0:
            c = elgamal_encrypt(k, pk0)
        
        else:
            c = elgamal_encrypt(k, pk1)

        # send the encrypted key to Alice
        self.socket.send(c)

        # receive the ciphertexts
        c0, c1 = self.socket.receive()

        # decrypt the chosen ciphertext
        if choice == 0:
            m = secret_decrypt(c0, k)

        else:
            m = secret_decrypt(c1, k)

        # close the socket
        self.socket.close()

        return m

In [23]:
k = secret_generate_key()
pk0, sk0 = elgamal_generate_keys()
pk1, sk1 = elgamal_generate_keys()
c = elgamal_encrypt(k, pk0)
k0 = elgamal_decrypt(c, sk0, pk0)
k1 = elgamal_decrypt(c, sk1, pk1)

secret_encrypt(b"a", k0)
secret_encrypt(b"b", k1)

ValueError: Fernet key must be 32 url-safe base64-encoded bytes.

In [14]:
k == k0

True

In [18]:
import base64
import binascii

binascii.a2b_base64(k0)

b'\xcd|G\xf6t\x04\xc8hw\xec\xce8v+\x10\x89M,\x167\x8bG;\x15\x12\x05h\ne\xc3\x15r'

In [None]:
# example of Oblivious Transfer with active security

socket_bob = Socket(socket_type=zmq.REP, address=f"tcp://{SERVER_HOST}:{SERVER_PORT}")
socket_alice = Socket(socket_type=zmq.REQ, address=f"tcp://{SERVER_HOST}:{SERVER_PORT}")

def run_bob(socket_bob):
    # initialize Bob's side
    ot_bob = ObliviousTransfer_AS(socket_bob)

    # Bob wants message 1
    choice = 1

    # run Bob's side of the protocol
    received_message = ot_bob.receiver_side(choice)

    # show the received message
    print("Bob received:", received_message.decode("utf-8"))

    # close the socket
    socket_bob.close()

def run_alice(socket_alice):
    # initialize Alice's side
    ot_alice = ObliviousTransfer_AS(socket_alice)

    # get Alice's messages
    msg0 = b"Hello"
    msg1 = b"World"

    # run Alice's side of the protocol
    ot_alice.sender_side(msg0, msg1)

    # close the socket
    socket_alice.close()

# run Alice and Bob in separate threads
bob_thread = threading.Thread(target=run_bob, args=(socket_bob,))
alice_thread = threading.Thread(target=run_alice, args=(socket_alice,))

bob_thread.start()
alice_thread.start()

bob_thread.join()
alice_thread.join()

Exception in thread Thread-8 (run_alice):
Traceback (most recent call last):
  File "c:\ProgramData\anaconda3\Lib\site-packages\cryptography\fernet.py", line 34, in __init__
    key = base64.urlsafe_b64decode(key)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\ProgramData\anaconda3\Lib\base64.py", line 134, in urlsafe_b64decode
    return b64decode(s)
           ^^^^^^^^^^^^
  File "c:\ProgramData\anaconda3\Lib\base64.py", line 88, in b64decode
    return binascii.a2b_base64(s, strict_mode=validate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
binascii.Error: Incorrect padding

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\ProgramData\anaconda3\Lib\threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "c:\ProgramData\anaconda3\Lib\threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Utente\AppData\Local\Temp\ipykernel_17476\342073072.py", line 31