In [1]:
import json
from flask import request, abort
from functools import wraps
from jose import jwt
from urllib.request import urlopen

AUTH0_DOMAIN = 'idataist.us.auth0.com'
ALGORITHMS = ['RS256']
API_AUDIENCE = 'dev'

class AuthError(Exception):
    def __init__(self, error, status_code):
        self.error = error
        self.status_code = status_code

def get_token_auth_header():
    """Obtains the Access Token from the Authorization Header
    """
    auth = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IjZHUG9DNjJwNEcxbi10VlkxUWRXRyJ9.eyJpc3MiOiJodHRwczovL2lkYXRhaXN0LnVzLmF1dGgwLmNvbS8iLCJzdWIiOiJMRXdQeExNRkhISW5XbEh0TmNFWE52dlJQSTZFUGlVaEBjbGllbnRzIiwiYXVkIjoiZGV2IiwiaWF0IjoxNjI1NjkzODUzLCJleHAiOjE2MjU3ODAyNTMsImF6cCI6IkxFd1B4TE1GSEhJbldsSHROY0VYTnZ2UlBJNkVQaVVoIiwic2NvcGUiOiJnZXQ6ZHJpbmtzLWRldGFpbCBwYXRjaDpkcmlua3MgZGVsZXRlOmRyaW5rcyBwb3N0OmRyaW5rcyIsImd0eSI6ImNsaWVudC1jcmVkZW50aWFscyJ9.W_utDgiXctrFW6wPNK4jJj-oN-Y0QH-JGh9DJiBYc0W0qMWlGDy-ZXMNUhXB01zUhgowE6EYL_17XinSiCiqW-vzrE4O7-IzhQHaBTUsiVjB8_YjblKKfRGtlWip5EaXeMtujCG9QMvmbHROIP0QiFmdwRqUd7Q6M2y39KFqvs6llrBR2jSWtLOJou7ELHTYQHvUgYdk8dY5h73fPIJVdaDhstZUhusZ1CCB9HtD9588easRjQWHyIF-3do91uYb8t4DLZO4E_L2jUmceoa-A1dOUwJ2mCGdwij03tc1voEx8II9YraPj1pGRfRHS_13NTPgjGho26dqMiaMKG9nmQ'
    if not auth:
        raise AuthError({
            'code': 'authorization_header_missing',
            'description': 'Authorization header is expected.'
        }, 401)

    parts = auth.split()
    if parts[0].lower() != 'bearer':
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization header must start with "Bearer".'
        }, 401)

    elif len(parts) == 1:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Token not found.'
        }, 401)

    elif len(parts) > 2:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization header must be bearer token.'
        }, 401)

    token = parts[1]
    return token

def verify_decode_jwt(token):
    jsonurl = urlopen(f'https://{AUTH0_DOMAIN}/.well-known/jwks.json')
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)
    rsa_key = {}
    if 'kid' not in unverified_header:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization malformed.'
        }, 401)

    for key in jwks['keys']:
        if key['kid'] == unverified_header['kid']:
            rsa_key = {
                'kty': key['kty'],
                'kid': key['kid'],
                'use': key['use'],
                'n': key['n'],
                'e': key['e']
            }
    if rsa_key:
        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=ALGORITHMS,
                audience=API_AUDIENCE,
                issuer='https://' + AUTH0_DOMAIN + '/'
            )

            return payload

        except jwt.ExpiredSignatureError:
            raise AuthError({
                'code': 'token_expired',
                'description': 'Token expired.'
            }, 401)

        except jwt.JWTClaimsError:
            raise AuthError({
                'code': 'invalid_claims',
                'description': 'Incorrect claims. Please, check the audience and issuer.'
            }, 401)
        except Exception:
            raise AuthError({
                'code': 'invalid_header',
                'description': 'Unable to parse authentication token.'
            }, 400)
    raise AuthError({
                'code': 'invalid_header',
                'description': 'Unable to find the appropriate key.'
            }, 400)

def check_permissions(permission, payload):
    # if 'permissions' not in payload:
    #                     raise AuthError({
    #                         'code': 'invalid_claims',
    #                         'description': 'Permissions not included in JWT.'
    #                     }, 400)

    if permission not in payload['scope']:
        raise AuthError({
            'code': 'unauthorized',
            'description': 'Permission not found.'
        }, 403)
    return True

def requires_auth(permission=''):
    def requires_auth_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            token = get_token_auth_header()
            payload = verify_decode_jwt(token)
            check_permissions(permission, payload)
            return f(payload, *args, **kwargs)
        return wrapper
    return requires_auth_decorator

In [2]:
@requires_auth('delete:drinks')
def say(self):
    print('hello')

In [3]:
say()

hello
