<a href="https://colab.research.google.com/github/Iykechuks11/STS-Protocol-Implementation-with-RSA-Signatures/blob/master/sts_5_2_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### STS-IMPLEMENTATION (5.2)

Tools used in this implementation:
- DHKE (Diffie–Hellman key exchange)
- RSA digital signatures (RSA-PSS)
- AES (CBC mode)
- X.509 certificates

In [None]:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, PublicFormat
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography import x509
from cryptography.x509.oid import NameOID
import datetime
from cryptography.hazmat.backends import default_backend # why?
# Pad the signed content to make its length a multiple of the block size
from cryptography.hazmat.primitives.padding import PKCS7

In [None]:
class AES:
    """AES-CBC encryption helper with PKCS7 padding.

    Each message gets a fresh random 16-byte IV, which is prepended to the
    ciphertext so that ``aes_decrypt`` can recover it.
    """

    def __init__(self, shared_key):
        """Store *shared_key* after checking that it is 16, 24 or 32 bytes long.

        Raises:
            ValueError: if the key length is not one of the accepted sizes.
        """
        accepted_lengths = (16, 24, 32)
        if len(shared_key) not in accepted_lengths:
            raise ValueError("Key must be 128, 192, or 256 bits.")
        self.shared_key = shared_key

    def _cbc_cipher(self, iv):
        """Build the AES-CBC cipher object for the stored key and the given IV."""
        return Cipher(
            algorithms.AES(self.shared_key),
            modes.CBC(iv),
            backend=default_backend(),
        )

    def aes_encrypt(self, content):
        """Encrypt *content* (bytes) and return ``iv + ciphertext``."""
        # Fresh random IV for every message
        iv = os.urandom(16)
        encryptor = self._cbc_cipher(iv).encryptor()

        # CBC works on whole blocks, so pad the plaintext first
        pkcs7 = PKCS7(algorithms.AES.block_size).padder()
        padded = pkcs7.update(content) + pkcs7.finalize()

        ciphertext = encryptor.update(padded) + encryptor.finalize()
        return iv + ciphertext

    def aes_decrypt(self, encrypted_data):
        """Decrypt data produced by ``aes_encrypt`` and return the plaintext bytes."""
        # The first 16 bytes are the IV; the remainder is the ciphertext
        iv, ciphertext = encrypted_data[:16], encrypted_data[16:]
        decryptor = self._cbc_cipher(iv).decryptor()

        padded = decryptor.update(ciphertext) + decryptor.finalize()

        # Strip the PKCS7 padding added during encryption
        pkcs7 = PKCS7(algorithms.AES.block_size).unpadder()
        return pkcs7.update(padded) + pkcs7.finalize()

In [None]:
def derived_key(shared_key):
    """Derive a 32-byte key from *shared_key* with HKDF over SHA-256.

    No salt is used and the info string is fixed to ``b'handshake data'``,
    so the same input always yields the same output.
    """
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b'handshake data',
    )
    return hkdf.derive(shared_key)

# Converts public key objects to bytes
def public_key_to_bytes(public_key):
    """Return *public_key* serialized as DER-encoded SubjectPublicKeyInfo bytes."""
    der_encoding = serialization.Encoding.DER
    spki_format = serialization.PublicFormat.SubjectPublicKeyInfo
    return public_key.public_bytes(encoding=der_encoding, format=spki_format)

In [None]:
# Generates RSA key-pairs (Long-term key)

def generate_keys(name):
    """Create a 2048-bit RSA key pair and return its public key.

    The private key is written to ``<name.lower()>_private_key.pem`` in the
    current working directory, PEM-encoded in TraditionalOpenSSL format and
    encrypted with the fixed passphrase ``b"passphrase"``.
    """
    rsa_private = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    key_path = f"{name.lower()}_private_key.pem"
    with open(key_path, "wb") as pem_file:
        pem_bytes = rsa_private.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.BestAvailableEncryption(b"passphrase"),
        )
        pem_file.write(pem_bytes)

    return rsa_private.public_key()


In [None]:
# RSA-PSS digital signatures (sign, verify)
def sign_public_keys(name, message):
    """Sign *message* with the long-term RSA private key belonging to *name*.

    The key is read from ``<name.lower()>_private_key.pem`` in the current
    working directory, which is exactly the path ``generate_keys`` writes, and
    is decrypted with the fixed passphrase ``b"passphrase"``.

    Args:
        name: owner of the key; lower-cased to build the file name.
        message: bytes to sign.

    Returns:
        The RSA-PSS (MGF1/SHA-256, maximum salt length) signature as bytes.
    """
    # Use the same relative, lower-cased path as generate_keys so the lookup
    # does not depend on running under /content or on the caller's casing.
    with open(f"{name.lower()}_private_key.pem", "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=b"passphrase",
        )

    signature = private_key.sign(
        message,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        ),
        hashes.SHA256(),
    )

    return signature

