From da45e20b3a21ec413cd8976fa34ec0957c9439cc Mon Sep 17 00:00:00 2001 From: serhiy Date: Mon, 25 Jul 2022 09:37:20 +0100 Subject: [PATCH] Fix type annotations and workflows to facilitate them Problem: initial type annotation implementation was incomplete and has rotted Solution: Fix the type annotations and add task workflows to run it signed-off-by: serhiy1 --- .devcontainer/Dockerfile | 22 ++++++ .devcontainer/devcontainer.json | 56 +++++++++++++ .github/workflows/python-package.yml | 1 + Taskfile.yml | 13 +++ Taskfile_dev.yml | 72 +++++++++++++++++ archivist/access_policies.py | 43 +++++----- archivist/appidp.py | 9 ++- archivist/applications.py | 25 +++--- archivist/archivist.py | 69 ++++++++-------- archivist/archivistpublic.py | 46 ++++++----- archivist/asset.py | 7 +- archivist/assetattachments.py | 21 ++--- archivist/assets.py | 49 +++++++----- archivist/attachments.py | 23 +++--- archivist/cmds/runner/run.py | 5 +- archivist/compliance.py | 13 +-- archivist/compliance_policies.py | 25 +++--- archivist/compliance_policy_requests.py | 6 +- archivist/composite.py | 7 +- archivist/confirmer.py | 59 +++++++++----- archivist/dictmerge.py | 10 ++- archivist/errors.py | 8 +- archivist/events.py | 90 ++++++++++++--------- archivist/headers.py | 5 +- archivist/locations.py | 42 ++++++---- archivist/or_dict.py | 7 +- archivist/parser.py | 2 +- archivist/publisher.py | 8 +- archivist/runner.py | 101 ++++++++++++------------ archivist/sbommetadata.py | 6 +- archivist/sboms.py | 27 ++++--- archivist/subjects.py | 31 ++++---- archivist/subjects_confirmer.py | 14 ++-- archivist/tenancies.py | 9 ++- archivist/timestamp.py | 4 +- archivist/type_aliases.py | 15 ---- archivist/uploader.py | 9 ++- archivist/utils.py | 3 +- archivist/withdrawer.py | 8 +- requirements-dev.txt | 1 + 40 files changed, 613 insertions(+), 358 deletions(-) create mode 100644 .devcontainer/Dockerfile create mode 100644 .devcontainer/devcontainer.json create mode 100644 Taskfile_dev.yml delete mode 100644 archivist/type_aliases.py diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 00000000..9ff807e2 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,22 @@ +# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.241.1/containers/python-3/.devcontainer/base.Dockerfile + +# [Choice] Python version (use -bullseye variants on local arm64/Apple Silicon): 3, 3.10, 3.9, 3.8, 3.7, 3.6, 3-bullseye, 3.10-bullseye, 3.9-bullseye, 3.8-bullseye, 3.7-bullseye, 3.6-bullseye, 3-buster, 3.10-buster, 3.9-buster, 3.8-buster, 3.7-buster, 3.6-buster +ARG VARIANT="3.10-bullseye" +FROM mcr.microsoft.com/vscode/devcontainers/python:0-${VARIANT} + +# [Choice] Node.js version: none, lts/*, 16, 14, 12, 10 +ARG NODE_VERSION="none" +RUN if [ "${NODE_VERSION}" != "none" ]; then su vscode -c "umask 0002 && . /usr/local/share/nvm/nvm.sh && nvm install ${NODE_VERSION} 2>&1"; fi +RUN sudo curl --location https://taskfile.dev/install.sh -o /install.sh + +# [Optional] If your pip requirements rarely change, uncomment this section to add them to the image. +# COPY requirements.txt /tmp/pip-tmp/ +# RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/requirements.txt \ +# && rm -rf /tmp/pip-tmp + +# [Optional] Uncomment this section to install additional OS packages. +# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ +# && apt-get -y install --no-install-recommends + +# [Optional] Uncomment this line to install global node packages. +# RUN su vscode -c "source /usr/local/share/nvm/nvm.sh && npm install -g " 2>&1 \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 00000000..c9a934c8 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,56 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at: +// https://github.com/microsoft/vscode-dev-containers/tree/v0.241.1/containers/python-3 +{ + "name": "Python 3", + "build": { + "dockerfile": "Dockerfile", + "context": "..", + "args": { + // Update 'VARIANT' to pick a Python version: 3, 3.10, 3.9, 3.8, 3.7, 3.6 + // Append -bullseye or -buster to pin to an OS version. + // Use -bullseye variants on local on arm64/Apple Silicon. + "VARIANT": "3.7", + // Options + "NODE_VERSION": "lts/*" + } + }, + + // Configure tool-specific properties. + "customizations": { + // Configure properties specific to VS Code. + "vscode": { + // Set *default* container specific settings.json values on container create. + "settings": { + "python.defaultInterpreterPath": "/usr/local/bin/python", + "python.linting.enabled": true, + "python.linting.pylintEnabled": true, + "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8", + "python.formatting.blackPath": "/usr/local/py-utils/bin/black", + "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf", + "python.linting.banditPath": "/usr/local/py-utils/bin/bandit", + "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8", + "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy", + "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle", + "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle", + "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint", + "python.analysis.typeCheckingMode": "strict" + }, + + // Add the IDs of extensions you want installed when the container is created. + "extensions": [ + "ms-python.python", + "ms-python.vscode-pylance" + ] + } + }, + "remoteUser": "vscode", + "postStartCommand": "pip install -r requirements-dev.txt && sudo sh /install.sh -d -b ~/.local/bin" + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "pip3 install --user -r requirements.txt", + + // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. + +} diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 77ba136e..922f1695 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -38,6 +38,7 @@ jobs: ./scripts/version.sh pycodestyle --format=pylint setup.py archivist examples functests unittests python3 -m pylint --rcfile=pylintrc setup.py archivist examples functests unittests + python3 -m pyright archivist black archivist examples unittests functests (cd docs && make clean && make html) modified=$(git status -s | wc -l) diff --git a/Taskfile.yml b/Taskfile.yml index 3c664ff9..74336522 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -1,5 +1,11 @@ version: '3' +includes: + # Task file to be used inside the dev-containter + dev: + taskfile: ./Taskfile_dev.yml + dir: ./ + tasks: about: @@ -42,6 +48,7 @@ tasks: - ./scripts/builder.sh python3 --version - ./scripts/builder.sh pycodestyle --format=pylint setup.py archivist examples functests unittests - ./scripts/builder.sh python3 -m pylint --rcfile=pylintrc setup.py archivist examples functests unittests + - ./scripts/builder.sh python3 -m pyright archivist clean: desc: Clean git repo @@ -66,6 +73,12 @@ tasks: cmds: - ./scripts/builder.sh black setup.py archivist examples functests unittests + type-check: + desc: Runs the pyright type checker against the core archivst files + deps: [about] + cmds: + - ./scripts/builder.sh pyright archivist + functests: desc: Run functests - requires an archivist instance and a authtoken deps: [about] diff --git a/Taskfile_dev.yml b/Taskfile_dev.yml new file mode 100644 index 00000000..dd0e5900 --- /dev/null +++ b/Taskfile_dev.yml @@ -0,0 +1,72 @@ +version: '3' + +tasks: + + about: + desc: Generate about.py + cmds: + - ./scripts/version.sh + status: + - test -s archivist/about.py + + audit: + desc: Audit the code + deps: [about] + cmds: + - pip-audit -r requirements.txt + + check: + desc: Check the style, bug and quality of the code + deps: [about] + cmds: + - python3 --version + - pycodestyle --format=pylint setup.py archivist examples functests unittests + - python3 -m pylint --rcfile=pylintrc setup.py archivist examples functests unittests + - python3 -m pyright archivist + + clean: + desc: Clean git repo + cmds: + - find -name '*,cover' -type f -delete + - git clean -fdX + + deps: + desc: Show dependency tree + cmds: + - /bin/bash -c "pipdeptree" + + docs: + desc: Create sphinx documentation + deps: [about] + cmds: + - /bin/bash -c "cd docs && make clean && make html" + + format: + desc: Format code using black + deps: [about] + cmds: + - black setup.py archivist examples functests unittests + + functests: + desc: Run functests - requires an archivist instance and a authtoken + deps: [about] + cmds: + - ./scripts/functests.sh + + type-check: + desc: Runs the pyright type checker against the core archivst files + deps: [about] + cmds: + - pyright archivist + + notebooks: + desc: Run jupyter notebooks + deps: [about] + cmds: + - jupyter notebook --ip 0.0.0.0 --no-browser --notebook-dir=./notebooks/ + + unittests: + desc: Run unittests + deps: [about] + cmds: + - ./scripts/unittests.sh diff --git a/archivist/access_policies.py b/archivist/access_policies.py index 0c0fe92e..6981e19f 100644 --- a/archivist/access_policies.py +++ b/archivist/access_policies.py @@ -21,12 +21,13 @@ """ -from typing import Dict, List, Optional +from __future__ import annotations from logging import getLogger from copy import deepcopy +from typing import Any, Generator, Optional # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .assets import Asset from .constants import ( @@ -35,7 +36,6 @@ ASSETS_LABEL, ) from .dictmerge import _deepmerge -from .type_aliases import NoneOnError LOGGER = getLogger(__name__) @@ -45,7 +45,7 @@ class AccessPolicy(dict): """AccessPolicy object""" @property - def name(self) -> NoneOnError[str]: + def name(self) -> str | None: """str: name of the access policy""" return self.get("display_name") @@ -61,16 +61,19 @@ class _AccessPoliciesClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{ACCESS_POLICIES_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{ACCESS_POLICIES_SUBPATH}" self._label = f"{self._subpath}/{ACCESS_POLICIES_LABEL}" def __str__(self) -> str: return f"AccessPoliciesClient({self._archivist.url})" def create( - self, props: Dict, filters: List, access_permissions: List + self, + props: dict[str, Any], + filters: list[dict[str, Any]], + access_permissions: list[dict[str, Any]], ) -> AccessPolicy: """Create access policy @@ -92,7 +95,7 @@ def create( ), ) - def create_from_data(self, data: Dict) -> AccessPolicy: + def create_from_data(self, data: dict[str, Any]) -> AccessPolicy: """Create access policy Creates access policy with request body from data stream. @@ -130,9 +133,9 @@ def update( self, identity, *, - props: Optional[Dict] = None, - filters: Optional[List] = None, - access_permissions: Optional[List] = None, + props: Optional[dict[str, Any]] = None, + filters: Optional[list[dict]] = None, + access_permissions: Optional[list[dict]] = None, ) -> AccessPolicy: """Update Access Policy @@ -157,7 +160,7 @@ def update( ) ) - def delete(self, identity: str) -> Dict: + def delete(self, identity: str) -> dict[str, Any]: """Delete Access Policy Deletes access policy. @@ -173,11 +176,11 @@ def delete(self, identity: str) -> Dict: def __params( self, - props: Optional[Dict], + props: Optional[dict[str, Any]], *, - filters: Optional[List] = None, - access_permissions: Optional[List] = None, - ) -> Dict: + filters: list[dict] | None = None, + access_permissions: list[dict] | None = None, + ) -> dict[str, Any]: params = deepcopy(props) if props else {} if filters is not None: params["filters"] = filters @@ -204,7 +207,7 @@ def count(self, *, display_name: Optional[str] = None) -> int: def list( self, *, page_size: Optional[int] = None, display_name: Optional[str] = None - ): + ) -> Generator[AccessPolicy, None, None]: """List access policies. List access policies that match criteria. @@ -231,7 +234,7 @@ def list( # additional queries on different endpoints def list_matching_assets( self, access_policy_id: str, *, page_size: Optional[int] = None - ): + ) -> Generator[Asset, None, None]: """List matching assets. List assets that match access policy. @@ -255,7 +258,7 @@ def list_matching_assets( def list_matching_access_policies( self, asset_id: str, *, page_size: Optional[int] = None - ): + ) -> Generator[AccessPolicy, None, None]: """List matching access policies. List access policies that match asset. diff --git a/archivist/appidp.py b/archivist/appidp.py index 0572f3c5..45dd425d 100644 --- a/archivist/appidp.py +++ b/archivist/appidp.py @@ -21,11 +21,12 @@ """ +from __future__ import annotations from logging import getLogger # pylint:disable=cyclic-import # but pylint doesn't understand this feature # pylint:disable=too-few-public-methods -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( APPIDP_SUBPATH, @@ -51,9 +52,9 @@ class _AppIDPClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{APPIDP_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{APPIDP_SUBPATH}" self._label = f"{self._subpath}/{APPIDP_LABEL}" def __str__(self) -> str: diff --git a/archivist/applications.py b/archivist/applications.py index a4cf34da..6da46244 100644 --- a/archivist/applications.py +++ b/archivist/applications.py @@ -21,11 +21,12 @@ """ +from __future__ import annotations from logging import getLogger -from typing import Dict, Optional +from typing import Any, Optional # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( APPLICATIONS_SUBPATH, @@ -53,15 +54,15 @@ class _ApplicationsClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{APPLICATIONS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{APPLICATIONS_SUBPATH}" self._label = f"{self._subpath}/{APPLICATIONS_LABEL}" def __str__(self) -> str: return f"ApplicationsClient({self._archivist.url})" - def create(self, display_name: str, custom_claims: Dict) -> Application: + def create(self, display_name: str, custom_claims: dict[str, str]) -> Application: """Create application Creates application with defined attributes. @@ -82,7 +83,7 @@ def create(self, display_name: str, custom_claims: Dict) -> Application: ), ) - def create_from_data(self, data: Dict) -> Application: + def create_from_data(self, data: dict[str, Any]) -> Application: """Create application Creates application with request body from data stream. @@ -115,8 +116,8 @@ def update( self, identity: str, *, - display_name: str = None, - custom_claims: Optional[Dict] = None, + display_name: Optional[str] = None, + custom_claims: Optional[dict[str, str]] = None, ) -> Application: """Update Application @@ -141,7 +142,7 @@ def update( ) ) - def delete(self, identity: str) -> Dict: + def delete(self, identity: str) -> dict[str, Any]: """Delete Application Deletes application. @@ -159,8 +160,8 @@ def __params( self, *, display_name: Optional[str] = None, - custom_claims: Optional[Dict] = None, - ) -> Dict: + custom_claims: Optional[dict[str, str]] = None, + ) -> dict[str, Any]: params = {} diff --git a/archivist/archivist.py b/archivist/archivist.py index d0169b8b..d2f78db2 100644 --- a/archivist/archivist.py +++ b/archivist/archivist.py @@ -29,11 +29,11 @@ attachments, IAM subjects and IAM access policies documented elsewhere. """ - +from __future__ import annotations from logging import getLogger from copy import deepcopy from time import time -from typing import BinaryIO, Dict, Optional, Union +from typing import Any, BinaryIO, Optional, Tuple from requests_toolbelt.multipart.encoder import MultipartEncoder @@ -66,7 +66,6 @@ from .subjects import _SubjectsClient from .tenancies import _TenanciesClient -from .type_aliases import MachineAuth LOGGER = getLogger(__name__) @@ -79,7 +78,8 @@ class Archivist(ArchivistPublic): # pylint: disable=too-many-instance-attribute Args: url (str): URL of archivist endpoint - auth: string representing JWT token. + auth: string representing JWT token, or a Tuple pair representing an + Appregistration ID and secret. verify: if True the certificate is verified max_time (float): maximum time in seconds to wait for confirmation @@ -107,9 +107,9 @@ class Archivist(ArchivistPublic): # pylint: disable=too-many-instance-attribute def __init__( self, url: str, - auth: Union[None, str, MachineAuth], + auth: str | Tuple[str, str] | None, *, - fixtures: Optional[Dict] = None, + fixtures: Optional[dict[str, dict[Any, Any]]] = None, verify: bool = True, max_time: float = MAX_TIME, ): @@ -120,13 +120,11 @@ def __init__( ) if isinstance(auth, tuple): + self._machine_auth = auth self._auth = None - self._client_id = auth[0] - self._client_secret = auth[1] else: self._auth = auth - self._client_id = None - self._client_secret = None + self._machine_auth = None self._expires_at = 0 if url.endswith("/"): @@ -155,7 +153,7 @@ def __init__( def __str__(self) -> str: return f"Archivist({self._url})" - def __getattr__(self, value: str): + def __getattr__(self, value: str) -> object: """Create endpoints on demand""" LOGGER.debug("getattr %s", value) client = self.CLIENTS.get(value) @@ -183,20 +181,24 @@ def root(self) -> str: return self._root @property - def auth(self) -> str: - """str: authorization token.""" - if self._client_id is not None and self._expires_at < time(): - apptoken = self.appidp.token(self._client_id, self._client_secret) # type: ignore + def auth(self) -> str | None: + """str: authorization token""" + + if self._auth is None and self._machine_auth is None: + return None + + if self._machine_auth and self._expires_at < time(): + apptoken = self.appidp.token(*self._machine_auth) self._auth = apptoken.get("access_token") if self._auth is None: raise ArchivistError("Auth token from client id,secret is invalid") self._expires_at = time() + apptoken["expires_in"] - 10 # fudge factor LOGGER.info("Refresh token") - return self._auth # type: ignore + return self._auth @property - def Public(self): # pylint: disable=invalid-name + def Public(self) -> ArchivistPublic: # pylint: disable=invalid-name """Get a Public instance""" return ArchivistPublic( fixtures=deepcopy(self._fixtures), @@ -204,7 +206,7 @@ def Public(self): # pylint: disable=invalid-name max_time=self._max_time, ) - def __copy__(self): + def __copy__(self) -> Archivist: return Archivist( self._url, self.auth, @@ -213,8 +215,9 @@ def __copy__(self): max_time=self._max_time, ) - def _add_headers(self, headers: Optional[Dict]) -> Dict: - if headers is not None: + def _add_headers(self, headers: dict[str, str] | None) -> dict[str, Any]: + + if isinstance(headers, dict): newheaders = {**headers} else: newheaders = {} @@ -232,11 +235,11 @@ def _add_headers(self, headers: Optional[Dict]) -> Dict: def post( self, url: str, - request: Optional[Dict], + request: dict[str, Any] | None, *, - headers: Optional[Dict] = None, - data: Optional[bool] = False, - ) -> Dict: + headers: Optional[dict[str, Any]] = None, + data: dict[str, Any] | bool = False, + ) -> dict[str, Any]: """POST method (REST) Creates an entity @@ -275,11 +278,11 @@ def post_file( self, url: str, fd: BinaryIO, - mtype: str, + mtype: str | None, *, - form: Optional[str] = "file", - params: Optional[Dict] = None, - ) -> Dict: + form: str = "file", + params: Optional[dict] = None, + ) -> dict[str, Any]: """POST method (REST) - upload binary Uploads a file to an endpoint @@ -319,7 +322,9 @@ def post_file( return response.json() @retry_429 - def delete(self, url: str, *, headers: Optional[Dict] = None) -> Dict: + def delete( + self, url: str, *, headers: Optional[dict[str, Any]] = None + ) -> dict[str, Any]: """DELETE method (REST) Deletes an entity @@ -349,10 +354,10 @@ def delete(self, url: str, *, headers: Optional[Dict] = None) -> Dict: def patch( self, url: str, - request: Dict, + request: dict[str, Any], *, - headers: Optional[Dict] = None, - ) -> Dict: + headers: Optional[dict[str, Any]] = None, + ) -> dict[str, Any]: """PATCH method (REST) Updates the specified entity. diff --git a/archivist/archivistpublic.py b/archivist/archivistpublic.py index b1c57c28..2142cad8 100644 --- a/archivist/archivistpublic.py +++ b/archivist/archivistpublic.py @@ -22,10 +22,11 @@ """ +from __future__ import annotations from logging import getLogger from collections import deque from copy import deepcopy -from typing import BinaryIO, Dict, List, Optional +from typing import Any, BinaryIO, Optional import requests from requests.models import Response @@ -78,7 +79,7 @@ class ArchivistPublic: # pylint: disable=too-many-instance-attributes def __init__( self, *, - fixtures: Optional[Dict] = None, + fixtures: Optional[dict[str, Any]] = None, verify: bool = True, max_time: float = MAX_TIME, ): @@ -97,7 +98,7 @@ def __init__( def __str__(self) -> str: return "ArchivistPublic()" - def __getattr__(self, value: str): + def __getattr__(self, value: str) -> object: """Create endpoints on demand""" client = self.CLIENTS.get(value) @@ -142,12 +143,12 @@ def max_time(self) -> float: return self._max_time @property - def fixtures(self) -> Dict: + def fixtures(self) -> dict[str, Any]: """dict: Contains predefined attributes for each endpoint""" return self._fixtures @fixtures.setter - def fixtures(self, fixtures: Dict): + def fixtures(self, fixtures: dict[str, Any]): """dict: Contains predefined attributes for each endpoint""" self._fixtures = _deepmerge(self._fixtures, fixtures) @@ -158,7 +159,7 @@ def __copy__(self): max_time=self._max_time, ) - def _add_headers(self, headers: Optional[Dict]) -> Dict: + def _add_headers(self, headers: Optional[dict]) -> dict[str, str]: if headers is not None: newheaders = {**headers} else: @@ -174,9 +175,9 @@ def get( self, url: str, *, - headers: Optional[Dict] = None, - params: Optional[Dict] = None, - ) -> Dict: + headers: Optional[dict[str, str]] = None, + params: Optional[dict[str, Any]] = None, + ) -> dict[str, Any]: """GET method (REST) Args: @@ -209,8 +210,8 @@ def get_file( url: str, fd: BinaryIO, *, - headers: Optional[Dict] = None, - params: Optional[Dict] = None, + headers: Optional[dict[str, str]] = None, + params: Optional[dict[str, Any]] = None, ) -> Response: """GET method (REST) - chunked @@ -252,10 +253,10 @@ def get_file( def __list( self, url: str, - params: Optional[Dict], + params: Optional[dict[str, Any]], *, page_size: Optional[int] = None, - headers: Optional[Dict] = None, + headers: Optional[dict[str, str]] = None, ) -> Response: if page_size is not None: if params is not None: @@ -278,7 +279,7 @@ def __list( return response - def last_response(self, *, responses: int = 1) -> List[Response]: + def last_response(self, *, responses: int = 1) -> list[Response]: """Returns the requested number of response objects from the response ring buffer Args: @@ -292,8 +293,13 @@ def last_response(self, *, responses: int = 1) -> List[Response]: return list(self._response_ring_buffer)[:responses] def get_by_signature( - self, url: str, field: str, params: Dict, *, headers: Optional[Dict] = None - ) -> Dict: + self, + url: str, + field: str, + params: dict[str, Any], + *, + headers: Optional[dict[str, str]] = None, + ) -> dict[str, Any]: """GET method (REST) with params string Reads an entity indirectly by searching for its signature @@ -339,7 +345,7 @@ def get_by_signature( return records[0] - def count(self, url: str, *, params: Optional[Dict] = None) -> int: + def count(self, url: str, *, params: Optional[dict[str, Any]] = None) -> int: """GET method (REST) with params string Returns the count of objects that match params @@ -363,7 +369,7 @@ def count(self, url: str, *, params: Optional[Dict] = None) -> int: headers={HEADERS_REQUEST_TOTAL_COUNT: "true"}, ) - count = _headers_get(response.headers, HEADERS_TOTAL_COUNT) # type: ignore + count = _headers_get(response.headers, HEADERS_TOTAL_COUNT) if count is None: raise ArchivistHeaderError("Did not get a count in the header") @@ -376,8 +382,8 @@ def list( field: str, *, page_size: Optional[int] = None, - params: Optional[Dict] = None, - headers: Optional[Dict] = None, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, ): """GET method (REST) with params string diff --git a/archivist/asset.py b/archivist/asset.py index 04f31716..df803fad 100644 --- a/archivist/asset.py +++ b/archivist/asset.py @@ -1,8 +1,7 @@ """Asset data class """ - -from .type_aliases import NoneOnError +from __future__ import annotations class Asset(dict): @@ -13,7 +12,7 @@ class Asset(dict): """ @property - def primary_image(self) -> NoneOnError[str]: + def primary_image(self) -> str | None: """Primary Image Attachment that is the primary image of the asset. @@ -40,7 +39,7 @@ def primary_image(self) -> NoneOnError[str]: return None @property - def name(self) -> NoneOnError[str]: + def name(self) -> str | None: """str: name of the asset""" name = None try: diff --git a/archivist/assetattachments.py b/archivist/assetattachments.py index 3f665f47..6085417a 100644 --- a/archivist/assetattachments.py +++ b/archivist/assetattachments.py @@ -24,15 +24,16 @@ # pylint:disable=too-few-public-methods +from __future__ import annotations from copy import deepcopy from logging import getLogger -from typing import BinaryIO, Dict, Optional +from typing import Any, BinaryIO, Optional from urllib.parse import urlparse from requests.models import Response # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( SEP, @@ -56,10 +57,10 @@ class _AssetAttachmentsClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._public = archivist.public - self._subpath = f"{archivist.root}/{ASSETATTACHMENTS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._public = archivist_instance.public + self._subpath = f"{archivist_instance.root}/{ASSETATTACHMENTS_SUBPATH}" self._label = f"{self._subpath}/{ASSETATTACHMENTS_LABEL}" def __str__(self) -> str: @@ -96,7 +97,7 @@ def _identity(self, identity: str, attachment_id: str) -> str: return f"{self._label}/{identity}/{uuid}" - def __params(self, params: Optional[Dict]) -> Dict: + def __params(self, params: Optional[dict[str, Any]]) -> dict[str, Any]: params = deepcopy(params) if params else {} # pylint: disable=protected-access return _deepmerge(self._archivist.fixtures.get(ATTACHMENTS_LABEL), params) @@ -107,8 +108,8 @@ def download( attachment_id: str, fd: BinaryIO, *, - params: Optional[Dict] = None, - ) -> dict: + params: Optional[dict[str, Any]] = None, + ) -> Response: """Read attachment Reads attachment into data sink (usually a file opened for write). @@ -142,7 +143,7 @@ def info( self, identity: str, attachment_id: str, - ) -> Response: + ) -> dict[str, Any]: """Read asset attachment info Reads asset attachment info diff --git a/archivist/assets.py b/archivist/assets.py index 92e5efe2..03cb6693 100644 --- a/archivist/assets.py +++ b/archivist/assets.py @@ -21,12 +21,13 @@ """ +from __future__ import annotations from logging import getLogger -from typing import Dict, Optional, Tuple +from typing import Any, Optional, Tuple from copy import deepcopy # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .asset import Asset from .constants import ( @@ -54,10 +55,10 @@ class _AssetsPublic: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._public = archivist.public - self._subpath = f"{archivist.root}/{ASSETS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._public = archivist_instance.public + self._subpath = f"{archivist_instance.root}/{ASSETS_SUBPATH}" def __str__(self) -> str: return "AssetsPublic()" @@ -97,14 +98,17 @@ class _AssetsRestricted(_AssetsPublic): """ - def __init__(self, archivist: "type_helper.Archivist"): - super().__init__(archivist) + def __init__(self, archivist_instance: archivist.Archivist): + super().__init__(archivist_instance) self._label = f"{self._subpath}/{ASSETS_LABEL}" + self.pending_count: int = 0 def __str__(self) -> str: return f"AssetsRestricted({self._archivist.url})" - def __params(self, props: Optional[Dict], attrs: Optional[Dict]) -> Dict: + def __params( + self, props: Optional[dict[str, Any]], attrs: Optional[dict[str, Any]] + ) -> dict[str, Any]: params = deepcopy(props) if props else {} if attrs: params["attributes"] = attrs @@ -114,8 +118,8 @@ def __params(self, props: Optional[Dict], attrs: Optional[Dict]) -> Dict: def create( self, *, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, confirm: bool = True, ) -> Asset: """Create asset @@ -138,7 +142,7 @@ def create( data = self.__params(newprops, attrs) return self.create_from_data(data, confirm=confirm) - def create_from_data(self, data: Dict, *, confirm: bool = True) -> Asset: + def create_from_data(self, data: dict[str, Any], *, confirm: bool = True) -> Asset: """Create asset Creates asset with request body from data stream. @@ -159,7 +163,7 @@ def create_from_data(self, data: Dict, *, confirm: bool = True) -> Asset: return self.wait_for_confirmation(asset["identity"]) def create_if_not_exists( - self, data: Dict, *, confirm: bool = True + self, data: dict[str, Any], *, confirm: bool = True ) -> Tuple[Asset, bool]: """ Creates an asset and associated locations and attachments if asset @@ -274,7 +278,10 @@ def wait_for_confirmation(self, identity: str) -> Asset: return confirmer._wait_for_confirmation(self, identity) def wait_for_confirmed( - self, *, props: Optional[Dict] = None, attrs: Optional[Dict] = None + self, + *, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ) -> bool: """Wait for assets to be confirmed. @@ -302,7 +309,10 @@ def wait_for_confirmed( return confirmer._wait_for_confirmed(self, props=newprops, attrs=attrs) def count( - self, *, props: Optional[Dict] = None, attrs: Optional[Dict] = None + self, + *, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ) -> int: """Count assets. @@ -322,8 +332,8 @@ def list( self, *, page_size: Optional[int] = None, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ): """List assets. @@ -349,7 +359,10 @@ def list( ) def read_by_signature( - self, *, props: Optional[Dict] = None, attrs: Optional[Dict] = None + self, + *, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ) -> Asset: """Read Asset by signature. diff --git a/archivist/attachments.py b/archivist/attachments.py index 8f473de8..3865fe3c 100644 --- a/archivist/attachments.py +++ b/archivist/attachments.py @@ -24,15 +24,16 @@ # pylint:disable=too-few-public-methods +from __future__ import annotations from copy import deepcopy from io import BytesIO from logging import getLogger -from typing import BinaryIO, Dict, Optional +from typing import BinaryIO, Optional, Any from requests.models import Response # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( ATTACHMENTS_SUBPATH, @@ -63,15 +64,15 @@ class _AttachmentsClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{ATTACHMENTS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{ATTACHMENTS_SUBPATH}" self._label = f"{self._subpath}/{ATTACHMENTS_LABEL}" def __str__(self) -> str: return f"AttachmentsClient({self._archivist.url})" - def create(self, data: Dict) -> Dict: # pragma: no cover + def create(self, data: dict[str, Any]) -> dict[str, Any]: # pragma: no cover """ Create an attachment and return struct suitable for use in an asset or event creation. @@ -136,7 +137,7 @@ def create(self, data: Dict) -> Dict: # pragma: no cover return result - def upload(self, fd: BinaryIO, *, mtype: str = None) -> Attachment: + def upload(self, fd: BinaryIO, *, mtype: Optional[str] = None) -> Attachment: """Create attachment Creates attachment from opened file or other data source. @@ -159,7 +160,7 @@ def upload(self, fd: BinaryIO, *, mtype: str = None) -> Attachment: ) ) - def __params(self, params: Optional[Dict]) -> Dict: + def __params(self, params: Optional[dict[str, Any]]) -> dict[str, Any]: params = deepcopy(params) if params else {} # pylint: disable=protected-access return _deepmerge(self._archivist.fixtures.get(ATTACHMENTS_LABEL), params) @@ -169,8 +170,8 @@ def download( identity: str, fd: BinaryIO, *, - params: Optional[Dict] = None, - ) -> dict: + params: Optional[dict[str, Any]] = None, + ) -> Response: """Read attachment Reads attachment into data sink (usually a file opened for write).. @@ -195,7 +196,7 @@ def download( def info( self, identity: str, - ) -> Response: + ) -> dict[str, Any]: """Read attachment info Reads attachment info diff --git a/archivist/cmds/runner/run.py b/archivist/cmds/runner/run.py index 4be22f9e..2e666e3b 100644 --- a/archivist/cmds/runner/run.py +++ b/archivist/cmds/runner/run.py @@ -1,5 +1,6 @@ # pylint: disable=missing-docstring +from __future__ import annotations from logging import getLogger from os import environ from sys import exit as sys_exit @@ -9,13 +10,13 @@ from ... import about # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from ... import archivist as type_helper # pylint:disable=unused-import +from ... import archivist LOGGER = getLogger(__name__) -def run(arch: "type_helper.Archivist", args): +def run(arch: archivist.Archivist, args): LOGGER.info("Using version %s of jitsuin-archivist", about.__version__) LOGGER.info("Namespace %s", args.namespace) diff --git a/archivist/compliance.py b/archivist/compliance.py index eccdd4b0..a41cd527 100644 --- a/archivist/compliance.py +++ b/archivist/compliance.py @@ -21,11 +21,12 @@ """ +from __future__ import annotations from logging import getLogger -from typing import Optional +from typing import Any, Optional # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( COMPLIANCE_SUBPATH, @@ -56,9 +57,9 @@ class _ComplianceClient: # pylint: disable=too-few-public-methods """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{COMPLIANCE_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{COMPLIANCE_SUBPATH}" self._label = f"{self._subpath}/{COMPLIANCE_LABEL}" def __str__(self) -> str: @@ -95,7 +96,7 @@ def compliant_at( self.compliant_at_report(response) return Compliance(**response) - def compliant_at_report(self, compliance: Compliance): + def compliant_at_report(self, compliance: dict[str, Any]): """ Prints report of compliance_at request diff --git a/archivist/compliance_policies.py b/archivist/compliance_policies.py index da6195cc..ed61da25 100644 --- a/archivist/compliance_policies.py +++ b/archivist/compliance_policies.py @@ -25,12 +25,13 @@ """ +from __future__ import annotations from copy import deepcopy from logging import getLogger -from typing import Dict, Optional, Union +from typing import Any, Optional, Union # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .compliance_policy_requests import ( CompliancePolicySince, @@ -73,9 +74,9 @@ class _CompliancePoliciesClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{COMPLIANCE_POLICIES_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{COMPLIANCE_POLICIES_SUBPATH}" self._label = f"{self._subpath}/{COMPLIANCE_POLICIES_LABEL}" def __str__(self) -> str: @@ -108,7 +109,7 @@ def create( """ return self.create_from_data(policy.dict()) - def create_from_data(self, data: Dict) -> CompliancePolicy: + def create_from_data(self, data: dict[str, Any]) -> CompliancePolicy: """Create compliance_policy Creates compliance_policy with request body from data stream. @@ -138,7 +139,7 @@ def read(self, identity: str) -> CompliancePolicy: """ return CompliancePolicy(**self._archivist.get(f"{self._subpath}/{identity}")) - def delete(self, identity: str) -> Dict: + def delete(self, identity: str) -> dict[str, Any]: """Delete Compliance Policy Deletes compliance policy. @@ -153,14 +154,14 @@ def delete(self, identity: str) -> Dict: """ return self._archivist.delete(f"{self._subpath}/{identity}") - def __params(self, props: Optional[Dict]) -> Dict: + def __params(self, props: Optional[dict[str, Any]]) -> dict[str, Any]: params = deepcopy(props) if props else {} # pylint: disable=protected-access return _deepmerge( self._archivist.fixtures.get(COMPLIANCE_POLICIES_LABEL), params ) - def count(self, *, props: Optional[Dict] = None) -> int: + def count(self, *, props: Optional[dict[str, Any]] = None) -> int: """Count compliance policies. Counts number of compliance policies that match criteria. @@ -177,7 +178,9 @@ def count(self, *, props: Optional[Dict] = None) -> int: params=self.__params(props), ) - def list(self, *, page_size: Optional[int] = None, props: Dict = None): + def list( + self, *, page_size: Optional[int] = None, props: Optional[dict[str, Any]] = None + ): """List compliance policies. Lists compliance policies that match criteria. @@ -200,7 +203,7 @@ def list(self, *, page_size: Optional[int] = None, props: Dict = None): ) ) - def read_by_signature(self, *, props: Dict = None): + def read_by_signature(self, *, props: Optional[dict[str, Any]] = None): """Read compliance policy by signature. Reads compliance policy that meets criteria. Only one compliance policy is expected. diff --git a/archivist/compliance_policy_requests.py b/archivist/compliance_policy_requests.py index 76c79a3e..8306b744 100644 --- a/archivist/compliance_policy_requests.py +++ b/archivist/compliance_policy_requests.py @@ -4,8 +4,8 @@ """ +from __future__ import annotations from dataclasses import dataclass, asdict -from typing import List from .compliance_policy_type import CompliancePolicyType from .or_dict import and_list @@ -23,7 +23,7 @@ class CompliancePolicyBase: description: str display_name: str - asset_filter: List[List] + asset_filter: list[list] def dict(self): """Emit dictionary representation""" @@ -88,7 +88,7 @@ class CompliancePolicyRichness(CompliancePolicyBase): complies with a set of assertions. """ - richness_assertions: List[List] + richness_assertions: list[list] compliance_type: str = CompliancePolicyType.COMPLIANCE_RICHNESS.name def dict(self): diff --git a/archivist/composite.py b/archivist/composite.py index c5527f44..c0dcc7dd 100644 --- a/archivist/composite.py +++ b/archivist/composite.py @@ -21,10 +21,11 @@ """ +from __future__ import annotations from logging import getLogger # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist LOGGER = getLogger(__name__) @@ -42,8 +43,8 @@ class _CompositeClient: These mthods are not unittested and provided as a convenience. """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance def __str__(self) -> str: return f"CompositeClient({self._archivist.url})" diff --git a/archivist/confirmer.py b/archivist/confirmer.py index abd14827..8f7f5638 100644 --- a/archivist/confirmer.py +++ b/archivist/confirmer.py @@ -1,10 +1,11 @@ """assets confirmer interface """ +from __future__ import annotations from logging import getLogger from copy import deepcopy -from typing import overload +from typing import Any, Optional, overload, Union import backoff @@ -18,22 +19,29 @@ # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import assets # pylint:disable=unused-import -from . import events # pylint:disable=unused-import +from . import assets +from . import events from .utils import backoff_handler -MAX_TIME = 1200 +MAX_TIME = 1200 LOGGER = getLogger(__name__) +# pylint: disable=protected-access +PublicManagers = Union[assets._AssetsPublic, events._EventsPublic] +PrivateManagers = Union[assets._AssetsRestricted, events._EventsRestricted] +Managers = Union[PublicManagers, PrivateManagers] + +ReturnTypes = Union[assets.Asset, events.Event] + def __lookup_max_time(): return MAX_TIME -def __on_giveup_confirmation(details): - identity = details["args"][1] - elapsed = details["elapsed"] +def __on_giveup_confirmation(details: dict[str, Any]): + identity: str = details["args"][1] + elapsed: str = details["elapsed"] raise ArchivistUnconfirmedError( f"confirmation for {identity} timed out after {elapsed} seconds" ) @@ -47,27 +55,38 @@ def __on_giveup_confirmation(details): @overload def _wait_for_confirmation( - self: "assets._AssetsPublic", identity: str -) -> "assets.Asset": + self: assets._AssetsRestricted, identity: str +) -> assets.Asset: + ... # pragma: no cover + + +@overload +def _wait_for_confirmation(self: assets._AssetsPublic, identity: str) -> assets.Asset: ... # pragma: no cover @overload def _wait_for_confirmation( - self: "events._EventsPublic", identity: str -) -> "events.Event": + self: events._EventsRestricted, identity: str +) -> events.Event: + ... # pragma: no cover + + +@overload +def _wait_for_confirmation(self: events._EventsPublic, identity: str) -> events.Event: ... # pragma: no cover @backoff.on_predicate( backoff.expo, - logger=None, + logger=None, # type: ignore max_time=__lookup_max_time, on_backoff=backoff_handler, on_giveup=__on_giveup_confirmation, ) -def _wait_for_confirmation(self, identity): +def _wait_for_confirmation(self: Managers, identity: str) -> ReturnTypes: """Return None until entity is confirmed""" + entity = self.read(identity) LOGGER.debug("entity %s", entity) @@ -84,13 +103,13 @@ def _wait_for_confirmation(self, identity): if entity[CONFIRMATION_STATUS] == CONFIRMATION_CONFIRMED: return entity - return None + return None # type: ignore -def __on_giveup_confirmed(details): - self = details["args"][0] +def __on_giveup_confirmed(details: dict[str, Any]): + self: PrivateManagers = details["args"][0] count = self.pending_count - elapsed = details["elapsed"] + elapsed: int = details["elapsed"] raise ArchivistUnconfirmedError( f"{count} pending assets still present after {elapsed} seconds" ) @@ -98,12 +117,14 @@ def __on_giveup_confirmed(details): @backoff.on_predicate( backoff.expo, - logger=None, + logger=None, # type: ignore max_time=__lookup_max_time, on_backoff=backoff_handler, on_giveup=__on_giveup_confirmed, ) -def _wait_for_confirmed(self, *, props=None, **kwargs) -> bool: +def _wait_for_confirmed( + self: PrivateManagers, *, props: Optional[dict[str, Any]] = None, **kwargs: Any +) -> bool: """Return False until all entities are confirmed""" # look for unconfirmed entities diff --git a/archivist/dictmerge.py b/archivist/dictmerge.py index 579db9e4..3addfee0 100644 --- a/archivist/dictmerge.py +++ b/archivist/dictmerge.py @@ -1,13 +1,15 @@ """Archivist dict deep merge """ - +from __future__ import annotations from copy import deepcopy -from typing import Optional +from typing import Any, Optional from flatten_dict import flatten, unflatten -def _deepmerge(dct1: Optional[dict], dct2: Optional[dict]) -> dict: +def _deepmerge( + dct1: Optional[dict[str, Any]], dct2: Optional[dict[str, Any]] +) -> dict[str, Any]: """Deep merge 2 dictionaries The settings from dct2 overwrite or add to dct1 @@ -23,7 +25,7 @@ def _deepmerge(dct1: Optional[dict], dct2: Optional[dict]) -> dict: return unflatten({**flatten(dct1), **flatten(dct2)}) -def _dotdict(dct: Optional[dict]) -> Optional[dict]: +def _dotdict(dct: Optional[dict[str, Any]]) -> dict[str, str] | None: """Emit nested dictionary as dot delimited dict with one level""" if dct is None: return None diff --git a/archivist/errors.py b/archivist/errors.py index c65617ff..b9d62e59 100644 --- a/archivist/errors.py +++ b/archivist/errors.py @@ -8,6 +8,8 @@ from logging import getLogger from typing import Optional +from requests import Response + from .constants import HEADERS_RETRY_AFTER from .headers import _headers_get @@ -90,7 +92,7 @@ class Archivist5xxError(ArchivistError): """Any other 5xx error""" -def __identity(response): +def __identity(response: Response) -> str: identity = "unknown" if response.request: LOGGER.debug("Request %s", response.request) @@ -109,7 +111,7 @@ def __identity(response): return identity -def __description(response): +def __description(response: Response) -> str: status_code = response.status_code if status_code == 404: return f"{__identity(response)} not found ({status_code})" @@ -119,7 +121,7 @@ def __description(response): return f"{url}: {text} ({status_code})" -def _parse_response(response): +def _parse_response(response: Response): """Parse REST response Validates REST response. This is a convenience function called diff --git a/archivist/events.py b/archivist/events.py index e743ff39..0a8ba7b0 100644 --- a/archivist/events.py +++ b/archivist/events.py @@ -22,12 +22,13 @@ """ +from __future__ import annotations from copy import deepcopy from logging import getLogger -from typing import Dict, Optional +from typing import Any, Optional # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( ASSETS_SUBPATH, @@ -52,21 +53,21 @@ class Event(dict): """ @property - def when(self): + def when(self) -> str | None: """when Timestamp of event """ try: - when = self["timestamp_declared"] + when: str = self["timestamp_declared"] except KeyError: pass else: return when try: - when = self["timestamp_accepted"] + when: str = self["timestamp_accepted"] except KeyError: pass else: @@ -75,7 +76,7 @@ def when(self): return None @property - def who(self): + def who(self) -> str | None: """who Principal identity. @@ -83,14 +84,14 @@ def who(self): """ try: - who = self["principal_declared"]["display_name"] + who: str = self["principal_declared"]["display_name"] except (KeyError, TypeError): pass else: return who try: - who = self["principal_accepted"]["display_name"] + who: str = self["principal_accepted"]["display_name"] except (KeyError, TypeError): pass else: @@ -110,10 +111,10 @@ class _EventsPublic: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._public = archivist.public - self._subpath = f"{archivist.root}/{ASSETS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._public = archivist_instance.public + self._subpath = f"{archivist_instance.root}/{ASSETS_SUBPATH}" def __str__(self) -> str: return "EventsPublic()" @@ -122,6 +123,7 @@ def _identity(self, identity: str) -> str: """Return fully qualified identity If public then expect a full url as argument """ + if self._public: return identity @@ -142,8 +144,11 @@ def read(self, identity: str) -> Event: return Event(**self._archivist.get(f"{self._identity(identity)}")) def _params( - self, props: Optional[Dict], attrs: Optional[Dict], asset_attrs: Optional[Dict] - ) -> Dict: + self, + props: Optional[dict[str, Any]], + attrs: Optional[dict[str, Any]], + asset_attrs: Optional[dict[str, Any]], + ) -> dict[str, Any]: params = deepcopy(props) if props else {} if attrs: params["event_attributes"] = attrs @@ -156,9 +161,9 @@ def count( self, *, asset_id: Optional[str] = None, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, - asset_attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, + asset_attrs: Optional[dict[str, Any]] = None, ) -> int: """Count events. @@ -177,13 +182,17 @@ def count( # wildcarding not allowed when public - asset_id is required (not optional) # if asset_id is wildcarded a 401 will be returned from upstream - if not self._public: - asset_id = asset_id or ASSETS_WILDCARD + if not self._public and not asset_id: + asset_id = ASSETS_WILDCARD + # The type checker rightly points out in the case of an event being public but with no + # asset_id will cause issues in the _identity function and the count function LOGGER.debug("asset_id %s", asset_id) - LOGGER.debug("event_id %s", f"{self._identity(asset_id)}/{EVENTS_LABEL}") + LOGGER.debug( + "event_id %s", f"{self._identity(asset_id)}/{EVENTS_LABEL}" # type:ignore + ) return self._archivist.count( - f"{self._identity(asset_id)}/{EVENTS_LABEL}", + f"{self._identity(asset_id)}/{EVENTS_LABEL}", # type:ignore params=self._params(props, attrs, asset_attrs), ) @@ -192,9 +201,9 @@ def list( *, asset_id: Optional[str] = None, page_size: Optional[int] = None, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, - asset_attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, + asset_attrs: Optional[dict[str, Any]] = None, ): """List events. @@ -219,7 +228,7 @@ def list( return ( Event(**a) for a in self._archivist.list( - f"{self._identity(asset_id)}/{EVENTS_LABEL}", + f"{self._identity(asset_id)}/{EVENTS_LABEL}", # type:ignore EVENTS_LABEL, page_size=page_size, params=self._params(props, attrs, asset_attrs), @@ -230,9 +239,9 @@ def read_by_signature( self, *, asset_id: Optional[str] = None, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, - asset_attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, + asset_attrs: Optional[dict[str, Any]] = None, ) -> Event: """Read event by signature. @@ -255,7 +264,7 @@ def read_by_signature( return Event( **self._archivist.get_by_signature( - f"{self._identity(asset_id)}/{EVENTS_LABEL}", + f"{self._identity(asset_id)}/{EVENTS_LABEL}", # type:ignore EVENTS_LABEL, params=self._params(props, attrs, asset_attrs), ) @@ -273,16 +282,20 @@ class _EventsRestricted(_EventsPublic): """ + def __init__(self, archivist_instance: archivist.Archivist): + super().__init__(archivist_instance) + self.pending_count: int = 0 + def __str__(self) -> str: return f"EventsRestricted({self._archivist.url})" def create( self, asset_id: str, - props: Dict, - attrs: Dict, + props: dict[str, Any], + attrs: dict[str, Any], *, - asset_attrs: Optional[Dict] = None, + asset_attrs: Optional[dict[str, Any]] = None, confirm: bool = True, ) -> Event: """Create event @@ -308,7 +321,9 @@ def create( confirm=confirm, ) - def create_from_data(self, asset_id: str, data: Dict, *, confirm=True) -> Event: + def create_from_data( + self, asset_id: str, data: dict[str, Any], *, confirm: bool = True + ) -> Event: """Create event Creates event for given asset from data. @@ -367,7 +382,8 @@ def create_from_data(self, asset_id: str, data: Dict, *, confirm=True) -> Event: if not confirm: return event - return self.wait_for_confirmation(event["identity"]) + event_id: str = event["identity"] + return self.wait_for_confirmation(event_id) def wait_for_confirmation(self, identity: str) -> Event: """Wait for event to be confirmed. @@ -389,9 +405,9 @@ def wait_for_confirmed( self, *, asset_id: Optional[str] = None, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, - asset_attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, + asset_attrs: Optional[dict[str, Any]] = None, ) -> bool: """Wait for events to be confirmed. diff --git a/archivist/headers.py b/archivist/headers.py index c0113a70..1d4a5dda 100644 --- a/archivist/headers.py +++ b/archivist/headers.py @@ -4,10 +4,11 @@ """ -from typing import Dict, Optional +from typing import Optional +from requests import models -def _headers_get(headers: Dict, key: str) -> Optional[str]: +def _headers_get(headers: models.CaseInsensitiveDict, key: str) -> Optional[str]: if headers is not None: ret = headers.get(key) if ret is not None: diff --git a/archivist/locations.py b/archivist/locations.py index 34ad41a2..ad91ace2 100644 --- a/archivist/locations.py +++ b/archivist/locations.py @@ -22,18 +22,18 @@ """ +from __future__ import annotations from copy import deepcopy from logging import getLogger -from typing import Dict, Optional, Tuple +from typing import Any, Optional, Tuple # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import LOCATIONS_SUBPATH, LOCATIONS_LABEL from .dictmerge import _deepmerge from .errors import ArchivistNotFoundError from .utils import selector_signature -from .type_aliases import NoneOnError LOGGER = getLogger(__name__) @@ -47,7 +47,7 @@ class Location(dict): """ @property - def name(self) -> NoneOnError[str]: + def name(self) -> str | None: """str: name of the location""" name = None try: @@ -69,15 +69,17 @@ class _LocationsClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{LOCATIONS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{LOCATIONS_SUBPATH}" self._label = f"{self._subpath}/{LOCATIONS_LABEL}" def __str__(self) -> str: return f"LocationsClient({self._archivist.url})" - def create(self, props: Dict, *, attrs: Optional[Dict] = None) -> Location: + def create( + self, props: dict[str, Any], *, attrs: Optional[dict[str, Any]] = None + ) -> Location: """Create location Creates location with defined properties and attributes. @@ -93,7 +95,7 @@ def create(self, props: Dict, *, attrs: Optional[Dict] = None) -> Location: LOGGER.debug("Create Location %s", props) return self.create_from_data(self.__params(props, attrs)) - def create_from_data(self, data: Dict) -> Location: + def create_from_data(self, data: dict[str, Any]) -> Location: """Create location Creates location with request body from data stream. @@ -108,7 +110,7 @@ def create_from_data(self, data: Dict) -> Location: """ return Location(**self._archivist.post(self._label, data)) - def create_if_not_exists(self, data: Dict) -> Tuple[Optional[Location], bool]: + def create_if_not_exists(self, data: dict[str, Any]) -> Tuple[Location, bool]: """ Create a location if not already exists @@ -137,7 +139,7 @@ def create_if_not_exists(self, data: Dict) -> Tuple[Optional[Location], bool]: tuple of :class:`Location` instance, Boolean True if already exists """ - location = None + data = deepcopy(data) selector = data.pop("selector") # must exist props, attrs = selector_signature(selector, data) @@ -169,7 +171,9 @@ def read(self, identity: str) -> Location: """ return Location(**self._archivist.get(f"{self._subpath}/{identity}")) - def __params(self, props: Optional[Dict], attrs: Optional[Dict]) -> Dict: + def __params( + self, props: Optional[dict[str, Any]], attrs: Optional[dict[str, Any]] + ) -> dict[str, Any]: params = deepcopy(props) if props else {} if attrs: params["attributes"] = attrs @@ -177,7 +181,10 @@ def __params(self, props: Optional[Dict], attrs: Optional[Dict]) -> Dict: return _deepmerge(self._archivist.fixtures.get(LOCATIONS_LABEL), params) def count( - self, *, props: Optional[Dict] = None, attrs: Optional[Dict] = None + self, + *, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ) -> int: """Count locations. @@ -197,8 +204,8 @@ def list( self, *, page_size: Optional[int] = None, - props: Optional[Dict] = None, - attrs: Optional[Dict] = None, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ): """List locations. @@ -225,7 +232,10 @@ def list( ) def read_by_signature( - self, *, props: Optional[Dict] = None, attrs: Optional[Dict] = None + self, + *, + props: Optional[dict[str, Any]] = None, + attrs: Optional[dict[str, Any]] = None, ) -> Location: """Read location by signature. diff --git a/archivist/or_dict.py b/archivist/or_dict.py index 3c447ada..22af7f73 100644 --- a/archivist/or_dict.py +++ b/archivist/or_dict.py @@ -5,12 +5,15 @@ Dictionaries where key is always 'or' and value is a list of strings """ +from __future__ import annotations +from typing import Any -def or_dict(list_): + +def or_dict(list_: list) -> dict[str, list]: """Construct a dictionary with key 'or'""" return {"or": list_} -def and_list(lists): +def and_list(lists: list) -> list[dict[str, list[Any]]]: """Construct a list of or dictionaries""" return [or_dict(j) for j in lists] diff --git a/archivist/parser.py b/archivist/parser.py index 25998d65..be77d027 100644 --- a/archivist/parser.py +++ b/archivist/parser.py @@ -50,7 +50,7 @@ def __init__(self, **kwargs): def __call__(self, parser, namespace, values, option_string=None): # Convert value back into an Enum - value = self._enum[values] + value = self._enum[values] # type: ignore setattr(namespace, self.dest, value) diff --git a/archivist/publisher.py b/archivist/publisher.py index b73f7769..536bab0f 100644 --- a/archivist/publisher.py +++ b/archivist/publisher.py @@ -3,12 +3,14 @@ Wrap base methods with constants for assets (path, etc... """ +from __future__ import annotations from logging import getLogger import backoff from .utils import backoff_handler from .errors import ArchivistUnpublishedError +from . import sboms # pylint:disable=cyclic-import # but pylint doesn't understand this feature @@ -32,16 +34,16 @@ def __on_giveup_publication(details): @backoff.on_predicate( backoff.expo, - logger=None, + logger=None, # type: ignore max_time=__lookup_max_time, on_backoff=backoff_handler, on_giveup=__on_giveup_publication, ) -def _wait_for_publication(self, identity): +def _wait_for_publication(self: sboms._SBOMSClient, identity: str) -> sboms.SBOM: """Return None until published date is set""" entity = self.read(identity) if entity.published_date: return entity - return None + return None # type: ignore diff --git a/archivist/runner.py b/archivist/runner.py index fe7bab00..bc2c7ca6 100644 --- a/archivist/runner.py +++ b/archivist/runner.py @@ -3,19 +3,20 @@ """ +from __future__ import annotations from collections import defaultdict from functools import partialmethod from json import dumps as json_dumps from logging import getLogger from time import sleep as time_sleep from types import GeneratorType -from typing import Dict, Optional, Tuple +from typing import Any, Callable, Optional, Tuple from uuid import UUID # pylint:disable=cyclic-import # but pylint doesn't understand this feature # pylint:disable=missing-function-docstring # pylint:disable=protected-access -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .errors import ArchivistError, ArchivistInvalidOperationError LOGGER = getLogger(__name__) @@ -46,67 +47,67 @@ class _ActionMap(dict): # a dictionary # similarly for location and subjects labels # - def __init__(self, archivist: "type_helper.Archivist"): + def __init__(self, archivist_instance: archivist.Archivist): super().__init__() - self.archivist = archivist + self._archivist = archivist_instance # please keep in alphabetical order self["ASSETS_ATTACHMENT_INFO"] = { - "action": archivist.attachments.info, + "action": self._archivist.attachments.info, "use_asset_label": "add_arg_identity", } self["ASSETS_COUNT"] = { - "action": archivist.assets.count, + "action": self._archivist.assets.count, "keywords": ( "props", "attrs", ), } self["ASSETS_CREATE_IF_NOT_EXISTS"] = { - "action": archivist.assets.create_if_not_exists, + "action": self._archivist.assets.create_if_not_exists, "keywords": ("confirm",), "set_asset_label": True, "use_location_label": "add_data_location_identity", } self["ASSETS_CREATE"] = { - "action": archivist.assets.create_from_data, + "action": self._archivist.assets.create_from_data, "keywords": ("confirm",), "set_asset_label": True, } self["ASSETS_LIST"] = { - "action": archivist.assets.list, + "action": self._archivist.assets.list, "keywords": ( "props", "attrs", ), } self["ASSETS_WAIT_FOR_CONFIRMED"] = { - "action": archivist.assets.wait_for_confirmed, + "action": self._archivist.assets.wait_for_confirmed, "keywords": ( "props", "attrs", ), } self["COMPOSITE_ESTATE_INFO"] = { - "action": archivist.composite.estate_info, + "action": self._archivist.composite.estate_info, } self["COMPLIANCE_POLICIES_CREATE"] = { - "action": archivist.compliance_policies.create_from_data, - "delete": archivist.compliance_policies.delete, + "action": self._archivist.compliance_policies.create_from_data, + "delete": self._archivist.compliance_policies.delete, } self["COMPLIANCE_COMPLIANT_AT"] = { - "action": archivist.compliance.compliant_at, + "action": self._archivist.compliance.compliant_at, "keywords": ("report",), "use_asset_label": "add_arg_identity", } self["EVENTS_CREATE"] = { - "action": archivist.events.create_from_data, + "action": self._archivist.events.create_from_data, "keywords": ("confirm",), "use_asset_label": "add_arg_identity", "use_location_label": "add_data_location_identity", } self["EVENTS_COUNT"] = { - "action": archivist.events.count, + "action": self._archivist.events.count, "keywords": ( "asset_id", "props", @@ -116,7 +117,7 @@ def __init__(self, archivist: "type_helper.Archivist"): "use_asset_label": "add_kwarg_asset_identity", } self["EVENTS_LIST"] = { - "action": archivist.events.list, + "action": self._archivist.events.list, "keywords": ( "asset_id", "props", @@ -126,56 +127,56 @@ def __init__(self, archivist: "type_helper.Archivist"): "use_asset_label": "add_kwarg_asset_identity", } self["LOCATIONS_COUNT"] = { - "action": archivist.locations.count, + "action": self._archivist.locations.count, "keywords": ( "props", "attrs", ), } self["LOCATIONS_CREATE_IF_NOT_EXISTS"] = { - "action": archivist.locations.create_if_not_exists, + "action": self._archivist.locations.create_if_not_exists, "keywords": ("confirm",), "set_location_label": True, } self["LOCATIONS_LIST"] = { - "action": archivist.locations.list, + "action": self._archivist.locations.list, "keywords": ( "props", "attrs", ), } self["LOCATIONS_READ"] = { - "action": archivist.locations.read, + "action": self._archivist.locations.read, "use_location_label": "add_arg_identity", } self["SUBJECTS_COUNT"] = { - "action": archivist.subjects.count, + "action": self._archivist.subjects.count, "keywords": ("display_name",), } self["SUBJECTS_CREATE"] = { - "action": archivist.subjects.create_from_data, - "delete": archivist.subjects.delete, + "action": self._archivist.subjects.create_from_data, + "delete": self._archivist.subjects.delete, "set_subject_label": True, } self["SUBJECTS_CREATE_FROM_B64"] = { - "action": archivist.subjects.create_from_b64, - "delete": archivist.subjects.delete, + "action": self._archivist.subjects.create_from_b64, + "delete": self._archivist.subjects.delete, "set_subject_label": True, } self["SUBJECTS_DELETE"] = { - "action": archivist.subjects.delete, + "action": self._archivist.subjects.delete, "use_subject_label": "add_arg_identity", } self["SUBJECTS_LIST"] = { - "action": archivist.subjects.list, + "action": self._archivist.subjects.list, "keywords": ("display_name",), } self["SUBJECTS_READ"] = { - "action": archivist.subjects.read, + "action": self._archivist.subjects.read, "use_subject_label": "add_arg_identity", } self["SUBJECTS_UPDATE"] = { - "action": archivist.subjects.update, + "action": self._archivist.subjects.update, "keywords": ( "display_name", "wallet_pub_key", @@ -184,11 +185,11 @@ def __init__(self, archivist: "type_helper.Archivist"): "use_subject_label": "add_arg_identity", } self["SUBJECTS_WAIT_FOR_CONFIRMATION"] = { - "action": archivist.subjects.wait_for_confirmation, + "action": self._archivist.subjects.wait_for_confirmation, "use_subject_label": "add_arg_identity", } - def ops(self, action_name: str) -> Dict: + def ops(self, action_name: str) -> dict[str, Any]: """ Get valid entry in map """ @@ -197,14 +198,14 @@ def ops(self, action_name: str) -> Dict: raise ArchivistInvalidOperationError(f"Illegal Action '{action_name}'") return ops - def action(self, action_name: str) -> Dict: + def action(self, action_name: str) -> Callable: """ Get valid action in map """ # if an exception occurs here then the dict initialised above is faulty. - return self.ops(action_name).get("action") + return self.ops(action_name).get("action") # type: ignore - def keywords(self, action_name: str) -> Tuple: + def keywords(self, action_name: str) -> Tuple | None: """ Get keywords in map """ @@ -224,15 +225,15 @@ def label(self, noun: str, endpoint: str, action_name: str) -> bool: class _Step(dict): # pylint:disable=too-many-instance-attributes - def __init__(self, archivist: "type_helper.Archivist", **kwargs): + def __init__(self, archivist_instance: archivist.Archivist, **kwargs): super().__init__(**kwargs) - self._archivist = archivist - self._args = None - self._kwargs = None + self._archivist = archivist_instance + self._args: list[Any] = [] + self._kwargs: dict[str, Any] = {} self._actions = None self._action = None self._action_name = None - self._data = None + self._data = {} self._keywords = None self._delete_method = None self._labels = {} @@ -313,10 +314,10 @@ def label(self, verb: str, noun: str): def identity_from_label(self, noun, identity_method): label = self.get(f"{noun}_label") - if not label.startswith(f"{noun}s/"): + if not label.startswith(f"{noun}s/"): # type: ignore return identity_method(label) - uid = label.split("/")[1] + uid = label.split("/")[1] # type: ignore try: _ = UUID(uid, version=4) except ValueError: @@ -357,7 +358,7 @@ def actions(self): return self._actions @property - def action(self): + def action(self) -> Callable: if self._action is None: self._action = self.actions.action(self.action_name) @@ -394,15 +395,15 @@ class _Runner: ArchivistRunner takes a url, token_file. """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self.entities = None + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self.entities: defaultdict self.deletions = {} def __str__(self) -> str: return f"Runner({self._archivist.url})" - def __call__(self, config: Dict): + def __call__(self, config: dict[str, Any]): """ The dict config contains a list of `steps` to be performed serially, e.g. @@ -438,7 +439,7 @@ def __call__(self, config: Dict): except (ArchivistError, KeyError) as ex: LOGGER.info("Runner exception %s", ex) - def run_steps(self, config: Dict): + def run_steps(self, config: dict[str, Any]): """Runs all defined steps in self.config.""" self.entities = tree() for step in config["steps"]: @@ -447,7 +448,7 @@ def run_steps(self, config: Dict): self.delete() self._archivist.close() - def run_step(self, step: Dict): + def run_step(self, step: dict[str, Any]): """Runs a step given parameters and the type of step. Args: @@ -477,7 +478,7 @@ def run_step(self, step: Dict): if s.label("set", noun) and label is not None: self.entities[label] = response - def set_deletions(self, response: Dict, delete_method): + def set_deletions(self, response: dict[str, Any], delete_method): """sets entry to be deleted""" if delete_method is not None: diff --git a/archivist/sbommetadata.py b/archivist/sbommetadata.py index c83ba345..3ec86b56 100644 --- a/archivist/sbommetadata.py +++ b/archivist/sbommetadata.py @@ -5,10 +5,10 @@ # pylint:disable=too-few-public-methods # pylint:disable=too-many-instance-attributes +from __future__ import annotations from dataclasses import dataclass, asdict from inspect import signature as inspect_signature from logging import getLogger -from typing import List # NB: the order of the fields is important. Fields with default values must # appear after fields without. @@ -23,11 +23,11 @@ class SBOM: """ identity: str - authors: List[str] + authors: list[str] supplier: str component: str version: str - hashes: List[str] + hashes: list[str] unique_id: str upload_date: str uploaded_by: str diff --git a/archivist/sboms.py b/archivist/sboms.py index 213e8b50..687526de 100644 --- a/archivist/sboms.py +++ b/archivist/sboms.py @@ -24,7 +24,8 @@ # pylint:disable=too-few-public-methods -from typing import BinaryIO, Dict, Optional +from __future__ import annotations +from typing import BinaryIO, Optional, Any from copy import deepcopy from io import BytesIO from logging import getLogger @@ -33,7 +34,7 @@ from xmltodict import parse as xmltodict_parse # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( SBOMS_SUBPATH, @@ -62,16 +63,16 @@ class _SBOMSClient: """ - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{SBOMS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{SBOMS_SUBPATH}" self._label = f"{self._subpath}/{SBOMS_LABEL}" def __str__(self) -> str: return f"SBOMSClient({self._archivist.url})" @staticmethod - def parse(data: Dict) -> Dict: # pragma: no cover + def parse(data: dict[str, Any]) -> dict[str, Any]: # pragma: no cover """ parse the sbom and extract pertinent information @@ -133,7 +134,7 @@ def parse(data: Dict) -> Dict: # pragma: no cover return result - def create(self, data: Dict) -> Dict: # pragma: no cover + def create(self, data: dict[str, Any]) -> dict[str, Any]: # pragma: no cover """ Create an sbom and return struct suitable for use in an asset or event creation. @@ -220,7 +221,7 @@ def upload( *, confirm: bool = True, mtype: Optional[str] = None, - params: Optional[Dict] = None, + params: Optional[dict[str, Any]] = None, ) -> SBOM: """Create SBOM @@ -269,7 +270,7 @@ def wait_for_uploading(self, identity: str) -> SBOM: """ uploader.MAX_TIME = self._archivist.max_time # pylint: disable=protected-access - return uploader._wait_for_uploading(self, identity) # type: ignore + return uploader._wait_for_uploading(self, identity) def download(self, identity: str, fd: BinaryIO) -> Response: """Read SBOM @@ -304,7 +305,7 @@ def read(self, identity: str) -> SBOM: self._archivist.get(f"{self._subpath}/{identity}/{SBOMS_METADATA}") ) - def __params(self, metadata: Optional[Dict]) -> Dict: + def __params(self, metadata: Optional[dict[str, Any]]) -> dict[str, Any]: params = deepcopy(metadata) if metadata else {} return _deepmerge(self._archivist.fixtures.get(SBOMS_LABEL), params) @@ -312,7 +313,7 @@ def list( self, *, page_size: Optional[int] = None, - metadata: Optional[Dict] = None, + metadata: Optional[dict[str, Any]] = None, ): """List SBOMS. @@ -384,7 +385,7 @@ def wait_for_publication(self, identity: str) -> SBOM: """ publisher.MAX_TIME = self._archivist.max_time # pylint: disable=protected-access - return publisher._wait_for_publication(self, identity) # type: ignore + return publisher._wait_for_publication(self, identity) def withdraw(self, identity: str, confirm: bool = True) -> SBOM: """Withdraw SBOM @@ -425,4 +426,4 @@ def wait_for_withdrawn(self, identity: str) -> SBOM: """ withdrawer.MAX_TIME = self._archivist.max_time # pylint: disable=protected-access - return withdrawer._wait_for_withdrawn(self, identity) # type: ignore + return withdrawer._wait_for_withdrawn(self, identity) diff --git a/archivist/subjects.py b/archivist/subjects.py index 943b00d6..c6bfc745 100644 --- a/archivist/subjects.py +++ b/archivist/subjects.py @@ -21,13 +21,14 @@ """ +from __future__ import annotations from base64 import b64decode from json import loads as json_loads from logging import getLogger -from typing import Dict, List, Optional +from typing import Any, Optional # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( SUBJECTS_SUBPATH, @@ -57,16 +58,16 @@ class _SubjectsClient: maxDiff = None - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{SUBJECTS_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{SUBJECTS_SUBPATH}" self._label = f"{self._subpath}/{SUBJECTS_LABEL}" def __str__(self) -> str: return f"SubjectsClient({self._archivist.url})" def create( - self, display_name: str, wallet_pub_key: List, tessera_pub_key: List + self, display_name: str, wallet_pub_key: list[str], tessera_pub_key: list[str] ) -> Subject: """Create subject @@ -108,7 +109,7 @@ def import_subject(self, name: str, subject: Subject) -> Subject: subject["tessera_pub_key"], ) - def create_from_data(self, data: Dict) -> Subject: + def create_from_data(self, data: dict[str, Any]) -> Subject: """Create subject Creates subject with request body from data stream. @@ -124,7 +125,7 @@ def create_from_data(self, data: Dict) -> Subject: LOGGER.debug("Create Subject from data %s", data) return Subject(**self._archivist.post(self._label, data)) - def create_from_b64(self, data: Dict) -> Subject: + def create_from_b64(self, data: dict[str, Any]) -> Subject: """Create subject Creates subject with request body from b64 encoded string @@ -189,9 +190,9 @@ def update( self, identity: str, *, - display_name: str = None, - wallet_pub_key: Optional[List[str]] = None, - tessera_pub_key: Optional[List[str]] = None, + display_name: Optional[str] = None, + wallet_pub_key: Optional[list[str]] = None, + tessera_pub_key: Optional[list[str]] = None, ) -> Subject: """Update Subject @@ -218,7 +219,7 @@ def update( ) ) - def delete(self, identity: str) -> Dict: + def delete(self, identity: str) -> dict[str, Any]: """Delete Subject Deletes subject. @@ -236,9 +237,9 @@ def __params( self, *, display_name: Optional[str] = None, - wallet_pub_key: Optional[List[str]] = None, - tessera_pub_key: Optional[List[str]] = None, - ) -> Dict: + wallet_pub_key: Optional[list[str]] = None, + tessera_pub_key: Optional[list[str]] = None, + ) -> dict[str, Any]: params = {} diff --git a/archivist/subjects_confirmer.py b/archivist/subjects_confirmer.py index a2824748..0eca258f 100644 --- a/archivist/subjects_confirmer.py +++ b/archivist/subjects_confirmer.py @@ -1,10 +1,12 @@ """assets confirmer interface """ +from __future__ import annotations from logging import getLogger - import backoff +from . import subjects + from .constants import ( CONFIRMATION_CONFIRMED, CONFIRMATION_STATUS, @@ -34,18 +36,20 @@ def __on_giveup_confirmation(details): @backoff.on_predicate( backoff.expo, - logger=None, + logger=None, # type: ignore max_time=__lookup_max_time, on_backoff=backoff_handler, on_giveup=__on_giveup_confirmation, ) -def _wait_for_confirmation(self, identity): +def _wait_for_confirmation( + self: subjects._SubjectsClient, identity: str +) -> subjects.Subject: """Return None until subjects is confirmed""" subject = self.read(identity) if CONFIRMATION_STATUS not in subject: - return None + return None # type: ignore if subject[CONFIRMATION_STATUS] == CONFIRMATION_CONFIRMED: return subject - return None + return None # type: ignore diff --git a/archivist/tenancies.py b/archivist/tenancies.py index ceb045c0..cd21a96b 100644 --- a/archivist/tenancies.py +++ b/archivist/tenancies.py @@ -21,10 +21,11 @@ """ +from __future__ import annotations from logging import getLogger # pylint:disable=cyclic-import # but pylint doesn't understand this feature -from . import archivist as type_helper # pylint:disable=unused-import +from . import archivist from .constants import ( TENANCIES_LABEL, @@ -53,9 +54,9 @@ class _TenanciesClient: maxDiff = None - def __init__(self, archivist: "type_helper.Archivist"): - self._archivist = archivist - self._subpath = f"{archivist.root}/{TENANCIES_SUBPATH}" + def __init__(self, archivist_instance: archivist.Archivist): + self._archivist = archivist_instance + self._subpath = f"{archivist_instance.root}/{TENANCIES_SUBPATH}" self._label = f"{self._subpath}/{TENANCIES_LABEL}" def __str__(self) -> str: diff --git a/archivist/timestamp.py b/archivist/timestamp.py index 7899ef8c..01395260 100644 --- a/archivist/timestamp.py +++ b/archivist/timestamp.py @@ -9,7 +9,7 @@ from rfc3339 import rfc3339 -def parse_timestamp(date_string): +def parse_timestamp(date_string: str): """Parse an Archivist timestamp to a datetime object See https://pypi.org/project/iso8601/ @@ -24,7 +24,7 @@ def parse_timestamp(date_string): return parse_date(date_string) -def make_timestamp(date_object): +def make_timestamp(date_object: datetime): """Format a datetime object into an Archivist format timestamp string See https://pypi.org/project/rfc3339/ diff --git a/archivist/type_aliases.py b/archivist/type_aliases.py deleted file mode 100644 index 3e84b33a..00000000 --- a/archivist/type_aliases.py +++ /dev/null @@ -1,15 +0,0 @@ -""" Type aliases used for identifying complex input and return types for RKVST - -TODO: Create more explicit type aliases, Current type aliases default to -Dict[Unknown, Unknown] and List[Unknown]. Minimising the potential effectiveness of -the type hinting. - -- Look at TypedDict for explicit formatting of dictionaries -""" - -from typing import Optional, Tuple - -NoneOnError = Optional -"""Means the function returns None if a Error occurs upstream""" - -MachineAuth = Tuple[str, str] diff --git a/archivist/uploader.py b/archivist/uploader.py index 0ce9f30f..1773966b 100644 --- a/archivist/uploader.py +++ b/archivist/uploader.py @@ -1,11 +1,12 @@ """uploader interface """ - +from __future__ import annotations from logging import getLogger import backoff from .errors import ArchivistNotFoundError +from . import sboms # pylint:disable=cyclic-import # but pylint doesn't understand this feature @@ -30,17 +31,17 @@ def __on_giveup_uploading(details): @backoff.on_predicate( backoff.expo, - logger=None, + logger=None, # type: ignore max_time=__lookup_max_time, on_backoff=backoff_handler, on_giveup=__on_giveup_uploading, ) -def _wait_for_uploading(self, identity): +def _wait_for_uploading(self: sboms._SBOMSClient, identity: str) -> sboms.SBOM: """Return None until identity is found""" try: LOGGER.debug("Uploader Read %s", identity) entity = self.read(identity) except ArchivistNotFoundError: - return None + return None # type: ignore return entity diff --git a/archivist/utils.py b/archivist/utils.py index 0da4051a..10d5bb4c 100644 --- a/archivist/utils.py +++ b/archivist/utils.py @@ -1,6 +1,7 @@ """Some convenience stuff """ +from __future__ import annotations from io import BytesIO from logging import getLogger from typing import Tuple @@ -85,7 +86,7 @@ def get_auth( return None -def selector_signature(selector: list, data: dict) -> Tuple[dict, dict]: +def selector_signature(selector: list, data: dict) -> Tuple[dict, dict | None]: """ Convert a selector to a signature for list and count methods diff --git a/archivist/withdrawer.py b/archivist/withdrawer.py index 204226eb..3440b70c 100644 --- a/archivist/withdrawer.py +++ b/archivist/withdrawer.py @@ -3,12 +3,14 @@ Wrap base methods with constants for assets (path, etc... """ +from __future__ import annotations from logging import getLogger import backoff from .utils import backoff_handler from .errors import ArchivistUnwithdrawnError +from . import sboms # pylint:disable=cyclic-import # but pylint doesn't understand this feature @@ -32,16 +34,16 @@ def __on_giveup_withdrawn(details): @backoff.on_predicate( backoff.expo, - logger=None, + logger=None, # type: ignore max_time=__lookup_max_time, on_backoff=backoff_handler, on_giveup=__on_giveup_withdrawn, ) -def _wait_for_withdrawn(self, identity): +def _wait_for_withdrawn(self: sboms._SBOMSClient, identity: str) -> sboms.SBOM: """Return None until withdrawn date is set""" entity = self.read(identity) if entity.withdrawn_date: return entity - return None + return None # type: ignore diff --git a/requirements-dev.txt b/requirements-dev.txt index 18c87d96..0d529b71 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -7,6 +7,7 @@ coverage~=6.3 pip-audit~=2.0 pycodestyle~=2.8 pylint~=2.14 +pyright~=1.1.263 # uploading to pypi build~=0.7.0