Skip to content

Commit

Permalink
Debugger, alienvault
Browse files Browse the repository at this point in the history
certtools#975 - debugger can accept files as messages
certtools#993 - alienvault-otx conforming ParserBot
  • Loading branch information
e3rd committed Jun 6, 2017
1 parent 378c4a9 commit 264d5fd
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 87 deletions.
162 changes: 80 additions & 82 deletions intelmq/bots/parsers/alienvault/parser_otx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import urllib.parse as parse

from intelmq.lib import utils
from intelmq.lib.bot import Bot
from intelmq.lib.bot import ParserBot

HASHES = {
'FileHash-SHA256': 'malware.hash.sha256',
Expand All @@ -19,90 +19,88 @@
}


class AlienVaultOTXParserBot(Bot):

def process(self):
report = self.receive_message()

raw_report = utils.base64_decode(report.get("raw"))

for pulse in json.loads(raw_report):
additional_pulse = {"author": pulse['author_name'],
"pulse": pulse['name']}

for indicator in pulse["indicators"]:
additional_indicator = {}
event = self.new_event(report)
# hashes
if indicator["type"] in HASHES.keys():
event.add(HASHES[indicator["type"]], indicator["indicator"])
# fqdn
elif indicator["type"] in ['hostname', 'domain']:
# not all domains in the report are just domains
# some are URLs, we can manage those here instead
# of raising errors
#
# dirty check if there is a scheme

resource = indicator["indicator"] \
if '://' in indicator["indicator"] \
else 'http://' + indicator["indicator"]
path = parse.urlparse(resource).path
if len(path) > 0:
event.add('source.url', resource)
else:
event.add('source.fqdn',
indicator["indicator"])
# IP addresses
elif indicator["type"] in ['IPv4', 'IPv6']:
event.add('source.ip',
indicator["indicator"])
# emails
elif indicator["type"] == 'email':
event.add('source.account',
indicator["indicator"])
# URLs
elif indicator["type"] in ['URL', 'URI']:
resource = indicator["indicator"] \
if '://' in indicator["indicator"] \
else 'http://' + indicator["indicator"]
class AlienVaultOTXParserBot(ParserBot):
#import ipdb; ipdb.set_trace()
parse = ParserBot.parse_json
recover_line = ParserBot.recover_line_json

def parse_line(self, pulse, report):
additional_pulse = {"author": pulse['author_name'],
"pulse": pulse['name']}

for indicator in pulse["indicators"]:
additional_indicator = {}
event = self.new_event(report)
# hashes
if indicator["type"] in HASHES.keys():
event.add(HASHES[indicator["type"]], indicator["indicator"])
# fqdn
elif indicator["type"] in ['hostname', 'domain']:
# not all domains in the report are just domains
# some are URLs, we can manage those here instead
# of raising errors
#
# dirty check if there is a scheme

resource = indicator["indicator"] \
if '://' in indicator["indicator"] \
else 'http://' + indicator["indicator"]
path = parse.urlparse(resource).path
if len(path) > 0:
event.add('source.url', resource)
# CIDR
elif indicator["type"] in ['CIDR']:
event.add('source.network',
indicator["indicator"])

# CVE
elif indicator["type"] in ['CVE']:
additional_indicator['CVE'] = indicator["indicator"]
# TODO: Process these IoCs: FilePath, Mutex
else:
continue

if 'tags' in pulse:
additional_indicator['tags'] = pulse['tags']
if 'modified' in pulse:
additional_indicator['time_updated'] = \
pulse["modified"][:-4] + "+00:00"
if 'industries' in pulse:
additional_indicator['industries'] = pulse["industries"]
if 'adversary' in pulse:
additional_indicator['adversary'] = pulse["adversary"]
if 'targeted_countries' in pulse:
tc = pulse['targeted_countries']
if tc:
additional_indicator['targeted_countries'] = tc

additional = additional_pulse.copy()
additional.update(additional_indicator)
event.add('source.fqdn',
indicator["indicator"])
# IP addresses
elif indicator["type"] in ['IPv4', 'IPv6']:
event.add('source.ip',
indicator["indicator"])
# emails
elif indicator["type"] == 'email':
event.add('source.account',
indicator["indicator"])
# URLs
elif indicator["type"] in ['URL', 'URI']:
resource = indicator["indicator"] \
if '://' in indicator["indicator"] \
else 'http://' + indicator["indicator"]
event.add('source.url', resource)
# CIDR
elif indicator["type"] in ['CIDR']:
event.add('source.network',
indicator["indicator"])

# CVE
elif indicator["type"] in ['CVE']:
additional_indicator['CVE'] = indicator["indicator"]
# TODO: Process these IoCs: FilePath, Mutex
else:
return

