-
Notifications
You must be signed in to change notification settings - Fork 333
/
threat_intel.py
376 lines (301 loc) · 13.5 KB
/
threat_intel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
"""
Copyright 2017-present, Airbnb Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import defaultdict
from os import environ as env
import backoff
import boto3
from boto3.dynamodb.types import TypeDeserializer
from botocore.exceptions import ClientError, ParamValidationError
from netaddr import IPNetwork
from stream_alert.shared.backoff_handlers import (
backoff_handler,
success_handler,
giveup_handler
)
from stream_alert.shared.logger import get_logger
from stream_alert.shared.normalize import Normalizer
from stream_alert.shared.utils import in_network, valid_ip
# Module-level logger, obtained from the shared get_logger helper using this module's name
LOGGER = get_logger(__name__)
class ThreatIntel(object):
    """Load threat intelligence data from DynamoDB and perform IOC detection"""
    # Key under which detected IOCs are inserted into a record
    IOC_KEY = 'streamalert:ioc'

    # Exceptions that trigger a backoff/retry of the DynamoDB query. Note that
    # BACKOFF_MAX_RETRIES is passed to backoff as 'max_tries' (total attempts).
    EXCEPTIONS_TO_BACKOFF = (ClientError,)
    BACKOFF_MAX_RETRIES = 3

    # DynamoDB Table settings
    MAX_QUERY_CNT = 100
    PRIMARY_KEY = 'ioc_value'
    SUB_TYPE_KEY = 'sub_type'
    PROJECTION_EXPRESSION = '{},{}'.format(PRIMARY_KEY, SUB_TYPE_KEY)

    _deserializer = TypeDeserializer()

    # boto3 client shared by all instances; created by the first instantiation
    _client = None

    def __init__(self, table, enabled_clusters, ioc_types_map, excluded_iocs=None):
        """Create a ThreatIntel instance

        Args:
            table (str): Name of the DynamoDB table that is queried for IOCs
            enabled_clusters (set): Names of clusters with threat intel enabled
            ioc_types_map (dict): Map of normalized key to IOC type, in the format
                {'normalized_key': 'ioc_type'}
            excluded_iocs (dict): Optional map of IOC type to an iterable of values
                that should bypass the IOC lookup
        """
        self._table = table
        self._enabled_clusters = enabled_clusters
        self._ioc_config = ioc_types_map
        self._excluded_iocs = self._setup_excluded_iocs(excluded_iocs)

        region = env.get('AWS_REGION') or env.get('AWS_DEFAULT_REGION') or 'us-east-1'

        # Reuse the class-level client if one has already been created
        ThreatIntel._client = ThreatIntel._client or boto3.client('dynamodb', region_name=region)

    @property
    def _dynamodb(self):
        """The class-level DynamoDB client"""
        return ThreatIntel._client

    @staticmethod
    def _exceptions_to_giveup(err):
        """Function to decide if giveup backoff or not.

        Args:
            err (ClientError): Exception raised by the DynamoDB client

        Returns:
            bool: True if the error code is one that should not be retried
        """
        giveup_codes = {
            'AccessDeniedException',
            'ProvisionedThroughputExceededException',
            'ResourceNotFoundException'
        }
        return err.response['Error']['Code'] in giveup_codes

    def threat_detection(self, records):
        """Public instance method to run threat intelligence against normalized records

        The records will be modified in-place by inserting IOC information if the
        records contain malicious IOC(s). Nothing is returned; callers should inspect
        the records they passed in.

        Args:
            records (list): A list of payload instance with normalized records.
        """
        if not records:
            return

        # Extract information from the records for IOC detection
        potential_iocs = self._extract_ioc_values(records)

        # Query DynamoDB IOC type to verify if the extracted info are malicious IOC(s)
        for valid_ioc in self._process_ioc_values(list(potential_iocs)):
            value = valid_ioc[self.PRIMARY_KEY]
            # Use .get so an unexpected value does not add an entry to the defaultdict
            for ioc_type, record in potential_iocs.get(value, ()):
                # Insert the IOC info into the record
                self._insert_ioc_info(record, ioc_type, value)

    @classmethod
    def _insert_ioc_info(cls, rec, ioc_type, ioc_value):
        """Insert ioc info to a record

        Record is modified/updated in-place with IOC info inserted.

        Example:
            A new field of 'streamalert:ioc' will be added to the record:

            {
                'key': 'value',
                'sourceAddress': '4.3.2.1',
                'sourceDomain': 'evil1.com',
                'streamalert:ioc': {
                    'ip': {'4.3.2.1'},
                    'domain' : {'evil1.com'}
                }
            }

        Args:
            rec (dict): Record data
            ioc_type (str): IOC type, can be 'ip', 'domain', or 'md5'
            ioc_value (str): Malicious IOC value
        """
        # Get the current collection of IOCs, or create a new empty dictionary
        record_iocs = rec.setdefault(cls.IOC_KEY, defaultdict(set))

        # setdefault also works if the existing collection is a plain dict
        record_iocs.setdefault(ioc_type, set()).add(ioc_value)

    def _process_ioc_values(self, potential_iocs):
        """Check if any info is malicious by querying DynamoDB IOC table

        Batches that fail with a ClientError or ParamValidationError are logged
        and skipped, so the remaining batches are still processed.

        Args:
            potential_iocs (list<str>): A list of potential IOC values

        Yields:
            dict: Each IOC item returned from the DynamoDB table
        """
        LOGGER.debug('Checking %d potential IOCs for validity', len(potential_iocs))
        # Segment data before calling DynamoDB table with batch_get_item.
        for query_values in self._segment(potential_iocs):
            try:
                query_result = self._query(query_values)
            except (ClientError, ParamValidationError):
                LOGGER.exception('An error occurred while querying dynamodb table')
                continue

            for ioc in query_result:
                yield ioc

    @classmethod
    def _segment(cls, potential_iocs):
        """Segment list of potential IOC values into smaller set(s)

        Batch query to dynamodb supports up to 100 items.

        Args:
            potential_iocs (list<str>): A list of potential IOC values

        Yields:
            set: Subset of total potential IOC values
        """
        # Slicing clamps to the end of the list, so no explicit bound is needed
        for index in range(0, len(potential_iocs), cls.MAX_QUERY_CNT):
            yield set(potential_iocs[index:index + cls.MAX_QUERY_CNT])

    def _query(self, values):
        """Instance method to query DynamoDB table

        Args:
            values (set): A set of strings which contains IOC values. If the response
                contains unprocessed keys, this set is reduced IN PLACE to only those
                unprocessed values so that a retry queries just them.

        Returns:
            list: A list of dict returned from dynamodb table query, in the format of
                [
                    {'sub_type': 'c2_domain', 'ioc_value': 'evil.com'},
                    {'sub_type': 'mal_ip', 'ioc_value': '1.1.1.2'},
                ]
        """
        @backoff.on_predicate(backoff.fibo,
                              lambda resp: bool(resp.get('UnprocessedKeys')),  # retry if true
                              max_tries=2,  # at most 2 attempts total, i.e. a single retry
                              on_backoff=backoff_handler(),
                              on_success=success_handler(),
                              on_giveup=giveup_handler())
        @backoff.on_exception(backoff.expo,
                              self.EXCEPTIONS_TO_BACKOFF,
                              max_tries=self.BACKOFF_MAX_RETRIES,
                              giveup=self._exceptions_to_giveup,
                              on_backoff=backoff_handler(),
                              on_success=success_handler(),
                              on_giveup=giveup_handler())
        def _run_query(query_values, results):
            # Empty values are skipped when building the keys
            query_keys = [{self.PRIMARY_KEY: {'S': ioc}} for ioc in query_values if ioc]

            response = self._dynamodb.batch_get_item(
                RequestItems={
                    self._table: {
                        'Keys': query_keys,
                        'ProjectionExpression': self.PROJECTION_EXPRESSION
                    }
                }
            )

            results.extend(self._deserialize(response['Responses'].get(self._table)))

            # Log this as an error for now so it can be picked up in logs
            unprocessed = response.get('UnprocessedKeys')
            if unprocessed:
                LOGGER.error('Retrying unprocessed keys in response: %s', unprocessed)
                # Strip out the successful keys so only the unprocessed ones are retried.
                # This changes the set in place, so the retried call sees the updated set
                self._remove_processed_keys(query_values, unprocessed[self._table]['Keys'])

            return response

        results = []

        _run_query(values, results)

        return results

    @classmethod
    def _remove_processed_keys(cls, query_values, unprocesed_keys):
        """Reduce the queried values, in place, to only those that were not processed

        Args:
            query_values (set): IOC values that were included in the query
            unprocesed_keys (list): Unprocessed keys, with DynamoDB types, from the response
        """
        keys = {elem[cls.PRIMARY_KEY] for elem in cls._deserialize(unprocesed_keys)}

        # Update the set with only unprocessed keys
        query_values.intersection_update(keys)

    @classmethod
    def _deserialize(cls, dynamodb_data):
        """Convert dynamodb data types to python data types

        Types conversion between DynamoDB and Python.
        Reference link: http://boto3.readthedocs.io/en/latest/_modules/boto3/dynamodb/types.html
            DynamoDB                                Python
            --------                                ------
            {'NULL': True}                          None
            {'BOOL': True/False}                    True/False
            {'N': str(value)}                       Decimal(str(value))
            {'S': string}                           string
            {'B': bytes}                            Binary(bytes)
            {'NS': [str(value)]}                    set([Decimal(str(value))])
            {'SS': [string]}                        set([string])
            {'BS': [bytes]}                         set([bytes])
            {'L': list}                             list
            {'M': dict}                             dict

        Args:
            dynamodb_data (list): Contains IOC info with DynamoDB types

        Yields:
            dict: Dictionaries containing ioc_value and ioc_type
        """
        if not dynamodb_data:
            return

        for raw_data in dynamodb_data:
            # .items() works on both Python 2 and Python 3
            yield {
                key: cls._deserializer.deserialize(val)
                for key, val in raw_data.items()
            }

    def _is_excluded_ioc(self, ioc_type, ioc_value):
        """Determine if we should bypass IOC lookup for specified IOC

        Args:
            ioc_type (string): Type of IOC to evaluate (md5, ip, domain, etc)
            ioc_value (string): Value of IOC to evaluate

        Returns:
            bool: True if IOC lookup should be bypassed for this value, False otherwise
        """
        if not (self._excluded_iocs and ioc_type in self._excluded_iocs):
            return False

        exclusions = self._excluded_iocs[ioc_type]

        if ioc_type == 'ip':
            # filter out *.amazonaws.com "IP"s
            return not valid_ip(ioc_value) or in_network(ioc_value, exclusions)

        return ioc_value in exclusions

    def _extract_ioc_values(self, payloads):
        """Instance method to extract IOC info from the record based on normalized keys

        Args:
            payloads (list<dict>): A list of dictionary payloads with records containing
                normalized data

        Returns:
            dict: Map of ioc values to a list of (ioc type, source record) tuples
        """
        ioc_values = defaultdict(list)
        for payload in payloads:
            record = payload['record']
            if Normalizer.NORMALIZATION_KEY not in record:
                continue

            normalized_values = record[Normalizer.NORMALIZATION_KEY]
            for normalized_key, values in normalized_values.items():
                # Look up mapped IOC type based on normalized CEF type
                ioc_type = self._ioc_config.get(normalized_key)
                if not ioc_type:
                    LOGGER.debug('Skipping undefined IOC type for normalized key: %s',
                                 normalized_key)
                    continue

                for value in values:
                    # Skip excluded IOCs
                    if self._is_excluded_ioc(ioc_type, value):
                        continue

                    ioc_values[value].append((ioc_type, record))

        return ioc_values

    @staticmethod
    def _setup_excluded_iocs(excluded):
        """Prepare the excluded IOC configuration for fast lookups

        Args:
            excluded (dict): Map of IOC type to an iterable of excluded values, or None

        Returns:
            dict: Map of IOC type to a set of excluded values, with any 'ip' values
                converted to IPNetwork objects, or None if nothing is excluded
        """
        if not excluded:
            return None

        excluded = {itype: set(iocs) for itype, iocs in excluded.items()}

        # Try to load IP addresses
        if 'ip' in excluded:
            excluded['ip'] = {IPNetwork(ip) for ip in excluded['ip']}

        return excluded

    @classmethod
    def load_from_config(cls, config):
        """Public class constructor method to return instance of ThreatIntel class

        Args:
            config (dict): Config read from 'conf/' directory

        Returns:
            ThreatIntel: Class to be used for threat intelligence logic, or None if
                threat intel is not configured, not enabled, or not enabled for
                any cluster
        """
        if 'threat_intel' not in config:
            return

        intel_config = config['threat_intel']
        if not intel_config.get('enabled'):
            return

        # Threat Intel can be disabled for any given cluster
        enabled_clusters = {
            cluster for cluster, values in config['clusters'].items()
            if values['modules']['stream_alert'].get('enable_threat_intel', False)
        }

        if not enabled_clusters:
            return  # if no clusters have threat intel enabled, there's nothing to do

        # Convert the current IOC mapping to be in the format
        # {'normalized_key': 'ioc_type'} for simpler lookups
        ioc_config = {
            key: ioc_type
            for ioc_type, keys in intel_config['normalized_ioc_types'].items()
            for key in keys
        }

        return cls(
            table=intel_config['dynamodb_table_name'],
            enabled_clusters=enabled_clusters,
            ioc_types_map=ioc_config,
            excluded_iocs=intel_config.get('excluded_iocs')
        )