# Authentication in Flask-RESTX: Basic and JWT

This notebook covers three levels of authentication implementation using **Flask-RESTX**:
1. Basic and JWT Authentication.
2. Token storage and expiration handling.
3. Role-based access control using enums and decorators.
 

In [69]:
import json
from flask import Flask, request
from flask_restx import Api, Namespace, Resource, fields
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, verify_jwt_in_request, get_jwt_identity
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
from enum import Enum
import base64
import nest_asyncio  # Re-added for Jupyter Notebook compatibility
from werkzeug.serving import run_simple
from datetime import datetime, timedelta
from flask_jwt_extended import get_jwt

# Apply `nest_asyncio` for Jupyter Notebook compatibility
nest_asyncio.apply()

# Flask Application Configuration
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+mysqlconnector://root:top!secret@localhost:3307/test_4"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Initialize Extensions
api = Api(
    app,
    title="Role-Based Auth API",
    version="1.0",
    description="Handles Basic, JWT, and Role-Based Authentication",
    security=['basic', 'jwt'],
    authorizations={
        'basic': {
            'type': 'basic',
            'description': "Basic Authentication - Provide `username:password` in Base64."
        },
        'jwt': {
            'type': 'apiKey',
            'in': 'header',
            'name': 'Authorization',
            'description': "JWT Authentication - Use `Bearer <JWT>` in the header."
        }
    }
)

jwt = JWTManager(app)
db = SQLAlchemy(app)

# User Roles Enum
class UserRole(Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"

# User Model
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(50), unique=True, nullable=False)
    password_hash = db.Column(db.String(255), nullable=False)
    role = db.Column(db.String(20), default=UserRole.USER.value, nullable=False)

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

# Token Model (Optional for Revocation Management)
class Token(db.Model):
    __tablename__ = 'tokens'
    id = db.Column(db.Integer, primary_key=True)
    token = db.Column(db.String(512), unique=True, nullable=False)
    expires_at = db.Column(db.DateTime, nullable=False)

# Default User and Token Creation
with app.app_context():
    db.create_all()
    if not User.query.filter_by(username="admin").first():
        admin = User(username="admin", role=UserRole.ADMIN.value)
        admin.set_password("admin123")
        db.session.add(admin)
        db.session.commit()

    # Create a default token for the admin user
    admin_user = User.query.filter_by(username="admin").first()
    if admin_user and not Token.query.filter_by(token="default_admin_token").first():
        identity = json.dumps({"username": admin_user.username, "role": admin_user.role})
        default_token = create_access_token(identity=identity)
        token_entry = Token(token=default_token, expires_at=datetime.utcnow() + timedelta(days=1))
        db.session.add(token_entry)
        db.session.commit()

# Role-Based Authorization Decorator  
def role_required(allowed_roles):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return {"message": "Authorization header is missing"}, 401
            
            # Basic Authentication 
            if auth_header.startswith('Basic '):
                try:
                    base64_credentials = auth_header.split(' ')[1]
                    credentials = base64.b64decode(base64_credentials).decode('utf-8')
                    username, password = credentials.split(':')
                    user = User.query.filter_by(username=username).first()
                    if user and user.check_password(password):
                        if user.role not in [role.value for role in allowed_roles]:
                            return {"message": "Access forbidden: Insufficient permissions"}, 403
                        request.current_user = {"username": username, "role": user.role}
                        return func(*args, **kwargs)
                    return {"message": "Invalid Basic Authentication credentials"}, 401
                except Exception as e:
                    return {"message": f"Basic Authentication error: {str(e)}"}, 401
            
            # JWT/Bearer Token Authentication
            elif auth_header.startswith('Bearer '):
                token = auth_header.split(' ')[1]
                if len(token.split('.')) != 3:
                    return {"message": "JWT Authentication error: Not enough segments"}, 401
                try:
                    verify_jwt_in_request()
                    user = json.loads(get_jwt_identity())  # Parse JSON string  user_identity: {'username': 'admin', 'role': 'admin'} 
                    user_name = user.get("username")
                    user_role = user.get("role")
                    
                    if user_role not in [role.value for role in allowed_roles]:
                        return {"message": "Access forbidden: Insufficient permissions"}, 403
                    request.current_user = {"username": user_name, "role": user_role}
                    return func(*args, **kwargs)
                except Exception as e:
                    return {"message": f"JWT Authentication error: {str(e)}"}, 401
            else:
                return {"message": "Unsupported authentication method"}, 401
        return wrapper
    return decorator

