Skip to content
This repository has been archived by the owner on Jul 24, 2021. It is now read-only.

Commit

Permalink
Base relay server
Browse files Browse the repository at this point in the history
  • Loading branch information
DamianZaremba committed Aug 20, 2015
1 parent e095795 commit f7803e4
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cbng_relay/.gitignore
@@ -0,0 +1,2 @@
*.py[co]
*.cnf
16 changes: 16 additions & 0 deletions cbng_relay/README.md
@@ -0,0 +1,16 @@
ClueBot NG Relay
================

TCP relay server designed to 'multi-plex' out events.

The bot hits an endpoint and places an item on the queue, each backend then consumer the queue and do 'stuff'.

Backends
--------

### IRC



### Redis

1 change: 1 addition & 0 deletions cbng_relay/__init__.py
@@ -0,0 +1 @@
__author__ = 'damian'
Empty file added cbng_relay/backends/__init__.py
Empty file.
99 changes: 99 additions & 0 deletions cbng_relay/backends/irc_backend.py
@@ -0,0 +1,99 @@
import threading
import logging
from Queue import Empty
import time
import socket
import os
from config import Config
import ConfigParser
import errno
import fcntl

log = logging.getLogger(__name__)


class IRC(threading.Thread):
running = True
config = Config()
irc_bufer = ''
irc_first_ping = False

def __init__(self, queue):
self.queue = queue
threading.Thread.__init__(self)

def run(self):
self.connect()
while self.running:
self.process_irc()
self.process_queue()

def stop(self):
self.running = False
self.irc_client.close()

def connect(self):
self.irc_client = socket.socket()
self.irc_client.connect(
(self.config.get('irc', 'host'), self.config.getint('irc', 'port')))
fcntl.fcntl(self.irc_client, fcntl.F_SETFL, os.O_NONBLOCK)
self.irc_client.send(
bytes("NICK %s\r\n" % self.config.get('irc', 'nick')))
self.irc_client.send(bytes(
"USER %s localhost localhost :ClueBotNG Relay\r\n" % self.config.get('irc', 'nick')))

def process_irc(self):
try:
self.irc_bufer += self.irc_client.recv(1024).decode("UTF-8")
except socket.error, e:
if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK:
return

lines = self.irc_bufer.split("\n")
self.irc_bufer = lines.pop()

for line in lines:
line = line.strip().split(' ')

log.debug('IRC line: %s' % line)
if line[0] == 'PING':
if not self.irc_first_ping:
self.irc_first_ping = True
log.debug('Running oper')
try:
oper_user = self.config.get('irc', 'oper_user')
oper_pass = self.config.get('irc', 'oper_pass')
self.irc_client.send(
bytes("OPER %s %s\r\n" % (oper_user, oper_pass)))
except ConfigParser.NoOptionError:
log.info('No oper in config')
pass

log.debug('Joining channels')
try:
for channel in self.config.get('irc', 'channels').split(','):
self.irc_client.send(
bytes("JOIN #%s\r\n" % channel.strip()))
except ConfigParser.NoOptionError:
log.info('No channels in config')
pass
self.irc_client.send(bytes("PONG %s\r\n" % line[1]))

def process_queue(self):
try:
(type, data) = self.queue.get_nowait()
except Empty:
return

dispatcher = {
'test': self.logger
}
if type in dispatcher.keys():
try:
dispatcher[type](data)
except Exception as e:
log.error('Dispatch failed', e)

# Handlers
def logger(self, data):
log.info(data)
46 changes: 46 additions & 0 deletions cbng_relay/backends/redis_backend.py
@@ -0,0 +1,46 @@
import threading
import logging
from Queue import Empty
from redis import StrictRedis
from config import Config

log = logging.getLogger(__name__)


class Redis(threading.Thread):
running = True
config = Config()
prefix = ''

def __init__(self, queue):
self.queue = queue
threading.Thread.__init__(self)
self.prefix = self.config.get('redis', 'prefix')

def run(self):
self.client = StrictRedis(host=self.config.get('redis', 'host'),
port=self.config.getint('redis', 'port'),
db=self.config.getint('redis', 'db'))

while self.running:
try:
(type, data) = self.queue.get_nowait()
except Empty:
continue

