Skip to content
This repository was archived by the owner on Mar 10, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# crypto parameters
AES_GCM_NONCE_LEN = 12 # bytes

OTP_PAD_SIZE = 10240 # bytes
OTP_PAD_SIZE = 11264 # bytes
OTP_PADDING_LENGTH = 2 # bytes
OTP_PADDING_LIMIT = 1024 # bytes

Expand Down
16 changes: 7 additions & 9 deletions core/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def generate_kem_keys(algorithm: str = "Kyber1024"):
return private_key, public_key


def decrypt_kyber_shared_secrets(ciphertext_blob: bytes, private_key: bytes, otp_pad_size: int = 10240, algorithm: str = "Kyber1024"):
def decrypt_kyber_shared_secrets(ciphertext_blob: bytes, private_key: bytes, otp_pad_size: int = OTP_PAD_SIZE):
"""
Decapsulates shared_secrets of size otp_pad_size and returns the resulting shared_secrets.
The ciphertexts_blob is expected to be a concatenated sequence of Kyber ciphertexts,
Expand All @@ -123,11 +123,12 @@ def decrypt_kyber_shared_secrets(ciphertext_blob: bytes, private_key: bytes, otp
split the blob and decapsulate in order.
"""

cipher_size = 1568 # Kyber1024 ciphertext size

shared_secrets = b''
cipher_size = 1568 # Kyber1024 ciphertext size
cursor = 0
cursor = 0

with oqs.KeyEncapsulation(algorithm, secret_key=private_key) as kem:
with oqs.KeyEncapsulation("Kyber1024", secret_key=private_key) as kem:
while len(shared_secrets) < otp_pad_size:
ciphertext = ciphertext_blob[cursor:cursor + cipher_size]

Expand All @@ -140,7 +141,7 @@ def decrypt_kyber_shared_secrets(ciphertext_blob: bytes, private_key: bytes, otp

return shared_secrets[:otp_pad_size]

def generate_kyber_shared_secrets(public_key: bytes, otp_pad_size: int = OTP_PAD_SIZE, algorithm: str = "Kyber1024"):
def generate_kyber_shared_secrets(public_key: bytes, otp_pad_size: int = OTP_PAD_SIZE):
"""
Generates shared_secrets of size otp_pad_size and returns both the ciphertext list-
and the generated shared_secrets.
Expand All @@ -159,7 +160,7 @@ def generate_kyber_shared_secrets(public_key: bytes, otp_pad_size: int = OTP_PAD
shared_secrets = b''
ciphertexts_blob = b''

with oqs.KeyEncapsulation(algorithm) as kem:
with oqs.KeyEncapsulation("Kyber1024") as kem:
while len(shared_secrets) < otp_pad_size:
ciphertext, shared_secret = kem.encap_secret(public_key)

Expand All @@ -169,9 +170,6 @@ def generate_kyber_shared_secrets(public_key: bytes, otp_pad_size: int = OTP_PAD
return ciphertexts_blob, shared_secrets[:otp_pad_size]


def randomize_replay_protection_number(replay_protection_number: int) -> int:
return random_number_range(replay_protection_number, random_number_range(replay_protection_number + 1, replay_protection_number + random_number_range(100, 1000)))

def random_number_range(a: int, b: int) -> int:
return secrets.randbelow(b - a + 1) + a

1 change: 0 additions & 1 deletion logic/background_worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from core.requests import http_request
from logic.storage import save_account_data
from logic.contacts import save_contact
from logic.get_user import get_target_lt_public_key
from logic.smp import smp_unanswered_questions, smp_data_handler
from logic.pfs import pfs_data_handler
from logic.message import messages_data_handler
Expand Down
30 changes: 13 additions & 17 deletions logic/contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,24 @@ def generate_random_nickname(user_data: dict, user_data_lock, contact_id: str, n
return nickname


def save_contact(user_data: dict, user_data_lock, contact_id: str, contact_public_key: bytes) -> None:
def save_contact(user_data: dict, user_data_lock, contact_id: str) -> None:
with user_data_lock:
if contact_id in user_data["contacts"]:
raise ValueError("Contact already saved!")

for i, v in user_data["contacts"].items():
if v["lt_sign_public_key"] == contact_public_key:
raise ValueError("Contact long-term auth signing public-key is duplicated, and we have no idea why")


user_data["contacts"][contact_id] = {
"nickname": None,
"lt_sign_public_key": contact_public_key,
"lt_sign_keys": {
"contact_public_key": None,
"our_keys": {
"private_key": None,
"public_key": None
},
"our_hash_chain": None,
"contact_hash_chain": None

},
"lt_sign_key_smp": {
"verified": False,
"pending_verification": False,
Expand All @@ -59,23 +64,14 @@ def save_contact(user_data: dict, user_data_lock, contact_id: str, contact_publi
},
"rotation_counter": None,
"rotate_at": None,
"our_hash_chain": None,
"contact_hash_chain": None

},
"message_sign_keys": {
"contact_public_key": None,
"our_keys": {
"private_key": None,
"public_key": None
}
},
"our_pads": {
"replay_protection_number": None,
"hash_chain": None,
"pads": None
},
"contact_pads": {
"replay_protection_number": None,
"hash_chain": None,
"pads": None
},
}
Expand Down
22 changes: 9 additions & 13 deletions logic/get_user.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
from base64 import b64decode
from core.requests import http_request

def get_target_lt_public_key(user_data: dict, target_id: str) -> bytes:
url = user_data["server_url"]
def check_if_contact_exists(user_data: dict, user_data_lock, contact_id: str) -> bool:
with user_data_lock:
url = user_data["server_url"]

try:
response = http_request(f"{url}/get_user?user_id={target_id}", "GET")
response = http_request(f"{url}/get_user?user_id={contact_id}", "GET")
except:
raise ValueError("Could not connect to server, try again")

if not "status" in response:
raise ValueError("Server gave a malformed response! This could be a malicious act, we suggest you retry and if problem persists use another server")
raise ValueError("Server gave a malformed response")

if response["status"] == "failure" or not response.get("public_key"):
if response["status"] == "failure":
if not 'error' in response:
raise ValueError("Server gave a malformed response! This could be a malicious act, we suggest you retry and if problem persists use another server")
raise ValueError("Server gave a malformed response")
else:
raise ValueError(response["error"][:1024])

if response["status"] == "success":
return True

# Dry run to validate server's base64. Helps prevents denial-of-service startup crashes -
# when client first reads parses their account file
try:
return b64decode(response["public_key"], validate=True)
except:
raise ValueError("Server gave a malformed public_key! This could be a malicious act, we suggest you retry and if problem persists use another server")
return False
Loading