Skip to content

Commit

Permalink
one payload handler per channel
Browse files Browse the repository at this point in the history
  • Loading branch information
fw42 committed Sep 24, 2012
1 parent 579e956 commit 29cc59e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 103 deletions.
21 changes: 14 additions & 7 deletions cli.rb
@@ -1,7 +1,6 @@
#!/usr/bin/ruby1.9.1
#!/usr/bin/env ruby
require 'eventmachine'
require 'json'
require 'pp'
require 'fiber'
require 'wirble'
require './hpfeeds.rb'
Expand All @@ -16,9 +15,17 @@ def handle_payload(name, chan, payload)
end
end

EventMachine::run do
Fiber.new{
hp = HPFeed.new($config[:server], $config[:port], $config[:ident], $config[:auth], method(:handle_payload))
hp.subscribe("geoloc.events")
}.resume
begin
EventMachine::run do
Fiber.new{
hp = HPFeed.new(
$config[:server],
$config[:port],
$config[:ident],
$config[:auth],
)
hp.subscribe("geoloc.events", method(:handle_payload))
}.resume
end
rescue Interrupt
end
4 changes: 2 additions & 2 deletions config.rb.example
@@ -1,6 +1,6 @@
#!/usr/bin/ruby1.9.1
#!/usr/bin/env ruby

config = {
$config = {
server: 'hpfeeds.honeycloud.net',
port: 10000,
ident: 'MyUsername',
Expand Down
121 changes: 107 additions & 14 deletions hpfeeds.rb
@@ -1,21 +1,114 @@
#!/usr/bin/ruby1.9.1
require 'eventmachine'
require './hpfeedshandler.rb'

def mysleep(n)
f = Fiber.current
EventMachine::Timer.new(n) do f.resume end
Fiber.yield
end
#!/usr/bin/env ruby
require 'digest/sha1'
require 'fiber'

class HPFeed
def initialize(server, port, ident, auth, payload_handler)
@feed = EventMachine::connect(server, port, HPFeedHandler, ident, auth, payload_handler)
@feed.connect
def initialize(server, port, ident, auth)
@feed = EventMachine::connect(server, port, HPFeedConnection, ident, auth)
Fiber.yield

# Fix for stupid bug in old version of feed server
mysleep(1)
end

def subscribe(chan)
@feed.subscribe(chan)
def subscribe(*args)
@feed.subscribe(*args)
end

private
def mysleep(n)
f = Fiber.current
EventMachine::Timer.new(n) do f.resume end
Fiber.yield
end
end

module HPFeedConnection
OP = { error: 0, info: 1, auth: 2, publish: 3, subscribe: 4 }

public

def initialize(ident, auth)
@ident, @auth = ident, auth
@f = Fiber.current
@buf = ""
@handler = {}
end

def subscribe(chan, payload_handler)
@handler[chan] = payload_handler
send(msg_sub(@ident, chan))
end

######

private

def receive_data(data)
@buf << data
while @buf.length > 5
len = @buf[0,4].unpack("l>")[0]
op = @buf[4,1].unpack("C")[0]
break if @buf.length < len
data = @buf[5,(len-5)]
@buf = @buf[len..-1] || ""
parse(op, data)
end
end

def connection_completed
@peer = Socket.unpack_sockaddr_in(get_peername)
puts "Connected to #{@peer[1]}:#{@peer[0]}"
end

def unbind
puts "Connection to #{@peer[1]}:#{@peer[0]} closed"
end

######

private

def send(data)
send_data(data)
end

def parse(op, data)
if op == OP[:info]
len = data[0,1].unpack("C")[0]
name = data[1,len]
rand = data[(1+len)..-1]
send(msg_auth(rand, @ident, @auth))
@f.resume
elsif op == OP[:publish]
len = data[0,1].unpack("C")[0]
name = data[1,len]
len2 = data[(1+len),1].ord
chan = data[(1+len+1),len2]
payload = data[(1+len+1+len2)..-1]
@handler[chan].call(name, chan, payload) if @handler[chan]
elsif op == OP[:error]
STDERR.puts "ERROR: " + [op, data].inspect
else
STDERR.puts "ERROR: Unknown opcode in " + [op, data].inspect
end
end

######

private

def msg_hdr(op, data)
[5+data.length].pack("l>") + [op].pack("C") + data
end

def msg_sub(ident, chan)
msg_hdr(OP[:subscribe], [ident.length].pack("C") + ident + chan)
end

def msg_auth(rand, ident, secret)
mac = Digest::SHA1.digest(rand + secret) # TODO: use HMAC
msg_hdr(OP[:auth], [ident.length].pack("C") + ident + mac)
end

end
80 changes: 0 additions & 80 deletions hpfeedshandler.rb

This file was deleted.

0 comments on commit 29cc59e

Please sign in to comment.