Skip to content

Commit

Permalink
Merge branch 'IFX-CDC-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Oct 5, 2018
2 parents 1445a99 + 9a2610a commit 275f667
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 28 deletions.
161 changes: 159 additions & 2 deletions pymisp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _check_response(self, response):
try:
json_response = response.json()
except ValueError:
# It the server didn't return a JSON blob, we've a problem.
# If the server didn't return a JSON blob, we've a problem.
raise PyMISPError(everything_broken.format(response.request.headers, response.request.body, response.text))

errors = []
Expand Down Expand Up @@ -1676,63 +1676,220 @@ def get_roles_list(self):
# ############## Tags ##################

def get_tags_list(self):
"""Get the list of existing tags"""
"""Get the list of existing tags."""
url = urljoin(self.root_url, '/tags')
response = self._prepare_request('GET', url)
return self._check_response(response)['Tag']

def get_tag(self, tag_id):
"""Get a tag by id."""
url = urljoin(self.root_url, '/tags/view/{}'.format(tag_id))
response = self._prepare_request('GET', url)
return self._check_response(response)

def _set_tag_parameters(self, name, colour, exportable, hide_tag, org_id, count, user_id, numerical_value,
attribute_count, old_tag):
tag = old_tag
if name is not None:
tag['name'] = name
if colour is not None:
tag['colour'] = colour
if exportable is not None:
tag['exportable'] = exportable
if hide_tag is not None:
tag['hide_tag'] = hide_tag
if org_id is not None:
tag['org_id'] = org_id
if count is not None:
tag['count'] = count
if user_id is not None:
tag['user_id'] = user_id
if numerical_value is not None:
tag['numerical_value'] = numerical_value
if attribute_count is not None:
tag['attribute_count'] = attribute_count

return {'Tag': tag}

def edit_tag(self, tag_id, name=None, colour=None, exportable=None, hide_tag=None, org_id=None, count=None,
user_id=None, numerical_value=None, attribute_count=None):
"""Edit only the provided parameters of a tag."""
old_tag = self.get_tag(tag_id)
new_tag = self._set_tag_parameters(name, colour, exportable, hide_tag, org_id, count, user_id,
numerical_value, attribute_count, old_tag)
url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id))
response = self._prepare_request('POST', url, json.dumps(new_tag))
return self._check_response(response)

def edit_tag_json(self, json_file, tag_id):
"""Edit the tag using a json file."""
with open(json_file, 'rb') as f:
jdata = json.load(f)
url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id))
response = self._prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response)

def enable_tag(self, tag_id):
"""Enable a tag by id."""
response = self.edit_tag(tag_id, hide_tag=False)
return response

def disable_tag(self, tag_id):
"""Disable a tag by id."""
response = self.edit_tag(tag_id, hide_tag=True)
return response

# ############## Taxonomies ##################

def get_taxonomies_list(self):
"""Get all the taxonomies."""
url = urljoin(self.root_url, '/taxonomies')
response = self._prepare_request('GET', url)
return self._check_response(response)

def get_taxonomy(self, taxonomy_id):
"""Get a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id))
response = self._prepare_request('GET', url)
return self._check_response(response)

def update_taxonomies(self):
"""Update all the taxonomies."""
url = urljoin(self.root_url, '/taxonomies/update')
response = self._prepare_request('POST', url)
return self._check_response(response)

def enable_taxonomy(self, taxonomy_id):
"""Enable a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/enable/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)

def disable_taxonomy(self, taxonomy_id):
"""Disable a taxonomy by id."""
self.disable_taxonomy_tags(taxonomy_id)
url = urljoin(self.root_url, '/taxonomies/disable/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)

def get_taxonomy_tags_list(self, taxonomy_id):
"""Get all the tags of a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id))
response = self._prepare_request('GET', url)
return self._check_response(response)["entries"]

def enable_taxonomy_tags(self, taxonomy_id):
"""Enable all the tags of a taxonomy by id."""
enabled = self.get_taxonomy(taxonomy_id)['Taxonomy']['enabled']
if enabled:
url = urljoin(self.root_url, '/taxonomies/addTag/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)

def disable_taxonomy_tags(self, taxonomy_id):
"""Disable all the tags of a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/disableTag/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)

# ############## WarningLists ##################

def get_warninglists(self):
"""Get all the warninglists."""
url = urljoin(self.root_url, '/warninglists')
response = self._prepare_request('GET', url)
return self._check_response(response)

def get_warninglist(self, warninglist_id):
"""Get a warninglist by id."""
url = urljoin(self.root_url, '/warninglists/view/{}'.format(warninglist_id))
response = self._prepare_request('GET', url)
return self._check_response(response)

