Skip to content

Commit

Permalink
fix: optional unauthorized status codes (#1753)
Browse files Browse the repository at this point in the history
* fix: optional unauthorized status codes

* fix

* fix

* lint

* add tests
  • Loading branch information
dpgaspar committed Dec 3, 2021
1 parent 751e57c commit 319b6be
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 16 deletions.
29 changes: 21 additions & 8 deletions flask_appbuilder/security/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ def response_unauthorized(base_class: "BaseApi") -> Response:
return base_class.response_401()


def response_unauthorized_mvc() -> Response:
status_code = 401
if current_app.appbuilder.sm.current_user and current_app.config.get(
"AUTH_STRICT_RESPONSE_CODES", False
):
status_code = 403
response = make_response(
jsonify({"message": str(FLAMSG_ERR_SEC_ACCESS_DENIED), "severity": "danger"}),
status_code,
)
response.headers["Content-Type"] = "application/json"
return response


def protect(allow_browser_login=False):
"""
Use this decorator to enable granular security permissions
Expand Down Expand Up @@ -72,19 +86,25 @@ def wraps(self, *args, **kwargs):
if _permission_name:
permission_str = f"{PERMISSION_PREFIX}{_permission_name}"
class_permission_name = self.class_permission_name
# Check if permission is allowed on the class
if permission_str not in self.base_permissions:
return response_unauthorized(self)
# Check if the resource is public
if current_app.appbuilder.sm.is_item_public(
permission_str, class_permission_name
):
return f(self, *args, **kwargs)
# if no browser login then verify JWT
if not (self.allow_browser_login or allow_browser_login):
verify_jwt_in_request()
# Verify resource access
if current_app.appbuilder.sm.has_access(
permission_str, class_permission_name
):
return f(self, *args, **kwargs)
# If browser login?
elif self.allow_browser_login or allow_browser_login:
# no session cookie (but we allow it), then try JWT
if not current_user.is_authenticated:
verify_jwt_in_request()
if current_app.appbuilder.sm.has_access(
Expand Down Expand Up @@ -174,14 +194,7 @@ def wraps(self, *args, **kwargs):
permission_str, self.__class__.__name__
)
)
response = make_response(
jsonify(
{"message": str(FLAMSG_ERR_SEC_ACCESS_DENIED), "severity": "danger"}
),
403,
)
response.headers["Content-Type"] = "application/json"
return response
return response_unauthorized_mvc()

f._permission_name = permission_str
return functools.update_wrapper(wraps, f)
Expand Down
12 changes: 4 additions & 8 deletions flask_appbuilder/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
class FABTestCase(unittest.TestCase):
@staticmethod
def auth_client_get(client, token, uri):
return client.get(uri, headers={"Authorization": "Bearer {}".format(token)})
return client.get(uri, headers={"Authorization": f"Bearer {token}"})

@staticmethod
def auth_client_delete(client, token, uri):
return client.delete(uri, headers={"Authorization": "Bearer {}".format(token)})
return client.delete(uri, headers={"Authorization": f"Bearer {token}"})

@staticmethod
def auth_client_put(client, token, uri, json):
return client.put(
uri, json=json, headers={"Authorization": "Bearer {}".format(token)}
)
return client.put(uri, json=json, headers={"Authorization": f"Bearer {token}"})

@staticmethod
def auth_client_post(client, token, uri, json):
return client.post(
uri, json=json, headers={"Authorization": "Bearer {}".format(token)}
)
return client.post(uri, json=json, headers={"Authorization": f"Bearer {token}"})

@staticmethod
def _login(client, username, password, refresh: bool = False):
Expand Down
34 changes: 34 additions & 0 deletions flask_appbuilder/tests/test_mvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,40 @@ def test_api_read(self):
self.assertIn("pks", data)
assert len(data.get("result")) > 10

def test_api_unauthenticated(self):
"""
Testing unauthenticated access to MVC API
"""
client = self.app.test_client()
self.app.config["AUTH_STRICT_RESPONSE_CODES"] = True
rv = client.get("/model1formattedview/api/read")
self.assertEqual(rv.status_code, 401)
self.app.config["AUTH_STRICT_RESPONSE_CODES"] = False
rv = client.get("/model1formattedview/api/read")
self.assertEqual(rv.status_code, 401)

def test_api_unauthorized(self):
"""
Testing unauthorized access to MVC API
"""
client = self.app.test_client()
self.browser_login(client, USERNAME_READONLY, PASSWORD_READONLY)
self.app.config["AUTH_STRICT_RESPONSE_CODES"] = True

rv = client.post(
"/model1view/api/create",
data=dict(field_string="zzz"),
follow_redirects=True,
)
self.assertEqual(rv.status_code, 403)
self.app.config["AUTH_STRICT_RESPONSE_CODES"] = False
rv = client.post(
"/model1view/api/create",
data=dict(field_string="zzz"),
follow_redirects=True,
)
self.assertEqual(rv.status_code, 401)

def test_api_create(self):
"""
Testing the api/create endpoint
Expand Down

0 comments on commit 319b6be

Please sign in to comment.