diff --git a/python/common/business-registry-digital-credentials/poetry.lock b/python/common/business-registry-digital-credentials/poetry.lock index acc9c06200..a25ab37650 100644 --- a/python/common/business-registry-digital-credentials/poetry.lock +++ b/python/common/business-registry-digital-credentials/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. [[package]] name = "alembic" @@ -15,7 +15,6 @@ files = [ [package.dependencies] Mako = "*" SQLAlchemy = ">=1.4.0" -tomli = {version = "*", markers = "python_version < \"3.11\""} typing-extensions = ">=4.12" [package.extras] @@ -138,8 +137,6 @@ mypy-extensions = ">=0.4.3" packaging = ">=22.0" pathspec = ">=0.9.0" platformdirs = ">=2" -tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} -typing-extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""} [package.extras] colorama = ["colorama (>=0.4.3)"] @@ -191,6 +188,29 @@ reference = "main" resolved_reference = "6aad13126a3c8c7e8f301a2c49aa7918607fce0e" subdirectory = "python/common/business-registry-model" +[[package]] +name = "business-registry-common" +version = "0.1.1" +description = "" +optional = false +python-versions = ">=3.13,<3.14" +groups = ["main"] +files = [] +develop = false + +[package.dependencies] +datedelta = ">=1.4,<2.0" +flask = ">=3.1.0,<4.0.0" +python-dateutil = ">=2.9.0,<3.0.0" +pytz = ">=2025.1,<2026.0" + +[package.source] +type = "git" +url = "https://github.com/bcgov/lear.git" +reference = "main" +resolved_reference = "ab74265227677d93896a8d9c85dd5bae6c617873" +subdirectory = "python/common/business-registry-common" + [[package]] name = "cachelib" version = "0.13.0" @@ -503,9 +523,6 @@ files = [ {file = "coverage-7.9.2.tar.gz", hash = "sha256:997024fa51e3290264ffd7492ec97d0690293ccd2b45a6cd7d82d945a4a80c8b"}, ] -[package.dependencies] -tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.11.0a6\" and extra == \"toml\""} - [package.extras] toml = ["tomli ; python_full_version <= \"3.11.0a6\""] @@ -601,25 +618,6 @@ files = [ [package.dependencies] python-dotenv = "*" -[[package]] -name = "exceptiongroup" -version = "1.3.0" -description = "Backport of PEP 654 (exception groups)" -optional = false -python-versions = ">=3.7" -groups = ["dev"] -markers = "python_version < \"3.11\"" -files = [ - {file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"}, - {file = "exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88"}, -] - -[package.dependencies] -typing-extensions = {version = ">=4.6.0", markers = "python_version < \"3.13\""} - -[package.extras] -test = ["pytest (>=6)"] - [[package]] name = "flake8" version = "7.3.0" @@ -668,7 +666,6 @@ files = [ [package.dependencies] blinker = ">=1.9.0" click = ">=8.1.3" -importlib-metadata = {version = ">=3.6.0", markers = "python_version < \"3.10\""} itsdangerous = ">=2.2.0" jinja2 = ">=3.1.2" markupsafe = ">=2.1.1" @@ -787,7 +784,7 @@ description = "Lightweight in-process concurrent programming" optional = false python-versions = ">=3.9" groups = ["main"] -markers = "python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")" +markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\"" files = [ {file = "greenlet-3.2.3-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:1afd685acd5597349ee6d7a88a8bec83ce13c106ac78c196ee9dde7c04fe87be"}, {file = "greenlet-3.2.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:761917cac215c61e9dc7324b2606107b3b292a8349bdebb31503ab4de3f559ac"}, @@ -864,31 +861,6 @@ files = [ [package.extras] all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] -[[package]] -name = "importlib-metadata" -version = "8.7.0" -description = "Read metadata from Python packages" -optional = false -python-versions = ">=3.9" -groups = ["main"] -markers = "python_version == \"3.9\"" -files = [ - {file = "importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd"}, - {file = "importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000"}, -] - -[package.dependencies] -zipp = ">=3.20" - -[package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] -cover = ["pytest-cov"] -doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -enabler = ["pytest-enabler (>=2.2)"] -perf = ["ipython"] -test = ["flufl.flake8", "importlib_resources (>=1.3) ; python_version < \"3.9\"", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] -type = ["pytest-mypy"] - [[package]] name = "iniconfig" version = "2.1.0" @@ -991,7 +963,7 @@ fqdn = {version = "*", optional = true, markers = "extra == \"format\""} idna = {version = "*", optional = true, markers = "extra == \"format\""} isoduration = {version = "*", optional = true, markers = "extra == \"format\""} jsonpointer = {version = ">1.13", optional = true, markers = "extra == \"format\""} -jsonschema-specifications = ">=2023.03.6" +jsonschema-specifications = ">=2023.3.6" referencing = ">=0.28.4" rfc3339-validator = {version = "*", optional = true, markers = "extra == \"format\""} rfc3987 = {version = "*", optional = true, markers = "extra == \"format\""} @@ -1436,12 +1408,10 @@ files = [ [package.dependencies] colorama = {version = ">=0.4", markers = "sys_platform == \"win32\""} -exceptiongroup = {version = ">=1", markers = "python_version < \"3.11\""} iniconfig = ">=1" packaging = ">=20" pluggy = ">=1.5,<2" pygments = ">=2.7.2" -tomli = {version = ">=1", markers = "python_version < \"3.11\""} [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"] @@ -1541,7 +1511,6 @@ files = [ [package.dependencies] attrs = ">=22.2.0" rpds-py = ">=0.7.0" -typing-extensions = {version = ">=4.4.0", markers = "python_version < \"3.13\""} [[package]] name = "registry_schemas" @@ -1950,7 +1919,7 @@ version = "2.2.1" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" -groups = ["main", "dev"] +groups = ["main"] files = [ {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, @@ -1985,7 +1954,6 @@ files = [ {file = "tomli-2.2.1-py3-none-any.whl", hash = "sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc"}, {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] -markers = {dev = "python_full_version <= \"3.11.0a6\""} [[package]] name = "types-python-dateutil" @@ -2005,12 +1973,11 @@ version = "4.14.1" description = "Backported and Experimental Type Hints for Python 3.9+" optional = false python-versions = ">=3.9" -groups = ["main", "dev"] +groups = ["main"] files = [ {file = "typing_extensions-4.14.1-py3-none-any.whl", hash = "sha256:d1e1e3b58374dc93031d6eda2420a48ea44a36c2b4766a4fdeb3710755731d76"}, {file = "typing_extensions-4.14.1.tar.gz", hash = "sha256:38b39f4aeeab64884ce9f74c94263ef78f3c22467c8724005483154c26648d36"}, ] -markers = {dev = "python_version < \"3.11\""} [[package]] name = "typing-inspection" @@ -2104,32 +2071,10 @@ files = [ [package.dependencies] flake8-import-order = "*" -importlib-metadata = {version = "*", markers = "python_version < \"3.10\""} pyflakes = "*" tomli = "*" -[[package]] -name = "zipp" -version = "3.23.0" -description = "Backport of pathlib-compatible object wrapper for zip files" -optional = false -python-versions = ">=3.9" -groups = ["main"] -markers = "python_version == \"3.9\"" -files = [ - {file = "zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e"}, - {file = "zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166"}, -] - -[package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] -cover = ["pytest-cov"] -doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -enabler = ["pytest-enabler (>=2.2)"] -test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more_itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] -type = ["pytest-mypy"] - [metadata] lock-version = "2.1" -python-versions = ">=3.9,<4.0" -content-hash = "56bad40f11b7a6fe6ba513d24284ae00ff27dd57fa2adcc6984f08c67542554d" +python-versions = ">=3.13,<3.14" +content-hash = "59dd9b4a3a1e66ff2cc903c4e10be843f27e45567f11c6ce256f59a09c93c358" diff --git a/python/common/business-registry-digital-credentials/pyproject.toml b/python/common/business-registry-digital-credentials/pyproject.toml index ffa78acc12..443f58fe43 100644 --- a/python/common/business-registry-digital-credentials/pyproject.toml +++ b/python/common/business-registry-digital-credentials/pyproject.toml @@ -1,19 +1,20 @@ [tool.poetry] name = "business-registry-digital-credentials" -version = "0.1.1" +version = "0.2.0" description = "" authors = ["Lucas O'Neil "] readme = "README.md" packages = [{ include = "business_registry_digital_credentials", from = "src" }] [tool.poetry.dependencies] -python = ">=3.9,<4.0" +python = ">=3.13,<3.14" datedelta = "^1.4" flask-jwt-oidc = "^0.8.0" pytz = "^2025.1" pyjwt = "^2.8.0" requests = "^2.32.3" business-model = { git = "https://github.com/bcgov/lear.git", subdirectory = "python/common/business-registry-model", branch = "main" } +business-registry-common = { git = "https://github.com/bcgov/lear.git", subdirectory = "python/common/business-registry-common", branch = "main" } [tool.poetry.group.dev.dependencies] pytest = "^8.3.5" @@ -60,7 +61,7 @@ docstring-min-length = 10 count = true [tool.black] -target-version = ["py39", "py310", "py311", "py312"] +target-version = ["py313"] line-length = 120 [tool.isort] diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/__init__.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/__init__.py index e49c92042d..5df35db867 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/__init__.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/__init__.py @@ -11,7 +11,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Digital Business Card (DBC) shared package. + +Centralizes the Traction REST client, access-rules engine, credential helpers, +and credential-lifecycle DB wrappers used by both legal-api and the +business-digital-credentials queue service. +""" + +# Import the service class and create the module-level singleton FIRST. +# Modules imported below (e.g. ``digital_credentials_lifecycle``) reference this +# attribute at top-level, so it must exist before they are imported. from .digital_credentials import DigitalCredentialsService -"""This module holds general utility functions and helpers for the main package.""" digital_credentials = DigitalCredentialsService() + +from .digital_credentials_auth import ( # noqa: E402 + are_digital_credentials_allowed, + get_digital_credentials_preconditions, +) +from .digital_credentials_helpers import ( # noqa: E402 + extract_invitation_message_id, + get_digital_credential_data, + get_or_create_business_user, + get_roles, +) +from .digital_credentials_lifecycle import ( # noqa: E402 + get_all_digital_credentials_for_business, + issue_digital_credential, + replace_digital_credential, + revoke_digital_credential, +) +from .digital_credentials_rules import DigitalCredentialsRulesService # noqa: E402 diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/config.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/config.py index e7c5a2f009..b1d7e03c7c 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/config.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/config.py @@ -24,7 +24,6 @@ from dotenv import find_dotenv, load_dotenv - # this will load all the envars from a .env file located in the project root (api) load_dotenv(find_dotenv()) diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/decorators.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/decorators.py index 8fa3989a72..a52accba22 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/decorators.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/decorators.py @@ -15,27 +15,18 @@ import json import time -from datetime import datetime +from datetime import UTC, datetime from functools import wraps from http import HTTPStatus import jwt as pyjwt import requests -from flask import current_app, jsonify -from flask_jwt_oidc import JwtManager +from flask import current_app from jwt import ExpiredSignatureError -from business_model.models import Business -from .digital_credentials_auth import are_digital_credentials_allowed - -# from legal_api.utils.auth import jwt - - MAX_RETRIES = 5 # Number of times to retry getting the token TOKEN_RETRY_WAIT = 2 # Delay in seconds between retries -jwt = JwtManager() - def _get_traction_token(): """Get a traction token and check if it is valid.""" @@ -65,7 +56,7 @@ def _get_traction_token(): # Use the token to check if it is valid by calling the tenant endpoint check_response = requests.get(f"{traction_api_url}/tenant", headers={"Authorization": f"Bearer {token}"}) - if check_response.status_code == 401: + if check_response.status_code == HTTPStatus.UNAUTHORIZED: current_app.logger.warning(f"Attempt {attempt + 1}: Received 401 checking token. Retry.") time.sleep(TOKEN_RETRY_WAIT) continue @@ -97,7 +88,7 @@ def decorated_function(*args, **kwargs): if not (decoded := pyjwt.decode(current_app.api_token, options={"verify_signature": False})): raise pyjwt.ExpiredSignatureError - if datetime.utcfromtimestamp(decoded["exp"]) <= datetime.utcnow(): + if datetime.fromtimestamp(decoded["exp"], UTC) <= datetime.now(UTC): raise pyjwt.ExpiredSignatureError except ExpiredSignatureError: current_app.logger.info("Traction token expired or is missing, requesting new token") @@ -106,21 +97,3 @@ def decorated_function(*args, **kwargs): return f(*args, **kwargs) return decorated_function - - -def can_access_digital_credentials(f): - """Ensure the business can has access to digital credentials.""" - - @wraps(f) - def decorated_function(*args, **kwargs): - identifier = kwargs.get("identifier", None) - - if not (business := Business.find_by_identifier(identifier)): - return jsonify({"message": f"{identifier} not found."}), HTTPStatus.NOT_FOUND - - if not are_digital_credentials_allowed(business, jwt): - return jsonify({"message": f"digital credential not available for: {identifier}."}), HTTPStatus.UNAUTHORIZED - - return f(*args, **kwargs) - - return decorated_function diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials.py index 7f2bf42d7d..7742785fd0 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials.py @@ -19,12 +19,11 @@ import secrets from contextlib import suppress from datetime import datetime -from typing import Optional import requests +from business_model.models import DCDefinition, DCRevocationReason from .decorators import requires_traction_auth -from business_model.models import DCDefinition, DCRevocationReason class DigitalCredentialsService: @@ -88,14 +87,14 @@ def _register_business_definition(self): # Look for a schema first, and copy it into the Traction tenant if it's not there if not (schema_id := self._fetch_schema(self.business_schema_id)): raise ValueError( - f"Schema with id:{self.business_schema_id}" + " must be available in Traction tenant storage" + f"Schema with id:{self.business_schema_id} must be available in Traction tenant storage" ) # Look for a published credential definition first, and copy it into the Traction tenant if it's not there if not (credential_definition_id := self._fetch_credential_definition(self.business_cred_def_id)): raise ValueError( f"Credential Definition with id: {self.business_cred_def_id}" - + " must be available in Traction tenant storage" + " must be available in Traction tenant storage" ) # Check for the current Business definition. @@ -123,7 +122,7 @@ def _register_business_definition(self): return None @requires_traction_auth - def _fetch_schema(self, schema_id: str) -> Optional[str]: + def _fetch_schema(self, schema_id: str) -> str | None: """Find a schema in Traction storage.""" try: response = requests.get(self.api_url + f"/schemas/{schema_id}", headers=self._get_headers()) @@ -135,7 +134,7 @@ def _fetch_schema(self, schema_id: str) -> Optional[str]: raise err @requires_traction_auth - def _fetch_credential_definition(self, cred_def_id: str) -> Optional[str]: + def _fetch_credential_definition(self, cred_def_id: str) -> str | None: """Find a published credential definition.""" try: response = requests.get( @@ -145,13 +144,13 @@ def _fetch_credential_definition(self, cred_def_id: str) -> Optional[str]: return response.json().get("credential_definition", None).get("id", None) except Exception as err: self.app.logger.error( - f"Failed to find credential definition with id: {cred_def_id}" + " from Traction tenant storage" + f"Failed to find credential definition with id: {cred_def_id} from Traction tenant storage" ) self.app.logger.error(err) raise err @requires_traction_auth - def create_invitation(self) -> Optional[dict]: + def create_invitation(self) -> dict | None: """Create a new connection invitation.""" try: response = requests.post( @@ -173,7 +172,7 @@ def create_invitation(self) -> Optional[dict]: return None @requires_traction_auth - def attest_connection(self, connection_id: str) -> Optional[dict]: + def attest_connection(self, connection_id: str) -> dict | None: """Perform an attestation to ensure that interactions only happen with connections on a trusted app.""" try: current_timestamp = int(datetime.now().timestamp()) @@ -224,7 +223,7 @@ def issue_credential( definition: DCDefinition, data: list, # list of { 'name': 'business_name', 'value': 'test_business' } comment: str = "", - ) -> Optional[dict]: + ) -> dict | None: """Send holder a credential, automating entire flow.""" try: response = requests.post( @@ -257,7 +256,7 @@ def issue_credential( return None @requires_traction_auth - def fetch_credential_exchange_record(self, cred_ex_id: str) -> Optional[dict]: + def fetch_credential_exchange_record(self, cred_ex_id: str) -> dict | None: """Fetch a credential exchange record.""" try: response = requests.get( @@ -272,7 +271,7 @@ def fetch_credential_exchange_record(self, cred_ex_id: str) -> Optional[dict]: @requires_traction_auth def revoke_credential( self, connection_id, cred_rev_id: str, rev_reg_id: str, reason: DCRevocationReason - ) -> Optional[dict]: + ) -> dict | None: """Revoke a credential.""" try: response = requests.post( @@ -297,7 +296,7 @@ def revoke_credential( return None @requires_traction_auth - def remove_connection_record(self, connection_id: str) -> Optional[dict]: + def remove_connection_record(self, connection_id: str) -> dict | None: """Delete a connection.""" try: response = requests.delete(self.api_url + "/connections/" + connection_id, headers=self._get_headers()) @@ -308,7 +307,7 @@ def remove_connection_record(self, connection_id: str) -> Optional[dict]: return None @requires_traction_auth - def remove_credential_exchange_record(self, cred_ex_id: str) -> Optional[dict]: + def remove_credential_exchange_record(self, cred_ex_id: str) -> dict | None: """Delete a credential exchange.""" try: response = requests.delete( diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_auth.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_auth.py index 6de895cb7c..3727d27f7a 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_auth.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_auth.py @@ -12,24 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""This provides auth functions for digital credentials.""" +"""This provides auth functions for digital credentials. -from typing import Dict, List +Both functions here require Flask request context (``g`` + ``request_ctx``). +The queue service never reaches this path — DBC access enforcement is a +legal-api / HTTP concern. +""" +from business_model.models.business import Business +from business_model.models.user import User from flask import g +from flask.globals import request_ctx from flask_jwt_oidc import JwtManager -from business_model.models.business import Business -from business_model.models.user import User from .digital_credentials_rules import DigitalCredentialsRulesService - STAFF_ROLE = "staff" -def are_digital_credentials_allowed(business: Business, jwt: JwtManager) -> bool: - """Return True if the business is allowed to have/view digital credentials.""" - is_staff = jwt.contains_role([STAFF_ROLE]) +def are_digital_credentials_allowed(business: Business, jwt: JwtManager, allowed_business_types: list[str]) -> bool: + """Return True if the business is allowed to have/view digital credentials. + + ``allowed_business_types`` is the feature-flag-resolved list (required — pass ``[]`` + to deny). The caller (legal-api) loads the LaunchDarkly flag and forwards the value. + """ + is_staff = jwt.contains_role(request_ctx.current_user, [STAFF_ROLE]) if is_staff: # Staff do not have digital credentials return False @@ -38,10 +45,10 @@ def are_digital_credentials_allowed(business: Business, jwt: JwtManager) -> bool return False rules = DigitalCredentialsRulesService() - return rules.are_digital_credentials_allowed(user, business) + return rules.are_digital_credentials_allowed(user, business, allowed_business_types) -def get_digital_credentials_preconditions(business: Business) -> Dict[str, List[str]]: +def get_digital_credentials_preconditions(business: Business) -> dict[str, list[str]]: """Return the preconditions for digital credentials.""" if not (user := User.find_by_jwt_token(g.jwt_oidc_token_info)): return {} diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_helpers.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_helpers.py index d7bdb2a1ab..4692ef65d6 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_helpers.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_helpers.py @@ -14,18 +14,17 @@ """This provides helper functions for digital credentials.""" -from typing import List, Union -from flask import current_app +from business_common.utils.legislation_datetime import LegislationDatetime +from business_model.models import Business, CorpType, DCBusinessUser, DCDefinition, User -from business_model.models import Business, CorpType, DCDefinition, User, DCBusinessUser from .digital_credentials_rules import DigitalCredentialsRulesService def get_digital_credential_data( business_user: DCBusinessUser, credential_type: DCDefinition.CredentialType, - self_attested_roles: Union[List[str], None] = None, -) -> List[dict[str, str]]: + self_attested_roles: list[str] | None = None, +) -> list[dict[str, str]] | None: """Get the data for a digital credential.""" if credential_type == DCDefinition.CredentialType.business: rules = DigitalCredentialsRulesService() @@ -78,8 +77,12 @@ def get_company_status(business: Business) -> str: def get_registered_on_dateint(business: Business) -> str: - """Get registered on date in YYYYMMDD format.""" - return business.founding_date.strftime("%Y%m%d") if business.founding_date else "" + """Get registered on date in YYYYMMDD format (legislation timezone).""" + return ( + LegislationDatetime.as_legislation_timezone(business.founding_date).strftime("%Y%m%d") + if business.founding_date + else "" + ) def get_family_name(user: User) -> str: @@ -93,8 +96,11 @@ def get_given_names(user: User) -> str: def get_roles( - user: User, business: Business, rules: DigitalCredentialsRulesService, self_attested_roles: Union[List[str], None] -) -> List[str]: + user: User, + business: Business, + rules: DigitalCredentialsRulesService, + self_attested_roles: list[str] | None, +) -> list[str]: """Get roles for the user in the business.""" def valid_party_role_filter(party_role) -> bool: @@ -116,7 +122,7 @@ def valid_party_role_filter(party_role) -> bool: # Ensures that the user cant attach roles that are not stated in the preconditions party_roles = list(filter(valid_party_role_filter, party_roles)) - return list(map(lambda party_role: party_role.role.replace("_", " ").title(), party_roles)) + return [party_role.role.replace("_", " ").title() for party_role in party_roles] def extract_invitation_message_id(json_message: dict) -> str: diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_lifecycle.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_lifecycle.py new file mode 100644 index 0000000000..ac8fa1e788 --- /dev/null +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_lifecycle.py @@ -0,0 +1,146 @@ +# Copyright © 2025 Province of British Columbia +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Credential-lifecycle helpers: DB-persistence wrappers over the Traction service. + +These wrap the ``digital_credentials`` singleton (Traction REST client) with +``DCCredential`` row save/delete/update operations. Pulled out of the queue +service so both legal-api and the queue service can share them. + +The ``digital_credentials`` singleton is bound at module import time — package +``__init__.py`` creates it BEFORE importing this module, so the top-level +``from . import digital_credentials`` resolves cleanly. Tests can patch +``business_registry_digital_credentials.digital_credentials_lifecycle.digital_credentials`` +to swap out the Traction client. +""" + +from business_model.models import Business, DCBusinessUser, DCConnection, DCCredential, DCDefinition, DCRevocationReason + +from . import digital_credentials +from .digital_credentials_helpers import get_digital_credential_data + + +class DigitalCredentialError(Exception): + """Raised when an issue/revoke/replace lifecycle operation cannot complete.""" + + +def get_all_digital_credentials_for_business(business: Business) -> list[DCCredential]: + """Get all currently-issued, non-revoked digital credentials for a business. + + TODO: Once DCCredential references DCBusinessUser, this function can be refactored. + """ + credentials = [] + for business_user in business.business_users: + active_connections = [conn for conn in business_user.connections if conn.is_active] + if active_connections and len(active_connections) == 1: + active_connection = active_connections[0] + for credential in active_connection.credentials: + if credential.is_issued and not credential.is_revoked: + credentials.append(credential) + return credentials + + +def issue_digital_credential( + business_user: DCBusinessUser, + credential_type: DCDefinition.CredentialType | str, +) -> DCCredential: + """Issue a digital credential for a business to a user. + + ``credential_type`` accepts either a ``DCDefinition.CredentialType`` enum + member or its name string (e.g. ``"business"``). + """ + ct = ( + credential_type + if isinstance(credential_type, DCDefinition.CredentialType) + else DCDefinition.CredentialType[credential_type] + ) + if not ( + definition := DCDefinition.find_by( + ct, + digital_credentials.business_schema_id, + digital_credentials.business_cred_def_id, + ) + ): + raise DigitalCredentialError(f"Definition not found for credential type: {credential_type}.") + + if not (connection := DCConnection.find_active_by_business_user_id(business_user_id=business_user.id)): + raise DigitalCredentialError(f"Active connection not found for business user with ID: {business_user.id}.") + + credential_data = get_digital_credential_data(business_user, definition.credential_type) + credential_id = next( + (item["value"] for item in credential_data if item["name"] == "credential_id"), + None, + ) + + if not ( + response := digital_credentials.issue_credential( + connection_id=connection.connection_id, + definition=definition, + data=credential_data, + ) + ): + raise DigitalCredentialError("Failed to issue credential.") + + issued_credential = DCCredential( + definition_id=definition.id, + connection_id=connection.id, + business_user_id=business_user.id, + credential_exchange_id=response["cred_ex_id"], + credential_id=credential_id, + ) + issued_credential.save() + return issued_credential + + +def revoke_digital_credential(credential: DCCredential, reason: DCRevocationReason) -> None: + """Revoke an issued digital credential.""" + if not credential.is_issued or credential.is_revoked: + raise DigitalCredentialError("Credential is not issued yet or is revoked already.") + + if not (connection := credential.connection) or not connection.is_active: + raise DigitalCredentialError(f"Active connection not found for credential with ID: {credential.credential_id}.") + + if ( + digital_credentials.revoke_credential( + connection.connection_id, + credential.credential_revocation_id, + credential.revocation_registry_id, + reason, + ) + is None + ): + raise DigitalCredentialError("Failed to revoke credential.") + + credential.is_revoked = True + credential.save() + + +def replace_digital_credential( + credential: DCCredential, + credential_type: DCDefinition.CredentialType, + reason: DCRevocationReason, +) -> None: + """Replace an issued digital credential: revoke, issue new, delete old.""" + if credential.is_issued and not credential.is_revoked: + revoke_digital_credential(credential, reason) + + if ( + digital_credentials.fetch_credential_exchange_record(credential.credential_exchange_id) is not None + and digital_credentials.remove_credential_exchange_record(credential.credential_exchange_id) is None + ): + raise DigitalCredentialError("Failed to remove credential exchange record.") + + issue_digital_credential(credential.connection.business_user, credential_type) + # We delete the old credential after issuing the new one so that the connection is not lost + credential.delete() diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_rules.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_rules.py index 56e94c1ba7..8cbd5cc9d8 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_rules.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_rules.py @@ -14,12 +14,12 @@ """This provides the service for determining access rules to digital credentials.""" -import logging from datetime import datetime, timezone from enum import Enum -from typing import List from business_model.models import Business, Filing, Party, PartyRole, User +from flask import current_app + from .digital_credentials_utils import FormattedUser, determine_allowed_business_types @@ -42,7 +42,10 @@ class FilingTypes(Enum): Business.LegalTypes.PARTNERSHIP.value, ] - valid_incorporation_types = [Business.LegalTypes.BCOMP.value, Business.LegalTypes.BCOMP_CONTINUE_IN.value] + valid_incorporation_types = [ + Business.LegalTypes.BCOMP.value, + Business.LegalTypes.BCOMP_CONTINUE_IN.value, + ] valid_role_types = [ PartyRole.RoleTypes.PROPRIETOR.value, @@ -50,11 +53,18 @@ class FilingTypes(Enum): PartyRole.RoleTypes.PARTNER.value, ] - def are_digital_credentials_allowed(self, user: User, business: Business) -> bool: - """Return True if the user is allowed to access digital credentials.""" - return self._has_general_access(user) and self._has_specific_access(user, business) + def are_digital_credentials_allowed( + self, user: User, business: Business, allowed_business_types: list[str] + ) -> bool: + """Return True if the user is allowed to access digital credentials. + + ``allowed_business_types`` is the feature-flag-resolved list of business types + eligible for DBC. Required — callers must resolve and pass it explicitly to + avoid silently bypassing the gate. Pass ``[]`` to deny all. + """ + return self._has_general_access(user) and self._has_specific_access(user, business, allowed_business_types) - def get_preconditions(self, user: User, business: Business) -> List[str]: + def get_preconditions(self, user: User, business: Business) -> list[str]: """ Return the preconditions for digital credentials. @@ -64,54 +74,64 @@ def get_preconditions(self, user: User, business: Business) -> List[str]: if not self.user_is_completing_party(user, business): preconditions += self.user_business_party_roles(user, business) preconditions += self.user_filing_party_roles(user, business) - return list(map(lambda party_role: party_role.role, preconditions)) + return [party_role.role for party_role in preconditions] def _has_general_access(self, user: User) -> bool: """Return True if general access rules are met.""" if not user: - logging.debug("No user is provided.") + current_app.logger.debug("No user is provided.") return False is_login_source_bcsc = user.login_source == "BCSC" if not is_login_source_bcsc: - logging.debug("User is not logged in with BCSC.") + current_app.logger.debug("User is not logged in with BCSC.") return False return True - def _has_specific_access(self, user: User, business: Business) -> bool: + def _has_specific_access(self, user: User, business: Business, allowed_business_types: list[str]) -> bool: """Return True if business rules are met.""" if not business: - logging.debug("No business is provided.") + current_app.logger.debug("No business is provided.") return False - allowed_business_types = determine_allowed_business_types( - self.valid_registration_types, self.valid_incorporation_types + eligible_business_types = determine_allowed_business_types( + allowed_business_types, + self.valid_registration_types, + self.valid_incorporation_types, ) - logging.debug("Allowed business types: %s", allowed_business_types) + current_app.logger.debug("DBC Allowed business types: %s", eligible_business_types) - if business.legal_type in allowed_business_types: + if business.legal_type in eligible_business_types: return bool(self.user_filing_party_roles(user, business) or self.user_business_party_roles(user, business)) - logging.debug("No specific access rules are met.") + current_app.logger.debug("No specific DBC access rules are met.") return False def user_is_completing_party(self, user: User, business: Business) -> bool: """Return True if the user is the completing party.""" if len(filings := self.valid_filings(business)) <= 0: - logging.debug("No registration or incorporation filing found for the business.") + current_app.logger.debug("No registration or incorporation filing found for the business.") return False filing = filings.pop(0) return self.user_submitted_filing(user, filing) and self.user_matches_completing_party(user, filing) - def user_filing_party_roles(self, user: User, business: Business) -> List[PartyRole]: + def user_has_filing_party_role(self, user: User, business: Business) -> bool: + """Return True if the user has a valid filing party role (e.g. incorporator) in the business.""" + return len(self.user_filing_party_roles(user, business)) > 0 + + def user_has_business_party_role(self, user: User, business: Business) -> bool: + """Return True if the user has a valid business party role (e.g. director) in the business.""" + return len(self.user_business_party_roles(user, business)) > 0 + + def user_filing_party_roles(self, user: User, business: Business) -> list[PartyRole]: """Return the valid filing roles of the user for the business, if any. - Only returns roles that are in valid_role_types (proprietor, director, partner). + Only returns roles that are in ``valid_role_types`` (proprietor, director, partner). """ if len(filings := self.valid_filings(business)) <= 0: - logging.debug("No registration or incorporation filing found for the business.") + current_app.logger.debug("No registration or incorporation filing found for the business.") return [] if business.legal_type in self.valid_registration_types: @@ -120,35 +140,41 @@ def user_filing_party_roles(self, user: User, business: Business) -> List[PartyR filing = filings.pop(0) roles = filing.filing_party_roles.filter(PartyRole.role != PartyRole.RoleTypes.COMPLETING_PARTY.value).all() return list( - filter(lambda role: role.role in self.valid_role_types and self.user_matches_party(user, role.party), roles) + filter( + lambda role: role.role in self.valid_role_types and self.user_matches_party(user, role.party), + roles, + ) ) - def user_business_party_roles(self, user: User, business: Business) -> List[PartyRole]: + def user_business_party_roles(self, user: User, business: Business) -> list[PartyRole]: """Return the valid party roles of the user for the business, if any. - Only returns roles that are in valid_role_types (proprietor, director, partner). + Only returns roles that are in ``valid_role_types`` (proprietor, director, partner). """ roles = business.party_roles.all() return list( - filter(lambda role: role.role in self.valid_role_types and self.user_matches_party(user, role.party), roles) + filter( + lambda role: role.role in self.valid_role_types and self.user_matches_party(user, role.party), + roles, + ) ) def user_submitted_filing(self, user: User, filing: Filing) -> bool: """Return True if the user submitted the filing.""" did_user_submit_filing = user.id == filing.submitter_id if not did_user_submit_filing: - logging.debug("User is not the filing submitter.") + current_app.logger.debug("User is not the filing submitter.") return did_user_submit_filing def user_matches_completing_party(self, user: User, filing: Filing) -> bool: """Return the True if the user matches a completing party.""" if len(roles := self.completing_party_roles(filing)) <= 0: - logging.debug("No completing parties found for the registration or incorporation filing.") + current_app.logger.debug("No completing parties found for the registration or incorporation filing.") return False is_user_completing_party = len(list(filter(lambda role: self.user_matches_party(user, role.party), roles))) > 0 if not is_user_completing_party: - logging.debug("User is not the completing party.") + current_app.logger.debug("User is not the completing party.") return is_user_completing_party def user_matches_party(self, user: User, party: Party) -> bool: @@ -157,11 +183,11 @@ def user_matches_party(self, user: User, party: Party) -> bool: p = FormattedUser(party) return u.first_name == p.first_name and u.last_name == p.last_name - def valid_filings(self, business: Business) -> List[Filing]: + def valid_filings(self, business: Business) -> list[Filing]: """Return the registration or incorporation filings for the business.""" return Filing.get_filings_by_types(business.id, self.valid_filing_types) - def completing_party_roles(self, filing: Filing) -> List[PartyRole]: + def completing_party_roles(self, filing: Filing) -> list[PartyRole]: """Return the completing parties of a filing.""" return PartyRole.get_party_roles_by_filing( filing.id, @@ -169,7 +195,7 @@ def completing_party_roles(self, filing: Filing) -> List[PartyRole]: PartyRole.RoleTypes.COMPLETING_PARTY.value, ) - def filing_party_roles(self, filing: Filing) -> List[PartyRole]: + def filing_party_roles(self, filing: Filing) -> list[PartyRole]: """Return the party roles of a filing.""" return PartyRole.get_party_roles_by_filing( filing.id, diff --git a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_utils.py b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_utils.py index 6f376ed29a..d8f6f0ba22 100644 --- a/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_utils.py +++ b/python/common/business-registry-digital-credentials/src/business_registry_digital_credentials/digital_credentials_utils.py @@ -14,45 +14,26 @@ """This provides utility functions for specific actions related to digital credentials.""" -from typing import List, Union - -from flask import current_app - from business_model.models import Party, User -# from legal_api.services.flags import Flags - - -# flags = Flags() - -DBC_ENABLED_BUSINESS_TYPES_FLAG = "dbc-enabled-business-types" - def determine_allowed_business_types( - valid_registration_types: List[str], valid_incorporation_types: List[str] -) -> List[str]: - """Determine if the business type is allowed for digital credentials based on flags.""" - # if not flags.is_on(DBC_ENABLED_BUSINESS_TYPES_FLAG): - # logging.warning('%s is OFF', DBC_ENABLED_BUSINESS_TYPES_FLAG) - # return [] - - # flag_obj = flags.value(DBC_ENABLED_BUSINESS_TYPES_FLAG) - # logging.debug('%s flag: %s', DBC_ENABLED_BUSINESS_TYPES_FLAG, flag_obj) - - # # Validate dbc-enabled-business-types is the right format to parse out - # if not isinstance(flag_obj, dict) or 'types' not in flag_obj or not isinstance(flag_obj['types'], list): - # logging.error('Invalid %s flag value: %s', DBC_ENABLED_BUSINESS_TYPES_FLAG, flag_obj) - # return [] + enabled_business_types: list[str] | None, + valid_registration_types: list[str], + valid_incorporation_types: list[str], +) -> list[str]: + """Return the intersection of caller-provided enabled types and DBC-supported types. - # supported_types = valid_registration_types + valid_incorporation_types - # valid_business_types = list(set(flag_obj['types']) & set(supported_types)) - # return valid_business_types + ``enabled_business_types`` is the feature-flag-resolved list of business types the + operator wants enabled for DBC. The caller (e.g. legal-api) is responsible for + loading the LaunchDarkly flag and passing the resolved value. ``None`` or ``[]`` + means "DBC disabled" — returns ``[]``. + """ + if not enabled_business_types: + return [] - # TODO: Implement the flag logic when flags are available. - # As done in the legal-api where this function is actually used - current_app.logger.warning("DBC_ENABLED_BUSINESS_TYPES_FLAG is not implemented yet.") - # For now, return all valid business types. - return ["SP", "BEN", "GP"] + supported_types = valid_registration_types + valid_incorporation_types + return list(set(enabled_business_types) & set(supported_types)) class FormattedUser: @@ -61,13 +42,13 @@ class FormattedUser: first_name: str last_name: str - def __init__(self, user: Union[User, Party]): + def __init__(self, user: User | Party): """Initialize the formatted user.""" first_name, last_name = self._formatted_user(user) self.first_name = first_name self.last_name = last_name - def _formatted_user(self, user: Union[User, Party]) -> tuple[str, str]: + def _formatted_user(self, user: User | Party) -> tuple[str, str]: """Return the formatted name of the user.""" first_name = (getattr(user, "firstname", "") or getattr(user, "first_name", "") or "").lower() last_name = (getattr(user, "lastname", "") or getattr(user, "last_name", "") or "").lower() diff --git a/python/common/business-registry-digital-credentials/tests/conftest.py b/python/common/business-registry-digital-credentials/tests/conftest.py index 48e4538b52..dbc221e44f 100644 --- a/python/common/business-registry-digital-credentials/tests/conftest.py +++ b/python/common/business-registry-digital-credentials/tests/conftest.py @@ -13,17 +13,23 @@ # limitations under the License. """Test configuration and fixtures.""" -import pytest from unittest.mock import Mock + +import pytest from flask import Flask -@pytest.fixture +@pytest.fixture(autouse=True) def app(): - """Create a Flask application for testing.""" + """Create a Flask application for testing. + + Autouse so every test runs inside an app context — the shared package now + uses ``current_app.logger`` and ``LegislationDatetime`` helpers that need it. + """ app = Flask(__name__) app.config["TESTING"] = True app.config["SECRET_KEY"] = "test-secret-key" + app.config["LEGISLATIVE_TIMEZONE"] = "America/Vancouver" with app.app_context(): yield app diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_config.py b/python/common/business-registry-digital-credentials/tests/unit/test_config.py index 755a0db637..2efb6ba009 100644 --- a/python/common/business-registry-digital-credentials/tests/unit/test_config.py +++ b/python/common/business-registry-digital-credentials/tests/unit/test_config.py @@ -14,6 +14,7 @@ """Tests for config module.""" import pytest + from business_registry_digital_credentials.config import DevConfig, ProdConfig, TestConfig, get_named_config diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_decorators.py b/python/common/business-registry-digital-credentials/tests/unit/test_decorators.py index d20e9b0790..5d684e149b 100644 --- a/python/common/business-registry-digital-credentials/tests/unit/test_decorators.py +++ b/python/common/business-registry-digital-credentials/tests/unit/test_decorators.py @@ -22,11 +22,8 @@ import jwt as pyjwt import pytest -from business_registry_digital_credentials.decorators import ( - _get_traction_token, - can_access_digital_credentials, - requires_traction_auth, -) + +from business_registry_digital_credentials.decorators import _get_traction_token, requires_traction_auth def _make_valid_token(): @@ -39,55 +36,6 @@ def _make_expired_token(): return pyjwt.encode({"exp": int(time.time()) - 3600}, "secret", algorithm="HS256") -class TestCanAccessDigitalCredentials: - """Tests for can_access_digital_credentials decorator.""" - - def test_business_not_found(self, app): - """Returns 404 when business is not found.""" - - @can_access_digital_credentials - def dummy_view(**kwargs): - return "ok", 200 - - with app.test_request_context(): - with patch("business_registry_digital_credentials.decorators.Business.find_by_identifier", return_value=None): - result, status = dummy_view(identifier="BC1234567") - assert status == HTTPStatus.NOT_FOUND - - @patch("business_registry_digital_credentials.decorators.are_digital_credentials_allowed", return_value=False) - def test_not_allowed(self, mock_allowed, app): - """Returns 401 when digital credentials are not allowed.""" - - @can_access_digital_credentials - def dummy_view(**kwargs): - return "ok", 200 - - business = MagicMock() - with app.test_request_context(): - with patch( - "business_registry_digital_credentials.decorators.Business.find_by_identifier", return_value=business - ): - result, status = dummy_view(identifier="BC1234567") - assert status == HTTPStatus.UNAUTHORIZED - - @patch("business_registry_digital_credentials.decorators.are_digital_credentials_allowed", return_value=True) - def test_allowed(self, mock_allowed, app): - """Calls through to wrapped function when allowed.""" - - @can_access_digital_credentials - def dummy_view(**kwargs): - return "ok", 200 - - business = MagicMock() - with app.test_request_context(): - with patch( - "business_registry_digital_credentials.decorators.Business.find_by_identifier", return_value=business - ): - result, status = dummy_view(identifier="BC1234567") - assert result == "ok" - assert status == 200 - - class TestRequiresTractionAuth: """Tests for requires_traction_auth decorator.""" diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_auth.py b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_auth.py index 7891d22aad..db4c135cf2 100644 --- a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_auth.py +++ b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_auth.py @@ -18,8 +18,9 @@ from unittest.mock import MagicMock, patch -from business_model.models import Business -from business_model.models import User +from business_model.models import Business, User +from flask.globals import request_ctx + from business_registry_digital_credentials.digital_credentials_auth import ( are_digital_credentials_allowed, get_digital_credentials_preconditions, @@ -90,21 +91,23 @@ def test_get_digital_credentials_preconditions_no_user(mock_user, app): def test_are_digital_credentials_allowed_staff(app): """Staff users are not allowed digital credentials.""" with app.test_request_context(): + request_ctx.current_user = {"username": "test"} jwt_mock = MagicMock() jwt_mock.contains_role.return_value = True business = Business() - assert are_digital_credentials_allowed(business, jwt_mock) is False + assert are_digital_credentials_allowed(business, jwt_mock, ["SP", "BEN", "GP"]) is False @patch("business_model.models.User.find_by_jwt_token", return_value=None) def test_are_digital_credentials_allowed_no_user(mock_find, app): """Returns False when user is not found.""" with app.test_request_context(): + request_ctx.current_user = {"username": "test"} app.app_ctx_globals_class.jwt_oidc_token_info = {"username": "test"} jwt_mock = MagicMock() jwt_mock.contains_role.return_value = False business = Business() - assert are_digital_credentials_allowed(business, jwt_mock) is False + assert are_digital_credentials_allowed(business, jwt_mock, ["SP", "BEN", "GP"]) is False @patch("business_model.models.User.find_by_jwt_token", return_value=User(id=1, login_source="BCSC")) @@ -112,11 +115,12 @@ def test_are_digital_credentials_allowed_no_user(mock_find, app): def test_are_digital_credentials_allowed_delegates_to_rules(mock_rules, mock_find, app): """Delegates to rules service when user is found.""" with app.test_request_context(): + request_ctx.current_user = {"username": "test"} app.app_ctx_globals_class.jwt_oidc_token_info = {"username": "test"} jwt_mock = MagicMock() jwt_mock.contains_role.return_value = False business = Business() - assert are_digital_credentials_allowed(business, jwt_mock) is True + assert are_digital_credentials_allowed(business, jwt_mock, ["SP", "BEN", "GP"]) is True @patch("business_model.models.User.find_by_jwt_token", return_value=User(id=1, login_source="BCSC")) @@ -124,8 +128,9 @@ def test_are_digital_credentials_allowed_delegates_to_rules(mock_rules, mock_fin def test_are_digital_credentials_allowed_rules_deny(mock_rules, mock_find, app): """Returns False when rules service denies access.""" with app.test_request_context(): + request_ctx.current_user = {"username": "test"} app.app_ctx_globals_class.jwt_oidc_token_info = {"username": "test"} jwt_mock = MagicMock() jwt_mock.contains_role.return_value = False business = Business() - assert are_digital_credentials_allowed(business, jwt_mock) is False + assert are_digital_credentials_allowed(business, jwt_mock, ["SP", "BEN", "GP"]) is False diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_helpers_and_utils.py b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_helpers_and_utils.py index 1b1ae9438f..34b592a70a 100644 --- a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_helpers_and_utils.py +++ b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_helpers_and_utils.py @@ -16,11 +16,12 @@ Test suite to ensure that helpers and utility functions for digital credentials are working as expected """ -from datetime import datetime +from datetime import datetime, timezone from unittest.mock import MagicMock, patch import pytest from business_model.models import Business, CorpType, DCBusinessUser, DCDefinition, Party, PartyRole, User + from business_registry_digital_credentials.digital_credentials_helpers import ( extract_invitation_message_id, get_business_type, @@ -63,54 +64,29 @@ def test_formatted_user(test_user, expected): @pytest.mark.parametrize( - "flag_value, valid_registration_types, valid_incorporation_types, expected", - [ - ({"types": ["SP", "BEN", "GP"]}, ["SP", "GP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": ["SP", "BEN", "GP", "CBEN"]}, ["SP", "GP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": ["SP"]}, ["SP", "GP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": []}, ["SP", "GP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": ["SP", "GP"]}, [], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": ["SP", "BEN"]}, ["SP", "GP"], [], ["SP", "BEN", "GP"]), - ], -) -def test_determine_allowed_business_types( - app, flag_value, valid_registration_types, valid_incorporation_types, expected -): - """Test filtering of allowed business types based on flag values.""" - - # The app fixture provides Flask context, so current_app.logger works - result = determine_allowed_business_types(valid_registration_types, valid_incorporation_types) - assert sorted(result) == sorted(expected) - - -@pytest.mark.parametrize( - "flag_value, valid_registration_types, valid_incorporation_types, expected", + "enabled_business_types, valid_registration_types, valid_incorporation_types, expected", [ (["SP", "BEN", "GP"], ["SP", "GP"], ["BEN"], ["SP", "BEN", "GP"]), - ({}, ["SP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": "SP"}, ["SP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"types": 123}, ["SP"], ["BEN"], ["SP", "BEN", "GP"]), - ({"type": ["SP", "BEN", "GP"]}, ["SP"], ["BEN"], ["SP", "BEN", "GP"]), - ("not-a-object", ["SP"], ["BEN"], ["SP", "BEN", "GP"]), - (123, ["SP"], ["BEN"], ["SP", "BEN", "GP"]), + (["SP", "BEN", "GP", "CBEN"], ["SP", "GP"], ["BEN"], ["SP", "BEN", "GP"]), + (["SP"], ["SP", "GP"], ["BEN"], ["SP"]), + ([], ["SP", "GP"], ["BEN"], []), + (["SP", "GP"], [], ["BEN"], []), + (["SP", "BEN"], ["SP", "GP"], [], ["SP"]), ], ) -def test_determine_allowed_business_types_invalid_flags( - app, flag_value, valid_registration_types, valid_incorporation_types, expected +def test_determine_allowed_business_types( + enabled_business_types, valid_registration_types, valid_incorporation_types, expected ): - """Test filtering of allowed business types based on flag values.""" - - # The app fixture provides Flask context, so current_app.logger works - result = determine_allowed_business_types(valid_registration_types, valid_incorporation_types) + """Caller-provided enabled types are intersected with supported types.""" + result = determine_allowed_business_types( + enabled_business_types, valid_registration_types, valid_incorporation_types + ) assert sorted(result) == sorted(expected) -def test_determine_allowed_business_types_missing_flag(app): - """Test filtering of allowed business types based on flag value not set.""" - - # The app fixture provides Flask context, so current_app.logger works - result = determine_allowed_business_types(["SP", "GP"], ["BEN"]) - assert result == ["SP", "BEN", "GP"] +def test_determine_allowed_business_types_none_input(): + """None ``enabled_business_types`` yields empty list (DBC disabled).""" + assert determine_allowed_business_types(None, ["SP", "GP"], ["BEN"]) == [] # Helper function tests @@ -175,9 +151,11 @@ class TestGetRegisteredOnDateint: """Tests for get_registered_on_dateint.""" def test_formats_date(self): - """Returns date in YYYYMMDD format.""" + """Returns date in YYYYMMDD format (in legislation timezone).""" business = MagicMock() - business.founding_date = datetime(2025, 1, 15) + # Use a tz-aware UTC time mid-day in America/Vancouver so the date is + # stable regardless of the host system's local timezone. + business.founding_date = datetime(2025, 1, 15, 20, 0, tzinfo=timezone.utc) assert get_registered_on_dateint(business) == "20250115" def test_no_founding_date(self): @@ -328,7 +306,9 @@ def test_non_business_type_returns_none(self): @patch("business_registry_digital_credentials.digital_credentials_helpers.get_roles", return_value=["Director"]) @patch("business_registry_digital_credentials.digital_credentials_helpers.get_given_names", return_value="JOHN") @patch("business_registry_digital_credentials.digital_credentials_helpers.get_family_name", return_value="DOE") - @patch("business_registry_digital_credentials.digital_credentials_helpers.get_company_status", return_value="ACTIVE") + @patch( + "business_registry_digital_credentials.digital_credentials_helpers.get_company_status", return_value="ACTIVE" + ) @patch( "business_registry_digital_credentials.digital_credentials_helpers.get_registered_on_dateint", return_value="20250115", diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_lifecycle.py b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_lifecycle.py new file mode 100644 index 0000000000..d06244ece9 --- /dev/null +++ b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_lifecycle.py @@ -0,0 +1,275 @@ +# Copyright © 2026 Province of British Columbia +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the credential-lifecycle helpers.""" + +from unittest.mock import MagicMock, patch + +import pytest +from business_model.models import DCDefinition, DCRevocationReason + +from business_registry_digital_credentials import digital_credentials_lifecycle +from business_registry_digital_credentials.digital_credentials_lifecycle import ( + DigitalCredentialError, + get_all_digital_credentials_for_business, + issue_digital_credential, + replace_digital_credential, + revoke_digital_credential, +) + +# get_all_digital_credentials_for_business ---------------------------------- + + +class TestGetAllDigitalCredentialsForBusiness: + """Tests for get_all_digital_credentials_for_business.""" + + def test_returns_empty_when_no_business_users(self): + """Returns empty list when the business has no business_users.""" + business = MagicMock() + business.business_users = [] + assert get_all_digital_credentials_for_business(business) == [] + + def test_returns_empty_when_no_active_connection(self): + """Returns empty list when no business_user has an active connection.""" + business_user = MagicMock() + conn = MagicMock(is_active=False) + business_user.connections = [conn] + business = MagicMock(business_users=[business_user]) + assert get_all_digital_credentials_for_business(business) == [] + + def test_skips_when_multiple_active_connections(self): + """Helper only considers users with exactly one active connection.""" + business_user = MagicMock() + business_user.connections = [ + MagicMock(is_active=True), + MagicMock(is_active=True), + ] + business = MagicMock(business_users=[business_user]) + assert get_all_digital_credentials_for_business(business) == [] + + def test_returns_issued_non_revoked_credentials(self): + """Returns only credentials that are issued and not revoked.""" + good = MagicMock(is_issued=True, is_revoked=False) + not_issued = MagicMock(is_issued=False, is_revoked=False) + revoked = MagicMock(is_issued=True, is_revoked=True) + conn = MagicMock(is_active=True, credentials=[good, not_issued, revoked]) + business_user = MagicMock(connections=[conn]) + business = MagicMock(business_users=[business_user]) + assert get_all_digital_credentials_for_business(business) == [good] + + +# issue_digital_credential -------------------------------------------------- + + +class TestIssueDigitalCredential: + """Tests for issue_digital_credential.""" + + @patch.object(digital_credentials_lifecycle, "DCDefinition") + def test_raises_when_definition_not_found(self, mock_definition_cls): + """Raises when no DCDefinition matches.""" + mock_definition_cls.find_by.return_value = None + # Preserve the real enum so isinstance check works. + mock_definition_cls.CredentialType = DCDefinition.CredentialType + business_user = MagicMock(id=1) + with pytest.raises(DigitalCredentialError, match="Definition not found"): + issue_digital_credential(business_user, DCDefinition.CredentialType.business) + + @patch.object(digital_credentials_lifecycle, "DCConnection") + @patch.object(digital_credentials_lifecycle, "DCDefinition") + def test_raises_when_active_connection_not_found(self, mock_definition_cls, mock_connection_cls): + """Raises when there is no active DCConnection for the business user.""" + mock_definition_cls.find_by.return_value = MagicMock() + mock_definition_cls.CredentialType = DCDefinition.CredentialType + mock_connection_cls.find_active_by_business_user_id.return_value = None + business_user = MagicMock(id=1) + with pytest.raises(DigitalCredentialError, match="Active connection not found"): + issue_digital_credential(business_user, DCDefinition.CredentialType.business) + + @patch.object(digital_credentials_lifecycle, "get_digital_credential_data", return_value=[]) + @patch.object(digital_credentials_lifecycle, "digital_credentials") + @patch.object(digital_credentials_lifecycle, "DCConnection") + @patch.object(digital_credentials_lifecycle, "DCDefinition") + def test_raises_when_traction_issue_fails(self, mock_definition_cls, mock_connection_cls, mock_dc, mock_get_data): + """Raises when Traction issuance returns no response.""" + mock_definition_cls.find_by.return_value = MagicMock() + mock_definition_cls.CredentialType = DCDefinition.CredentialType + mock_connection_cls.find_active_by_business_user_id.return_value = MagicMock() + mock_dc.issue_credential.return_value = None + with pytest.raises(DigitalCredentialError, match="Failed to issue credential"): + issue_digital_credential(MagicMock(id=1), DCDefinition.CredentialType.business) + + @patch.object(digital_credentials_lifecycle, "DCCredential") + @patch.object(digital_credentials_lifecycle, "get_digital_credential_data") + @patch.object(digital_credentials_lifecycle, "digital_credentials") + @patch.object(digital_credentials_lifecycle, "DCConnection") + @patch.object(digital_credentials_lifecycle, "DCDefinition") + def test_saves_credential_with_business_user_id( + self, + mock_definition_cls, + mock_connection_cls, + mock_dc, + mock_get_data, + mock_credential_cls, + ): + """Created DCCredential carries business_user_id (NOT NULL on the model).""" + definition = MagicMock(id=10) + connection = MagicMock(id=20, connection_id="conn-xyz") + mock_definition_cls.find_by.return_value = definition + mock_definition_cls.CredentialType = DCDefinition.CredentialType + mock_connection_cls.find_active_by_business_user_id.return_value = connection + mock_dc.issue_credential.return_value = {"cred_ex_id": "ex-1"} + mock_get_data.return_value = [{"name": "credential_id", "value": "cid-1"}] + + business_user = MagicMock(id=42) + issue_digital_credential(business_user, DCDefinition.CredentialType.business) + + mock_credential_cls.assert_called_once_with( + definition_id=10, + connection_id=20, + business_user_id=42, + credential_exchange_id="ex-1", + credential_id="cid-1", + ) + mock_credential_cls.return_value.save.assert_called_once() + + @patch.object(digital_credentials_lifecycle, "DCCredential") + @patch.object(digital_credentials_lifecycle, "get_digital_credential_data", return_value=[]) + @patch.object(digital_credentials_lifecycle, "digital_credentials") + @patch.object(digital_credentials_lifecycle, "DCConnection") + @patch.object(digital_credentials_lifecycle, "DCDefinition") + def test_accepts_credential_type_as_string( + self, + mock_definition_cls, + mock_connection_cls, + mock_dc, + mock_get_data, + mock_credential_cls, + ): + """credential_type as a name string is normalized to the enum.""" + mock_definition_cls.find_by.return_value = MagicMock(id=10) + mock_definition_cls.CredentialType = DCDefinition.CredentialType + mock_connection_cls.find_active_by_business_user_id.return_value = MagicMock(id=20, connection_id="c") + mock_dc.issue_credential.return_value = {"cred_ex_id": "ex-1"} + + issue_digital_credential(MagicMock(id=1), "business") + + # find_by was called with the resolved enum member, not the raw string. + passed_type = mock_definition_cls.find_by.call_args[0][0] + assert passed_type == DCDefinition.CredentialType.business + + +# revoke_digital_credential ------------------------------------------------- + + +class TestRevokeDigitalCredential: + """Tests for revoke_digital_credential.""" + + def test_raises_when_not_issued(self): + """Raises when the credential has not yet been issued.""" + credential = MagicMock(is_issued=False, is_revoked=False) + with pytest.raises(DigitalCredentialError, match="not issued yet or is revoked already"): + revoke_digital_credential(credential, DCRevocationReason.UPDATED_INFORMATION) + + def test_raises_when_already_revoked(self): + """Raises when the credential is already revoked.""" + credential = MagicMock(is_issued=True, is_revoked=True) + with pytest.raises(DigitalCredentialError, match="not issued yet or is revoked already"): + revoke_digital_credential(credential, DCRevocationReason.UPDATED_INFORMATION) + + def test_raises_when_no_active_connection(self): + """Raises when the credential's connection is missing or inactive.""" + credential = MagicMock(is_issued=True, is_revoked=False, connection=None) + with pytest.raises(DigitalCredentialError, match="Active connection not found"): + revoke_digital_credential(credential, DCRevocationReason.UPDATED_INFORMATION) + + @patch.object(digital_credentials_lifecycle, "digital_credentials") + def test_raises_when_traction_revoke_fails(self, mock_dc): + """Raises when Traction returns None on revoke.""" + connection = MagicMock(is_active=True, connection_id="c") + credential = MagicMock(is_issued=True, is_revoked=False, connection=connection) + mock_dc.revoke_credential.return_value = None + with pytest.raises(DigitalCredentialError, match="Failed to revoke credential"): + revoke_digital_credential(credential, DCRevocationReason.UPDATED_INFORMATION) + + @patch.object(digital_credentials_lifecycle, "digital_credentials") + def test_marks_revoked_and_saves(self, mock_dc): + """On success, sets is_revoked=True and saves the credential.""" + connection = MagicMock(is_active=True, connection_id="c") + credential = MagicMock(is_issued=True, is_revoked=False, connection=connection) + mock_dc.revoke_credential.return_value = {"ok": True} + + revoke_digital_credential(credential, DCRevocationReason.UPDATED_INFORMATION) + + assert credential.is_revoked is True + credential.save.assert_called_once() + + +# replace_digital_credential ------------------------------------------------ + + +class TestReplaceDigitalCredential: + """Tests for replace_digital_credential.""" + + @patch.object(digital_credentials_lifecycle, "issue_digital_credential") + @patch.object(digital_credentials_lifecycle, "revoke_digital_credential") + @patch.object(digital_credentials_lifecycle, "digital_credentials") + def test_revokes_then_issues_then_deletes(self, mock_dc, mock_revoke, mock_issue): + """Replacement path: revoke existing → issue new → delete old.""" + credential = MagicMock(is_issued=True, is_revoked=False) + mock_dc.fetch_credential_exchange_record.return_value = {"id": "ex-1"} + mock_dc.remove_credential_exchange_record.return_value = {"removed": True} + + replace_digital_credential( + credential, + DCDefinition.CredentialType.business, + DCRevocationReason.UPDATED_INFORMATION, + ) + + mock_revoke.assert_called_once_with(credential, DCRevocationReason.UPDATED_INFORMATION) + mock_issue.assert_called_once_with(credential.connection.business_user, DCDefinition.CredentialType.business) + credential.delete.assert_called_once() + + @patch.object(digital_credentials_lifecycle, "issue_digital_credential") + @patch.object(digital_credentials_lifecycle, "revoke_digital_credential") + @patch.object(digital_credentials_lifecycle, "digital_credentials") + def test_skips_revoke_when_already_revoked(self, mock_dc, mock_revoke, mock_issue): + """Does not call revoke when the existing credential is already revoked.""" + credential = MagicMock(is_issued=True, is_revoked=True) + mock_dc.fetch_credential_exchange_record.return_value = None + + replace_digital_credential( + credential, + DCDefinition.CredentialType.business, + DCRevocationReason.UPDATED_INFORMATION, + ) + + mock_revoke.assert_not_called() + mock_issue.assert_called_once() + + @patch.object(digital_credentials_lifecycle, "issue_digital_credential") + @patch.object(digital_credentials_lifecycle, "revoke_digital_credential") + @patch.object(digital_credentials_lifecycle, "digital_credentials") + def test_raises_when_remove_exchange_record_fails(self, mock_dc, mock_revoke, mock_issue): + """Raises when the credential exchange record exists but removal fails.""" + credential = MagicMock(is_issued=True, is_revoked=True) + mock_dc.fetch_credential_exchange_record.return_value = {"id": "ex-1"} + mock_dc.remove_credential_exchange_record.return_value = None + + with pytest.raises(DigitalCredentialError, match="Failed to remove credential exchange record"): + replace_digital_credential( + credential, + DCDefinition.CredentialType.business, + DCRevocationReason.UPDATED_INFORMATION, + ) + mock_issue.assert_not_called() + credential.delete.assert_not_called() diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_rules.py b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_rules.py index 231b01fe2d..c37fd095b9 100644 --- a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_rules.py +++ b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_rules.py @@ -20,6 +20,7 @@ import pytest from business_model.models import Business, Filing, Party, PartyRole, User + from business_registry_digital_credentials.digital_credentials_rules import DigitalCredentialsRulesService @@ -90,6 +91,7 @@ def test_returns_empty_when_no_roles(self, mock_match, rules_service, mock_user) result = rules_service.user_business_party_roles(mock_user, business) assert result == [] + class TestUserFilingPartyRoles: """Tests for user_filing_party_roles filtering by valid_role_types.""" @@ -179,7 +181,7 @@ class TestHasSpecificAccess: def test_no_business(self, rules_service, mock_user): """Returns False when no business is provided.""" - assert rules_service._has_specific_access(mock_user, None) is False + assert rules_service._has_specific_access(mock_user, None, ["SP", "BEN", "GP"]) is False @patch( "business_registry_digital_credentials.digital_credentials_rules.determine_allowed_business_types", @@ -189,7 +191,7 @@ def test_non_allowed_business_type(self, mock_types, rules_service, mock_user): """Returns False when business type is not allowed.""" business = MagicMock(spec=Business) business.legal_type = "CP" - assert rules_service._has_specific_access(mock_user, business) is False + assert rules_service._has_specific_access(mock_user, business, ["SP", "BEN", "GP"]) is False @patch( "business_registry_digital_credentials.digital_credentials_rules.determine_allowed_business_types", @@ -200,7 +202,7 @@ def test_allowed_type_with_filing_role(self, mock_role, mock_types, rules_servic """Returns True when business type is allowed and user has a filing role.""" business = MagicMock(spec=Business) business.legal_type = "BEN" - assert rules_service._has_specific_access(mock_user, business) is True + assert rules_service._has_specific_access(mock_user, business, ["SP", "BEN", "GP"]) is True @patch( "business_registry_digital_credentials.digital_credentials_rules.determine_allowed_business_types", @@ -208,11 +210,13 @@ def test_allowed_type_with_filing_role(self, mock_role, mock_types, rules_servic ) @patch.object(DigitalCredentialsRulesService, "user_filing_party_roles", return_value=[]) @patch.object(DigitalCredentialsRulesService, "user_business_party_roles", return_value=[MagicMock()]) - def test_allowed_type_with_business_role(self, mock_biz_role, mock_filing_role, mock_types, rules_service, mock_user): + def test_allowed_type_with_business_role( + self, mock_biz_role, mock_filing_role, mock_types, rules_service, mock_user + ): """Returns True when business type is allowed and user has a business role.""" business = MagicMock(spec=Business) business.legal_type = "SP" - assert rules_service._has_specific_access(mock_user, business) is True + assert rules_service._has_specific_access(mock_user, business, ["SP", "BEN", "GP"]) is True class TestAreDigitalCredentialsAllowed: @@ -223,20 +227,20 @@ class TestAreDigitalCredentialsAllowed: def test_allowed(self, mock_specific, mock_general, rules_service, mock_user): """Returns True when both general and specific access are met.""" business = MagicMock(spec=Business) - assert rules_service.are_digital_credentials_allowed(mock_user, business) is True + assert rules_service.are_digital_credentials_allowed(mock_user, business, ["SP", "BEN", "GP"]) is True @patch.object(DigitalCredentialsRulesService, "_has_general_access", return_value=False) def test_no_general_access(self, mock_general, rules_service, mock_user): """Returns False when general access is not met.""" business = MagicMock(spec=Business) - assert rules_service.are_digital_credentials_allowed(mock_user, business) is False + assert rules_service.are_digital_credentials_allowed(mock_user, business, ["SP", "BEN", "GP"]) is False @patch.object(DigitalCredentialsRulesService, "_has_general_access", return_value=True) @patch.object(DigitalCredentialsRulesService, "_has_specific_access", return_value=False) def test_no_specific_access(self, mock_specific, mock_general, rules_service, mock_user): """Returns False when specific access is not met.""" business = MagicMock(spec=Business) - assert rules_service.are_digital_credentials_allowed(mock_user, business) is False + assert rules_service.are_digital_credentials_allowed(mock_user, business, ["SP", "BEN", "GP"]) is False class TestGetPreconditions: @@ -253,8 +257,10 @@ def test_returns_business_roles(self, mock_cp, rules_service, mock_user): """Returns business party roles when user has them.""" business = MagicMock(spec=Business) role = _make_party_role("director") - with patch.object(rules_service, "user_business_party_roles", return_value=[role]), \ - patch.object(rules_service, "user_filing_party_roles", return_value=[]): + with ( + patch.object(rules_service, "user_business_party_roles", return_value=[role]), + patch.object(rules_service, "user_filing_party_roles", return_value=[]), + ): result = rules_service.get_preconditions(mock_user, business) assert result == ["director"] @@ -264,8 +270,10 @@ def test_returns_combined_roles(self, mock_cp, rules_service, mock_user): business = MagicMock(spec=Business) biz_role = _make_party_role("proprietor") filing_role = _make_party_role("director") - with patch.object(rules_service, "user_business_party_roles", return_value=[biz_role]), \ - patch.object(rules_service, "user_filing_party_roles", return_value=[filing_role]): + with ( + patch.object(rules_service, "user_business_party_roles", return_value=[biz_role]), + patch.object(rules_service, "user_filing_party_roles", return_value=[filing_role]), + ): result = rules_service.get_preconditions(mock_user, business) assert "proprietor" in result assert "director" in result diff --git a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_service.py b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_service.py index 31530b67b2..0db99843b0 100644 --- a/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_service.py +++ b/python/common/business-registry-digital-credentials/tests/unit/test_digital_credentials_service.py @@ -22,6 +22,7 @@ import jwt as pyjwt import pytest from business_model.models import DCDefinition, DCRevocationReason + from business_registry_digital_credentials.digital_credentials import DigitalCredentialsService @@ -201,7 +202,9 @@ def test_success(self, mock_post, app): service = DigitalCredentialsService() service.app = app service.api_url = "https://traction.test" - result = service.revoke_credential("conn-123", "cred-rev-1", "rev-reg-1", DCRevocationReason.UPDATED_INFORMATION) + result = service.revoke_credential( + "conn-123", "cred-rev-1", "rev-reg-1", DCRevocationReason.UPDATED_INFORMATION + ) assert result == {} @patch("business_registry_digital_credentials.digital_credentials.requests.post", side_effect=Exception("fail")) @@ -211,5 +214,7 @@ def test_returns_none_on_error(self, mock_post, app): service = DigitalCredentialsService() service.app = app service.api_url = "https://traction.test" - result = service.revoke_credential("conn-123", "cred-rev-1", "rev-reg-1", DCRevocationReason.UPDATED_INFORMATION) + result = service.revoke_credential( + "conn-123", "cred-rev-1", "rev-reg-1", DCRevocationReason.UPDATED_INFORMATION + ) assert result is None