From 162d371cbd94ad67495cc6ee342f41be76f7ceb8 Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Mon, 27 Oct 2014 16:26:45 +0000 Subject: [PATCH] [#1944] Allow CORS settings in config. New config settings: ckan.cors.origin_allow_all = ckan.cors.origin_whitelist = Access-Control-Allow headers will only be set in the response if the request contains an Origin header, the origin_allow_all setting is present, and either: a) the origin_allow_all setting is true, or b) the request origin is listed in the origin_whitelist setting --- ckan/lib/base.py | 30 ++++-- ckan/new_tests/lib/test_base.py | 145 +++++++++++++++++++++++++++++ ckan/tests/functional/test_cors.py | 23 ----- 3 files changed, 169 insertions(+), 29 deletions(-) create mode 100644 ckan/new_tests/lib/test_base.py delete mode 100644 ckan/tests/functional/test_cors.py diff --git a/ckan/lib/base.py b/ckan/lib/base.py index 70d9af60c83..aa5d908b6c9 100644 --- a/ckan/lib/base.py +++ b/ckan/lib/base.py @@ -375,17 +375,35 @@ def __call__(self, environ, start_response): return res def __after__(self, action, **params): - self._set_cors() + # Do we have CORS settings in config? + if config.get('ckan.cors.origin_allow_all') \ + and request.headers.get('Origin'): + self._set_cors() r_time = time.time() - c.__timer url = request.environ['CKAN_CURRENT_URL'].split('?')[0] log.info(' %s render time %.3f seconds' % (url, r_time)) def _set_cors(self): - response.headers['Access-Control-Allow-Origin'] = "*" - response.headers['Access-Control-Allow-Methods'] = \ - "POST, PUT, GET, DELETE, OPTIONS" - response.headers['Access-Control-Allow-Headers'] = \ - "X-CKAN-API-KEY, Authorization, Content-Type" + ''' + Set up Access Control Allow headers if either origin_allow_all is + True, or the request Origin is in the origin_whitelist. + ''' + cors_origin_allowed = None + if asbool(config.get('ckan.cors.origin_allow_all')): + cors_origin_allowed = "*" + elif config.get('ckan.cors.origin_whitelist') and \ + request.headers.get('Origin') \ + in config['ckan.cors.origin_whitelist'].split(" "): + # set var to the origin to allow it. + cors_origin_allowed = request.headers.get('Origin') + + if cors_origin_allowed is not None: + response.headers['Access-Control-Allow-Origin'] = \ + cors_origin_allowed + response.headers['Access-Control-Allow-Methods'] = \ + "POST, PUT, GET, DELETE, OPTIONS" + response.headers['Access-Control-Allow-Headers'] = \ + "X-CKAN-API-KEY, Authorization, Content-Type" def _get_user_for_apikey(self): apikey_header_name = config.get(APIKEY_HEADER_NAME_KEY, diff --git a/ckan/new_tests/lib/test_base.py b/ckan/new_tests/lib/test_base.py new file mode 100644 index 00000000000..04656093928 --- /dev/null +++ b/ckan/new_tests/lib/test_base.py @@ -0,0 +1,145 @@ +from nose import tools as nose_tools + +from ckan.new_tests import helpers + + +class TestCORS(helpers.FunctionalTestBase): + + def test_options(self): + app = self._get_test_app() + response = app.options(url='/', status=200) + assert len(str(response.body)) == 0, 'OPTIONS must return no content' + + def test_cors_config_no_cors(self): + ''' + No ckan.cors settings in config, so no Access-Control-Allow headers in + response. + ''' + app = self._get_test_app() + response = app.get('/') + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' not in response_headers + assert 'Access-Control-Allow-Methods' not in response_headers + assert 'Access-Control-Allow-Headers' not in response_headers + + def test_cors_config_no_cors_with_origin(self): + ''' + No ckan.cors settings in config, so no Access-Control-Allow headers in + response, even with origin header in request. + ''' + app = self._get_test_app() + request_headers = {'Origin': 'http://thirdpartyrequests.org'} + response = app.get('/', headers=request_headers) + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' not in response_headers + assert 'Access-Control-Allow-Methods' not in response_headers + assert 'Access-Control-Allow-Headers' not in response_headers + + @helpers.change_config('ckan.cors.origin_allow_all', 'true') + def test_cors_config_origin_allow_all_true_no_origin(self): + ''' + With origin_allow_all set to true, but no origin in the request + header, no Access-Control-Allow headers should be in the response. + ''' + app = self._get_test_app() + response = app.get('/') + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' not in response_headers + assert 'Access-Control-Allow-Methods' not in response_headers + assert 'Access-Control-Allow-Headers' not in response_headers + + @helpers.change_config('ckan.cors.origin_allow_all', 'true') + @helpers.change_config('ckan.site_url', 'http://test.ckan.org') + def test_cors_config_origin_allow_all_true_with_origin(self): + ''' + With origin_allow_all set to true, and an origin in the request + header, the appropriate Access-Control-Allow headers should be in the + response. + ''' + app = self._get_test_app() + request_headers = {'Origin': 'http://thirdpartyrequests.org'} + response = app.get('/', headers=request_headers) + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' in response_headers + nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*') + nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS") + nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type") + + @helpers.change_config('ckan.cors.origin_allow_all', 'false') + @helpers.change_config('ckan.site_url', 'http://test.ckan.org') + def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self): + ''' + With origin_allow_all set to false, with an origin in the request + header, but no whitelist defined, there should be no Access-Control- + Allow headers in the response. + ''' + app = self._get_test_app() + request_headers = {'Origin': 'http://thirdpartyrequests.org'} + response = app.get('/', headers=request_headers) + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' not in response_headers + assert 'Access-Control-Allow-Methods' not in response_headers + assert 'Access-Control-Allow-Headers' not in response_headers + + @helpers.change_config('ckan.cors.origin_allow_all', 'false') + @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org') + @helpers.change_config('ckan.site_url', 'http://test.ckan.org') + def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self): + ''' + With origin_allow_all set to false, with an origin in the request + header, and a whitelist defined (containing the origin), the + appropriate Access-Control-Allow headers should be in the response. + ''' + app = self._get_test_app() + request_headers = {'Origin': 'http://thirdpartyrequests.org'} + response = app.get('/', headers=request_headers) + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' in response_headers + nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org') + nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS") + nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type") + + @helpers.change_config('ckan.cors.origin_allow_all', 'false') + @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk') + @helpers.change_config('ckan.site_url', 'http://test.ckan.org') + def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self): + ''' + With origin_allow_all set to false, with an origin in the request + header, and a whitelist defining multiple allowed origins (containing + the origin), the appropriate Access-Control-Allow headers should be in + the response. + ''' + app = self._get_test_app() + request_headers = {'Origin': 'http://thirdpartyrequests.org'} + response = app.get('/', headers=request_headers) + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' in response_headers + nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org') + nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS") + nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type") + + @helpers.change_config('ckan.cors.origin_allow_all', 'false') + @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk') + @helpers.change_config('ckan.site_url', 'http://test.ckan.org') + def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self): + ''' + With origin_allow_all set to false, with an origin in the request + header, and a whitelist defining multiple allowed origins (but not + containing the requesting origin), there should be no Access-Control- + Allow headers in the response. + ''' + app = self._get_test_app() + request_headers = {'Origin': 'http://thirdpartyrequests.org'} + response = app.get('/', headers=request_headers) + response_headers = dict(response.headers) + + assert 'Access-Control-Allow-Origin' not in response_headers + assert 'Access-Control-Allow-Methods' not in response_headers + assert 'Access-Control-Allow-Headers' not in response_headers diff --git a/ckan/tests/functional/test_cors.py b/ckan/tests/functional/test_cors.py deleted file mode 100644 index b69ff1eea1d..00000000000 --- a/ckan/tests/functional/test_cors.py +++ /dev/null @@ -1,23 +0,0 @@ -from ckan.tests import TestController -from ckan.tests import is_search_supported - -class TestCORS(TestController): - - def test_options(self): - out = self.app._gen_request(method='OPTIONS', url='/', status=200) - assert len(str(out.body)) == 0, 'OPTIONS must return no content' - - def test_headers(self): - # the home page does a package search so have to skip this test if - # search is not supported - if not is_search_supported(): - from nose import SkipTest - raise SkipTest("Search not supported") - - out = self.app.get('/') - headers = dict(out.headers) - print headers - assert headers['Access-Control-Allow-Origin'] == '*' - assert headers['Access-Control-Allow-Methods'] == "POST, PUT, GET, DELETE, OPTIONS" - assert headers['Access-Control-Allow-Headers'] == "X-CKAN-API-KEY, Authorization, Content-Type" -