Skip to content
Browse files

Initial.

  • Loading branch information...
0 parents commit 2f105c4f90a176b82246b017e42e9cb9ed8ac67f @dpiddy committed Jul 17, 2011
Showing with 291 additions and 0 deletions.
  1. +9 −0 Gemfile
  2. +41 −0 Gemfile.lock
  3. +4 −0 README.md
  4. +105 −0 agent.rb
  5. +132 −0 manager.rb
9 Gemfile
@@ -0,0 +1,9 @@
# Dependencies for the agent and manager processes.
source :rubygems

gem "em-http-request"
gem "eventmachine"
gem "nats"
gem "rack-fiber_pool"
gem "redis" # listed in Gemfile.lock DEPENDENCIES but was missing here
gem "sinatra"
gem "thin"
gem "uuidtools"
41 Gemfile.lock
@@ -0,0 +1,41 @@
+GEM
+ remote: http://rubygems.org/
+ specs:
+ addressable (2.2.6)
+ daemons (1.1.4)
+ em-http-request (0.3.0)
+ addressable (>= 2.0.0)
+ escape_utils
+ eventmachine (>= 0.12.9)
+ escape_utils (0.2.3)
+ eventmachine (0.12.10)
+ json_pure (1.5.3)
+ nats (0.4.10)
+ daemons (>= 1.1.0)
+ eventmachine (>= 0.12.10)
+ json_pure (>= 1.5.1)
+ rack (1.3.1)
+ rack-fiber_pool (0.9.1)
+ redis (2.2.1)
+ sinatra (1.2.6)
+ rack (~> 1.1)
+ tilt (< 2.0, >= 1.2.2)
+ thin (1.2.11)
+ daemons (>= 1.0.9)
+ eventmachine (>= 0.12.6)
+ rack (>= 1.0.0)
+ tilt (1.3.2)
+ uuidtools (2.1.2)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ em-http-request
+ eventmachine
+ nats
+ rack-fiber_pool
+ redis
+ sinatra
+ thin
+ uuidtools
4 README.md
@@ -0,0 +1,4 @@
+Kind of like [runit](http://smarden.org/runit/) and similar process
+management systems but distributed across machines using agents.
+
+Still experimenting.
105 agent.rb
@@ -0,0 +1,105 @@
+require "eventmachine"
+require "json"
+require "nats/client"
+require "sinatra/base"
+require "thin"
+require "uuidtools"
+
# Mixed into the EventMachine connection created by EventMachine.popen for
# each child process the agent spawns. Tracks identifying data, relays child
# output, and publishes a postmortem over NATS when the child exits.
module ChildProcess
  attr_reader :data

  # data: Hash from the /run request (reads "name" and, via Agent.run,
  # "command"). A fresh "uuid" is merged in so each process is addressable.
  def initialize(data)
    @data = data.merge("uuid" => UUIDTools::UUID.random_create.to_s)
    super
  end

  def uuid
    @data["uuid"]
  end

  def name
    @data["name"]
  end

  # Signal the child, escalating to KILL after 5 seconds. The escalation
  # chain stops once the pid is gone (ESRCH).
  # NOTE(review): the KILL timer is scheduled unconditionally after a
  # successful TERM, so it fires even when the child exits promptly; the
  # rescue then just logs that the pid is gone.
  def kill(signal = "TERM")
    puts "#{name} #{uuid} kill #{signal}"
    begin
      Process.kill(signal, get_pid)
      EventMachine.add_timer(5) { kill("KILL") }
    rescue Errno::ESRCH
      puts "#{name} #{uuid} #{get_pid} gone"
    end
  end

  # EventMachine callback: the child process has been spawned.
  def post_init
    puts "#{name} #{uuid} #{get_pid} started with data #{data.inspect}"
  end

  # EventMachine callback: output read from the child.
  def receive_data(data)
    # send to log, note stream source (stdout/stderr)
    puts "#{name} #{uuid} #{get_pid} output #{data.inspect}"
  end

  # EventMachine callback: the child exited. Publish its exit status on the
  # "terminations" subject (consumed by the manager) and drop our record.
  def unbind
    status = get_status

    postmortem = {
      "name" => name,
      "uuid" => uuid,
      "success" => status.success?,
      "exitstatus" => status.exitstatus,
      "termsig" => status.termsig
    }

    puts "#{name} #{uuid} #{get_pid} stopped #{postmortem.inspect}"

    NATS.publish("terminations", postmortem.to_json)

    Agent.forget(uuid)
  end
end
+
# HTTP surface of the agent: the manager POSTs run requests here.
class App < Sinatra::Base
  # Start a process from a JSON body (e.g. {"name" => ..., "command" => ...}).
  # Responds with {"uuid" => ...} identifying the spawned process.
  # Malformed JSON now yields 400 instead of an unhandled 500.
  post "/run" do
    content_type :json

    begin
      data = JSON.parse(request.body.read)
    rescue JSON::ParserError
      halt 400, { "error" => "invalid JSON body" }.to_json
    end

    Agent.run(data).to_json
  end
end
+
# Registry of child processes running under this agent.
module Agent
  class << self
    # Lazily-initialized list of live ChildProcess connections.
    def processes
      @processes ||= []
    end

    # Drop the record for a finished process (called from ChildProcess#unbind).
    def forget(uuid)
      processes.delete_if { |process| process.uuid == uuid }
    end

    # Spawn the command described by +data+ and remember the connection.
    # Returns a hash carrying the new process uuid.
    def run(data)
      child = EventMachine.popen(data["command"], ChildProcess, data)
      processes << child
      { "uuid" => child.uuid }
    end

    # TERM every running process whose name matches data["name"].
    def kill(data)
      target = data["name"]
      matches = processes.select { |process| process.name == target }
      matches.each { |process| process.kill }
    end
  end
end
+
# Boot the agent: connect to NATS, answer "run" discovery requests with our
# HTTP endpoint, listen for "kill" broadcasts, and serve the API on 3001.
EventMachine.run do
  NATS.start

  # Reply to discovery with the URL the manager should POST run requests to.
  # NOTE(review): hard-coded to localhost:3001 — assumes one agent per box
  # and a co-located manager; confirm before deploying across hosts.
  NATS.subscribe("run") {|message, reply| NATS.publish(reply, { "url" => "http://localhost:3001/run" }.to_json) }
  NATS.subscribe("kill") {|message| Agent.kill(JSON.parse(message)) }

  Thin::Server.start(3001) do
    use Rack::CommonLogger
    run App
  end
end
132 manager.rb
@@ -0,0 +1,132 @@
+require "em-http-request"
+require "fiber"
+require "json"
+require "nats/client"
+require "rack/fiber_pool"
+require "sinatra/base"
+require "thin"
+
+DATA = {}
+
# This is a Fiber (1.9) aware extension.
module NATS
  class << self
    # Issue a NATS request and suspend the current Fiber until either
    # +opts[:expected]+ replies arrive (default 1) or +opts[:timeout]+
    # seconds elapse (default 1). Returns the replies collected so far,
    # truncated to at most +expected+ entries.
    def timed_request(subject, data=nil, opts = {})
      expected = opts[:expected] || 1
      timeout = opts[:timeout] || 1
      f = Fiber.current
      results = []
      sid = NATS.request(subject, data, :max => expected) do |msg|
        results << msg
        # Wake the requesting fiber once all expected replies are in.
        f.resume if results.length >= expected
      end
      # NOTE(review): relies on NATS.timeout(:expected => ...) cancelling the
      # timer once +expected+ messages arrive; otherwise a late timer could
      # resume a dead fiber — confirm against the nats client version in use.
      NATS.timeout(sid, timeout, :expected => expected) { f.resume }
      Fiber.yield
      return results.slice(0, expected)
    end
  end
end
+
# HTTP control surface of the manager: register services, list them, and
# start/stop them by name.
class App < Sinatra::Base
  # Serialize request handling; handlers mutate the shared DATA hash.
  enable :lock

  # Register (or replace) a service definition. Body: {"command" => "..."}.
  # Malformed JSON now yields 400 instead of an unhandled 500.
  put "/services/:name" do
    begin
      data = JSON.parse(request.body.read)
    rescue JSON::ParserError
      halt 400
    end

    DATA["services:#{params[:name]}"] = data["command"]

    status 200
  end

  # List registered service names as a JSON array.
  get "/services" do
    content_type :json

    DATA.keys.select {|k| k =~ /^services:/ }.map {|k| k.split(":", 2).last }.to_json
  end

  post "/services/:name/start" do
    case Manager.start(params[:name])
    when true
      status 200
    when :not_found
      halt 404
    else
      # e.g. :no_agents from Manager.run
      halt 500
    end
  end

  post "/services/:name/stop" do
    case Manager.stop(params[:name])
    when true
      status 200
    when :not_found
      halt 404
    else
      halt 500
    end
  end
end
+
# Orchestrates services across agents via NATS discovery and HTTP dispatch.
module Manager
  class << self
    # Ask every responding agent to run +name+'s command. Returns true on
    # dispatch, :not_found if the service isn't registered, or :no_agents
    # if no agent answered the discovery request in time.
    def run(name)
      return :not_found unless command = DATA["services:#{name}"]
      agents = NATS.timed_request("run").map {|m| JSON.parse(m) }
      return :no_agents if agents.empty?

      agents.each do |agent|
        data = { "name" => name, "command" => command }
        puts "running #{name.inspect} on #{agent.inspect}"

        body = data.to_json
        # Fire-and-forget: the agent's HTTP response is not awaited/checked.
        EventMachine::HttpRequest.new(agent["url"]).post(:body => body)
      end

      true
    end

    # Broadcast a kill for +name+ to all agents. Returns true, or
    # :not_found if the service isn't registered.
    def kill(name)
      # Existence check only — the previous `command =` capture was unused.
      return :not_found unless DATA.key?("services:#{name}")

      puts "killing #{name}"
      NATS.publish("kill", { "name" => name }.to_json)

      true
    end

    # Invoked for every postmortem published on "terminations". If the
    # service still exists and its goal is "up", re-run it after a 5s
    # back-off. Wrapped in a Fiber because run blocks on timed_request.
    def handle_termination(data)
      puts "termination #{data.inspect}"
      EventMachine.add_timer(5) do
        Fiber.new do
          if DATA["services:#{data["name"]}"] && DATA["goals:#{data["name"]}"] == "up"
            run(data["name"])
          end
        end.resume
      end
    end

    # Set the goal to "up" and attempt an immediate run.
    def start(name)
      puts "starting #{name}"
      DATA["goals:#{name}"] = "up"
      run(name)
    end

    # Set the goal to "down" and broadcast a kill.
    def stop(name)
      puts "stopping #{name}"
      DATA["goals:#{name}"] = "down"
      kill(name)
    end
  end
end
+
# Boot the manager: connect to NATS, watch for process terminations, and
# serve the HTTP API with a fiber per request so handlers can block inside
# NATS.timed_request without stalling the reactor.
# NOTE(review): no port is passed to Thin — presumably its default (3000)
# applies; confirm.
EventMachine.run do
  NATS.start

  NATS.subscribe("terminations") {|message| Manager.handle_termination(JSON.parse(message)) }

  Thin::Server.start do
    use Rack::CommonLogger
    use Rack::FiberPool
    run App
  end
end

0 comments on commit 2f105c4

Please sign in to comment.
Something went wrong with that request. Please try again.