Skip to content

Commit

Permalink
update distributor
Browse files Browse the repository at this point in the history
  • Loading branch information
ddollar committed Sep 19, 2012
1 parent f4060e4 commit 5b6c5b1
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 11 deletions.
4 changes: 2 additions & 2 deletions vendor/distributor/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
PATH
remote: .
specs:
distributor (0.5.1)
distributor (0.6.0)
thor (>= 0.13.6)

GEM
remote: http://rubygems.org/
specs:
thor (0.15.3)
thor (0.16.0)

PLATFORMS
ruby
Expand Down
23 changes: 23 additions & 0 deletions vendor/distributor/example/local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ def set_buffer(enable)
client.run("bash 2>&1") do |ch|
client.hookup ch, $stdin.dup, $stdout.dup
client.on_close(ch) do
p [:closing]
exit 0
end
end

client.run("ls -la") do |ch|
client.on_close(ch) { puts "ls closed" }
end

# echo commands to stdout
client.on_command do |command, data|
$stdout.puts "received: #{command} #{data.inspect}"
Expand All @@ -74,6 +79,24 @@ def set_buffer(enable)
end
end

# create a unix socket to proxy through
unix = UNIXServer.new("/tmp/client.sock")

Thread.new do
loop do

# every time a connection comes to localhost:8000 on the client
Thread.start(unix.accept) do |unix_client|

# create a tunnel to localhost:5000 on the server
client.tunnel(5000) do |ch|
client.hookup ch, unix_client
end

end
end
end

# turn off terminal specials
set_buffer false

Expand Down
5 changes: 5 additions & 0 deletions vendor/distributor/lib/distributor/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ def run(command, &handler)
@handlers[id] = handler
end

def socket(path, &handler)
id = command("socket", "path" => path)
@handlers[id] = handler
end

def tunnel(port, &handler)
id = command("tunnel", "port" => port)
@handlers[id] = handler
Expand Down
30 changes: 22 additions & 8 deletions vendor/distributor/lib/distributor/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ def initialize(input, output=input)

dequeue_json do |data|
case command = data["command"]
when "socket" then
path = data["path"]
ch = socket(path)
ack data["id"], "ch" => ch, "path" => path
when "tunnel" then
port = (data["port"] || ENV["PORT"] || 5000).to_i
ch = tunnel(port)
@multiplexer.output 0, Distributor::OkJson.encode({ "id" => data["id"], "command" => "ack", "ch" => ch, "port" => port })
ack data["id"], "ch" => ch, "port" => port
when "close" then
@multiplexer.close data["ch"]
when "run" then
Expand All @@ -46,6 +50,10 @@ def initialize(input, output=input)
end
end

def ack(id, options={})
@multiplexer.output 0, Distributor::OkJson.encode(options.merge({ "id" => id, "command" => "ack" }))
end

def run(command)
ch = @multiplexer.reserve

Expand All @@ -70,13 +78,9 @@ def run(command)
ch
end

def tunnel(port)
ch = @multiplexer.reserve

tcp = TCPSocket.new("localhost", port)

def handle_socket(ch, socket)
# handle data incoming from process
@connector.handle(tcp) do |io|
@connector.handle(socket) do |io|
begin
@multiplexer.output(ch, io.readpartial(4096))
rescue EOFError
Expand All @@ -88,12 +92,22 @@ def tunnel(port)
# handle data incoming on the multiplexer
@connector.handle(@multiplexer.reader(ch)) do |input_io|
data = input_io.readpartial(4096)
tcp.write data
socket.write data
end

ch
end

def socket(path)
ch = @multiplexer.reserve
handle_socket ch, UNIXSocket.new(path)
end

def tunnel(port)
ch = @multiplexer.reserve
handle_socket ch, TCPSocket.new("localhost", port)
end

def command(command, data={})
data["id"] ||= @multiplexer.generate_id
data["command"] = command
Expand Down
2 changes: 1 addition & 1 deletion vendor/distributor/lib/distributor/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Distributor

VERSION = "0.5.1"
VERSION = "0.6.0"

end

0 comments on commit 5b6c5b1

Please sign in to comment.