Skip to content

Commit

Permalink
Server infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Jul 16, 2012
1 parent 1da3d3b commit 3d8975e
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 56 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# %%%%%%%%%%%%%% SERVICE %%%%%%%%%%%%%%
run:
@PYTHONPATH=$$PYTHONPATH:.:./test python r3/app/server.py 0.0.0.0 9999 localhost 7778
@PYTHONPATH=$$PYTHONPATH:.:./test python r3/app/server.py --redis-port=7778 --redis-pass=r3


# %%%%%%%%%%%%%% WORKER %%%%%%%%%%%%%%
worker:
@cd test && PYTHONPATH=$$PYTHONPATH:.:.. python count_words_mapper.py localhost 7778 ${KEY}
@PYTHONPATH=$$PYTHONPATH:. python r3/worker/mapper.py --job-type="count-words" --mapper-key="${KEY}" --mapper-class="test.count_words_mapper.CountWordsMapper" --redis-port=7778 --redis-pass=r3


# %%%%%%%%%%%%%% WEB %%%%%%%%%%%%%%
web:
@PYTHONPATH=$$PYTHONPATH:.:./test python r3/web/app.py 0.0.0.0 8888 localhost 7778
@PYTHONPATH=$$PYTHONPATH:.:./test python r3/web/server.py --redis-port=7778 --redis-pass=r3 --config-file=./r3/web/config.py --debug


# %%%%%%%%%%%%%% REDIS %%%%%%%%%%%%%%
Expand Down
2 changes: 1 addition & 1 deletion r3/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, redis, log_level):

handlers = [
(r'/healthcheck', HealthcheckHandler),
(r'/stream', StreamHandler),
(r'/stream/(?P<job_key>.+)/?', StreamHandler),
]

self.redis.delete('r3::mappers')
Expand Down
3 changes: 1 addition & 2 deletions r3/app/handlers/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ def group_items(self, stream_items, group_size):
return items

@tornado.web.asynchronous
def get(self):
def get(self, job_key):
arguments = self.request.arguments
job_key = arguments['key'][0]
job_id = uuid4()
job_date = datetime.now()

Expand Down
28 changes: 15 additions & 13 deletions r3/app/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import sys
import logging
import argparse

import tornado.ioloop
from tornado.httpserver import HTTPServer
Expand All @@ -13,29 +14,30 @@

def main(arguments=None):
'''Runs r³ server with the specified arguments.'''
log_level = 'warning'
port = int(arguments[1])
ip = arguments[0]

redis_host = arguments[2]
redis_port = int(arguments[3])
parser = argparse.ArgumentParser(description='runs the application that processes stream requests for r³')
parser.add_argument('-b', '--bind', type=str, default='0.0.0.0', help='the ip that r³ will bind to')
parser.add_argument('-p', '--port', type=int, default=9999, help='the port that r³ will bind to')
parser.add_argument('-l', '--loglevel', type=str, default='warning', help='the log level that r³ will run under')
parser.add_argument('--redis-host', type=str, default='0.0.0.0', help='the ip that r³ will use to connect to redis')
parser.add_argument('--redis-port', type=int, default=6379, help='the port that r³ will use to connect to redis')
parser.add_argument('--redis-db', type=int, default=0, help='the database that r³ will use to connect to redis')
parser.add_argument('--redis-pass', type=str, default='', help='the password that r³ will use to connect to redis')

#log_level = arguments['log_level']
#port = arguments['port']
#ip = arguments['ip']
args = parser.parse_args(arguments)

c = redis.StrictRedis(host=redis_host, port=redis_port, db=0, password='r3')
c = redis.StrictRedis(host=args.redis_host, port=args.redis_port, db=args.redis_db, password=args.redis_pass)

logging.basicConfig(level=getattr(logging, log_level.upper()))
logging.basicConfig(level=getattr(logging, args.loglevel.upper()))

application = R3ServiceApp(redis=c, log_level=log_level)
application = R3ServiceApp(redis=c, log_level=args.loglevel.upper())

