/
unknown_dep_fetcher.py
67 lines (55 loc) · 2.8 KB
/
unknown_dep_fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Task to fetch unknown dependencies."""
from __future__ import division
import json
from f8a_worker.graphutils import GREMLIN_SERVER_URL_REST
from f8a_worker.base import BaseTask
from f8a_worker.utils import get_session_retry
class UnknownDependencyFetcherTask(BaseTask):
"""Task to fetch unknown dependencies."""
def get_dependency_data(self, dependency_list):
"""Prepare list of unknown dependencies from given list of dependencies."""
dep_pkg_list_unknown = list()
dep_pkg_list_known = list()
for dependency in dependency_list:
n_colons = dependency.count(":")
dependency_list = dependency.split(":")
ecosystem = dependency_list[0]
version = dependency_list[-1]
if n_colons == 3:
name = dependency_list[1] + ":" + dependency_list[2]
elif n_colons == 2:
name = dependency_list[1]
else:
self.log.error("No valid dependency format found: {}"
.format(dependency))
name = ""
qstring = ("g.V().has('pecosystem','" + ecosystem + "').has('pname','" +
name + "').has('version','" + version + "').tryNext()")
payload = {'gremlin': qstring}
graph_req = get_session_retry().post(GREMLIN_SERVER_URL_REST, data=json.dumps(payload))
if graph_req.status_code == 200:
graph_resp_data = graph_req.json().get('result', {}).get('data')
if graph_resp_data[0].get('present'):
dep_pkg_list_known.append(ecosystem + ":" + name + ":" + version)
else:
dep_pkg_list_unknown.append(ecosystem + ":" + name + ":" + version)
else:
self.log.error("Error response from graph for {dependency} "
"with status code as {status_code}"
.format(dependency=dependency, status_code=graph_req.status_code))
self.log.info("Known dependencies are: {}".format(dep_pkg_list_known))
self.log.info("Unknown dependencies are: {}".format(dep_pkg_list_unknown))
return dep_pkg_list_unknown
def execute(self, arguments=None):
"""
Task code.
:param arguments: dictionary with task arguments
:return: {}, results
"""
self.log.debug("Arguments passed from GithubDependencyTreeTask: {}".format(arguments))
if arguments.get("lock_file_absent"):
return {"lock_file_absent": arguments.get('lock_file_absent'),
"result": [], "message": arguments.get('message')}
self._strict_assert(arguments.get('dependencies'))
result = self.get_dependency_data(arguments.get('dependencies'))
return {"result": result}