Skip to content

Commit

Permalink
first commit, needs polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
gleicon committed Apr 29, 2012
0 parents commit aa9b80a
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 0 deletions.
15 changes: 15 additions & 0 deletions README
@@ -0,0 +1,15 @@
Tiny RestMQ implementation using gevent + bottle.

run: $ python tinymq.py

test:
$ curl -d "value=oy" http://localhost:8081/q/test
$ curl http://localhost:8081/q/test

test COMET:
$ curl http://localhost:8081/c/test

in another terminal, execute:
$ curl -d "value=oy" http://localhost:8081/q/test


96 changes: 96 additions & 0 deletions python/tinymq.py
@@ -0,0 +1,96 @@
from bottle import run, debug, abort, request, ServerAdapter, response, get, post
import redis
import json
import gevent
from gevent.queue import Queue
from Queue import Empty

from gevent import monkey
from collections import defaultdict
monkey.patch_all()

QUEUESET = 'QUEUESET'
UUID_SUFFIX = ':UUID'
QUEUE_SUFFIX = ':queue'

redis_cli = redis.Redis()
_queue_presence = defaultdict(lambda :Queue())

def queue_hub():
while True:
queues = list(redis_cli.smembers(QUEUESET))
queues = map(lambda x : "%s%s" % (x, QUEUE_SUFFIX), queues)
if len(queues) > 0:
res = redis_cli.brpop(queues)
if res is not None:
_queue_presence[res[0]].put(res[1])

gevent.spawn(queue_hub)

@get('/')
def all_queues():
qs = redis_cli.smembers(QUEUESET)
if qs is None: abort(404, "No queues created")
l = ["/q/%s" % q for q in qs]
return json.dumps(l)

@get('/c/:q')
def queue(q = None):
if q is None: abort(404, "No queue given")
qn = q + QUEUE_SUFFIX
response.set_header("Transfer-Encoding", "chunked")
response.content_type = 'application/json; charset=UTF-8'
redis_cli.sadd(QUEUESET, q)
v = b = None
yield " \n\n" # clean the pipes
while(True):
try:
b = _queue_presence[qn].get(block=True, timeout=60)
if b is not None: v = redis_cli.get(b)
except Empty, e:
pass

if v is not None:
yield "%s\n" % json.dumps({"value": v, "key": b})
v = None

@get('/q/:q')
def queue(q = None):
if q is None: abort(404, "No queue given")
redis_cli.sadd(QUEUESET, q)
q = q + QUEUE_SUFFIX

if "soft" in request.GET.keys(): b = redis_cli.lindex(q, -1)
else: b = redis_cli.rpop(q)

if b is None: abort(404, "Empty queue")
v = redis_cli.get(b)

if v is None: abort(404, "Empty value/no val")
return json.dumps({"value": v, "key": b})

@post('/q/:q')
def queue(q = None):
if q is None: abort(404, "No queue given")
qkey = q + QUEUE_SUFFIX

value = request.POST['value']
if value is None: abort(401, "No value given")

uuid = redis_cli.incr(q + UUID_SUFFIX)
redis_cli.sadd(QUEUESET, q)

lkey = "%s:%d" % (q, uuid)
redis_cli.set(lkey, value)
redis_cli.lpush(qkey, lkey)
return json.dumps({"ok": lkey})

class GEventServerAdapter(ServerAdapter):
def run(self, handler):
from gevent import monkey
monkey.patch_socket()
from gevent.wsgi import WSGIServer
WSGIServer((self.host, self.port), handler).serve_forever()

#debug(True)
run(host='localhost', port=8081, server=GEventServerAdapter)
8 changes: 8 additions & 0 deletions ruby/Gemfile
@@ -0,0 +1,8 @@
source "http://rubygems.org"
gem "eventmachine"
gem "thin"
gem "sinatra"
gem "json"
gem 'redis'
gem 'em-synchrony'
gem 'sinatra-synchrony'
50 changes: 50 additions & 0 deletions ruby/Gemfile.lock
@@ -0,0 +1,50 @@
GEM
remote: http://rubygems.org/
specs:
addressable (2.2.7)
async-rack (0.5.1)
rack (~> 1.1)
daemons (1.1.8)
em-http-request (0.3.0)
addressable (>= 2.0.0)
escape_utils
eventmachine (>= 0.12.9)
em-resolv-replace (1.1.2)
em-synchrony (0.2.0)
eventmachine (>= 0.12.9)
escape_utils (0.2.4)
eventmachine (0.12.10)
json (1.6.5)
rack (1.4.1)
rack-fiber_pool (0.9.1)
rack-protection (1.2.0)
rack
redis (2.2.2)
sinatra (1.3.2)
rack (~> 1.3, >= 1.3.6)
rack-protection (~> 1.2)
tilt (~> 1.3, >= 1.3.3)
sinatra-synchrony (0.1.1)
async-rack
em-http-request (= 0.3.0)
em-resolv-replace
em-synchrony (= 0.2.0)
rack-fiber_pool (= 0.9.1)
sinatra (>= 1.0)
thin (1.3.1)
daemons (>= 1.0.9)
eventmachine (>= 0.12.6)
rack (>= 1.0.0)
tilt (1.3.3)

PLATFORMS
ruby

