Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New API #651

Merged
merged 3 commits into from Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 24 additions & 4 deletions pymisp/api.py
Expand Up @@ -307,32 +307,44 @@ def get_event(self, event: Union[MISPEvent, int, str, UUID],
e.load(event_r)
return e

def add_event(self, event: MISPEvent, pythonify: bool = False) -> Union[Dict, MISPEvent]:
def event_exists(self, event: Union[MISPEvent, int, str, UUID]) -> bool:
"""Fast check if event exists.

:param event: Event to check
"""
event_id = get_uuid_or_id_from_abstract_misp(event)
r = self._prepare_request('HEAD', f'events/view/{event_id}')
return self._check_head_response(r)

def add_event(self, event: MISPEvent, pythonify: bool = False, metadata: bool = False) -> Union[Dict, MISPEvent]:
"""Add a new event on a MISP instance

:param event: event to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
:param metadata: Return just event metadata after successful creating
"""
r = self._prepare_request('POST', 'events/add', data=event)
r = self._prepare_request('POST', 'events/add' + '/metadata:1' if metadata else '', data=event)
new_event = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_event:
return new_event
e = MISPEvent()
e.load(new_event)
return e

def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEvent]:
def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False,
metadata: bool = False) -> Union[Dict, MISPEvent]:
"""Update an event on a MISP instance'''

:param event: event to update
:param event_id: ID of event to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
:param metadata: Return just event metadata after successful update
"""
if event_id is None:
eid = get_uuid_or_id_from_abstract_misp(event)
else:
eid = get_uuid_or_id_from_abstract_misp(event_id)
r = self._prepare_request('POST', f'events/edit/{eid}', data=event)
r = self._prepare_request('POST', f'events/edit/{eid}' + '/metadata:1' if metadata else '', data=event)
updated_event = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_event:
return updated_event
Expand Down Expand Up @@ -2965,6 +2977,14 @@ def _check_json_response(self, response: requests.Response) -> Dict: # type: ig
return r
# Else: an exception was raised anyway

def _check_head_response(self, response: requests.Response) -> bool:
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
raise MISPServerError(f'Error code {response.status_code} for HEAD request')

def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> Union[Dict, str]:
"""Check if the response from the server is not an unexpected error"""
if response.status_code >= 500:
Expand Down
15 changes: 15 additions & 0 deletions tests/testlive_comprehensive.py
Expand Up @@ -713,7 +713,9 @@ def test_simple_event(self):
second.add_attribute('ip-src', '8.8.8.8')
# second has two attributes: text and ip-src
try:
self.assertFalse(self.user_misp_connector.event_exists(first))
first = self.user_misp_connector.add_event(first)
self.assertTrue(self.user_misp_connector.event_exists(first))
second = self.user_misp_connector.add_event(second)
timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5]
# Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't
Expand Down Expand Up @@ -884,6 +886,19 @@ def test_simple_event(self):
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)

def test_event_add_update_metadata(self):
event = self.create_simple_event()
event.add_attribute('ip-src', '9.9.9.9')
try:
response = self.user_misp_connector.add_event(event, metadata=True)
self.assertEqual(len(response.attributes), 0) # response should contains zero attributes

event.info = "New name"
response = self.user_misp_connector.update_event(event, metadata=True)
self.assertEqual(len(response.attributes), 0) # response should contains zero attributes
finally: # cleanup
self.admin_misp_connector.delete_event(event)

def test_extend_event(self):
first = self.create_simple_event()
first.info = 'parent event'
Expand Down