if 'tags' in pulse:
additional_indicator['tags'] = pulse['tags']
if 'modified' in pulse:
additional_indicator['time_updated'] = \
pulse["modified"][:-4] + "+00:00"
if 'industries' in pulse:
additional_indicator['industries'] = pulse["industries"]
if 'adversary' in pulse:
additional_indicator['adversary'] = pulse["adversary"]
if 'targeted_countries' in pulse:
tc = pulse['targeted_countries']
if tc:
additional_indicator['targeted_countries'] = tc

additional = additional_pulse.copy()
additional.update(additional_indicator)

event.add('comment', pulse['description'])
event.add('extra', additional)
event.add('classification.type', 'blacklist')
event.add('time.source', indicator["created"][:-4] + "+00:00")
event.add("raw", json.dumps(indicator, sort_keys=True))
return event]

event.add('comment', pulse['description'])
event.add('extra', additional)
event.add('classification.type', 'blacklist')
event.add('time.source', indicator["created"][:-4] + "+00:00")
event.add("raw", json.dumps(indicator, sort_keys=True))
self.send_message(event)
self.acknowledge_message()


BOT = AlienVaultOTXParserBot
36 changes: 33 additions & 3 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,14 @@ def parse_csv_dict(self, report: dict):

for line in csv.DictReader(io.StringIO(raw_report)):
yield line

def parse_json(self, report: dict):
    """
    Generic JSON parser.

    Decodes the base64 ``raw`` field of the report and yields every
    top-level element of the resulting JSON document as one "line",
    to be handled individually by ``parse_line``.
    """
    decoded = utils.base64_decode(report.get("raw"))
    yield from json.loads(decoded)

def parse(self, report: dict):
"""
Expand All @@ -573,12 +581,16 @@ def parse(self, report: dict):
Override for your use or use an existing parser, e.g.::
parse = ParserBot.parse_csv
You should do that for recovering lines too.
recover_line = ParserBot.recover_line_csv
"""
for line in utils.base64_decode(report.get("raw")).splitlines():
line = line.strip()
if not any([line.startswith(prefix) for prefix in self.ignore_lines_starting]):
yield line


def parse_line(self, line, report):
"""
Expand All @@ -603,9 +615,19 @@ def process(self):
for line in self.parse(report):
if not line:
continue
try:
# filter out None
events = list(filter(bool, self.parse_line(line, report)))
try:
value = self.parse_line(line, report)
if value is None:
continue
elif type(value) is list:
# filter out None
events = list(filter(bool, value))
else:
events = [value]
except TypeError:
print("TYPE error")
print(line)
import ipdb; ipdb.set_trace()
except Exception:
self.logger.exception('Failed to parse line.')
self.__failed.append((traceback.format_exc(), line))
Expand Down Expand Up @@ -642,6 +664,14 @@ def recover_line_csv_dict(self, line: str):
writer.writeheader()
writer.writerow(line)
return out.getvalue()

def recover_line_json(self, line: dict):
    """
    Inverse of ``parse_json`` for a single element.

    Serializes the problematic pulse back to JSON so a fully
    functional report containing only that pulse can be recovered.
    """
    serialized = json.dumps(line)
    return serialized


class CollectorBot(Bot):
Expand Down
8 changes: 6 additions & 2 deletions intelmq/lib/bot_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import json
import logging
from os.path import exists
from importlib import import_module

from intelmq.lib import utils
Expand Down Expand Up @@ -90,7 +91,7 @@ def _message(self, message_action_kind, msg):
# However, we have to wait manually till there is the message in the queue.
pl = self.instance._Bot__source_pipeline
pl.pipe.brpoplpush = lambda source_q, inter_q, i: pl.pipe.lindex(source_q, -1)
while not pl.pipe.llen(pl.source_queue):
while not (pl.pipe.llen(pl.source_queue) or pl.pipe.llen(pl.internal_queue)):
time.sleep(1)
self.pprint(self.instance.receive_message())
elif message_action_kind == "pop":
Expand Down Expand Up @@ -126,7 +127,10 @@ def arg2msg(self, msg):
try:
default_type = "Report" if self.runtime_configuration["group"] is "Parser" else "Event"
msg = MessageFactory.unserialize(msg, default_type=default_type)
except (Exception, KeyError, TypeError, ValueError) as exc:
except (Exception, KeyError, TypeError, ValueError) as exc:
if exists(msg):
with open(msg,"r") as f:
return self.arg2msg(f.read())
self.messageWizzard("Message can not be parsed from JSON: {}".format(error_message_from_exc(exc)))
exit(1)
return msg
Expand Down

0 comments on commit 264d5fd

Please sign in to comment.