-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
5a0295f
commit 75bb9ea
Showing
6 changed files
with
133 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2 as pb2 | ||
from indykite_sdk.model.token_info import TokenInfo | ||
from indykite_sdk.model.session_introspect import SessionIntrospect | ||
import sys | ||
import indykite_sdk.utils.logger as logger | ||
|
||
|
||
def token_introspect(self, user_token): | ||
sys.excepthook = logger.handle_excepthook | ||
try: | ||
response = self.stub.TokenIntrospect(pb2.TokenIntrospectRequest(token=user_token)) | ||
except Exception as exception: | ||
return logger.logger_error(exception) | ||
|
||
if not response or not response.active: | ||
return None | ||
|
||
return TokenInfo.deserialize(response.token_info) | ||
|
||
|
||
def session_introspect(self, tenant_id, access_token): | ||
sys.excepthook = logger.handle_excepthook | ||
try: | ||
response = self.stub.SessionIntrospect(pb2.SessionIntrospectRequest(tenant_id=tenant_id, token=access_token)) | ||
if not response : | ||
return None | ||
return SessionIntrospect.deserialize(response) | ||
except Exception as exception: | ||
return logger.logger_error(exception) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from indykite_sdk.model.token_info import TokenInfo | ||
|
||
|
||
class SessionIntrospect: | ||
@classmethod | ||
def deserialize(cls, message): | ||
if message is None: | ||
return None | ||
|
||
fields = [desc.name for desc, val in message.ListFields()] | ||
session_introspect = SessionIntrospect( | ||
bool(message.active) | ||
) | ||
|
||
if "token_info" in fields: | ||
session_introspect.token_info = TokenInfo.deserialize(message.token_info) | ||
|
||
if "provider_data" in fields: | ||
provider_data = [] | ||
for e in message.provider_data: | ||
provider_data.append(str(e)) | ||
session_introspect.provider_data = provider_data | ||
|
||
return session_introspect | ||
|
||
def __init__(self, active, token_info=None, provider_data=None): | ||
self.active = active | ||
self.token_info = token_info | ||
self.provider_data = provider_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,90 @@ | ||
from indykite_sdk.identity import IdentityClient | ||
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2 as pb2 | ||
from helpers import data | ||
|
||
|
||
def test_introspect_token_short_token(capsys): | ||
def test_token_introspect_short_token(capsys): | ||
token = "invalid_token" | ||
|
||
client = IdentityClient() | ||
assert client is not None | ||
|
||
response = client.introspect_token(token) | ||
response = client.token_introspect(token) | ||
captured = capsys.readouterr() | ||
|
||
assert response is None | ||
|
||
|
||
def test_introspect_token_error(registration): | ||
def test_token_introspect_error(registration,capsys): | ||
token = registration[0] | ||
client = IdentityClient() | ||
assert client is not None | ||
|
||
def mocked_introspect_token_error(request: pb2.TokenIntrospectRequest): | ||
def mocked_token_introspect_error(request: pb2.TokenIntrospectRequest): | ||
raise Exception("something went wrong") | ||
|
||
client.stub.TokenIntrospect = mocked_introspect_token_error | ||
response = client.introspect_token(token) | ||
client.stub.TokenIntrospect = mocked_token_introspect_error | ||
response = client.token_introspect(token) | ||
captured = capsys.readouterr() | ||
|
||
response is None | ||
assert "something went wrong" in captured.err | ||
|
||
|
||
def test_introspec_token_success(registration): | ||
def test_token_introspect_success(registration): | ||
token = registration[0] | ||
client = IdentityClient() | ||
assert client is not None | ||
|
||
def mocked_introspect_token(request: pb2.TokenIntrospectRequest): | ||
def mocked_token_introspect(request: pb2.TokenIntrospectRequest): | ||
assert request.access_token == token | ||
return pb2.TokenIntrospectResponse() | ||
|
||
client.stub.TokenIntrospect = mocked_introspect_token | ||
response = client.introspect_token(token) | ||
client.stub.TokenIntrospect = mocked_token_introspect | ||
response = client.token_introspect(token) | ||
|
||
assert response is not None | ||
|
||
|
||
def test_session_introspect_short_token(capsys): | ||
token = "invalid_token" | ||
tenant_id = data.get_tenant() | ||
|
||
client = IdentityClient() | ||
assert client is not None | ||
|
||
response = client.session_introspect(tenant_id, token) | ||
captured = capsys.readouterr() | ||
|
||
assert "method SessionIntrospect not implemented" in captured.err | ||
|
||
|
||
def test_session_introspect_error(registration, capsys): | ||
token = registration[0] | ||
tenant_id = data.get_tenant() | ||
client = IdentityClient() | ||
assert client is not None | ||
|
||
def mocked_session_introspect_error(request: pb2.SessionIntrospectRequest): | ||
raise Exception("something went wrong") | ||
|
||
client.stub.SessionIntrospect = mocked_session_introspect_error | ||
response = client.session_introspect(tenant_id, token) | ||
captured = capsys.readouterr() | ||
assert "something went wrong" in captured.err | ||
|
||
|
||
def test_session_introspect_success(registration): | ||
token = registration[0] | ||
tenant_id = data.get_tenant() | ||
client = IdentityClient() | ||
assert client is not None | ||
|
||
def mocked_session_introspect(request: pb2.SessionIntrospectRequest): | ||
assert request.tenant_id == tenant_id | ||
assert request.access_token == token | ||
return pb2.SessionIntrospectResponse() | ||
|
||
client.stub.SessionIntrospect = mocked_session_introspect | ||
response = client.session_introspect(tenant_id, token) | ||
|
||
response is not None | ||
assert response is not None |