Skip to content
This repository has been archived by the owner on Nov 29, 2021. It is now read-only.

Commit

Permalink
Merge pull request #291 from jjnicola/get-vts
Browse files Browse the repository at this point in the history
Extend get_vts with attribute version_only and return the version
  • Loading branch information
bjoernricks committed Jun 25, 2020
2 parents b788360 + 656cc60 commit 6b9a52c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Add new scan status INTERRUPTED.
[#288](https://github.com/greenbone/ospd/pull/288)
[#289](https://github.com/greenbone/ospd/pull/289)
- Extend get_vts with attribute version_only and return the version [#291](https://github.com/greenbone/ospd/pull/291)

### Changes
- Modify __init__() method and use new syntax for super(). [#186](https://github.com/greenbone/ospd/pull/186)
Expand Down
19 changes: 13 additions & 6 deletions ospd/command/command.py
Expand Up @@ -319,6 +319,7 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
vt_id = xml.get('vt_id')
vt_filter = xml.get('filter')
_details = xml.get('details')
version_only = xml.get('version_only')

vt_details = False if _details == '0' else True

Expand All @@ -328,7 +329,7 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
raise OspdCommandError(text, 'get_vts', 404)

filtered_vts = None
if vt_filter:
if vt_filter and not version_only:
try:
filtered_vts = self._daemon.vts_filter.get_filtered_vts_list(
self._daemon.vts, vt_filter
Expand All @@ -337,14 +338,20 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
self._daemon.vts.is_cache_available = True
raise OspdCommandError(filter_error)

vts_selection = self._daemon.get_vts_selection_list(vt_id, filtered_vts)
if not version_only:
vts_selection = self._daemon.get_vts_selection_list(
vt_id, filtered_vts
)
# List of xml pieces with the generator to be iterated
yield xml_helper.create_response('get_vts')

begin_vts_tag = xml_helper.create_element('vts')
begin_vts_tag = xml_helper.add_attr(
begin_vts_tag, "vts_version", self._daemon.get_vts_version()
)
val = len(self._daemon.vts)
begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "total", val)
if filtered_vts:
if filtered_vts and not version_only:
val = len(filtered_vts)
begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "sent", val)

Expand All @@ -354,9 +361,9 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
)

yield begin_vts_tag

for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
yield xml_helper.add_element(self._daemon.get_vt_xml(vt))
if not version_only:
for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
yield xml_helper.add_element(self._daemon.get_vt_xml(vt))

yield xml_helper.create_element('vts', end=True)
yield xml_helper.create_response('get_vts', end=True)
Expand Down
33 changes: 33 additions & 0 deletions tests/test_scan_and_result.py
Expand Up @@ -153,6 +153,39 @@ def test_get_vts_single_vt(self):
vt = vts.find('vt')
self.assertEqual(vt.get('id'), '1.2.3.4')

def test_get_vts_version(self):
fs = FakeStream()
self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
self.daemon.set_vts_version('today')
self.daemon.handle_command('<get_vts />', fs)
response = fs.get_response()

self.assertEqual(response.get('status'), '200')

vts_version = response.find('vts').attrib['vts_version']
self.assertEqual(vts_version, self.daemon.get_vts_version())

vts = response.find('vts')
self.assertIsNotNone(vts.find('vt'))

vt = vts.find('vt')
self.assertEqual(vt.get('id'), '1.2.3.4')

def test_get_vts_version_only(self):
fs = FakeStream()
self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
self.daemon.set_vts_version('today')
self.daemon.handle_command('<get_vts version_only="1"/>', fs)
response = fs.get_response()

self.assertEqual(response.get('status'), '200')

vts_version = response.find('vts').attrib['vts_version']
self.assertEqual(vts_version, self.daemon.get_vts_version())

vts = response.find('vts')
self.assertIsNone(vts.find('vt'))

def test_get_vts_still_not_init(self):
fs = FakeStream()
self.daemon.initialized = False
Expand Down

0 comments on commit 6b9a52c

Please sign in to comment.