dispatcher = {
'test': self.logger
}
if type in dispatcher.keys():
try:
dispatcher[type](data)
except Exception as e:
log.error('Dispatch failed', e)

def stop(self):
self.running = False

# Handlers
def logger(self, data):
self.client.set('%stest' % self.prefix, 'test')
log.info(data)
16 changes: 16 additions & 0 deletions cbng_relay/config.py
@@ -0,0 +1,16 @@
import logging
import ConfigParser
import os

log = logging.getLogger(__name__)


class Config(ConfigParser.SafeConfigParser):

def __init__(self):
ConfigParser.SafeConfigParser.__init__(self)

cnf = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'relay.cnf')
if os.path.isfile(cnf):
self.read(cnf)
14 changes: 14 additions & 0 deletions cbng_relay/relay.cnf.dist
@@ -0,0 +1,14 @@
[irc]
nick = CBNGRelay_NG
host = irc.cluenet.org
port = 6667
channels =

[redis]
host = tools-redis
port = 6379
db = 0
prefix = cbng_

[general]
port = 8080
48 changes: 48 additions & 0 deletions cbng_relay/relay.py
@@ -0,0 +1,48 @@
#!/usr/bin/env python
from twisted.internet import protocol, reactor
import json
import logging
from utils import Utils
from config import Config

log = logging.getLogger(__name__)


class Relay(protocol.Protocol):

def dataReceived(self, data):
try:
type = data.split(':')[0]
data = json.loads(':'.join(data.split(':')[1:]))
payload = (type, data)

for thread, queue in self.factory.backends.items():
queue.put(payload)
except Exception as e:
log.debug(e)
pass


class RelayFactory(protocol.Factory):
protocol = Relay

def __init__(self, backends):
self.backends = backends

if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
backends = Utils().startBackends()

try:
port = Config().getint('general', 'port')
except ConfigParser.NoOptionError:
port = 0

srv = reactor.listenTCP(port, RelayFactory(backends))
Utils().takeoverActiveNode(srv.getHost().port)
reactor.run()

for thread, queue in backends.items():
log.debug('Stopping thread')
thread.stop()
thread.join()
1 change: 1 addition & 0 deletions cbng_relay/requirements.txt
@@ -0,0 +1 @@
redis
21 changes: 21 additions & 0 deletions cbng_relay/test_client.py
@@ -0,0 +1,21 @@
#!/usr/bin/env python
import json
import socket

DATA = {
'test': {},
}


def fire():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('localhost', 8080))
for key, data in DATA.items():
msg = '%s:%s' % (key, json.dumps(data))
print('Sending: %s' % msg)
s.send(msg)
s.close()


if __name__ == '__main__':
fire()
56 changes: 56 additions & 0 deletions cbng_relay/utils.py
@@ -0,0 +1,56 @@
import logging
from socket import gethostname
import MySQLdb
import ConfigParser
import Queue
import os
from backends.redis_backend import Redis
from backends.irc_backend import IRC

log = logging.getLogger(__name__)


class Utils:

def takeoverActiveNode(self, port):
hostname = gethostname()

my_cnf = os.path.join(os.path.expanduser("~"), '.cbng.cnf')
if not os.path.isfile(my_cnf):
return

try:
config = ConfigParser.RawConfigParser()
config.read(my_cnf)
user = config.get('discovery_mysql', 'user')
password = config.get('discovery_mysql', 'password')
db = config.get('discovery_mysql', 'name')
host = config.get('discovery_mysql', 'host')

print port
db = MySQLdb.connect(user=user, passwd=password, db=db, host=host)
c = db.cursor()
c.execute(
'replace into `cluster_node` values (%s, %s, "ng_relay");', (hostname, port))
c.close()
db.commit()
db.close()
except Exception as e:
log.debug('Could not takeover active node', e)

def startBackends(self):
backends = {}

log.debug('Starting irc thread')
irc_queue = Queue.Queue(0)
irc = IRC(irc_queue)
irc.start()
backends[irc] = irc_queue

log.debug('Starting redis thread')
redis_queue = Queue.Queue(0)
redis = Redis(redis_queue)
redis.start()
backends[redis] = redis_queue

return backends

0 comments on commit f7803e4

Please sign in to comment.