/
server_api.py
286 lines (223 loc) · 11.9 KB
/
server_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""Basic checks for the server API."""
from behave import then, when
from urllib.parse import urljoin
import requests
import time
from src.utils import split_comma_separated_list
from src.authorization_tokens import authorization
from src.attribute_checks import check_and_get_attribute, check_timestamp, check_hash_value
from src.schema_validator import validate_schema
from src.parsing import parse_token_clause
@when('I access {url:S}')
def access_url(context, url):
    """Send a GET request to the core API and remember the response.

    The requested address is ``context.coreapi_url`` with ``url`` appended
    verbatim; the response object is stored in ``context.response``.
    """
    target = context.coreapi_url + url
    context.response = requests.get(target)
@when('I access the {url:S} endpoint using the HTTP GET method')
@when('I call the {url:S} endpoint using the HTTP GET method')
def access_url_get_method(context, url):
    """Issue an HTTP GET against the given endpoint of the core API.

    The response is stored in ``context.response`` for later steps.
    """
    target = context.coreapi_url + url
    context.response = requests.get(target)
@when('I access the {url:S} endpoint using the HTTP PUT method')
@when('I call the {url:S} endpoint using the HTTP PUT method')
def access_url_put_method(context, url):
    """Issue an HTTP PUT (no body) against the given endpoint of the core API.

    The response is stored in ``context.response`` for later steps.
    """
    target = context.coreapi_url + url
    context.response = requests.put(target)
@when('I access the {url:S} endpoint using the HTTP POST method')
@when('I call the {url:S} endpoint using the HTTP POST method')
def access_url_post_method(context, url):
    """Issue an HTTP POST (no body) against the given endpoint of the core API.

    The response is stored in ``context.response`` for later steps.
    """
    target = context.coreapi_url + url
    context.response = requests.post(target)
@when('I access the {url:S} endpoint using the HTTP PATCH method')
@when('I call the {url:S} endpoint using the HTTP PATCH method')
def access_url_patch_method(context, url):
    """Issue an HTTP PATCH (no body) against the given endpoint of the core API.

    The response is stored in ``context.response`` for later steps.
    """
    target = context.coreapi_url + url
    context.response = requests.patch(target)
@when('I access the {url:S} endpoint using the HTTP DELETE method')
@when('I call the {url:S} endpoint using the HTTP DELETE method')
def access_url_delete_method(context, url):
    """Issue an HTTP DELETE against the given endpoint of the core API.

    The response is stored in ``context.response`` for later steps.
    """
    target = context.coreapi_url + url
    context.response = requests.delete(target)
@when('I access the {url:S} endpoint using the HTTP HEAD method')
@when('I call the {url:S} endpoint using the HTTP HEAD method')
def access_url_head_method(context, url):
    """Issue an HTTP HEAD against the given endpoint of the core API.

    The response is stored in ``context.response`` for later steps.
    """
    target = context.coreapi_url + url
    context.response = requests.head(target)
@when('I access the {url:S} {repeat_count:d} times with {delay:d} seconds delay')
def access_url_repeatedly(context, url, repeat_count, delay):
    """Call one endpoint several times via HTTP GET, pausing after each call.

    The HTTP status code of every call is appended, in call order, to the
    list ``context.api_call_results``, which is reset at the start of the step.
    The pause of ``delay`` seconds follows every request, including the last.
    """
    target = context.coreapi_url + url
    status_codes = []
    # expose the list on the context up front so that codes gathered so far
    # stay visible even if a later request raises
    context.api_call_results = status_codes

    for _ in range(repeat_count):
        status_codes.append(requests.get(target).status_code)
        time.sleep(delay)
@when('I access {url:S} with authorization token')
@when('I access the {url:S} endpoint using the HTTP GET method and authorization token')
def access_url_with_authorization_token(context, url):
    """Issue an HTTP GET against the core API with authorization headers.

    Headers are produced by ``authorization(context)``; the response is
    stored in ``context.response``.
    """
    target = context.coreapi_url + url
    auth_headers = authorization(context)
    context.response = requests.get(target, headers=auth_headers)
@when('I access the {url:S} endpoint using the HTTP PUT method and authorization token')
@when('I call the {url:S} endpoint using the HTTP PUT method and authorization token')
def access_url_put_method_with_authorization(context, url):
    """Issue an HTTP PUT against the core API with authorization headers.

    Headers are produced by ``authorization(context)``; the response is
    stored in ``context.response``.
    """
    target = context.coreapi_url + url
    auth_headers = authorization(context)
    context.response = requests.put(target, headers=auth_headers)
@when('I access the {url:S} endpoint using the HTTP POST method and authorization token')
@when('I call the {url:S} endpoint using the HTTP POST method and authorization token')
def access_url_post_method_with_authorization(context, url):
    """Issue an HTTP POST against the core API with authorization headers.

    Headers are produced by ``authorization(context)``; the response is
    stored in ``context.response``.
    """
    target = context.coreapi_url + url
    auth_headers = authorization(context)
    context.response = requests.post(target, headers=auth_headers)
@when('I access the {url:S} endpoint using the HTTP PATCH method and authorization token')
@when('I call the {url:S} endpoint using the HTTP PATCH method and authorization token')
def access_url_patch_method_with_authorization(context, url):
    """Issue an HTTP PATCH against the core API with authorization headers.

    Headers are produced by ``authorization(context)``; the response is
    stored in ``context.response``.
    """
    target = context.coreapi_url + url
    auth_headers = authorization(context)
    context.response = requests.patch(target, headers=auth_headers)
@when('I access the {url:S} endpoint using the HTTP DELETE method and authorization token')
@when('I call the {url:S} endpoint using the HTTP DELETE method and authorization token')
def access_url_delete_method_with_authorization(context, url):
    """Issue an HTTP DELETE against the core API with authorization headers.

    Headers are produced by ``authorization(context)``; the response is
    stored in ``context.response``.
    """
    target = context.coreapi_url + url
    auth_headers = authorization(context)
    context.response = requests.delete(target, headers=auth_headers)
@when('I access the {url:S} endpoint using the HTTP HEAD method and authorization token')
@when('I call the {url:S} endpoint using the HTTP HEAD method and authorization token')
def access_url_head_method_with_authorization(context, url):
    """Issue an HTTP HEAD against the core API with authorization headers.

    Headers are produced by ``authorization(context)``; the response is
    stored in ``context.response``.
    """
    target = context.coreapi_url + url
    auth_headers = authorization(context)
    context.response = requests.head(target, headers=auth_headers)
@when('I access {url:S} without valid values')
def check_submit_feedback_without_valid_values(context, url):
    """POST a feedback payload containing invalid values to the v2 core API.

    The payload is passed through ``data=`` (not ``json=``) and the user key
    taken from ``context.three_scale_preview_user_key`` is sent as the
    ``user_key`` query parameter. The response ends up in ``context.response``.
    """
    # 'feedback_type' and 'ecosystem' deliberately carry non-string values
    invalid_feedback = {
        "stack_id": "1234-569586048",
        "recommendation_type": "companion",
        "package_name": "blah-blah",
        "feedback_type": True,
        "ecosystem": None
    }
    query = {'user_key': context.three_scale_preview_user_key}
    target = context.core_v2_api_url + url
    context.response = requests.post(target, params=query, data=invalid_feedback)
@when('I access {url:S} without any payload')
def check_submit_feedback_without_any_payload(context, url):
    """POST to the v2 core API without sending any request body.

    Only the ``user_key`` query parameter (read from
    ``context.three_scale_preview_user_key``) accompanies the request. The
    response ends up in ``context.response``.
    """
    query = {'user_key': context.three_scale_preview_user_key}
    target = context.core_v2_api_url + url
    # data=None means that no body is attached to the request
    context.response = requests.post(target, params=query, data=None)
@when('I access {url:S} with empty payload')
def check_submit_feedback_with_empty_payload(context, url):
    """POST an empty dictionary as the payload to the v2 core API.

    The ``user_key`` query parameter is read from
    ``context.three_scale_preview_user_key``. The response ends up in
    ``context.response``.
    """
    query = {'user_key': context.three_scale_preview_user_key}
    target = context.core_v2_api_url + url
    context.response = requests.post(target, params=query, data={})
@then('I should see {num:d} ecosystems')
def check_ecosystems(context, num):
    """Check that the response lists the expected number of ecosystems.

    The list is read from the ``items`` attribute of the JSON payload stored
    in ``context.response``. Every item must contain the ``ecosystem`` field.

    Raises AssertionError (with a descriptive message) when the number of
    items differs from ``num`` or when an item lacks the ``ecosystem`` field.
    """
    ecosystems = context.response.json()['items']

    found = len(ecosystems)
    assert found == num, \
        "Expected {e} ecosystems, but found {f}".format(e=num, f=found)

    for record in ecosystems:
        # every returned record needs to be tagged with its ecosystem
        assert 'ecosystem' in record, \
            "The 'ecosystem' field is missing in the item {r}".format(r=record)
def check_package_version(v, ecosystem, package, versions):
    """Check one version record against the expected ecosystem, package, and versions.

    ``v`` is indexed by the keys ``ecosystem``, ``package`` and ``version``.
    The first two values must equal the given ``ecosystem`` and ``package``,
    the third one must be contained in ``versions``.

    Raises AssertionError naming the offending value on the first mismatch;
    a key missing from ``v`` surfaces as the usual KeyError.
    """
    found_ecosystem = v['ecosystem']
    assert found_ecosystem == ecosystem, \
        "Unexpected ecosystem {f}, expected {e}".format(f=found_ecosystem, e=ecosystem)

    found_package = v['package']
    assert found_package == package, \
        "Unexpected package {f}, expected {e}".format(f=found_package, e=package)

    found_version = v['version']
    assert found_version in versions, \
        "Unexpected version {f}, expected one of {e}".format(f=found_version, e=versions)
@then('I should see {num:d} versions ({versions}), all for {ecosystem}/{package} package')
def check_versions(context, num=0, versions='', ecosystem='', package=''):
    """Check the versions returned for the selected ecosystem and package.

    ``versions`` is a comma separated list taken from the step text. The
    records are read from the ``items`` attribute of the JSON payload stored
    in ``context.response`` and each of them is verified by
    ``check_package_version``.

    Raises AssertionError (with a descriptive message) when the number of
    records differs from ``num``.
    """
    expected_versions = split_comma_separated_list(versions)
    records = context.response.json()['items']

    found = len(records)
    assert found == num, \
        "Expected {e} versions, but found {f}".format(e=num, f=found)

    for record in records:
        check_package_version(record, ecosystem, package, expected_versions)
@then('I should find the endpoint {endpoint} in the list of supported endpoints')
def check_endpoint_in_paths(context, endpoint):
    """Verify that the given endpoint is listed under ``paths`` in the response.

    The ``paths`` attribute is fetched from the JSON payload stored in
    ``context.response`` via ``check_and_get_attribute``.
    """
    supported = check_and_get_attribute(context.response.json(), "paths")
    message = "Cannot find the expected endpoint {e}".format(e=endpoint)
    assert endpoint in supported, message
@then('I should find the schema {schema} version {version} in the list of supported schemas')
def check_schema_existence(context, schema, version, selector=None):
    """Verify that the response contains the given schema in the given version.

    When ``selector`` is provided, the schema is looked up inside the
    attribute of that name; otherwise it is looked up at the top level of the
    JSON payload. All lookups go through ``check_and_get_attribute``.
    """
    payload = context.response.json()

    # optionally descend into a sub-section of the payload first
    if selector is None:
        container = payload
    else:
        container = check_and_get_attribute(payload, selector)

    schema_versions = check_and_get_attribute(container, schema)
    check_and_get_attribute(schema_versions, version)
@then('I should find the schema {schema} version {version} in the list of supported schemas '
      'for API calls')
def check_schema_existence_api_call(context, schema, version):
    """Verify the schema and version within the ``api`` section of the response."""
    check_schema_existence(context, schema, version, selector="api")
@then('I should find the schema {schema} version {version} in the list of component analyses '
      'schemas')
def check_schema_existence_component_analyses(context, schema, version):
    """Verify the schema and version within the ``component_analyses`` section."""
    check_schema_existence(context, schema, version, selector="component_analyses")
@then('I should find the schema version {version} in the list of schema versions')
def check_schema_version(context, version):
    """Verify that the JSON response has an attribute named after the version."""
    check_and_get_attribute(context.response.json(), version)
@then('I should find valid schema in the server response')
def check_valid_schema(context):
    """Pass the JSON payload of the response to ``validate_schema``."""
    validate_schema(context.response.json())
def generate_data_for_user_feedback(is_valid):
    """Build the sample payload for the user feedback endpoint.

    ``is_valid`` selects the payload flavour: "valid", "invalid",
    "incomplete" or "empty". Any other value yields None. A freshly
    constructed object is returned on each call.
    """
    # the table is rebuilt on each call so callers never share mutable payloads
    samples = (
        ("valid", {"request_id": "test_id", "feedback": [{"ques": "what", "ans": "got it"}]}),
        ("invalid", {"foo": "x", "bar": "y", "baz": []}),
        ("incomplete", {"request_id": "test_id"}),
        ("empty", {}),
    )

    for flavour, payload in samples:
        if is_valid == flavour:
            return payload

    return None
@when("I post {is_valid} input to the {endpoint} endpoint {token} authorization token")
def post_input_to_user_feedback(context, is_valid, endpoint, token):
    """POST a sample feedback payload as JSON to the given endpoint.

    ``is_valid`` selects the payload produced by
    ``generate_data_for_user_feedback``. ``token`` is the clause from the step
    text; ``parse_token_clause`` turns it into a flag deciding whether the
    authorization headers are attached (presumably "with"/"without" - confirm
    against ``src.parsing``). The response is stored in ``context.response``.
    """
    use_token = parse_token_clause(token)
    api_url = urljoin(context.coreapi_url, endpoint)

    request_kwargs = {"json": generate_data_for_user_feedback(is_valid)}
    if use_token:
        request_kwargs["headers"] = authorization(context)

    context.response = requests.post(api_url, **request_kwargs)
@then("I should find the correct commit hash in the JSON response")
def check_hash_attribute(context):
    """Check the commit hash in the JSON response.

    The ``commit_hash`` attribute is fetched from the JSON payload stored in
    ``context.response`` and handed over to ``check_hash_value``.

    Raises AssertionError (with a descriptive message) when no response has
    been stored in the context.
    """
    response = context.response
    assert response is not None, "No response found in the test context"

    commit_hash = check_and_get_attribute(response.json(), "commit_hash")
    check_hash_value(commit_hash)
@then("I should find the correct committed at timestamp in the JSON response")
def check_timestamp_attribute(context):
    """Check the committed at timestamp in the JSON response.

    The ``committed_at`` attribute is fetched from the JSON payload stored in
    ``context.response``. Only its leading part, as long as the template
    "2000-01-01 10:20:30", is handed over to ``check_timestamp``.

    Raises AssertionError (with a descriptive message) when no response has
    been stored in the context or when the timestamp is shorter than the
    template.
    """
    response = context.response
    assert response is not None, "No response found in the test context"

    timestamp = check_and_get_attribute(response.json(), "committed_at")

    # anything after the seconds (like +100 offsets) is not interesting here,
    # so only the prefix with the length of the template is validated
    template = "2000-01-01 10:20:30"
    assert len(timestamp) >= len(template), "Invalid timestamp %s" % timestamp

    check_timestamp(timestamp[:len(template)])