Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk_python#136 complete Oauth/PSD2 implementation #138

64 changes: 64 additions & 0 deletions bunq/sdk/http/anonymous_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from typing import Dict

import requests

from bunq.sdk.context.api_context import ApiContext
from bunq.sdk.http.api_client import ApiClient
from bunq.sdk.http.bunq_response_raw import BunqResponseRaw
from bunq.sdk.security import security


class AnonymousApiClient(ApiClient):

def __init__(self, api_context: ApiContext) -> None:
super().__init__(api_context)

def post(self,
uri_relative: str,
request_bytes: bytes,
custom_headers: Dict[str, str]) -> BunqResponseRaw:
return self._request(
self.METHOD_POST,
uri_relative,
request_bytes,
{},
custom_headers
)

def _request(self,
method: str,
uri_relative: str,
request_bytes: bytes,
params: Dict[str, str],
custom_headers: Dict[str, str]) -> BunqResponseRaw:
from bunq.sdk.context.bunq_context import BunqContext

uri_relative_with_params = self._append_params_to_uri(uri_relative, params)
if uri_relative not in self._URIS_NOT_REQUIRING_ACTIVE_SESSION:
if self._api_context.ensure_session_active():
BunqContext.update_api_context(self._api_context)

all_headers = self._get_all_headers(
request_bytes,
custom_headers
)

response = requests.request(
method,
uri_relative_with_params,
data=request_bytes,
headers=all_headers,
proxies={self.FIELD_PROXY_HTTPS: self._api_context.proxy_url},
)

self._assert_response_success(response)

if self._api_context.installation_context is not None:
security.validate_response(
self._api_context.installation_context.public_key_server,
response.status_code,
response.content,
response.headers
)

return BunqResponseRaw(response.content, response.headers)
15 changes: 15 additions & 0 deletions bunq/sdk/http/http_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Dict


class HttpUtil:
QUERY_FORMAT = '{}={}'
QUERY_DELIMITER = '&'

@classmethod
def create_query_string(cls, all_parameter: Dict[str, str]):
encoded_parameters = []

for parameter in all_parameter:
angelomelonas marked this conversation as resolved.
Show resolved Hide resolved
encoded_parameters.append(cls.QUERY_FORMAT.format(parameter, all_parameter[parameter]))

return cls.QUERY_DELIMITER.join(encoded_parameters)
105 changes: 105 additions & 0 deletions bunq/sdk/model/core/oauth_access_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from __future__ import annotations

from typing import Optional, Type

from bunq import ApiEnvironmentType
from bunq.sdk.context.bunq_context import BunqContext
from bunq.sdk.exception.bunq_exception import BunqException
from bunq.sdk.http.anonymous_api_client import AnonymousApiClient
from bunq.sdk.http.bunq_response import BunqResponse
from bunq.sdk.http.bunq_response_raw import BunqResponseRaw
from bunq.sdk.http.http_util import HttpUtil
from bunq.sdk.json import converter
from bunq.sdk.model.core.bunq_model import BunqModel
from bunq.sdk.model.core.oauth_grant_type import OauthGrantType
from bunq.sdk.model.generated.endpoint import OauthClient
from bunq.sdk.util.type_alias import T


class OauthAccessToken(BunqModel):
# Field constants.
FIELD_GRANT_TYPE = "grant_type"
FIELD_CODE = "code"
FIELD_REDIRECT_URI = "redirect_uri"
FIELD_CLIENT_ID = "client_id"
FIELD_CLIENT_SECRET = "client_secret"

# Token constants.
TOKEN_URI_FORMAT_SANDBOX = "https://api-oauth.sandbox.bunq.com/v1/token?%s"
TOKEN_URI_FORMAT_PRODUCTION = "https://api.oauth.bunq.com/v1/token?%s"

# Error constants.
ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED = "You are trying to use an unsupported environment type."

def __init__(self, token: str, token_type: str, state: str = None) -> None:
self._token = token
self._token_type = token_type
self._state = state

@property
def token(self) -> str:
return self._token

@property
def token_type(self) -> str:
return self._token_type

@property
def state(self) -> Optional[str]:
return self._state

@classmethod
def create(cls,
grant_type: OauthGrantType,
oauth_code: str,
redirect_uri: str,
client: OauthClient) -> OauthAccessToken:
api_client = AnonymousApiClient(BunqContext.api_context())
response_raw = api_client.post(
cls.create_token_uri(grant_type.value, oauth_code, redirect_uri, client),
bytearray(),
{}
)

return cls.from_json(OauthAccessToken, response_raw).value

@classmethod
def create_token_uri(cls, grant_type: str, auth_code: str, redirect_uri: str, client: OauthClient) -> str:
all_token_parameter = {
cls.FIELD_GRANT_TYPE: grant_type,
cls.FIELD_CODE: auth_code,
cls.FIELD_REDIRECT_URI: redirect_uri,
cls.FIELD_CLIENT_ID: client.id_,
cls.FIELD_CLIENT_SECRET: client.secret,
}

return cls.determine_auth_uri_format().format(HttpUtil.create_query_string(all_token_parameter))

def is_all_field_none(self) -> bool:
if self._token is not None:
return False
elif self._token_type is not None:
return False
elif self._state is not None:
return False

return True

