Skip to content

Commit

Permalink
Merge "warden: Use em-posix-spawn"
Browse files Browse the repository at this point in the history
  • Loading branch information
pietern authored and testazuretrain committed Nov 30, 2011
2 parents 727290f + 79d198d commit 31c3432
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 48 deletions.
1 change: 1 addition & 0 deletions warden/Rakefile
Expand Up @@ -11,6 +11,7 @@ namespace :warden do
desc "Run Warden server"
task :start do
require "warden/server"
Warden::Server.setup
Warden::Server.run!
end
end
76 changes: 48 additions & 28 deletions warden/lib/warden/container/base.rb
Expand Up @@ -3,6 +3,7 @@
require "warden/container/script_handler"

require "eventmachine"
require "em/posix/spawn"
require "set"

module Warden
Expand All @@ -11,6 +12,8 @@ module Container

class Base

include ::EM::POSIX::Spawn
include EventEmitter
include Logger

class << self
Expand Down Expand Up @@ -72,20 +75,30 @@ def initialize
@jobs = {}
@created = false
@destroyed = false
end

def handle
@network.to_hex
end
on(:after_create) {
# Clients should be able to look this container up
self.class.registry[handle] = self
}

on(:before_destroy) {
# Clients should no longer be able to look this container up
self.class.registry.delete(handle)
}

def register
self.class.registry[handle] = self
nil
on(:after_destroy) {
# Release network address only if the container has successfully been
# destroyed. If not, the network address will "leak" and cannot be
# reused until this process is restarted. We should probably add
# extra logic to destroy a container in a failure scenario.
::EM.add_timer(5) {
Server.network_pool.release(network)
}
}
end

def unregister
self.class.registry.delete(handle)
nil
def handle
@network.to_hex
end

def gateway_ip
Expand Down Expand Up @@ -133,10 +146,9 @@ def create

@created = true

emit(:before_create)
do_create

# Any client should now be able to look this container up
register
emit(:after_create)

handle
end
Expand All @@ -156,18 +168,9 @@ def destroy

@destroyed = true

# Clients should no longer be able to look this container up
unregister

emit(:before_destroy)
do_destroy

# Release network address only if the container has successfully been
# destroyed. If not, the network address will "leak" and cannot be
# reused until this process is restarted. We should probably add extra
# logic to destroy a container in a failure scenario.
::EM.add_timer(5) {
Server.network_pool.release(network)
}
emit(:after_destroy)

"ok"
end
Expand Down Expand Up @@ -215,10 +218,27 @@ def run(script)

protected

def sh(command)
handler = ::EM.popen(command, ScriptHandler)
yield handler if block_given?
handler.yield # Yields fiber
def sh(*args)
env, argv, options = extract_process_spawn_arguments(*args)
options = { :timeout => 5.0, :max => 1024 * 1024 }.merge(options)
p = Child.new(env, *(argv + [options]))

f = Fiber.current
p.callback { f.resume(:ok) }
p.errback { |err| f.resume(:err, err) }

status, err = Fiber.yield
if status == :err
message = case err
when MaximumOutputExceeded
"command exceeded maximum output"
when TimeoutExceeded
"command exceeded maximum runtime"
else
"unknown error"
end
raise WardenError.new(message)
end

rescue WardenError
error "error running: #{command.inspect}"
Expand Down
17 changes: 4 additions & 13 deletions warden/lib/warden/container/insecure.rb
Expand Up @@ -39,20 +39,11 @@ def do_destroy
end

def create_job(script)
# Store script in temporary file. This is done because run.sh moves the
# subshell that actually runs the script to the background, and with
# that closes its stdin. In addition, we cannot capture stdin before
# executing the subshell because we cannot shutdown the write side of a
# socket from EM.
stdin = Tempfile.new("stdin", container_path)
stdin.write(script)
stdin.close

# Create new job and run script
job = Job.new(self)
command = "env job_path=#{container_root_path}/#{job.path} #{container_path}/run.sh #{stdin.path}"
handler = ::EM.popen(command, RemoteScriptHandler)
handler.callback { job.finish }
env = { "job_path" => File.join(container_root_path, job.path) }
child = Child.new(env, File.join(container_path, "run.sh"), :input => script)
child.callback { job.finish }
child.errback { job.finish }

job
end
Expand Down
12 changes: 6 additions & 6 deletions warden/lib/warden/server.rb
Expand Up @@ -216,25 +216,25 @@ def process_create(request)
end

def process_destroy(request)
request.require_arguments 2
request.require_arguments { |n| n == 2 }
container = find_container(request[1])
container.destroy
end

def process_spawn(request)
request.require_arguments 3
request.require_arguments { |n| n == 3 }
container = find_container(request[1])
container.spawn(request[2])
end

def process_link(request)
request.require_arguments 3
request.require_arguments { |n| n == 3 }
container = find_container(request[1])
container.link(request[2])
end

def process_run(request)
request.require_arguments 3
request.require_arguments { |n| n == 3 }
container = find_container(request[1])
container.run(request[2])
end
Expand All @@ -252,8 +252,8 @@ def find_container(handle)

class Request < Array

def require_arguments(num)
if size != num
def require_arguments
unless yield(size)
raise WardenError.new("invalid number of arguments")
end
end
Expand Down
2 changes: 1 addition & 1 deletion warden/root/insecure/.instance-skeleton/run.sh
Expand Up @@ -16,7 +16,7 @@ mkdir -p ${tmp}
# Run script with PWD=root. Bash closes stdin for processes that is moves to
# the background so we need to pass the script via a temporary file.
cd root
cat ${1:-/dev/null} > ${tmp}/stdin
cat - > ${tmp}/stdin
env -i bash < ${tmp}/stdin 1> ${tmp}/stdout 2> ${tmp}/stderr &
cd ..

Expand Down
1 change: 1 addition & 0 deletions warden/warden.gemspec
Expand Up @@ -20,4 +20,5 @@ Gem::Specification.new do |s|

s.add_runtime_dependency "eventmachine"
s.add_runtime_dependency "hiredis", "~> 0.4.0"
s.add_runtime_dependency "em-posix-spawn"
end

0 comments on commit 31c3432

Please sign in to comment.