Skip to content

Commit

Permalink
Merge pull request #67 from hbldh/mypy
Browse files Browse the repository at this point in the history
Add mypy for type checking
  • Loading branch information
hbldh committed Apr 17, 2024
2 parents 11b4c85 + aa4d46c commit 84558c6
Show file tree
Hide file tree
Showing 20 changed files with 229 additions and 192 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ jobs:
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 ./bankid --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Look for type errors
run: mypy

- name: Test with pytest
run: |
pytest tests --junitxml=junit/test-results-${{ matrix.os }}-${{ matrix.python-version }}.xml --cov=bankid --cov-report=xml --cov-report=html
Expand Down
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ docs/_build/
# PyBuilder
target/

# mypy
.mypy_cache

### Vagrant template
.vagrant/
Expand Down Expand Up @@ -125,5 +127,3 @@ atlassian-ide-plugin.xml
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties


27 changes: 27 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# PyBankID

Pull requests are welcome! They should target the [develop](https://github.com/hbldh/pybankid/tree/develop) branch.

## Development

Dependencies needed for development can be installed through pip:

```bash
pip install -r requirements-dev.txt
```

## Testing

The PyBankID solution can be tested with [pytest](https://pytest.org/):

```bash
pytest
```

## Type checking

PyBankID is annotated with types and [mypy](https://www.mypy-lang.org/) is used as type-checker. All contributions should include type annotations.

```bash
mypy
```
16 changes: 4 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ PyBankID provides both a synchronous and an asynchronous client for communicatio
```python
from bankid import BankIDClient
client = BankIDClient(certificates=(
'path/to/certificate.pem',
'path/to/key.pem',
'path/to/certificate.pem',
'path/to/key.pem',
))
```

Expand Down Expand Up @@ -125,12 +125,12 @@ client.collect(order_ref="a9b791c3-459f-492b-bf61-23027876140b")
}
```

Please note that the `collect` method should be used sparingly: in the [BankID Integration Guide](https://www.bankid.com/en/utvecklare/guider/teknisk-integrationsguide) it is specified that *"collect should be called every two seconds and must not be called more frequent than once per second"*.
Please note that the `collect` method should be used sparingly: in the [BankID Integration Guide](https://www.bankid.com/en/utvecklare/guider/teknisk-integrationsguide) it is specified that _"collect should be called every two seconds and must not be called more frequent than once per second"_.

PyBankID also implements the `phone/auth` and `phone/sign` methods, for performing authentication and signing with
users that are contacted through phone. For documentation on this, see [PyBankID's Read the Docs page](https://pybankid.readthedocs.io/en/latest/).

### Asynchronous client
### Asynchronous client

The asynchronous client is used in the same way as the synchronous client, with the difference that all request are performed asynchronously.

Expand Down Expand Up @@ -182,11 +182,3 @@ print(cert_and_key)
client = bankid.BankIDClient(
certificates=cert_and_key, test_server=True)
```

## Testing

The PyBankID solution can be tested with [pytest](https://pytest.org/):

```bash
pytest tests/
```
58 changes: 26 additions & 32 deletions bankid/asyncclient.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Optional, Tuple, Dict, Any
from typing import Any, Dict, Tuple, Union

import httpx

from bankid.baseclient import BankIDClientBaseclass
from bankid.exceptions import get_json_error_class


class BankIDAsyncClient(BankIDClientBaseclass):
class BankIDAsyncClient(BankIDClientBaseclass[httpx.AsyncClient]):
"""The asynchronous client to use for communicating with BankID servers via the v6 API.
:param certificates: Tuple of string paths to the certificate to use and
Expand All @@ -19,25 +19,19 @@ class BankIDAsyncClient(BankIDClientBaseclass):
"""

def __init__(self, certificates: Tuple[str, str], test_server: bool = False, request_timeout: Optional[int] = None):
def __init__(self, certificates: Tuple[str, str], test_server: bool = False, request_timeout: int = 5):
super().__init__(certificates, test_server, request_timeout)

kwargs = {
"cert": self.certs,
"headers": {"Content-Type": "application/json"},
"verify": self.verify_cert,
}
if request_timeout:
kwargs["timeout"] = request_timeout
self.client = httpx.AsyncClient(**kwargs)
headers = {"Content-Type": "application/json"}
self.client = httpx.AsyncClient(cert=self.certs, headers=headers, verify=str(self.verify_cert), timeout=request_timeout)

async def authenticate(
self,
end_user_ip: str,
requirement: Dict[str, Any] = None,
user_visible_data: str = None,
user_non_visible_data: str = None,
user_visible_data_format: str = None,
requirement: Union[Dict[str, Any], None] = None,
user_visible_data: Union[str, None] = None,
user_non_visible_data: Union[str, None] = None,
user_visible_data_format: Union[str, None] = None,
) -> Dict[str, str]:
"""Request an authentication order. The :py:meth:`collect` method
is used to query the status of the order.
Expand Down Expand Up @@ -85,18 +79,18 @@ async def authenticate(
response = await self.client.post(self._auth_endpoint, json=data)

if response.status_code == 200:
return response.json()
return response.json() # type: ignore[no-any-return]
else:
raise get_json_error_class(response)

async def phone_authenticate(
self,
personal_number: str,
call_initiator: str,
requirement: Dict[str, Any] = None,
user_visible_data: str = None,
user_non_visible_data: str = None,
user_visible_data_format: str = None,
requirement: Union[Dict[str, Any], None] = None,
user_visible_data: Union[str, None] = None,
user_non_visible_data: Union[str, None] = None,
user_visible_data_format: Union[str, None] = None,
) -> Dict[str, str]:
"""Initiates an authentication order when the user is talking
to the RP over the phone. The :py:meth:`collect` method
Expand Down Expand Up @@ -150,17 +144,17 @@ async def phone_authenticate(
response = await self.client.post(self._phone_auth_endpoint, json=data)

if response.status_code == 200:
return response.json()
return response.json() # type: ignore[no-any-return]
else:
raise get_json_error_class(response)

async def sign(
self,
end_user_ip,
end_user_ip: str,
user_visible_data: str,
requirement: Dict[str, Any] = None,
user_non_visible_data: str = None,
user_visible_data_format: str = None,
requirement: Union[Dict[str, Any], None] = None,
user_non_visible_data: Union[str, None] = None,
user_visible_data_format: Union[str, None] = None,
) -> Dict[str, str]:
"""Request a signing order. The :py:meth:`collect` method
is used to query the status of the order.
Expand Down Expand Up @@ -206,7 +200,7 @@ async def sign(
response = await self.client.post(self._sign_endpoint, json=data)

if response.status_code == 200:
return response.json()
return response.json() # type: ignore[no-any-return]
else:
raise get_json_error_class(response)

Expand All @@ -215,9 +209,9 @@ async def phone_sign(
personal_number: str,
call_initiator: str,
user_visible_data: str,
requirement: Dict[str, Any] = None,
user_non_visible_data: str = None,
user_visible_data_format: str = None,
requirement: Union[Dict[str, Any], None] = None,
user_non_visible_data: Union[str, None] = None,
user_visible_data_format: Union[str, None] = None,
) -> Dict[str, str]:
"""Initiates an authentication order when the user is talking to
the RP over the phone. The :py:meth:`collect` method
Expand Down Expand Up @@ -269,7 +263,7 @@ async def phone_sign(
response = await self.client.post(self._phone_sign_endpoint, json=data)

if response.status_code == 200:
return response.json()
return response.json() # type: ignore[no-any-return]
else:
raise get_json_error_class(response)

Expand Down Expand Up @@ -341,7 +335,7 @@ async def collect(self, order_ref: str) -> dict:
response = await self.client.post(self._collect_endpoint, json={"orderRef": order_ref})

if response.status_code == 200:
return response.json()
return response.json() # type: ignore[no-any-return]
else:
raise get_json_error_class(response)

Expand All @@ -362,6 +356,6 @@ async def cancel(self, order_ref: str) -> bool:
response = await self.client.post(self._cancel_endpoint, json={"orderRef": order_ref})

if response.status_code == 200:
return response.json() == {}
return response.json() == {} # type: ignore[no-any-return]
else:
raise get_json_error_class(response)
38 changes: 19 additions & 19 deletions bankid/baseclient.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
import base64
from datetime import datetime
from typing import Tuple, Optional, Dict, Any
from typing import Tuple, Dict, Any, Union, TypeVar, Generic
from urllib.parse import urljoin

from bankid.qr import generate_qr_code_content
from bankid.certutils import resolve_cert_path

import httpx

class BankIDClientBaseclass:
TClient = TypeVar("TClient", httpx.AsyncClient, httpx.Client)


class BankIDClientBaseclass(Generic[TClient]):
"""Baseclass for BankID clients.
Both the synchronous and asynchronous clients inherit from this base class and has the methods implemented here.
"""

client: TClient

def __init__(
self,
certificates: Tuple[str, str],
test_server: bool = False,
request_timeout: Optional[int] = None,
request_timeout: int = 5,
):
self.certs = certificates
self._request_timeout = request_timeout

if test_server:
self.api_url = "https://appapi2.test.bankid.com/rp/v6.0/"
Expand All @@ -36,28 +41,23 @@ def __init__(
self._collect_endpoint = urljoin(self.api_url, "collect")
self._cancel_endpoint = urljoin(self.api_url, "cancel")

self.client = None

@staticmethod
def generate_qr_code_content(qr_start_token: str, start_t: [float, datetime], qr_start_secret: str) -> str:
def generate_qr_code_content(qr_start_token: str, start_t: Union[float, datetime], qr_start_secret: str) -> str:
return generate_qr_code_content(qr_start_token, start_t, qr_start_secret)

@staticmethod
def _encode_user_data(user_data):
if isinstance(user_data, str):
return base64.b64encode(user_data.encode("utf-8")).decode("ascii")
else:
return base64.b64encode(user_data).decode("ascii")
def _encode_user_data(user_data: str) -> str:
return base64.b64encode(user_data.encode("utf-8")).decode("ascii")

def _create_payload(
self,
end_user_ip: str = None,
requirement: Dict[str, Any] = None,
user_visible_data: str = None,
user_non_visible_data: str = None,
user_visible_data_format: str = None,
):
data = {}
end_user_ip: Union[str, None] = None,
requirement: Union[Dict[str, Any], None] = None,
user_visible_data: Union[str, None] = None,
user_non_visible_data: Union[str, None] = None,
user_visible_data_format: Union[str, None] = None,
) -> Dict[str, str]:
data: Dict[str, Any] = {}
if end_user_ip:
data["endUserIp"] = end_user_ip
if requirement and isinstance(requirement, dict):
Expand Down
5 changes: 3 additions & 2 deletions bankid/certs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
# We have to pin these to prevent basic MITM attacks.

from pathlib import Path
from typing import Tuple


def get_test_cert_p12():
def get_test_cert_p12() -> Path:
return (Path(__file__).parent / "FPTestcert4_20230629.p12").resolve()


def get_test_cert_and_key():
def get_test_cert_and_key() -> Tuple[Path, Path]:
return (
(Path(__file__).parent / "FPTestcert4_20230629_cert.pem").resolve(),
(Path(__file__).parent / "FPTestcert4_20230629_key.pem").resolve(),
Expand Down
20 changes: 11 additions & 9 deletions bankid/certutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import os
import subprocess
from typing import Tuple
from typing import Tuple, Union

import pathlib
import importlib.resources
Expand All @@ -19,10 +19,12 @@


def resolve_cert_path(file: str) -> pathlib.Path:
return importlib.resources.files("bankid.certs").joinpath(file)
path = importlib.resources.files("bankid.certs").joinpath(file)
assert isinstance(path, pathlib.Path)
return path


def create_bankid_test_server_cert_and_key(destination_path: str = ".") -> Tuple[str]:
def create_bankid_test_server_cert_and_key(destination_path: str = ".") -> Tuple[str, str]:
"""Split the bundled test certificate into certificate and key parts and save them
as separate files, stored in PEM format.
Expand All @@ -35,9 +37,9 @@ def create_bankid_test_server_cert_and_key(destination_path: str = ".") -> Tuple
:rtype: tuple
"""
if os.getenv("TEST_CERT_FILE"):
if test_cert_file := os.getenv("TEST_CERT_FILE"):
certificate, key = split_certificate(
os.getenv("TEST_CERT_FILE"), destination_path, password=_TEST_CERT_PASSWORD
test_cert_file, destination_path, password=_TEST_CERT_PASSWORD
)

else:
Expand All @@ -48,7 +50,7 @@ def create_bankid_test_server_cert_and_key(destination_path: str = ".") -> Tuple
return certificate, key


def split_certificate(certificate_path, destination_folder, password=None):
def split_certificate(certificate_path: str, destination_folder: str, password: Union[str, None] = None) -> Tuple[str, str]:
"""Splits a PKCS12 certificate into Base64-encoded DER certificate and key.
This method splits a potentially password-protected
Expand All @@ -64,7 +66,7 @@ def split_certificate(certificate_path, destination_folder, password=None):
try:
# Attempt Linux and Darwin call first.
p = subprocess.Popen(["openssl", "version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
sout, serr = p.communicate()
sout, _ = p.communicate()
openssl_executable_version = sout.decode().lower()
if not (openssl_executable_version.startswith("openssl") or openssl_executable_version.startswith("libressl")):
raise BankIDError("OpenSSL executable could not be found. " "Splitting cannot be performed.")
Expand All @@ -76,7 +78,7 @@ def split_certificate(certificate_path, destination_folder, password=None):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
sout, serr = p.communicate()
sout, _ = p.communicate()
if not sout.decode().lower().startswith("openssl"):
raise BankIDError("OpenSSL executable could not be found. " "Splitting cannot be performed.")
openssl_executable = "C:\\Program Files\\Git\\mingw64\\bin\\openssl.exe"
Expand Down Expand Up @@ -129,7 +131,7 @@ def split_certificate(certificate_path, destination_folder, password=None):
return out_cert_path, out_key_path


def main(verbose=True):
def main(verbose: bool = True) -> Tuple[str, str]:
paths = create_bankid_test_server_cert_and_key(os.path.expanduser("~"))
if verbose:
print("Saved certificate as {0}".format(paths[0]))
Expand Down

0 comments on commit 84558c6

Please sign in to comment.