diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 89eae262..b73007a2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -42,7 +42,9 @@ repos: # false-positives on the project's own code style (`shell=dict(...)` # in argument_spec triggers B604, the literal `'on_create'` sentinel # triggers B105). Out of scope for in-tree review. - exclude: '^plugins/modules/ipa.*\.py$' + # `tests/` holds unit tests whose fixtures use throwaway passwords + # (B105/B106); scanning test fixtures for hardcoded secrets is noise. + exclude: '^(plugins/modules/ipa.*|tests/.*)\.py$' types_or: ['python'] - repo: 'https://github.com/jendrikseipp/vulture' diff --git a/CHANGELOG.md b/CHANGELOG.md index afa009f1..5539baae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +* **plugin:bitwarden_item**: Fixed the lookup's documentation so `ansible-doc` renders it again. * **plugin:combine_lod**: The `combine_lod` filter now reports an error when an item is missing part of a composite `unique_key` (a list of keys), instead of silently grouping such items together. Inventories with incomplete composite keys that previously merged by accident now fail loudly and must be corrected. Also fixed its documentation so `ansible-doc` renders it again. * **role:kernel_settings**: The `systemd_cpu_affinity` setting is now actually applied. The value was computed and shown in the debug output but never passed to the underlying system role, so a configured CPU affinity had no effect. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e396f980..eb48a41b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -758,6 +758,19 @@ Some files under `plugins/modules/` are not authored by Linuxfabrik but vendored In-house plugins live under `plugins/` following the standard Ansible collection layout: `filter/`, `lookup/`, `modules/` and `module_utils/`. The `## Tasks` rules above (FQCN, meta modules, idempotency) are about role tasks; the points below are specific to writing the plugins themselves. +* Every in-house plugin starts with the standard file header, followed by `from __future__ import absolute_import, division, print_function` and `__metaclass__ = type`: + + ```python + #!/usr/bin/env python3 + # -*- coding: utf-8; py-indent-offset: 4 -*- + # + # Author: Linuxfabrik GmbH, Zurich, Switzerland + # Contact: info (at) linuxfabrik (dot) ch + # https://www.linuxfabrik.ch/ + # License: The Unlicense, see LICENSE file. + ``` + +* Use single quotes and f-strings consistently (vendored plugins keep their upstream style, see below). * Every plugin carries `DOCUMENTATION` (and `RETURN` / `EXAMPLES` where applicable). Keep it valid YAML: in a `description` list, a bullet containing a colon followed by a space is parsed as a mapping and makes `ansible-doc` fail, so rephrase or quote such bullets. Verify with `ansible-doc -t linuxfabrik.lfops.`. * Set `version_added` to the LFOps release the plugin first shipped in, and never change it afterwards. * `module_utils` holds code shared between plugins. Do not import the external Linuxfabrik Python Libraries (`lib`) into a plugin; copy what you need and note the origin in a comment. @@ -767,7 +780,7 @@ In-house plugins live under `plugins/` following the standard Ansible collection Unit tests are **mandatory** for every in-house plugin. Any pull request that adds or changes a plugin must add or update its test, and `git grep` should never find a plugin without one. -* **Where**: under `tests/unit/`, mirroring the plugin tree, named `test_.py` (e.g. `tests/unit/plugins/filter/test_combine_lod.py`). Load the plugin by path (the plugins are not an importable package) and assert behavior, not implementation details. +* **Where**: under `tests/unit/`, mirroring the plugin tree, named `test_.py` (e.g. `tests/unit/plugins/filter/test_combine_lod.py`). A plugin with no collection-qualified imports (e.g. the `combine_lod` filter) can be loaded by file path. A plugin that imports `ansible_collections.linuxfabrik.lfops...` (modules, or lookups pulling in a module_util) is imported through that path; `tests/conftest.py` makes this checkout importable as the collection so the imports resolve under plain pytest/tox. Same-named test files in different plugin-type directories are fine (`--import-mode=importlib`). Assert behavior, not implementation details. * **Two tiers**, because plugins run in different environments: * Controller plugins (`plugins/filter/`, `plugins/lookup/`) are evaluated on the Ansible controller and only ever see the controller's Python (>= 3.10). They run on the standard CI matrix. diff --git a/plugins/lookup/bitwarden_item.py b/plugins/lookup/bitwarden_item.py index d197ee39..b23c539d 100644 --- a/plugins/lookup/bitwarden_item.py +++ b/plugins/lookup/bitwarden_item.py @@ -1,8 +1,10 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2022, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function @@ -15,7 +17,7 @@ description: - Looks up a password item in Bitwarden by name (and optional username, folder, collection and organization), or directly by Bitwarden item ID. - - If the lookup is by name and no matching item is found, a new login item is created on the fly. This makes the plugin idempotent for automation: the first run creates the secret, every subsequent run returns the same item. + - If the lookup is by name and no matching item is found, a new login item is created on the fly. This makes the plugin idempotent for automation. The first run creates the secret, every subsequent run returns the same item. - If the lookup is by I(id) and the item is not in the local cache, the plugin falls back to a single API call. If the ID still does not resolve, the plugin fails - IDs are never auto-created. - If a name-based search returns more than one match, the plugin fails because it cannot decide which item to use. - On success, the plugin returns the full Bitwarden item object. C(username) and C(password) are additionally lifted to the top level so they can be addressed without going through the C(login) sub-dictionary. @@ -284,7 +286,7 @@ from ansible_collections.linuxfabrik.lfops.plugins.module_utils.bitwarden import \ Bitwarden -display = Display() # lfbwlp = Linuxfabrik Bitwarden Lookup Plugin +display = Display() # log prefix "lfbwlp" = Linuxfabrik Bitwarden Lookup Plugin # https://docs.ansible.com/ansible/latest/dev_guide/developing_plugins.html#developing-lookup-plugins # inspired by the lookup plugins lastpass (same topic) and redis (more modern) @@ -302,7 +304,7 @@ def run(self, terms, variables=None, **kwargs): ret = [] for term in terms: - display.vvv('lfbwlp - run - lookup term: {}'.format(term)) + display.vvv(f'lfbwlp - run - lookup term: {term}') try: collection_id = term.get('collection_id', None) folder_id = term.get('folder_id', None) @@ -317,11 +319,11 @@ def run(self, terms, variables=None, **kwargs): uris = term.get('uris', []) username = term.get('username', None) except Exception as e: - raise AnsibleError('Encountered exception while fetching {}: {}'.format(term, e)) + raise AnsibleError(f'Encountered exception while fetching {term}: {e}') if id_: result = bw.get_item_by_id(id_) - display.vvv('lfbwlp - run - get item by id: {}'.format(id_)) + display.vvv(f'lfbwlp - run - get item by id: {id_}') if result: # move username and password higher for easier access result['username'] = result['login']['username'] @@ -330,10 +332,10 @@ def run(self, terms, variables=None, **kwargs): continue # done here, go to next term else: # item not found by ID. if there is an ID given we expect it to exist - raise AnsibleError('Item with id {} not found.'.format(id_)) + raise AnsibleError(f'Item with id {id_} not found.') name = Bitwarden.get_pretty_name(name, hostname, purpose) - display.vvv('lfbwlp - run - get item: {}'.format(name)) + display.vvv(f'lfbwlp - run - get item: {name}') result = bw.get_items(name, username, folder_id, collection_id, organization_id) if len(result) > 1: @@ -380,30 +382,5 @@ def run(self, terms, variables=None, **kwargs): out['password'] = out['login']['password'] ret.append(out) - # always returns a list of dicts - # - # example: - # - # - collectionIds: - # - 47b22450-fb65-4ad2-836a-03f25c982fb1 - # favorite: false - # folderId: null - # id: 2656edf2-3600-4d8d-88e8-bcdda35d1ccf - # login: - # password: d2Dft5FqGK4yhzmsDcjWJD5LMAPGDsN8oZpXsxx6 - # passwordRevisionDate: null - # totp: null - # uris: - # - match: null - # uri: https://www.example.com - # - match: null - # uri: https://git.example.com - # username: mariadb-admin - # name: app4711 - MariaDB - # notes: Automatically generated by Ansible. - # object: item - # organizationId: 5ae8f510-1f84-4243-8c35-bec35091706c - # reprompt: 0 - # revisionDate: '2019-01-28T15:31:34.300Z' - ## type: 1 + # always returns a list of dicts; see the RETURN block above for the shape return ret diff --git a/plugins/module_utils/bitwarden.py b/plugins/module_utils/bitwarden.py index bc96de43..16a28f39 100644 --- a/plugins/module_utils/bitwarden.py +++ b/plugins/module_utils/bitwarden.py @@ -1,12 +1,14 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2022, Linuxfabrik, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function -# This module requires Python 3.8+ (secrets, f-strings with =, os.replace, json.JSONDecodeError). This should be fine since it will always run on localhost and the Ansible Controller has to be Python 3.9+ anyway +__metaclass__ = type import copy import email.encoders @@ -24,11 +26,10 @@ from urllib.error import HTTPError, URLError from ansible.module_utils.common.collections import Mapping +from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible.module_utils.six import string_types +from ansible.module_utils.urls import ConnectionError, SSLValidationError, open_url -from ansible.module_utils._text import to_bytes, to_native, to_text -from ansible.module_utils.urls import (ConnectionError, SSLValidationError, - open_url) try: from ansible.utils.display import Display display = Display() @@ -42,8 +43,9 @@ def vvv(self, msg, **kwargs): def prepare_multipart_no_base64(fields): - """Taken from ansible.module_utils.urls, but adjusted to not no encoding as the bitwarden API does not work with that - (even though it should according to the RFC, Content-Transfer-Encoding is deprecated but not removed). + """Taken from ansible.module_utils.urls, but adjusted to not encode the payload, as the + Bitwarden API does not work with that (even though it should according to the RFC, + Content-Transfer-Encoding is deprecated but not removed). See https://github.com/ansible/ansible/issues/73621 Takes a mapping, and prepares a multipart/form-data body @@ -74,7 +76,7 @@ def prepare_multipart_no_base64(fields): if not isinstance(fields, Mapping): raise TypeError( - 'Mapping is required, cannot be type %s' % fields.__class__.__name__ + f'Mapping is required, cannot be type {fields.__class__.__name__}' ) m = email.mime.multipart.MIMEMultipart('form-data') @@ -99,14 +101,14 @@ def prepare_multipart_no_base64(fields): main_type, sep, sub_type = mime.partition('/') else: raise TypeError( - 'value must be a string, or mapping, cannot be type %s' % value.__class__.__name__ + f'value must be a string, or mapping, cannot be type {value.__class__.__name__}' ) if not content and filename: with open(to_bytes(filename, errors='surrogate_or_strict'), 'rb') as f: part = email.mime.application.MIMEApplication(f.read(), _encoder=email.encoders.encode_noop) del part['Content-Type'] - part.add_header('Content-Type', '%s/%s' % (main_type, sub_type)) + part.add_header('Content-Type', f'{main_type}/{sub_type}') else: part = email.mime.nonmultipart.MIMENonMultipart(main_type, sub_type) part.set_payload(to_bytes(content)) @@ -143,7 +145,7 @@ def prepare_multipart_no_base64(fields): ) -CACHE_DIR = os.environ.get('XDG_RUNTIME_DIR', '/tmp') +CACHE_DIR = os.environ.get('XDG_RUNTIME_DIR', '/tmp') # nosec B108 - cache files are created with mkstemp (mode 0600) and atomically replaced CACHE_FILE = os.path.join(CACHE_DIR, 'lfops_bitwarden_cache.json') CACHE_VERSION = 2026032701 @@ -152,16 +154,16 @@ class BitwardenException(Exception): pass -class Bitwarden(object): +class Bitwarden: # https://bitwarden.com/help/vault-management-api def __init__(self, hostname='127.0.0.1', port=8087): - self._base_url = 'http://%s:%s' % (hostname, port) + self._base_url = f'http://{hostname}:{port}' self._cache = None self._load_cache() def _api_call(self, url_path, method='GET', body=None, body_format='json'): - url = '%s/%s' % (self._base_url, url_path) + url = f'{self._base_url}/{url_path}' headers = {} if body: @@ -172,7 +174,7 @@ def _api_call(self, url_path, method='GET', body=None, body_format='json'): try: content_type, body = prepare_multipart_no_base64(body) except (TypeError, ValueError) as e: - raise BitwardenException('failed to parse body as form-multipart: %s' % to_native(e)) + raise BitwardenException(f'failed to parse body as form-multipart: {to_native(e)}') headers['Content-Type'] = content_type # mostly taken from ansible.builtin.url lookup plugin @@ -180,21 +182,21 @@ def _api_call(self, url_path, method='GET', body=None, body_format='json'): # increased the timeout since listing all items via `list/object/items` takes forever (13s for ~2500 items) response = open_url(url, method=method, data=body, headers=headers, timeout=60) except HTTPError as e: - raise BitwardenException("Received HTTP error for %s : %s" % (url, to_native(e))) + raise BitwardenException(f'Received HTTP error for {url} : {to_native(e)}') except URLError as e: - raise BitwardenException("Failed lookup url for %s : %s" % (url, to_native(e))) + raise BitwardenException(f'Failed lookup url for {url} : {to_native(e)}') except SSLValidationError as e: - raise BitwardenException("Error validating the server's certificate for %s: %s" % (url, to_native(e))) + raise BitwardenException(f"Error validating the server's certificate for {url}: {to_native(e)}") except ConnectionError as e: - raise BitwardenException("Error connecting to %s: %s" % (url, to_native(e))) + raise BitwardenException(f'Error connecting to {url}: {to_native(e)}') try: result = json.loads(to_text(response.read())) except json.decoder.JSONDecodeError as e: - raise BitwardenException('Unable to load JSON: %s' % (to_native(e))) + raise BitwardenException(f'Unable to load JSON: {to_native(e)}') if not result.get('success'): - raise BitwardenException('API call failed: %s' % (result.get('data'))) + raise BitwardenException(f"API call failed: {result.get('data')}") return result @@ -209,7 +211,7 @@ def _load_cache(self): if data.get('version') == CACHE_VERSION: self._cache = data item_count = len(self._cache['items']) if self._cache['items'] is not None else 0 - display.vvv('lfbw - cache loaded from %s (%d items)' % (CACHE_FILE, item_count)) + display.vvv(f'lfbw - cache loaded from {CACHE_FILE} ({item_count} items)') return except (IOError, OSError, ValueError, json.decoder.JSONDecodeError): pass @@ -234,12 +236,12 @@ def _save_cache(self): with os.fdopen(fd, 'w') as f: json.dump(self._cache, f) os.replace(tmp_path, CACHE_FILE) - display.vvv('lfbw - cache saved to %s' % (CACHE_FILE)) + display.vvv(f'lfbw - cache saved to {CACHE_FILE}') except Exception: os.unlink(tmp_path) raise except (IOError, OSError): - display.vvv('lfbw - failed to save cache to %s' % (CACHE_FILE)) + display.vvv(f'lfbw - failed to save cache to {CACHE_FILE}') def _get_template(self, template_name): @@ -247,12 +249,12 @@ def _get_template(self, template_name): Templates are static API schema definitions that never change. """ if template_name not in self._cache['templates']: - display.vvv('lfbw - fetching template "%s" from API' % (template_name)) - result = self._api_call('object/template/%s' % (template_name)) + display.vvv(f'lfbw - fetching template "{template_name}" from API') + result = self._api_call(f'object/template/{template_name}') self._cache['templates'][template_name] = result['data']['template'] self._save_cache() else: - display.vvv('lfbw - using cached template "%s"' % (template_name)) + display.vvv(f'lfbw - using cached template "{template_name}"') return copy.deepcopy(self._cache['templates'][template_name]) @@ -271,12 +273,12 @@ def sync(self, force=False, interval=60): if not force and time.time() - self._cache.get('sync_timestamp', 0) < interval: display.vvv('lfbw - sync skipped, last sync was recent enough') return - display.vvv('lfbw - syncing vault (force=%s)' % (force)) + display.vvv(f'lfbw - syncing vault (force={force})') self._api_call('sync', method='POST') result = self._api_call('list/object/items') self._cache['items'] = result['data']['data'] self._cache['sync_timestamp'] = time.time() - display.vvv('lfbw - sync complete, cached %d items' % (len(self._cache['items']))) + display.vvv(f"lfbw - sync complete, cached {len(self._cache['items'])} items") self._save_cache() @@ -324,7 +326,7 @@ def get_items(self, name, username=None, folder_id=None, collection_id=None, org if isinstance(organization_id, str) and len(organization_id.strip()) == 0: organization_id = None - display.vvv('lfbw - searching cache for name="%s", username="%s"' % (name, username)) + display.vvv(f'lfbw - searching cache for name="{name}", username="{username}"') matching_items = [] for item in self._cache['items']: if item.get('type') != 1: @@ -341,25 +343,27 @@ def get_items(self, name, username=None, folder_id=None, collection_id=None, org and (item.get('organizationId') == organization_id): matching_items.append(item) - display.vvv('lfbw - found %d matching item(s)' % (len(matching_items))) + display.vvv(f'lfbw - found {len(matching_items)} matching item(s)') return matching_items def get_item_by_id(self, item_id): - """Get an item by ID from Bitwarden. Returns the item or None. Throws an exception if the id leads to unambiguous results. + """Get an item by ID from Bitwarden. Looks in the cache first, then falls back to the + API (the item may have been created externally). Returns the item; raises + BitwardenException if the API does not know the ID. """ - display.vvv('lfbw - looking up item by id=%s' % (item_id)) + display.vvv(f'lfbw - looking up item by id={item_id}') for item in self._cache['items']: if item.get('id') == item_id: display.vvv('lfbw - found item in cache') return item # fallback to API if not found in cache (item could have been created externally) display.vvv('lfbw - item not in cache, falling back to API') - result = self._api_call('object/item/%s' % (item_id)) + result = self._api_call(f'object/item/{item_id}') return result['data'] - def generate(self, password_length=60, password_choice='0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'): + def generate(self, password_length=60, password_choice='0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'): # nosec B107 - this is the character set to draw from, not a password """Generates a random password of a given length. If you want to generate a hex-based password, ensure that password_length is positive and even (as hex characters typically come in pairs representing bytes), and that password_choice is set to '0123456789abcdef'. @@ -452,7 +456,7 @@ def get_template_item(self, name, login=None, notes=None, organization_id=None, def create_item(self, item): """Creates an item object in Bitwarden. """ - display.vvv('lfbw - creating item "%s"' % (item.get('name', ''))) + display.vvv(f'lfbw - creating item "{item.get("name", "")}"') result = self._api_call('object/item', method='POST', body=item) self._cache['items'].append(result['data']) self._save_cache() @@ -463,8 +467,8 @@ def create_item(self, item): def edit_item(self, item, item_id): """Edits an item object in Bitwarden. """ - display.vvv('lfbw - editing item %s' % (item_id)) - result = self._api_call('object/item/%s' % (item_id), method='PUT', body=item) + display.vvv(f'lfbw - editing item {item_id}') + result = self._api_call(f'object/item/{item_id}', method='PUT', body=item) for i, cached_item in enumerate(self._cache['items']): if cached_item.get('id') == item_id: self._cache['items'][i] = result['data'] @@ -477,14 +481,14 @@ def edit_item(self, item, item_id): def add_attachment(self, item_id, attachment_path): """Adds the file at `attachment_path` to the item specified by `item_id` """ - display.vvv('lfbw - adding attachment "%s" to item %s' % (attachment_path, item_id)) + display.vvv(f'lfbw - adding attachment "{attachment_path}" to item {item_id}') body = { 'file': { 'filename': attachment_path, }, } - result = self._api_call('attachment?itemId=%s' % (item_id), method='POST', body=body, body_format='form-multipart') + result = self._api_call(f'attachment?itemId={item_id}', method='POST', body=body, body_format='form-multipart') for i, cached_item in enumerate(self._cache['items']): if cached_item.get('id') == item_id: self._cache['items'][i] = result['data'] @@ -503,6 +507,6 @@ def get_pretty_name(name, hostname=None, purpose=None): if not name: name = hostname if purpose: - name += ' - {}'.format(purpose) + name += f' - {purpose}' return name diff --git a/plugins/modules/bitwarden_item.py b/plugins/modules/bitwarden_item.py index 44b069e4..36e6ec13 100644 --- a/plugins/modules/bitwarden_item.py +++ b/plugins/modules/bitwarden_item.py @@ -1,8 +1,10 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2022, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function @@ -261,9 +263,9 @@ def diff_and_update(current, target): - '''Diffs the current item with the target item and checks if there are relevant changes. - Returns (changed, updated_item). The updated_item can be send to `bw` to update the remote item. - ''' + """Diffs the current item with the target item and checks if there are relevant changes. + Returns (changed, updated_item). The updated_item can be sent to `bw` to update the remote item. + """ def check_dict_for_changes(current, target): changed = False @@ -273,7 +275,7 @@ def check_dict_for_changes(current, target): changed = True elif (value != current.get(key)) \ - and not (not value and not current.get(key)): # compare None to emtpy lists and empty strings + and not (not value and not current.get(key)): # compare None to empty lists and empty strings changed = True return changed @@ -316,11 +318,11 @@ def run_module(): if attachments: basenames = [os.path.basename(attachment) for attachment in attachments] if len(set(basenames)) < len(basenames): - module.fail_json('This module cannot handle multiple attachments with the same basename.') + module.fail_json(msg='This module cannot handle multiple attachments with the same basename.') for attachment in attachments: if not os.access(attachment, os.R_OK): - module.fail_json('Could not read the attachments at "{}".'.format(attachment)) + module.fail_json(msg=f'Could not read the attachments at "{attachment}".') # extract the variables to make the code more readable collection_id = module.params['collection_id'] @@ -338,7 +340,7 @@ def run_module(): bw = Bitwarden() if not bw.is_unlocked: - module.fail_json('Not logged into Bitwarden, or Bitwarden Vault is locked. Please run `bw login` and `bw unlock` first.') + module.fail_json(msg='Not logged into Bitwarden, or Bitwarden Vault is locked. Please run `bw login` and `bw unlock` first.') # to be sure we are up to date bw.sync() @@ -353,10 +355,7 @@ def run_module(): if len(current_items) > 1: module.fail_json(msg='Found multiple Bitwarden items with the same name/title and username, cannot decide which one to use. Aborting.') - try: - current_item = current_items[0] - except IndexError: - current_item = None + current_item = current_items[0] if current_items else None login_uris = bw.get_template_item_login_uri(uris) login = bw.get_template_item_login(username, password, login_uris) diff --git a/pyproject.toml b/pyproject.toml index cc86c240..1035d543 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,3 +19,7 @@ min_confidence = 80 # Unit tests for the in-house plugins. The matrix of Python / ansible-core # versions is driven by tox; see tox.ini. testpaths = ["tests/unit"] +# importlib mode lets same-named test files live in different plugin-type +# directories (e.g. modules/ and lookup/ both have test_bitwarden_item.py) +# without needing __init__.py packages. +addopts = ["--import-mode=importlib"] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..1e4b9f4c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Make this checkout importable as `ansible_collections.linuxfabrik.lfops`. + +Modules and lookups import their shared code via the collection-qualified +path, e.g. + + from ansible_collections.linuxfabrik.lfops.plugins.module_utils.bitwarden import Bitwarden + +For that to resolve under a plain pytest/tox run (without installing the +collection), build a temporary `ansible_collections/linuxfabrik/lfops` +tree that symlinks back to the repo and put it on sys.path. Filter and +lookup tests that load a plugin by file path do not need this, but it is +harmless for them. +""" + +import os +import pathlib +import sys +import tempfile + +_REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent + + +def _make_collection_importable(): + root = pathlib.Path(tempfile.mkdtemp(prefix='lfops_ansible_collections_')) + link_parent = root / 'ansible_collections' / 'linuxfabrik' + link_parent.mkdir(parents=True, exist_ok=True) + link = link_parent / 'lfops' + if not link.exists(): + link.symlink_to(_REPO_ROOT, target_is_directory=True) + sys.path.insert(0, str(root)) + os.environ.setdefault('ANSIBLE_COLLECTIONS_PATH', str(root)) + + +_make_collection_importable() diff --git a/tests/unit/plugins/lookup/test_bitwarden_item.py b/tests/unit/plugins/lookup/test_bitwarden_item.py new file mode 100644 index 00000000..3f913e6a --- /dev/null +++ b/tests/unit/plugins/lookup/test_bitwarden_item.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Unit tests for the bitwarden_item lookup plugin. + +The lookup runs on the controller. All Bitwarden I/O goes through the +Bitwarden client, which is replaced with a fake here, so no server or +cache is touched. The collection import is wired up by tests/conftest.py. +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import unittest + +from ansible.errors import AnsibleError +from ansible_collections.linuxfabrik.lfops.plugins.lookup import bitwarden_item as lookup_mod + + +class _FakeBitwarden: + """Minimal stand-in for the Bitwarden client used by the lookup.""" + + items_by_search = [] + item_by_id = None + + def __init__(self, *args, **kwargs): + pass + + @property + def is_unlocked(self): + return True + + def sync(self, *args, **kwargs): + pass + + def get_items(self, name, username=None, folder_id=None, collection_id=None, organization_id=None): + return list(type(self).items_by_search) + + def get_item_by_id(self, item_id): + return type(self).item_by_id + + @staticmethod + def get_pretty_name(name, hostname=None, purpose=None): + return name or hostname + + +class _BitwardenLookupTestCase(unittest.TestCase): + + def setUp(self): + self._orig = lookup_mod.Bitwarden + lookup_mod.Bitwarden = _FakeBitwarden + _FakeBitwarden.items_by_search = [] + _FakeBitwarden.item_by_id = None + self.lookup = lookup_mod.LookupModule(loader=None, templar=None) + + def tearDown(self): + lookup_mod.Bitwarden = self._orig + + +class TestRun(_BitwardenLookupTestCase): + + def test_existing_single_item_lifts_credentials(self): + _FakeBitwarden.items_by_search = [ + {'name': 'host - db', 'login': {'username': 'dba', 'password': 'linuxfabrik'}}, + ] + result = self.lookup.run([{'name': 'host - db', 'username': 'dba'}]) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['username'], 'dba') + self.assertEqual(result[0]['password'], 'linuxfabrik') + + def test_multiple_matches_raise(self): + _FakeBitwarden.items_by_search = [ + {'name': 'host - db', 'login': {'username': 'dba', 'password': 'linuxfabrik'}}, + {'name': 'host - db', 'login': {'username': 'dba', 'password': 'linuxfabrik'}}, + ] + with self.assertRaises(AnsibleError): + self.lookup.run([{'name': 'host - db', 'username': 'dba'}]) + + def test_lookup_by_id_lifts_credentials(self): + _FakeBitwarden.item_by_id = { + 'id': 'abc', 'login': {'username': 'dba', 'password': 'linuxfabrik'}, + } + result = self.lookup.run([{'id': 'abc'}]) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['username'], 'dba') + self.assertEqual(result[0]['password'], 'linuxfabrik') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/plugins/module_utils/test_bitwarden.py b/tests/unit/plugins/module_utils/test_bitwarden.py new file mode 100644 index 00000000..97367368 --- /dev/null +++ b/tests/unit/plugins/module_utils/test_bitwarden.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Unit tests for the bitwarden module_util. + +The util runs on the Ansible controller (it is imported by the +bitwarden_item lookup) and, via AnsiballZ, on the managed node for the +bitwarden_item module. All network access funnels through +ansible.module_utils.urls.open_url, which the tests mock; no real +Bitwarden server or cache file is touched. +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import importlib.util +import io +import json +import os +import unittest +from urllib.error import HTTPError + +_MODULE_PATH = os.path.join( + os.path.dirname(__file__), + '..', '..', '..', '..', + 'plugins', 'module_utils', 'bitwarden.py', +) +_spec = importlib.util.spec_from_file_location('bitwarden', _MODULE_PATH) +bitwarden = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(bitwarden) + + +class _FakeResponse: + def __init__(self, payload): + self._payload = payload + + def read(self): + return json.dumps(self._payload).encode('utf-8') + + +def _make_bitwarden(tmp_path='/nonexistent/lfops_bw_test_cache.json'): + """Instantiate Bitwarden without touching a real cache file. + + Pointing CACHE_FILE at a missing path makes _load_cache start fresh, + and the constructor performs no network I/O. + """ + bitwarden.CACHE_FILE = tmp_path + return bitwarden.Bitwarden() + + +class TestGenerate(unittest.TestCase): + + def test_length_and_charset(self): + bw = _make_bitwarden() + result = bw.generate(password_length=32, password_choice='abc') + self.assertEqual(len(result), 32) + self.assertTrue(set(result).issubset(set('abc'))) + + def test_rejects_non_positive_length(self): + bw = _make_bitwarden() + with self.assertRaises(ValueError): + bw.generate(password_length=0) + + def test_hex_requires_even_length(self): + bw = _make_bitwarden() + with self.assertRaises(ValueError): + bw.generate(password_length=3, password_choice='0123456789abcdef') + # even length is fine + self.assertEqual(len(bw.generate(password_length=4, password_choice='0123456789abcdef')), 4) + + +class TestGetPrettyName(unittest.TestCase): + + def test_explicit_name_wins(self): + self.assertEqual(bitwarden.Bitwarden.get_pretty_name('myname', 'host', 'purpose'), 'myname') + + def test_hostname_only(self): + self.assertEqual(bitwarden.Bitwarden.get_pretty_name('', hostname='app4711'), 'app4711') + + def test_hostname_and_purpose(self): + self.assertEqual( + bitwarden.Bitwarden.get_pretty_name('', hostname='app4711', purpose='MariaDB'), + 'app4711 - MariaDB', + ) + + +class TestApiCall(unittest.TestCase): + + def setUp(self): + self.bw = _make_bitwarden() + self._orig_open_url = bitwarden.open_url + + def tearDown(self): + bitwarden.open_url = self._orig_open_url + + def test_success_returns_result(self): + bitwarden.open_url = lambda *a, **k: _FakeResponse({'success': True, 'data': {'x': 1}}) + result = self.bw._api_call('status') + self.assertEqual(result, {'success': True, 'data': {'x': 1}}) + + def test_unsuccessful_payload_raises(self): + bitwarden.open_url = lambda *a, **k: _FakeResponse({'success': False, 'data': 'nope'}) + with self.assertRaises(bitwarden.BitwardenException): + self.bw._api_call('status') + + def test_http_error_raises_bitwarden_exception(self): + def _raise(*a, **k): + raise HTTPError('http://127.0.0.1:8087/status', 500, 'err', {}, io.BytesIO(b'')) + bitwarden.open_url = _raise + with self.assertRaises(bitwarden.BitwardenException): + self.bw._api_call('status') + + def test_invalid_json_raises_bitwarden_exception(self): + class _BadResponse: + def read(self): + return b'not json' + bitwarden.open_url = lambda *a, **k: _BadResponse() + with self.assertRaises(bitwarden.BitwardenException): + self.bw._api_call('status') + + +class TestGetItems(unittest.TestCase): + + def setUp(self): + self.bw = _make_bitwarden() + # seed the in-memory cache directly; get_items only reads it + self.bw._cache = { + 'items': [ + {'type': 1, 'name': 'host - db', 'login': {'username': 'dba'}, + 'folderId': None, 'collectionIds': [], 'organizationId': None}, + {'type': 2, 'name': 'host - db', 'login': {'username': 'dba'}}, # non-login, skipped + {'type': 1, 'name': 'other', 'login': {'username': 'dba'}, + 'folderId': None, 'collectionIds': [], 'organizationId': None}, + ], + } + + def test_matches_login_item_by_name_and_username(self): + matches = self.bw.get_items('host - db', username='dba') + self.assertEqual(len(matches), 1) + self.assertEqual(matches[0]['name'], 'host - db') + + def test_skips_non_login_items(self): + # the type=2 entry shares name+username but must not match + matches = self.bw.get_items('host - db', username='dba') + self.assertTrue(all(item.get('type') == 1 for item in matches)) + + def test_no_match_returns_empty(self): + self.assertEqual(self.bw.get_items('does-not-exist', username='dba'), []) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/plugins/modules/test_bitwarden_item.py b/tests/unit/plugins/modules/test_bitwarden_item.py new file mode 100644 index 00000000..3fbba453 --- /dev/null +++ b/tests/unit/plugins/modules/test_bitwarden_item.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Unit tests for the bitwarden_item module's pure diff helper. + +The module itself runs via AnsiballZ, but `diff_and_update` is a plain +function and is tested in isolation. The collection import is wired up by +tests/conftest.py. +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import unittest + +from ansible_collections.linuxfabrik.lfops.plugins.modules.bitwarden_item import diff_and_update + + +class TestDiffAndUpdate(unittest.TestCase): + + def test_takes_over_id(self): + current = {'id': 'abc', 'name': 'x'} + target = {'name': 'x'} + changed, updated = diff_and_update(current, target) + self.assertEqual(updated['id'], 'abc') + + def test_no_change_when_equal(self): + current = {'id': 'abc', 'name': 'x', 'notes': 'n'} + target = {'name': 'x', 'notes': 'n'} + changed, _ = diff_and_update(current, target) + self.assertFalse(changed) + + def test_change_detected_on_differing_value(self): + current = {'id': 'abc', 'name': 'x', 'notes': 'old'} + target = {'name': 'x', 'notes': 'new'} + changed, _ = diff_and_update(current, target) + self.assertTrue(changed) + + def test_falsy_vs_falsy_is_not_a_change(self): + # None vs empty list / empty string must not count as a change + current = {'id': 'abc', 'collectionIds': None, 'notes': ''} + target = {'collectionIds': [], 'notes': None} + changed, _ = diff_and_update(current, target) + self.assertFalse(changed) + + def test_nested_dict_change_detected(self): + current = {'id': 'abc', 'login': {'username': 'old', 'totp': ''}} + target = {'login': {'username': 'new', 'totp': ''}} + changed, _ = diff_and_update(current, target) + self.assertTrue(changed) + + def test_nested_dict_no_change(self): + current = {'id': 'abc', 'login': {'username': 'same', 'totp': ''}} + target = {'login': {'username': 'same', 'totp': ''}} + changed, _ = diff_and_update(current, target) + self.assertFalse(changed) + + +if __name__ == '__main__': + unittest.main()