Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Rapid prototype of master / worker over 0MQ

  • Loading branch information...
commit 6734acc7ab02164b3eeec00c23ab11b86149e14a 1 parent ca6930b
Paul Bergeron authored
View
4 Gemfile
@@ -3,6 +3,10 @@ source "http://rubygems.org"
gem 'beanstalk-client'
gem 'zmq'
+gem 'eventmachine'
+gem 'ffi'
+gem 'em-zeromq'
+gem 'json'
# Add dependencies to develop your gem here.
# Include everything needed to run rake, tests, features, etc.
View
11 Gemfile.lock
@@ -3,11 +3,18 @@ GEM
specs:
beanstalk-client (1.1.0)
diff-lcs (1.1.3)
+ em-zeromq (0.2.0)
+ eventmachine (>= 0.12.10)
+ ffi-rzmq (>= 0.7.2)
+ eventmachine (0.12.10)
+ ffi (1.0.9)
+ ffi-rzmq (0.8.0)
git (1.2.5)
jeweler (1.6.4)
bundler (~> 1.0)
git (>= 1.2.5)
rake
+ json (1.5.4)
rake (0.9.2)
rcov (0.9.10)
rspec (2.3.0)
@@ -26,7 +33,11 @@ PLATFORMS
DEPENDENCIES
beanstalk-client
bundler (~> 1.0.0)
+ em-zeromq
+ eventmachine
+ ffi
jeweler (~> 1.6.4)
+ json
rcov
rspec (~> 2.3.0)
zmq
View
0  lib/master.rb
No changes.
View
53 lib/metl.rb
@@ -0,0 +1,53 @@
+require 'beanstalk-client'
+require 'em-zeromq'
+require 'json'
+
+Thread.abort_on_exception = true
+
+class WorkerSubHandler
+ attr_reader :received
+
+ def initialize(client_ctx)
+
+ end
+
+ def on_readable(socket, messages)
+ messages.each do |m|
+ puts m.copy_out_string
+ end
+ end
+end
+
+if fork.nil?
+ exec("beanstalkd")
+else
+ sleep(1)
+ EM.run do
+ beanstalk = Beanstalk::Pool.new(['127.0.0.1:11300'])
+ beanstalk.put("{'test':'value'}")
+
+ worker_ctx = EM::ZeroMQ::Context.new(1)
+ client_ctx = EM::ZeroMQ::Context.new(1)
+
+ worker_sub_handler = WorkerSubHandler.new client_ctx
+
+ # setup push sockets
+ worker_sub = worker_ctx.bind( ZMQ::SUB, 'ipc:///tmp/metl_sub.zmqsock', worker_sub_handler)
+ worker_sub.subscribe 'test'
+ worker_push = worker_ctx.bind( ZMQ::PUSH, 'ipc:///tmp/metl_push.zmqsock')
+
+ client_pub = client_ctx.bind( ZMQ::PUB, 'tcp://127.0.0.1:65431')
+ client_pull = client_ctx.bind( ZMQ::PULL, 'tcp://127.0.0.1:65432')
+
+ EM::PeriodicTimer.new(0.1) do
+ job = beanstalk.reserve
+ worker_push.send_msg job.body
+ job.delete
+ end
+ end
+end
+
+
+
+
+
View
26 lib/worker.rb
@@ -0,0 +1,26 @@
+require 'em-zeromq'
+
+class Worker
+ attr_reader :received
+
+ def initialize(ctx)
+ @pub = ctx.connect( ZMQ::PUB, 'ipc:///tmp/metl_sub.zmqsock')
+ end
+
+ def on_readable(socket, messages)
+ messages.each do |m|
+ puts m.copy_out_string
+ @pub.send_msg "test|#{m.copy_out_string}"
+ end
+ end
+end
+
+
+EM.run do
+ ctx = EM::ZeroMQ::Context.new(1)
+
+ worker = Worker.new(ctx)
+
+ # setup push sockets
+ pull = ctx.connect( ZMQ::PULL, 'ipc:///tmp/metl_push.zmqsock', worker)
+end
Please sign in to comment.
Something went wrong with that request. Please try again.