Permalink
Browse files

New reformat_output function for OutputElasticsearch and OutputRedis,

with unittest.
New OutputRedis with two queues: mails and attachments.
Package bitmap for all bitmap.
New topology with output in Redis.
New Rabbit class.
  • Loading branch information...
1 parent 22f19c9 commit 9fbeb7aea2e867cb689fca4b1025adc318d84877 @fedelemantuano fedelemantuano committed Jan 5, 2017
@@ -207,7 +207,8 @@ output-redis:
reconnect_interval: 1
max_retry: 60
flush_size: 50
- queue_name: spamscope
+ queue_mails: spamscope_mails
+ queue_attachments: spamscope_attachments
output-elasticsearch:
@@ -43,7 +43,7 @@
"eager_global_ordinals": true
},
"match_pattern": "regex",
- "match": "(^|.*\\.)(md5|sha1|sha256|sha512|ssdeep)$"
+ "match": "(^|.*\\.)(md5|sha1|sha256|sha512|ssdeep|extension)$"
}
},
{
View
@@ -1,9 +0,0 @@
-from os.path import join
-
-__version__ = "v1.3.rc3"
-__configuration_path__ = "/etc/spamscope"
-
-__defaults__ = {
- "SPAMSCOPE_CONF_PATH": __configuration_path__,
- "SPAMSCOPE_CONF_FILE": join(__configuration_path__, "spamscope.yml"),
- "SPAMSCOPE_VER": __version__, }
@@ -101,10 +101,9 @@ def _load_whitelist(self):
if not isinstance(domains, list):
raise ImproperlyConfigured(
- "Whitelist {} not loaded".format(k)
- )
- domains = [i.lower() for i in domains]
- self._whitelist |= set(domains)
+ "Whitelist {} not loaded".format(k))
+ domains = {i.lower() for i in domains}
+ self._whitelist |= domains
self.log("Whitelist '{}' loaded".format(k))
def process_tick(self, freq):
@@ -47,7 +47,6 @@ def _load_settings(self):
thug_user_agents=self.conf["thug"]["user_agents"])
def _load_lists(self):
-
# Load content types for details
self._tika_valid_content_types = set()
if self.conf["tika"]["enabled"]:
@@ -16,7 +16,7 @@
from __future__ import absolute_import, print_function, unicode_literals
from streamparse.bolt import Bolt
-from modules.phishing_bitmap import PhishingBitMap
+from modules.bitmap import PhishingBitMap
class JsonMaker(Bolt):
@@ -16,10 +16,9 @@
from __future__ import absolute_import, print_function, unicode_literals
from bolts.abstracts import AbstractBolt
+from modules.utils import reformat_output
from elasticsearch import Elasticsearch
from elasticsearch import helpers
-import copy
-import datetime
class OutputElasticsearch(AbstractBolt):
@@ -28,26 +27,29 @@ class OutputElasticsearch(AbstractBolt):
def initialize(self, stormconf, context):
super(OutputElasticsearch, self).initialize(stormconf, context)
+ # Load settings
+ self._load_settings()
+
+ # Init
+ self._mails = []
+ self._attachments = []
+ self._count = 1
+
+ def _load_settings(self):
# Elasticsearch parameters
servers = self.conf['servers']
self._flush_size = servers['flush_size']
- # For mails
- self._index_mails = servers['index.prefix.mails']
- self._doc_type_mails = servers['doc.type.mails']
-
- # For attachments
- self._index_attachments = servers['index.prefix.attachments']
- self._doc_type_attachments = servers['doc.type.attachments']
+ # Parameters for reformat_output function
+ self._parameters = {
+ "elastic_index_mail": servers['index.prefix.mails'],
+ "elastic_type_mail": servers['doc.type.mails'],
+ "elastic_index_attach": servers['index.prefix.attachments'],
+ "elastic_type_attach": servers['doc.type.attachments']}
# Elasticsearch object
self._es = Elasticsearch(hosts=servers['hosts'])
- # Init
- self._mails = []
- self._attachments = []
- self._count = 1
-
def flush(self):
helpers.bulk(self._es, self._mails)
helpers.bulk(self._es, self._attachments)
@@ -56,73 +58,14 @@ def flush(self):
self._count = 1
def process(self, tup):
- mail = tup.values[1]
-
- # Date for daily index
- try:
- timestamp = datetime.datetime.strptime(
- mail['analisys_date'], "%Y-%m-%dT%H:%M:%S.%f")
- except:
- # Without microseconds
- timestamp = datetime.datetime.strptime(
- mail['analisys_date'], "%Y-%m-%dT%H:%M:%S")
-
- mail_date = timestamp.strftime("%Y.%m.%d")
-
- # Get a copy of attachments
- attachments = []
- if mail.get("attachments", []):
- attachments = copy.deepcopy(mail["attachments"])
-
- # Prepair attachments for bulk
- for i in attachments:
- i['@timestamp'] = timestamp
- i['_index'] = self._index_attachments + mail_date
- i['_type'] = self._doc_type_attachments
- i['type'] = self._doc_type_attachments
- i['is_archived'] = False
-
- for j in i.get("files", []):
-
- f = copy.deepcopy(j)
-
- # Prepair archived files
- f['@timestamp'] = timestamp
- f['_index'] = self._index_attachments + mail_date
- f['_type'] = self._doc_type_attachments
- f['type'] = self._doc_type_attachments
- f['is_archived'] = True
- self._attachments.append(f)
-
- # Remove from archived payload, virustotal and thug
- # now in root
- j.pop("payload", None)
- j.pop("virustotal", None)
- j.pop("thug", None)
-
- self._attachments.append(i)
-
- # Remove from mail the attachments huge fields like payload
- # Fetch from Elasticsearch more fast
- for i in mail.get("attachments", []):
- i.pop("payload", None)
- i.pop("tika", None)
- i.pop("virustotal", None)
- i.pop("thug", None)
-
- for j in i.get("files", []):
- j.pop("payload", None)
- j.pop("virustotal", None)
- j.pop("thug", None)
-
- # Prepair mail for bulk
- mail['@timestamp'] = timestamp
- mail['_index'] = self._index_mails + mail_date
- mail['type'] = self._doc_type_mails
- mail['_type'] = self._doc_type_mails
-
- # Append mail in own date
+ raw_mail = tup.values[1]
+
+ # Reformat output
+ mail, attachments = reformat_output(
+ raw_mail, self.component_name, **self._parameters)
+
self._mails.append(mail)
+ self._attachments += attachments
# Flush
if self._count == self._flush_size:
@@ -134,6 +77,10 @@ def process_tick(self, freq):
"""Every freq seconds flush messages. """
super(OutputElasticsearch, self).process_tick(freq)
+ # Flush mails
if self._mails or self._attachments:
- self.log("Flush mail in Elasticsearch after tick")
+ self.log("Flush mails/attachments in Elasticsearch after tick")
self.flush()
+
+ # Reload settings
+ self._load_settings()
@@ -17,6 +17,7 @@
from __future__ import absolute_import, print_function, unicode_literals
from bolts.abstracts import AbstractBolt
from modules.redis_client import Redis
+from modules.utils import reformat_output
class OutputRedis(AbstractBolt):
@@ -25,12 +26,20 @@ class OutputRedis(AbstractBolt):
def initialize(self, stormconf, context):
super(OutputRedis, self).initialize(stormconf, context)
+ # Load settings
+ self._load_settings()
+
+ # Init
+ self._mails = []
+ self._attachments = []
+ self._count = 1
+
+ def _load_settings(self):
# Redis parameters
servers = self.conf['servers']
self._flush_size = servers['flush_size']
- self._queue_name = servers['queue_name']
- self._mails = []
- self._count = 1
+ self._queue_mails = servers['queue_mails']
+ self._queue_attachments = servers['queue_attachments']
# Redis class
self._redis_client = Redis(
@@ -44,14 +53,24 @@ def initialize(self, stormconf, context):
def flush(self):
self._redis_client.push_messages(
- queue=self._queue_name,
- messages=self._mails)
+ queue=self._queue_mails, messages=self._mails)
+ self._redis_client.push_messages(
+ queue=self._queue_attachments, messages=self._attachments)
self._mails = []
+ self._attachments = []
self._count = 1
def process(self, tup):
- self._mails.append(tup.values[1])
+ raw_mail = tup.values[1]
+
+ # Reformat output
+ mail, attachments = reformat_output(
+ raw_mail, self.component_name)
+
+ self._mails.append(mail)
+ self._attachments += attachments
+ # Flush
if self._count == self._flush_size:
self.flush()
else:
@@ -60,6 +79,10 @@ def process(self, tup):
def process_tick(self, freq):
"""Every freq seconds flush messages. """
super(OutputRedis, self).process_tick(freq)
- if self._mails:
- self.log("Flush mail in Redis server after tick")
+
+ if self._mails or self._attachments:
+ self.log("Flush mails/attachments in Redis server after tick")
self.flush()
+
+ # Reload settings
+ self._load_settings()
@@ -16,7 +16,7 @@
from __future__ import absolute_import, print_function, unicode_literals
from bolts.abstracts import AbstractBolt
-from modules.phishing_bitmap import PhishingBitMap
+from modules.bitmap import PhishingBitMap
from modules.exceptions import ImproperlyConfigured
from modules.utils import \
search_words_in_text as swt, \
@@ -50,8 +50,7 @@ def _load_lists(self):
keywords = load_config(v)
if not isinstance(keywords, list):
raise ImproperlyConfigured(
- "Keywords subjects list '{}' not valid".format(k)
- )
+ "Keywords subjects list '{}' not valid".format(k))
self._s_keys |= set(keywords)
# Load targets keywords
@@ -61,8 +60,7 @@ def _load_lists(self):
keywords = load_config(v)
if not isinstance(keywords, dict):
raise ImproperlyConfigured(
- "Keywords targets dict '{}' not valid".format(k)
- )
+ "Keywords targets dict '{}' not valid".format(k))
self._t_keys.update(keywords)
def _check_urls(self, urls, keywords):
@@ -0,0 +1,19 @@
+"""
+Copyright 2016 Fedele Mantuano (https://twitter.com/fedelemantuano)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+
+from .phishing_bitmap import PhishingBitMap
+from .bitmap import *
@@ -80,9 +80,7 @@ def _valide_bitmap(self):
if not isinstance(self.bitmap, dict):
raise BitMapNotValid(
"BitMap must be a dict, {} given".format(
- type(self.bitmap)
- )
- )
+ type(self.bitmap)))
bitmap_values = set(self.bitmap.values())
expected_values = set(range(0, len(self.bitmap)))
@@ -101,8 +99,7 @@ def unset_property_score(self, *args):
for p in args:
if p not in self.bitmap:
raise PropertyDoesNotExists(
- "Property '{}' does not exists".format(p)
- )
+ "Property '{}' does not exists".format(p))
value = self.bitmap.get(p)
@@ -115,8 +112,7 @@ def set_property_score(self, *args):
for p in args:
if p not in self.bitmap:
raise PropertyDoesNotExists(
- "Property '{}' does not exists".format(p)
- )
+ "Property '{}' does not exists".format(p))
value = self.bitmap.get(p)
@@ -130,8 +126,7 @@ def calculate_score(self, *args):
for p in args:
if p not in self.bitmap:
raise PropertyDoesNotExists(
- "Property '{}' does not exists".format(p)
- )
+ "Property '{}' does not exists".format(p))
value = self.bitmap.get(p)
@@ -151,8 +146,7 @@ def get_score_sum(self, *args):
for score in args:
if not isinstance(score, int):
raise ScoreNotInteger(
- "Score '{}' is not a integer".format(score)
- )
+ "Score '{}' is not a integer".format(score))
score_sum |= (1 << score)
return score_sum
@@ -189,10 +183,7 @@ def score(self, value):
if value > threshold:
raise ScoreOutOfRange(
"{} can only have values in the range [0, {}]".format(
- self.map_name,
- threshold
- )
- )
+ self.map_name, threshold))
self._score = value
Oops, something went wrong.

0 comments on commit 9fbeb7a

Please sign in to comment.