Permalink
Browse files

Init repository

  • Loading branch information...
0 parents commit 3614d7570074b3ded0dff031058c81c1c08ff2fc @fedelemantuano fedelemantuano committed Apr 12, 2016
@@ -0,0 +1,7 @@
+.DS_Store
+.ropeproject
+_build
+_resources
+logs
+virtualenvs
+.*.swp
No changes.
@@ -0,0 +1,19 @@
+{
+ "library": "",
+ "topology_specs": "topologies/",
+ "virtualenv_specs": "virtualenvs/",
+ "envs": {
+ "prod": {
+ "user": "",
+ "nimbus": "",
+ "workers": [],
+ "log": {
+ "path": "",
+ "max_bytes": 1000000,
+ "backup_count": 10,
+ "level": "info"
+ },
+ "virtualenv_root": ""
+ }
+ }
+}
@@ -0,0 +1,11 @@
+def pre_submit(topology_name, env_name, env_config):
+ """Override this function to perform custom actions prior to topology
+ submission. No SSH tunnels will be active when this function is called."""
+ pass
+
+
+def post_submit(topo_name, env_name, env_config):
+ """Override this function to perform custom actions after topology
+ submission. Note that the SSH tunnel to Nimbus will still be active
+ when this function is called."""
+ pass
@@ -0,0 +1,12 @@
+(defproject spamscope "0.0.1-SNAPSHOT"
+ :source-paths ["topologies"]
+ :resource-paths ["_resources"]
+ :target-path "_build"
+ :min-lein-version "2.0.0"
+ :jvm-opts ["-client"]
+ :dependencies [[org.apache.storm/storm-core "0.9.5"]
+ [com.parsely/streamparse "0.0.4-SNAPSHOT"]
+ ]
+ :jar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ :uberjar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ )
@@ -0,0 +1,13 @@
+contextlib2==0.5.1
+ecdsa==0.13
+Fabric==1.11.1
+invoke==0.12.2
+Jinja2==2.8
+MarkupSafe==0.23
+paramiko==1.16.0
+prettytable==0.7.2
+pycrypto==2.6.1
+requests==2.9.1
+simplejson==3.8.2
+six==1.10.0
+streamparse==2.1.4
No changes.
@@ -0,0 +1,16 @@
+from __future__ import absolute_import, print_function, unicode_literals
+from streamparse.bolt import Bolt
+
+import email
+
+
+class MailParser(Bolt):
+
+ def process(self, tup):
+ message = email.message_from_string(tup.values[0].decode('base64'))
+
+ for i in message.walk():
+ if not i.is_multipart():
+ self.log("Mime type: {}".format(i.get_content_type()))
+ content_type = i.get('content-transfer-encoding', '')
+ self.log("Content type: {}".format(content_type))
No changes.
@@ -0,0 +1,18 @@
+from __future__ import absolute_import, print_function, unicode_literals
+from streamparse.spout import Spout
+
+import glob
+import os
+
+
+class MailSpout(Spout):
+
+ def initialize(self, stormconf, context):
+ mails_path = '/home/fedelemantuano/Desktop/mails/'
+ # TODO: get from QueueItem
+ self._mails_iglob = glob.iglob(os.path.join(mails_path, '*heidi*'))
+
+ def next_tuple(self):
+ for i in self._mails_iglob:
+ with open(i) as mail:
+ self.emit([mail.read().encode('base64')])
@@ -0,0 +1,11 @@
+def pre_submit(topology_name, env_name, env_config):
+ """Override this function to perform custom actions prior to topology
+ submission. No SSH tunnels will be active when this function is called."""
+ pass
+
+
+def post_submit(topo_name, env_name, env_config):
+ """Override this function to perform custom actions after topology
+ submission. Note that the SSH tunnel to Nimbus will still be active
+ when this function is called."""
+ pass
@@ -0,0 +1,23 @@
+(ns spamscope
+ (:use [streamparse.specs])
+ (:gen-class))
+
+(defn spamscope [options]
+ [
+ ;; spout configuration
+ {"mails-spout" (python-spout-spec
+ options
+ "spouts.mails.MailSpout"
+ ["mail"]
+ )
+ }
+ ;; bolt configuration
+ {"mailparse-bolt" (python-bolt-spec
+ options
+ {"mails-spout" :shuffle}
+ "bolts.mailparse.MailParser"
+ []
+ )
+ }
+ ]
+)

0 comments on commit 3614d75

Please sign in to comment.