Skip to content

Commit

Permalink
Protect remote_cors from Server-Side Request Forgery (#3326)
Browse files Browse the repository at this point in the history
* Protect remote_cors from Server-Side Request Forgery

* Improved code and added test

* Fix a couple of comments in a test
  • Loading branch information
northwestwitch committed May 5, 2022
1 parent 51a838d commit b0ef15f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -29,6 +29,7 @@ About changelog [here](https://keepachangelog.com/en/1.0.0/)
- Removed stream argument deprecation warning in tests
- Handle `no intervals found` warning in load_region test
- Beacon remove variants
- Protect remote_cors function in alignviewers view from Server-Side Request Forgery (SSRF)

## [4.51]
### Added
Expand Down
21 changes: 20 additions & 1 deletion scout/server/blueprints/alignviewers/controllers.py
Expand Up @@ -14,6 +14,25 @@
CUSTOM_TRACK_NAMES = ["Genes", "ClinVar", "ClinVar CNVs"]


def check_session_tracks(resource):
"""Make sure that a user requesting a resource is authenticated and resource is in session IGV tracks
Args:
resource(str): a resource on the server or on a remote URL
Returns
True is user has access to resource else False
"""
# Check that user is logged in or that file extension is valid
if current_user.is_authenticated is False:
LOG.warning("Unauthenticated user requesting resource via remote_static")
return False
if resource not in session.get("igv_tracks", []):
LOG.warning(f"{resource} not in {session.get('igv_tracks', [])}")
return False
return True


def set_session_tracks(display_obj):
"""Save igv tracks as a session object. This way it's easy to verify that a user is requesting one of these files from remote_static view endpoint
Expand All @@ -22,7 +41,7 @@ def set_session_tracks(display_obj):
"""
session_tracks = list(display_obj.get("reference_track", {}).values())
for key, track_items in display_obj.items():
if key not in ["tracks", "custom_tracks", "sample_tracks"]:
if key not in ["tracks", "custom_tracks", "sample_tracks", "cloud_public_tracks"]:
continue
for track_item in track_items:
session_tracks += list(track_item.values())
Expand Down
7 changes: 5 additions & 2 deletions scout/server/blueprints/alignviewers/views.py
Expand Up @@ -39,6 +39,10 @@ def remote_cors(remote_url):
Based on code from answers to this thread:
https://stackoverflow.com/questions/6656363/proxying-to-another-web-service-with-flask/
"""
# Check that user is logged in or that file extension is valid
if controllers.check_session_tracks(remote_url) is False:
return abort(403)

resp = requests.request(
method=request.method,
url=remote_url,
Expand Down Expand Up @@ -70,8 +74,7 @@ def remote_static():
file_path = request.args.get("file") or "."

# Check that user is logged in or that file extension is valid
if current_user.is_authenticated is False or file_path not in session.get("igv_tracks", []):
LOG.warning(f"{file_path} not in {session.get('igv_tracks', [])}")
if controllers.check_session_tracks(file_path) is False:
return abort(403)

range_header = request.headers.get("Range", None)
Expand Down
36 changes: 29 additions & 7 deletions tests/server/blueprints/alignviewers/test_alignviewers_views.py
Expand Up @@ -47,8 +47,9 @@ def test_remote_static(app):
with app.test_client() as client:
# GIVEN that user is logged in
client.get(url_for("auto_login"))

# GIVEN that resource file exists in user session
with client.session_transaction() as session:
# GIVEN that resource file exists in user session
session["igv_tracks"] = [file]

# THEN the resource should be available to the user
Expand All @@ -61,20 +62,41 @@ def test_remote_static(app):
assert resp.status_code == 200


def test_remote_cors(app):
def test_remote_cors_wrong_resource(app):
"""Test endpoint that serves as a proxy to the actual remote track on the cloud"""
cloud_track_url = "http://google.com"
# GIVEN a resource not present in session["igv_tracks"]
an_url = "http://google.com"

# GIVEN an initialized app
# GIVEN a valid user and institute
# GIVEN a running demo app
with app.test_client() as client:
# GIVEN that the user could be logged in
resp = client.get(url_for("auto_login"))
assert resp.status_code == 200

# WHEN the remote cors endpoint is invoked with an url
resp = client.get(url_for("alignviewers.remote_cors", remote_url=an_url))

# THEN it should return forbidden (403)
assert resp.status_code == 403


def test_remote_cors(app):
"""Test endpoint that serves as a proxy to the actual remote track on the cloud"""
# GIVEN an igv track on the cloud
cloud_track_url = "https://s3-eu-west-1.amazonaws.com/pfigshare-u-files/25777460/GRCh37.variant_call.clinical.pathogenic_or_likely_pathogenic.vcf.gz.tbi"

# GIVEN a running demo app
with app.test_client() as client:
# GIVEN that the user could be logged in
resp = client.get(url_for("auto_login"))

# GIVEN that resource url exists in user session
with client.session_transaction() as session:
session["igv_tracks"] = [cloud_track_url]

# WHEN the remote cors endpoint is invoked with cloud_track_url
resp = client.get(url_for("alignviewers.remote_cors", remote_url=cloud_track_url))
# THEN it should return success response

# THEN response should be successful
assert resp.status_code == 200


Expand Down

0 comments on commit b0ef15f

Please sign in to comment.