/
test.py
451 lines (374 loc) · 17.5 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# -*- coding: utf-8 -*-
"""
Utilities for testing intelmq bots.
The BotTestCase can be used as base class for unittests on bots. It includes
some basic generic tests (logged errors, correct pipeline setup).
"""
import copy
import io
import json
import logging
import os
import re
import unittest
import unittest.mock as mock
import pkg_resources
import redis
import intelmq.lib.message as message
import intelmq.lib.pipeline as pipeline
import intelmq.lib.utils as utils
from intelmq import CONFIG_DIR, PIPELINE_CONF_FILE, RUNTIME_CONF_FILE
__all__ = ['BotTestCase']
BOT_CONFIG = {"http_proxy": None,
"https_proxy": None,
"broker": "pythonlist",
"rate_limit": 0,
"retry_delay": 0,
"error_retry_delay": 0,
"error_max_retries": 0,
"redis_cache_host": "localhost",
"redis_cache_port": 6379,
"redis_cache_db": 10,
"redis_cache_ttl": 10,
"testing": True,
}
def mocked_config(bot_id='test-bot', src_name='', dst_names=(), sysconfig={}):
def mocked(conf_file):
if conf_file == PIPELINE_CONF_FILE:
return {bot_id: {"source-queue": src_name,
"destination-queues": dst_names},
}
elif conf_file == RUNTIME_CONF_FILE:
conf = BOT_CONFIG.copy()
conf.update(sysconfig)
return {bot_id: {'parameters': conf}}
elif conf_file.startswith(CONFIG_DIR):
confname = os.path.join('etc/', os.path.split(conf_file)[-1])
fname = pkg_resources.resource_filename('intelmq',
confname)
with open(fname, 'rt') as fpconfig:
return json.load(fpconfig)
else:
return utils.load_configuration(conf_file)
return mocked
def mocked_logger(logger):
def log(name, log_path=None, log_level=None, stream=None, syslog=None):
# Return a copy as the bot may modify the logger and we should always return the intial logger
logger_new = copy.copy(logger)
logger_new.setLevel(log_level)
return logger_new
return log
def skip_database():
return unittest.skipUnless(os.environ.get('INTELMQ_TEST_DATABASES'),
'Skipping database tests.')
def skip_internet():
return unittest.skipIf(os.environ.get('INTELMQ_SKIP_INTERNET'),
'Skipping without internet connection.')
def skip_redis():
return unittest.skipIf(os.environ.get('INTELMQ_SKIP_REDIS'),
'Skipping without running redis.')
def skip_local_web():
return unittest.skipUnless(os.environ.get('INTELMQ_TEST_LOCAL_WEB'),
'Skipping local web tests.')
def skip_exotic():
return unittest.skipUnless(os.environ.get('INTELMQ_TEST_EXOTIC'),
'Skipping tests requiring exotic libs.')
class BotTestCase(object):
"""
Provides common tests and assert methods for bot testing.
"""
bot_types = {'collector': 'CollectorBot',
'parser': 'ParserBot',
'expert': 'ExpertBot',
'output': 'OutputBot',
}
@classmethod
def setUpClass(cls):
"""
Set default values and save original functions.
"""
cls.bot_id = 'test-bot'
cls.bot_name = None
cls.bot = None
cls.bot_reference = None
cls.bot_type = None
cls.default_input_message = ''
cls.input_message = None
cls.loglines = []
cls.loglines_buffer = ''
cls.log_stream = None
cls.maxDiff = None # For unittest module, prints long diffs
cls.pipe = None
cls.sysconfig = {}
cls.use_cache = False
cls.allowed_warning_count = 0
cls.allowed_error_count = 0 # allows dumping of some lines
cls.set_bot()
cls.bot_name = cls.bot_reference.__name__
if cls.bot_type is None:
for type_name, type_match in cls.bot_types.items():
if cls.bot_name.endswith(type_match):
cls.bot_type = type_name
break
if cls.bot_type == 'parser' and cls.default_input_message == '':
cls.default_input_message = {'__type': 'Report',
'raw': 'Cg==',
'feed.name': 'Test Feed',
'time.observation': '2016-01-01T00:00:00+00:00'}
elif cls.default_input_message == '':
cls.default_input_message = {'__type': 'Event'}
if type(cls.default_input_message) is dict:
cls.default_input_message = \
utils.decode(json.dumps(cls.default_input_message))
if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
cls.cache = redis.Redis(host=BOT_CONFIG['redis_cache_host'],
port=BOT_CONFIG['redis_cache_port'],
db=BOT_CONFIG['redis_cache_db'],
socket_timeout=BOT_CONFIG['redis_cache_ttl'])
harmonization = utils.load_configuration(pkg_resources.resource_filename('intelmq',
'etc/harmonization.conf'))
def new_report(self, auto=False, examples=False):
return message.Report(harmonization=self.harmonization, auto=auto)
def new_event(self):
return message.Event(harmonization=self.harmonization)
def prepare_bot(self):
"""Reconfigures the bot with the changed attributes"""
self.log_stream = io.StringIO()
src_name = "{}-input".format(self.bot_id)
dst_name = "{}-output".format(self.bot_id)
self.mocked_config = mocked_config(self.bot_id,
src_name,
[dst_name],
sysconfig=self.sysconfig,
)
logger = logging.getLogger(self.bot_id)
logger.setLevel("INFO")
console_formatter = logging.Formatter(utils.LOG_FORMAT)
console_handler = logging.StreamHandler(self.log_stream)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
self.mocked_log = mocked_logger(logger)
logging.captureWarnings(True)
warnings_logger = logging.getLogger("py.warnings")
warnings_logger.addHandler(console_handler)
class Parameters(object):
source_queue = src_name
destination_queues = [dst_name]
parameters = Parameters()
self.pipe = pipeline.Pythonlist(parameters)
self.pipe.set_queues(parameters.source_queue, "source")
self.pipe.set_queues(parameters.destination_queues, "destination")
with mock.patch('intelmq.lib.utils.load_configuration',
new=self.mocked_config):
with mock.patch('intelmq.lib.utils.log', self.mocked_log):
self.bot = self.bot_reference(self.bot_id)
if self.input_message is not None:
if type(self.input_message) is not list:
self.input_message = [self.input_message]
self.input_queue = []
for msg in self.input_message:
if type(msg) is dict:
self.input_queue.append(json.dumps(msg))
elif issubclass(type(msg), message.Message):
self.input_queue.append(msg.serialize())
else:
self.input_queue.append(msg)
self.input_message = None
else:
if self.default_input_message: # None for collectors
self.input_queue = [self.default_input_message]
def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False, prepare=True):
"""
Call this method for actually doing a test run for the specified bot.
Parameters:
iterations: Bot instance will be run the given times, defaults to 1.
"""
if prepare:
self.prepare_bot()
with mock.patch('intelmq.lib.utils.load_configuration',
new=self.mocked_config):
with mock.patch('intelmq.lib.utils.log', self.mocked_log):
for run in range(iterations):
self.bot.start(error_on_pipeline=error_on_pipeline,
source_pipeline=self.pipe,
destination_pipeline=self.pipe)
self.loglines_buffer = self.log_stream.getvalue()
self.loglines = self.loglines_buffer.splitlines()
""" Test if all pipes are created with correct names. """
pipenames = ["{}-input", "{}-input-internal", "{}-output"]
self.assertSetEqual({x.format(self.bot_id) for x in pipenames},
set(self.pipe.state.keys()))
""" Test if report has required fields. """
if self.bot_type == 'collector':
for report_json in self.get_output_queue():
report = message.MessageFactory.unserialize(report_json,
harmonization=self.harmonization)
self.assertIsInstance(report, message.Report)
self.assertIn('feed.name', report)
self.assertIn('raw', report)
self.assertIn('time.observation', report)
""" Test if event has required fields. """
if self.bot_type == 'parser':
for event_json in self.get_output_queue():
event = message.MessageFactory.unserialize(event_json,
harmonization=self.harmonization)
self.assertIsInstance(event, message.Event)
self.assertIn('classification.type', event)
self.assertIn('raw', event)
""" Test if bot log messages are correctly formatted. """
self.assertLoglineMatches(0, "{} initialized with id {} and intelmq [0-9a-z.]* and python"
r" [0-9a-z.]{{5,8}}\+? \([a-zA-Z0-9,:. ]+\)( \[GCC\])?"
r" as process [0-9]+\."
"".format(self.bot_name,
self.bot_id), "INFO")
self.assertRegexpMatchesLog("INFO - Bot is starting.")
self.assertLoglineEqual(-1, "Bot stopped.", "INFO")
self.assertNotRegexpMatchesLog("(ERROR.*?){%d}" % (self.allowed_error_count + 1))
self.assertNotRegexpMatchesLog("(WARNING.*?){%d}" % (self.allowed_warning_count + 1))
self.assertNotRegexpMatchesLog("CRITICAL")
""" If no error happened (incl. tracebacks) we can check for formatting """
if not self.allowed_error_count:
for logline in self.loglines:
fields = utils.parse_logline(logline)
if not isinstance(fields, dict):
# Traceback
continue
self.assertTrue(fields['message'][-1] in '.:?!',
msg='Logline {!r} does not end with .? or !.'
''.format(fields['message']))
self.assertTrue(fields['message'].upper() == fields['message'].upper(),
msg='Logline {!r} does not begin with an upper case char.'
''.format(fields['message']))
@classmethod
def tearDownClass(cls):
if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
cls.cache.flushdb()
def get_input_queue(self):
"""Returns the input queue of this bot which can be filled
with fixture data in setUp()"""
return self.pipe.state["%s-input" % self.bot_id]
def set_input_queue(self, seq):
"""Setter for the input queue of this bot"""
self.pipe.state["%s-input" % self.bot_id] = [utils.encode(text) for
text in seq]
input_queue = property(get_input_queue, set_input_queue)
def get_output_queue(self):
"""Getter for the input queue of this bot. Use in TestCase scenarios"""
return [utils.decode(text) for text
in self.pipe.state["%s-output" % self.bot_id]]
def test_bot_name(self):
"""
Test if Bot has a valid name.
Must be CamelCase and end with CollectorBot etc.
Test class name must be Test{botclassname}
"""
counter = 0
for type_name, type_match in self.bot_types.items():
try:
self.assertRegex(self.bot_name,
r'\A[a-zA-Z0-9]+{}\Z'.format(type_match))
except AssertionError:
counter += 1
if counter != len(self.bot_types) - 1:
self.fail("Bot name {!r} does not match one of {!r}"
"".format(self.bot_name, list(self.bot_types.values())))
self.assertEqual('Test{}'.format(self.bot_name),
self.__class__.__name__)
def assertAnyLoglineEqual(self, message: str, levelname: str = "ERROR"):
"""
Asserts if any logline matches a specific requirement.
Parameters:
message: Message text which is compared
type: Type of logline which is asserted
Raises:
ValueError: if logline message has not been found
"""
self.assertIsNotNone(self.loglines)
for logline in self.loglines:
fields = utils.parse_logline(logline)
if levelname == fields["log_level"] and message == fields["message"]:
return
else:
raise ValueError('Logline with level {!r} and message {!r} not found'
''.format(levelname, message))
def assertLoglineEqual(self, line_no: int, message: str, levelname: str = "ERROR"):
"""
Asserts if a logline matches a specific requirement.
Parameters:
line_no: Number of the logline which is asserted
message: Message text which is compared
levelname: Log level of logline which is asserted
"""
self.assertIsNotNone(self.loglines)
logline = self.loglines[line_no]
fields = utils.parse_logline(logline)
self.assertEqual(self.bot_id, fields["bot_id"],
"bot_id %s didn't match %s"
"".format(self.bot_id, fields["bot_id"]))
self.assertEqual(levelname, fields["log_level"])
self.assertEqual(message, fields["message"])
def assertLoglineMatches(self, line_no: int, pattern: str, levelname: str = "ERROR"):
"""
Asserts if a logline matches a specific requirement.
Parameters:
line_no: Number of the logline which is asserted
pattern: Message text which is compared
type: Type of logline which is asserted
"""
self.assertIsNotNone(self.loglines)
logline = self.loglines[line_no]
fields = utils.parse_logline(logline)
self.assertEqual(self.bot_id, fields["bot_id"],
"bot_id %s didn't match %s"
"".format(self.bot_id, fields["bot_id"]))
self.assertEqual(levelname, fields["log_level"])
self.assertRegex(fields["message"], pattern)
def assertLogMatches(self, pattern: str, levelname: str = "ERROR"):
"""
Asserts if any logline matches a specific requirement.
Parameters:
pattern: Message text which is compared
type: Type of logline which is asserted
"""
self.assertIsNotNone(self.loglines)
for logline in self.loglines:
fields = utils.parse_logline(logline)
# Exception tracebacks
if isinstance(fields, str):
if levelname == "ERROR" and re.match(pattern, fields):
break
elif levelname == fields["log_level"] and re.match(pattern, fields["message"]):
break
else:
raise ValueError('No matching logline found.')
def assertRegexpMatchesLog(self, pattern):
"""Asserts that pattern matches against log. """
self.assertIsNotNone(self.loglines_buffer)
self.assertRegex(self.loglines_buffer, pattern)
def assertNotRegexpMatchesLog(self, pattern):
"""Asserts that pattern doesn't match against log."""
self.assertIsNotNone(self.loglines_buffer)
self.assertNotRegex(self.loglines_buffer, pattern)
def assertOutputQueueLen(self, queue_len=0):
"""
Asserts that the output queue has the expected length.
"""
self.assertEqual(len(self.get_output_queue()), queue_len)
def assertMessageEqual(self, queue_pos, expected_msg):
"""
Asserts that the given expected_message is
contained in the generated event with
given queue position.
"""
event = self.get_output_queue()[queue_pos]
self.assertIsInstance(event, str)
event_dict = json.loads(event)
if isinstance(expected_msg, (message.Event, message.Report)):
expected = expected_msg.to_dict(with_type=True)
else:
expected = expected_msg.copy()
if 'time.observation' in event_dict:
del event_dict['time.observation']
if 'time.observation' in expected:
del expected['time.observation']
self.assertDictEqual(expected, event_dict)