From aa9b80a02d0f1a93fcbad85a361da11c477c8123 Mon Sep 17 00:00:00 2001 From: Gleicon Moraes Date: Sun, 29 Apr 2012 00:13:46 -0300 Subject: [PATCH] first commit, needs polishing --- README | 15 +++++++ python/tinymq.py | 96 ++++++++++++++++++++++++++++++++++++++++ ruby/Gemfile | 8 ++++ ruby/Gemfile.lock | 50 +++++++++++++++++++++ ruby/c2.ru | 8 ++++ ruby/config.ru | 8 ++++ ruby/restmq.rb | 86 +++++++++++++++++++++++++++++++++++ ruby/restmq_async.rb | 80 +++++++++++++++++++++++++++++++++ ruby/restmq_synchrony.rb | 92 ++++++++++++++++++++++++++++++++++++++ ruby/stream.rb | 21 +++++++++ ruby/tinymq.rb | 71 +++++++++++++++++++++++++++++ ruby/web.rb | 24 ++++++++++ 12 files changed, 559 insertions(+) create mode 100644 README create mode 100644 python/tinymq.py create mode 100644 ruby/Gemfile create mode 100644 ruby/Gemfile.lock create mode 100644 ruby/c2.ru create mode 100644 ruby/config.ru create mode 100644 ruby/restmq.rb create mode 100644 ruby/restmq_async.rb create mode 100644 ruby/restmq_synchrony.rb create mode 100644 ruby/stream.rb create mode 100644 ruby/tinymq.rb create mode 100644 ruby/web.rb diff --git a/README b/README new file mode 100644 index 0000000..d1d28d3 --- /dev/null +++ b/README @@ -0,0 +1,15 @@ +Tiny RestMQ implementation using gevent + bottle. + +run: $ python tinymq.py + +test: +$ curl -d "value=oy" http://localhost:8081/q/test +$ curl http://localhost:8081/q/test + +test COMET: +$ curl http://localhost:8081/c/test + +in another terminal, execute: +$ curl -d "value=oy" http://localhost:8081/q/test + + diff --git a/python/tinymq.py b/python/tinymq.py new file mode 100644 index 0000000..a7a8abd --- /dev/null +++ b/python/tinymq.py @@ -0,0 +1,96 @@ +from bottle import run, debug, abort, request, ServerAdapter, response, get, post +import redis +import json +import gevent +from gevent.queue import Queue +from Queue import Empty + +from gevent import monkey +from collections import defaultdict +monkey.patch_all() + +QUEUESET = 'QUEUESET' +UUID_SUFFIX = ':UUID' +QUEUE_SUFFIX = ':queue' + +redis_cli = redis.Redis() +_queue_presence = defaultdict(lambda :Queue()) + +def queue_hub(): + while True: + queues = list(redis_cli.smembers(QUEUESET)) + queues = map(lambda x : "%s%s" % (x, QUEUE_SUFFIX), queues) + if len(queues) > 0: + res = redis_cli.brpop(queues) + if res is not None: + _queue_presence[res[0]].put(res[1]) + +gevent.spawn(queue_hub) + +@get('/') +def all_queues(): + qs = redis_cli.smembers(QUEUESET) + if qs is None: abort(404, "No queues created") + l = ["/q/%s" % q for q in qs] + return json.dumps(l) + +@get('/c/:q') +def queue(q = None): + if q is None: abort(404, "No queue given") + qn = q + QUEUE_SUFFIX + response.set_header("Transfer-Encoding", "chunked") + response.content_type = 'application/json; charset=UTF-8' + redis_cli.sadd(QUEUESET, q) + v = b = None + yield " \n\n" # clean the pipes + while(True): + try: + b = _queue_presence[qn].get(block=True, timeout=60) + if b is not None: v = redis_cli.get(b) + except Empty, e: + pass + + if v is not None: + yield "%s\n" % json.dumps({"value": v, "key": b}) + v = None + +@get('/q/:q') +def queue(q = None): + if q is None: abort(404, "No queue given") + redis_cli.sadd(QUEUESET, q) + q = q + QUEUE_SUFFIX + + if "soft" in request.GET.keys(): b = redis_cli.lindex(q, -1) + else: b = redis_cli.rpop(q) + + if b is None: abort(404, "Empty queue") + v = redis_cli.get(b) + + if v is None: abort(404, "Empty value/no val") + return json.dumps({"value": v, "key": b}) + +@post('/q/:q') +def queue(q = None): + if q is None: abort(404, "No queue given") + qkey = q + QUEUE_SUFFIX + + value = request.POST['value'] + if value is None: abort(401, "No value given") + + uuid = redis_cli.incr(q + UUID_SUFFIX) + redis_cli.sadd(QUEUESET, q) + + lkey = "%s:%d" % (q, uuid) + redis_cli.set(lkey, value) + redis_cli.lpush(qkey, lkey) + return json.dumps({"ok": lkey}) + +class GEventServerAdapter(ServerAdapter): + def run(self, handler): + from gevent import monkey + monkey.patch_socket() + from gevent.wsgi import WSGIServer + WSGIServer((self.host, self.port), handler).serve_forever() + +#debug(True) +run(host='localhost', port=8081, server=GEventServerAdapter) diff --git a/ruby/Gemfile b/ruby/Gemfile new file mode 100644 index 0000000..dfab2aa --- /dev/null +++ b/ruby/Gemfile @@ -0,0 +1,8 @@ +source "http://rubygems.org" +gem "eventmachine" +gem "thin" +gem "sinatra" +gem "json" +gem 'redis' +gem 'em-synchrony' +gem 'sinatra-synchrony' diff --git a/ruby/Gemfile.lock b/ruby/Gemfile.lock new file mode 100644 index 0000000..387cbd9 --- /dev/null +++ b/ruby/Gemfile.lock @@ -0,0 +1,50 @@ +GEM + remote: http://rubygems.org/ + specs: + addressable (2.2.7) + async-rack (0.5.1) + rack (~> 1.1) + daemons (1.1.8) + em-http-request (0.3.0) + addressable (>= 2.0.0) + escape_utils + eventmachine (>= 0.12.9) + em-resolv-replace (1.1.2) + em-synchrony (0.2.0) + eventmachine (>= 0.12.9) + escape_utils (0.2.4) + eventmachine (0.12.10) + json (1.6.5) + rack (1.4.1) + rack-fiber_pool (0.9.1) + rack-protection (1.2.0) + rack + redis (2.2.2) + sinatra (1.3.2) + rack (~> 1.3, >= 1.3.6) + rack-protection (~> 1.2) + tilt (~> 1.3, >= 1.3.3) + sinatra-synchrony (0.1.1) + async-rack + em-http-request (= 0.3.0) + em-resolv-replace + em-synchrony (= 0.2.0) + rack-fiber_pool (= 0.9.1) + sinatra (>= 1.0) + thin (1.3.1) + daemons (>= 1.0.9) + eventmachine (>= 0.12.6) + rack (>= 1.0.0) + tilt (1.3.3) + +PLATFORMS + ruby + +DEPENDENCIES + em-synchrony + eventmachine + json + redis + sinatra + sinatra-synchrony + thin diff --git a/ruby/c2.ru b/ruby/c2.ru new file mode 100644 index 0000000..fb96e1b --- /dev/null +++ b/ruby/c2.ru @@ -0,0 +1,8 @@ +require 'rubygems' +require 'sinatra' +set :env, :production +disable :run + +require './web' + +run Sinatra::Application diff --git a/ruby/config.ru b/ruby/config.ru new file mode 100644 index 0000000..7cc0596 --- /dev/null +++ b/ruby/config.ru @@ -0,0 +1,8 @@ +require 'rubygems' +require 'sinatra' +set :env, :production +disable :run + +require './tinymq' + +run Sinatra::Application diff --git a/ruby/restmq.rb b/ruby/restmq.rb new file mode 100644 index 0000000..0443f77 --- /dev/null +++ b/ruby/restmq.rb @@ -0,0 +1,86 @@ +require 'rubygems' +require "bundler/setup" + +require 'sinatra' +require 'redis' +require 'json' +require 'em-synchrony' +require 'sinatra/synchrony' + +QUEUESET = 'QUEUESET' # queue index +UUID_SUFFIX = ':UUID' # queue unique id +QUEUE_SUFFIX = ':queue' # suffix to identify each queue's LIST + +class RestMQ < Sinatra::Base + register Sinatra::Synchrony + enable :show_exceptions + + def initialize + super + @reds = Redis.new + $_queue_presence = Hash.new { |h,k| h[k] = Array.new } + EM.add_periodic_timer(1) do dispatcher end + end + + def dispatcher + puts $_queue_presence + queues = @reds.smembers QUEUESET + queues.map! do |q| "#{q}#{QUEUE_SUFFIX}" end + if queues.size > 0 then + q, k= @reds.brpop *queues, 2 + return unless q != nil + v = @reds.get k + puts "--" + puts k + puts q + puts v + puts "--" + $_queue_presence[q].each do |out| + out << "#{v}\n" unless v == nil + end + end + end + + get '/q' do + b = @reds.smembers QUEUESET + throw :halt, [404, 'Not found (empty queueset)'] if b == nil + b.map! do |q| q = '/q/'+q end + b.to_json + end + + get '/q/:queue' do |queue| + soft = params['soft'] # soft = true doesn't rpop values + throw :halt, [404, 'Not found'] if queue == nil + queue = queue + QUEUE_SUFFIX + if soft != nil + b = @reds.lindex queue, -1 + else + b = @reds.rpop queue + end + throw :halt, [404, 'Not found (empty queue)'] if b == nil + v = @reds.get b + throw :halt, [200, "{'value':" + v + ", 'key':" + b + "}"] unless v == nil + 'empty value' + end + + post '/q/*' do |queue| + value = params['value'].to_s + throw :halt, [404, "Not found"] if queue == nil + q1 = queue + QUEUE_SUFFIX + uuid = @reds.incr queue + UUID_SUFFIX + @reds.sadd QUEUESET, q1 + lkey = queue + ':' + uuid.to_s + @reds.set lkey, value + @reds.lpush q1, lkey + body '{ok, ' + lkey + '}' + end + + get '/c/*' do |queue| + stream do |out| + out << "\n" + $_queue_presence[queue] << out + $_queue_presence[queue].delete out + end + end + +end diff --git a/ruby/restmq_async.rb b/ruby/restmq_async.rb new file mode 100644 index 0000000..aad4904 --- /dev/null +++ b/ruby/restmq_async.rb @@ -0,0 +1,80 @@ +# Sinatra minimalist RestMQ +# no COMET, just /q/ routes and queue logic +# the core of RestMQ is how it uses Redis' data types + +require 'rubygems' +require "bundler/setup" + +require 'eventmachine' +require 'sinatra/async' +require 'redis' +require 'json' + +QUEUESET = 'QUEUESET' # queue index +UUID_SUFFIX = ':UUID' # queue unique id +QUEUE_SUFFIX = ':queue' # suffix to identify each queue's LIST + +class CometProducer + include EM::Deferrable + def produce_forever(body, queue) + body.call ['oie\n'] + puts queue + puts body + while true do + body.call ['oy #{queue}'] + sleep 5 + end + end +end + + +class RestMQ < Sinatra::Base + register Sinatra::Async + enable :show_exceptions + + def initialize + super + @reds = Redis.new + @message_hub = Hash.new { |h,k| h[k] = EM::Queue.new } + end + + aget '/q' do + b = @reds.smembers QUEUESET + throw :halt, [404, 'Not found (empty queueset)'] if b == nil + b.map! do |q| q = '/q/'+q end + b.to_json + end + + aget '/q/:queue' do |queue| + soft = params['soft'] # soft = true doesn't rpop values + throw :halt, [404, 'Not found'] if queue == nil + queue = queue + QUEUE_SUFFIX + if soft != nil + b = @reds.lindex queue, -1 + else + b = @reds.rpop queue + end + throw :halt, [404, 'Not found (empty queue)'] if b == nil + v = @reds.get b + throw :halt, [200, "{'value':" + v + ", 'key':" + b + "}"] unless v == nil + 'empty value' + end + + apost '/q/*' do |queue| + value = params['value'].to_s + throw :halt, [404, "Not found"] if queue == nil + q1 = queue + QUEUE_SUFFIX + uuid = @reds.incr queue + UUID_SUFFIX + @reds.sadd QUEUESET, q1 + lkey = queue + ':' + uuid.to_s + @reds.set lkey, value + @reds.lpush q1, lkey + body '{ok, ' + lkey + '}' + end + + aget '/c/:queue' do |queue| + cp = CometProducer.new + Thread.new { cp.produce_forever body, queue } + end + +end diff --git a/ruby/restmq_synchrony.rb b/ruby/restmq_synchrony.rb new file mode 100644 index 0000000..ce4df7a --- /dev/null +++ b/ruby/restmq_synchrony.rb @@ -0,0 +1,92 @@ +# Sinatra minimalist RestMQ + +require 'rubygems' +require "bundler/setup" + +require 'sinatra' +require 'redis' +require 'json' +require 'em-synchrony' +require 'sinatra/synchrony' + +QUEUESET = 'QUEUESET' # queue index +UUID_SUFFIX = ':UUID' # queue unique id +QUEUE_SUFFIX = ':queue' # suffix to identify each queue's LIST + +class RestMQ < Sinatra::Base + register Sinatra::Synchrony + enable :show_exceptions + + def initialize + super + @reds = Redis.new + $_queue_presence = Hash.new { |h,k| h[k] = Array.new } + dispatcher + end + + def dispatcher + EM.synchrony do + while true do + puts $_queue_presence + queues = @reds.smembers QUEUESET + queues.map! do |q| "#{q}#{QUEUE_SUFFIX}" end + if queues.size > 0 then + q, k= @reds.brpop *queues, 2 + EM::Synchrony.sleep(1) unless q != nil + v = @reds.get k + puts "--" + puts k + puts q + puts v + puts "--" + $_queue_presence[q].each do |out| + out << "#{v}\n" unless v != nil + end + end + end + end + end + + get '/q' do + b = @reds.smembers QUEUESET + throw :halt, [404, 'Not found (empty queueset)'] if b == nil + b.map! do |q| q = '/q/'+q end + b.to_json + end + + get '/q/:queue' do |queue| + soft = params['soft'] # soft = true doesn't rpop values + throw :halt, [404, 'Not found'] if queue == nil + queue = queue + QUEUE_SUFFIX + if soft != nil + b = @reds.lindex queue, -1 + else + b = @reds.rpop queue + end + throw :halt, [404, 'Not found (empty queue)'] if b == nil + v = @reds.get b + throw :halt, [200, "{'value':" + v + ", 'key':" + b + "}"] unless v == nil + 'empty value' + end + + post '/q/*' do |queue| + value = params['value'].to_s + throw :halt, [404, "Not found"] if queue == nil + q1 = queue + QUEUE_SUFFIX + uuid = @reds.incr queue + UUID_SUFFIX + @reds.sadd QUEUESET, q1 + lkey = queue + ':' + uuid.to_s + @reds.set lkey, value + @reds.lpush q1, lkey + body '{ok, ' + lkey + '}' + end + + get '/c/*' do |queue| + stream do |out| + out << "\n" + $_queue_presence[queue] << out + $_queue_presence[queue].delete out + end + end + +end diff --git a/ruby/stream.rb b/ruby/stream.rb new file mode 100644 index 0000000..631ae02 --- /dev/null +++ b/ruby/stream.rb @@ -0,0 +1,21 @@ +require 'sinatra' + +set :server, :thin +connections = [] +queue_presence = Hash.new { |h,k| h[k] = Array.new } + +get '/:queue' do |queue| + # keep stream open + stream(:keep_open) do |out| + queue_presence[queue] << out + end +end + +post '/:queue' do |queue| + # write to all open streams + queue_presence[queue].each do |out| + out << params[:message] << "\n" + end + "message sent" +end + diff --git a/ruby/tinymq.rb b/ruby/tinymq.rb new file mode 100644 index 0000000..0f834d0 --- /dev/null +++ b/ruby/tinymq.rb @@ -0,0 +1,71 @@ +require 'rubygems' +require "bundler/setup" +require 'sinatra' +require 'redis' +require 'json' + +QUEUESET = 'QUEUESET' # queue index +UUID_SUFFIX = ':UUID' # queue unique id +QUEUE_SUFFIX = ':queue' # suffix to identify each queue's LIST + +set :server, :thin + +reds = Redis.new +_queue_presence = Hash.new { |h,k| h[k] = Array.new } + +Thread.new do + while true do + queues = reds.smembers QUEUESET + queues.map! do |q| "#{q}#{QUEUE_SUFFIX}" end + if queues.size > 0 then + q, k= reds.brpop *queues, 5 + v = reds.get k + _queue_presence[q].each do |out| + b = {"value"=>v, "key"=>k} + out << b.to_json << "\n" + end + end + end +end + +get '/q' do + b = reds.smembers QUEUESET + throw :halt, [404, 'Not found (empty queueset)'] if b == nil + b.map! do |q| q = '/q/'+q end + b.to_json +end + +get '/q/:queue' do |queue| + soft = params['soft'] # soft = true doesn't rpop values + throw :halt, [404, 'Not found'] if queue == nil + queue = queue + QUEUE_SUFFIX + if soft != nil + b = reds.lindex queue, -1 + else + b = reds.rpop queue + end + throw :halt, [404, 'Not found (empty queue)'] if b == nil + v = reds.get b + r = {"value"=>v, "key"=>b} + throw :halt, [200, r.to_json] unless v == nil + 'empty value' +end + +post '/q/*' do |queue| + value = params['value'].to_s + throw :halt, [404, "Not found"] if queue == nil + q1 = queue + QUEUE_SUFFIX + uuid = reds.incr queue + UUID_SUFFIX + reds.sadd QUEUESET, q1 + lkey = queue + ':' + uuid.to_s + reds.set lkey, value + reds.lpush q1, lkey + body '{ok, ' + lkey + '}' +end + +get '/c/*' do |queue| + queue = queue + QUEUE_SUFFIX + stream(keep_open=true) do |out| + _queue_presence[queue] << out + end +end diff --git a/ruby/web.rb b/ruby/web.rb new file mode 100644 index 0000000..5e877f9 --- /dev/null +++ b/ruby/web.rb @@ -0,0 +1,24 @@ +require 'redis' +require 'sinatra' + +configure do + redis_url = ENV["REDISTOGO_URL"] || "redis://localhost:6379" + uri = URI.parse(redis_url) + set :redis, Redis.new(:host => uri.host, :port => uri.port, :password => uri.password) +end + +get '/' do + "
curl -v https://sinatra-streaming-example.herokuapp.com/stream
" +end + +get '/stream' do + puts "connection made" + + stream do |out| + settings.redis.subscribe 'time' do |on| + on.message do |channel, message| + out << "#{message}\n" + end + end + end +end