@classmethod
def from_json(cls, class_of_object: Type[T], response_raw: BunqResponseRaw):
response_item_object = converter.deserialize(class_of_object, response_raw)
response_value = converter.json_to_class(class_of_object, response_item_object)

return BunqResponse(response_value, response_raw.headers)

@classmethod
def determine_auth_uri_format(cls) -> str:
environment_type = BunqContext.api_context().environment_type

if ApiEnvironmentType.PRODUCTION == environment_type:
return cls.TOKEN_URI_FORMAT_PRODUCTION

if ApiEnvironmentType.SANDBOX == environment_type:
return cls.TOKEN_URI_FORMAT_SANDBOX

raise BunqException(cls.ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED)
73 changes: 73 additions & 0 deletions bunq/sdk/model/core/oauth_authorization_uri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from __future__ import annotations

from bunq import ApiEnvironmentType
from bunq.sdk.context.bunq_context import BunqContext
from bunq.sdk.exception.bunq_exception import BunqException
from bunq.sdk.http.http_util import HttpUtil
from bunq.sdk.model.core.bunq_model import BunqModel
from bunq.sdk.model.core.oauth_response_type import OauthResponseType
from bunq.sdk.model.generated.endpoint import OauthClient


class OauthAuthorizationUri(BunqModel):
# Auth constants.
AUTH_URI_FORMAT_SANDBOX = "https://oauth.sandbox.bunq.com/auth?{}"
AUTH_URI_FORMAT_PRODUCTION = "https://oauth.bunq.com/auth?{}"

# Field constants
FIELD_RESPONSE_TYPE = "response_type"
FIELD_REDIRECT_URI = "redirect_uri"
FIELD_STATE = "state"
FIELD_CLIENT_ID = "client_id"

# Error constants.
ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED = "You are trying to use an unsupported environment type."

def __init__(self, authorization_uri: str) -> None:
self._authorization_uri = authorization_uri

@property
def authorization_uri(self) -> str:
return self._authorization_uri

@classmethod
def create(cls,
response_type: OauthResponseType,
redirect_uri: str,
client: OauthClient,
state: str = None) -> OauthAuthorizationUri:
all_request_parameter = {
cls.FIELD_REDIRECT_URI: redirect_uri,
cls.FIELD_RESPONSE_TYPE: response_type.name.lower()
}

if client.client_id is not None:
all_request_parameter[cls.FIELD_CLIENT_ID] = client.client_id

if state is not None:
all_request_parameter[cls.FIELD_STATE] = state

return OauthAuthorizationUri(
cls.determine_auth_uri_format().format(HttpUtil.create_query_string(all_request_parameter))
)

def get_authorization_uri(self) -> str:
return self._authorization_uri

def is_all_field_none(self) -> bool:
if self._authorization_uri is None:
return True
else:
return False

@classmethod
def determine_auth_uri_format(cls) -> str:
environment_type = BunqContext.api_context().environment_type

if ApiEnvironmentType.PRODUCTION == environment_type:
return cls.AUTH_URI_FORMAT_PRODUCTION

if ApiEnvironmentType.SANDBOX == environment_type:
return cls.AUTH_URI_FORMAT_SANDBOX

raise BunqException(cls.ERROR_ENVIRONMENT_TYPE_NOT_SUPPORTED)
17 changes: 17 additions & 0 deletions bunq/sdk/model/core/oauth_grant_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import aenum


class OauthGrantType(aenum.AutoNumberEnum):
"""
:type AUTHORIZATION_CODE: str
:type grant_type: str
"""

AUTHORIZATION_CODE = 'authorization_code'

def __init__(self, grant_type: str) -> None:
self._grant_type = grant_type

@property
def grant_type(self) -> str:
return self.grant_type
17 changes: 17 additions & 0 deletions bunq/sdk/model/core/oauth_response_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import aenum


class OauthResponseType(aenum.AutoNumberEnum):
"""
:type CODE: str
:type response_type: str
"""

CODE = 'code'

def __init__(self, response_type: str) -> None:
self._response_type = response_type

@property
def response_type(self) -> str:
return self._response_type
26 changes: 26 additions & 0 deletions tests/model/core/test_oauth_authorization_uri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from bunq.sdk.context.bunq_context import BunqContext
from bunq.sdk.model.core.oauth_authorization_uri import OauthAuthorizationUri
from bunq.sdk.model.core.oauth_response_type import OauthResponseType
from bunq.sdk.model.generated.endpoint import OauthClient
from tests.bunq_test import BunqSdkTestCase


class TestOauthAuthorizationUri(BunqSdkTestCase):
_TEST_EXPECT_URI = 'https://oauth.sandbox.bunq.com/auth?redirect_uri=redirecturi&response_type=code&state=state'
_TEST_REDIRECT_URI = 'redirecturi'
_TEST_STATUS = 'status'
_TEST_STATE = 'state'

@classmethod
def setUpClass(cls) -> None:
BunqContext.load_api_context(cls._get_api_context())

def test_oauth_authorization_uri_create(self) -> None:
uri = OauthAuthorizationUri.create(
OauthResponseType(OauthResponseType.CODE),
self._TEST_REDIRECT_URI,
OauthClient(self._TEST_STATUS),
self._TEST_STATE
).get_authorization_uri()

self.assertEqual(self._TEST_EXPECT_URI, uri)