Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions scripts/test_auth_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""Interactive end-to-end test for the CLI auth flow.

Starts a tiny local HTTP server that serves the /auth/cli-config endpoint,
then runs `layerlens login` against it.

Usage:
python scripts/test_auth_e2e.py

This tests the full flow WITHOUT needing the real backend running.
The device-code step will still fail (no real Cognito) but it validates
that config discovery, credential storage, and the CLI plumbing all work.

To test against the real staging/production backend, set:
LAYERLENS_STRATIX_BASE_URL=https://api.layerlens.ai/api/v1 layerlens login
"""

import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

# Fake Cognito config — replace with real values to test end-to-end
FAKE_AUTH_CONFIG = {
"region": "us-east-1",
"client_id": "REPLACE_WITH_REAL_CLIENT_ID",
"domain": "atlas-production",
"scopes": "openid profile email",
}

# Fake login response
FAKE_LOGIN_RESPONSE = {
"access_token": "fake-access-token-xyz",
"id_token": "fake-id-token-xyz",
"refresh_token": "fake-refresh-token-xyz",
"expires_in": 3600,
"token_type": "Bearer",
"user": {"email": "test@example.com", "given_name": "Test", "name": "Test User"},
}

# Fake device-code response
FAKE_DEVICE_CODE_RESPONSE = {
"device_code": "test-device-code-000",
"user_code": "ABCD-1234",
"verification_uri": "https://atlas-production.auth.us-east-1.amazoncognito.com/activate",
"verification_uri_complete": "https://atlas-production.auth.us-east-1.amazoncognito.com/activate?user_code=ABCD-1234",
"expires_in": 60,
"interval": 5,
}


class MockHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path.endswith("/dgklmnr/auth/cli-config"):
self._json_response(200, FAKE_AUTH_CONFIG)
else:
self._json_response(404, {"error": "not found"})

def do_POST(self):
if self.path.endswith("/dgklmnr/auth/cli-login"):
length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length)) if length else {}
if not body.get("email") or not body.get("password"):
self._json_response(400, {"error": "Email and password are required"})
elif body["email"] == "test@example.com" and body["password"] == "password123":
self._json_response(200, FAKE_LOGIN_RESPONSE)
else:
self._json_response(401, {"error": "Invalid email or password"})
elif "deviceAuthorization" in self.path:
self._json_response(200, FAKE_DEVICE_CODE_RESPONSE)
elif "oauth2/token" in self.path:
# Simulate authorization_pending
self._json_response(400, {"error": "authorization_pending"})
else:
self._json_response(404, {"error": "not found"})

def _json_response(self, status, body):
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(body).encode())

def log_message(self, format, *args):
print(f" [mock-server] {format % args}")


def main():
port = 18923
server = HTTPServer(("127.0.0.1", port), MockHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

base_url = f"http://127.0.0.1:{port}/api/v1"
print(f"\n Mock server running at {base_url}")
print(f" Testing config discovery...\n")

# Test 1: Config discovery
import layerlens.cli._auth as auth_mod
from layerlens.cli._auth import fetch_auth_config

auth_mod._cached_auth_config = None # clear cache

config = fetch_auth_config(base_url)
print(f" Config fetched: {json.dumps(config, indent=2)}")
assert config["client_id"] == FAKE_AUTH_CONFIG["client_id"], "Config mismatch!"
print(" [PASS] Config discovery works\n")

# Test 2: Credential storage round-trip
from layerlens.cli._auth import load_credentials, save_credentials, clear_credentials

test_creds = {"access_token": "test-tok", "auth_config": config}
save_credentials(test_creds)
loaded = load_credentials()
assert loaded["access_token"] == "test-tok"
print(" [PASS] Credential storage works\n")

clear_credentials()
assert load_credentials() is None
print(" [PASS] Credential clearing works\n")

# Test 3: CLI login via email/password
print(" Testing CLI login command (email/password)...\n")

import os

os.environ["LAYERLENS_STRATIX_BASE_URL"] = base_url
auth_mod._cached_auth_config = None # clear cache

from layerlens.cli._auth import cli_login

creds = cli_login("test@example.com", "password123", base_url=base_url)
assert creds["access_token"] == FAKE_LOGIN_RESPONSE["access_token"]
assert creds["user"]["email"] == "test@example.com"
print(" [PASS] CLI login works\n")

# Test 4: CLI login command via CliRunner
print(" Testing CLI login command via CliRunner...\n")
clear_credentials()
auth_mod._cached_auth_config = None

from click.testing import CliRunner

from layerlens.cli._app import cli

runner = CliRunner()
result = runner.invoke(cli, ["login"], input="test@example.com\npassword123\n")
print(f" Exit code: {result.exit_code}")
print(f" Output: {result.output}")
assert result.exit_code == 0
loaded = load_credentials()
assert loaded is not None and loaded["access_token"] == FAKE_LOGIN_RESPONSE["access_token"]
print(" [PASS] CLI login command works\n")

# Test 5: whoami command
print(" Testing whoami command...\n")
result = runner.invoke(cli, ["whoami"])
print(f" Exit code: {result.exit_code}")
print(f" Output: {result.output}")
assert result.exit_code == 0
print(" [PASS] whoami works\n")

# Test 6: logout command
result = runner.invoke(cli, ["logout"])
assert result.exit_code == 0
assert load_credentials() is None
print(" [PASS] logout works\n")

server.shutdown()
print("\n Done! All checks passed.")


if __name__ == "__main__":
main()
74 changes: 69 additions & 5 deletions src/layerlens/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

from . import _exceptions
from ._utils import is_mapping
from .models import Organization, OrganizationResponse
from ._constants import DEFAULT_TIMEOUT, DEFAULT_BASE_URL
from .models import Organization, OrganizationResponse, OrganizationsListResponse
from ._constants import DEFAULT_TIMEOUT, DEFAULT_BASE_URL, DIRTY_ROUTER_PREFIX
from ._exceptions import StratixError, APIStatusError
from ._base_client import BaseClient, BaseAsyncClient

Expand All @@ -37,13 +37,15 @@ class Stratix(BaseClient):
api_key: str
organization_id: str | None
project_id: str | None
_use_bearer_auth: bool

def __init__(
self,
*,
api_key: str | None = None,
base_url: str | httpx.URL | None = None,
timeout: Union[float, httpx.Timeout, None] = DEFAULT_TIMEOUT,
use_bearer_auth: bool = False,
) -> None:
"""Construct a new synchronous Stratix client instance.