# Namespaces
auth_ns = Namespace('auth', description="Authentication Endpoints")
protected_ns = Namespace('protected', description="Role-Based Protected Endpoints")

# Models
register_model = auth_ns.model('Register', {
    'username': fields.String(required=True, description="User's username"),
    'password': fields.String(required=True, description="User's password"),
    'role': fields.String(description="Role of the user", default=UserRole.USER.value)
})

login_model = auth_ns.model('Login', {
    'username': fields.String(required=True, description="User's username"),
    'password': fields.String(required=True, description="User's password")
})

protected_response_model = auth_ns.model('ProtectedResponse', {
    'message': fields.String(description="Protected resource message")
})

# Endpoints
@auth_ns.route('/register')
class Register(Resource):
    @auth_ns.expect(register_model, validate=True)
    @auth_ns.doc(security=['basic', 'jwt'])
    @role_required([UserRole.ADMIN])
    def post(self):
        """Register a new user (Admin only)"""
        data = auth_ns.payload
        username, password, role = data['username'], data['password'], data.get('role', UserRole.USER.value)
        if User.query.filter_by(username=username).first():
            return {"message": "Username already exists"}, 400
        user = User(username=username, role=role)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        return {"message": "User registered successfully"}, 201

@auth_ns.route('/login')
class Login(Resource):
    @auth_ns.expect(login_model, validate=True)
    def post(self):
        """Login and receive a JWT token"""
        data = auth_ns.payload
        username, password = data['username'], data['password']
        user = User.query.filter_by(username=username).first()
        if not user or not user.check_password(password):
            return {"message": "Invalid username or password"}, 401
        token = create_access_token(identity={"username": username, "role": user.role})
        return {"access_token": token}, 200

@protected_ns.route('/admin')
class AdminResource(Resource):
    @auth_ns.doc(security=['basic', 'jwt'])
    @role_required([UserRole.ADMIN])
    def get(self):
        """Admin-only resource"""
        return {"message": f"Welcome Admin, {request.current_user['username']}!"}, 200

@protected_ns.route('/user')
class UserResource(Resource):
    @auth_ns.doc(security=['basic', 'jwt'])
    @role_required([UserRole.ADMIN, UserRole.USER])
    def get(self):
        """User and Admin resource"""
        return {"message": f"Welcome User, {request.current_user['username']}!"}, 200

@protected_ns.route('/guest')
class GuestResource(Resource):
    @auth_ns.doc(security=['basic', 'jwt'])
    @role_required([UserRole.ADMIN, UserRole.USER, UserRole.GUEST])
    def get(self):
        """Guest, User, and Admin resource"""
        return {"message": f"Welcome Guest, {request.current_user['username']}!"}, 200

# Register Namespaces
api.add_namespace(auth_ns, path='/auth')
api.add_namespace(protected_ns, path='/protected')

# Run the Application 
run_simple("localhost", 5000, app)


 * Running on http://localhost:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [13/Jan/2025 04:41:04] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [13/Jan/2025 04:41:04] "[36mGET /swaggerui/droid-sans.css HTTP/1.1[0m" 304 -
127.0.0.1 - - [13/Jan/2025 04:41:04] "[36mGET /swaggerui/swagger-ui.css HTTP/1.1[0m" 304 -
127.0.0.1 - - [13/Jan/2025 04:41:04] "[36mGET /swaggerui/swagger-ui-bundle.js HTTP/1.1[0m" 304 -
127.0.0.1 - - [13/Jan/2025 04:41:04] "[36mGET /swaggerui/swagger-ui-standalone-preset.js HTTP/1.1[0m" 304 -
127.0.0.1 - - [13/Jan/2025 04:41:04] "[36mGET /swaggerui/favicon-32x32.png HTTP/1.1[0m" 304 -
127.0.0.1 - - [13/Jan/2025 04:41:04] "GET /swagger.json HTTP/1.1" 200 -
127.0.0.1 - - [13/Jan/2025 04:41:18] "GET /protected/admin HTTP/1.1" 200 -
