# API Token Testing

In [1]:
import uuid
from datetime import timedelta as td, datetime as dt, timezone

import jwt
from flask import current_app as app
from passlib.hash import sha256_crypt

In [2]:
from pymongo import MongoClient

In [3]:
import os
from dotenv import load_dotenv
# Load environment variables from the dotenv file named by the DOTENV
# environment variable (MONGO_URI and SECRET_KEY are read with os.getenv
# by the Token class further down).
load_dotenv(os.getenv('DOTENV'))

True

In [4]:
class Token:
    """Issue, revoke and validate JWT-based API access tokens.

    Tokens are HS256-signed JWTs keyed with the ``SECRET_KEY`` environment
    variable.  A ``sha256_crypt`` hash of every issued token is stored in the
    ``apis`` collection of the ``pocMLModelMonitoring`` MongoDB database
    (connection string: ``MONGO_URI`` environment variable).  A token is
    accepted only while a matching, non-revoked record exists there.
    """

    # One MongoClient shared by every call; it is created lazily so that
    # defining the class needs no database connection.
    _client = None

    def __repr__(self):
        # Must be a method on the class: ``repr()`` ignores an instance
        # attribute named ``__repr__``.
        return 'API Access Token Manager'

    # connect to mongodb apis collection
    @staticmethod
    def _get_apis_collection():
        """Return the ``apis`` collection, reusing a single MongoClient."""
        if Token._client is None:
            Token._client = MongoClient(os.getenv('MONGO_URI'))
        return Token._client['pocMLModelMonitoring']['apis']

    @staticmethod
    def _token_records(col):
        """Return the stored documents that carry a usable token hash.

        Documents without an ``api_encrypted`` field, or whose value is not a
        sha256_crypt hash, are skipped: ``sha256_crypt.verify`` raises on
        such input instead of returning ``False``.
        """
        return [
            doc for doc in col.find({})
            if isinstance(doc.get('api_encrypted'), (str, bytes))
            and sha256_crypt.identify(doc['api_encrypted'])
        ]

    @staticmethod
    def _build_payload(user_id, expires_in=None):
        """Build the JWT claims for ``user_id``.

        ``exp`` and ``iat`` are derived from one clock reading.  ``jti`` is a
        random identifier, so two tokens issued to the same user within the
        same second still differ.
        """
        now = dt.now(timezone.utc)
        lifetime = td(days=1) if expires_in is None else expires_in
        return {
            'exp': now + lifetime,
            'iat': now,
            'sub': user_id,
            'jti': uuid.uuid4().hex,
        }

    def _new_token(self, user_id, expires_in=None):
        """Encode a freshly built payload as an HS256 JWT."""
        # SECRET_KEY is read from the environment (the Flask app.config
        # lookup was disabled in the original notebook).
        return jwt.encode(
            self._build_payload(user_id, expires_in),
            os.getenv('SECRET_KEY'),
            algorithm='HS256',
        )

    # get token
    def encode_auth_token(self, user_id, expires_in=None):
        """Issue a token for ``user_id`` and store its hash.

        Args:
            user_id: value stored in the token's ``sub`` claim and in the
                database record.
            expires_in: optional ``timedelta`` token lifetime; defaults to
                one day.  Use a few seconds to exercise expiry in tests.

        Returns:
            The encoded token.  If anything fails, the caught exception
            object is returned instead of being raised (behaviour kept for
            existing callers), so check the result before using it.
        """
        try:
            col = self._get_apis_collection()
            known_hashes = [doc['api_encrypted'] for doc in self._token_records(col)]

            auth_token = self._new_token(user_id, expires_in)
            # ensure that no duplicate auth_tokens are in the database; the
            # random ``jti`` claim makes a retry differ immediately instead
            # of spinning until the clock second changes
            while any(sha256_crypt.verify(auth_token, known) for known in known_hashes):
                auth_token = self._new_token(user_id, expires_in)

            col.insert_one({'user_id': user_id, 'api_encrypted': sha256_crypt.hash(auth_token)})

            return auth_token
        except Exception as e:
            return e

    # revoke token
    def revoke_token(self, auth_token):
        """Mark ``auth_token`` as revoked.

        Every stored record whose hash matches the token gets
        ``revoked: True``.  If no record matches, a new revoked record is
        inserted so the token can never be accepted later.  Returns ``None``.
        """
        col = self._get_apis_collection()
        matching_ids = [
            doc['_id'] for doc in self._token_records(col)
            if sha256_crypt.verify(auth_token, doc['api_encrypted'])
        ]

        if matching_ids:
            # update by _id so each matching record is hit exactly once
            col.update_many({'_id': {'$in': matching_ids}}, {'$set': {'revoked': True}})
        else:
            col.insert_one({'api_encrypted': sha256_crypt.hash(auth_token), 'revoked': True})

    # decode a token
    def decode_auth_token(self, auth_token):
        """Validate ``auth_token``.

        Returns:
            ``(True, user_id)`` when the signature is valid, the token has
            not expired and a matching non-revoked record exists;
            otherwise ``(False, message)`` describing why it was rejected.
        """
        try:
            payload = jwt.decode(auth_token, os.getenv('SECRET_KEY'), algorithms=['HS256'])
            user_id = payload.get('sub')
            # a token without a subject cannot identify a user
            if user_id is None:
                raise jwt.InvalidTokenError

            # ensure the api is legit and not revoked: the first stored
            # record matching the token decides the outcome
            col = self._get_apis_collection()
            for doc in self._token_records(col):
                if sha256_crypt.verify(auth_token, doc['api_encrypted']):
                    if doc.get('revoked'):
                        return False, 'This API token has been revoked.'
                    return True, user_id

            # no stored record matches (or the collection is empty)
            raise jwt.InvalidTokenError

        except jwt.ExpiredSignatureError:
            return False, 'API Token expired. Please request a new token.'
        except jwt.InvalidTokenError:
            return False, 'Invalid API Token. Please request a new token.'

