Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions agrirouter/onboarding/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,45 @@ def __repr__(self):


class ErrorResponse:
CODE = "code"
MESSAGE = "message"
TARGET = "target"
DETAILS = "details"

def __init__(self,
*,
code,
message,
target,
details
code: str = None,
message: str = None,
target: str = None,
details: str = None
):
self.code = code
self.message = message
self.target = target
self.details = details

def json_serialize(self) -> dict:
return {
self.CODE: self.code,
self.MESSAGE: self.message,
self.TARGET: self.target,
self.DETAILS: self.details
}

def json_deserialize(self, data: Union[str, dict]) -> None:
data = data if type(data) == dict else json.loads(data)
for key, value in data.items():
if key == self.CODE:
self.code = value
elif key == self.MESSAGE:
self.message = value
elif key == self.TARGET:
self.target = value
elif key == self.DETAILS:
self.details = value
else:
raise WrongFieldError(f"Unknown field {key} for ErrorResponse class")

def get_code(self) -> str:
return self.code

Expand Down
9 changes: 9 additions & 0 deletions agrirouter/onboarding/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class SoftwareOnboardingResponse(BaseOnboardingResonse):
SENSOR_ALTERNATE_ID = "sensorAlternateId"
CONNECTION_CRITERIA = "connectionCriteria"
AUTHENTICATION = "authentication"
ERROR = "error"

def __init__(self, http_response: Response = None):
if http_response:
Expand Down Expand Up @@ -130,6 +131,10 @@ def set_capability_alternate_id(self, capability_alternate_id: str):
self.capability_alternate_id = capability_alternate_id

def json_serialize(self):
if self.error:
return {
self.ERROR: self.error
}
return {
self.DEVICE_ALTERNATE_ID: self.device_alternate_id,
self.CAPABILITY_ALTERNATE_ID: self.capability_alternate_id,
Expand All @@ -155,6 +160,10 @@ def json_deserialize(self, data: Union[dict, str]):
authentication = Authentication()
authentication.json_deserialize(value)
self.authentication = authentication
elif key == self.ERROR:
error_response = ErrorResponse()
error_response.json_deserialize(value)
self.error = error_response
else:
raise WrongFieldError(f"Unknown field `{key}` for {self.__class__}")

Expand Down
47 changes: 29 additions & 18 deletions tests/auth_test/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,50 @@
ENV,
application_id,
)
from re import search
import re


class TestAuthorization:
def test_extract_auth_response(self):
def test_extract_query_params(self):
auth_client = Authorization(ENV, public_key=public_key, private_key=private_key)
assert search(
"<agrirouter.auth.response.AuthResponse",
str(auth_client.extract_auth_response(auth_result_url)),
)

auth_client = Authorization(
"Production", public_key=public_key, private_key=private_key
)
assert search(
"<agrirouter.auth.response.AuthResponse",
str(auth_client.extract_auth_response(auth_result_url)),
)
test_uri = "key1=val1&key2=val2&key3=val3"
params = auth_client._extract_query_params(test_uri)
assert params == {"key1": "val1", "key2": "val2", "key3": "val3"}

def test_get_auth_request_url(self):
auth_params = AuthUrlParameter(
application_id=application_id, response_type="onboard"
application_id="8c947a45-c57d-42d2-affc-206e21d63a50", response_type="onboard"
)
assert auth_params.state

auth_client = Authorization(
"QA", public_key=public_key, private_key=private_key
)
check_url = "https://agrirouter-qa.cfapps.eu10.hana.ondemand.com/application/"
assert search(check_url, auth_client.get_auth_request_url(auth_params))

check_url = "https://agrirouter-qa.cfapps.eu10.hana.ondemand.com/application/" \
"8c947a45-c57d-42d2-affc-206e21d63a50/authorize?response_type=onboard&"
result_url = auth_client.get_auth_request_url(auth_params)
assert check_url == result_url.split("state")[0]

def test_extract_auth_response(self):
auth_client = Authorization(ENV, public_key=public_key, private_key=private_key)
state = "3770a15d-adf3-4900-a435-464978fe8054"
token = "token"
signature = "signature"

test_uri = f"www.my_response.com/app?state={state}&token={token}&signature={signature}"
response = auth_client.extract_auth_response(test_uri)

assert response.state == state
assert response.signature == signature
assert response.token == token
assert not response.error
assert response.is_successful

def test_get_auth_result(self):
auth_client = Authorization(
"QA", public_key=public_key, private_key=private_key
)
auth_response = auth_client.extract_auth_response(auth_result_url)
auth_client.verify_auth_response(auth_response)
assert auth_response.get_auth_result()["credentials"]
assert auth_response.get_auth_result().get_decoded_token()
76 changes: 76 additions & 0 deletions tests/auth_test/test_auth_dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Tests agrirouter/auth/dto.py"""
from agrirouter.auth.dto import AuthorizationToken
from agrirouter.messaging.exceptions import WrongFieldError
import pytest


class TestAuthorizationToken:
def test_json_deserialize_from_valid_dict(self):
account = "account"
regcode = "regcode"
expires = "01-01-2021"
test_object = AuthorizationToken(
account=account,
)
test_object.json_deserialize({"regcode": regcode, "expires": expires})
assert test_object
assert test_object.account == account
assert test_object.regcode == regcode
assert test_object.expires == expires

test_object_1 = AuthorizationToken(
regcode=regcode,
)
test_object_1.json_deserialize({"account": account})
assert test_object_1
assert test_object_1.account == account
assert test_object_1.regcode == regcode
assert test_object_1.expires is None

test_object_2 = AuthorizationToken(
account=account,
)
test_object_2.json_deserialize({"expires": expires})
assert test_object_2
assert test_object_2.account == account
assert test_object_2.regcode is None
assert test_object_2.expires == expires

test_object_3 = AuthorizationToken(
expires=expires,
)
test_object_3.json_deserialize({"regcode": regcode})
assert test_object_3
assert test_object_3.account is None
assert test_object_3.regcode == regcode
assert test_object_3.expires == expires

def test_json_deserialize_from_invalid_dict(self):
account = "account"
regcode = "regcode"
expires = "01-01-2021"
test_object = AuthorizationToken()

with pytest.raises(WrongFieldError):
test_object.json_deserialize({"regcode": regcode, "expires": expires, "wrong_key": account})

def test_json_deserialize_from_valid_json(self):
account = "account"
regcode = "regcode"
expires = "01-01-2021"

json_data = '{"account": "account", "regcode": "regcode", "expires": "01-01-2021"}'

test_object = AuthorizationToken()
test_object.json_deserialize(json_data)
assert test_object
assert test_object.account == account
assert test_object.regcode == regcode
assert test_object.expires == expires

def test_json_deserialize_from_invalid_json(self):
json_data = '{"account": "account", "regcode": "regcode", "wrong_key": "01-01-2021"}'
test_object = AuthorizationToken()

with pytest.raises(WrongFieldError):
assert test_object.json_deserialize(json_data)
137 changes: 0 additions & 137 deletions tests/auth_test/test_dto.py

This file was deleted.

Loading