def update_warninglists(self):
"""Update all the warninglists."""
url = urljoin(self.root_url, '/warninglists/update')
response = self._prepare_request('POST', url)
return self._check_response(response)

def toggle_warninglist(self, warninglist_id=None, warninglist_name=None, force_enable=None):
'''Toggle (enable/disable) the status of a warninglist by ID.
:param warninglist_id: ID of the WarningList
:param force_enable: Force the warning list in the enabled state (does nothing if already enabled)
'''
if warninglist_id is None and warninglist_name is None:
raise Exception('Either warninglist_id or warninglist_name is required.')
query = {}
if warninglist_id is not None:
if not isinstance(warninglist_id, list):
warninglist_id = [warninglist_id]
query['id'] = warninglist_id
if warninglist_name is not None:
if not isinstance(warninglist_name, list):
warninglist_name = [warninglist_name]
query['name'] = warninglist_name
if force_enable is not None:
query['enabled'] = force_enable
url = urljoin(self.root_url, '/warninglists/toggleEnable')
response = self._prepare_request('POST', url, json.dumps(query))
return self._check_response(response)

def enable_warninglist(self, warninglist_id):
"""Enable a warninglist by id."""
return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=True)

def disable_warninglist(self, warninglist_id):
"""Disable a warninglist by id."""
return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False)

# ############## NoticeLists ##################

def get_noticelists(self):
"""Get all the noticelists."""
url = urljoin(self.root_url, '/noticelists')
response = self._prepare_request('GET', url)
return self._check_response(response)

def get_noticelist(self, noticelist_id):
"""Get a noticelist by id."""
url = urljoin(self.root_url, '/noticelists/view/{}'.format(noticelist_id))
response = self._prepare_request('GET', url)
return self._check_response(response)

def update_noticelists(self):
"""Update all the noticelists."""
url = urljoin(self.root_url, '/noticelists/update')
response = self._prepare_request('POST', url)
return self._check_response(response)

def enable_noticelist(self, noticelist_id):
"""Enable a noticelist by id."""
url = urljoin(self.root_url, '/noticelists/enableNoticelist/{}/true'.format(noticelist_id))
response = self._prepare_request('POST', url)
return self._check_response(response)

def disable_noticelist(self, noticelist_id):
"""Disable a noticelist by id."""
url = urljoin(self.root_url, '/noticelists/enableNoticelist/{}'.format(noticelist_id))
response = self._prepare_request('POST', url)
return self._check_response(response)

# ############## Galaxies/Clusters ##################

def get_galaxies(self):
"""Get all the galaxies."""
url = urljoin(self.root_url, '/galaxies')
response = self._prepare_request('GET', url)
return self._check_response(response)

def get_galaxy(self, galaxy_id):
"""Get a galaxy by id."""
url = urljoin(self.root_url, '/galaxies/view/{}'.format(galaxy_id))
response = self._prepare_request('GET', url)
return self._check_response(response)

