In [2]:
# default_exp api.auth
# export
from ascrum.core.config import config
from ascrum.server import app, api
import jwt
from http import HTTPStatus
from functools import wraps
from flask import request
from flask_restplus import Resource, fields

# Namespace object for the authentication endpoints; the resource classes in
# this module use it for request validation (`expect`) and API docs (`doc`).
auth_ns = api.namespace('auth', description='A Scrum Authentication')

In [3]:
# export
def _verify_credentials(username, password):
    """Check a username/password pair, first locally and then through PAM.

    Args:
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        True if ``config.verify_password`` accepts the pair or, failing that,
        if ``pamela.authenticate`` completes without raising. False otherwise,
        including when the optional ``pamela`` module cannot be loaded.
    """
    if config.verify_password(username, password):
        return True

    # PAM is an optional fallback: if the module cannot be loaded, just deny.
    try:
        import pamela
    except Exception:  # ImportError, or a loader error raised while importing
        return False

    try:
        pamela.authenticate(username, password)
    except pamela.PAMError:
        # Ordinary rejection by PAM.
        return False
    except Exception:
        # Still deny, but leave a trace instead of hiding the problem.
        # The password is deliberately kept out of the log record.
        app.logger.exception(
            f'Unexpected error during PAM authentication for user {username}')
        return False
    return True


In [12]:
# export
import datetime

def _get_jwt_token(user, lifetime=datetime.timedelta(hours=20)):
    """Issue an HS256-signed JWT identifying ``user``.

    Args:
        user: Value stored in the token's ``uid`` claim.
        lifetime: ``datetime.timedelta`` added to the issue time to obtain the
            ``exp`` claim. Defaults to 20 hours.

    Returns:
        The encoded token as a ``str``.
    """
    # Take one timestamp so that ``exp`` is exactly ``iat + lifetime``.
    issued_at = datetime.datetime.utcnow()
    claims = {
        'uid': user,
        'exp': issued_at + lifetime,
        'iat': issued_at,
    }
    token = jwt.encode(claims, config.secret, algorithm='HS256')
    # PyJWT 1.x returns bytes while PyJWT 2.x already returns str.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    # The token is a bearer credential: record who it was issued to, never the
    # token itself.
    app.logger.info(f'JWT token created for user {user}')
    return token

def _verify_jwt_token(token):
    """Validate an HS256 JWT and extract the user it was issued to.

    Args:
        token: Encoded JWT string.

    Returns:
        The value of the ``uid`` claim when the token decodes successfully
        with ``config.secret``; ``None`` when decoding fails or the claim is
        missing.
    """
    try:
        info = jwt.decode(token, config.secret, algorithms=['HS256'])
        user = info['uid']
    except (jwt.InvalidTokenError, KeyError) as err:
        # Log only the failure category; the token is a credential and does
        # not belong in log files.
        app.logger.info(f'Invalid JWT token ({type(err).__name__})')
        return None
    app.logger.info(f'Valid JWT token for user {user}')
    return user

In [14]:
# Manual smoke check: mint a token for a dummy user (this cell carries no
# export marker, so presumably it is not part of the generated module).
# NOTE(review): the rendered output embeds a signed token — clear cell outputs
# before committing the notebook.
_get_jwt_token('test')


'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1aWQiOiJ0ZXN0IiwiZXhwIjoxNjAzNjg2MzMzLCJpYXQiOjE2MDM2MTQzMzN9.RSsx6j2qW65SjfriCcTG8XyXqIJS7AD70D67BY8tKus'

In [13]:
# export
def token_required(f):
    """Decorator that only calls ``f`` for requests carrying a valid JWT.

    The token is read from the second whitespace-separated field of the
    ``Authorization`` request header (e.g. ``Bearer <token>``) and checked
    with ``_verify_jwt_token``.

    Args:
        f: View callable. It is invoked with the original arguments plus a
            ``user`` keyword argument holding the verified user.

    Returns:
        The wrapped callable. When the header is missing or malformed, or the
        token is rejected, it returns ``('Token required',
        HTTPStatus.UNAUTHORIZED)`` without calling ``f``.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        denied = ('Token required', HTTPStatus.UNAUTHORIZED)

        auth_header = request.headers.get('Authorization')
        if not auth_header:
            app.logger.info('No HTTP header Authorization in request')
            return denied

        # A malformed header is a client error like any other failed check:
        # answer 401 instead of raising. The header value is not logged because
        # it may contain the credential itself.
        parts = auth_header.split()
        if len(parts) < 2:
            app.logger.info('Malformed HTTP Authorization header')
            return denied

        user = _verify_jwt_token(parts[1])
        if not user:
            app.logger.info('Rejected token in HTTP Authorization header')
            return denied

        return f(*args, **kwargs, user=user)
    return decorator

In [14]:
#export
# Request-body schema shared by the register and login endpoints:
# two mandatory string fields.
_login_model = api.model(
    'Login',
    dict(
        username=fields.String(required=True),
        password=fields.String(required=True),
    ),
)

In [15]:
#export

@api.route('/register', endpoint='register')
@api.doc()
class RegisterApi(Resource):
    # POST /register: hands the submitted username/password pair to
    # config.set_password.
    # NOTE(review): nothing here authenticates the caller or checks for an
    # existing account — confirm whether config.set_password prevents one user
    # from resetting another user's password.
    @auth_ns.expect(_login_model, validate=True)
    # Distinct operation id; it previously duplicated the login endpoint's id.
    @auth_ns.doc('register a user')
    def post(self):
        # The payload is validated against _login_model before this runs.
        auth = api.payload
        username = auth['username']
        password = auth['password']

        config.set_password(username, password)
        # Return a (body, status) pair: a lone return value is treated as the
        # body and sent with the default status, so the response was not
        # actually "201 Created". The body stays the numeric code the endpoint
        # emitted before.
        return HTTPStatus.CREATED.value, HTTPStatus.CREATED

In [16]:
#export

@api.route('/login', endpoint='login')
@api.doc()
class LoginApi(Resource):
    # POST /login: exchanges a username/password pair for a JWT.
    @auth_ns.expect(_login_model, validate=True)
    @auth_ns.doc('authenticate a user')
    def post(self):
        # The payload is validated against _login_model before this runs.
        credentials = api.payload
        username, password = credentials['username'], credentials['password']

        # Guard clause: reject bad credentials first, then issue the token.
        if not _verify_credentials(username, password):
            return f'Invalid credentials for user {username}', HTTPStatus.UNAUTHORIZED.value
        return _get_jwt_token(username), HTTPStatus.OK