Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,3 +853,278 @@ async def test_no_prompt_error_contains_interactive_login(
msg = str(exc_info.value)
assert msg.startswith("Interactive login is required")
assert "Auth URI:" in msg


# ---------------------------------------------------------------------------
# TokenAuth -- callable(self._token) mutant killer
# ---------------------------------------------------------------------------


class TestTokenAuthCallableMutant:
"""Kill mutant: callable(self._token) → callable(None)."""

async def test_callable_token_is_awaited_not_returned_raw(self):
"""If callable() were always False, we'd get the coroutine function back."""

async def factory() -> str:
return "awaited-value"

auth = TokenAuth(factory)
result = await auth.get_token()
# Must be the awaited string, not the coroutine function itself
assert result == "awaited-value"
assert isinstance(result, str)
assert not callable(result)

async def test_string_token_not_awaited(self):
"""String tokens must be returned as-is (not called)."""
auth = TokenAuth("plain")
result = await auth.get_token()
assert result == "plain"


# ---------------------------------------------------------------------------
# MsalAuth._build_app -- direct call tests to kill constructor-arg mutants
# ---------------------------------------------------------------------------


class TestBuildAppDirect:
"""Call _build_app() directly to kill mutants on constructor args."""

@patch("flameconnect.auth.msal")
def test_returns_app_and_cache(self, mock_msal, tmp_path):
"""_build_app returns (app, cache) tuple."""
cache_path = tmp_path / "token.json"
mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_app = MagicMock()
mock_msal.PublicClientApplication.return_value = mock_app

auth = MsalAuth(cache_path=cache_path)
app, cache = auth._build_app()

assert app is mock_app
assert cache is mock_cache

@patch("flameconnect.auth.msal")
def test_serializable_token_cache_called(self, mock_msal, tmp_path):
"""msal.SerializableTokenCache() is called (not replaced with None)."""
cache_path = tmp_path / "token.json"
mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

mock_msal.SerializableTokenCache.assert_called_once_with()

@patch("flameconnect.auth.msal")
def test_cache_not_deserialized_when_file_missing(self, mock_msal, tmp_path):
"""When cache file doesn't exist, deserialize is NOT called."""
cache_path = tmp_path / "nonexistent.json"
mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

mock_cache.deserialize.assert_not_called()

@patch("flameconnect.auth.msal")
def test_cache_deserialized_with_file_text(self, mock_msal, tmp_path):
"""When cache file exists, deserialize receives its text content."""
cache_path = tmp_path / "token.json"
cache_path.write_text('{"cached": true}')

mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

mock_cache.deserialize.assert_called_once_with('{"cached": true}')

@patch("flameconnect.auth.msal")
def test_public_client_app_receives_client_id(self, mock_msal, tmp_path):
"""CLIENT_ID is the first positional arg."""
cache_path = tmp_path / "token.json"
mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

args, kwargs = mock_msal.PublicClientApplication.call_args
assert args == (CLIENT_ID,)

@patch("flameconnect.auth.msal")
def test_public_client_app_receives_authority(self, mock_msal, tmp_path):
"""authority=AUTHORITY is passed."""
cache_path = tmp_path / "token.json"
mock_msal.SerializableTokenCache.return_value = MagicMock()
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

_, kwargs = mock_msal.PublicClientApplication.call_args
assert kwargs["authority"] is AUTHORITY

@patch("flameconnect.auth.msal")
def test_public_client_app_validate_authority_false(self, mock_msal, tmp_path):
"""validate_authority=False (not True, not missing)."""
cache_path = tmp_path / "token.json"
mock_msal.SerializableTokenCache.return_value = MagicMock()
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

_, kwargs = mock_msal.PublicClientApplication.call_args
assert kwargs["validate_authority"] is False

@patch("flameconnect.auth.msal")
def test_public_client_app_receives_token_cache(self, mock_msal, tmp_path):
"""token_cache= receives the SerializableTokenCache instance."""
cache_path = tmp_path / "token.json"
mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

_, kwargs = mock_msal.PublicClientApplication.call_args
assert kwargs["token_cache"] is mock_cache

@patch("flameconnect.auth.msal")
def test_public_client_app_all_kwargs(self, mock_msal, tmp_path):
"""All keyword args passed in a single assertion."""
cache_path = tmp_path / "token.json"
mock_cache = MagicMock()
mock_msal.SerializableTokenCache.return_value = mock_cache
mock_msal.PublicClientApplication.return_value = MagicMock()

auth = MsalAuth(cache_path=cache_path)
auth._build_app()

mock_msal.PublicClientApplication.assert_called_once_with(
CLIENT_ID,
authority=AUTHORITY,
validate_authority=False,
token_cache=mock_cache,
)


# ---------------------------------------------------------------------------
# MsalAuth._save_cache -- direct call tests to kill mkdir/write/log mutants
# ---------------------------------------------------------------------------


class TestSaveCacheDirect:
"""Call _save_cache() directly to kill mutants on mkdir/write_text/log."""

def test_mkdir_called_with_parents_and_exist_ok(self, tmp_path):
"""mkdir receives parents=True, exist_ok=True."""
cache_path = tmp_path / "sub" / "dir" / "token.json"
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = True
mock_cache.serialize.return_value = "{}"

auth._save_cache(mock_cache)

# Parent dir was created (proves parents=True works)
assert cache_path.parent.exists()
assert cache_path.exists()

def test_mkdir_exist_ok_true(self, tmp_path):
"""Calling _save_cache when parent dir already exists doesn't raise."""
cache_path = tmp_path / "existing" / "token.json"
cache_path.parent.mkdir(parents=True)
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = True
mock_cache.serialize.return_value = '{"data": 1}'

# Should not raise (proves exist_ok=True)
auth._save_cache(mock_cache)
assert cache_path.exists()

def test_write_text_receives_serialized_content(self, tmp_path):
"""write_text receives cache.serialize() output (not None)."""
cache_path = tmp_path / "token.json"
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = True
mock_cache.serialize.return_value = '{"tokens": "abc"}'

auth._save_cache(mock_cache)

assert cache_path.read_text() == '{"tokens": "abc"}'
mock_cache.serialize.assert_called_once()

def test_cache_not_saved_when_unchanged(self, tmp_path):
"""When has_state_changed is False, nothing is written."""
cache_path = tmp_path / "token.json"
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = False

auth._save_cache(mock_cache)

assert not cache_path.exists()
mock_cache.serialize.assert_not_called()

def test_log_message_format(self, tmp_path, caplog):
"""Log message uses correct format string and includes cache path."""
cache_path = tmp_path / "token.json"
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = True
mock_cache.serialize.return_value = "{}"

with caplog.at_level(logging.DEBUG):
auth._save_cache(mock_cache)

messages = [r.message for r in caplog.records]
assert len(messages) == 1
assert messages[0] == f"Token cache saved to {cache_path}"

def test_log_uses_percent_format_args(self, tmp_path, caplog):
"""Verify the logger record uses %-style args (not f-string)."""
cache_path = tmp_path / "token.json"
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = True
mock_cache.serialize.return_value = "{}"

with caplog.at_level(logging.DEBUG):
auth._save_cache(mock_cache)

record = caplog.records[0]
assert record.args == (cache_path,)
assert record.msg == "Token cache saved to %s"

def test_no_log_when_cache_unchanged(self, tmp_path, caplog):
"""No log message emitted when cache hasn't changed."""
cache_path = tmp_path / "token.json"
auth = MsalAuth(cache_path=cache_path)

mock_cache = MagicMock()
mock_cache.has_state_changed = False

with caplog.at_level(logging.DEBUG):
auth._save_cache(mock_cache)

assert len(caplog.records) == 0
Loading