-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
fc3aad3
commit 15544e2
Showing
7 changed files
with
234 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import certifi | ||
import grpc | ||
import os | ||
|
||
from indykite_sdk.authorization import helper | ||
from indykite_sdk.indykite.identity.v1beta1 import identity_management_api_pb2_grpc as pb2_grpc | ||
|
||
|
||
class AuthorizationClient(object): | ||
|
||
def __init__(self, local=False): | ||
cred = os.getenv('INDYKITE_APPLICATION_CREDENTIALS') | ||
|
||
# Load the config from File (secondary) | ||
if (cred is False) or (cred is None): | ||
cred = os.getenv('INDYKITE_APPLICATION_CREDENTIALS_FILE') | ||
try: | ||
if (cred is False) or (cred is None): | ||
raise Exception("Missing INDYKITE_APPLICATION_CREDENTIALS or " | ||
"INDYKITE_APPLICATION_CREDENTIALS_FILE environment variable") | ||
except Exception as exception: | ||
print(exception) | ||
return None | ||
credentials = os.path.join(os.path.dirname(cred), os.path.basename(cred)) | ||
credentials = helper.load_credentials(credentials) | ||
|
||
# Load the credential json (primary) | ||
else: | ||
credentials = helper.load_json(cred) | ||
|
||
agent_token = helper.create_agent_jwt(credentials) | ||
|
||
call_credentials = grpc.access_token_call_credentials(agent_token.decode("utf-8")) | ||
|
||
if local: | ||
certificate_path = os.getenv('CAPEM') | ||
endpoint = credentials.get("local_endpoint") | ||
else: | ||
certificate_path = certifi.where() | ||
endpoint = credentials.get("endpoint") | ||
|
||
with open(certificate_path, "rb") as cert_file: | ||
channel_credentials = grpc.ssl_channel_credentials(cert_file.read()) | ||
|
||
composite_credentials = grpc.composite_channel_credentials(channel_credentials, | ||
call_credentials) | ||
|
||
self.channel = grpc.secure_channel(endpoint, composite_credentials) | ||
self.stub = pb2_grpc.IdentityManagementAPIStub(channel=self.channel) | ||
|
||
# Imported methods | ||
from .is_authorized import is_authorized_token |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import json | ||
import time | ||
import uuid | ||
from authlib.jose import JsonWebKey, jwt | ||
from datetime import datetime, timedelta, timezone | ||
|
||
|
||
def load_credentials(path): | ||
with open(path, 'r') as file: | ||
raw_content = file.read() | ||
return json.loads(raw_content) | ||
|
||
|
||
def load_json(content): | ||
return json.loads(content) | ||
|
||
|
||
def create_agent_jwt(credentials): | ||
jwk = credentials.get('privateKeyJWK') | ||
key = JsonWebKey.import_key(jwk) | ||
message = create_jwt_message(credentials) | ||
jwt_token = jwt.encode({ | ||
'alg': 'ES256', | ||
'cty': 'JWT', | ||
'kid': jwk['kid'] | ||
}, message, key) | ||
return jwt_token | ||
|
||
|
||
def create_jwt_message(credentials): | ||
message = { | ||
'exp': get_int_from_datetime(datetime.now(timezone.utc) + timedelta(hours=24)), | ||
'iat': get_int_from_datetime(datetime.now(timezone.utc)), | ||
'iss': credentials.get('appAgentId'), | ||
'jti': str(uuid.uuid4()), | ||
'sub': credentials.get('appAgentId'), | ||
} | ||
return message | ||
|
||
|
||
def get_int_from_datetime(dt): | ||
return int(time.mktime(dt.timetuple())) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from indykite_sdk.indykite.authorization.v1beta1 import authorization_service_pb2 as pb2 | ||
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2 as pb2_ident | ||
|
||
|
||
def is_authorized_token(self, access_token, resources=[], actions=[]): | ||
try: | ||
response = self.stub.IsAuthorized( | ||
pb2.IsAuthorizedRequest( | ||
digital_twin_identifier=pb2_ident.DigitalTwinIdentifier( | ||
access_token=str(access_token) | ||
), | ||
resources=request_resource(resources), | ||
actions=actions | ||
) | ||
) | ||
except Exception as exception: | ||
print(exception) | ||
return None | ||
|
||
if not response: | ||
return None | ||
|
||
return response | ||
|
||
|
||
def request_resource(resources): | ||
res = [] | ||
for r in resources: | ||
res.append(pb2.IsAuthorizedRequest.Resource(id=r.id, label=r.label)) | ||
return res |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from indykite_sdk.utils import timestamp_to_date | ||
from google.protobuf.json_format import MessageToDict | ||
|
||
|
||
class IsAuthorizedResponse: | ||
@classmethod | ||
def deserialize(cls, message): | ||
if message is None: | ||
return None | ||
|
||
is_authorized_response = IsAuthorizedResponse( | ||
decision_time=timestamp_to_date(message.decision_time), | ||
decisions=MessageToDict(message.decisions) | ||
) | ||
|
||
return is_authorized_response | ||
|
||
def __init__(self, decision_time, decisions): | ||
self.decision_time = decision_time, | ||
self.decisions = decisions | ||
|
||
|
||
class IsAuthorizedResource: | ||
|
||
def __init__(self, id: any, label): | ||
self.id = id | ||
self.label = label | ||
|
||
|
||
class IsAuthorizedDecisions: | ||
|
||
def __init__(self, id: any, label): | ||
self.id = id | ||
self.label = label | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import json | ||
import time | ||
from datetime import datetime | ||
|
||
from indykite_sdk.authorization import AuthorizationClient | ||
from indykite_sdk.indykite.authorization.v1beta1 import authorization_service_pb2 as pb2 | ||
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2 as pb2_ident | ||
from google.protobuf.json_format import MessageToDict | ||
from indykite_sdk.model.is_authorized import IsAuthorizedResource | ||
from helpers import data | ||
|
||
|
||
def test_is_authorized_token_wrong_token(): | ||
client = AuthorizationClient() | ||
assert client is not None | ||
|
||
access_token = data.get_expired_token() | ||
actions = ["HAS_FREE_PARKING"] | ||
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")] | ||
response = client.is_authorized_token(access_token, resources, actions) | ||
assert response is None | ||
|
||
|
||
def test_is_authorized_token_success(): | ||
client = AuthorizationClient() | ||
assert client is not None | ||
|
||
access_token = data.get_verification_bearer() | ||
actions = ["HAS_FREE_PARKING"] | ||
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")] | ||
res = [] | ||
for r in resources: | ||
res.append(pb2.IsAuthorizedRequest.Resource(id=r.id, label=r.label)) | ||
digital_twin_identifier = pb2_ident.DigitalTwinIdentifier( | ||
access_token=str(access_token) | ||
) | ||
|
||
def mocked_is_authorized(request: pb2.IsAuthorizedRequest): | ||
assert request.digital_twin_identifier == digital_twin_identifier | ||
return pb2.IsAuthorizedResponse() | ||
|
||
client.stub.IsAuthorized = mocked_is_authorized | ||
response = client.is_authorized_token(access_token, resources, actions) | ||
assert response is not None |