Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 49 additions & 36 deletions descope/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
raise AuthException(
400,
ERROR_TYPE_INVALID_ARGUMENT,
"Unable to init AuthHelper object because project_id cannot be empty. Set environment variable DESCOPE_PROJECT_ID or pass your Project ID to the init function.",
"Unable to init Auth object because project_id cannot be empty. Set environment variable DESCOPE_PROJECT_ID or pass your Project ID to the init function.",
)
self.project_id = project_id

Expand Down Expand Up @@ -220,8 +220,7 @@ def refresh_token(self, refresh_token: str) -> dict:
response = self.do_get(uri, None, None, refresh_token)

resp = response.json()
auth_info = self._generate_auth_info(resp, refresh_token)
return auth_info
return self._generate_auth_info(resp, refresh_token)

@staticmethod
def _compose_exchange_params(code: str) -> dict:
Expand Down Expand Up @@ -315,19 +314,14 @@ def _generate_auth_info(self, response_body: dict, refresh_token: str) -> dict:
jwt_response = {}
st_jwt = response_body.get("sessionJwt", "")
if st_jwt:
jwt_response[SESSION_TOKEN_NAME] = self._validate_and_load_tokens(
st_jwt, None
)
jwt_response[SESSION_TOKEN_NAME] = self._validate_token(st_jwt)
rt_jwt = response_body.get("refreshJwt", "")
if rt_jwt:
jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_and_load_tokens(
rt_jwt, None
)

if refresh_token:
jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_and_load_tokens(
refresh_token, None
jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_token(
refresh_token
)
elif rt_jwt:
jwt_response[REFRESH_SESSION_TOKEN_NAME] = self._validate_token(rt_jwt)

jwt_response[COOKIE_DATA_NAME] = {
"exp": response_body.get("cookieExpiration", 0),
Expand Down Expand Up @@ -365,16 +359,16 @@ def _get_default_headers(self, pswd: str = None):
headers["Authorization"] = f"Bearer {bearer}"
return headers

def _validate_and_load_tokens(self, session_token: str, refresh_token: str) -> dict:
if not session_token:
# Validate a token and load the public key if needed
def _validate_token(self, token: str) -> dict:
if not token:
raise AuthException(
500,
ERROR_TYPE_INVALID_TOKEN,
f"empty signed token: {session_token}",
"Token validation received empty token",
)

try:
unverified_header = jwt.get_unverified_header(session_token)
unverified_header = jwt.get_unverified_header(token)
except Exception as e:
raise AuthException(
500,
Expand Down Expand Up @@ -416,33 +410,52 @@ def _validate_and_load_tokens(self, session_token: str, refresh_token: str) -> d
ERROR_TYPE_INVALID_PUBLIC_KEY,
"Algorithm signature in JWT header does not match the algorithm signature in the public key",
)
claims = jwt.decode(jwt=token, key=copy_key[0].key, algorithms=[alg_header])
claims["jwt"] = token
return claims

try:
claims = jwt.decode(
jwt=session_token, key=copy_key[0].key, algorithms=[alg_header]
def _validate_and_load_tokens(self, session_token: str, refresh_token: str) -> dict:
if not session_token and not refresh_token:
raise AuthException(
500,
ERROR_TYPE_INVALID_TOKEN,
"Both refresh token and session token are empty",
)

claims["jwt"] = session_token
return claims

except ExpiredSignatureError:
# Session token expired, check that refresh token is valid
if session_token:
try:
jwt.decode(
jwt=refresh_token,
key=copy_key[0].key,
algorithms=[alg_header],
)
return self._validate_token(session_token)
except ExpiredSignatureError:
# Session token expired, check that refresh token is valid
if refresh_token:
try:
self._validate_token(refresh_token)
except Exception as e:
raise AuthException(
401, ERROR_TYPE_INVALID_TOKEN, f"Invalid refresh token: {e}"
)
else:
raise AuthException(
401,
ERROR_TYPE_INVALID_TOKEN,
"Session token expired and no refresh token provided",
)
# Refresh token is valid now refresh the session token
return self.refresh_token(refresh_token) # return jwt_response dict
except Exception as e:
raise AuthException(
401, ERROR_TYPE_INVALID_TOKEN, f"Invalid refresh token: {e}"
500, ERROR_TYPE_INVALID_TOKEN, f"Invalid token: {e}"
)

# Refresh token is valid now refresh the session token
return self.refresh_token(refresh_token) # return jwt_response dict

# If we got here, we did not have a session token so only do the refresh
try:
self._validate_token(refresh_token)
except Exception as e:
raise AuthException(500, ERROR_TYPE_INVALID_TOKEN, f"Invalid token: {e}")
raise AuthException(
401, ERROR_TYPE_INVALID_TOKEN, f"Invalid refresh token: {e}"
)
# Refresh token is valid now refresh the session token
return self.refresh_token(refresh_token) # return jwt_response dict

@staticmethod
def _compose_refresh_token_url() -> str:
Expand Down
1 change: 1 addition & 0 deletions descope/descope_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def validate_session_request(self, session_token: str, refresh_token: str) -> di
"""
Validate the session for a given request. If the user is authenticated but the
session has expired, the session token will automatically be refreshed.
Either the session_token or the refresh_token must be provided.
Call this function every time you make a private API call that requires an authorized
user.

Expand Down