server = HTTPServer(application)
server.bind(port, ip)
server.bind(args.port, args.bind)
server.start(1)

try:
logging.debug('r³ service app running at %s:%d' % (ip, port))
logging.debug('r³ service app running at %s:%d' % (args.bind, args.port))
tornado.ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
print
Expand Down
57 changes: 28 additions & 29 deletions r3/web/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from flask import Flask, render_template, g, redirect, url_for
from ujson import loads

from r3.web.extensions import RedisDB
from r3.version import __version__
from r3.app.utils import flush_dead_mappers
from r3.app.keys import MAPPERS_KEY, JOB_TYPES_KEY, JOB_TYPE_KEY, LAST_PING_KEY, MAPPER_ERROR_KEY, MAPPER_WORKING_KEY, JOB_TYPES_ERRORS_KEY, ALL_KEYS, PROCESSED, PROCESSED_FAILED
Expand All @@ -21,16 +20,16 @@ def server_context():
def before_request():
g.config = app.config
g.server = server_context()
g.job_types = db.connection.smembers(JOB_TYPES_KEY)
g.job_types = app.db.connection.smembers(JOB_TYPES_KEY)
g.jobs = get_all_jobs(g.job_types)
g.mappers = get_mappers()

def get_mappers():
all_mappers = db.connection.smembers(MAPPERS_KEY)
all_mappers = app.db.connection.smembers(MAPPERS_KEY)
mappers_status = {}
for mapper in all_mappers:
key = MAPPER_WORKING_KEY % mapper
working = db.connection.lrange(key, 0, -1)
working = app.db.connection.lrange(key, 0, -1)
if not working:
mappers_status[mapper] = None
else:
Expand All @@ -41,7 +40,7 @@ def get_mappers():
def get_all_jobs(all_job_types):
all_jobs = {}
for job_type in all_job_types:
job_type_jobs = db.connection.smembers(JOB_TYPE_KEY % job_type)
job_type_jobs = app.db.connection.smembers(JOB_TYPE_KEY % job_type)
all_jobs[job_type] = []
if job_type_jobs:
all_jobs[job_type] = job_type_jobs
Expand All @@ -51,26 +50,26 @@ def get_all_jobs(all_job_types):
def get_errors():
errors = []
for job_type in g.job_types:
errors = [loads(item) for key, item in db.connection.hgetall(MAPPER_ERROR_KEY % job_type).iteritems()]
errors = [loads(item) for key, item in app.db.connection.hgetall(MAPPER_ERROR_KEY % job_type).iteritems()]

return errors

@app.route("/")
def index():
error_queues = db.connection.keys(JOB_TYPES_ERRORS_KEY)
error_queues = app.db.connection.keys(JOB_TYPES_ERRORS_KEY)

has_errors = False
for queue in error_queues:
if db.connection.hlen(queue) > 0:
if app.db.connection.hlen(queue) > 0:
has_errors = True

flush_dead_mappers(db.connection, MAPPERS_KEY, LAST_PING_KEY)
flush_dead_mappers(app.db.connection, MAPPERS_KEY, LAST_PING_KEY)

return render_template('index.html', failed_warning=has_errors)

@app.route("/mappers")
def mappers():
flush_dead_mappers(db.connection, MAPPERS_KEY, LAST_PING_KEY)
flush_dead_mappers(app.db.connection, MAPPERS_KEY, LAST_PING_KEY)
return render_template('mappers.html')

@app.route("/failed")
Expand All @@ -81,16 +80,16 @@ def failed():
def delete_all_failed():
for job_type in g.job_types:
key = MAPPER_ERROR_KEY % job_type
db.connection.delete(key)
app.db.connection.delete(key)

return redirect(url_for('failed'))

@app.route("/failed/delete/<job_id>")
def delete_failed(job_id):
for job_type in g.job_types:
key = MAPPER_ERROR_KEY % job_type
if db.connection.hexists(key, job_id):
db.connection.hdel(key, job_id)
if app.db.connection.hexists(key, job_id):
app.db.connection.hdel(key, job_id)