In [5]:
# Scratch check of the jwt.decode signature (note the `algorithms` parameter
# that decode_auth_token passes); not needed for the token tests themselves.
help(jwt.decode)

Help on method decode in module jwt.api_jwt:

decode(jwt: str, key: str = '', algorithms: List[str] = None, options: Dict = None, **kwargs) -> Dict[str, Any] method of jwt.api_jwt.PyJWT instance



In [6]:
# Token manager instance shared by all the test cells below
token = Token()

In [7]:
# Issue a token for a sample user. encode_auth_token returns the exception
# object instead of raising when something fails, so API_KEY is only a
# usable token if the call succeeded.
API_KEY = token.encode_auth_token('crtucker')

In [8]:
# decode straight after issuing, i.e. before the token lifetime has elapsed:
# the token should verify and yield the user id
verified, result = token.decode_auth_token(API_KEY)
print(f'API verified?: {verified}', f'Result: {result}', sep='\n')

API verified?: True
Result: crtucker


In [9]:
# Wait 12 seconds so that a token with a ~10-second lifetime expires.
# NOTE(review): encode_auth_token as defined above issues tokens valid for
# 1 day, so this wait only leads to expiry when a short lifetime is in
# effect (presumably the case when the recorded output below was produced).
import time
time.sleep(12)

In [10]:
# decode again once the preceding wait is over; the token is reported as
# expired only if its lifetime is shorter than that wait
verified, result = token.decode_auth_token(API_KEY)
print(f'API verified?: {verified}', f'Result: {result}', sep='\n')

API verified?: False
Result: API Token expired. Please request a new token.


In [11]:
# test revoking a token: first issue a separate token that the next cells
# verify, revoke, and verify again
REVOKE_THIS_API = token.encode_auth_token('revoke_me')

In [12]:
# before revocation the freshly issued token should verify
verified, result = token.decode_auth_token(REVOKE_THIS_API)
print(f'API verified?: {verified}', f'Result: {result}', sep='\n')

API verified?: True
Result: revoke_me


In [13]:
# Flag the stored record(s) matching this token as revoked; the call returns nothing
token.revoke_token(REVOKE_THIS_API)

In [14]:
# after revocation the same token must be rejected with the "revoked" message
verified, result = token.decode_auth_token(REVOKE_THIS_API)
print(f'API verified?: {verified}', f'Result: {result}', sep='\n')

API verified?: False
Result: This API token has been revoked.


In [15]:
# decode a string that was never issued as a token; it must be rejected as invalid
verified, result = token.decode_auth_token('blahblahIwanttoconnectandIshouldnt')
print(f'API verified?: {verified}', f'Result: {result}', sep='\n')

API verified?: False
Result: Invalid API Token. Please request a new token.


In [16]:
# test revoking a token that doesn't exist
# (revoke_token stores an unknown token as a new record with revoked=True,
# so the first run of this cell adds a document to the collection)
token.revoke_token('thisisafaketokenletsseeifitgetsrevokedanyway')