Expand All @@ -57,12 +59,17 @@ def __init__(
"The api_key client option must be set either by passing api_key to the client or by setting the LAYERLENS_STRATIX_API_KEY environment variable"
)
self.api_key = api_key
self._use_bearer_auth = use_bearer_auth

if base_url is None:
base_url = os.environ.get("LAYERLENS_STRATIX_BASE_URL") or os.environ.get("LAYERLENS_ATLAS_BASE_URL")
if base_url is None:
base_url = DEFAULT_BASE_URL

# Bearer auth (OAuth tokens) routes through the dirty prefix
if use_bearer_auth:
base_url = str(base_url).rstrip("/") + DIRTY_ROUTER_PREFIX

super().__init__(
base_url=base_url,
timeout=timeout,
Expand Down Expand Up @@ -154,7 +161,11 @@ def public(self) -> PublicClient:
@property
@override
def auth_headers(self) -> dict[str, str]:
return {"x-api-key": self.api_key} if self.api_key else {}
if not self.api_key:
return {}
if self._use_bearer_auth:
return {"Authorization": f"Bearer {self.api_key}"}
return {"x-api-key": self.api_key}

def copy(
self,
Expand All @@ -171,6 +182,7 @@ def copy(
api_key=api_key or self.api_key,
base_url=base_url or self.base_url,
timeout=self.timeout or timeout,
use_bearer_auth=self._use_bearer_auth,
**_extra_kwargs,
)

Expand Down Expand Up @@ -214,8 +226,32 @@ def _make_status_error(
return APIStatusError(err_msg, response=response, body=data)

def _get_organization(self) -> Optional[Organization]:
if self._use_bearer_auth:
# JWT-authenticated route returns a list of organizations
resp = super().get_cast(
"/organizations",
timeout=30,
cast_to=OrganizationsListResponse,
)
if isinstance(resp, OrganizationsListResponse) and resp.data:
# Try to find an org owned by the logged-in user that has projects
from .cli._auth import load_credentials

creds = load_credentials()
email = (creds or {}).get("user", {}).get("email") if creds else None
if email:
for org in resp.data:
if org.owner_id == email and org.projects:
return org
# Fall back to first org with projects
for org in resp.data:
if org.projects:
return org
return resp.data[0]
return None

organization = super().get_cast(
f"/organizations",
"/organizations",
timeout=30,
cast_to=OrganizationResponse,
)
Expand All @@ -226,13 +262,15 @@ class AsyncStratix(BaseAsyncClient):
api_key: str
organization_id: str | None
project_id: str | None
_use_bearer_auth: bool

def __init__(
self,
*,
api_key: str | None = None,
base_url: str | httpx.URL | None = None,
timeout: float | httpx.Timeout | None = DEFAULT_TIMEOUT,
use_bearer_auth: bool = False,
) -> None:
"""Construct a new asynchronous Stratix client instance.

Expand All @@ -247,12 +285,16 @@ def __init__(
"or by setting the LAYERLENS_STRATIX_API_KEY environment variable"
)
self.api_key = api_key
self._use_bearer_auth = use_bearer_auth

if base_url is None:
base_url = os.environ.get("LAYERLENS_STRATIX_BASE_URL") or os.environ.get("LAYERLENS_ATLAS_BASE_URL")
if base_url is None:
base_url = DEFAULT_BASE_URL

if use_bearer_auth:
base_url = str(base_url).rstrip("/") + DIRTY_ROUTER_PREFIX

super().__init__(base_url=base_url, timeout=timeout)

organization = self._get_organization()
Expand Down Expand Up @@ -341,7 +383,11 @@ def public(self) -> AsyncPublicClient:
@property
@override
def auth_headers(self) -> dict[str, str]:
return {"x-api-key": self.api_key} if self.api_key else {}
if not self.api_key:
return {}
if self._use_bearer_auth:
return {"Authorization": f"Bearer {self.api_key}"}
return {"x-api-key": self.api_key}

def copy(
self,
Expand All @@ -355,6 +401,7 @@ def copy(
api_key=api_key or self.api_key,
base_url=base_url or self.base_url,
timeout=self.timeout or timeout,
use_bearer_auth=self._use_bearer_auth,
**_extra_kwargs,
)

Expand Down Expand Up @@ -398,6 +445,23 @@ def _get_organization(self) -> Optional[Organization]:

data = response.json()

if self._use_bearer_auth:
resp = OrganizationsListResponse(**data)
if resp.data:
from .cli._auth import load_credentials

creds = load_credentials()
email = (creds or {}).get("user", {}).get("email") if creds else None
if email:
for org in resp.data:
if org.owner_id == email and org.projects:
return org
for org in resp.data:
if org.projects:
return org
return resp.data[0]
return None

organization = OrganizationResponse(**data)
return organization.data if isinstance(organization, OrganizationResponse) else None

Expand Down
7 changes: 7 additions & 0 deletions src/layerlens/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@
DEFAULT_TIMEOUT = httpx.Timeout(timeout=600, connect=5.0)

DEFAULT_BASE_URL = "https://api.layerlens.ai/api/v1"

# The "dirty" router prefix used by the backend for browser/session routes
DIRTY_ROUTER_PREFIX = "/dgklmnr"

# CLI auth endpoints (appended to base URL + dirty prefix)
AUTH_CLI_CONFIG_PATH = DIRTY_ROUTER_PREFIX + "/auth/cli-config"
AUTH_CLI_LOGIN_PATH = DIRTY_ROUTER_PREFIX + "/auth/cli-login"
6 changes: 6 additions & 0 deletions src/layerlens/cli/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .._version import __version__
from .commands.ci import ci
from .commands.auth import login, logout, whoami
from .commands.bulk import bulk
from .commands.judge import judge
from .commands.space import space
Expand Down Expand Up @@ -78,6 +79,11 @@ def cli(
cli.add_command(bulk)
cli.add_command(ci)

# Auth commands
cli.add_command(login)
cli.add_command(logout)
cli.add_command(whoami)


@cli.command("completion")
@click.argument("shell", type=click.Choice(["bash", "zsh", "fish", "powershell"]))
Expand Down
Loading
Loading