Skip to content

Commit

Permalink
feat: add typing (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco committed Mar 24, 2023
1 parent 5e24c68 commit b1e3298
Showing 1 changed file with 34 additions and 18 deletions.
52 changes: 34 additions & 18 deletions src/chacha20poly1305_reuseable/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import os
import typing
from typing import Union
from typing import Optional, Union

from cryptography import exceptions
from cryptography.exceptions import InvalidTag
Expand Down Expand Up @@ -48,8 +48,8 @@ def __init__(self, key: Union[bytes, bytearray]) -> None:

self._cipher_name = b"chacha20-poly1305"
self._key = key
self._decrypt_ctx = None
self._encrypt_ctx = None
self._decrypt_ctx: Optional[object] = None
self._encrypt_ctx: Optional[object] = None

@classmethod
def generate_key(cls) -> bytes:
Expand Down Expand Up @@ -127,13 +127,13 @@ def _check_params(
raise ValueError("Nonce must be 12 bytes")


def _create_ctx(): # type: ignore[no-untyped-def]
def _create_ctx() -> object:
ctx = backend._lib.EVP_CIPHER_CTX_new()
ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
return ctx


def _set_cipher(ctx, cipher_name, operation): # type: ignore[no-untyped-def]
def _set_cipher(ctx: object, cipher_name: bytes, operation: int) -> None:
evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name)
backend.openssl_assert(evp_cipher != backend._ffi.NULL)
res = backend._lib.EVP_CipherInit_ex(
Expand All @@ -147,12 +147,12 @@ def _set_cipher(ctx, cipher_name, operation): # type: ignore[no-untyped-def]
backend.openssl_assert(res != 0)


def _set_key_len(ctx, key_len): # type: ignore[no-untyped-def]
def _set_key_len(ctx: object, key_len: int) -> None:
res = backend._lib.EVP_CIPHER_CTX_set_key_length(ctx, key_len)
backend.openssl_assert(res != 0)


def _set_key(ctx, key, operation): # type: ignore[no-untyped-def]
def _set_key(ctx: object, key: bytes, operation: int) -> None:
key_ptr = backend._ffi.from_buffer(key)
res = backend._lib.EVP_CipherInit_ex(
ctx,
Expand All @@ -165,14 +165,14 @@ def _set_key(ctx, key, operation): # type: ignore[no-untyped-def]
backend.openssl_assert(res != 0)


def _set_decrypt_tag(ctx, tag): # type: ignore[no-untyped-def]
def _set_decrypt_tag(ctx: object, tag: bytes) -> None:
res = backend._lib.EVP_CIPHER_CTX_ctrl(
ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
)
backend.openssl_assert(res != 0)


def _set_nonce_len(ctx, nonce_len): # type: ignore[no-untyped-def]
def _set_nonce_len(ctx: object, nonce_len: int) -> None:
res = backend._lib.EVP_CIPHER_CTX_ctrl(
ctx,
backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
Expand All @@ -182,7 +182,7 @@ def _set_nonce_len(ctx, nonce_len): # type: ignore[no-untyped-def]
backend.openssl_assert(res != 0)


def _set_nonce(ctx, nonce, operation): # type: ignore[no-untyped-def]
def _set_nonce(ctx: object, nonce: Union[bytes, bytearray], operation: int) -> None:
nonce_ptr = backend._ffi.from_buffer(nonce)
res = backend._lib.EVP_CipherInit_ex(
ctx,
Expand All @@ -195,7 +195,9 @@ def _set_nonce(ctx, nonce, operation): # type: ignore[no-untyped-def]
backend.openssl_assert(res != 0)


def _aead_setup_with_fixed_nonce_len(cipher_name, key, nonce_len, operation): # type: ignore[no-untyped-def]
def _aead_setup_with_fixed_nonce_len(
cipher_name: bytes, key: Union[bytes, bytearray], nonce_len: int, operation: int
) -> object:
ctx = _create_ctx()
_set_cipher(ctx, cipher_name, operation)
_set_key_len(ctx, len(key))
Expand All @@ -204,28 +206,36 @@ def _aead_setup_with_fixed_nonce_len(cipher_name, key, nonce_len, operation): #
return ctx


def _process_aad(ctx, associated_data): # type: ignore[no-untyped-def]
def _process_aad(ctx: object, associated_data: bytes) -> None:
outlen = backend._ffi.new("int *")
res = backend._lib.EVP_CipherUpdate(
ctx, backend._ffi.NULL, outlen, associated_data, len(associated_data)
)
backend.openssl_assert(res != 0)


def _process_data(ctx, data): # type: ignore[no-untyped-def]
def _process_data(ctx: object, data: bytes) -> bytes:
outlen = backend._ffi.new("int *")
buf = backend._ffi.new("unsigned char[]", len(data))
res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
backend.openssl_assert(res != 0)
return backend._ffi.buffer(buf, outlen[0])[:]


def _encrypt_with_fixed_nonce_len(ctx, nonce, data, associated_data, tag_length): # type: ignore[no-untyped-def]
def _encrypt_with_fixed_nonce_len(
ctx: object,
nonce: Union[bytes, bytearray],
data: bytes,
associated_data: bytes,
tag_length: int,
) -> bytes:
_set_nonce(ctx, nonce, _ENCRYPT)
return _encrypt_data(ctx, data, associated_data, tag_length)


def _encrypt_data(ctx, data, associated_data, tag_length): # type: ignore[no-untyped-def]
def _encrypt_data(
ctx: object, data: bytes, associated_data: bytes, tag_length: int
) -> bytes:
_process_aad(ctx, associated_data)
processed_data = _process_data(ctx, data)
outlen = backend._ffi.new("int *")
Expand All @@ -242,21 +252,27 @@ def _encrypt_data(ctx, data, associated_data, tag_length): # type: ignore[no-un
return processed_data + tag


def _tag_from_data(data, tag_length): # type: ignore[no-untyped-def]
def _tag_from_data(data: bytes, tag_length: int) -> bytes:
if len(data) < tag_length:
raise InvalidTag
return data[-tag_length:]


def _decrypt_with_fixed_nonce_len(ctx, nonce, data, associated_data, tag_length): # type: ignore[no-untyped-def]
def _decrypt_with_fixed_nonce_len(
ctx: object,
nonce: Union[bytes, bytearray],
data: bytes,
associated_data: bytes,
tag_length: int,
) -> bytes:
tag = _tag_from_data(data, tag_length)
data = data[:-tag_length]
_set_nonce(ctx, nonce, _DECRYPT)
_set_decrypt_tag(ctx, tag)
return _decrypt_data(ctx, data, associated_data)


def _decrypt_data(ctx, data, associated_data): # type: ignore[no-untyped-def]
def _decrypt_data(ctx: object, data: bytes, associated_data: bytes) -> bytes:
_process_aad(ctx, associated_data)
processed_data = _process_data(ctx, data)
outlen = backend._ffi.new("int *")
Expand Down

0 comments on commit b1e3298

Please sign in to comment.