return redirect(url_for('failed'))

Expand All @@ -100,17 +99,17 @@ def job_types():

@app.route("/stats")
def stats():
info = db.connection.info()
key_names = db.connection.keys(ALL_KEYS)
info = app.db.connection.info()
key_names = app.db.connection.keys(ALL_KEYS)

keys = []
for key in key_names:
key_type = db.connection.type(key)
key_type = app.db.connection.type(key)

if key_type == 'list':
size = db.connection.llen(key)
size = app.db.connection.llen(key)
elif key_type == 'set':
size = db.connection.scard(key)
size = app.db.connection.scard(key)
else:
size = 1

Expand All @@ -120,34 +119,34 @@ def stats():
'type': key_type
})

processed = db.connection.get(PROCESSED)
processed_failed = db.connection.get(PROCESSED_FAILED)
processed = app.db.connection.get(PROCESSED)
processed_failed = app.db.connection.get(PROCESSED_FAILED)

return render_template('stats.html', info=info, keys=keys, processed=processed, failed=processed_failed)

@app.route("/stats/keys/<key>")
def key(key):
key_type = db.connection.type(key)
key_type = app.db.connection.type(key)

if key_type == 'list':
value = db.connection.lrange(key, 0, -1)
value = app.db.connection.lrange(key, 0, -1)
multi = True
elif key_type == 'set':
value = db.connection.smembers(key)
value = app.db.connection.smembers(key)
multi = True
else:
value = db.connection.get(key)
value = app.db.connection.get(key)
multi = False

return render_template('show_key.html', key=key, multi=multi, value=value)

@app.route("/stats/keys/<key>/delete")
def delete_key(key):
db.connection.delete(key)
app.db.connection.delete(key)
return redirect(url_for('stats'))

if __name__ == "__main__":
app.config.from_object('r3.web.config')
db = RedisDB(app)
app.run(debug=True, host=app.config['WEB_HOST'], port=app.config['WEB_PORT'])
#if __name__ == "__main__":
#app.config.from_object('r3.web.config')
#db = RedisDB(app)
#app.run(debug=True, host=app.config['WEB_HOST'], port=app.config['WEB_PORT'])

49 changes: 49 additions & 0 deletions r3/web/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import os
from os.path import abspath, isabs, join
import logging
import argparse

from r3.web.app import app
from r3.web.extensions import RedisDB

def main(arguments=None):
'''Runs r³ web app with the specified arguments.'''

parser = argparse.ArgumentParser(description='runs the web admin that helps in monitoring r³ usage')
parser.add_argument('-b', '--bind', type=str, default='0.0.0.0', help='the ip that r³ will bind to')
parser.add_argument('-p', '--port', type=int, default=8888, help='the port that r³ will bind to')
parser.add_argument('-l', '--loglevel', type=str, default='warning', help='the log level that r³ will run under')
parser.add_argument('--redis-host', type=str, default='0.0.0.0', help='the ip that r³ will use to connect to redis')
parser.add_argument('--redis-port', type=int, default=6379, help='the port that r³ will use to connect to redis')
parser.add_argument('--redis-db', type=int, default=0, help='the database that r³ will use to connect to redis')
parser.add_argument('--redis-pass', type=str, default='', help='the password that r³ will use to connect to redis')
parser.add_argument('-c', '--config-file', type=str, default='', help='the configuration file that r³ will use')
parser.add_argument('-d', '--debug', default=False, action='store_true', help='indicates that r³ will be run in debug mode')

args = parser.parse_args(arguments)

logging.basicConfig(level=getattr(logging, args.loglevel.upper()))

if args.config_file:
config_path = args.config_file
if not isabs(args.config_file):
config_path = abspath(join(os.curdir, args.config_file))

app.config.from_pyfile(config_path, silent=False)
else:
app.config.from_object('r3.web.config')

app.db = RedisDB(app)
try:
logging.debug('r³ web app running at %s:%d' % (args.bind, args.port))
app.run(debug=args.debug, host=app.config['WEB_HOST'], port=app.config['WEB_PORT'])
except KeyboardInterrupt:
print
print "-- r³ web app closed by user interruption --"

