diff --git a/CHANGELOG.md b/CHANGELOG.md index 6de11fe9..868ec1e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Added - Add solution method to solution of vt object. [#166](https://github.com/greenbone/ospd/pull/166) - Add wait_for_children(). [#167](https://github.com/greenbone/ospd/pull/167) +- Extend osp to accept target options. [#194](https://github.com/greenbone/ospd/pull/194) ### Changes - Modify __init__() method and use new syntax for super(). [#186](https://github.com/greenbone/ospd/pull/186) diff --git a/doc/OSP.xml b/doc/OSP.xml index 6bf607b9..520997bf 100644 --- a/doc/OSP.xml +++ b/doc/OSP.xml @@ -179,6 +179,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. credentials exclude_hosts finished_hosts + alive_test hosts @@ -209,12 +210,18 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. string + + alive_test + Alive test type to be performed against the target. + string + Target without credentials. example.org T:22,U:5060 + 0 diff --git a/ospd/misc.py b/ospd/misc.py index 85fc9e8d..73a34847 100644 --- a/ospd/misc.py +++ b/ospd/misc.py @@ -268,11 +268,11 @@ def create_scan( scan_info = self.data_manager.dict() # type: Dict scan_info['results'] = list() scan_info['finished_hosts'] = dict( - [[target, []] for target, _, _, _, _ in targets] + [[target, []] for target, _, _, _, _, _ in targets] ) scan_info['progress'] = 0 scan_info['target_progress'] = dict( - [[target, {}] for target, _, _, _, _ in targets] + [[target, {}] for target, _, _, _, _, _ in targets] ) scan_info['targets'] = targets scan_info['vts'] = vts @@ -371,7 +371,7 @@ def get_target_list(self, scan_id: str) -> List: """ Get a scan's target list. """ target_list = [] - for target, _, _, _, _ in self.scans_table[scan_id]['targets']: + for target, _, _, _, _, _ in self.scans_table[scan_id]['targets']: target_list.append(target) return target_list @@ -413,6 +413,16 @@ def get_credentials(self, scan_id: str, target: str): if target == item[0]: return item[2] + def get_target_options(self, scan_id: str, target: str): + """ Get a scan's target option dictionary. + It return dictionary with the corresponding options for + a given target. + """ + if target: + for item in self.scans_table[scan_id]['targets']: + if target == item[0]: + return item[5] + def get_vts(self, scan_id: str): """ Get a scan's vts list. """ diff --git a/ospd/ospd.py b/ospd/ospd.py index 92af2af5..e7cdeb0d 100644 --- a/ospd/ospd.py +++ b/ospd/ospd.py @@ -526,6 +526,7 @@ def process_targets_element(cls, scanner_target) -> List: localhosts localhost1 80,443 + 192.168.0.0/24 @@ -543,14 +544,15 @@ def process_targets_element(cls, scanner_target) -> List: - @return: A list of [hosts, port, {credentials}, exclude_hosts] list. + @return: A list of [hosts, port, {credentials}, exclude_hosts, options] list. Example form: - [['localhosts', '80,43', '', 'localhosts1'], + [['localhosts', '80,43', '', 'localhosts1', + {'alive_test': 'ALIVE_TEST_CONSIDER_ALIVE'}], ['192.168.0.0/24', '22', {'smb': {'type': type, 'port': port, 'username': username, 'password': pass, - }}], ''] + }}, '', {}]] """ target_list = [] @@ -559,6 +561,7 @@ def process_targets_element(cls, scanner_target) -> List: finished_hosts = '' ports = '' credentials = {} # type: Dict + options = {} for child in target: if child.tag == 'hosts': hosts = child.text @@ -570,9 +573,18 @@ def process_targets_element(cls, scanner_target) -> List: ports = child.text if child.tag == 'credentials': credentials = cls.process_credentials_elements(child) + if child.tag == 'alive_test': + options['alive_test'] = child.text if hosts: target_list.append( - [hosts, ports, credentials, exclude_hosts, finished_hosts] + [ + hosts, + ports, + credentials, + exclude_hosts, + finished_hosts, + options, + ] ) else: raise OspdCommandError('No target to scan', 'start_scan') @@ -598,7 +610,7 @@ def handle_start_scan_command(self, scan_et) -> str: else: scan_targets = [] for single_target in target_str_to_list(target_str): - scan_targets.append([single_target, ports_str, '', '', '']) + scan_targets.append([single_target, ports_str, '', '', '', '']) scan_id = scan_et.attrib.get('scan_id') if scan_id is not None and scan_id != '' and not valid_uuid(scan_id): @@ -874,7 +886,7 @@ def calculate_progress(self, scan_id: str) -> float: def process_exclude_hosts(self, scan_id: str, target_list: List) -> None: """ Process the exclude hosts before launching the scans.""" - for target, _, _, exclude_hosts, _ in target_list: + for target, _, _, exclude_hosts, _, _ in target_list: exc_hosts_list = '' if not exclude_hosts: continue @@ -888,7 +900,7 @@ def process_finished_hosts(self, scan_id: str, target_list: List) -> None: Set finished hosts as finished with 100% to calculate the scan progress.""" - for target, _, _, _, finished_hosts in target_list: + for target, _, _, _, finished_hosts, _ in target_list: exc_hosts_list = '' if not finished_hosts: continue @@ -1857,6 +1869,11 @@ def get_scan_credentials(self, scan_id: str, target: str = '') -> Dict: the credential list for the given target. """ return self.scan_collection.get_credentials(scan_id, target) + def get_scan_target_options(self, scan_id: str, target: str = '') -> Dict: + """ Gives a scan's target option dict. If a target is passed gives + the credential list for the given target. """ + return self.scan_collection.get_target_options(scan_id, target) + def get_scan_vts(self, scan_id: str) -> Dict: """ Gives a scan's vts list. """ return self.scan_collection.get_vts(scan_id) diff --git a/tests/test_scan_and_result.py b/tests/test_scan_and_result.py index 189449bc..e2fd61be 100644 --- a/tests/test_scan_and_result.py +++ b/tests/test_scan_and_result.py @@ -952,6 +952,7 @@ def test_scan_multi_target(self): '' 'localhosts' '80,443' + '0' '' '192.168.0.0/24' '22' @@ -1021,6 +1022,25 @@ def test_scan_get_target(self): scan_res = response.find('scan') self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24') + def test_scan_get_target_options(self): + daemon = DummyWrapper([]) + response = secET.fromstring( + daemon.handle_command( + '' + '' + '' + '' + '192.168.0.1' + '220' + '' + '' + ) + ) + scan_id = response.findtext('id') + time.sleep(1) + target_options = daemon.get_scan_target_options(scan_id, '192.168.0.1') + self.assertEqual(target_options, {'alive_test': '0'}) + def test_scan_get_finished_hosts(self): daemon = DummyWrapper([]) response = secET.fromstring( diff --git a/tests/test_ssh_daemon.py b/tests/test_ssh_daemon.py index 47d8dbd2..a1a20519 100644 --- a/tests/test_ssh_daemon.py +++ b/tests/test_ssh_daemon.py @@ -89,7 +89,7 @@ def test_run_command(self): daemon = DummyWrapper(niceness=10) scanid = daemon.create_scan( None, - [['host.example.com', '80, 443', '', '', '']], + [['host.example.com', '80, 443', '', '', '', '']], dict(port=5, ssh_timeout=15, username_password='dummy:pw'), '', ) @@ -105,7 +105,7 @@ def test_run_command_legacy_credential(self): daemon = DummyWrapper(niceness=10) scanid = daemon.create_scan( None, - [['host.example.com', '80, 443', '', '', '']], + [['host.example.com', '80, 443', '', '', '', '']], dict(port=5, ssh_timeout=15, username='dummy', password='pw'), '', ) @@ -132,7 +132,7 @@ def test_run_command_new_credential(self): scanid = daemon.create_scan( None, - [['host.example.com', '80, 443', cred_dict, '', '']], + [['host.example.com', '80, 443', cred_dict, '', '', '']], dict(port=5, ssh_timeout=15), '', ) @@ -147,7 +147,7 @@ def test_run_command_no_credential(self): daemon = DummyWrapper(niceness=10) scanid = daemon.create_scan( None, - [['host.example.com', '80, 443', '', '', '']], + [['host.example.com', '80, 443', '', '', '', '']], dict(port=5, ssh_timeout=15), '', )