diff --git a/cbrain_cli/cli_utils.py b/cbrain_cli/cli_utils.py index cc8e7a6..e721647 100644 --- a/cbrain_cli/cli_utils.py +++ b/cbrain_cli/cli_utils.py @@ -1,26 +1,45 @@ import functools import json import re +import sys import urllib.error # import importlib.metadata -from cbrain_cli.config import CREDENTIALS_FILE +from cbrain_cli.config import ACTIVE_SESSION_KEY, CREDENTIALS_FILE + +# Session name priority: --session flag > _active_session in cbrain.json > "default" +session_name = "default" +session_specified = False +for i, arg in enumerate(sys.argv): + if arg == "--session" and i + 1 < len(sys.argv): + session_name = sys.argv[i + 1] + session_specified = True + elif arg.startswith("--session="): + session_name = arg.split("=", 1)[1] + session_specified = True try: - # MARK: Credentials. - with open(CREDENTIALS_FILE) as f: - credentials = json.load(f) + try: + with open(CREDENTIALS_FILE) as f: + all_credentials = json.load(f) + except FileNotFoundError: + all_credentials = {} + + if not session_specified: + session_name = all_credentials.get(ACTIVE_SESSION_KEY, "default") or "default" + + all_credentials.pop(ACTIVE_SESSION_KEY, None) + + credentials = all_credentials.get(session_name, {}) # Get credentials. cbrain_url = credentials.get("cbrain_url") api_token = credentials.get("api_token") user_id = credentials.get("user_id") cbrain_timestamp = credentials.get("timestamp") -except FileNotFoundError: - cbrain_url = None - api_token = None - user_id = None - cbrain_timestamp = None +except Exception: + all_credentials = {} + cbrain_url = api_token = user_id = cbrain_timestamp = None def is_authenticated(): @@ -85,7 +104,7 @@ def handle_connection_error(error): if error.code == 401: print(f"{status_description}: {error.reason}") print("Error: Access denied. Please log in using authorized credentials.") - elif error.code == 404 or error.code == 422 or error.code == 500: + elif error.code in (400, 404, 422, 500): # Try to extract specific error message from response try: # Check if the error response has already been read @@ -107,6 +126,14 @@ def handle_connection_error(error): or error_data.get("notice") or str(error_data) ) + # Check if this looks like a password change redirect + if "change_password" in error_msg: + print( + f"{status_description}: Account requires " + "a password change. " + "Please log into the web portal." + ) + return print(f"{status_description}: {error_msg}") return except json.JSONDecodeError: diff --git a/cbrain_cli/config.py b/cbrain_cli/config.py index cc74b5b..8405d32 100644 --- a/cbrain_cli/config.py +++ b/cbrain_cli/config.py @@ -13,6 +13,10 @@ SESSION_FILE_DIR.mkdir(parents=True, exist_ok=True) CREDENTIALS_FILE = SESSION_FILE_DIR / SESSION_FILE_NAME +# Key used inside credentials.json to track the currently active session. +# Prefixed with "_" so it is clearly not a session name. +ACTIVE_SESSION_KEY = "_active_session" + # HTTP headers. DEFAULT_HEADERS = { "Content-Type": "application/x-www-form-urlencoded", diff --git a/cbrain_cli/data/projects.py b/cbrain_cli/data/projects.py index 58ff27b..9099e53 100644 --- a/cbrain_cli/data/projects.py +++ b/cbrain_cli/data/projects.py @@ -3,7 +3,8 @@ import urllib.request from cbrain_cli.cli_utils import api_token, cbrain_url -from cbrain_cli.config import CREDENTIALS_FILE, auth_headers +from cbrain_cli.config import auth_headers +from cbrain_cli.sessions import save_credentials def switch_project(args): @@ -20,6 +21,8 @@ def switch_project(args): dict or None Dictionary containing project details if successful, None otherwise """ + from cbrain_cli.cli_utils import all_credentials, session_name + # Get the group ID from the group_id argument group_id = getattr(args, "group_id", None) if not group_id: @@ -56,15 +59,12 @@ def switch_project(args): group_data = json.loads(group_data_text) # Step 3: Update credentials file with current group_id - if CREDENTIALS_FILE.exists(): - with open(CREDENTIALS_FILE) as f: - credentials = json.load(f) - - credentials["current_group_id"] = group_id - credentials["current_group_name"] = group_data.get("name", "Unknown") - - with open(CREDENTIALS_FILE, "w") as f: - json.dump(credentials, f, indent=2) + if session_name in all_credentials: + all_credentials[session_name]["current_group_id"] = group_id + all_credentials[session_name]["current_group_name"] = group_data.get( + "name", "Unknown" + ) + save_credentials(all_credentials) return group_data @@ -83,6 +83,8 @@ def show_project(args): dict or None Dictionary containing project details if successful, None if no project set """ + from cbrain_cli.cli_utils import all_credentials, session_name + # Check if a specific project ID was provided project_id = getattr(args, "project_id", None) @@ -105,10 +107,8 @@ def show_project(args): raise else: # Show current project from credentials - with open(CREDENTIALS_FILE) as f: - credentials = json.load(f) - - current_group_id = credentials.get("current_group_id") + session_creds = all_credentials.get(session_name, {}) + current_group_id = session_creds.get("current_group_id") if not current_group_id: return None @@ -128,10 +128,9 @@ def show_project(args): if e.code == 404: print(f"Error: Current project (ID {current_group_id}) no longer exists") # Clear the invalid group_id from credentials - credentials.pop("current_group_id", None) - credentials.pop("current_group_name", None) - with open(CREDENTIALS_FILE, "w") as f: - json.dump(credentials, f, indent=2) + session_creds.pop("current_group_id", None) + session_creds.pop("current_group_name", None) + save_credentials(all_credentials) return None else: raise @@ -164,3 +163,4 @@ def list_projects(args): projects_data = json.loads(data) return projects_data + diff --git a/cbrain_cli/main.py b/cbrain_cli/main.py index 41e2d07..303bb7e 100644 --- a/cbrain_cli/main.py +++ b/cbrain_cli/main.py @@ -39,7 +39,7 @@ handle_tool_list, handle_tool_show, ) -from cbrain_cli.sessions import create_session, logout_session +from cbrain_cli.sessions import create_session, list_sessions, logout_session, switch_session from cbrain_cli.users import whoami_user @@ -60,6 +60,12 @@ def main(): action="store_true", help="Output in JSONL format (one JSON object per line)", ) + parser.add_argument( + "--session", + type=str, + default="default", + help="Session name to use for multiple configurations (default: default)", + ) subparsers = parser.add_subparsers(dest="command", help="Available commands") @@ -70,17 +76,47 @@ def main(): # MARK: Session commands (top-level) # Create new session. login_parser = subparsers.add_parser("login", help="Login to CBRAIN") + login_parser.add_argument("--session", type=str, help="Session name to use") + login_parser.add_argument("-u", "--username", type=str, help="CBRAIN username") + login_parser.add_argument("-p", "--password", type=str, help="CBRAIN password") + login_parser.add_argument("-s", "--server", type=str, help="CBRAIN server URL") login_parser.set_defaults(func=handle_errors(create_session)) # Logout session. logout_parser = subparsers.add_parser("logout", help="Logout from CBRAIN") + logout_parser.add_argument( + "--session", type=str, help="Session name to logout (default: all sessions)" + ) logout_parser.set_defaults(func=handle_errors(logout_session)) # Show current session. whoami_parser = subparsers.add_parser("whoami", help="Show current session") + whoami_parser.add_argument("--session", type=str, help="Session name to show") whoami_parser.add_argument("-v", "--version", action="store_true", help="Show version") whoami_parser.set_defaults(func=handle_errors(whoami_user)) + # Switch active session. + switch_session_parser = subparsers.add_parser( + "switch_session", + help="Switch the default session (e.g. cbrain switch_session prod)", + ) + switch_session_parser.add_argument( + "session_target", + type=str, + help="Name of the session to make the default", + ) + switch_session_parser.set_defaults(func=handle_errors(switch_session)) + + # Session management sub-commands. + session_parser = subparsers.add_parser("session", help="Session management") + session_subparsers = session_parser.add_subparsers( + dest="action", help="Session actions" + ) + session_list_parser = session_subparsers.add_parser( + "list", help="List all saved sessions" + ) + session_list_parser.set_defaults(func=handle_errors(list_sessions)) + # MARK: Model-based commands # File commands file_parser = subparsers.add_parser("file", help="File operations") @@ -404,22 +440,29 @@ def main(): parser.print_help() return - # Handle session commands (no authentication needed for login, version, and whoami). + # Handle public commands (no authentication needed). if args.command == "login": return handle_errors(create_session)(args) + elif args.command == "logout": + return handle_errors(logout_session)(args) elif args.command == "version": return handle_errors(version_info)(args) elif args.command == "whoami": return handle_errors(whoami_user)(args) + elif args.command == "switch_session": + return handle_errors(switch_session)(args) + elif args.command == "session": + if not getattr(args, "action", None): + session_parser.print_help() + return 1 + return args.func(args) # All other commands require authentication. if not is_authenticated(): return 1 # Handle authenticated commands. - if args.command == "logout": - return handle_errors(logout_session)(args) - elif args.command in [ + if args.command in [ "file", "dataprovider", "project", diff --git a/cbrain_cli/sessions.py b/cbrain_cli/sessions.py index 4559f52..0534b40 100644 --- a/cbrain_cli/sessions.py +++ b/cbrain_cli/sessions.py @@ -5,134 +5,205 @@ import urllib.parse import urllib.request -from cbrain_cli.cli_utils import api_token, cbrain_url from cbrain_cli.config import ( + ACTIVE_SESSION_KEY, CREDENTIALS_FILE, DEFAULT_BASE_URL, DEFAULT_HEADERS, auth_headers, ) +## MARK: Internal helpers -# MARK: Create Session. -def create_session(args): - """ - Create a new CBRAIN session by logging in and saving credentials. +def load_credentials() -> dict: + """Load cbrain.json; return {} if missing, raise on corrupt JSON.""" + try: + with open(CREDENTIALS_FILE) as f: + return json.load(f) + except FileNotFoundError: + return {} - Returns - ------- - None - A command is run via inputs from the user. - """ - if CREDENTIALS_FILE.exists(): - print("Already logged in. Use 'cbrain logout' to logout.") +def save_credentials(data: dict) -> None: + """Merge *data* into cbrain.json, preserving metadata keys (e.g. _active_session).""" + on_disk = load_credentials() # always re-read so we don't lose metadata + on_disk.update(data) # overlay the caller's session changes + with open(CREDENTIALS_FILE, "w") as f: + json.dump(on_disk, f, indent=2) + + +def get_sessions(all_creds: dict) -> dict: + """Return only genuine session entries (skip the metadata key).""" + return {name: creds for name, creds in all_creds.items() if name != ACTIVE_SESSION_KEY} + + +# MARK: Switch Session + +def switch_session(args): + """Switch the default session used by bare commands.""" + target = getattr(args, "session_target", None) + if not target: + print("Usage: cbrain switch_session ") + return 1 + + try: + all_creds = load_credentials() + except json.JSONDecodeError: + print(f"Error: credentials file is corrupted ({CREDENTIALS_FILE}).") + return 1 + + sessions = get_sessions(all_creds) + if target not in sessions: + available = ", ".join(sessions) or "(none)" + print(f"Session '{target}' not found. Available sessions: {available}") + return 1 + + all_creds[ACTIVE_SESSION_KEY] = target + save_credentials(all_creds) + print(f"Switched to session '{target}'. All future commands will use this session.") + return 0 + + +# MARK: List Sessions + +def list_sessions(args): + """List all saved sessions, marking the currently active one with '*'.""" + try: + all_creds = load_credentials() + except json.JSONDecodeError: + print(f"Error: credentials file is corrupted ({CREDENTIALS_FILE}).") + return 1 + + active = all_creds.get(ACTIVE_SESSION_KEY, "default") + sessions = get_sessions(all_creds) + + if not sessions: + print("No saved sessions. Use 'cbrain login' to create one.") + return 0 + + print(f"{'#':<4} {'SESSION':<20} {'USERNAME':<16} {'USER ID':<10} {'SERVER':<35} {'TIMESTAMP'}") + print("-" * 90) + for idx, (name, c) in enumerate(sessions.items(), start=1): + marker = "*" if name == active else " " + print( + f"{marker}{idx:<3} {name:<20} {c.get('username', '(unknown)'):<16} " + f"{c.get('user_id', 'N/A')!s:<10} {c.get('cbrain_url', 'N/A'):<35} " + f"{c.get('timestamp', 'N/A')}" + ) + + print(f"\nActive session: {active} (* = active)") + return 0 + + +# MARK: Create Session + +def create_session(args): + """Login to CBRAIN and save credentials for the current session.""" + from cbrain_cli.cli_utils import all_credentials, api_token, cbrain_url, session_name + + if cbrain_url and api_token: + print(f"Already logged in to session '{session_name}'. Use 'cbrain logout' to logout.") return 1 - # Get user input. - cbrain_url = input("Enter CBRAIN server base URL [default: localhost:3000]: ").strip() - if not cbrain_url: - cbrain_url = DEFAULT_BASE_URL + server = getattr(args, "server", None) or input( + "Enter CBRAIN server base URL [default: localhost:3000]: " + ).strip() or DEFAULT_BASE_URL - username = input("Enter CBRAIN username: ").strip() + username = getattr(args, "username", None) or input("Enter CBRAIN username: ").strip() if not username: print("Username is required") return 1 - password = getpass.getpass("Enter CBRAIN password: ") + password = getattr(args, "password", None) or getpass.getpass("Enter CBRAIN password: ") if not password: print("Password is required") return 1 - # Prepare the login request. - login_endpoint = f"{cbrain_url}/session" - - # Prepare form data. - form_data = {"login": username, "password": password} - - # Encode the form data. - encoded_data = urllib.parse.urlencode(form_data).encode("utf-8") - - # Create the request. + encoded = urllib.parse.urlencode({"login": username, "password": password}).encode() request = urllib.request.Request( - login_endpoint, data=encoded_data, headers=DEFAULT_HEADERS, method="POST" + f"{server}/session", data=encoded, headers=DEFAULT_HEADERS, method="POST" ) - # Make the request. - with urllib.request.urlopen(request) as response: - data = response.read().decode("utf-8") - response_data = json.loads(data) - - # Extract the API token from response. - cbrain_api_token = response_data.get("cbrain_api_token") - cbrain_user_id = response_data.get("user_id") - - if not cbrain_api_token: + with urllib.request.urlopen(request) as resp: + data = json.loads(resp.read()) + token = data.get("cbrain_api_token") + if not token: print("Login failed: No API token received") return 1 - # Prepare credentials data. - credentials = { - "cbrain_url": cbrain_url, - "api_token": cbrain_api_token, - "user_id": cbrain_user_id, + all_credentials[session_name] = { + "cbrain_url": server, + "api_token": token, + "user_id": data.get("user_id"), + "username": username, "timestamp": datetime.datetime.now().isoformat(), } + save_credentials(all_credentials) - # Save credentials to file. - with open(CREDENTIALS_FILE, "w") as f: - json.dump(credentials, f, indent=2) - - print(f"Connection successful, API token saved in {CREDENTIALS_FILE}") - return 0 + print(f"Connection successful. Token saved in {CREDENTIALS_FILE} for session '{session_name}'.") + return 0 # MARK: Logout + def logout_session(args): """ - Logout from CBRAIN by deleting the session file. + Logout from CBRAIN. - Returns - ------- - None - A command is run via inputs from the user. + Without ``--session``: logout all active sessions. + With ``--session ``: logout only that session. """ + from cbrain_cli.cli_utils import session_name, session_specified - if not cbrain_url or not api_token: - print("Invalid credentials file. Removing local session.") - CREDENTIALS_FILE.unlink() - return 0 - - # Prepare logout request. - logout_endpoint = f"{cbrain_url}/session" + # Load a fresh, unstripped copy from disk so _active_session is preserved. + all_creds = load_credentials() + sessions = get_sessions(all_creds) - # Create headers with authorization. - headers = auth_headers(api_token) + sessions_to_logout = list(sessions) if not session_specified else [session_name] - # Create the DELETE request. - request = urllib.request.Request( - logout_endpoint, - data=None, # No payload for DELETE - headers=headers, - method="DELETE", - ) + if not sessions_to_logout: + print("No active sessions to logout.") + return 0 - # Make the request to logout from server. - try: - with urllib.request.urlopen(request) as response: - if response.status == 200: - print("Successfully logged out from CBRAIN server.") - else: - print("Logout failed") - except urllib.error.HTTPError as e: - if e.code == 401: - print("Session already expired on server.") - else: - print(f"Logout request failed: HTTP {e.code}") - except urllib.error.URLError as e: - print(f"Network error during logout: {e}") - - # Always remove local credentials file. - CREDENTIALS_FILE.unlink() - print(f"Local session removed from {CREDENTIALS_FILE}") + for s_name in sessions_to_logout: + creds = sessions.get(s_name, {}) + s_url, s_token = creds.get("cbrain_url"), creds.get("api_token") + + if not s_url or not s_token: + if s_name in sessions: + print(f"Invalid credentials for session '{s_name}'. Removing local session.") + all_creds.pop(s_name, None) + elif session_specified: + print(f"Not logged in to session '{s_name}'.") + elif len(sessions_to_logout) == 1: + print("Not logged in. Use 'cbrain login' to login first.") + continue + + # Use the stored username for the logout message (no extra network call needed). + display_name = creds.get("username", s_name) + + try: + req = urllib.request.Request( + f"{s_url}/session", headers=auth_headers(s_token), method="DELETE" + ) + with urllib.request.urlopen(req) as resp: + if resp.status == 200: + print(f"Successfully logged out from CBRAIN server as {display_name}.") + else: + print(f"Logout failed for session '{s_name}'.") + except urllib.error.HTTPError as e: + print( + f"Session '{s_name}' already expired on server." + if e.code == 401 + else f"Logout request failed for '{s_name}': HTTP {e.code}" + ) + except urllib.error.URLError as e: + print(f"Network error during logout for '{s_name}': {e}") + + all_creds.pop(s_name, None) + print(f"Local session '{s_name}' removed from {CREDENTIALS_FILE}.") + + with open(CREDENTIALS_FILE, "w") as f: + json.dump(all_creds, f, indent=2) return 0 + diff --git a/cbrain_cli/users.py b/cbrain_cli/users.py index d8bcaaf..b2d67f2 100644 --- a/cbrain_cli/users.py +++ b/cbrain_cli/users.py @@ -79,7 +79,7 @@ def whoami_user(args): else "****" ) - print(f"DEBUG: Found credentials {CREDENTIALS_FILE}") + print(f"DEBUG: Found credentials at {CREDENTIALS_FILE}") print(f"DEBUG: User in credentials: {user_data['login']} on server {cbrain_url}") print(f"DEBUG: Token found: {masked_token}") print("DEBUG: Verifying token...") @@ -121,3 +121,4 @@ def whoami_user(args): return 1 print(f"Current user: {user_data['login']} ({user_data['full_name']}) on server {cbrain_url}") + return 0