-
Notifications
You must be signed in to change notification settings - Fork 2k
/
test_proxy.py
124 lines (97 loc) · 3.94 KB
/
test_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import requests
import unittest
import json
from httpretty import HTTPretty, httprettified
import paste.fixture
from pylons import config
import ckan.logic as logic
import ckan.model as model
import ckan.tests as tests
import ckan.plugins as plugins
import ckan.lib.create_test_data as create_test_data
import ckan.config.middleware as middleware
import ckanext.resourceproxy.controller as controller
import ckanext.resourceproxy.plugin as proxy
# Canned JSON document used as the response body of the mocked remote
# server; the tests look for the "yes, I'm proxied" marker in what comes back.
JSON_STRING = json.dumps(
    {"a": "foo", "bar": "yes, I'm proxied", "b": 42}
)
class TestProxyBasic(tests.WsgiAppCase, unittest.TestCase):
    """Functional tests for the ``resource_proxy`` extension.

    The remote server is faked with HTTPretty where a test is decorated
    with ``@httprettified``; the CKAN side is driven through a
    ``paste.fixture.TestApp`` built with the ``resource_proxy`` plugin
    enabled.
    """

    serving = False

    @classmethod
    def setup_class(cls):
        """Build a test app with ``resource_proxy`` enabled and load data."""
        # Keep a copy so teardown_class can put the global config back.
        cls._original_config = config.copy()
        config['ckan.plugins'] = 'resource_proxy'
        wsgiapp = middleware.make_app(config['global_conf'], **config)
        cls.app = paste.fixture.TestApp(wsgiapp)
        # create test resource
        create_test_data.CreateTestData.create()

    @classmethod
    def teardown_class(cls):
        """Restore the saved config, rebuild the DB and reset plugins."""
        config.clear()
        config.update(cls._original_config)
        model.repo.rebuild_db()
        plugins.reset()

    def set_resource_url(self, url):
        """Point the first resource of ``annakarenina`` at ``url``.

        Updates the resource through the ``resource_update`` action,
        verifies the new URL was persisted, and stores the resource and
        package dicts on ``self.data_dict`` for the calling test.

        :param url: the URL the test resource should link to
        """
        # presumably 'annakarenina' and 'testsysadmin' come from the
        # CreateTestData.create() call in setup_class -- confirm if changed
        testpackage = model.Package.get('annakarenina')

        context = {
            'model': model,
            'session': model.Session,
            'user': model.User.get('testsysadmin').name
        }

        resource = logic.get_action('resource_show')(
            context, {'id': testpackage.resources[0].id})
        # Note: the package dict is fetched before the resource is updated.
        package = logic.get_action('package_show')(
            context, {'id': testpackage.id})

        resource['url'] = url
        logic.action.update.resource_update(context, resource)

        # Re-read the package to make sure the new URL reached the model.
        testpackage = model.Package.get('annakarenina')
        assert testpackage.resources[0].url == resource['url'], \
            testpackage.resources[0].url

        self.data_dict = {'resource': resource, 'package': package}

    @httprettified
    def test_resource_proxy_on_200(self):
        """A resource whose server answers 200 is served by the proxy."""
        url = 'http://www.ckan.org/static/test.json'
        HTTPretty.register_uri(
            HTTPretty.GET, url,
            content_type='application/json',
            body=JSON_STRING)
        self.set_resource_url(url)

        # Sanity check: the mocked remote server answers as registered.
        url = self.data_dict['resource']['url']
        result = requests.get(url)
        assert result.status_code == 200, result.status_code
        assert "yes, I'm proxied" in result.content, result.content

        # The actual check: the same content must come back through the
        # proxy, mirroring what the other tests in this class do.
        proxied_url = proxy.get_proxified_resource_url(self.data_dict)
        result = self.app.get(proxied_url, status='*')
        assert result.status == 200, result.status
        assert "yes, I'm proxied" in result.body, result.body

    @httprettified
    def test_resource_proxy_on_404(self):
        """A 404 from the remote server is passed on as a 404."""
        url = 'http://www.ckan.org/static/test.json'
        HTTPretty.register_uri(
            HTTPretty.GET, url,
            body="I'm not here",
            content_type='application/json',
            status=404)
        self.set_resource_url(url)

        # Sanity check: the mocked remote server really answers 404.
        url = self.data_dict['resource']['url']
        result = requests.get(url)
        assert result.status_code == 404, result.status_code

        # status='*' lets the fixture return error responses instead of
        # raising, so the status can be asserted explicitly.
        proxied_url = proxy.get_proxified_resource_url(self.data_dict)
        result = self.app.get(proxied_url, status='*')
        assert result.status == 404, result.status

    @httprettified
    def test_large_file(self):
        """A Content-Length above ``MAX_FILE_SIZE`` yields a 500."""
        url = 'http://www.ckan.org/static/huge.json'
        # Only the advertised length is oversized; the body stays small.
        HTTPretty.register_uri(
            HTTPretty.GET, url,
            content_length=controller.MAX_FILE_SIZE + 1,
            body=JSON_STRING)
        self.set_resource_url(url)

        proxied_url = proxy.get_proxified_resource_url(self.data_dict)
        result = self.app.get(proxied_url, status='*')
        assert result.status == 500, result.status
        assert 'too large' in result.body, result.body

    def test_resource_proxy_non_existent(self):
        """An unreachable remote host yields a 500 'connection error'."""
        # NOTE(review): this test is not wrapped in @httprettified and
        # registers no URI, so it appears to rely on 'http://foo.bar'
        # being unreachable from the test machine -- confirm this holds
        # in CI.
        self.set_resource_url('http://foo.bar')

        def f1():
            url = self.data_dict['resource']['url']
            requests.get(url)
        self.assertRaises(requests.ConnectionError, f1)

        proxied_url = proxy.get_proxified_resource_url(self.data_dict)
        result = self.app.get(proxied_url, status='*')
        assert result.status == 500, result.status
        assert 'connection error' in result.body, result.body