diff --git a/mcp-servers/python/mcp-rss-search/src/mcp_rss_search/__init__.py b/mcp-servers/python/mcp-rss-search/src/mcp_rss_search/__init__.py index 889f4b21e..ff996880e 100644 --- a/mcp-servers/python/mcp-rss-search/src/mcp_rss_search/__init__.py +++ b/mcp-servers/python/mcp-rss-search/src/mcp_rss_search/__init__.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- """MCP RSS Search Server - Advanced RSS feed parsing, searching, and analysis.""" diff --git a/mcp-servers/python/mcp-rss-search/src/mcp_rss_search/server_fastmcp.py b/mcp-servers/python/mcp-rss-search/src/mcp_rss_search/server_fastmcp.py old mode 100644 new mode 100755 diff --git a/mcp-servers/python/mcp-rss-search/tests/test_server.py b/mcp-servers/python/mcp-rss-search/tests/test_server.py index 82337b59e..64af423ef 100644 --- a/mcp-servers/python/mcp-rss-search/tests/test_server.py +++ b/mcp-servers/python/mcp-rss-search/tests/test_server.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Tests for MCP RSS Search Server.""" diff --git a/mcpgateway/admin.py b/mcpgateway/admin.py index 99988c9aa..ceb4d853e 100644 --- a/mcpgateway/admin.py +++ b/mcpgateway/admin.py @@ -2608,53 +2608,56 @@ async def admin_login_handler(request: Request, db: Session = Depends(get_db)) - root_path = request.scope.get("root_path", "") return RedirectResponse(url=f"{root_path}/admin/login?error=missing_fields", status_code=303) - # Authenticate using the email auth service # First-Party - from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel + from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel auth_service = EmailAuthService(db) - try: - # Authenticate user - LOGGER.debug(f"Attempting authentication for {email}") - user = await auth_service.authenticate_user(email, password) - LOGGER.debug(f"Authentication result: {user}") - - if not user: - LOGGER.warning(f"Authentication failed for {email} - user is None") - root_path = request.scope.get("root_path", "") - return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303) + LOGGER.debug(f"Attempting authentication for {email}") + user = await auth_service.authenticate_user(email, password) + LOGGER.debug(f"Authentication result: {user}") - # Create JWT token with proper audience and issuer claims - # First-Party - from mcpgateway.routers.email_auth import create_access_token # pylint: disable=import-outside-toplevel + if not user: + root_path = request.scope.get("root_path", "") + return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303) - token, _ = await create_access_token(user) # expires_seconds not needed here + # First-Party + from mcpgateway.routers.email_auth import create_access_token # pylint: disable=import-outside-toplevel - # Create redirect response - root_path = request.scope.get("root_path", "") - response = RedirectResponse(url=f"{root_path}/admin", status_code=303) + token, _ = await create_access_token(user) - # Set JWT token as secure cookie - # First-Party - from mcpgateway.utils.security_cookies import set_auth_cookie # pylint: disable=import-outside-toplevel + root_path = request.scope.get("root_path", "") + response = RedirectResponse(url=f"{root_path}/admin", status_code=303) - set_auth_cookie(response, token, remember_me=False) + # First-Party + from mcpgateway.utils.security_cookies import set_auth_cookie # pylint: disable=import-outside-toplevel - LOGGER.info(f"Admin user {email} logged in successfully") - return response + set_auth_cookie(response, token, remember_me=False) + default_admin_username = settings.platform_admin_email + default_password = settings.platform_admin_password - except Exception as e: - LOGGER.warning(f"Login failed for {email}: {e}") + is_default_password = email.lower() == default_admin_username and password == default_password - if settings.secure_cookies and settings.environment == "development": - LOGGER.warning("Login failed - set SECURE_COOKIES to false in config for HTTP development") + # ✅ Set or clear the security reminder cookie + if is_default_password: + response.set_cookie( + key="pwd_is_default", + value="true", + max_age=3600 * 24, # 1 day + httponly=False, # JS needs to read it + secure=False, # set True for HTTPS environments + samesite="Lax", + ) + LOGGER.debug("Set cookie: pwd_is_default=true for admin@example.com") + else: + response.delete_cookie("pwd_is_default") + LOGGER.debug("Cleared cookie: pwd_is_default") - root_path = request.scope.get("root_path", "") - return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303) + LOGGER.info(f"Admin user {email} logged in successfully") + return response except Exception as e: - LOGGER.error(f"Login handler error: {e}") + LOGGER.exception(f"Login handler error: {e}") root_path = request.scope.get("root_path", "") return RedirectResponse(url=f"{root_path}/admin/login?error=server_error", status_code=303) @@ -4477,75 +4480,110 @@ async def admin_get_user_edit( async def admin_update_user( user_email: str, request: Request, - db: Session = Depends(get_db), + db: Optional[Session] = Depends(get_db), _user=Depends(get_current_user_with_permissions), ) -> HTMLResponse: - """Update user via admin UI. + """ + Update a user's details via the admin UI. + + This includes updating the full name, admin status, and password. It also ensures + the last admin cannot be demoted and manages a "pwd_is_default" cookie for + the platform admin if the default password is used. Args: - user_email: Email of user to update - request: FastAPI request object - db: Database session + user_email (str): Email of the user to update. + request (Request): FastAPI request object. + db (Optional[Session]): Database session. + _user: Current user with permissions (dependency). Returns: - HTMLResponse: Success message or error response + HTMLResponse: Success message or error response. """ if not settings.email_auth_enabled: - return HTMLResponse(content='
Email authentication is disabled
', status_code=403) + return HTMLResponse( + content='
Email authentication is disabled
', + status_code=403, + ) try: + # Import service locally to avoid circular imports # First-Party - from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel + from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel auth_service = EmailAuthService(db) - - # URL decode the email - decoded_email = urllib.parse.unquote(user_email) - form = await request.form() full_name = form.get("full_name") is_admin = form.get("is_admin") == "on" password = form.get("password") confirm_password = form.get("confirm_password") - # Validate password confirmation if password is being changed + # Validate password confirmation if password and password != confirm_password: - return HTMLResponse(content='
Passwords do not match
', status_code=400) + return HTMLResponse( + content='
Passwords do not match
', + status_code=400, + ) - # Check if trying to remove admin privileges from last admin + # Prevent removing admin role from the last admin user_obj = await auth_service.get_user_by_email(decoded_email) if user_obj and user_obj.is_admin and not is_admin: - # This user is currently an admin and we're trying to remove admin privileges if await auth_service.is_last_active_admin(decoded_email): - return HTMLResponse(content='
Cannot remove administrator privileges from the last remaining admin user
', status_code=400) + return HTMLResponse( + content='
Cannot remove administrator privileges from the last remaining admin user
', + status_code=400, + ) - # Update user - fn_val = form.get("full_name") - pw_val = form.get("password") - full_name = fn_val if isinstance(fn_val, str) else None - password = pw_val if isinstance(pw_val, str) else None - await auth_service.update_user(email=decoded_email, full_name=full_name, is_admin=is_admin, password=password if password else None) + # Update user record + await auth_service.update_user( + email=decoded_email, + full_name=full_name if isinstance(full_name, str) else None, + is_admin=is_admin, + password=password if isinstance(password, str) and password else None, + ) - # Return success message with auto-close and refresh + # Create success HTML response success_html = """

User updated successfully

""" - return HTMLResponse(content=success_html) + response = HTMLResponse(content=success_html) + + # Manage "pwd_is_default" cookie for platform admin + default_admin_username = settings.platform_admin_email + + if decoded_email.lower() == default_admin_username.lower(): + if password == "changeme": # nosec B105 + response.set_cookie( + key="pwd_is_default", + value="true", + max_age=3600 * 24, # 1 day + httponly=False, # allow JS read + secure=False, # True for HTTPS + samesite="Lax", + ) + LOGGER.debug("Set cookie: pwd_is_default=true (default password restored)") + elif password: # password updated to something else + response.delete_cookie("pwd_is_default") + LOGGER.debug("Cleared cookie: pwd_is_default (non-default password set)") + + LOGGER.info(f"User {decoded_email} updated successfully") + return response except Exception as e: LOGGER.error(f"Error updating user {user_email}: {e}") - return HTMLResponse(content=f'
Error updating user: {str(e)}
', status_code=400) + return HTMLResponse( + content=f'
Error updating user: {str(e)}
', + status_code=400, + ) @admin_router.post("/users/{user_email}/activate") diff --git a/mcpgateway/plugins/framework/external/mcp/tls_utils.py b/mcpgateway/plugins/framework/external/mcp/tls_utils.py index 91b04cfb0..9cd125e45 100644 --- a/mcpgateway/plugins/framework/external/mcp/tls_utils.py +++ b/mcpgateway/plugins/framework/external/mcp/tls_utils.py @@ -87,6 +87,7 @@ def create_ssl_context(tls_config: MCPClientTLSConfig, plugin_name: str) -> ssl. logger.warning(f"Certificate verification disabled for plugin '{plugin_name}'. This is not recommended for production use.") ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE # noqa: DUO122 + else: # Enable strict certificate verification (production mode) # Load CA certificate bundle for server certificate validation diff --git a/mcpgateway/templates/admin.html b/mcpgateway/templates/admin.html index 4e3785831..86cb3f2c6 100644 --- a/mcpgateway/templates/admin.html +++ b/mcpgateway/templates/admin.html @@ -63,6 +63,10 @@ rel="stylesheet" /> +