Skip to content
Browse files

Added configuration support for input streams and reducers

  • Loading branch information...
1 parent 3d8975e commit 166c3097884c18c02196b1dcaf30353fb380b75d @heynemann committed Jul 16, 2012
View
4 Makefile
@@ -1,11 +1,11 @@
# %%%%%%%%%%%%%% SERVICE %%%%%%%%%%%%%%
run:
- @PYTHONPATH=$$PYTHONPATH:.:./test python r3/app/server.py --redis-port=7778 --redis-pass=r3
+ @PYTHONPATH=$$PYTHONPATH:.:./test python r3/app/server.py --redis-port=7778 --redis-pass=r3 --config-file="./test/app_config.py"
# %%%%%%%%%%%%%% WORKER %%%%%%%%%%%%%%
worker:
- @PYTHONPATH=$$PYTHONPATH:. python r3/worker/mapper.py --job-type="count-words" --mapper-key="${KEY}" --mapper-class="test.count_words_mapper.CountWordsMapper" --redis-port=7778 --redis-pass=r3
+ @PYTHONPATH=$$PYTHONPATH:. python r3/worker/mapper.py --mapper-key="${KEY}" --mapper-class="test.count_words_mapper.CountWordsMapper" --redis-port=7778 --redis-pass=r3
# %%%%%%%%%%%%%% WEB %%%%%%%%%%%%%%
View
16 r3/app/app.py
@@ -6,12 +6,14 @@
from r3.app.handlers.healthcheck import HealthcheckHandler
from r3.app.handlers.stream import StreamHandler
+from r3.app.utils import kls_import
class R3ServiceApp(tornado.web.Application):
- def __init__(self, redis, log_level):
+ def __init__(self, redis, config, log_level):
self.redis = redis
self.log_level = log_level
+ self.config = config
handlers = [
(r'/healthcheck', HealthcheckHandler),
@@ -28,15 +30,15 @@ def __init__(self, redis, log_level):
def load_input_streams(self):
self.input_streams = {}
- stream = __import__('count_words_stream')
-
- self.input_streams['count-words'] = stream.CountWordsStream()
+ for stream_class in self.config.INPUT_STREAMS:
+ stream = kls_import(stream_class)
+ self.input_streams[stream.job_type] = stream()
def load_reducers(self):
self.reducers = {}
- reducer = __import__('count_words_reducer')
-
- self.reducers['count-words'] = reducer.CountWordsReducer()
+ for reducer_class in self.config.REDUCERS:
+ reducer = kls_import(reducer_class)
+ self.reducers[reducer.job_type] = reducer()
View
26 r3/app/config.py
@@ -0,0 +1,26 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+from os.path import isabs, abspath
+import imp
+
+class Config:
+ def __init__(self, path):
+ if not isabs(path):
+ self.path = abspath(path)
+ else:
+ self.path = path
+
+ self.load()
+
+ def load(self):
+ with open(self.path) as config_file:
+ name = 'configuration'
+ code = config_file.read()
+ module = imp.new_module(name)
+ exec code in module.__dict__
+
+ for name, value in module.__dict__.iteritems():
+ setattr(self, name, value)
+
+
View
4 r3/app/handlers/stream.py
@@ -38,7 +38,7 @@ def get(self, job_key):
try:
start = time.time()
input_stream = self.application.input_streams[job_key]
- items = input_stream.process(arguments)
+ items = input_stream.process(self.application, arguments)
if hasattr(input_stream, 'group_size'):
items = self.group_items(items, input_stream.group_size)
@@ -85,7 +85,7 @@ def get(self, job_key):
else:
start = time.time()
reducer = self.application.reducers[job_key]
- result = reducer.reduce(results)
+ result = reducer.reduce(self.application, results)
logging.debug("reduce took %.2f" % (time.time() - start))
self.set_header('Content-Type', 'application/json')
View
6 r3/app/server.py
@@ -10,6 +10,7 @@
import redis
from r3.app.app import R3ServiceApp
+from r3.app.config import Config
def main(arguments=None):
@@ -23,14 +24,17 @@ def main(arguments=None):
parser.add_argument('--redis-port', type=int, default=6379, help='the port that r³ will use to connect to redis')
parser.add_argument('--redis-db', type=int, default=0, help='the database that r³ will use to connect to redis')
parser.add_argument('--redis-pass', type=str, default='', help='the password that r³ will use to connect to redis')
+ parser.add_argument('-c', '--config-file', type=str, help='the config file that r³ will use to load input stream classes and reducers', required=True)
args = parser.parse_args(arguments)
+ cfg = Config(args.config_file)
+
c = redis.StrictRedis(host=args.redis_host, port=args.redis_port, db=args.redis_db, password=args.redis_pass)
logging.basicConfig(level=getattr(logging, args.loglevel.upper()))
- application = R3ServiceApp(redis=c, log_level=args.loglevel.upper())
+ application = R3ServiceApp(redis=c, config=cfg, log_level=args.loglevel.upper())
server = HTTPServer(application)
server.bind(args.port, args.bind)
View
12 r3/app/utils.py
@@ -27,3 +27,15 @@ def flush_dead_mappers(redis, mappers_key, ping_key):
redis.delete(ping_key % mapper)
+def kls_import(fullname):
+ if not '.' in fullname:
+ return __import__(fullname)
+
+ name_parts = fullname.split('.')
+ klass_name = name_parts[-1]
+ module_parts = name_parts[:-1]
+ module = reduce(getattr, module_parts[1:], __import__('.'.join(module_parts)))
+ klass = getattr(module, klass_name)
+ return klass
+
+
View
16 r3/worker/mapper.py
@@ -11,7 +11,7 @@
import redis
from ujson import loads, dumps
-from r3.app.utils import DATETIME_FORMAT
+from r3.app.utils import DATETIME_FORMAT, kls_import
from r3.app.keys import MAPPERS_KEY, JOB_TYPES_KEY, MAPPER_INPUT_KEY, MAPPER_WORKING_KEY, LAST_PING_KEY
class JobError(RuntimeError):
@@ -97,27 +97,21 @@ def map_item(self, item, json_item):
self.redis.delete(self.working_queue)
self.redis.delete('r3::mappers::%s::working' % self.full_name)
-def kls_import(fullname):
- name_parts = fullname.split('.')
- klass_name = name_parts[-1]
- module_parts = name_parts[:-1]
- module = reduce(getattr, module_parts[1:], __import__('.'.join(module_parts)))
- klass = getattr(module, klass_name)
- return klass
-
def main(arguments):
parser = argparse.ArgumentParser(description='runs the application that processes stream requests for r³')
parser.add_argument('-l', '--loglevel', type=str, default='warning', help='the log level that r³ will run under')
parser.add_argument('--redis-host', type=str, default='0.0.0.0', help='the ip that r³ will use to connect to redis')
parser.add_argument('--redis-port', type=int, default=6379, help='the port that r³ will use to connect to redis')
parser.add_argument('--redis-db', type=int, default=0, help='the database that r³ will use to connect to redis')
parser.add_argument('--redis-pass', type=str, default='', help='the password that r³ will use to connect to redis')
- parser.add_argument('--job-type', type=str, help='the job-type for this mapper', required=True)
parser.add_argument('--mapper-key', type=str, help='the unique identifier for this mapper', required=True)
parser.add_argument('--mapper-class', type=str, help='the fullname of the class that this mapper will run', required=True)
args = parser.parse_args(arguments)
+ if not args.mapper_key:
+ raise RuntimeError('The --mapper_key argument is required.')
+
logging.basicConfig(level=getattr(logging, args.loglevel.upper()))
try:
@@ -126,7 +120,7 @@ def main(arguments):
print "Could not import the specified %s class. Error: %s" % (args.mapper_class, err)
raise
- mapper = klass(args.job_type, args.mapper_key, redis_host=args.redis_host, redis_port=args.redis_port, redis_db=args.redis_db, redis_pass=args.redis_pass)
+ mapper = klass(klass.job_type, args.mapper_key, redis_host=args.redis_host, redis_port=args.redis_port, redis_db=args.redis_db, redis_pass=args.redis_pass)
try:
mapper.run_block()
except KeyboardInterrupt:
View
10 test/app_config.py
@@ -0,0 +1,10 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+INPUT_STREAMS = [
+ 'test.count_words_stream.CountWordsStream'
+]
+
+REDUCERS = [
+ 'test.count_words_reducer.CountWordsReducer'
+]
View
9 test/count_words_mapper.py
@@ -1,12 +1,11 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
-import sys
-import time
-
from r3.worker.mapper import Mapper
class CountWordsMapper(Mapper):
+ job_type = 'count-words'
+
def map(self, lines):
#time.sleep(0.5)
return list(self.split_words(lines))
@@ -15,7 +14,3 @@ def split_words(self, lines):
for line in lines:
for word in line.split():
yield word.strip().strip('.').strip(','), 1
-
-if __name__ == '__main__':
- mapper = CountWordsMapper("count-words", sys.argv[3], redis_host=sys.argv[1], redis_port=int(sys.argv[2]), redis_db=0, redis_pass='r3')
- mapper.run_block()
View
4 test/count_words_reducer.py
@@ -4,7 +4,9 @@
from collections import defaultdict
class CountWordsReducer:
- def reduce(self, items):
+ job_type = 'count-words'
+
+ def reduce(self, app, items):
word_freq = defaultdict(int)
for line in items:
for word, frequency in line:
View
3 test/count_words_stream.py
@@ -4,9 +4,10 @@
from os.path import abspath, dirname, join
class CountWordsStream:
+ job_type = 'count-words'
group_size = 1000
- def process(self, arguments):
+ def process(self, app, arguments):
with open(abspath(join(dirname(__file__), 'chekhov.txt'))) as f:
contents = f.readlines()

0 comments on commit 166c309

Please sign in to comment.
Something went wrong with that request. Please try again.