DEPENDENCIES
em-synchrony
eventmachine
json
redis
sinatra
sinatra-synchrony
thin
8 changes: 8 additions & 0 deletions ruby/c2.ru
@@ -0,0 +1,8 @@
require 'rubygems'
require 'sinatra'
set :env, :production
disable :run

require './web'

run Sinatra::Application
8 changes: 8 additions & 0 deletions ruby/config.ru
@@ -0,0 +1,8 @@
require 'rubygems'
require 'sinatra'
set :env, :production
disable :run

require './tinymq'

run Sinatra::Application
86 changes: 86 additions & 0 deletions ruby/restmq.rb
@@ -0,0 +1,86 @@
require 'rubygems'
require "bundler/setup"

require 'sinatra'
require 'redis'
require 'json'
require 'em-synchrony'
require 'sinatra/synchrony'

QUEUESET = 'QUEUESET' # queue index
UUID_SUFFIX = ':UUID' # queue unique id
QUEUE_SUFFIX = ':queue' # suffix to identify each queue's LIST

class RestMQ < Sinatra::Base
register Sinatra::Synchrony
enable :show_exceptions

def initialize
super
@reds = Redis.new
$_queue_presence = Hash.new { |h,k| h[k] = Array.new }
EM.add_periodic_timer(1) do dispatcher end
end

def dispatcher
puts $_queue_presence
queues = @reds.smembers QUEUESET
queues.map! do |q| "#{q}#{QUEUE_SUFFIX}" end
if queues.size > 0 then
q, k= @reds.brpop *queues, 2
return unless q != nil
v = @reds.get k
puts "--"
puts k
puts q
puts v
puts "--"
$_queue_presence[q].each do |out|
out << "#{v}\n" unless v == nil
end
end
end

get '/q' do
b = @reds.smembers QUEUESET
throw :halt, [404, 'Not found (empty queueset)'] if b == nil
b.map! do |q| q = '/q/'+q end
b.to_json
end

get '/q/:queue' do |queue|
soft = params['soft'] # soft = true doesn't rpop values
throw :halt, [404, 'Not found'] if queue == nil
queue = queue + QUEUE_SUFFIX
if soft != nil
b = @reds.lindex queue, -1
else
b = @reds.rpop queue
end
throw :halt, [404, 'Not found (empty queue)'] if b == nil
v = @reds.get b
throw :halt, [200, "{'value':" + v + ", 'key':" + b + "}"] unless v == nil
'empty value'
end

post '/q/*' do |queue|
value = params['value'].to_s
throw :halt, [404, "Not found"] if queue == nil
q1 = queue + QUEUE_SUFFIX
uuid = @reds.incr queue + UUID_SUFFIX
@reds.sadd QUEUESET, q1
lkey = queue + ':' + uuid.to_s
@reds.set lkey, value
@reds.lpush q1, lkey
body '{ok, ' + lkey + '}'
end

get '/c/*' do |queue|
stream do |out|
out << "\n"
$_queue_presence[queue] << out
$_queue_presence[queue].delete out
end
end

end
80 changes: 80 additions & 0 deletions ruby/restmq_async.rb
@@ -0,0 +1,80 @@
# Sinatra minimalist RestMQ
# no COMET, just /q/ routes and queue logic
# the core of RestMQ is how it uses Redis' data types

require 'rubygems'
require "bundler/setup"

require 'eventmachine'
require 'sinatra/async'
require 'redis'
require 'json'

QUEUESET = 'QUEUESET' # queue index
UUID_SUFFIX = ':UUID' # queue unique id
QUEUE_SUFFIX = ':queue' # suffix to identify each queue's LIST

class CometProducer
include EM::Deferrable
def produce_forever(body, queue)
body.call ['oie\n']
puts queue
puts body
while true do
body.call ['oy #{queue}']
sleep 5
end
end
end


class RestMQ < Sinatra::Base
register Sinatra::Async
enable :show_exceptions

def initialize
super
@reds = Redis.new
@message_hub = Hash.new { |h,k| h[k] = EM::Queue.new }
end

aget '/q' do
b = @reds.smembers QUEUESET
throw :halt, [404, 'Not found (empty queueset)'] if b == nil
b.map! do |q| q = '/q/'+q end
b.to_json
end

aget '/q/:queue' do |queue|
soft = params['soft'] # soft = true doesn't rpop values
throw :halt, [404, 'Not found'] if queue == nil
queue = queue + QUEUE_SUFFIX
if soft != nil
b = @reds.lindex queue, -1
else
b = @reds.rpop queue
end
throw :halt, [404, 'Not found (empty queue)'] if b == nil
v = @reds.get b
throw :halt, [200, "{'value':" + v + ", 'key':" + b + "}"] unless v == nil
'empty value'
end

apost '/q/*' do |queue|
value = params['value'].to_s
throw :halt, [404, "Not found"] if queue == nil
q1 = queue + QUEUE_SUFFIX
uuid = @reds.incr queue + UUID_SUFFIX
@reds.sadd QUEUESET, q1
lkey = queue + ':' + uuid.to_s
@reds.set lkey, value
@reds.lpush q1, lkey
body '{ok, ' + lkey + '}'
end

aget '/c/:queue' do |queue|
cp = CometProducer.new
Thread.new { cp.produce_forever body, queue }
end

end

0 comments on commit aa9b80a

Please sign in to comment.