Skip to content

Commit

Permalink
feat: add authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
cowan-macady committed Jan 20, 2023
1 parent 15544e2 commit 2d922d7
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 7 deletions.
27 changes: 27 additions & 0 deletions indykite_sdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,19 @@ def main():
print("Invalid import digital twins response")
return import_digital_twins_config_response

elif command == "is_authorized_dt":
digital_twin_id = args.digital_twin_id
tenant_id = args.tenant_id
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
actions = ["HAS_FREE_PARKING"]
is_authorized = client_authorization.is_authorized_digital_twin(digital_twin_id, tenant_id, resources, actions)

if is_authorized:
print_response(is_authorized)
else:
print("Invalid is_authorized")
return is_authorized

elif command == "is_authorized_token":
access_token = args.access_token
actions = ["HAS_FREE_PARKING"]
Expand All @@ -1652,6 +1665,20 @@ def main():
print("Invalid is_authorized")
return is_authorized

elif command == "is_authorized_property":
type_filter = "email"
email_value = args.email
tenant_id = args.tenant_id
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
actions = ["HAS_FREE_PARKING"]
is_authorized = client_authorization.is_authorized_property_filter(type_filter, email_value, tenant_id,
resources=resources, actions=actions)
if is_authorized:
print_response(is_authorized)
else:
print("Invalid is_authorized")
return is_authorized


def print_verify_info(digital_twin_info): # pragma: no cover
print("Digital twin info")
Expand Down
4 changes: 2 additions & 2 deletions indykite_sdk/authorization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os

from indykite_sdk.authorization import helper
from indykite_sdk.indykite.identity.v1beta1 import identity_management_api_pb2_grpc as pb2_grpc
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2_grpc as pb2_grpc


class AuthorizationClient(object):
Expand Down Expand Up @@ -49,4 +49,4 @@ def __init__(self, local=False):
self.stub = pb2_grpc.IdentityManagementAPIStub(channel=self.channel)

# Imported methods
from .is_authorized import is_authorized_token
from .is_authorized import is_authorized_token, is_authorized_digital_twin, is_authorized_property_filter
51 changes: 51 additions & 0 deletions indykite_sdk/authorization/is_authorized.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
from indykite_sdk.indykite.authorization.v1beta1 import authorization_service_pb2 as pb2
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2 as pb2_ident
from indykite_sdk.indykite.identity.v1beta2 import model_pb2 as model
from indykite_sdk.indykite.objects.v1beta1 import struct_pb2 as pb2_struct


def is_authorized_digital_twin(self, digital_twin_id, tenant_id, resources=[], actions=[]):
try:
response = self.stub.IsAuthorized(
pb2.IsAuthorizedRequest(
digital_twin_identifier=pb2_ident.DigitalTwinIdentifier(
digital_twin=model.DigitalTwin(
id=str(digital_twin_id),
tenant_id=str(tenant_id)
)
),
resources=request_resource(resources),
actions=actions
)
)
except Exception as exception:
print(exception)
return None

if not response:
return None

return response


def is_authorized_token(self, access_token, resources=[], actions=[]):
Expand All @@ -23,6 +49,31 @@ def is_authorized_token(self, access_token, resources=[], actions=[]):
return response


def is_authorized_property_filter(self, type_filter, value, tenant_id, resources=[], actions=[]):
try:
response = self.stub.IsAuthorized(
pb2.IsAuthorizedRequest(
digital_twin_identifier=pb2_ident.DigitalTwinIdentifier(
property_filter=pb2_ident.PropertyFilter(
type=str(type_filter),
value=pb2_struct.Value(string_value=value),
tenant_id=str(tenant_id)
)
),
resources=request_resource(resources),
actions=actions
)
)
except Exception as exception:
print(exception)
return None

if not response:
return None

return response


def request_resource(resources):
res = []
for r in resources:
Expand Down
179 changes: 174 additions & 5 deletions tests/test_is_authorized.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import json
import time
from datetime import datetime

from indykite_sdk.authorization import AuthorizationClient
from indykite_sdk.indykite.authorization.v1beta1 import authorization_service_pb2 as pb2
from indykite_sdk.indykite.identity.v1beta2 import identity_management_api_pb2 as pb2_ident
from google.protobuf.json_format import MessageToDict
from indykite_sdk.model.is_authorized import IsAuthorizedResource
from indykite_sdk.indykite.identity.v1beta2 import model_pb2 as model
from indykite_sdk.indykite.objects.v1beta1 import struct_pb2 as pb2_struct
from helpers import data


Expand Down Expand Up @@ -42,3 +39,175 @@ def mocked_is_authorized(request: pb2.IsAuthorizedRequest):
client.stub.IsAuthorized = mocked_is_authorized
response = client.is_authorized_token(access_token, resources, actions)
assert response is not None