if __name__ == "__main__":
main(sys.argv[1:])
54 changes: 46 additions & 8 deletions r3/worker/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import sys
import os
import argparse

import redis
from ujson import loads, dumps
Expand All @@ -23,12 +24,12 @@ class TimeoutError(JobError):
pass

class Mapper:
def __init__(self, key, process_name, redis_host, redis_port, redis_db, redis_pass):
self.mapper_key = key
self.process_name = process_name
self.full_name = '%s::%s' % (self.mapper_key, self.process_name)
def __init__(self, job_type, mapper_key, redis_host, redis_port, redis_db, redis_pass):
self.job_type = job_type
self.mapper_key = mapper_key
self.full_name = '%s::%s' % (self.job_type, self.mapper_key)
self.timeout = None
self.input_queue = MAPPER_INPUT_KEY % self.mapper_key
self.input_queue = MAPPER_INPUT_KEY % self.job_type
self.working_queue = MAPPER_WORKING_KEY % self.full_name
self.max_retries = 5

Expand Down Expand Up @@ -82,7 +83,7 @@ def unregister(self):

def ping(self):
self.redis.delete(MAPPER_WORKING_KEY % self.full_name)
self.redis.sadd(JOB_TYPES_KEY, self.mapper_key)
self.redis.sadd(JOB_TYPES_KEY, self.job_type)
self.redis.sadd(MAPPERS_KEY, self.full_name)
self.redis.set(LAST_PING_KEY % self.full_name, datetime.now().strftime(DATETIME_FORMAT))

Expand All @@ -96,6 +97,43 @@ def map_item(self, item, json_item):
self.redis.delete(self.working_queue)
self.redis.delete('r3::mappers::%s::working' % self.full_name)

def kls_import(fullname):
name_parts = fullname.split('.')
klass_name = name_parts[-1]
module_parts = name_parts[:-1]
module = reduce(getattr, module_parts[1:], __import__('.'.join(module_parts)))
klass = getattr(module, klass_name)
return klass

def main(arguments):
parser = argparse.ArgumentParser(description='runs the application that processes stream requests for r³')
parser.add_argument('-l', '--loglevel', type=str, default='warning', help='the log level that r³ will run under')
parser.add_argument('--redis-host', type=str, default='0.0.0.0', help='the ip that r³ will use to connect to redis')
parser.add_argument('--redis-port', type=int, default=6379, help='the port that r³ will use to connect to redis')
parser.add_argument('--redis-db', type=int, default=0, help='the database that r³ will use to connect to redis')
parser.add_argument('--redis-pass', type=str, default='', help='the password that r³ will use to connect to redis')
parser.add_argument('--job-type', type=str, help='the job-type for this mapper', required=True)
parser.add_argument('--mapper-key', type=str, help='the unique identifier for this mapper', required=True)
parser.add_argument('--mapper-class', type=str, help='the fullname of the class that this mapper will run', required=True)

args = parser.parse_args(arguments)

logging.basicConfig(level=getattr(logging, args.loglevel.upper()))

try:
klass = kls_import(args.mapper_class)
except Exception, err:
print "Could not import the specified %s class. Error: %s" % (args.mapper_class, err)
raise

mapper = klass(args.job_type, args.mapper_key, redis_host=args.redis_host, redis_port=args.redis_port, redis_db=args.redis_db, redis_pass=args.redis_pass)
try:
mapper.run_block()
except KeyboardInterrupt:
print
print "-- r³ mapper closed by user interruption --"



if __name__ == '__main__':
mapper = Mapper("generic-mapper", redis_host=sys.argv[1], redis_port=int(sys.argv[2]), redis_db=0, redis_pass='r3')
mapper.run_block()
main(sys.argv[1:])
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ http://gevent.googlecode.com/files/gevent-1.0b2.tar.gz
tornado-pyvows
ujson
flask
argparse

0 comments on commit 3d8975e

Please sign in to comment.