diff --git a/descope/auth.py b/descope/auth.py index 04c715259..eef5e1add 100644 --- a/descope/auth.py +++ b/descope/auth.py @@ -46,7 +46,7 @@ def __init__( raise AuthException( 400, ERROR_TYPE_INVALID_ARGUMENT, - "Unable to init AuthHelper object because project_id cannot be empty. Set environment variable DESCOPE_PROJECT_ID or pass your Project ID to the init function.", + "Unable to init Auth object because project_id cannot be empty. Set environment variable DESCOPE_PROJECT_ID or pass your Project ID to the init function.", ) self.project_id = project_id @@ -220,8 +220,7 @@ def refresh_token(self, refresh_token: str) -> dict: response = self.do_get(uri, None, None, refresh_token) resp = response.json() - auth_info = self._generate_auth_info(resp, refresh_token) - return auth_info + return self._generate_auth_info(resp, refresh_token) @staticmethod def _compose_exchange_params(code: str) -> dict: @@ -315,19 +314,14 @@ def _generate_auth_info(self, response_body: dict, refresh_token: str) -> dict: jwt_response = {} st_jwt = response_body.get("sessionJwt", "") if st_jwt: - jwt_response[SESSION_TOKEN_NAME] = self._validate_and_load_tokens( - st_jwt, None - ) + jwt_response[SESSION_TOKEN_NAME] = self._validate_token(st_jwt) rt_jwt = response_body.get("refreshJwt", "") - if rt_jwt: - jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_and_load_tokens( - rt_jwt, None - ) - if refresh_token: - jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_and_load_tokens( - refresh_token, None + jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_token( + refresh_token ) + elif rt_jwt: + jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_token(rt_jwt) jwt_response[COOKIE_DATA_NAME] = { "exp": response_body.get("cookieExpiration", 0), @@ -365,16 +359,16 @@ def _get_default_headers(self, pswd: str = None): headers["Authorization"] = f"Bearer {bearer}" return headers - def _validate_and_load_tokens(self, session_token: str, refresh_token: str) -> dict: - if not session_token: + # Validate a token and load the public key if needed + def _validate_token(self, token: str) -> dict: + if not token: raise AuthException( 500, ERROR_TYPE_INVALID_TOKEN, - f"empty signed token: {session_token}", + "Token validation received empty token", ) - try: - unverified_header = jwt.get_unverified_header(session_token) + unverified_header = jwt.get_unverified_header(token) except Exception as e: raise AuthException( 500, @@ -416,33 +410,52 @@ def _validate_and_load_tokens(self, session_token: str, refresh_token: str) -> d ERROR_TYPE_INVALID_PUBLIC_KEY, "Algorithm signature in JWT header does not match the algorithm signature in the public key", ) + claims = jwt.decode(jwt=token, key=copy_key[0].key, algorithms=[alg_header]) + claims["jwt"] = token + return claims - try: - claims = jwt.decode( - jwt=session_token, key=copy_key[0].key, algorithms=[alg_header] + def _validate_and_load_tokens(self, session_token: str, refresh_token: str) -> dict: + if not session_token and not refresh_token: + raise AuthException( + 500, + ERROR_TYPE_INVALID_TOKEN, + "Both refresh token and session token are empty", ) - claims["jwt"] = session_token - return claims - - except ExpiredSignatureError: - # Session token expired, check that refresh token is valid + if session_token: try: - jwt.decode( - jwt=refresh_token, - key=copy_key[0].key, - algorithms=[alg_header], - ) + return self._validate_token(session_token) + except ExpiredSignatureError: + # Session token expired, check that refresh token is valid + if refresh_token: + try: + self._validate_token(refresh_token) + except Exception as e: + raise AuthException( + 401, ERROR_TYPE_INVALID_TOKEN, f"Invalid refresh token: {e}" + ) + else: + raise AuthException( + 401, + ERROR_TYPE_INVALID_TOKEN, + "Session token expired and no refresh token provided", + ) + # Refresh token is valid now refresh the session token + return self.refresh_token(refresh_token) # return jwt_response dict except Exception as e: raise AuthException( - 401, ERROR_TYPE_INVALID_TOKEN, f"Invalid refresh token: {e}" + 500, ERROR_TYPE_INVALID_TOKEN, f"Invalid token: {e}" ) - # Refresh token is valid now refresh the session token - return self.refresh_token(refresh_token) # return jwt_response dict - + # If we got here, we did not have a session token so only do the refresh + try: + self._validate_token(refresh_token) except Exception as e: - raise AuthException(500, ERROR_TYPE_INVALID_TOKEN, f"Invalid token: {e}") + raise AuthException( + 401, ERROR_TYPE_INVALID_TOKEN, f"Invalid refresh token: {e}" + ) + # Refresh token is valid now refresh the session token + return self.refresh_token(refresh_token) # return jwt_response dict @staticmethod def _compose_refresh_token_url() -> str: diff --git a/descope/descope_client.py b/descope/descope_client.py index 384e7c7d3..e66aeeaea 100644 --- a/descope/descope_client.py +++ b/descope/descope_client.py @@ -57,6 +57,7 @@ def validate_session_request(self, session_token: str, refresh_token: str) -> di """ Validate the session for a given request. If the user is authenticated but the session has expired, the session token will automatically be refreshed. + Either the session_token or the refresh_token must be provided. Call this function every time you make a private API call that requires an authorized user.