-
Notifications
You must be signed in to change notification settings - Fork 279
/
RequestsLibrary.py
146 lines (93 loc) · 4 KB
/
RequestsLibrary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import requests
import json
from urllib import urlencode
import robot
from robot.libraries.BuiltIn import BuiltIn
class RequestsLibrary(object):
ROBOT_LIBRARY_SCOPE = 'Global'
def __init__(self):
'''
TODO: probably can set global proxy here
'''
self._cache = robot.utils.ConnectionCache('No sessions created')
requests.settings.base_headers['User-Agent'] = 'robotframework-requests'
self.builtin = BuiltIn()
def create_session(self, alias, url, headers=None, cookies=None, auth=None, timeout=None, proxies=None):
''' Create Session: create a HTTP session to a server
*url:* Base url of the server
*alias:* Robot Framework alias to identify the session
*headers:* Dictionary of default headers
*auth:* Dictionary of username & password for HTTP Basic Auth
*timeout:* connection timeout
*proxies:* proxy server url
'''
def baseurlhook(args):
# url is the base url. Request url is uri
args['url'] = '%s%s' %(url, args['url'])
self.builtin.log('Creating session: %s' %alias, 'DEBUG')
session = requests.session(hooks=dict(args=baseurlhook), auth=auth, headers=headers,
cookies=cookies, timeout=timeout, proxies=proxies )
self._cache.register(session, alias=alias)
return session
def delete_all_sessions(self):
''' Delete Session: removes all the session objects
'''
self._cache.empty_cache()
def to_json(self, content):
return json.loads(content)
def get(self, alias, uri, headers=None):
''' Get: send a GET request on the session object found using the given alias
'''
session = self._cache.switch(alias)
resp = session.get(uri, headers=headers)
# store the last response object
session.last_resp = resp
return resp
def post(self, alias, uri, data=None, headers=None):
''' Post: send a POST request on the session object found using the given alias
'''
if data:
data = urlencode(data)
session = self._cache.switch(alias)
resp = session.post(uri, data=data, headers=headers)
# store the last response object
session.last_resp = resp
self.builtin.log("Post response: " + resp.content, 'DEBUG')
return resp
def put(self, alias, uri, data=None, headers=None):
''' Put: send a PUT request on the session object found using the given alias
'''
session = self._cache.switch(alias)
resp = session.put(uri, data=urlencode(data), headers=headers)
# store the last response object
session.last_resp = resp
return resp
def delete(self, alias, uri, headers=None):
''' Delete: send a DELETE request on the session object found using the given alias
'''
session = self._cache.switch(alias)
resp = session.delete(uri, headers=headers)
# store the last response object
session.last_resp = resp
return resp
def head(self, alias, uri, headers=None):
''' Delete: send a HEAD request on the session object found using the given alias
'''
session = self._cache.switch(alias)
resp = session.head(uri, headers=headers)
# store the last response object
session.last_resp = resp
return resp
if __name__ == '__main__':
rl = RequestsLibrary()
session = rl.create_session('github', 'http://github.com/api/v2/json')
#resp = rl.get('github', '/user/search/bulkan')
#jsondata = rl.to_json(resp.content)
auth = ('user', 'passwd')
session = rl.create_session('httpbin', 'http:/httpbin.org', auth=auth)
resp = rl.get('httpbin', '/basic-auth/user/passwd')
import pdb; pdb.set_trace()
#with requests.session(auth=auth) as c:
# resp = c.get('http://httpbin.org/basic-auth/user/passwd')
# print resp
# sometimes you just need pdb