Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remaining-validity): adds get_secured_api_key_remaining_validity #455

Merged
merged 2 commits into from
Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions algoliasearch/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ class AlgoliaUnreachableHostException(AlgoliaException):

class ObjectNotFoundException(AlgoliaException):
pass


class ValidUntilNotFoundException(AlgoliaException):
pass
18 changes: 18 additions & 0 deletions algoliasearch/search_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import base64
import hashlib
import hmac
import re
import time

from typing import Optional, Union, List

from algoliasearch.exceptions import ValidUntilNotFoundException
from algoliasearch.helpers import endpoint, is_async_available
from algoliasearch.http.request_options import RequestOptions
from algoliasearch.http.serializer import QueryParametersSerializer
Expand Down Expand Up @@ -302,6 +305,21 @@ def generate_secured_api_key(parent_api_key, restrictions):

return str(base64encoded.decode('utf-8'))

@staticmethod
def get_secured_api_key_remaining_validity(api_key):
# type: (str) -> int

decoded_string = base64.b64decode(api_key)

match = re.search(r'validUntil=(\d+)', str(decoded_string))

if match is None:
raise ValidUntilNotFoundException(
'ValidUntil not found in api key.'
)

return int(match.group(1)) - int(round(time.time()))

def list_indices(self, request_options=None):
# type: (Optional[Union[dict, RequestOptions]]) -> dict

Expand Down
31 changes: 30 additions & 1 deletion tests/features/test_search_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import unittest

from algoliasearch.configs import SearchConfig
from algoliasearch.exceptions import RequestException
from algoliasearch.exceptions import RequestException, \
ValidUntilNotFoundException
from algoliasearch.http.hosts import HostsCollection, Host, CallType
from algoliasearch.http.serializer import QueryParametersSerializer
from algoliasearch.responses import MultipleResponse
Expand Down Expand Up @@ -346,6 +347,34 @@ def test_secured_api_keys(self):
F.search_client(api_key=api_key).init_index(
self.index2.name).search('')

def test_get_secured_api_key_remaining_validity(self):
import time

now = int(time.time())
api_key = SearchClient.generate_secured_api_key('foo', {
'validUntil': now - (60 * 10)
})

remaining = SearchClient.get_secured_api_key_remaining_validity(
api_key
)

self.assertTrue(remaining < 0)

api_key = SearchClient.generate_secured_api_key('foo', {
'validUntil': now + (60 * 10),
})

remaining = SearchClient.get_secured_api_key_remaining_validity(
api_key
)
self.assertTrue(remaining > 0)

api_key = SearchClient.generate_secured_api_key('foo', {})

with self.assertRaises(ValidUntilNotFoundException) as _:
SearchClient.get_secured_api_key_remaining_validity(api_key)

@unittest.skipIf(Env.is_community(),
"Community can not test personalization operations")
def test_personalization_strategy(self):
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_search_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def test_exists(self):

self.index.get_settings.assert_called_once()

self.assertEqual(indexExists, False)

# No request options
args = self.index.get_settings.call_args[0]
self.assertEqual(args[0], None)
Expand Down