diff --git a/examples/authentication.py b/examples/authentication.py index 0796f3e5..5c468fa8 100644 --- a/examples/authentication.py +++ b/examples/authentication.py @@ -82,3 +82,49 @@ def token_gen_call(username, password): if mockpassword == password and mockusername == username: # This is an example. Don't do that. return {"token" : jwt.encode({'user': username, 'data': 'mydata'}, secret_key, algorithm='HS256')} return 'Invalid username and/or password for user: {0}'.format(username) + +# JWT AUTH EXAMPLE # +replace_this = False # Replace this placeholder in your implementation. +config = { + 'jwt_secret': 'super-secret-key-please-change', + # Token will expire in 3600 seconds if it is not refreshed and the user will be required to log in again. + 'token_expiration_seconds': 3600, + # If a request is made at a time less than 1000 seconds before expiry, a new jwt is sent in the response header. + 'token_refresh_seconds': 1000 +} +# Enable authenticated endpoints, example @authenticated.get('/users/me'). +authenticated = hug.http(requires=hug.authentication.json_web_token(hug.authentication.verify_jwt, config['jwt_secret'])) + +# Check the token and issue a new one if it is about to expire (within token_refresh_seconds from expiry). +@hug.response_middleware() +def refresh_jwt(request, response, resource): + authorization = request.get_header('Authorization') + if authorization: + token = hug.authentication.refresh_jwt(authorization, config['token_refresh_seconds'], + config['token_expiration_seconds'], config['jwt_secret']) + if token: + response.set_header('token', token) + +@hug.post('/login') +def login(request, response, + email: fields.Email(), + password: fields.String() + ): + response.status = falcon.HTTP_400 + + user = replace_this # store.get_user_by_email(email) + if not user: + return {'errors': {'Issue': "User not found."}} + elif 'password_hash' in user: + if replace_this: # if bcrypt.checkpw(password.encode('utf8'), user.password_hash): + response.status = falcon.HTTP_201 + token = hug.authentication.new_jwt( + str(user['_id']), + config['token_expiration_seconds'], + config['jwt_secret']) + response.set_header('token', token) + else: + return {'errors': {'Issue': "Password hash mismatch."}} + else: + return {'errors': {'Issue': "Please check your email to complete registration."}} +# END - JWT AUTH EXAMPLE # diff --git a/hug/authentication.py b/hug/authentication.py index 563b7e8b..96c241d9 100644 --- a/hug/authentication.py +++ b/hug/authentication.py @@ -21,10 +21,12 @@ """ from __future__ import absolute_import +import jwt import base64 import binascii from falcon import HTTPUnauthorized +from datetime import datetime, timedelta def authenticator(function, challenges=()): @@ -138,3 +140,85 @@ def verify_user(user_name, user_password): return user_name return False return verify_user + +# JWT AUTH # +def jwt_authenticator(function, challenges=()): + """Wraps authentication logic, verify_token through to the authentication function. + + The verify_token function passed in should accept the authorization header and the jwt secret + and return a user id to store in the request context if authentication succeeded. + """ + challenges = challenges or ('{} realm="simple"'.format(function.__name__), ) + + def wrapper(verify_token, jwt_secret): + def authenticate(request, response, **kwargs): + result = function(request, response, verify_token, jwt_secret) + + def jwt_authenticator_name(): + try: + return function.__doc__.splitlines()[0] + except AttributeError: + return function.__name__ + + if result is None: + raise HTTPUnauthorized('Authentication Required', + 'Please provide valid {0} credentials'.format(jwt_authenticator_name()), + challenges=challenges) + + if result is False: + raise HTTPUnauthorized('Invalid Authentication', + 'Provided {0} credentials were invalid'.format(jwt_authenticator_name()), + challenges=challenges) + + request.context['user_id'] = result + return True + + authenticate.__doc__ = function.__doc__ + return authenticate + + return wrapper + + +@jwt_authenticator +def json_web_token(request, response, verify_token, jwt_secret): + """JWT verification + + Checks for the Authorization header and verifies it using the verify_token function. + """ + authorization = request.get_header('Authorization') + if authorization: + verified_token = verify_token(authorization, response, jwt_secret) + if verified_token: + return verified_token + else: + return False + return None + + +def verify_jwt(authorization, response, jwt_secret): + try: + token = authorization.split(' ')[1] + decoding = jwt.decode(token, jwt_secret, algorithm='HS256') + return decoding['user_id'] + except jwt.InvalidTokenError: + return False + + +def new_jwt(user_id, token_expiration_seconds, jwt_secret): + return jwt.encode({'user_id': user_id, + 'exp': datetime.utcnow() + timedelta(seconds=token_expiration_seconds)}, + jwt_secret, algorithm='HS256').decode("utf-8") + + +def refresh_jwt(authorization, token_refresh_seconds, token_expiration_seconds, jwt_secret): + try: + token = authorization.split(' ')[1] + decoding = jwt.decode(token, jwt_secret, algorithm='HS256') + exp = decoding['exp'] + if datetime.utcnow() > (datetime.utcfromtimestamp(exp) - timedelta(seconds=token_refresh_seconds)): + return jwt.encode({'user_id': decoding['user_id'], + 'exp': datetime.utcnow() + timedelta(seconds=token_expiration_seconds)}, + jwt_secret, algorithm='HS256').decode("utf-8") + except jwt.InvalidTokenError: + return None +# END - JWT AUTH #