Skip to content

Commit

Permalink
working... kinda...
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Jul 10, 2012
1 parent a08f372 commit b665a51
Show file tree
Hide file tree
Showing 11 changed files with 10,029 additions and 5 deletions.
21 changes: 21 additions & 0 deletions r3/app/app.py
Expand Up @@ -18,4 +18,25 @@ def __init__(self, redis, log_level):
(r'/stream', StreamHandler),
]

self.redis.delete('r3::mappers')

self.load_input_streams()
self.load_reducers()

super(R3ServiceApp, self).__init__(handlers)

def load_input_streams(self):
    """Register the input streams this service can serve, keyed by job name."""
    self.input_streams = {}

    # Resolved at call time via __import__ so the module only needs to be on
    # the path once the app actually starts up.
    stream_module = __import__('count_words_stream')

    self.input_streams['count-words'] = stream_module.CountWordsStream()

def load_reducers(self):
    """Register the reducers this service can serve, keyed by job name."""
    self.reducers = {}

    # Mirror of load_input_streams: one hard-wired reducer for the
    # word-count job, imported lazily by module name.
    reducer_module = __import__('count_words_reducer')

    self.reducers['count-words'] = reducer_module.CountWordsReducer()


67 changes: 62 additions & 5 deletions r3/app/handlers/stream.py
@@ -1,6 +1,10 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

import time
from uuid import uuid4
from ujson import dumps, loads

import tornado.web
import tornado.gen

Expand All @@ -10,11 +14,64 @@ class StreamHandler(BaseHandler):
@tornado.web.asynchronous
@tornado.gen.engine
def get(self):
yield tornado.gen.Task(self.redis.set, 'foo', 'something')
something = yield tornado.gen.Task(self.redis.get, 'foo')
arguments = self.request.arguments
job_key = arguments['key'][0]
job_id = uuid4()

#mapper_input_queue = 'r3::jobs::%s::input' % job_key
#self.redis.delete(mapper_input_queue)
#return

start = time.time()
input_stream = self.application.input_streams[job_key]
items = input_stream.process(arguments)
print "input stream took %.2f" % (time.time() - start)

mapper_input_queue = 'r3::jobs::%s::input' % job_key
mapper_output_queue = 'r3::jobs::%s::%s::output' % (job_key, job_id)

start = time.time()
for item in items:
msg = {
'output_queue': mapper_output_queue,
'job_id': str(job_id),
'job_key': job_key,
'item': item
}
yield tornado.gen.Task(self.redis.rpush, mapper_input_queue, dumps(msg))

#pipe = self.redis.pipeline()
#results = []
#for i in range(len(items)):
#result = yield tornado.gen.Task(self.redis.blpop, mapper_output_queue)
#results.append(loads(result))
#pipe.blpop(mapper_output_queue)
#results = yield tornado.gen.Task(pipe.execute)
#results = [loads(item) for item in results if item is not None]
#print len(results)
#print len(items)

results = []
#mapped_items = yield tornado.gen.Task(self.redis.llen, mapper_output_queue)
def wtf(item):
import ipdb;ipdb.set_trace()

#while (len(results) < len(items)):
#print "%d < %d" % (len(results), len(items))
self.redis.blpop(mapper_output_queue, callback=wtf)
#item = loads(item)
#results.append(item)

#print "map took %.2f" % (time.time() - start)

#start = time.time()
#reducer = self.application.reducers[job_key]
#result = reducer.reduce(results)
#print "reduce took %.2f" % (time.time() - start)

#self.set_header('Content-Type', 'application/json')

self.set_header('Content-Type', 'text/html')
#self.write(dumps(result))

self.write(something)
self.finish()
#self.finish()

Empty file added r3/worker/__init__.py
Empty file.
89 changes: 89 additions & 0 deletions r3/worker/mapper.py
@@ -0,0 +1,89 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

import signal
import time
from datetime import datetime
import sys
import os

