/
mispaction.py
127 lines (118 loc) · 4.96 KB
/
mispaction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -*- coding: utf-8 -*-
import json
import logging
import time
from pastepwn.util import Request
from .basicaction import BasicAction
class MISPAction(BasicAction):
"""
Action to add an event to a MISP instance on a matched paste
Documentation for adding events:
https://www.circl.lu/doc/misp/automation/#post-events
The MISPAction objects can take a `transformer` function as a constructor parameter.
This function (by default MISPAction.default_transformer) should take a Paste and an
optional analyzer name as parameters (just like BasicAction.perform), and return a
dictionary representing a MISP event, which will then be sent to the API.
Additional attributes can be sent with each event, specified by the `attributes`
parameter. Here is the documentation regarding types and categories:
https://www.circl.lu/doc/misp/categories-and-types/
"""
name = "MISPAction"
def __init__(self, url, access_key, transformer=None, attributes=None):
"""
Init method for the MISPAction
:param url: string URL of the MISP instance (complete with protocol and port)
:param access_key: string MISP access key for authorization
:param transformer: Callable Takes a Paste (and optional analyzer name) as parameter
and returns a MISP-formatted event as a dictionary
:param attributes: Iterable List of fully defined attributes to add to events
"""
super().__init__()
self.logger = logging.getLogger(__name__)
self.url = url
self.access_key = access_key
if transformer is None:
self.transformer = MISPAction.default_transformer
else:
self.transformer = transformer
self.attributes = attributes
@staticmethod
def default_transformer(paste, analyzer_name=None):
"""
Create a default transformer
Args:
paste: (str): write your description
analyzer_name: (str): write your description
"""
timestamp = time.gmtime(int(paste.date))
attrs = []
# Build event
event = {
"date": time.strftime('%Y-%m-%d' , timestamp),
"info":"Sensitive information found on pastebin (type: %s)" % analyzer_name,
"threat_level_id": 4, # Undefined
"published": False, # Unpublished
"analysis": 0, # Not yet analyzed
"distribution": 0, # Shared with organization only
"Attribute": []
}
# Add link to the paste
attrs.append({
"type": "url",
"category": "Network activity",
"comment": "Link to pastebin paste containing information",
"value": paste.full_url
})
# Add username of the author
attrs.append({
"type": "text",
"category": "Attribution",
"comment": "Username of paste author",
"value": paste.user
})
# Add size of the paste
attrs.append({
"type": "size-in-bytes",
"category": "Other",
"comment": "Size of the paste",
"value": paste.size
})
# Attach full paste if it's small
if int(paste.size) <= 1024 and paste.body is not None:
attrs.append({
"type": "attachment",
"category": "Artifacts dropped",
"comment": "Raw body of the paste",
"value": paste.body
})
# Add attributes to the event
event['Attribute'] = attrs
return event
def perform(self, paste, analyzer_name=None, matches=None):
"""
Sends the event to the MISP instance.
:param paste: The paste passed by the ActionHandler
:param analyzer_name: The name of the analyzer which matched the paste
"""
# Call transformer to construct payload
event = self.transformer(paste, analyzer_name)
if self.attributes:
# Add extra attributes
event['Attributes'].extend(self.attributes)
data = json.dumps({"Event": event})
# Send event to MISP instance
r = Request()
r.headers = {'Authorization': self.access_key, 'Accept': 'application/json', 'Content-Type': 'application/json'}
res = r.post(self.url + "/events", data=data)
# Error handling
if not res:
self.logger.warning("Empty response when adding event")
else:
res = json.loads(res)
if 'Event' in res:
self.logger.info('Event #%s successfully added to MISP', res['Event']['id'])
else:
# An error has happened, but the 'errors' field is not always present
if 'errors' in res:
self.logger.error('Error when adding event: %s', res['errors'])
self.logger.warning('Failed to add event: %s', res.get('message'))