def update_galaxies(self):
"""Update all the galaxies."""
url = urljoin(self.root_url, '/galaxies/update')
response = self._prepare_request('POST', url)
return self._check_response(response)
Expand Down
17 changes: 1 addition & 16 deletions pymisp/aping.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,7 @@ def toggle_warninglist(self, warninglist_id: List[int]=None, warninglist_name: L
:param warninglist_id: ID of the WarningList
:param force_enable: Force the warning list in the enabled state (does nothing is already enabled)
'''
if warninglist_id is None and warninglist_name is None:
raise Exception('Either warninglist_id or warninglist_name is required.')
query = {}
if warninglist_id is not None:
if not isinstance(warninglist_id, list):
warninglist_id = [warninglist_id]
query['id'] = warninglist_id
if warninglist_name is not None:
if not isinstance(warninglist_name, list):
warninglist_name = [warninglist_name]
query['name'] = warninglist_name
if force_enable is not None:
query['enabled'] = force_enable
url = urljoin(self.root_url, '/warninglists/toggleEnable')
response = self._prepare_request('POST', url, json.dumps(query))
return self._check_response(response)
return super().toggle_warninglist(warninglist_id, warninglist_name, force_enable)

def make_timestamp(self, value: DateTypes):
if isinstance(value, datetime):
Expand Down
99 changes: 89 additions & 10 deletions tests/testlive_comprehensive.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,27 +750,106 @@ def test_upload_sample(self):
self.admin_misp_connector.delete_event(third.id)

def test_update_modules(self):
# warninglist
self.admin_misp_connector.update_warninglists()
r = self.admin_misp_connector.update_warninglists()
self.assertEqual(r['name'], 'All warninglists are up to date already.')
# taxonomies
self.admin_misp_connector.update_taxonomies()
r = self.admin_misp_connector.update_taxonomies()
self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.')
# object templates
self.admin_misp_connector.update_object_templates()
r = self.admin_misp_connector.update_object_templates()
self.assertEqual(type(r), list)
# notice lists

def test_tags(self):
# Get list
tags = self.admin_misp_connector.get_tags_list()
self.assertTrue(isinstance(tags, list))
# Get tag
for tag in tags:
if not tag['hide_tag']:
break
tag = self.admin_misp_connector.get_tag(tags[0]['id'])
self.assertTrue('name' in tag)
self.admin_misp_connector.disable_tag(tag['id'])
# FIXME: returns the tag with ID 1
self.admin_misp_connector.enable_tag(tag['id'])
# FIXME: returns the tag with ID 1

def test_taxonomies(self):
# Make sure we're up-to-date
self.admin_misp_connector.update_taxonomies()
r = self.admin_misp_connector.update_taxonomies()
self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.')
# Get list
taxonomies = self.admin_misp_connector.get_taxonomies_list()
self.assertTrue(isinstance(taxonomies, list))
list_name_test = 'tlp'
for tax in taxonomies:
if tax['Taxonomy']['namespace'] == list_name_test:
break
r = self.admin_misp_connector.get_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['Taxonomy']['namespace'], list_name_test)
self.assertTrue('enabled' in r['Taxonomy'])
r = self.admin_misp_connector.enable_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['message'], 'Taxonomy enabled')
r = self.admin_misp_connector.disable_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['message'], 'Taxonomy disabled')

def test_warninglists(self):
# Make sure we're up-to-date
self.admin_misp_connector.update_warninglists()
r = self.admin_misp_connector.update_warninglists()
self.assertEqual(r['name'], 'All warninglists are up to date already.')
# Get list
r = self.admin_misp_connector.get_warninglists()
# FIXME It returns Warninglists object instead of a list of warning lists directly. This is inconsistent.
warninglists = r['Warninglists']
self.assertTrue(isinstance(warninglists, list))
list_name_test = 'List of known hashes with common false-positives (based on Florian Roth input list)'
for wl in warninglists:
if wl['Warninglist']['name'] == list_name_test:
break
testwl = wl['Warninglist']
r = self.admin_misp_connector.get_warninglist(testwl['id'])
self.assertEqual(r['Warninglist']['name'], list_name_test)
self.assertTrue('WarninglistEntry' in r['Warninglist'])
r = self.admin_misp_connector.enable_warninglist(testwl['id'])
self.assertEqual(r['success'], '1 warninglist(s) enabled')
r = self.admin_misp_connector.disable_warninglist(testwl['id'])
self.assertEqual(r['success'], '1 warninglist(s) disabled')

def test_noticelists(self):
# Make sure we're up-to-date
self.admin_misp_connector.update_noticelists()
r = self.admin_misp_connector.update_noticelists()
self.assertEqual(r['name'], 'All noticelists are up to date already.')
# Get list
noticelists = self.admin_misp_connector.get_noticelists()
self.assertTrue(isinstance(noticelists, list))
list_name_test = 'gdpr'
for nl in noticelists:
if nl['Noticelist']['name'] == list_name_test:
break
testnl = nl
r = self.admin_misp_connector.get_noticelist(testnl['Noticelist']['id'])
self.assertEqual(r['Noticelist']['name'], list_name_test)
self.assertTrue('NoticelistEntry' in r['Noticelist'])
r = self.admin_misp_connector.enable_noticelist(testnl['Noticelist']['id'])
self.assertTrue(r['Noticelist']['enabled'])
r = self.admin_misp_connector.disable_noticelist(testnl['Noticelist']['id'])
self.assertFalse(r['Noticelist']['enabled'])

def test_galaxies(self):
if not travis_run:
# galaxies
# Make sure we're up-to-date
self.admin_misp_connector.update_galaxies()
r = self.admin_misp_connector.update_galaxies()
self.assertEqual(r['name'], 'Galaxies updated.')
# Get list
galaxies = self.admin_misp_connector.get_galaxies()
self.assertTrue(isinstance(galaxies, list))
list_name_test = 'Mobile Attack - Attack Pattern'
for galaxy in galaxies:
if galaxy['Galaxy']['name'] == list_name_test:
break
r = self.admin_misp_connector.get_galaxy(galaxy['Galaxy']['id'])
self.assertEqual(r['Galaxy']['name'], list_name_test)
self.assertTrue('GalaxyCluster' in r)

@unittest.skip("Currently failing")
def test_search_type_event_csv(self):
Expand Down

0 comments on commit 275f667

Please sign in to comment.