diff --git a/cli.rb b/cli.rb index 6025975..d840b37 100755 --- a/cli.rb +++ b/cli.rb @@ -1,7 +1,6 @@ -#!/usr/bin/ruby1.9.1 +#!/usr/bin/env ruby require 'eventmachine' require 'json' -require 'pp' require 'fiber' require 'wirble' require './hpfeeds.rb' @@ -16,9 +15,17 @@ def handle_payload(name, chan, payload) end end -EventMachine::run do - Fiber.new{ - hp = HPFeed.new($config[:server], $config[:port], $config[:ident], $config[:auth], method(:handle_payload)) - hp.subscribe("geoloc.events") - }.resume +begin + EventMachine::run do + Fiber.new{ + hp = HPFeed.new( + $config[:server], + $config[:port], + $config[:ident], + $config[:auth], + ) + hp.subscribe("geoloc.events", method(:handle_payload)) + }.resume + end +rescue Interrupt end diff --git a/config.rb.example b/config.rb.example index c785303..ad2dcaf 100644 --- a/config.rb.example +++ b/config.rb.example @@ -1,6 +1,6 @@ -#!/usr/bin/ruby1.9.1 +#!/usr/bin/env ruby -config = { +$config = { server: 'hpfeeds.honeycloud.net', port: 10000, ident: 'MyUsername', diff --git a/hpfeeds.rb b/hpfeeds.rb index 6429018..cc815ee 100644 --- a/hpfeeds.rb +++ b/hpfeeds.rb @@ -1,21 +1,114 @@ -#!/usr/bin/ruby1.9.1 -require 'eventmachine' -require './hpfeedshandler.rb' - -def mysleep(n) - f = Fiber.current - EventMachine::Timer.new(n) do f.resume end - Fiber.yield -end +#!/usr/bin/env ruby +require 'digest/sha1' +require 'fiber' class HPFeed - def initialize(server, port, ident, auth, payload_handler) - @feed = EventMachine::connect(server, port, HPFeedHandler, ident, auth, payload_handler) - @feed.connect + def initialize(server, port, ident, auth) + @feed = EventMachine::connect(server, port, HPFeedConnection, ident, auth) + Fiber.yield + + # Fix for stupid bug in old version of feed server mysleep(1) end - def subscribe(chan) - @feed.subscribe(chan) + def subscribe(*args) + @feed.subscribe(*args) + end + + private + def mysleep(n) + f = Fiber.current + EventMachine::Timer.new(n) do f.resume end + Fiber.yield + end +end + +module HPFeedConnection + OP = { error: 0, info: 1, auth: 2, publish: 3, subscribe: 4 } + + public + + def initialize(ident, auth) + @ident, @auth = ident, auth + @f = Fiber.current + @buf = "" + @handler = {} + end + + def subscribe(chan, payload_handler) + @handler[chan] = payload_handler + send(msg_sub(@ident, chan)) + end + + ###### + + private + + def receive_data(data) + @buf << data + while @buf.length > 5 + len = @buf[0,4].unpack("l>")[0] + op = @buf[4,1].unpack("C")[0] + break if @buf.length < len + data = @buf[5,(len-5)] + @buf = @buf[len..-1] || "" + parse(op, data) + end + end + + def connection_completed + @peer = Socket.unpack_sockaddr_in(get_peername) + puts "Connected to #{@peer[1]}:#{@peer[0]}" end + + def unbind + puts "Connection to #{@peer[1]}:#{@peer[0]} closed" + end + + ###### + + private + + def send(data) + send_data(data) + end + + def parse(op, data) + if op == OP[:info] + len = data[0,1].unpack("C")[0] + name = data[1,len] + rand = data[(1+len)..-1] + send(msg_auth(rand, @ident, @auth)) + @f.resume + elsif op == OP[:publish] + len = data[0,1].unpack("C")[0] + name = data[1,len] + len2 = data[(1+len),1].ord + chan = data[(1+len+1),len2] + payload = data[(1+len+1+len2)..-1] + @handler[chan].call(name, chan, payload) if @handler[chan] + elsif op == OP[:error] + STDERR.puts "ERROR: " + [op, data].inspect + else + STDERR.puts "ERROR: Unknown opcode in " + [op, data].inspect + end + end + + ###### + + private + + def msg_hdr(op, data) + [5+data.length].pack("l>") + [op].pack("C") + data + end + + def msg_sub(ident, chan) + msg_hdr(OP[:subscribe], [ident.length].pack("C") + ident + chan) + end + + def msg_auth(rand, ident, secret) + mac = Digest::SHA1.digest(rand + secret) # TODO: use HMAC + msg_hdr(OP[:auth], [ident.length].pack("C") + ident + mac) + end + end diff --git a/hpfeedshandler.rb b/hpfeedshandler.rb deleted file mode 100644 index 56a9054..0000000 --- a/hpfeedshandler.rb +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/ruby1.9.1 -require 'digest/sha1' -require 'fiber' - -module HPFeedHandler - - OP = { error: 0, info: 1, auth: 2, publish: 3, subscribe: 4 } - - def initialize(ident, auth, payload_handler) - @payload_handler = payload_handler - @ident, @auth = ident, auth - @f = Fiber.current - @ready = false - @buf = "" - end - - def receive_data(data) - @buf << data - while @buf.length > 5 - len = @buf[0,4].unpack("l>")[0] - op = @buf[4,1].unpack("C")[0] - break if @buf.length < len - data = @buf[5,(len-5)] - @buf = @buf[len..-1] || "" - parse(op, data) - end - end - - ###### - - def connect - Fiber.yield - end - - def subscribe(chan) - send(msg_sub(@ident, chan)) - end - - def send(data) - send_data(data) - end - - def parse(op, data) - if op == OP[:info] - len = data[0,1].unpack("C")[0] - name = data[1,len] - rand = data[(1+len)..-1] - send(msg_auth(rand, @ident, @auth)) - @ready = true - @f.resume - elsif op == OP[:publish] - len = data[0,1].unpack("C")[0] - name = data[1,len] - len2 = data[(1+len),1].ord - chan = data[(1+len+1),len2] - payload = data[(1+len+1+len2)..-1] - @payload_handler.call(name, chan, payload) if @payload_handler - elsif op == OP[:error] - STDERR.puts "ERROR: " + [op, data].inspect - else - STDERR.puts "ERROR: Unknown opcode (#{op.inspect})" - end - end - - ###### - - def msg_hdr(op, data) - [5+data.length].pack("l>") + [op].pack("C") + data - end - - def msg_sub(ident, chan) - msg_hdr(OP[:subscribe], [ident.length].pack("C") + ident + chan) - end - - def msg_auth(rand, ident, secret) - mac = Digest::SHA1.digest(rand + secret) # TODO: use HMAC - msg_hdr(OP[:auth], [ident.length].pack("C") + ident + mac) - end - -end