Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bot debugging #975

Merged
12 commits merged into from
Jun 13, 2017
80 changes: 55 additions & 25 deletions intelmq/bin/intelmqctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
STARTUP_CONF_FILE, SYSTEM_CONF_FILE, VAR_RUN_PATH,
BOTS_FILE)
from intelmq.lib import utils
from intelmq.lib.bot_debugger import BotDebugger
from intelmq.lib.pipeline import PipelineFactory


Expand Down Expand Up @@ -117,33 +118,33 @@ def __init__(self, runtime_configuration, logger, controller):
self.logger.error('Directory %s does not exist and cannot be '
'created: %s.', self.PIDDIR, exc)

def bot_run(self, bot_id):
def bot_run(self, bot_id, run_subcommand=None, console_type=None, message_action_kind=None, dryrun=None, msg=None):
pid = self.__read_pidfile(bot_id)
if pid:
if self.__status_process(pid):
log_bot_error('running', bot_id)
return 'running'
else:
self.__remove_pidfile(bot_id)
log_bot_message('starting', bot_id)
filename = self.PIDFILE.format(bot_id)
with open(filename, 'w') as fp:
fp.write(str(os.getpid()))
if pid and self.__status_process(pid):
self.logger.warning("Main instance of the bot is running in the background. You may want to launch: intelmqctl stop {}".format(bot_id))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed IntelMQProcessManager.bot_run method so that it's not problem anymore to run multiple instance of bot.
If we run a bot that is already running, only warning "Main instance of the bot is running in the background. You may want to launch: intelmqctl stop {}" is printed.
However, if there is a reason for the background instance is rather paused and relaunched after debugging is finished, I commented all the code so that I can restore it easily.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, now the background process will get stopped and relaunched afterwards.

# paused = True
# self.bot_stop(bot_id)
# else:
# paused = False

# log_bot_message('starting', bot_id)
# filename = self.PIDFILE.format(bot_id)
# with open(filename, 'w') as fp:
# fp.write(str(os.getpid()))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without writing the PID the bot can still be started in parallel leading to weird behavior and duplicate messages.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Thanks for reviewing me.


bot_module = self.__runtime_configuration[bot_id]['module']
module = importlib.import_module(bot_module)
bot = getattr(module, 'BOT')
try:
instance = bot(bot_id)
instance.start()
except (Exception, KeyboardInterrupt) as exc:
print('Bot failed: %s' % exc)
retval = 1
BotDebugger(self.__runtime_configuration[bot_id], bot_id, run_subcommand, console_type, dryrun, message_action_kind, msg)
retval = 0
except KeyboardInterrupt:
print('Keyboard interrupt.')
retval = 0
except SystemExit as exc:
print('Bot exited with code %s.' % exc)
retval = exc

self.__remove_pidfile(bot_id)
# self.__remove_pidfile(bot_id)
# if paused:
# self.bot_start(bot_id)
return retval