import redis
from ujson import loads, dumps

from r3.app.utils import logger

class JobError(RuntimeError):
    """Base class for errors raised while supervising a mapper job."""

class CrashError(JobError):
    """Raised when the forked worker exits abnormally (signal or non-zero status)."""

class TimeoutError(JobError):
    """Raised when a forked worker exceeds the mapper's timeout.

    NOTE(review): this name shadows the Python 3 builtin TimeoutError;
    harmless on Python 2 (which this file targets) but worth renaming if
    the codebase is ever ported.
    """

class Mapper:
    """Generic forking map worker.

    Blocks on a Redis input queue, forks a child process per work item, and
    has the child push the mapped result onto the queue named in the item's
    message. The parent supervises the child, enforcing an optional timeout
    and translating abnormal exits into JobError subclasses.
    """

    def __init__(self, key, redis_host, redis_port, redis_db, redis_pass):
        # Job key: this mapper consumes from 'r3::jobs::<key>::input'.
        self.mapper_key = key
        self.redis = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, password=redis_pass)
        # Seconds a child may run before being killed; None disables the limit.
        self.timeout = None
        self.initialize()
        print "MAPPER UP - PID: %s" % os.getpid()

    def initialize(self):
        # Hook for subclasses to set up state before the run loop starts.
        pass

    def map(self):
        # Subclasses must implement the actual mapping of a single item.
        raise NotImplementedError()

    def run_block(self):
        """Consume items from the input queue forever, mapping each in a forked child."""
        while True:
            mapper_input_queue = 'r3::jobs::%s::input' % self.mapper_key

            # Block until an item arrives (timeout=0 means wait indefinitely).
            key, item = self.redis.blpop(mapper_input_queue, timeout=0)

            item = loads(item)
            self.child = os.fork()
            if self.child:
                # Parent branch: fork() returned the child's (non-zero) pid.
                try:
                    start = datetime.now()

                    # waits for the result or times out
                    while True:
                        # Non-blocking poll (WNOHANG) so the timeout check
                        # below keeps running while the child works.
                        pid, status = os.waitpid(self.child, os.WNOHANG)
                        if pid != 0:
                            if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
                                break
                            if os.WIFSTOPPED(status):
                                logger.warning("Process stopped by signal %d" % os.WSTOPSIG(status))
                            else:
                                if os.WIFSIGNALED(status):
                                    raise CrashError("Unexpected exit by signal %d" % os.WTERMSIG(status))
                                raise CrashError("Unexpected exit status %d" % os.WEXITSTATUS(status))

                        time.sleep(0.0005)

                        now = datetime.now()
                        if self.timeout and ((now - start).seconds > self.timeout):
                            os.kill(self.child, signal.SIGKILL)
                            # Reap the killed child so it does not linger as a zombie.
                            os.waitpid(-1, os.WNOHANG)
                            raise TimeoutError("Timed out after %d seconds" % self.timeout)

                except OSError as ose:
                    import errno

                    # EINTR just means waitpid was interrupted by a signal;
                    # anything else is a real error and must propagate.
                    if ose.errno != errno.EINTR:
                        raise ose
                except JobError, e:
                    logger.error(str(e))

            else:
                # Child branch: map the item, push the result onto the queue
                # requested by the message, then exit via os._exit so no
                # parent cleanup/buffers run twice.
                result = dumps(self.map(item['item']))
                self.redis.lpush(item['output_queue'], result)
                os._exit(0)


if __name__ == '__main__':
    # Usage: mapper.py <redis_host> <redis_port>
    # NOTE(review): redis db and password are hard-coded here — confirm
    # against the deployment configuration before relying on this entry point.
    mapper = Mapper("generic-mapper", redis_host=sys.argv[1], redis_port=int(sys.argv[2]), redis_db=0, redis_pass='r3')
    mapper.run_block()
Empty file added test/__init__.py
Empty file.

0 comments on commit b665a51

Please sign in to comment.