Permalink
Browse files

Migration to streamparse 3.1.1:

1) New topology output debug
2) Merged spouts and bolts in a spamscope file
  • Loading branch information...
1 parent 663d261 commit 88f63a509300687bbc1ea862974b07b098530678 @fedelemantuano fedelemantuano committed Sep 22, 2016
@@ -1,38 +0,0 @@
-# Spouts configurations
-
-# Spout file on file system
-files-mails-spout:
-
- # Reload new mails after reload.mails analyzed
- reload.mails: 1000
-
- # Waiting new mails, sleep seconds
- waiting.sleep: 1
-
- # Max retry failed tuple
- max.retry: 3
-
- # Post processing
- post_processing:
-
- # move or remove mails?
- what: remove
-
- # if move where
- where: /mnt/testing/spamscope/mails/moved
-
- # if failed move in where.failed
- where.failed: /mnt/testing/spamscope/mails/failed
-
- # Mailboxes
- mailboxes:
- test:
- mail_server: hostname
- files_pattern: "*untroubled*"
- priority: 1
- path_mails: /path/mails1
- test1:
- mail_server: hostname
- files_pattern: "*"
- priority: 2
- path_mails: /path/mails2
@@ -1,7 +1,45 @@
+# Spouts configurations
+# Spout file on file system
+files-mails:
+
+ # Reload new mails after reload.mails analyzed
+ reload.mails: 1000
+
+ # Waiting new mails, sleep seconds
+ waiting.sleep: 1
+
+ # Max retry failed tuple
+ max.retry: 3
+
+ # Post processing
+ post_processing:
+
+ # move or remove mails?
+ what: remove
+
+ # if move where
+ where: /mnt/testing/spamscope/mails/moved
+
+ # if failed move in where.failed
+ where.failed: /mnt/testing/spamscope/mails/failed
+
+ # Mailboxes
+ mailboxes:
+ test:
+ mail_server: hostname
+ files_pattern: "*untroubled*"
+ priority: 1
+ path_mails: /path/mails1
+ test1:
+ mail_server: hostname
+ files_pattern: "*"
+ priority: 2
+ path_mails: /path/mails2
+
+
# Bolts configurations
-
# Phishing bolt configuration
-phishing-bolt:
+phishing:
lists:
subjects:
# Suspect subjects
@@ -17,7 +55,7 @@ phishing-bolt:
# Attachments bolt configuration
-attachments-bolt:
+attachments:
tika:
# Enable Tika but it's very slow:
# https://github.com/fedelemantuano/tika-app-python#performance-tests
@@ -45,7 +83,7 @@ attachments-bolt:
# Urls handler body
-urls_handler_body-bolt:
+urls-handler-body:
whitelists:
# Only second level domains to whitelisting
# Example in conf/whitelists/generic.example.yml
@@ -60,7 +98,7 @@ urls_handler_body-bolt:
# Urls handler attachments
-urls_handler_attachments-bolt:
+urls-handler-attachments:
whitelists:
# Only second level domains to whitelisting
# Example in conf/whitelists/generic.example.yml
@@ -75,13 +113,12 @@ urls_handler_attachments-bolt:
# Output debug bolt configuration
-output-debug-bolt:
+output-debug:
json.indent: 4
output.path: /path/to/output
-# Output debug bolt configuration
-
+# Output Redis
#hosts:
#The hostname(s) of your Redis server(s). Ports may be specified on any
#hostname, which will override the global port config.
@@ -117,7 +154,7 @@ output-debug-bolt:
#queue_name:
#name of Redis server list
-output-redis-bolt:
+output-redis:
servers:
hosts:
- "localhost:6379"
@@ -132,7 +169,7 @@ output-redis-bolt:
queue_name: spamscope
-output-elasticsearch-bolt:
+output-elasticsearch:
servers:
hosts:
- "node1:9200"
View
@@ -1,5 +1,5 @@
{
- "library": "",
+ "serializer": "json",
"topology_specs": "topologies/",
"virtualenv_specs": "virtualenvs/",
"use_virtualenv": "false",
View
@@ -1,12 +1,10 @@
(defproject spamscope "0.0.1-SNAPSHOT"
- :source-paths ["topologies"]
:resource-paths ["_resources"]
:target-path "_build"
:min-lein-version "2.0.0"
:jvm-opts ["-client"]
- :dependencies [[org.apache.storm/storm-core "0.9.5"]
- [com.parsely/streamparse "0.0.4-SNAPSHOT"]
- ]
- :jar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
- :uberjar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
- )
+ :dependencies [[org.apache.storm/storm-core "1.0.2"]
+ [org.apache.storm/flux-core "1.0.2"]]
+ :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ )
@@ -34,7 +34,7 @@ class AbstractBolt(Bolt):
__metaclass__ = ABCMeta
def initialize(self, stormconf, context):
- self._conf_file = stormconf.get("bolts.conf", None)
+ self._conf_file = stormconf.get("spamscope_conf", None)
self._conf_loader()
def _conf_loader(self):
@@ -27,6 +27,7 @@
class Attachments(AbstractBolt):
+ outputs = ['sha256_random', 'with_attachments', 'attachments']
def initialize(self, stormconf, context):
super(Attachments, self).initialize(stormconf, context)
View
@@ -26,10 +26,11 @@
class Forms(Bolt):
+ outputs = ['sha256_random', 'with_form']
def process(self, tup):
sha256_random = tup.values[0]
- forms = False
+ with_form = False
try:
mail = json.loads(tup.values[1])
@@ -39,7 +40,7 @@ def process(self, tup):
tree = html.fromstring(body)
results = tree.xpath('//form')
if results:
- forms = True
+ with_form = True
self.log("Forms for mail '{}'".format(sha256_random))
except Exception as e:
@@ -50,4 +51,4 @@ def process(self, tup):
self.raise_exception(e, tup)
finally:
- self.emit([sha256_random, forms])
+ self.emit([sha256_random, with_form])
@@ -26,6 +26,7 @@
class JsonMaker(Bolt):
+ outputs = ['sha256_random', 'json']
def initialize(self, stormconf, context):
self.mails = {}
@@ -30,6 +30,7 @@
class Phishing(AbstractBolt):
+ outputs = ['sha256_random', 'with_phishing', 'score', 'targets']
def initialize(self, stormconf, context):
super(Phishing, self).initialize(stormconf, context)
@@ -41,6 +41,8 @@ class InvalidKindData(ValueError):
class Tokenizer(Bolt):
"""Split the mail in token parts (body, attachments, etc.). """
+ outputs = ['sha256_random', 'mail']
+
def initialize(self, stormconf, context):
self.p = MailParser()
@@ -24,6 +24,7 @@
class UrlsHandlerAttachments(AbstractUrlsHandlerBolt):
+ outputs = ['sha256_random', 'with_urls', 'urls']
def initialize(self, stormconf, context):
super(UrlsHandlerAttachments, self).initialize(stormconf, context)
@@ -24,6 +24,7 @@
class UrlsHandlerBody(AbstractUrlsHandlerBolt):
+ outputs = ['sha256_random', 'with_urls', 'urls']
def initialize(self, stormconf, context):
super(UrlsHandlerBody, self).initialize(stormconf, context)
@@ -27,7 +27,7 @@ class AbstractSpout(Spout):
__metaclass__ = ABCMeta
def initialize(self, stormconf, context):
- self._conf_file = stormconf.get("spouts.conf", None)
+ self._conf_file = stormconf.get("spamscope_conf", None)
self._conf_loader()
def _conf_loader(self):
@@ -30,6 +30,13 @@
class FilesMailSpout(AbstractSpout):
+ outputs = [
+ 'mail_path',
+ 'mail_server',
+ 'mailbox',
+ 'priority',
+ 'kind_data',
+ ]
def initialize(self, stormconf, context):
super(FilesMailSpout, self).initialize(stormconf, context)
View
@@ -1,11 +0,0 @@
-def pre_submit(topology_name, env_name, env_config):
- """Override this function to perform custom actions prior to topology
- submission. No SSH tunnels will be active when this function is called."""
- pass
-
-
-def post_submit(topo_name, env_name, env_config):
- """Override this function to perform custom actions after topology
- submission. Note that the SSH tunnel to Nimbus will still be active
- when this function is called."""
- pass
Oops, something went wrong.

0 comments on commit 88f63a5

Please sign in to comment.