def test_is_authorized_token_empty():
client = AuthorizationClient()
assert client is not None

access_token = data.get_verification_bearer()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
res = []
for r in resources:
res.append(pb2.IsAuthorizedRequest.Resource(id=r.id, label=r.label))
digital_twin_identifier = pb2_ident.DigitalTwinIdentifier(
access_token=str(access_token)
)

def mocked_is_authorized(request: pb2.IsAuthorizedRequest):
assert request.digital_twin_identifier == digital_twin_identifier
return None
client.stub.IsAuthorized = mocked_is_authorized
response = client.is_authorized_token(access_token, resources, actions)
assert response is None


def test_is_authorized_dt_wrong_dt():
client = AuthorizationClient()
assert client is not None

digital_twin_id = data.get_tenant_email()
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
response = client.is_authorized_digital_twin(digital_twin_id, tenant_id, resources, actions)
assert response is None


def test_is_authorized_dt_wrong_resources():
client = AuthorizationClient()
assert client is not None

digital_twin_id = data.get_digital_twin()
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [{"lotA", "ParkingLot"}]
response = client.is_authorized_digital_twin(digital_twin_id, tenant_id, resources, actions)
assert response is None


def test_is_authorized_dt_success():
client = AuthorizationClient()
assert client is not None

digital_twin_id = data.get_digital_twin()
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
digital_twin_identifier = pb2_ident.DigitalTwinIdentifier(
digital_twin=model.DigitalTwin(
id=str(digital_twin_id),
tenant_id=str(tenant_id)
)
)

def mocked_is_authorized(request: pb2.IsAuthorizedRequest):
assert request.digital_twin_identifier == digital_twin_identifier
return pb2.IsAuthorizedResponse()

client.stub.IsAuthorized = mocked_is_authorized
response = client.is_authorized_digital_twin(digital_twin_id, tenant_id, resources, actions)
assert response is not None


def test_is_authorized_dt_empty():
client = AuthorizationClient()
assert client is not None

digital_twin_id = data.get_digital_twin()
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
digital_twin_identifier = pb2_ident.DigitalTwinIdentifier(
digital_twin=model.DigitalTwin(
id=str(digital_twin_id),
tenant_id=str(tenant_id)
)
)

def mocked_is_authorized(request: pb2.IsAuthorizedRequest):
assert request.digital_twin_identifier == digital_twin_identifier
return None

client.stub.IsAuthorized = mocked_is_authorized
response = client.is_authorized_digital_twin(digital_twin_id, tenant_id, resources, actions)
assert response is None


def test_is_authorized_property_wrong_property():
client = AuthorizationClient()
assert client is not None

type_filter = "email"
email_value = "sdk@indykite.com"
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
response = client.is_authorized_property_filter(type_filter, email_value, tenant_id, resources, actions)
assert response is None


def test_is_authorized_property_wrong_resources():
client = AuthorizationClient()
assert client is not None

type_filter = "email"
email_value = "sdk@indykite.com"
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [{"lotA", "ParkingLot"}]
response = client.is_authorized_property_filter(type_filter, email_value, tenant_id, resources, actions)
assert response is None


def test_is_authorized_property_success():
client = AuthorizationClient()
assert client is not None

type_filter = "email"
email_value = "sdk@indykite.com"
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
digital_twin_identifier = pb2_ident.DigitalTwinIdentifier(
property_filter=pb2_ident.PropertyFilter(
type=str(type_filter),
value=pb2_struct.Value(string_value=email_value),
tenant_id=str(tenant_id)
)
)

def mocked_is_authorized(request: pb2.IsAuthorizedRequest):
assert request.digital_twin_identifier == digital_twin_identifier
return pb2.IsAuthorizedResponse()

client.stub.IsAuthorized = mocked_is_authorized
response = client.is_authorized_property_filter(type_filter, email_value, tenant_id, resources, actions)
assert response is not None


def test_is_authorized_property_empty():
client = AuthorizationClient()
assert client is not None

type_filter = "email"
email_value = "sdk@indykite.com"
tenant_id = data.get_tenant()
actions = ["HAS_FREE_PARKING"]
resources = [IsAuthorizedResource("lotA", "ParkingLot"), IsAuthorizedResource("lotB", "ParkingLot")]
digital_twin_identifier = pb2_ident.DigitalTwinIdentifier(
property_filter=pb2_ident.PropertyFilter(
type=str(type_filter),
value=pb2_struct.Value(string_value=email_value),
tenant_id=str(tenant_id)
)
)

def mocked_is_authorized(request: pb2.IsAuthorizedRequest):
assert request.digital_twin_identifier == digital_twin_identifier
return None

client.stub.IsAuthorized = mocked_is_authorized
response = client.is_authorized_property_filter(type_filter, email_value, tenant_id, resources, actions)
assert response is None

0 comments on commit 2d922d7

Please sign in to comment.