def bot_start(self, bot_id, getstatus=True):
Expand Down Expand Up @@ -305,10 +306,13 @@ def __init__(self, interactive: bool=False, return_type: str="python", quiet: bo

Outputs are logged to /opt/intelmq/var/log/intelmqctl"""
EPILOG = '''
intelmqctl [start|stop|restart|status|reload|run] bot-id
intelmqctl [start|stop|restart|status|reload] bot-id
intelmqctl [start|stop|restart|status|reload]
intelmqctl list [bots|queues]
intelmqctl log bot-id [number-of-lines [log-level]]
intelmqctl run bot-id message [get|pop|send]
intelmqctl run bot-id process [--msg|--dryrun]
intelmqctl run bot-id console
intelmqctl clear queue-id
intelmqctl check

Expand All @@ -321,8 +325,14 @@ def __init__(self, interactive: bool=False, return_type: str="python", quiet: bo
Get status of a bot:
intelmqctl status bot-id

Run a bot directly (blocking) for debugging purpose:
Run a bot directly for debugging purpose and temporarily leverage the logging level to DEBUG:
intelmqctl run bot-id
Get a pdb (or ipdb if installed) live console.
intelmqctl run bot-id console
See the message that waits in the input queue.
intelmqctl run bot-id message get
See additional help for further explanation.
intelmqctl run bot-id --help

Starting the botnet (all bots):
intelmqctl start
Expand Down Expand Up @@ -434,6 +444,27 @@ def __init__(self, interactive: bool=False, return_type: str="python", quiet: bo
parser_run = subparsers.add_parser('run', help='Run a bot interactively')
parser_run.add_argument('bot_id',
choices=self.runtime_configuration.keys())
parser_run_subparsers = parser_run.add_subparsers(title='run-subcommands')

parser_run_console = parser_run_subparsers.add_parser('console', help='Get a ipdb live console.')
parser_run_console.add_argument('console_type', nargs='?', help='You may specify which console should be run. Default is ipdb (if installed)'
' or pudb (if installed) or pdb but you may want to use another one.')
parser_run_console.set_defaults(run_subcommand="console")

parser_run_message = parser_run_subparsers.add_parser('message', help='Debug bot\'s pipelines. Get the message in the input pipeline, pop it (cut it)'
' and display it, or send the message directly to bot\'s output pipeline.')
parser_run_message.add_argument('message_action_kind', choices=["get", "pop", "send"])
parser_run_message.add_argument('msg', nargs='?', help='If send was chosen, put here the message in JSON.')
parser_run_message.set_defaults(run_subcommand="message")

parser_run_process = parser_run_subparsers.add_parser('process', help='Single run of bot\'s process() method.')
parser_run_process.add_argument('--dryrun', '-d', action='store_true',
help='Never really pop the message from the input pipeline '
'nor send to output pipeline.')
parser_run_process.add_argument('--msg', '-m',
help='Trick the bot to process this JSON '
'instead of the Message in its pipeline.')
parser_run_process.set_defaults(run_subcommand="process")
parser_run.set_defaults(func=self.bot_run)

parser_check = subparsers.add_parser('check',
Expand Down Expand Up @@ -518,8 +549,8 @@ def run(self):
elif results == 'error':
return 1

def bot_run(self, bot_id):
return self.bot_process_manager.bot_run(bot_id)
def bot_run(self, **kwargs):
return self.bot_process_manager.bot_run(**kwargs)

def bot_start(self, bot_id, getstatus=True):
if bot_id is None:
Expand Down Expand Up @@ -865,6 +896,5 @@ def main(): # pragma: no cover
x = IntelMQController(interactive=True)
return x.run()


if __name__ == "__main__": # pragma: no cover
exit(main())
155 changes: 155 additions & 0 deletions intelmq/lib/bot_debugger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
"""
Utilities for debugging intelmq bots.

BotDebugger is called via intelmqctl. It starts a live running bot instance,
leverages logging to DEBUG level and permits even a non-skilled programmer
who may find themselves puzzled with Python nuances and server deployment twists
to see what's happening in the bot and where's the error.

Depending on the subcommand received, the class either
* starts the bot as is (default)
* processes single message, either injected or from default pipeline (process subcommand)
* reads the message from input pipeline or send a message to output pipeline (message subcommand)
"""
import time
import json
import logging
from importlib import import_module

from intelmq.lib import utils
from intelmq.lib.message import Event
from intelmq.lib.message import MessageFactory
from intelmq.lib.utils import StreamHandler
from intelmq.lib.utils import error_message_from_exc


class BotDebugger:

EXAMPLE = """\nThe message may look like: '{"source.network": "178.72.192.0/18", "time.observation": "2017-05-12T05:23:06+00:00"}' """

load_configuration = utils.load_configuration
logging_level = "DEBUG"
init_log_level = {"console": logging.DEBUG, "message": logging.WARNING, "process": logging.INFO, None: logging.INFO}

def __init__(self, runtime_configuration, bot_id, run_subcommand=None, console_type=None, dryrun=None, message_kind=None, msg=None):
self.runtime_configuration = runtime_configuration
self.leverageLogger(level=self.init_log_level[run_subcommand])
module = import_module(self.runtime_configuration['module'])
bot = getattr(module, 'BOT')
if run_subcommand == "message":
bot.init = lambda *args: None
self.instance = bot(bot_id)

if not run_subcommand:
self.instance.start()
else:
self.instance._Bot__connect_pipelines()
if run_subcommand == "console":
self._console(console_type)
elif run_subcommand == "message":
self.leverageLogger(logging.INFO)
self._message(message_kind, msg)
return
elif run_subcommand == "process":
self.leverageLogger(logging.DEBUG)
self._process(dryrun, msg)
else:
print("Subcommand {} not known.".format(run_subcommand))

def _console(self, console_type):
consoles = [console_type, "ipdb", "pudb", "pdb"]
for console in consoles:
try:
module = import_module(console)
except Exception as exc:
pass
else:
if console_type and console != console_type:
print("Console {} not available.".format(console_type))
print("*** Using console {}. Please use 'self' to access to the bot instance properties. ***".format(module.__name__))
break
else:
print("Can't run console.")
return

self = self.instance
module.set_trace()

def _message(self, message_action_kind, msg):
if message_action_kind == "get":
self.instance.logger.info("Waiting for a message to get...")
if not bool(self.instance._Bot__source_queues):
self.instance.logger.warning("Bot has no source queue.")
return

# Never pops from source to internal queue, thx to disabling brpoplpush operation.
# However, we have to wait manually till there is the message in the queue.
pl = self.instance._Bot__source_pipeline
pl.pipe.brpoplpush = lambda source_q, inter_q, i: pl.pipe.lindex(source_q, -1)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not read messages from the internal queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, corrected.

while not pl.pipe.llen(pl.source_queue):
time.sleep(1)
self.pprint(self.instance.receive_message())
elif message_action_kind == "pop":
self.instance.logger.info("Waiting for a message to pop...")
self.pprint(self.instance.receive_message())
self.instance.acknowledge_message()
elif message_action_kind == "send":
if not bool(self.instance._Bot__destination_queues):
self.instance.logger.warning("Bot has no destination queues.")
return
if msg:
msg = self.arg2msg(msg)
self.instance.send_message(msg)
self.instance.logger.info("Message sent to output pipelines.")
else:
self.messageWizzard("Message missing!")

def _process(self, dryrun, msg):
if msg:
self.instance._Bot__source_pipeline.receive = lambda: msg
self.instance.logger.info(" * Message from cli will be used when processing.")

if dryrun:
self.instance.send_message = lambda msg: self.instance.logger.info("DRYRUN: Message would be sent now!")
self.instance.acknowledge_message = lambda: self.instance.logger.info("DRYRUN: Message would be acknowledged now!")
self.instance.logger.info(" * Dryrun only, no message will be really sent through.")

self.instance.logger.info("Processing...")
self.instance.process()

def arg2msg(self, msg):
try:
default_type = "Report" if self.runtime_configuration["group"] is "Parser" else "Event"
msg = MessageFactory.unserialize(msg, default_type=default_type)
except (Exception, KeyError, TypeError, json.JSONDecodeError) as exc:
self.messageWizzard("Message can not be parsed from JSON: {}".format(error_message_from_exc(exc)))
exit(1)
return msg

def leverageLogger(self, level):
utils.load_configuration = BotDebugger.load_configuration_patch
BotDebugger.logging_level = level
if hasattr(self, "instance"):
self.instance.logger.setLevel(level)
for h in self.instance.logger.handlers:
if isinstance(h, StreamHandler):
h.setLevel(level)

@staticmethod
def load_configuration_patch(*args, ** kwargs):
d = BotDebugger.load_configuration(*args, ** kwargs)
if "logging_level" in d:
d["logging_level"] = BotDebugger.logging_level
return d

def messageWizzard(self, msg):
self.instance.logger.error(msg)
print(self.EXAMPLE)
if input("Do you want to display current harmonization (available fields)? y/[n]: ") is "y":
self.pprint(self.instance.harmonization)

@staticmethod
def pprint(msg):
""" We can't use standard pprint as JSON standard asks for double quotes. """
print(json.dumps(msg, indent=4, sort_keys=True))