Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add file based backend as a fallback #119

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 88 additions & 2 deletions craft_store/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,34 @@

import base64
import binascii
import json
import logging
import os
from typing import Dict, Optional, Tuple, Union
import sys
from pathlib import Path
from typing import Dict, Optional, Tuple, Union, cast

import keyring
import keyring.backend
import keyring.backends.fail
import keyring.errors
from keyring._compat import properties
from keyring.backends import SecretService
from xdg import BaseDirectory # type: ignore[import]

from . import errors

logger = logging.getLogger(__name__)


if sys.platform == "linux":
from secretstorage.exceptions import SecretServiceNotAvailableException

KEYRING_EXCEPTIONS = (keyring.errors.InitError, SecretServiceNotAvailableException)
else:
KEYRING_EXCEPTIONS = (keyring.errors.InitError,)


class MemoryKeyring(keyring.backend.KeyringBackend):
"""A keyring that stores credentials in a dictionary."""

Expand Down Expand Up @@ -66,6 +79,70 @@ def delete_password(self, service: str, username: str) -> None:
raise keyring.errors.PasswordDeleteError from key_error


class FileKeyring(keyring.backend.KeyringBackend):
"""A keyring that stores credentials in a file."""

@properties.classproperty # type: ignore[misc]
def priority(self) -> Union[int, float]:
"""Supply a priority.

Indicating the priority of the backend relative to all other backends.
"""
# Only > 0 make it to the chainer.
return -1

@property
def credentials_file(self) -> Path:
"""Credentials file path for instance of application_name."""
return (
Path(BaseDirectory.save_data_path(self._application_name))
/ "credentials.json"
)

def _write_credentials(self) -> None:
if not self.credentials_file.exists():
self.credentials_file.parent.mkdir(parents=True, exist_ok=True)
self.credentials_file.touch(mode=0o600)
with self.credentials_file.open("w") as credentials_file:
json.dump(self._credentials, credentials_file)

def _read_credentials(self) -> None:
try:
with self.credentials_file.open() as credentials_file:
self._credentials = json.load(credentials_file)
except (FileNotFoundError, json.decoder.JSONDecodeError):
self._credentials = {}

def __init__(self, application_name: str) -> None:
super().__init__()

self._application_name = application_name
self._read_credentials()

def set_password(self, service: str, username: str, password: str) -> None:
"""Set the service password for username in memory."""
if service not in self._credentials:
self._credentials[service] = {username: password}
else:
self._credentials[service][username] = password
self._write_credentials()

def get_password(self, service: str, username: str) -> Optional[str]:
"""Get the service password for username from memory."""
try:
return cast(str, self._credentials[service][username])
except KeyError:
return None

def delete_password(self, service: str, username: str) -> None:
"""Delete the service password for username from memory."""
try:
sergiusens marked this conversation as resolved.
Show resolved Hide resolved
del self._credentials[service][username]
except KeyError as key_error:
raise keyring.errors.PasswordDeleteError from key_error
self._write_credentials()


class Auth:
"""Auth wraps around the keyring to store credentials.

Expand Down Expand Up @@ -108,7 +185,16 @@ def __init__(

self._keyring = keyring.get_keyring()
# This keyring would fail on first use, fail early instead.
if isinstance(self._keyring, keyring.backends.fail.Keyring):
# Only SecretService has get_preferred_collection, which can
# fail if the keyring backend cannot be unlocked.
if isinstance(self._keyring, SecretService.Keyring):
try:
self._keyring.get_preferred_collection()
except KEYRING_EXCEPTIONS:
logger.warning("Falling back to file based storage")
keyring.set_keyring(FileKeyring(application_name))
self._keyring = keyring.get_keyring()
elif isinstance(self._keyring, keyring.backends.fail.Keyring):
raise errors.NoKeyringError

if environment_auth_value:
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies = [
"requests-toolbelt>=1.0.0",
"macaroonbakery>=1.3.0",
"pydantic>=1.10,<2.0",
"pyxdg",
]
classifiers = [
"Development Status :: 5 - Production/Stable",
Expand Down Expand Up @@ -137,6 +138,9 @@ line_length = 88
minversion = "7.0"
testpaths = "tests"
xfail_strict = true
markers = [
"disable_fake_keyring"
]

[tool.coverage.run]
branch = true
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,70 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import datetime
from typing import Any, List, Optional, Tuple
from unittest.mock import patch

import pytest


class FakeKeyring:
"""Fake Keyring Backend implementation for tests."""

name = "Fake Keyring"

def __init__(self) -> None:
self.set_password_calls: List[Tuple[Any, ...]] = []
self.get_password_calls: List[Tuple[Any, ...]] = []
self.delete_password_calls: List[Tuple[Any, ...]] = []
self.password = None
self.delete_error: Optional[Exception] = None

def set_password(self, *args) -> None:
"""Set the service password for username in memory."""
self.set_password_calls.append(args)
self.password = args[2]

def get_password(self, *args):
"""Get the service password for username from memory."""
self.get_password_calls.append(args)
return self.password

def delete_password(self, *args):
"""Delete the service password for username from memory."""
self.delete_password_calls.append(args)
if self.delete_error is not None:
# https://www.logilab.org/ticket/3207
raise self.delete_error # pylint: disable=raising-bad-type


@pytest.fixture()
def keyring_set_keyring_mock():
"""Mock setting the keyring."""

patched_keyring = patch("keyring.set_keyring", autospec=True)
mocked_keyring = patched_keyring.start()
yield mocked_keyring
patched_keyring.stop()


@pytest.fixture()
def fake_keyring():
return FakeKeyring()


@pytest.fixture(autouse=True)
def fake_keyring_get(fake_keyring, request):
"""Mock keyring and return a FakeKeyring."""
if "disable_fake_keyring" in request.keywords:
yield
else:
patched_keyring = patch("keyring.get_keyring")
mocked_keyring = patched_keyring.start()
mocked_keyring.return_value = fake_keyring
yield mocked_keyring
patched_keyring.stop()


@pytest.fixture()
def expires():
"""Mocks/freezes utcnow() in craft_store.endpoints module.
Expand Down
145 changes: 84 additions & 61 deletions tests/unit/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,73 +15,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
from typing import Any, List, Optional, Tuple
from unittest.mock import ANY, patch
import sys
from typing import List, Type
from unittest.mock import ANY

