-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #138 from bunq/feature/sdk_python#136_complete_oau…
…th_psd2_implementation sdk_python#136 complete Oauth/PSD2 implementation
- Loading branch information
Showing
7 changed files
with
317 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from typing import Dict | ||
|
||
import requests | ||
|
||
from bunq.sdk.context.api_context import ApiContext | ||
from bunq.sdk.http.api_client import ApiClient | ||
from bunq.sdk.http.bunq_response_raw import BunqResponseRaw | ||
from bunq.sdk.security import security | ||
|
||
|
||
class AnonymousApiClient(ApiClient): | ||
|
||
def __init__(self, api_context: ApiContext) -> None: | ||
super().__init__(api_context) | ||
|
||
def post(self, | ||
uri_relative: str, | ||
request_bytes: bytes, | ||
custom_headers: Dict[str, str]) -> BunqResponseRaw: | ||
return self._request( | ||
self.METHOD_POST, | ||
uri_relative, | ||
request_bytes, | ||
{}, | ||
custom_headers | ||
) | ||
|
||
def _request(self, | ||
method: str, | ||
uri_relative: str, | ||
request_bytes: bytes, | ||
params: Dict[str, str], | ||
custom_headers: Dict[str, str]) -> BunqResponseRaw: | ||
from bunq.sdk.context.bunq_context import BunqContext | ||
|
||
uri_relative_with_params = self._append_params_to_uri(uri_relative, params) | ||
if uri_relative not in self._URIS_NOT_REQUIRING_ACTIVE_SESSION: | ||
if self._api_context.ensure_session_active(): | ||
BunqContext.update_api_context(self._api_context) | ||
|
||
all_headers = self._get_all_headers( | ||
request_bytes, | ||
custom_headers | ||
) | ||
|
||
response = requests.request( | ||
method, | ||
uri_relative_with_params, | ||
data=request_bytes, | ||
headers=all_headers, | ||
proxies={self.FIELD_PROXY_HTTPS: self._api_context.proxy_url}, | ||
) | ||
|
||
self._assert_response_success(response) | ||
|
||
if self._api_context.installation_context is not None: | ||
security.validate_response( | ||
self._api_context.installation_context.public_key_server, | ||
response.status_code, | ||
response.content, | ||
response.headers | ||
) | ||
|
||
return BunqResponseRaw(response.content, response.headers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from typing import Dict | ||
|
||
|
||
class HttpUtil: | ||
QUERY_FORMAT = '{}={}' | ||
QUERY_DELIMITER = '&' | ||
|
||
@classmethod | ||
def create_query_string(cls, all_parameter: Dict[str, str]): | ||
encoded_parameters = [] | ||
|
||
for parameter, value in all_parameter.items(): | ||
encoded_parameters.append(cls.QUERY_FORMAT.format(parameter, value)) | ||
|
||
return cls.QUERY_DELIMITER.join(encoded_parameters) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Optional, Type | ||
|
||
from bunq import ApiEnvironmentType | ||
from bunq.sdk.context.bunq_context import BunqContext | ||
from bunq.sdk.exception.bunq_exception import BunqException | ||
from bunq.sdk.http.anonymous_api_client import AnonymousApiClient | ||
from bunq.sdk.http.bunq_response import BunqResponse | ||
from bunq.sdk.http.bunq_response_raw import BunqResponseRaw | ||
from bunq.sdk.http.http_util import HttpUtil | ||
from bunq.sdk.json import converter | ||
from bunq.sdk.model.core.bunq_model import BunqModel | ||
from bunq.sdk.model.core.oauth_grant_type import OauthGrantType | ||
from bunq.sdk.model.generated.endpoint import OauthClient | ||
from bunq.sdk.util.type_alias import T | ||
|
||
|
||
class OauthAccessToken(BunqModel): | ||
# Field constants. | ||
FIELD_GRANT_TYPE = "grant_type" | ||
FIELD_CODE = "code" | ||
FIELD_REDIRECT_URI = "redirect_uri" | ||
FIELD_CLIENT_ID = "client_id" | ||
FIELD_CLIENT_SECRET = "client_secret" | ||
|
||
# Token constants. | ||
TOKEN_URI_FORMAT_SANDBOX = "https://api-oauth.sandbox.bunq.com/v1/token?%s" | ||
TOKEN_URI_FORMAT_PRODUCTION = "https://api.oauth.bunq.com/v1/token?%s" | ||
|
||
# Error constants. | ||
ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED = "You are trying to use an unsupported environment type." | ||
|
||
def __init__(self, token: str, token_type: str, state: str = None) -> None: | ||
self._token = token | ||
self._token_type = token_type | ||
self._state = state | ||
|
||
@property | ||
def token(self) -> str: | ||
return self._token | ||
|
||
@property | ||
def token_type(self) -> str: | ||
return self._token_type | ||
|
||
@property | ||
def state(self) -> Optional[str]: | ||
return self._state | ||
|
||
@classmethod | ||
def create(cls, | ||
grant_type: OauthGrantType, | ||
oauth_code: str, | ||
redirect_uri: str, | ||
client: OauthClient) -> OauthAccessToken: | ||
api_client = AnonymousApiClient(BunqContext.api_context()) | ||
response_raw = api_client.post( | ||
cls.create_token_uri(grant_type.value, oauth_code, redirect_uri, client), | ||
bytearray(), | ||
{} | ||
) | ||
|
||
return cls.from_json(OauthAccessToken, response_raw).value | ||
|
||
@classmethod | ||
def create_token_uri(cls, grant_type: str, auth_code: str, redirect_uri: str, client: OauthClient) -> str: | ||
all_token_parameter = { | ||
cls.FIELD_GRANT_TYPE: grant_type, | ||
cls.FIELD_CODE: auth_code, | ||
cls.FIELD_REDIRECT_URI: redirect_uri, | ||
cls.FIELD_CLIENT_ID: client.id_, | ||
cls.FIELD_CLIENT_SECRET: client.secret, | ||
} | ||
|
||
return cls.determine_auth_uri_format().format(HttpUtil.create_query_string(all_token_parameter)) | ||
|
||
def is_all_field_none(self) -> bool: | ||
if self._token is not None: | ||
return False | ||
elif self._token_type is not None: | ||
return False | ||
elif self._state is not None: | ||
return False | ||
|
||
return True | ||
|
||
@classmethod | ||
def from_json(cls, class_of_object: Type[T], response_raw: BunqResponseRaw): | ||
response_item_object = converter.deserialize(class_of_object, response_raw) | ||
response_value = converter.json_to_class(class_of_object, response_item_object) | ||
|
||
return BunqResponse(response_value, response_raw.headers) | ||
|
||
@classmethod | ||
def determine_auth_uri_format(cls) -> str: | ||
environment_type = BunqContext.api_context().environment_type | ||
|
||
if ApiEnvironmentType.PRODUCTION == environment_type: | ||
return cls.TOKEN_URI_FORMAT_PRODUCTION | ||
|
||
if ApiEnvironmentType.SANDBOX == environment_type: | ||
return cls.TOKEN_URI_FORMAT_SANDBOX | ||
|
||
raise BunqException(cls.ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from __future__ import annotations | ||
|
||
from bunq import ApiEnvironmentType | ||
from bunq.sdk.context.bunq_context import BunqContext | ||
from bunq.sdk.exception.bunq_exception import BunqException | ||
from bunq.sdk.http.http_util import HttpUtil | ||
from bunq.sdk.model.core.bunq_model import BunqModel | ||
from bunq.sdk.model.core.oauth_response_type import OauthResponseType | ||
from bunq.sdk.model.generated.endpoint import OauthClient | ||
|
||
|
||
class OauthAuthorizationUri(BunqModel): | ||
# Auth constants. | ||
AUTH_URI_FORMAT_SANDBOX = "https://oauth.sandbox.bunq.com/auth?{}" | ||
AUTH_URI_FORMAT_PRODUCTION = "https://oauth.bunq.com/auth?{}" | ||
|
||
# Field constants | ||
FIELD_RESPONSE_TYPE = "response_type" | ||
FIELD_REDIRECT_URI = "redirect_uri" | ||
FIELD_STATE = "state" | ||
FIELD_CLIENT_ID = "client_id" | ||
|
||
# Error constants. | ||
ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED = "You are trying to use an unsupported environment type." | ||
|
||
def __init__(self, authorization_uri: str) -> None: | ||
self._authorization_uri = authorization_uri | ||
|
||
@property | ||
def authorization_uri(self) -> str: | ||
return self._authorization_uri | ||
|
||
@classmethod | ||
def create(cls, | ||
response_type: OauthResponseType, | ||
redirect_uri: str, | ||
client: OauthClient, | ||
state: str = None) -> OauthAuthorizationUri: | ||
all_request_parameter = { | ||
cls.FIELD_REDIRECT_URI: redirect_uri, | ||
cls.FIELD_RESPONSE_TYPE: response_type.name.lower() | ||
} | ||
|
||
if client.client_id is not None: | ||
all_request_parameter[cls.FIELD_CLIENT_ID] = client.client_id | ||
|
||
if state is not None: | ||
all_request_parameter[cls.FIELD_STATE] = state | ||
|
||
return OauthAuthorizationUri( | ||
cls.determine_auth_uri_format().format(HttpUtil.create_query_string(all_request_parameter)) | ||
) | ||
|
||
def get_authorization_uri(self) -> str: | ||
return self._authorization_uri | ||
|
||
def is_all_field_none(self) -> bool: | ||
if self._authorization_uri is None: | ||
return True | ||
else: | ||
return False | ||
|
||
@classmethod | ||
def determine_auth_uri_format(cls) -> str: | ||
environment_type = BunqContext.api_context().environment_type | ||
|
||
if ApiEnvironmentType.PRODUCTION == environment_type: | ||
return cls.AUTH_URI_FORMAT_PRODUCTION | ||
|
||
if ApiEnvironmentType.SANDBOX == environment_type: | ||
return cls.AUTH_URI_FORMAT_SANDBOX | ||
|
||
raise BunqException(cls.ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import aenum | ||
|
||
|
||
class OauthGrantType(aenum.AutoNumberEnum): | ||
""" | ||
:type AUTHORIZATION_CODE: str | ||
:type grant_type: str | ||
""" | ||
|
||
AUTHORIZATION_CODE = 'authorization_code' | ||
|
||
def __init__(self, grant_type: str) -> None: | ||
self._grant_type = grant_type | ||
|
||
@property | ||
def grant_type(self) -> str: | ||
return self.grant_type |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import aenum | ||
|
||
|
||
class OauthResponseType(aenum.AutoNumberEnum): | ||
""" | ||
:type CODE: str | ||
:type response_type: str | ||
""" | ||
|
||
CODE = 'code' | ||
|
||
def __init__(self, response_type: str) -> None: | ||
self._response_type = response_type | ||
|
||
@property | ||
def response_type(self) -> str: | ||
return self._response_type |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from bunq.sdk.context.bunq_context import BunqContext | ||
from bunq.sdk.model.core.oauth_authorization_uri import OauthAuthorizationUri | ||
from bunq.sdk.model.core.oauth_response_type import OauthResponseType | ||
from bunq.sdk.model.generated.endpoint import OauthClient | ||
from tests.bunq_test import BunqSdkTestCase | ||
|
||
|
||
class TestOauthAuthorizationUri(BunqSdkTestCase): | ||
_TEST_EXPECT_URI = 'https://oauth.sandbox.bunq.com/auth?redirect_uri=redirecturi&response_type=code&state=state' | ||
_TEST_REDIRECT_URI = 'redirecturi' | ||
_TEST_STATUS = 'status' | ||
_TEST_STATE = 'state' | ||
|
||
@classmethod | ||
def setUpClass(cls) -> None: | ||
BunqContext.load_api_context(cls._get_api_context()) | ||
|
||
def test_oauth_authorization_uri_create(self) -> None: | ||
uri = OauthAuthorizationUri.create( | ||
OauthResponseType(OauthResponseType.CODE), | ||
self._TEST_REDIRECT_URI, | ||
OauthClient(self._TEST_STATUS), | ||
self._TEST_STATE | ||
).get_authorization_uri() | ||
|
||
self.assertEqual(self._TEST_EXPECT_URI, uri) |