Skip to content

Commit

Permalink
feat: add file based backend as a fallback
Browse files Browse the repository at this point in the history
Useful when a keyring is not available, such as when running on a headless
system.

Signed-off-by: Sergio Schvezov <sergio.schvezov@canonical.com>
  • Loading branch information
sergiusens committed Nov 22, 2023
1 parent 93eb2e8 commit 424d10b
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 3 deletions.
90 changes: 88 additions & 2 deletions craft_store/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,34 @@

import base64
import binascii
import json
import logging
import os
from typing import Dict, Optional, Tuple, Union
import sys
from pathlib import Path
from typing import Dict, Optional, Tuple, Union, cast

import keyring
import keyring.backend
import keyring.backends.fail
import keyring.errors
from keyring._compat import properties
from keyring.backends import SecretService
from xdg import BaseDirectory # type: ignore[import]

from . import errors

logger = logging.getLogger(__name__)


if sys.platform == "linux":
from secretstorage.exceptions import SecretServiceNotAvailableException

KEYRING_EXCEPTIONS = (keyring.errors.InitError, SecretServiceNotAvailableException)
else:
KEYRING_EXCEPTIONS = (keyring.errors.InitError,)


class MemoryKeyring(keyring.backend.KeyringBackend):
"""A keyring that stores credentials in a dictionary."""

Expand Down Expand Up @@ -66,6 +79,70 @@ def delete_password(self, service: str, username: str) -> None:
raise keyring.errors.PasswordDeleteError from key_error


class FileKeyring(keyring.backend.KeyringBackend):
"""A keyring that stores credentials in a file."""

@properties.classproperty # type: ignore[misc]
def priority(self) -> Union[int, float]:
"""Supply a priority.
Indicating the priority of the backend relative to all other backends.
"""
# Only > 0 make it to the chainer.
return -1

@property
def credentials_file(self) -> Path:
"""Credentials file path for instance of application_name."""
return (
Path(BaseDirectory.save_data_path(self._application_name))
/ "credentials.json"
)

def _write_credentials(self) -> None:
if not self.credentials_file.exists():
self.credentials_file.parent.mkdir(parents=True, exist_ok=True)
self.credentials_file.touch(mode=0o600)
with self.credentials_file.open("w") as credentials_file:
json.dump(self._credentials, credentials_file)

def _read_credentials(self) -> None:
try:
with self.credentials_file.open() as credentials_file:
self._credentials = json.load(credentials_file)
except (FileNotFoundError, json.decoder.JSONDecodeError):
self._credentials = {}

def __init__(self, application_name: str) -> None:
super().__init__()

self._application_name = application_name
self._read_credentials()

def set_password(self, service: str, username: str, password: str) -> None:
"""Set the service password for username in memory."""
if service not in self._credentials:
self._credentials[service] = {username: password}
else:
self._credentials[service][username] = password
self._write_credentials()

def get_password(self, service: str, username: str) -> Optional[str]:
"""Get the service password for username from memory."""
try:
return cast(str, self._credentials[service][username])
except KeyError:
return None

def delete_password(self, service: str, username: str) -> None:
"""Delete the service password for username from memory."""
try:
del self._credentials[service][username]
self._write_credentials()
except KeyError as key_error:
raise keyring.errors.PasswordDeleteError from key_error


class Auth:
"""Auth wraps around the keyring to store credentials.
Expand Down Expand Up @@ -108,7 +185,16 @@ def __init__(

self._keyring = keyring.get_keyring()
# This keyring would fail on first use, fail early instead.
if isinstance(self._keyring, keyring.backends.fail.Keyring):
# Only SecretService has get_preferred_collection, which can
# fail if the keyring backend cannot be unlocked.
if isinstance(self._keyring, SecretService.Keyring):
try:
self._keyring.get_preferred_collection()
except KEYRING_EXCEPTIONS:
logger.warning("Falling back to file based storage")
keyring.set_keyring(FileKeyring(application_name))
self._keyring = keyring.get_keyring()
elif isinstance(self._keyring, keyring.backends.fail.Keyring):
raise errors.NoKeyringError

if environment_auth_value:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies = [
"requests-toolbelt>=1.0.0",
"macaroonbakery>=1.3.0",
"pydantic>=1.10,<2.0",
"pyxdg",
]
classifiers = [
"Development Status :: 5 - Production/Stable",
Expand Down
84 changes: 83 additions & 1 deletion tests/unit/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import sys
from typing import List, Type
from unittest.mock import ANY

import keyring
import keyring.backends.fail
import keyring.errors
import pytest
from craft_store import errors
from craft_store.auth import Auth, MemoryKeyring
from craft_store.auth import Auth, FileKeyring, MemoryKeyring
from keyring.backends import SecretService


def test_set_credentials(caplog, fake_keyring):
Expand Down Expand Up @@ -232,3 +235,82 @@ def test_memory_keyring_delete_empty():

with pytest.raises(keyring.errors.PasswordDeleteError):
k.delete_password("my-service", "my-user")


@pytest.fixture(autouse=True)
def _fake_basedirectory(mocker, tmp_path):
mocker.patch("craft_store.auth.BaseDirectory.save_data_path", return_value=tmp_path)


def test_file_keyring_set_get():
k = FileKeyring("test-app")
k.set_password("my-service", "my-user", "my-password")

assert k.get_password("my-service", "my-user") == "my-password"


def test_file_keyring_get_empty():
k = FileKeyring("test-app")

assert k.get_password("my-service", "my-user") is None


def test_file_keyring_set_delete():
k = FileKeyring("test-app")
k.set_password("my-service", "my-user", "my-password")

assert k.get_password("my-service", "my-user") == "my-password"

k.delete_password("my-service", "my-user")

assert k.get_password("my-service", "my-user") is None


def test_file_keyring_delete_empty():
k = FileKeyring("test-app")

with pytest.raises(keyring.errors.PasswordDeleteError):
k.delete_password("my-service", "my-user")


def test_file_keyring_storage_path(tmp_path):
"""Ensure the mock is used."""
k = FileKeyring("test-app")

assert k.credentials_file == tmp_path / "credentials.json"


test_exceptions: List[Type[Exception]] = [keyring.errors.InitError]
if sys.platform == "linux":
from secretstorage.exceptions import SecretServiceNotAvailableException

test_exceptions.append(SecretServiceNotAvailableException)


@pytest.mark.disable_fake_keyring()
@pytest.mark.parametrize("exception", test_exceptions)
def test_secretservice_file_fallsback(mocker, exception):
# At one point in the code we run keyring.set_backend, there is no
# elegant way to reset this in the library.
keyring.set_keyring(SecretService.Keyring())
mocker.patch(
"keyring.backends.SecretService.Keyring.get_preferred_collection",
side_effect=exception,
)
auth = Auth(application_name="test-app", host="foo")

assert type(auth._keyring) == FileKeyring


@pytest.mark.disable_fake_keyring()
def test_secretservice_works(mocker):
# At one point in the code we run keyring.set_backend, there is no
# elegant way to reset this in the library.
keyring.set_keyring(SecretService.Keyring())
mocker.patch(
"keyring.backends.SecretService.Keyring.get_preferred_collection",
return_value=None,
)
auth = Auth(application_name="test-app", host="foo")

assert type(auth._keyring) == SecretService.Keyring

0 comments on commit 424d10b

Please sign in to comment.