import keyring
import keyring.backends.fail
import keyring.errors
import pytest
from craft_store import errors
from craft_store.auth import Auth, MemoryKeyring


class FakeKeyring:
"""Fake Keyring Backend implementation for tests."""

name = "Fake Keyring"

def __init__(self) -> None:
self.set_password_calls: List[Tuple[Any, ...]] = []
self.get_password_calls: List[Tuple[Any, ...]] = []
self.delete_password_calls: List[Tuple[Any, ...]] = []
self.password = None
self.delete_error: Optional[Exception] = None

def set_password(self, *args) -> None:
"""Set the service password for username in memory."""
self.set_password_calls.append(args)
self.password = args[2]

def get_password(self, *args):
"""Get the service password for username from memory."""
self.get_password_calls.append(args)
return self.password

def delete_password(self, *args):
"""Delete the service password for username from memory."""
self.delete_password_calls.append(args)
if self.delete_error is not None:
# https://www.logilab.org/ticket/3207
raise self.delete_error # pylint: disable=raising-bad-type


@pytest.fixture()
def keyring_set_keyring_mock():
"""Mock setting the keyring."""

patched_keyring = patch("keyring.set_keyring", autospec=True)
mocked_keyring = patched_keyring.start()
yield mocked_keyring
patched_keyring.stop()


@pytest.fixture()
def fake_keyring():
return FakeKeyring()


@pytest.fixture(autouse=True)
def fake_keyring_get(fake_keyring, request):
"""Mock keyring and return a FakeKeyring."""
if "disable_fake_keyring" in request.keywords:
yield
else:
patched_keyring = patch("keyring.get_keyring")
mocked_keyring = patched_keyring.start()
mocked_keyring.return_value = fake_keyring
yield mocked_keyring
patched_keyring.stop()
from craft_store.auth import Auth, FileKeyring, MemoryKeyring
from keyring.backends import SecretService


def test_set_credentials(caplog, fake_keyring):
Expand Down Expand Up @@ -291,3 +235,82 @@ def test_memory_keyring_delete_empty():

with pytest.raises(keyring.errors.PasswordDeleteError):
k.delete_password("my-service", "my-user")


@pytest.fixture(autouse=True)
def _fake_basedirectory(mocker, tmp_path):
mocker.patch("craft_store.auth.BaseDirectory.save_data_path", return_value=tmp_path)


def test_file_keyring_set_get():
k = FileKeyring("test-app")
k.set_password("my-service", "my-user", "my-password")

assert k.get_password("my-service", "my-user") == "my-password"


def test_file_keyring_get_empty():
k = FileKeyring("test-app")

assert k.get_password("my-service", "my-user") is None


def test_file_keyring_set_delete():
k = FileKeyring("test-app")
k.set_password("my-service", "my-user", "my-password")

assert k.get_password("my-service", "my-user") == "my-password"

k.delete_password("my-service", "my-user")

assert k.get_password("my-service", "my-user") is None


def test_file_keyring_delete_empty():
k = FileKeyring("test-app")

with pytest.raises(keyring.errors.PasswordDeleteError):
k.delete_password("my-service", "my-user")


def test_file_keyring_storage_path(tmp_path):
"""Ensure the mock is used."""
k = FileKeyring("test-app")

assert k.credentials_file == tmp_path / "credentials.json"


test_exceptions: List[Type[Exception]] = [keyring.errors.InitError]
if sys.platform == "linux":
from secretstorage.exceptions import SecretServiceNotAvailableException

test_exceptions.append(SecretServiceNotAvailableException)


@pytest.mark.disable_fake_keyring()
@pytest.mark.parametrize("exception", test_exceptions)
def test_secretservice_file_fallsback(mocker, exception):
# At one point in the code we run keyring.set_backend, there is no
# elegant way to reset this in the library.
keyring.set_keyring(SecretService.Keyring())
mocker.patch(
"keyring.backends.SecretService.Keyring.get_preferred_collection",
side_effect=exception,
)
auth = Auth(application_name="test-app", host="foo")

assert type(auth._keyring) == FileKeyring


@pytest.mark.disable_fake_keyring()
def test_secretservice_works(mocker):
# At one point in the code we run keyring.set_backend, there is no
# elegant way to reset this in the library.
keyring.set_keyring(SecretService.Keyring())
mocker.patch(
"keyring.backends.SecretService.Keyring.get_preferred_collection",
return_value=None,
)
auth = Auth(application_name="test-app", host="foo")

assert type(auth._keyring) == SecretService.Keyring