Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip analystdata #1212

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pymisp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def warning_2024() -> None:
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship)
from .api import PyMISP, register_user # noqa
# NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future
from .tools import AbstractMISPObjectGenerator # noqa
Expand Down Expand Up @@ -76,7 +76,8 @@ def __init__(self, *args, **kwargs):
'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist',
'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion',
'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement',
'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError',
'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship',
'PyMISPError', 'NewEventError', 'NewAttributeError',
'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat',
'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP'
]
2 changes: 1 addition & 1 deletion pymisp/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum, IntEnum
from enum import Enum
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache
Expand Down
189 changes: 187 additions & 2 deletions pymisp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types


Expand Down Expand Up @@ -584,9 +585,192 @@ def delete_event_report(self, event_report: MISPEventReport | int | str | UUID,
data['hard'] = 1
r = self._prepare_request('POST', request_url, data=data)
return self._check_json_response(r)

# ## END Event Report ###

# ## BEGIN Analyst Data ###a
def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Get an analyst data from a MISP instance

:param analyst_data: analyst data to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}')
analyst_data_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all':
return analyst_data_r
er = type(analyst_data)()
er.from_dict(**analyst_data_r)
return er

def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Add an analyst data to an existing MISP element

:param analyst_data: analyst_data to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
object_uuid = analyst_data.object_uuid
object_type = analyst_data.object_type
r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data)
new_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
return new_analyst_data
er = type(analyst_data)()
er.from_dict(**new_analyst_data)
return er

def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Update an analyst data on a MISP instance

:param analyst_data: analyst data to update
:param analyst_data_id: analyst data ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
if analyst_data_id is None:
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data)
updated_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all':
return updated_analyst_data
er = type(analyst_data)()
er.from_dict(**updated_analyst_data)
return er

def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete an analyst data from a MISP instance

:param analyst_data: analyst data to delete
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}'
r = self._prepare_request('POST', request_url)
return self._check_json_response(r)

# ## END Analyst Data ###

# ## BEGIN Analyst Note ###

def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Get a note from a MISP instance

:param note: note to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(note, pythonify)

def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Add a note to an existing MISP element

:param note: note to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(note, pythonify)

def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Update a note on a MISP instance

:param note: note to update
:param note_id: note ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(note, note_id, pythonify)

def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete a note from a MISP instance

:param note: note delete
"""
return self.delete_analyst_data(note)

# ## END Analyst Note ###

# ## BEGIN Analyst Opinion ###

def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Get an opinion from a MISP instance

:param opinion: opinion to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(opinion, pythonify)

def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Add an opinion to an existing MISP element

:param opinion: opinion to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(opinion, pythonify)

def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Update an opinion on a MISP instance

:param opinion: opinion to update
:param opinion_id: opinion ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(opinion, opinion_id, pythonify)

def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete an opinion from a MISP instance

:param opinion: opinion to delete
"""
return self.delete_analyst_data(opinion)

# ## END Analyst Opinion ###

# ## BEGIN Analyst Relationship ###

def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Get a relationship from a MISP instance

:param relationship: relationship to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(relationship, pythonify)

def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Add a relationship to an existing MISP element

:param relationship: relationship to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(relationship, pythonify)

def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Update a relationship on a MISP instance

:param relationship: relationship to update
:param relationship_id: relationship ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(relationship, relationship_id, pythonify)

def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete a relationship from a MISP instance

:param relationship: relationship to delete
"""
return self.delete_analyst_data(relationship)

# ## END Analyst Relationship ###

# ## BEGIN Galaxy Cluster ###
def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_cluster: MISPGalaxyCluster | int | str, local: bool = False, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]:
"""Attach a galaxy cluster to an event or an attribute
Expand Down Expand Up @@ -620,6 +804,7 @@ def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_c
return self._check_json_response(r)
# ## END Galaxy Cluster ###


# ## BEGIN Object ###

def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:
Expand Down
15 changes: 15 additions & 0 deletions pymisp/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError):
pass


class NewAnalystDataError(PyMISPError):
pass


class NewNoteError(PyMISPError):
pass


class NewOpinionError(PyMISPError):
pass

class NewRelationshipError(PyMISPError):
pass


class UpdateAttributeError(PyMISPError):
pass

Expand Down
Loading
Loading