Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 78 additions & 21 deletions n26/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,36 @@
from n26 import config

BASE_URL = 'https://api.tech26.de'
BASIC_AUTH_HEADERS = {'Authorization': 'Basic YW5kcm9pZDpzZWNyZXQ='}

GET = "get"
POST = "post"


# Api class can be imported as a library in order to use it within applications
def _refresh_token(refresh_token):
"""
Refreshes an authentication token
:param refresh_token: the refresh token issued by the server when requesting a token
:return: the refreshed token data
"""
values_token = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token
}

response = requests.post(BASE_URL + '/oauth/token', data=values_token, headers=BASIC_AUTH_HEADERS)
response.raise_for_status()
return response.json()


class Api(object):
# constructor accepting None to maintain backward compatibility
def __init__(self, cfg=None):
if not cfg:
cfg = config.get_config()
self.config = cfg

def get_token(self):
values_token = {'grant_type': 'password', 'username': self.config.username, 'password': self.config.password}
headers_token = {'Authorization': 'Basic YW5kcm9pZDpzZWNyZXQ='}

response_token = requests.post(BASE_URL + '/oauth/token', data=values_token, headers=headers_token)
token_info = response_token.json()
# TODO check if access_token is not nil
return token_info['access_token']

# TODO: this method will check if token is valid, if not it will run get_token
def validate_token(self):
pass
self._token_data = {}

# IDEA: @get_token decorator
def get_account_info(self):
Expand Down Expand Up @@ -105,9 +110,6 @@ def block_card(self, card_id):
def unblock_card(self, card_id):
return self._do_request(POST, BASE_URL + '/api/cards/%s/unblock' % card_id)

def get_spaces(self):
return self._do_request(GET, BASE_URL + '/api/spaces')['spaces']

def get_savings(self):
return self._do_request(GET, BASE_URL + '/api/hub/savings/accounts')

Expand All @@ -126,7 +128,7 @@ def get_invitations(self):
return self._do_request(GET, BASE_URL + '/api/aff/invitations')

def _do_request(self, method=GET, url="/", params={}):
access_token = self.get_token()
access_token = self._get_token()
headers = {'Authorization': 'bearer' + str(access_token)}

first_param = True
Expand All @@ -144,11 +146,66 @@ def _do_request(self, method=GET, url="/", params={}):
url += "%s=%s" % (k, v)

if method is GET:
result = requests.get(url, headers=headers)
response = requests.get(url, headers=headers)
elif method is POST:
result = requests.post(url, headers=headers)
response = requests.post(url, headers=headers)
else:
return None

result.raise_for_status()
return result.json()
response.raise_for_status()
return response.json()

def _get_token(self):
"""
Returns the access token to use for api authentication.
If a token has been requested before it will be reused if it is still valid.
If the previous token has expired it will be refreshed.
If no token has been requested it will be requested from the server.

:return: the access token
"""
if not self._validate_token(self._token_data):
if "refresh_token" in self._token_data:
refresh_token = self._token_data["refresh_token"]
self._token_data = _refresh_token(refresh_token)
else:
self._token_data = self._request_token()

# if it's still not valid, raise an exception
if not self._validate_token(self._token_data):
raise PermissionError("Unable to request authentication token")

return self._token_data["access_token"]

def _request_token(self):
"""
Request an authentication token from the server
:return: the token or None if the response did not contain a token
"""
values_token = {
'grant_type': 'password',
'username': self.config.username,
'password': self.config.password
}

response = requests.post(BASE_URL + '/oauth/token', data=values_token, headers=BASIC_AUTH_HEADERS)
response.raise_for_status()
response_json = response.json()

# add expiration time to expiration in _validate_token()
response_json["expiration_time"] = time.time() + response_json["expires_in"]
return response_json

@staticmethod
def _validate_token(token_data):
"""
Checks if a token is valid
:param token_data: the token data to check
:return: true if valid, false otherwise
"""

if "expiration_time" in token_data and time.time() >= token_data["expiration_time"]:
# token has expired
return False

return "access_token" in token_data
6 changes: 3 additions & 3 deletions n26/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def spaces():
string = str(space['name']) + ': ' + str(balance)
if 'goal' in space:
goal = space['goal']['amount']
percentage = balance/goal
percentage = balance / goal
string += '/' + str(goal) + ' <- ' + '{:.2%}'.format(percentage)
print(string)

Expand Down Expand Up @@ -104,11 +104,11 @@ def statements():


@cli.command()
@click.option('--limit', default=5, help='Limit transaction output.')
@click.option('--limit', default=5, type=click.IntRange(1, 10000), help='Limit transaction output.')
def transactions(limit):
""" Show transactions (default: 5) """
transactions = api.Api()
output = transactions.get_transactions_limited(str(limit))
output = transactions.get_transactions(limit=limit)
print('Transactions:')
print('-------------')
li = []
Expand Down
12 changes: 9 additions & 3 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@ def test_init_explicit():
# test token
def test_get_token():
with mock.patch('n26.api.requests.post') as mock_post:
mock_post.return_value.json.return_value = {'access_token':
'some token'}
mock_post.return_value.json.return_value = {
'access_token': '12345678-1234-1234-1234-123456789012',
'token_type': 'bearer',
'refresh_token': '12345678-1234-1234-1234-123456789012',
'expires_in': 1798,
'scope': 'trust',
'host_url': 'https://api.tech26.de'
}
new_api = api.Api(conf)
token = new_api.get_token()
token = new_api._get_token()
assert token == 'some token'


Expand Down