Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 69 additions & 43 deletions intelmq/bin/intelmqdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,13 @@ def dump_info(fname):


def save_file(fname, content):
with open(fname, 'wt') as handle:
json.dump(content, handle)
try:
with open(fname, 'wt') as handle:
json.dump(content, handle)
except KeyboardInterrupt:
with open(fname, 'wt') as handle:
json.dump(content, handle)
exit(1)


def load_meta(dump):
Expand Down Expand Up @@ -140,11 +145,15 @@ def main():
info = dump_info(fname)
print("{c:3}: {s:{l}} {i}".format(c=count, s=shortname, i=info,
l=length))
botid = input(inverted('Which dump file to process (id or name)?') +
' ')
botid = botid.strip()
if botid == 'q' or not botid:
try:
botid = input(inverted('Which dump file to process (id or name)?') +
' ')
except EOFError:
exit(0)
else:
botid = botid.strip()
if botid == 'q' or not botid:
exit(0)
try:
fname, botid = filenames[int(botid)]
except ValueError:
Expand All @@ -156,31 +165,43 @@ def main():
if not os.path.isfile(fname):
print(bold('Given file does not exist: {}'.format(fname)))
exit(1)

answer = None
while True:
info = dump_info(fname)
print('Processing {}: {}'.format(bold(botid), info))

if info.startswith(str(red)):
available_opts = [item[0] for item in ACTIONS.values() if item[2]]
print('Restricted actions.')
else:
# don't display list after 'show' and 'recover' command
if not (answer and isinstance(answer, list) and answer[0] in ['s', 'r']):
with open(fname, 'rt') as handle:
content = json.load(handle)
meta = load_meta(content)
available_opts = [item[0] for item in ACTIONS.values()]
for count, line in enumerate(meta):
print('{:3}: {} {}'.format(count, *line))

# Determine bot status
bot_status = ctl.bot_status(botid)
if bot_status == 'running':
print(red('Attention: This bot is currently running!'))
elif bot_status == 'error':
print(red('Attention: This bot is not defined!'))

if info.startswith(str(red)):
available_opts = [item[0] for item in ACTIONS.values() if item[2]]
print('Restricted actions.')
try:
answer = input(inverted(', '.join(available_opts) + '?') + ' ').split()
except EOFError:
break
else:
with open(fname, 'rt') as handle:
content = json.load(handle)
meta = load_meta(content)
available_opts = [item[0] for item in ACTIONS.values()]
for count, line in enumerate(meta):
print('{:3}: {} {}'.format(count, *line))
answer = input(inverted(', '.join(available_opts) + '?') + ' ').split()
if not answer:
continue
if any([answer[0] == char for char in AVAILABLE_IDS]):
if not answer:
continue
if any([answer[0] == char for char in AVAILABLE_IDS]) and len(answer) > 1:
ids = [int(item) for item in answer[1].split(',')]
else:
ids = []
queue_name = None
if answer[0] == 'a':
# recover all -> recover all by ids
Expand All @@ -196,37 +217,42 @@ def main():
del content[meta[entry][0]]
save_file(fname, content)
elif answer[0] == 'r':
if bot_status == 'running':
# See https://github.com/certtools/intelmq/issues/574
print(red('Recovery for running bots not possible.'))
continue
# recover entries
default = utils.load_configuration(DEFAULTS_CONF_FILE)
runtime = utils.load_configuration(RUNTIME_CONF_FILE)
params = utils.load_parameters(default, runtime)
pipe = pipeline.PipelineFactory.create(params)
for i, (key, entry) in enumerate([item for (count, item)
in enumerate(content.items()) if count in ids]):
if entry['message']:
msg = entry['message']
else:
print('No message here, deleting entry.')
del content[key]
save_file(fname, content)
continue
try:
for i, (key, entry) in enumerate([item for (count, item)
in enumerate(content.items()) if count in ids]):
if entry['message']:
msg = entry['message']
else:
print('No message here, deleting entry.')
del content[key]
continue

if queue_name is None:
if len(answer) == 3:
queue_name = answer[2]
if queue_name is None:
if len(answer) == 3:
queue_name = answer[2]
else:
queue_name = entry['source_queue']
try:
pipe.set_queues(queue_name, 'destination')
pipe.connect()
pipe.send(msg)
except exceptions.PipelineError:
print(red('Could not reinject into queue {}: {}'
''.format(queue_name, traceback.format_exc())))
else:
queue_name = entry['source_queue']
try:
pipe.set_queues(queue_name, 'destination')
pipe.connect()
pipe.send(msg)
except exceptions.PipelineError:
print(red('Could not reinject into queue {}: {}'
''.format(queue_name, traceback.format_exc())))
else:
del content[key]
save_file(fname, content)
print(green('Recovered dump {}.'.format(i)))
del content[key]
print(green('Recovered dump {}.'.format(i)))
finally:
save_file(fname, content)
if not content:
os.remove(fname)
print('Deleted empty file {}'.format(fname))
Expand Down