# Verification algorithm
def verify(public_key, signature, message):
    """Check an RSA-PSS/SHA-256 *signature* over *message* with *public_key*.

    Returns None. The result of ``public_key.verify`` is not inspected here;
    per the cryptography API a failed check raises ``InvalidSignature``,
    which propagates to the caller.
    """
    pss_padding = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    public_key.verify(signature, message, pss_padding, hashes.SHA256())

In [None]:
# Self-signed certificates
def certificate(name, public_key):
    """Issue a self-signed X.509 certificate for *public_key* and save it as PEM.

    The certificate is signed (SHA-256) with the private key stored in
    ``<name.lower()>_private_key.pem`` in the current working directory, the
    same path ``generate_keys`` writes. It is valid for 10 days from the
    moment of issue and is written to ``<name.lower()>_certificate.pem``.

    Args:
        name: owner of the key pair; lower-cased to build both file names.
        public_key: the public key to embed in the certificate.

    Returns:
        The certificate object that was written to disk.
    """
    # Same relative, lower-cased path as generate_keys (not a hard-coded
    # /content/ prefix), so this works outside Colab and for any casing.
    with open(f"{name.lower()}_private_key.pem", "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=b"passphrase",
        )

    # Self-signed: subject and issuer are the same name
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "MK"),
    ])

    # Take one timestamp so the validity window is exactly 10 days
    issued_at = datetime.datetime.now(datetime.timezone.utc)

    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .serial_number(x509.random_serial_number())
        .not_valid_before(issued_at)
        .not_valid_after(issued_at + datetime.timedelta(days=10))
        .public_key(public_key)
        .sign(private_key, hashes.SHA256())
    )

    with open(f"{name.lower()}_certificate.pem", "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    return cert


## Protocol Starts

## Alice sends public (Ephemeral) keys using DH approach.

In [None]:
# Generate the DH group parameters (generator 2 and a prime modulus). These can be reused
# across sessions. A 2048-bit modulus is used because 1024-bit groups are no longer
# considered secure; generation is noticeably slower than for 1024 bits.
parameters = dh.generate_parameters(generator=2, key_size=2048)

In [None]:
# Alice sends (a, p, a^x)
def alice_generates(parameters):
    """Create Alice's ephemeral DH key pair from the shared group *parameters*.

    Returns:
        A 3-tuple ``(public_key_bytes, public_key, private_key)`` where the
        bytes are the output of ``public_key_to_bytes`` for the public key.
    """
    ephemeral_private = parameters.generate_private_key()
    ephemeral_public = ephemeral_private.public_key()
    return public_key_to_bytes(ephemeral_public), ephemeral_public, ephemeral_private


# Alice's ephemeral key material; alice_private_key is later read as a global by alice_receives
alice_public_key_bytes, alice_public_key, alice_private_key = alice_generates(parameters)

In [None]:
# Notebook display cell: shows the repr of Alice's ephemeral DH public key object.
alice_public_key
# alice_private_key

<cryptography.hazmat.bindings._rust.openssl.dh.DHPrivateKey at 0x799443e15bf0>

## Bob receives Alice's public parameters, computes the shared key, signs both ephemeral public keys, encrypts the signature, generates a self-signed certificate, and sends ----->

In [None]:
# Bob computes k_yx, sends (a^y, cert_b, E_k_yx(sign(a^y, a^x)))

def bob_generates(alice_public_key, parameters, name='bob'):
    """Run Bob's first handshake step and return everything he produced.

    Bob creates an ephemeral DH key pair, derives the session key from the
    exchange with *alice_public_key*, generates a long-term RSA key pair,
    signs ``bob_ephemeral_pub || alice_ephemeral_pub`` (both DER-encoded),
    encrypts the signature under the session key, and writes a self-signed
    certificate for his long-term public key. Intermediate values are
    printed for demonstration.

    Returns:
        dict with keys ``bob_public_key``, ``encrypted_parameters``,
        ``bob_public_long_key``, ``bob_message`` and ``bob_derived_key``.
    """
    # Ephemeral DH key pair for this session
    ephemeral_private = parameters.generate_private_key()
    ephemeral_public = ephemeral_private.public_key()

    # Session key: HKDF over the raw DH shared secret
    session_key = derived_key(ephemeral_private.exchange(alice_public_key))
    print(session_key)

    # Long-term RSA key pair (private half is stored on disk by generate_keys)
    long_term_public = generate_keys(name)
    print(f'Bob_pub: {long_term_public}')

    # Message to sign: Bob's ephemeral public key followed by Alice's
    signed_payload = b"".join((
        public_key_to_bytes(ephemeral_public),
        public_key_to_bytes(alice_public_key),
    ))
    print(f'Bob_message: {signed_payload}')

    payload_signature = sign_public_keys(name, signed_payload)
    print(f'Bob signs concatenated key: {payload_signature}')

    # Encrypt the signature under the session key
    encrypted_signature = AES(session_key).aes_encrypt(payload_signature)
    print(f'Bob encrypted concatenated keys: {encrypted_signature}')

    # Self-signed certificate for the long-term public key
    certificate(name, long_term_public)

    return {
        "bob_public_key": ephemeral_public,
        "encrypted_parameters": encrypted_signature,
        "bob_public_long_key": long_term_public,
        "bob_message": signed_payload,
        "bob_derived_key": session_key,
    }

In [None]:
# Run Bob's first step, then pull each field of his reply into its own variable
bob_sends = bob_generates(alice_public_key, parameters)

(
    bob_public_key,
    encrypted_parameters,
    bob_public_long_key,
    bob_message,
    bob_derived_key,
) = (
    bob_sends["bob_public_key"],
    bob_sends["encrypted_parameters"],
    bob_sends["bob_public_long_key"],
    bob_sends["bob_message"],
    bob_sends["bob_derived_key"],
)


b'~\xb5S\x19\xe2P H%\x80\x1e;8\xb1,^\x91]\xd4\x164\xe5Ix\x9f\xd0\xfc\x8b/\x95L\x12'
Bob_pub: <cryptography.hazmat.bindings._rust.openssl.rsa.RSAPublicKey object at 0x7f861f71a350>
Bob_message: b'0\x82\x01\x1f0\x81\x95\x06\t*\x86H\x86\xf7\r\x01\x03\x010\x81\x87\x02\x81\x81\x00\xaeU_\xdaK\xd3G\xc4\xfe/ \x97\xb7U>|\xb0\x18\x033\xa6\xed\x11\xe8nk\x12h\xa61~s\xf7\xfb\x03\xe6o_\x11\xa5\xb3\x96\x98\xc5sUg\xa8Z\x89\x84E\xd0\xce]\xf4\xaf\x0e\xa3\xcc\xe5\x897\x87J\xcd\xbb\x1f*:fH+Z\x1a.\x1c\xc3\x97\xa6\t\xc3\xa7]\xad\x1eP\r\xb7R\xa2E\xaa\xa2\xa3oq\x91_\x1cm\xd18\xc3z\xdexg\xc2\xf7\xe0$\xb2\n1]N\xd9\xf7\xfc3"\x95\x18\xe7\x9cP\xbf\x02\x01\x02\x03\x81\x84\x00\x02\x81\x80}\xa30{\x0e\xa9kM\xefq\xdb[\x96\x10\xcc*\x0b]\x8b"\xbe\xf7C\x8f\x98\xec\xc4-\xb4E=\xe4%R\xf4>\x9d\xc1\xd7\xd2\x02+U\xed\x85\\\xd92BdF\x00J\x91\xee\x8e\xe5\xea\xda\xfb\xd8v\xcb\x9f!\xfb5I\xcf\x05"iy\xbc\xc4\xa8w\xda\xd4\xb5Y/\x0e[\x0f\x1cT\xe26\xaam\x16\xd2B\x9e\xbc4xh\xf3\xa6\xc3\x1a\xc7Rp\x95\xe4\xe1\xd9\x92\xda6\xe2\xea\x17\x12O\x

### Alice verifies Bob

In [None]:
# Alice receives a^y, computes k_xy, decrypts, D_k_xy, and verifies cert_b
# Sends cert_a, E_k_xy(sign(a^x, a^y))

def alice_receives(alice_public_key, bob_public_key, encrypted_signed_public_keys, bob_message, name='bob'):
    """Verify Bob's reply, then produce Alice's own signed and encrypted response.

    Alice derives the session key from her ephemeral private key (read from
    the module-level ``alice_private_key``) and *bob_public_key*, decrypts
    Bob's signature, loads his long-term public key from
    ``<name.lower()>_certificate.pem`` and verifies the signature over
    *bob_message*. She then generates her long-term RSA keys, signs
    ``alice_ephemeral_pub || bob_ephemeral_pub``, encrypts that signature
    under the session key and writes her self-signed certificate.

    NOTE(review): the certificate itself is not validated (no trust anchor,
    no validity-period check); only the public key is extracted from it.

    Args:
        alice_public_key: Alice's ephemeral DH public key.
        bob_public_key: Bob's ephemeral DH public key.
        encrypted_signed_public_keys: Bob's signature encrypted by ``AES.aes_encrypt``.
        bob_message: the bytes Bob signed.
        name: owner of the certificate file to load (default ``'bob'``).

    Returns:
        dict with keys ``alice_public_key``, ``encrypted_parameters``,
        ``alice_public_long_key`` and ``alice_message``.

    Raises:
        cryptography.exceptions.InvalidSignature: if Bob's signature does not verify.
    """
    from cryptography.exceptions import InvalidSignature

    # Alice derives the shared session key (alice_private_key is a module-level global)
    alice_shared_key = alice_private_key.exchange(bob_public_key)
    alice_derived_key = derived_key(alice_shared_key)
    print(f'Alice shared ephemeral key: {alice_derived_key}')

    # Decrypt Bob's encrypted signature
    aes = AES(alice_derived_key)
    decrypted_data = aes.aes_decrypt(encrypted_signed_public_keys)
    print(f'Decrypted signed message: {decrypted_data}')

    # Load Bob's certificate; the with-block closes the file
    with open(f"{name.lower()}_certificate.pem", "rb") as cert_file:
        cert_data = cert_file.read()
    bob_certificate = x509.load_pem_x509_certificate(cert_data)

    # Extract Bob's long-term public key from the certificate
    extracted_bob_public_long_key = bob_certificate.public_key()

    # verify() signals failure by raising, so the success message must only be
    # printed after it returns; a failure is reported and then re-raised.
    try:
        verify(extracted_bob_public_long_key, decrypted_data, bob_message)
    except InvalidSignature:
        print('Verification failed')
        raise
    print('Verification was successful')
    print('Alice verifies Bob!\n\n\n')

    # Alice then generates her long-term keys
    alice_pub = generate_keys('alice')
    print(f'Alice_pub: {alice_pub}')

    # Alice signs both ephemeral public keys (hers first, then Bob's)
    alice_message = public_key_to_bytes(alice_public_key) + public_key_to_bytes(bob_public_key)
    print(f'Alice_message: {alice_message}')

    signature = sign_public_keys('alice', alice_message)
    print(f'Alice signs concatenated key: {signature}')

    # Encrypt the signature under the session key
    alice_encrypted_signed_public_keys = aes.aes_encrypt(signature)
    print(f'Alice encrypted concatenated keys: {alice_encrypted_signed_public_keys}')

    # Write Alice's self-signed certificate to disk
    certificate('alice', alice_pub)

    data = {
        "alice_public_key": alice_public_key,
        "encrypted_parameters": alice_encrypted_signed_public_keys,
        "alice_public_long_key": alice_pub,
        "alice_message": alice_message,
    }

    return data

In [None]:
# Run Alice's step, then keep the two fields Bob needs for his final check
alice_sends = alice_receives(alice_public_key, bob_public_key, encrypted_parameters, bob_message)

alice_encrypted_signed_public_keys, alice_message = (
    alice_sends["encrypted_parameters"],
    alice_sends["alice_message"],
)

Alice shared ephemeral key: b'~\xb5S\x19\xe2P H%\x80\x1e;8\xb1,^\x91]\xd4\x164\xe5Ix\x9f\xd0\xfc\x8b/\x95L\x12'
Decrypted signed message: b'\xb5\xaf\xa9\xc3\xcaM\xecTE\xcb\xd8\xc9\x93j\xf7w\x02j\xa3\xad7\xdb\x01.\x87\x02$\xad\xb3\xd6\xc8\x9f\x1f\xe3\xe3\xf2\xf5C\x04$x\xe79\x8b9\xf5\xe4\xd2\xf0\xd4\x82m\xe8\xb7\x99?\xf0\xdf%\x16T\xf9\\\xa1x],\xb4\xd5\xfeq\x8e\x97\xfb\n\x95\xf84\xa9 PY\x10\x91\n\xf0\x04\xff\xbe6\xc2\xa2\xdf\x9145\x8bw\xc4\xba\xe0\x15\xa3i\xf6\xdc\xc6\xe0c\xca\xd3G\xf2\x15\xd8 \x85\xfdb\x8f\xfa~\xb7e\x1ag\xb1W\xdfe\xff\xea5\xc2\x81l\xe8G\xfd\x88\n\xb7\xccobf8\xdd\x9f\xa5I\xeb\xa6\x86\xd3\xc1F\x7f@\x87t\xb9\xb8U\x15\x9c$\xcdfD5z\xe7\xf4p\xd0\xa5T\xbe\xcdr\xa49\x15L\\_\xc22g\xe9\x1e\xd0<2Uj\x13\x1d\xe2\xdc\xcf\xe6\x0f\xf5\x8e\x07(\x0f]\xa6\x8b^\xe1\xab\x0f\xa8v?\xcb\x0e\x97\xa6k\\\xcct\x16\x13\xae\xc8|S\xd0\xa0\xc2\x83\x1e\xde?\x19\x8e\xf5\xf1n\x8a\x86\xf6y\xe1C*vP\x81\xd3'
Verification was successful
Alice verifies Bob!



Alice_pub: <cryptography.hazmat.bindings._rust.ope

### Bob verifies Alice

In [None]:
# Bob decrypts E_k_xy(sign(a^x, a^y)) with his session key and verifies Alice's signature using cert_a

def bob_verifies(bob_derived_key, alice_encrypted_signed_public_keys, alice_message, name='alice'):
    """Decrypt Alice's signature and verify it against her certificate's public key.

    NOTE(review): the certificate itself is not validated (no trust anchor,
    no validity-period check); only the public key is extracted from it.

    Args:
        bob_derived_key: Bob's session key, used to decrypt the signature.
        alice_encrypted_signed_public_keys: Alice's signature encrypted by ``AES.aes_encrypt``.
        alice_message: the bytes Alice signed.
        name: owner of the certificate file to load (default ``'alice'``).

    Returns:
        None.

    Raises:
        cryptography.exceptions.InvalidSignature: if Alice's signature does not verify.
    """
    from cryptography.exceptions import InvalidSignature

    # Decrypt Alice's encrypted signature
    aes = AES(bob_derived_key)
    decrypted_data = aes.aes_decrypt(alice_encrypted_signed_public_keys)
    print(f'Decrypted signed message: {decrypted_data}')

    # Load Alice's certificate; the with-block closes the file
    with open(f"{name.lower()}_certificate.pem", "rb") as cert_file:
        cert_data = cert_file.read()
    alice_certificate = x509.load_pem_x509_certificate(cert_data)

    # Extract Alice's long-term public key from the certificate
    extracted_alice_public_long_key = alice_certificate.public_key()

    # verify() signals failure by raising, so the success message must only be
    # printed after it returns; a failure is reported and then re-raised.
    try:
        verify(extracted_alice_public_long_key, decrypted_data, alice_message)
    except InvalidSignature:
        print('Verification failed')
        raise

    print('Verification was successful')
    print('Bob verifies Alice!\n\n\n')

    print('-----------CONNECTION ESTABLISHED--------')

In [None]:
# Final handshake step: Bob decrypts Alice's signature with his session key and verifies it.
bob_verifies(bob_derived_key, alice_encrypted_signed_public_keys, alice_message)

Decrypted signed message: b'.bI\xdcF\xeb\xfb;;\x07f\x89\x92\xd6\x93\xc72<\x10c\xf6\x9eS\x8c\xfc\x7f\xb91\xa3\x87U\xc5\x9dC5c\xde\x90X\xd0*J\xe4H\xb4\xc8/\xaf:E\x94e2d\x11\xea|Db\xef\xa9y\xd6\xe4\x89Y\xfb\x8cz\xf4,[\x85]\xcfK\xd2\xb3/\xa5\xf4\xf0}X\x9d\x8c\xbaO\xf8jK\x19\x0ec\xbbC^\x0bkO-\x1a\x8aK/\\\x1c\x8b\xe3\x04p\x9e\xfaM\xd4\xe4\x88\x84{Y\xe6AK\x1e\xeb\xfaP\x8c\xf0]\'e\xda\xfd\x89t\xf6\xc6\xb1m1\x15z\x95\xde$\xeb@\xfe\x11:\x9d\xfa`\x0b\x8e\xaeW\x15\x9a\x15\xafv\x0e6\x1e\xdc\xa4\xeer#\xca\x8d6\x00M\x08\x13N\xdfDg\xc2\x06\x8f\x03`8S-{/\xe0\x15?\xcf%=\xc2D\x13\xfc)\x88u\x86\xc7\xbb\xd9\xbf\x88b}\x859\x90"\xb8\xbf\xad\x82}\xbay8\xf2\xbf\x1a\x80P\x16CA\xa6H\x1a\x07\xe9\xc4\xab\xf8!\x18\x1cpI\x89\xef\x9d\xccJ\x87\x1d\xfeA-'
Verification was successful
Bob verifies Alice!



-----------CONNECTION ESTABLISHED--------
