Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

HPFeeds Thug client

  • Loading branch information...
commit 9cd642ecaab16529c7ac2966e6631c6135c77bf6 1 parent 6f683da
@buffer authored
View
186 hpfeeds/ThugHPFeeds.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python
+#
+# ThugHPFeeds.py
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+
+import os
+import sys
+import time
+import hashlib
+import datetime
+import json
+import logging
+import threading
+import ConfigParser
+import hpfeeds
+
+
+class ThugFiles(threading.Thread):
+ def __init__(self, opts):
+ self.opts = opts
+ self.logging_init()
+ threading.Thread.__init__(self)
+
+ def logging_init(self):
+ if self.opts['logdir'] is None:
+ return
+
+ if not os.path.exists(self.opts['logdir']):
+ os.mkdir(self.opts['logdir'])
+
+ self.log = logging.getLogger("ThugFiles.HPFeeds")
+ handler = logging.FileHandler(os.path.join(self.opts['logdir'], 'thugfiles.log'))
+ formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
+ handler.setFormatter(formatter)
+ self.log.addHandler(handler)
+ self.log.setLevel(logging.INFO)
+
+ def run(self):
+ def on_message(identifier, channel, payload):
+ try:
+ decoded = json.loads(str(payload))
+ except:
+ decoded = {'raw': payload}
+
+ if not 'md5' in decoded or not 'data' in decoded:
+ self.log.info("Received message does not contain hash or data - Ignoring it")
+ return
+
+ csv = ', '.join(['{0} = {1}'.format(i, decoded[i]) for i in ['md5', 'sha1', 'type']])
+ outmsg = 'PUBLISH channel = %s, identifier = %s, %s' % (channel, identifier, csv)
+ self.log.info(outmsg)
+
+ if self.opts['logdir'] is None:
+ return
+
+ filedata = decoded['data'].decode('base64')
+ fpath = os.path.join(self.opts['logdir'], decoded['md5'])
+ with open(fpath, 'wb') as fd:
+ fd.write(filedata)
+
+ def on_error(payload):
+ self.log.critical("Error message from server: %s" % (payload, ))
+ self.hpc.stop()
+
+ while True:
+ try:
+ self.hpc = hpfeeds.new(self.opts['host'], int(self.opts['port']), self.opts['ident'], self.opts['secret'])
+ self.log.info("Connected to %s" % (self.hpc.brokername, ))
+ self.hpc.subscribe([self.opts['channel'], ])
+ except hpfeeds.FeedException:
+ break
+
+ try:
+ self.hpc.run(on_message, on_error)
+ except:
+ self.hpc.close()
+ time.sleep(20)
+
+
+class ThugEvents(threading.Thread):
+ def __init__(self, opts):
+ self.opts = opts
+ self.logging_init()
+ threading.Thread.__init__(self)
+
+ def logging_init(self):
+ if self.opts['logdir'] is None:
+ return
+
+ if not os.path.exists(self.opts['logdir']):
+ os.mkdir(self.opts['logdir'])
+
+ self.log = logging.getLogger("ThugEvents.HPFeeds")
+ handler = logging.FileHandler(os.path.join(self.opts['logdir'], 'thugevents.log'))
+ formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
+ handler.setFormatter(formatter)
+ self.log.addHandler(handler)
+ self.log.setLevel(logging.INFO)
+
+ def run(self):
+ def on_message(identifier, channel, payload):
+ m = hashlib.md5()
+ m.update(payload)
+
+ outmsg = 'PUBLISH channel = %s, identifier = %s, MAEC = %s' % (channel, identifier, m.hexdigest())
+ self.log.info(outmsg)
+
+ if self.opts['logdir'] is None:
+ return
+
+ fpath = os.path.join(self.opts['logdir'], m.hexdigest())
+ with open(fpath, 'wb') as fd:
+ fd.write(payload)
+
+ def on_error(payload):
+ self.log.critical("Error message from server: %s" % (payload, ))
+ self.hpc.stop()
+
+ while True:
+ try:
+ self.hpc = hpfeeds.new(self.opts['host'], int(self.opts['port']), self.opts['ident'], self.opts['secret'])
+ self.log.info("Connected to %s" % (self.hpc.brokername, ))
+ self.hpc.subscribe(self.opts['channel'])
+ except hpfeeds.FeedException:
+ break
+
+ try:
+ self.hpc.run(on_message, on_error)
+ except:
+ self.hpc.close()
+ time.sleep(20)
+
+
+class ThugHPFeeds:
+ def __init__(self):
+ self.events_opts = dict()
+ self.files_opts = dict()
+ self.config_init()
+
+ def config_init(self):
+ config = ConfigParser.ConfigParser()
+
+ conf_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'hpfeeds.conf')
+ config.read(conf_file)
+
+ for option in config.options('HPFeeds'):
+ opt = config.get('HPFeeds', option)
+ self.events_opts[option] = opt
+ self.files_opts[option] = opt
+
+ for option in config.options('ThugFiles'):
+ self.files_opts[option] = config.get('ThugFiles', option)
+
+ for option in config.options('ThugEvents'):
+ self.events_opts[option] = config.get('ThugEvents', option)
+
+ def run(self):
+ if self.files_opts['enable']:
+ files = ThugFiles(self.files_opts)
+ files.start()
+
+ if self.events_opts['enable']:
+ events = ThugEvents(self.events_opts)
+ events.start()
+
+
+if __name__ == '__main__':
+ try:
+ f = ThugHPFeeds()
+ f.run()
+ except KeyboardInterrupt:
+ sys.exit(0)
+
View
17 hpfeeds/hpfeeds.conf.sample
@@ -0,0 +1,17 @@
+[HPFeeds]
+host: hpfeeds.honeycloud.net
+port: 10000
+
+[ThugFiles]
+enable: True
+channel: thug.files
+ident: <thug.files ident>
+secret: <thug.files secret>
+logdir: ./files
+
+[ThugEvents]
+enable: True
+channel: thug.events
+ident: <thug.events ident>
+secret: <thug.events secret>
+logdir: ./events
View
182 hpfeeds/hpfeeds.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+#
+# hpfeeds.py
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+
+
+import sys
+import struct
+import socket
+import hashlib
+import logging
+from time import sleep
+
+logger = logging.getLogger('pyhpfeeds')
+
+OP_ERROR = 0
+OP_INFO = 1
+OP_AUTH = 2
+OP_PUBLISH = 3
+OP_SUBSCRIBE = 4
+BUFSIZ = 16384
+
+__all__ = ["new", "FeedException"]
+
+def msghdr(op, data):
+ return struct.pack('!iB', 5+len(data), op) + data
+
+def msgpublish(ident, chan, data):
+# if isinstance(data, str):
+# data = data.encode('latin1')
+ return msghdr(OP_PUBLISH, struct.pack('!B', len(ident)) + ident + struct.pack('!B', len(chan)) + chan + data)
+
+def msgsubscribe(ident, chan):
+ return msghdr(OP_SUBSCRIBE, struct.pack('!B', len(ident)) + ident + chan)
+
+def msgauth(rand, ident, secret):
+ hash = hashlib.sha1(rand+secret).digest()
+ return msghdr(OP_AUTH, struct.pack('!B', len(ident)) + ident + hash)
+
+class FeedUnpack(object):
+ def __init__(self):
+ self.buf = bytearray()
+ def __iter__(self):
+ return self
+ def next(self):
+ return self.unpack()
+ def feed(self, data):
+ self.buf.extend(data)
+ def unpack(self):
+ if len(self.buf) < 5:
+ raise StopIteration('No message.')
+
+ ml, opcode = struct.unpack('!iB', buffer(self.buf,0,5))
+ if len(self.buf) < ml:
+ raise StopIteration('No message.')
+
+ data = bytearray(buffer(self.buf, 5, ml-5))
+ del self.buf[:ml]
+ return opcode, data
+
+class FeedException(Exception):
+ pass
+
+class HPC(object):
+ def __init__(self, host, port, ident, secret, timeout=3, reconnect=False, sleepwait=20):
+ self.host, self.port = host, port
+ self.ident, self.secret = ident, secret
+ self.timeout = timeout
+ self.reconnect = reconnect
+ self.sleepwait = sleepwait
+ self.brokername = 'unknown'
+ self.connected = False
+ self.stopped = False
+ self.unpacker = FeedUnpack()
+
+ self.connect()
+
+ def connect(self):
+ logger.info('connecting to {0}:{1}'.format(self.host, self.port))
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.settimeout(self.timeout)
+ try: self.s.connect((self.host, self.port))
+ except: raise FeedException('Could not connect to broker.')
+ self.connected = True
+
+ try: d = self.s.recv(BUFSIZ)
+ except socket.timeout: raise FeedException('Connection receive timeout.')
+
+ self.unpacker.feed(d)
+ for opcode, data in self.unpacker:
+ if opcode == OP_INFO:
+ rest = buffer(data, 0)
+ name, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
+ rand = str(rest)
+
+ logger.debug('info message name: {0}, rand: {1}'.format(name, repr(rand)))
+ self.brokername = name
+
+ self.s.send(msgauth(rand, self.ident, self.secret))
+ break
+ else:
+ raise FeedException('Expected info message at this point.')
+
+ self.s.settimeout(None)
+ self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+
+ if sys.platform in ('linux2', ):
+ self.s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
+
+ def _run(self, message_callback, error_callback):
+ while not self.stopped:
+ while self.connected:
+ d = self.s.recv(BUFSIZ)
+ if not d:
+ self.connected = False
+ break
+
+ self.unpacker.feed(d)
+ for opcode, data in self.unpacker:
+ if opcode == OP_PUBLISH:
+ rest = buffer(data, 0)
+ ident, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
+ chan, content = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
+
+ message_callback(str(ident), str(chan), content)
+ elif opcode == OP_ERROR:
+ error_callback(data)
+
+ if self.stopped: break
+
+ if self.stopped: break
+ self.connect()
+
+ def run(self, message_callback, error_callback):
+ if not self.reconnect:
+ self._run(message_callback, error_callback)
+ else:
+ while True:
+ self._run(message_callback, error_callback)
+ # reconnect now we've failed
+ sleep(self.sleepwait)
+ while True:
+ try:
+ self.connect()
+ break
+ except FeedException:
+ sleep(self.sleepwait)
+
+ def subscribe(self, chaninfo):
+ if type(chaninfo) == str:
+ chaninfo = [chaninfo,]
+ for c in chaninfo:
+ self.s.send(msgsubscribe(self.ident, c))
+ def publish(self, chaninfo, data):
+ if type(chaninfo) == str:
+ chaninfo = [chaninfo,]
+ for c in chaninfo:
+ self.s.send(msgpublish(self.ident, c, data))
+
+ def stop(self):
+ self.stopped = True
+
+ def close(self):
+ try: self.s.close()
+ except: logger.warn('Socket exception when closing.')
+
+def new(host=None, port=10000, ident=None, secret=None, reconnect=True, sleepwait=20):
+ return HPC(host, port, ident, secret, reconnect)
+
Please sign in to comment.
Something went wrong with that request. Please try again.